Niteesh57/crowdshield-ai

CrowdShield AI: Technical Architecture and System Design Write-Up


Setup & Deployment Instructions (Step-by-Step)

Step 1: Install and Configure Splunk Enterprise

  1. Download and install Splunk Enterprise on your machine.
  2. Open the Splunk Enterprise Web UI (typically http://localhost:8000).
  3. Enable the data receiver port so that the Splunk Universal Forwarder can stream logs to it:
    • Go to Settings -> Forwarding and receiving.
    • Click Add new under Configure receiving.
    • Enter Port 9997 and save.

Step 2: Install Splunk Universal Forwarder & Sysmon

  1. Download and install the Splunk Universal Forwarder. During installation, specify your Splunk Enterprise instance host (e.g., 127.0.0.1) and the receiving port (9997).
  2. Run the following command sequence in an Administrator PowerShell console to deploy Sysmon, configure the log forwarder inputs, and start services (replace <PATH_TO_YOUR_PROJECT_ROOT> with your actual repository path):
    Start-Process powershell -Verb runAs -ArgumentList "-NoExit", "-Command", "cd <PATH_TO_YOUR_PROJECT_ROOT>/splunk_sysmon_config; Write-Host 'Updating Sysmon configuration...'; .\sysmon64.exe -c sysmonconfig.xml; Write-Host 'Copying Splunk config...'; Copy-Item .\inputs.conf -Destination 'C:\Program Files\SplunkUniversalForwarder\etc\system\local\inputs.conf' -Force; Write-Host 'Restarting Splunk Forwarder...'; & 'C:\Program Files\SplunkUniversalForwarder\bin\splunk.exe' restart; Write-Host 'Done!'"
    This command automates:
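    • Loading the Sysmon telemetry filters with sysmon64.exe -c sysmonconfig.xml. The -c switch updates the configuration of an already-installed Sysmon; on a machine where Sysmon is not yet installed, install it first with sysmon64.exe -accepteula -i sysmonconfig.xml.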
    • Copying the custom inputs.conf forwarder configuration mapping to the Universal Forwarder config folder.
    • Restarting the forwarder service to register the Sysmon operational channel logs.

Step 3: Verify Sysmon Events Ingestion in Splunk

  1. Go to your Splunk Enterprise Web UI search page.

  2. Run a query to confirm that Sysmon logs are being received:

    index=main source="XmlWinEventLog:Microsoft-Windows-Sysmon/Operational"
    
  3. Verify that the search returns Sysmon events correctly, indicating the forwarder configuration is active:

    [Screenshot: Splunk Event Verification]

Step 4: Run Docker Compose for Kafka

Start the Kafka broker and dependencies to enable real-time alert streaming:

  1. Ensure Docker and Docker Compose are installed and running.
  2. Navigate to the project root directory and run:
    docker compose up -d
    This spins up the Apache Kafka broker and Zookeeper instances in the background.

Step 5: Configure Splunk Model Context Protocol (MCP) and Token

To allow the AI agents to run historical investigations on Splunk via MCP:

  1. Install the Splunk Model Context Protocol (MCP) app inside Splunk Enterprise.

  2. In the Splunk Web UI, navigate to the MCP app configuration page.

  3. Create a new access/bearer token to establish a connection:

    [Screenshot: Splunk MCP Token Configuration]

  4. Copy the environment variables example file:

    Copy-Item .env.example .env
  5. Open the .env file and populate the following keys:

    • SPLUNK_MCP_URL: The URL to the Splunk MCP endpoint (typically https://localhost:8089/services/mcp).
    • SPLUNK_MCP_TOKEN: Paste your copied bearer token here.
    • GROQ_API_KEY: Obtain this from the Groq Console.
    • CROWDSHIELD_LLM: The LLM model name. We recommend:
      • openai/gpt-oss-20b (faster, 21B MoE model optimized for speed and log analysis)
      • openai/gpt-oss-120b (reasoning-heavy model for complex root-cause analysis)
      • Fallback: llama-3.3-70b-versatile or gpt-4o-mini.
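As a rough illustration of how these keys might be consumed at startup, the sketch below validates them before anything else runs. It assumes the .env file has already been loaded into the process environment (for example by a dotenv-style loader); `load_settings`, `REQUIRED_KEYS`, and the choice of default model are hypothetical names for this example, not taken from the repository.

```python
import os

# Keys listed in the step above. Treating CROWDSHIELD_LLM as optional with a
# default is an assumption made for this sketch.
REQUIRED_KEYS = ("SPLUNK_MCP_URL", "SPLUNK_MCP_TOKEN", "GROQ_API_KEY")
DEFAULT_MODEL = "openai/gpt-oss-20b"


def load_settings(env=None):
    """Return the settings dict, raising early if a required key is missing."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError("Missing .env keys: " + ", ".join(missing))
    settings = {key: env[key] for key in REQUIRED_KEYS}
    settings["CROWDSHIELD_LLM"] = env.get("CROWDSHIELD_LLM") or DEFAULT_MODEL
    return settings
```

Failing fast here gives a clearer error than a rejected Splunk or Groq request later in the pipeline.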

Step 6: Start the Application

Once the telemetry agents, environment variables, Splunk, and Kafka services are verified and active, run the entry script from the project root directory:

python main.py

1. Executive Summary & Criticality Analysis

The Modern Threat Vector: Supply-Chain & Trust-Source Hijacking

In modern software engineering, developers rely heavily on third-party dependencies (e.g., npm, PyPI, Maven) and browser extensions (e.g., Chrome, VS Code plugins). Attackers have increasingly shifted focus from perimeter compromise to supply-chain attacks. By compromising a low-profile dependency (such as the historical cases of node-ipc or event-source-polyfill) or injecting malicious code into browser extensions, adversaries gain immediate code-execution access to developers' machines.

Once executed, these malicious modules typically seek to harvest high-value credentials, specifically:

  • Environment variables (.env, .env.local, .env.production)
  • Cloud platform keys (aws_credentials, Google Cloud service accounts)
  • Repository auth tokens (.npmrc, .pypirc, .git/config)
  • Security/access keys (SSH private keys, crypto wallets)

The Proxy Blind Spot

Traditional enterprise network defenses, primarily corporate proxies, cannot see these attacks. Because the network requests originate from inherently trusted applications such as node.exe, chrome.exe, or Code.exe (VS Code), the proxy logs show only standard TLS traffic from approved processes to external domains. The proxy has no context on why a trusted node process is making an outbound HTTPS request immediately after reading a local .env file containing database passwords.

CrowdShield AI solves this critical blind spot by combining host-level behavior monitoring, SIEM log aggregation, real-time message queuing, and an AI-driven agentic orchestration pipeline.


2. Technical Architecture & Flow Diagram

The data flow within the CrowdShield AI platform spans from host telemetry collection up to autonomous threat mitigation and security analyst observability.

System Flow Diagram

For the system flow and low-level sequence diagrams, see architecture_diagram.md.

3. How We Built It: Ingestion Pipeline & Dashboard Ticker

CrowdShield AI was built to solve the latency, safety, and visualization challenges of endpoint threat detection through two core pillars: an asynchronous event-driven Ingestion Pipeline and a real-time responsive Dashboard Ticker.

A. The Ingestion Pipeline (producer_pipeline.py)

The ingestion engine is designed as a robust background pipeline that continuously processes threat telemetry:

  1. Low-Latency Polling: The pipeline polls the Splunk Enterprise indexer at short intervals (optimized from 15s to 5s to reduce end-to-end telemetry propagation delay). It queries events using epoch-based watermarking to fetch only new Sysmon events.
  2. Normalized Processing: Telemetry logs are structured and fed into a PidCorrelationBuffer that holds a sliding 300-second window. This window aggregates events by Process ID (PID) to trace multi-step exfiltration behaviors (e.g. process starts, reads .env, makes a network connection).
  3. Parallel Scoring Engines: To balance LLM analysis cost/latency and rule-based reliability:
    • Deterministic Rule Engine: Runs regex-based checks for known attacks.
    • GenAI Scorer (Groq): Analyzes obfuscated or suspicious event payloads for anomalous behaviors.
  4. Latency-Optimized Bypass: If the Rule Engine detects a critical match (Risk >= 70), the system immediately bypasses the LLM call entirely. This optimization shaves 3–15 seconds off the detection latency for high-confidence threats, streaming the alert to Kafka instantly.
  5. Real-time Kafka Publishing: Validated threat scores and event sequences are categorized (High, Medium, Low) and published immediately to standard Apache Kafka topics (high-priority or medium-priority).
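The bypass and routing logic in steps 3 to 5 can be sketched roughly as follows. This is a minimal illustration, not the code in producer_pipeline.py: `resolve_risk` and `topic_for` are hypothetical names, the LLM scorer is passed in as a plain callable, and the medium threshold of 40 is taken from the low-level specification later in this document.

```python
from typing import Callable, Optional

HIGH_THRESHOLD = 70    # rule score at or above this skips the LLM call
MEDIUM_THRESHOLD = 40  # from the sequence diagram in section 8


def resolve_risk(rule_score: int, llm_scorer: Callable[[], int]) -> int:
    """Return the final risk, calling the LLM only when the rules are not decisive."""
    if rule_score >= HIGH_THRESHOLD:
        return rule_score  # latency-optimized bypass: no LLM round trip
    return max(rule_score, llm_scorer())


def topic_for(risk: int) -> Optional[str]:
    """Map a risk score to a Kafka topic name, or None for low-priority events."""
    if risk >= HIGH_THRESHOLD:
        return "high-priority"
    if risk >= MEDIUM_THRESHOLD:
        return "medium-priority"
    return None
```

Passing the scorer as a callable keeps the expensive call lazy, so the bypass costs nothing beyond a comparison.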

B. The Live Dashboard Ticker (app/controllers/app_controller.py)

To visualize threats and agent activity, the frontend utilizes a CustomTkinter dashboard driven by a thread-safe controller:

  1. Non-Blocking Kafka Consumer Loop: A KafkaConsumer runs on a dedicated background polling thread. When a new alert message arrives, it maps the JSON data structure and pushes it to the UI queue without freezing the user interface.
  2. Real-Time Alert Ticker: The Main Window displays a rolling threat feed ticker that populates live security events as they are consumed from Kafka.
  3. Dynamic Agent Flow Canvas: When an alert is processed, the LangGraph agent execution is visualized on an interactive canvas:
    • Pulsing Halo Glow: The active agent node glows dynamically using a timed loop.
    • Particle Flow Animations: Visual particle bullets travel along the connection lines to trace active data pathways.
    • Non-Blocking Animations: All rendering updates use non-blocking staggered delays (using .after()) rather than blocking sleep() commands, ensuring that the GUI remains highly responsive.
  4. Actionable OS HUD Alerts: High-severity events spawn topmost borderless desktop windows that present immediate operator actions ("Kill Process" or "Analyze Details").

4. Detailed Technological Implementation & Software Quality

The CrowdShield AI project demonstrates high-quality software engineering practices, utilizing decoupled architecture, asynchronous execution, and hybrid heuristic-AI orchestration.

A. Architectural Decomposition (MVC Pattern)

The codebase adheres strictly to the Model-View-Controller (MVC) architectural pattern, separating graphical interface logic, data management, and orchestration:

  • Model (app/models/threat_analyzer.py): Manages external data integrations. It pulls new Sysmon events from Splunk, maintains the latest event indexing watermark (last_splunk_epoch), handles pagination, and chunks events into analysis batches.
  • View (app/views/main_window.py): Renders all visual CustomTkinter frames. It manages UI state, coordinates canvas rendering (the agent flow graph), streams logs, and displays desktop notification alerts.
  • Controller (app/controllers/app_controller.py): Serves as the central bridge. It spins up the Kafka consumer background thread, parses priority topic queues, maps incoming JSON fields, triggers the graphical flow transitions, and dispatches OS-level mitigation actions (e.g., process termination).

B. Multi-Threaded, Non-Blocking Execution

To prevent UI thread lockups and visual freezes—a common issue in Tkinter desktop applications—all blocking operations run in dedicated background threads:

  • Splunk/Kafka Consumer Polling: Runs in a separate background thread (poll_events_loop).
  • Pipeline Logging Subprocess: Subprocess standard output reader runs in its own thread (_read_pipeline_output).
  • AI Analyst Observability Chats: The LangGraph pipeline and Groq LLM queries execute asynchronously inside a background worker (_run_observe_query).
  • Thread-Safe UI Updates: Visual callbacks use Tkinter's .after(0, callback) handler to ensure all widgets are only manipulated from the primary main-loop thread.
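One common way to realize this hand-off is a queue that background threads write to and the UI thread drains. The sketch below shows that pattern without any Tkinter dependency; `worker`, `drain`, and the sample alerts are hypothetical, and the actual controller may wire things differently.

```python
import queue
import threading


def worker(events, out_queue):
    """Background thread: do the blocking work and hand results to the UI queue."""
    for event in events:
        out_queue.put(event)


def drain(out_queue, handle, limit=50):
    """Run on the UI thread: process up to `limit` queued items without blocking."""
    handled = 0
    while handled < limit:
        try:
            item = out_queue.get_nowait()
        except queue.Empty:
            break
        handle(item)
        handled += 1
    return handled


ui_queue = queue.Queue()
thread = threading.Thread(
    target=worker, args=(["alert-1", "alert-2"], ui_queue), daemon=True
)
thread.start()
thread.join()  # only for this demo; a real consumer thread keeps running

received = []
drain(ui_queue, received.append)
```

In a Tkinter application, a small `poll()` function would call `drain(...)` and then reschedule itself with `root.after(100, poll)`, so widgets are only ever touched from the main-loop thread.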

C. Heuristic-AI Hybrid Orchestration

Relying entirely on LLMs for EDR detection incurs high latency (2–4 seconds per evaluation) and token costs. CrowdShield AI implements a hybrid heuristic-AI model:

  1. Rule Pre-Filter Engine (producer_pipeline.py): Executes deterministic, regex-based heuristic checks over incoming telemetry (e.g., checking if Node or Chrome processes read secret files). If matched, it assigns a hardcoded risk value.
  2. LLM Score Escalation: A Groq-powered Detection Agent acts as a fallback to evaluate more subtle, complex, or obfuscated command sequences (e.g., base64 encoding).
  3. Maximization Resolution: When both engines run, the system takes the maximum of the heuristic score and the LLM score. Because a critical rule match (Risk >= 70) skips the LLM call, known threat patterns are scored without any LLM round trip, while GenAI analysis remains available for novel bypass attempts.

D. Resilience & Fallback Design

The codebase features multiple layers of fault tolerance:

  • SIEM Failover (Splunk MCP Integration): The investigation suite queries the integrated Splunk Model Context Protocol (MCP) server with JSON-RPC 2.0 requests. It calls the /services/mcp endpoint, authenticating with a Bearer token (SPLUNK_MCP_TOKEN). If the MCP server returns an error or is unreachable, the tool falls back to an SDK-based connection. If the SDK connection also fails, it falls back to a local mock event log (events_log.json).
  • LLM Structured Parser Safe-Guards: The agents use LangChain's Pydantic structured output. If parsing fails (e.g., the model outputs extra Markdown explanation text), the engine falls back to a prompt that forces raw JSON string output and extracts it using robust regular expressions.
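The parser fallback described in the last bullet could look something like the sketch below: try strict JSON first, then extract the first-to-last brace span with a regular expression. `parse_llm_json` is a hypothetical helper for illustration and assumes the model returns a single JSON object somewhere in its reply.

```python
import json
import re


def parse_llm_json(text):
    """Parse an LLM reply as JSON, tolerating prose around the JSON object."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Fallback: take the span from the first "{" to the last "}".
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        raise ValueError("no JSON object found in model output")
    return json.loads(match.group(0))
```

The greedy match keeps nested objects intact, but it would fail on a reply that contains two separate objects; a stricter prompt is the usual remedy in that case.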

5. User Experience (UX) & Design Excellence

The system design reflects a highly polished, aesthetic, and functional desktop layout tailored for Security Operations Center (SOC) environments:

A. Curated Dark Theme Design Tokens

The UI departs from standard gray Tkinter layouts, employing a dark theme matching modern security dashboards:

  • BG_DARK (#0d0d1a): Deep navy background.
  • BG_CARD (#13131f): Contrasting card frames.
  • ACCENT_BLUE (#00b4ff), ACCENT_PURP (#9b51e0), ACCENT_RED (#ff3c5c): Semantic color tokens for Detection, Investigation, and Mitigation alerts.
  • Modern monospace typography (Consolas) utilized across log streams and terminal components.

B. Dynamic Agentic Flow Canvas

Instead of static status indicators, the Agentic Flow page features a dynamic Tkinter Canvas:

  • Draws structured nodes corresponding to active agents.
  • Triggers a pulsing halo glow effect around the active agent node using an asynchronous time-tick loop (update_glow).
  • Animates visual particle bullets flowing along the execution pathways when transitioning between nodes, providing real-time visual feedback on agent orchestration.

C. Non-Intrusive, Actionable Notifications

When high-priority alerts are captured by the Kafka consumer, the controller spawns a custom DesktopNotification window (ctk.CTkToplevel):

  • Dynamically positions itself at the bottom-right corner of the monitor.
  • Overrides OS frame borders (overrideredirect(True)) and forces topmost rendering.
  • Features semantic hazard colors based on priority (Red for High, Yellow for Medium).
  • Provides immediate actions: operators can directly trigger process termination ("Kill") or click "Analyse" to open the AI Observe chat panel.
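The bottom-right placement comes down to a small geometry calculation. The following sketch computes a Tk geometry string; the window size and margin defaults are assumptions chosen for the example, and `bottom_right_geometry` is a hypothetical helper name.

```python
def bottom_right_geometry(screen_w, screen_h, win_w=360, win_h=140, margin=20):
    """Return a Tk geometry string placing a window at the bottom-right corner."""
    x = screen_w - win_w - margin
    y = screen_h - win_h - margin
    return f"{win_w}x{win_h}+{x}+{y}"
```

A toplevel window would typically apply it with `window.geometry(bottom_right_geometry(window.winfo_screenwidth(), window.winfo_screenheight()))`. A larger bottom margin may be needed to clear the taskbar.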

D. AI Analyst Conversational Observability

The AI Observe tab pairs historical event tracking with conversational intelligence:

  • Provides one-click suggestion chips ("Why did this happen?", "Is this a false positive?") to accelerate investigator workflow.
  • Features a custom inline Markdown Bold text parser that styles key technical terms inside the chat log window dynamically.
  • Automatically prompts the LLM to output detailed timeline flow diagrams using ASCII block symbols.
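An inline bold parser of the kind mentioned above can be quite small. This sketch splits a chat message into plain and bold segments; `split_bold` is a hypothetical name, and it handles only the `**bold**` form, not nested or escaped markers.

```python
import re

BOLD_PATTERN = re.compile(r"\*\*(.+?)\*\*")


def split_bold(text):
    """Split text into (segment, is_bold) pairs for **bold** Markdown spans."""
    segments = []
    position = 0
    for match in BOLD_PATTERN.finditer(text):
        if match.start() > position:
            segments.append((text[position:match.start()], False))
        segments.append((match.group(1), True))
        position = match.end()
    if position < len(text):
        segments.append((text[position:], False))
    return segments
```

Each pair can then be inserted into a Tkinter text widget, applying a bold-font tag to the segments flagged `True`.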

6. Potential Impact

The implementation of CrowdShield AI holds massive potential in enterprise security management:

| Dimension | Description | Enterprise Value |
| --- | --- | --- |
| DevSecOps Security | Secures the developer's laptop, which is historically the hardest boundary to monitor because of constant package installations and local scripts. | Prevents repository source-code cloning and intellectual property theft. |
| Data Leak Prevention | Intercepts exfiltration activity at the millisecond stage, blocking socket requests at the firewall level. | Mitigates financial risks of leaked API keys, production database credentials, and AWS billing hijacking. |
| SOC Efficiency | Replaces manual Splunk search queries and threat correlation with automated, multi-agent LangGraph analysis. | Reduces Mean Time to Resolve (MTTR) from hours to seconds. |
| Technology Integration | Built on top of enterprise-standard tools (Windows Defender Firewall, Sysmon, Splunk, Apache Kafka). | Can be integrated into existing corporate security architectures without replacing active agents. |

7. Quality of the Idea (Creativity & Uniqueness)

CrowdShield AI introduces unique approaches to endpoint detection and response:

A. Temporal Attack Correlation (PidCorrelationBuffer)

Attackers often split exfiltration chains across time intervals to bypass simple EDR rules. By maintaining a 5-minute rolling memory buffer keyed on process ID, CrowdShield AI ensures that a process reading a file at t = 0s and making a network connection at t = 40s is caught and correlated immediately, regardless of polling frequency.

B. Generative AI as an Active SOC Agent

While most EDR platforms use AI simply for post-incident reporting, CrowdShield AI embeds LLMs directly into the active defense loop:

  • LangGraph State Machine: Orchestrates autonomous investigation, querying Splunk directly to check if a process has historical anomalies.
  • Autonomous Remediation: Translates security decisions directly into local OS actions (Windows Firewall rule creation, process termination), bridging the gap between detection and response.
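To make the remediation step concrete, the sketch below builds the argument lists for the two Windows utilities named later in this document (taskkill and netsh) without executing them. The helper names, the rule-name prefix, and the outbound-block policy are assumptions for illustration; the actual response agent may construct its commands differently.

```python
import ipaddress


def kill_command(pid):
    """Build the taskkill argument list for a force-terminate of one PID."""
    if not isinstance(pid, int) or pid <= 0:
        raise ValueError("pid must be a positive integer")
    return ["taskkill", "/F", "/PID", str(pid)]


def block_command(remote_ip, rule_name="CrowdShield-Block"):
    """Build a netsh argument list that blocks outbound traffic to one IP."""
    ip = ipaddress.ip_address(remote_ip)  # raises ValueError on invalid input
    return [
        "netsh", "advfirewall", "firewall", "add", "rule",
        f"name={rule_name}-{ip}", "dir=out", "action=block", f"remoteip={ip}",
    ]
```

On an elevated Windows session these lists could be passed to `subprocess.run(..., check=True)`. Validating the PID and IP first, and avoiding `shell=True`, keeps LLM-derived values from being interpreted as shell syntax.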

8. Low-Level Pipeline Telemetry & Message Flow (Inch-by-Inch Specification)

To detail the operational mechanics of the platform, this section defines the low-level processing architecture. It maps telemetry generation, ingestion polling, correlation scoring, Kafka queuing, and agent consumption.

A. Telemetry & Ingestion Pipeline Schema

sequenceDiagram
    autonumber
    participant Host OS as Endpoint Host OS
    participant Sysmon as Sysmon Agent (sysmonconfig.xml)
    participant Forwarder as Splunk Forwarder (inputs.conf)
    participant Splunk as Splunk Enterprise (main index)
    participant Pipeline as Ingestion Pipeline (producer_pipeline.py)
    participant Scorer as Priority Scorer Agent (LLM + Rule Engine)
    participant Kafka as Kafka Broker (Port 9092)
    participant Agent as LangGraph Orchestrator (agent_process.py)
    participant SplunkMCP as Splunk MCP Server (services/mcp)

    Note over Host OS, Sysmon: Phase 1: Telemetry Generation
    Host OS->>Sysmon: Process actions (File read/delete, Socket connections)
    Sysmon->>Sysmon: Matches XML configuration selectors
    Sysmon->>Forwarder: Writes XML WinEventLog records

    Note over Forwarder, Splunk: Phase 2: Ingestion & Indexing
    Forwarder->>Splunk: Streams events (TCP/9997)
    Splunk->>Splunk: Indexes logs to main database with UTC timestamp

    Note over Splunk, Pipeline: Phase 3: Ingestion Polling Loop
    loop Every 5 Seconds
        Pipeline->>Splunk: Polls query (earliest={last_epoch_watermark})
        Splunk-->>Pipeline: Returns normalized raw logs array
    end

    Note over Pipeline, Scorer: Phase 4: Correlation & Scoring
    Pipeline->>Pipeline: Buffers logs inside PidCorrelationBuffer (300s window)
    Pipeline->>Scorer: Sends correlated PID sequence
    Scorer->>Scorer: Executes Heuristic Rule Engine (pre-filter)
    Scorer->>Scorer: Invokes LLM Priority Scorer (Groq API)
    Scorer-->>Pipeline: Resolves maximum threat score & priority category

    Note over Pipeline, Kafka: Phase 5: Streaming Priority Queue
    alt Risk >= 70%
        Pipeline->>Kafka: Publishes payload to 'high-priority' topic
    else Risk >= 40%
        Pipeline->>Kafka: Publishes payload to 'medium-priority' topic
    else Risk < 40%
        Pipeline->>Pipeline: Discards / Logs locally as low priority
    end

    Note over Kafka, Agent: Phase 6: Agentic Response Loop
    Kafka->>Agent: Consumes priority topics message stream
    Agent->>Agent: Instantiates LangGraph state sequence
    
    Note over Agent, SplunkMCP: Phase 7: Model Context Protocol (MCP) Investigation
    Agent->>SplunkMCP: POST tools/call JSON-RPC 2.0 (splunk_run_query)
    SplunkMCP-->>Agent: Returns structuredContent logs array
    
    Agent->>Agent: Invokes Response Agent & Mitigation Tools (Firewall / Kill)

B. Detailed Step-by-Step Technical Execution

Step 1: Endpoint Event Triggering & Logging

  • Mechanism: The operating system kernel schedules process activity. A process (e.g., node.exe or chrome.exe) issues system calls to read files (NtReadFile) or opens TCP connections through the Winsock API (connect).
  • Telemetry: The Sysmon Kernel Driver interceptor records the details. Based on sysmonconfig.xml inclusion rules, it formats the call parameters into a Windows Event Log record (Event ID 1 for ProcessCreate, 3 for NetworkConnect, 11 for FileCreate, 23 for FileDelete) and saves it in XML format within the Microsoft-Windows-Sysmon/Operational event channel.

Step 2: Forwarding & Data Ingestion

  • Mechanism: The Splunk Universal Forwarder service monitors the Sysmon channel path. It reads new XML logs, bundles them, and streams them over TCP (typically on port 9997) to the Splunk Enterprise indexer. The receiving port enabled in Step 1 is unencrypted unless TLS is configured separately on both the forwarder and the indexer.
  • Indexing: Splunk indexes the raw log stream into the main index under the WinEventLog:Microsoft-Windows-Sysmon/Operational sourcetype, assigning internal fields like _time and _indextime.

Step 3: Ingestion Polling & Watermarking

  • Mechanism: The Python ingestion daemon (producer_pipeline.py) runs in a continuous loop, waking up every 5 seconds.
  • Query Structure: It queries the Splunk API using an epoch watermark:
    search index=main (sourcetype="*Sysmon*" OR source="*Sysmon*") | sort +_indextime
    After each poll, the earliest query parameter is updated to _indextime > {last_epoch_watermark} + 0.001 to prevent duplicate processing.
  • Data Normalization: Raw WinEventLog records are regex-parsed line-by-line via _parse_raw_sysmon() and mapped into a structured Python dictionary representation.
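The normalization and watermark steps above might be sketched as follows. This is a simplified stand-in for `_parse_raw_sysmon()`, not its actual code: it assumes each raw record carries `Key: value` lines and that every event dict exposes an `_indextime` field, both of which are assumptions about the data layout.

```python
import re

FIELD_PATTERN = re.compile(r"^\s*([A-Za-z]+):\s*(.*)$")


def parse_raw_sysmon(raw):
    """Map 'Key: value' lines of a raw Sysmon message to a dict (assumed layout)."""
    fields = {}
    for line in raw.splitlines():
        match = FIELD_PATTERN.match(line)
        if match:
            fields[match.group(1)] = match.group(2).strip()
    return fields


def next_watermark(events, last_watermark):
    """Advance the watermark just past the newest _indextime seen in this poll."""
    if not events:
        return last_watermark
    newest = max(float(event["_indextime"]) for event in events)
    return max(last_watermark, newest + 0.001)
```

Leaving the watermark unchanged on an empty poll ensures that a quiet interval never skips events that are indexed late.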

Step 4: Temporal Correlation & State Buffering

  • Mechanism: Simple EDR rules often evaluate each event in isolation. CrowdShield AI instead pushes events into a PidCorrelationBuffer class so that related events can be scored together.
  • Sliding-Window Logic: The buffer indexes incoming logs by their ProcessId. Each event is saved alongside its arrival epoch timestamp. An automated housekeeping routine (expire_old()) purges entries older than 300 seconds (5 minutes).
  • Multi-Step Aggregation: The pipeline groups all logs inside the sliding window for the offending PID. If a process registers >= 3 unique security actions (e.g. starting a shell, reading a .env file, and making an external request), the pipeline flags it as a multi_step_process_activity and appends detailed telemetry context.
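A minimal sketch of such a buffer is shown below. It reuses the class and method names from the text (PidCorrelationBuffer, expire_old()), but the internals, the `action` field name, and the `is_multi_step` helper are assumptions for illustration rather than the repository's implementation.

```python
import time
from collections import defaultdict


class PidCorrelationBuffer:
    """Keep recent events per PID inside a sliding time window."""

    def __init__(self, window_seconds=300):
        self.window_seconds = window_seconds
        self._events = defaultdict(list)  # pid -> [(arrival_epoch, event), ...]

    def add(self, pid, event, now=None):
        now = time.time() if now is None else now
        self._events[pid].append((now, event))

    def expire_old(self, now=None):
        """Drop entries older than the window and forget PIDs left empty."""
        now = time.time() if now is None else now
        cutoff = now - self.window_seconds
        for pid in list(self._events):
            kept = [(ts, ev) for ts, ev in self._events[pid] if ts >= cutoff]
            if kept:
                self._events[pid] = kept
            else:
                del self._events[pid]

    def actions_for(self, pid):
        """Return the set of distinct action names recorded for a PID."""
        return {ev["action"] for _, ev in self._events.get(pid, [])}

    def is_multi_step(self, pid, min_actions=3):
        return len(self.actions_for(pid)) >= min_actions
```

The optional `now` argument makes the window logic testable without waiting in real time.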

Step 5: Heuristic & LLM Priority Scoring

  • Heuristic Check: The pipeline runs the aggregated PID sequence through a deterministic Python rule evaluator (rule_based_analysis). For instance:
    • is_node + read_sensitive + made_network → Risk score: 97 (Topic: high-priority)
    • env_file_deleted → Risk score: 85 (Topic: high-priority)
  • LLM Scorer: Parallel to the rule engine, the data is sent to a LangChain model (CROWDSHIELD_LLM). We recommend the native Splunk-hosted generative AI models (openai/gpt-oss-20b or openai/gpt-oss-120b), with fallback support for Groq Cloud models (e.g. llama-3.3-70b-versatile from https://console.groq.com/home). It processes a system prompt containing classification criteria and outputs a structured JSON response specifying the target risk score, threat category, and PID.
  • Resolution: The maximum risk value is chosen. If the score is >= 70, it is categorized as High Priority; if >= 40, it is categorized as Medium Priority.
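The two example rules from the heuristic check could be expressed along these lines. The scores 97 and 85 come from the text above; everything else, including the event field names (`image`, `action`, `target`), the helper functions, and the default score of 0, is a hypothetical simplification of rule_based_analysis.

```python
import re


def _basename(path):
    """Return the lower-cased final path component for / or \\ separated paths."""
    return re.split(r"[\\/]", path)[-1].lower()


def _is_env_file(path):
    name = _basename(path)
    return name == ".env" or name.startswith(".env.")


def _is_sensitive(path):
    return _is_env_file(path) or _basename(path) in {".npmrc", ".pypirc"}


def rule_based_analysis(events):
    """Return a risk score (0-100) for one PID's correlated event sequence."""
    is_node = any(e.get("image", "").lower().endswith("node.exe") for e in events)
    read_sensitive = any(
        e.get("action") == "file_read" and _is_sensitive(e.get("target", ""))
        for e in events
    )
    made_network = any(e.get("action") == "network_connect" for e in events)
    env_file_deleted = any(
        e.get("action") == "file_delete" and _is_env_file(e.get("target", ""))
        for e in events
    )
    if is_node and read_sensitive and made_network:
        return 97
    if env_file_deleted:
        return 85
    return 0
```

Checking the highest-scoring rule first means a sequence that matches several rules reports its most severe score.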

Step 6: Kafka Alert Broadcasting

  • Mechanism: The Ingestion daemon initializes a KafkaProducer bound to localhost:9092.
  • Deduplication Guard: Before publishing, the daemon checks corr_buffer.should_alert(pid, risk). If an alert was already broadcast for this PID at a similar risk level, it skips publishing to prevent alert fatigue.
  • Message Dispatch: If valid, the serialized payload (containing the raw telemetry event stream and the threat score object) is sent to the target topic (high-priority or medium-priority).
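The deduplication guard can be illustrated with a small stand-alone class. In the text the check lives on the correlation buffer as corr_buffer.should_alert(pid, risk); the sketch below isolates it, and the `tolerance` parameter is an assumption about what counts as "a similar risk level".

```python
class AlertDeduplicator:
    """Suppress repeat alerts for a PID unless the risk changed noticeably."""

    def __init__(self, tolerance=10):
        self.tolerance = tolerance  # assumption: max difference treated as "similar"
        self._last_risk = {}        # pid -> risk of the last published alert

    def should_alert(self, pid, risk):
        previous = self._last_risk.get(pid)
        if previous is not None and abs(risk - previous) <= self.tolerance:
            return False
        self._last_risk[pid] = risk
        return True
```

Only published alerts update the stored risk, so a slow drift in score is always compared against the last alert an operator actually saw.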

Step 7: Consumer Ingestion & Splunk MCP Querying

  • Consumer Loop: The graphical controller class (AppController) runs an active KafkaConsumer listener thread, subscribing to priority topics.
  • State Graph Launch: Upon receiving a message, the controller deserializes the payload, registers the event in the UI feed, and kicks off the LangGraph StateGraph pipeline (agent_process.py).
  • Splunk Model Context Protocol (MCP) Search: The Investigation Agent (agents/investigation_agent.py) translates the alert query and triggers a request to the Splunk MCP Server (/services/mcp on port 8089) over the JSON-RPC 2.0 tool-calling protocol:
    {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "splunk_run_query",
            "arguments": {
                "query": "search <alert_process_query>",
                "earliest_time": "-24h",
                "row_limit": 5
            }
        },
        "id": 1
    }
    The Splunk MCP endpoint authorizes via a Bearer token (SPLUNK_MCP_TOKEN), queries the internal indexes, and returns the threat history array in standard JSON-RPC format, which the agent summarizes to evaluate if the process has historical risks.
  • Mitigation Execution: If the final response decision outputs a risk >= 90, the response agent calls the OS utilities (taskkill /F /PID and netsh advfirewall firewall add rule ...) to terminate the target PID and block the C2 network destination.
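For the MCP search in the Investigation Agent bullet above, the request could be assembled with the standard library as sketched here. The payload mirrors the JSON shown in that bullet; `build_mcp_request` is a hypothetical helper, and the header set is an assumption (a given MCP server may expect additional headers).

```python
import json
import urllib.request


def build_mcp_request(url, token, query, earliest_time="-24h", row_limit=5, request_id=1):
    """Build (but do not send) a JSON-RPC 2.0 tools/call request for splunk_run_query."""
    payload = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {
            "name": "splunk_run_query",
            "arguments": {
                "query": query,
                "earliest_time": earliest_time,
                "row_limit": row_limit,
            },
        },
        "id": request_id,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )
```

Sending it is then a matter of `urllib.request.urlopen(request)`; against a local Splunk instance with a self-signed certificate, an appropriate SSL context would also be needed.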

About

CrowdShield AI is a real-time EDR pipeline that uses Splunk Enterprise to collect events, detects threats with AI, and sends priority alerts through Kafka.
