An end-to-end industrial cybersecurity telemetry pipeline that ingests, parses, analyzes, and archives security event logs generated from a simulated Operational Technology (OT) network segmented according to the Purdue Reference Model.
The system collects raw network traffic drops and Intrusion Detection System (IDS) alerts from the OT Gateway, processes them serverlessly in real time, writes stateful detection records to DynamoDB, generates structured Incident Response reports, and stages long-term analytical data in columnar Parquet format.
graph TD
subgraph "OT Security Lab (Docker)"
A[OT Attacker - 172.24.0.10] -- Unauthorized Modbus/TCP --> B("OT Gateway - 172.24.0.2 / 172.21.0.2")
C[PLCs / HMI - 172.21.0.10] -. "Blocked Zone Traffic" .-> B
B -- Writes Logs --> D[alerts.json & iptables.log]
E[Fluent Bit Daemon] -- Tails & Gzips logs --> F("LocalStack S3 Raw Bucket")
end
subgraph "Serverless Telemetry Lake (AWS LocalStack)"
F -- S3 Put Event Notification --> G[SQS Ingest Queue]
G -- Trigger --> H[Lambda: ot-log-parser]
H -- Write Detections --> I[(DynamoDB ot_detections)]
H -- Write Staged Parquet --> J("S3 Staged Bucket")
I -- DynamoDB Streams --> K[Lambda: ot-incident-aggregator]
K -- Write IR Report --> L("S3 Reports Bucket")
K -- Publish Alert --> M[SNS: ot-incident-alerts]
M -- Deliver --> N[SQS: ot-incident-notifications]
end
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style E fill:#f96,stroke:#333,stroke-width:2px
style H fill:#ff9,stroke:#333,stroke-width:2px
style K fill:#ff9,stroke:#333,stroke-width:2px
style I fill:#dfd,stroke:#333,stroke-width:2px
style L fill:#dfd,stroke:#333,stroke-width:2px
- Purdue Model Network Simulation: Multi-zone Docker environment covering Level 4/5 (IT/Attacker), Level 3 (Operations Gateway), and Level 1/2 (Control Zone with Modbus PLCs).
- Custom IDS Rule Engine: Gateway runs Scapy-based Python listeners detecting:
  - CROSS_ZONE_VIOLATION (MITRE T0886): Direct communication attempts bypassing Level 3 boundary rules.
  - UNAUTHORIZED_MODBUS_WRITE (MITRE T0831): Unauthorized Modbus register writes from untrusted sources.
  - OT_BRUTE_FORCE_SCAN (MITRE T0846): Excessive Modbus exceptions indicating scanner or brute-force activity.
- Firewall Log Capture: Mock packet sniffer emulates kernel iptables drop entries for cross-zone policy blocks.
- Structured Log Shipping: Fluent Bit collects, compresses, and ships log streams to S3 with Hive-partitioned keys (source=, date=).
- Serverless Ingestion Pipeline: S3 events → SQS → Lambda parses and routes logs with dynamic gzip detection.
- Stateful Security Correlation: Detection records written to DynamoDB with GSIs for severity and host-based querying.
- Automated Incident Response: DynamoDB Streams trigger the aggregator Lambda to produce NIST SP 800-61 IR reports and publish SOC alerts via SNS.
- Analytical Storage: Parsed logs staged as snappy-compressed Apache Parquet, partitioned by source and date.
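The ingestion path above (S3 event notification, SQS, Lambda) can be sketched roughly as follows. This is a minimal illustration and not the repository's handler: the function names `extract_s3_objects` and `parse_hive_partitions` are hypothetical, and the sketch assumes each SQS message body carries a standard S3 event notification as JSON.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(sqs_event: dict) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs from an SQS-triggered Lambda event."""
    objects = []
    for sqs_record in sqs_event.get("Records", []):
        # Each SQS message body is a JSON-encoded S3 event notification.
        body = json.loads(sqs_record["body"])
        # Messages without "Records" (e.g. S3 test events) are skipped.
        for s3_record in body.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # S3 URL-encodes object keys in notifications ('=' arrives as %3D).
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            objects.append((bucket, key))
    return objects


def parse_hive_partitions(key: str) -> dict[str, str]:
    """Extract name=value path segments from a Hive-partitioned object key."""
    partitions = {}
    for segment in key.split("/")[:-1]:  # the last segment is the file name
        name, sep, value = segment.partition("=")
        if sep:
            partitions[name] = value
    return partitions
```

Decoding the key before any lookup matters here because Hive-style keys contain `=`, which is percent-encoded in the notification payload.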
This project is deployed against LocalStack Community Edition as a local AWS emulation environment. Three platform constraints required explicit engineering solutions:
- Fat-Zip Dependency Packaging: LocalStack Community does not mount Lambda Layers to /opt at runtime. Dependencies (pandas, awswrangler, etc.) are bundled directly inside the deployment archive under python/ and injected into sys.path at cold-start time.
- S3-Based Lambda Deployment: The fat-zip archive exceeds the 50 MB AWS direct API upload limit. Terraform stages the zip to S3 first (aws_s3_object) and provisions the function by S3 reference (s3_bucket / s3_key).
- Dynamic Gzip Detection: Fluent Bit uploads gzip-compressed payloads with a .json key extension. The Lambda handler inspects the first two magic bytes (\x1f\x8b) at runtime and renames the file with a .gz suffix before passing it to the parser, enabling transparent decompression.
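The gzip check described in the last item can be sketched as below. The helper names (`is_gzip`, `normalize_key`, `read_text`) are hypothetical and the real handler may be organised differently. The only fixed fact is the two-byte gzip magic number `\x1f\x8b`.

```python
import gzip

# Every gzip stream starts with these two bytes (RFC 1952).
GZIP_MAGIC = b"\x1f\x8b"


def is_gzip(payload: bytes) -> bool:
    """Detect gzip content from the payload itself, ignoring the key extension."""
    return payload[:2] == GZIP_MAGIC


def normalize_key(key: str, payload: bytes) -> str:
    """Append .gz when the content is gzip but the key does not say so."""
    if is_gzip(payload) and not key.endswith(".gz"):
        return key + ".gz"
    return key


def read_text(payload: bytes) -> str:
    """Return the decoded text, decompressing first when needed."""
    data = gzip.decompress(payload) if is_gzip(payload) else payload
    return data.decode("utf-8")
```

Checking content instead of the extension keeps the parser working whether or not the shipper labels its uploads correctly.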
# Start LocalStack Community
docker run -d --name localstack -p 4566:4566 localstack/localstack
# Deploy all AWS resources via Terraform
cd terraform
terraform init
terraform apply -auto-approve
cd ../ot-security-lab/lab-environment
docker compose up -d
# Apply zone firewall rules to the gateway
docker cp network-config/firewall-rules.sh ot_gateway:/firewall-rules.sh
docker exec ot_gateway chmod +x /firewall-rules.sh
docker exec ot_gateway /firewall-rules.sh
docker exec -d ot_gateway python3 /scripts/sniffer.py
docker exec -d ot_gateway python3 /detection/rules/cross_zone_traffic.py
docker exec -d ot_gateway python3 /detection/rules/modbus_anomaly.py
docker exec -d ot_gateway python3 /detection/rules/ot_brute_force.py
docker exec -d ot_gateway python3 /detection/rules/process_safety_violation.py
docker exec -it ot_attacker python3 /attacker/simulate_attack.py
Uploads a batch of Suricata EVE-format alerts simulating a Malcolm NDR sensor feed:
python3 src/malcolm_fixture/upload_malcolm_alerts.py --endpoint-url http://localhost:4566
In a production AWS environment, the structured telemetry and incident reports are queried using Amazon Athena. The database schemas are declared via Terraform in terraform/analytics.tf:
- Database: ot_telemetry
- Tables:
  - staged_telemetry: An external snappy-parquet table mapped to s3://ot-telemetry-staged-<account_id>/parsed/, partitioned by date (string) and source (string).
  - incident_reports: An external JSON table mapped to s3://ot-telemetry-reports-<account_id>/incidents/ that exposes the incident report structure.
Note: Since Glue and Athena are premium APIs not implemented in the free LocalStack Community Edition, these resources are disabled by default in local deployments via the feature flag enable_analytics = false. Set this flag to true when deploying to a real AWS account.
Three production-ready analytical queries are pre-defined in the repository:
- High-Severity OT Anomalies Over Time: Groups and aggregates high-severity alerts (CROSS_ZONE_VIOLATION, UNAUTHORIZED_MODBUS_WRITE, OT_BRUTE_FORCE_SCAN) by date and source.
- Top Noisy Hosts: Identifies the most active logging sources across the network.
- SCADA TTP Patterns: Parses nested Malcolm NDR JSON alerts, matching specific industrial attack signatures (e.g. Modbus function code violations, S7comm lateral movement).
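As a rough illustration of what the first two queries compute, here is a plain-Python stand-in using only the standard library. It is not the repository's SQL or its pandas client. The `rule_name` field is an assumed column name. `date`, `source`, and `host_identifier` follow the partition and output column names used in this README.

```python
from collections import Counter

HIGH_SEVERITY_RULES = {
    "CROSS_ZONE_VIOLATION",
    "UNAUTHORIZED_MODBUS_WRITE",
    "OT_BRUTE_FORCE_SCAN",
}


def anomalies_over_time(records):
    """Count high-severity detections per (date, source), sorted by date then source."""
    counts = Counter(
        (r["date"], r["source"])
        for r in records
        if r.get("rule_name") in HIGH_SEVERITY_RULES
    )
    return sorted((date, source, n) for (date, source), n in counts.items())


def top_noisy_hosts(records, limit=10):
    """Rank hosts by the number of log records they produced."""
    return Counter(r["host_identifier"] for r in records).most_common(limit)
```

Each function takes an iterable of dict rows, so the same logic can be checked against a handful of hand-written records before running it on the staged data.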
To run these queries locally against the LocalStack emulation environment, a query client utility is provided. It uses pandas and awswrangler to run the identical SQL aggregates directly against the S3 staged parquet files:
AWS_ACCESS_KEY_ID=mock AWS_SECRET_ACCESS_KEY=mock AWS_DEFAULT_REGION=us-east-1 \
AWS_ENDPOINT_URL=http://localhost:4566 \
.venv/bin/python src/analytics/query_lake.py
Expected output:
--- Security Data Lake Analytics CLI ---
Reading telemetry from s3://ot-telemetry-staged-000000000000/parsed/ (Endpoint: http://localhost:4566)...
Successfully loaded 24 telemetry logs.
=== QUERY 1: High-Severity OT Anomalies Over Time ===
date source anomaly_count
2026-05-20 ot_sensor 11
=== QUERY 2: Top Noisy Hosts ===
host_identifier log_count
pop-os 11
ot-gateway 9
malcolm-ndr 4
=== QUERY 3: SCADA TTP Patterns (Malcolm NDR) ===
timestamp src_ip dest_ip scada_signature scada_category
2026-05-20T19:27:26.798635+0000 172.24.0.10 172.21.0.10 ET SCADA Modbus Unauthorized Function Code from External Host SCADA/ICS Attack
2026-05-20T19:27:26.798635+0000 172.24.0.10 172.21.0.10 ET SCADA Modbus Write Single Register Attempt (FC 6) SCADA/ICS Attack
2026-05-20T19:27:26.798635+0000 172.24.0.10 172.22.0.10 ET SCADA S7comm PLC Read/Write Coils – Lateral Movement SCADA/ICS Lateral Movement
2026-05-20T19:27:26.798635+0000 172.24.0.10 172.21.0.10 ET SCADA Modbus Exception Response Flood – Brute Force/Scan SCADA/ICS Reconnaissance
Once the simulation completes, Fluent Bit ships logs to S3, the parser Lambda processes them, and the aggregator Lambda generates an IR report. All steps can be verified via the AWS CLI against LocalStack.
AWS_ACCESS_KEY_ID=mock AWS_SECRET_ACCESS_KEY=mock AWS_DEFAULT_REGION=us-east-1 \
aws --endpoint-url=http://localhost:4566 dynamodb scan --table-name ot_detections
Expected incident_correlator values: CROSS_ZONE_VIOLATION#ot_sensor, UNAUTHORIZED_MODBUS_WRITE#ot_sensor, OT_BRUTE_FORCE_SCAN#ot_sensor.
AWS_ACCESS_KEY_ID=mock AWS_SECRET_ACCESS_KEY=mock AWS_DEFAULT_REGION=us-east-1 \
aws --endpoint-url=http://localhost:4566 s3 ls s3://ot-telemetry-staged-000000000000 --recursive
Expected partitions: parsed/date=<DATE>/source=iptables/, source=ot_sensor/, source=malcolm/.
AWS_ACCESS_KEY_ID=mock AWS_SECRET_ACCESS_KEY=mock AWS_DEFAULT_REGION=us-east-1 \
aws --endpoint-url=http://localhost:4566 s3 ls s3://ot-telemetry-reports-000000000000 --recursive
Expected: incidents/<uuid>.json, one report per detection batch, containing incident_id, severity, mitre_techniques, affected_assets, timeline, and recommended_actions.
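To make the report shape concrete, the sketch below assembles a dictionary with the six fields named above from a batch of detections. Everything beyond those six field names is an assumption for illustration: the input keys (`severity`, `mitre_technique`, `host`, `timestamp`, `rule_name`), the severity labels and their ordering, and the function name `build_incident_report`. How the aggregator derives `recommended_actions` is not described here, so the sketch leaves that list empty.

```python
import uuid

# Assumed severity scale, lowest to highest (hypothetical labels).
SEVERITY_ORDER = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]


def build_incident_report(detections):
    """Fold a non-empty batch of detection dicts into one report dict."""
    if not detections:
        raise ValueError("a report needs at least one detection")
    timeline = sorted(
        ({"timestamp": d["timestamp"], "rule": d["rule_name"]} for d in detections),
        key=lambda entry: entry["timestamp"],
    )
    return {
        "incident_id": str(uuid.uuid4()),
        # The report takes the highest severity seen in the batch.
        "severity": max((d["severity"] for d in detections), key=SEVERITY_ORDER.index),
        "mitre_techniques": sorted({d["mitre_technique"] for d in detections}),
        "affected_assets": sorted({d["host"] for d in detections}),
        "timeline": timeline,
        "recommended_actions": [],  # placeholder: derivation not specified
    }
```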
AWS_ACCESS_KEY_ID=mock AWS_SECRET_ACCESS_KEY=mock AWS_DEFAULT_REGION=us-east-1 \
aws --endpoint-url=http://localhost:4566 sqs receive-message \
--queue-url http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/ot-incident-notifications \
--max-number-of-messages 3
Expected subject format: [<SEVERITY>] OT Incident <ID> – <N> Detection(s) | Techniques: T0831, T0846, T0886.
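The subject format above can be reproduced with a small helper when writing assertions against received messages. `format_subject` is a hypothetical name, not a function from the repository, and the sketch assumes technique IDs are listed in sorted order, as in the example.

```python
def format_subject(severity, incident_id, detection_count, techniques):
    """Render an alert subject in the format shown above (en dash separator)."""
    joined = ", ".join(sorted(techniques))
    return (
        f"[{severity}] OT Incident {incident_id} \u2013 "
        f"{detection_count} Detection(s) | Techniques: {joined}"
    )
```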