A DAG-based pipeline framework for large-scale text analysis with LLM-driven extraction, classification, and synthesis. Built for research on AI governance, contextual integrity, and norm extraction from text corpora.
Trawler orchestrates multi-stage NLP pipelines over document collections, using LLMs for structured information extraction. Each pipeline is defined as a directed acyclic graph (DAG) in YAML. Stages run sequentially or are dispatched to SLURM, with vLLM handling GPU inference via tensor parallelism.
The framework currently supports four research domains, each implemented as a self-contained dagspace:
The `uair` dagspace (AI risk analysis) processes global news articles to extract structured records of real-world AI deployments, incidents, risks, and benefits. The full pipeline:
- Classify Relevance — keyword pre-gating + LLM binary filter for AI-related content
- Decompose — extracts AI use-case tuples: deployment domain, purpose, capability, deployer, subject, location, date, harms, risks, benefits
- Verify — embedding similarity + entailment scoring to validate extracted tuples against source text
- Classify EU AI Act — maps each AI use case to EU AI Act risk tiers (Prohibited / High / Limited / Minimal)
- Classify Risks & Benefits — fine-grained categorization of specific risks and benefits mentioned
- Taxonomy / Topic / Synthesis — clustering and cross-article synthesis of extracted patterns
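The relevance stage's keyword pre-gate can be sketched as a cheap regex filter run before the (expensive) LLM binary classifier. The keyword list and function names below are illustrative, not the framework's actual implementation:

```python
import re

# Hypothetical keyword pre-gate: a cheap regex pass that discards
# clearly irrelevant articles before any LLM call is made.
AI_KEYWORDS = re.compile(
    r"\b(artificial intelligence|machine learning|neural network|"
    r"chatbot|LLM|deep learning|facial recognition)\b",
    re.IGNORECASE,
)

def pre_gate(article_text: str) -> bool:
    """Return True if the article mentions any AI-related keyword."""
    return bool(AI_KEYWORDS.search(article_text))

def filter_articles(articles: list[str]) -> list[str]:
    """Keep only articles that pass the keyword gate; survivors would
    then go to the LLM binary relevance filter."""
    return [a for a in articles if pre_gate(a)]
```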
The `historical_norms` dagspace extracts societal norms about information flows from historical and prescriptive texts (Project Gutenberg), grounded in Helen Nissenbaum's Contextual Integrity (CI) framework:
- Fetch Gutenberg — retrieves and chunks books by Gutenberg ID
- Norm/CI Reasoning — LLM analysis of text chunks for societal norms and information flow patterns
- Norm/CI Extraction — structures output as formal CI 5-tuples: subject, sender, recipient, information type, transmission principle
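An illustrative Python rendering of the CI 5-tuple, with field names mirroring the list above (this is a sketch, not the framework's actual schema):

```python
from dataclasses import dataclass

# Hypothetical record type for a formal CI 5-tuple; the real extraction
# stage may emit plain dicts or a different schema.
@dataclass(frozen=True)
class CITuple:
    subject: str                 # whose information is flowing
    sender: str                  # who transmits it
    recipient: str               # who receives it
    information_type: str        # what kind of information
    transmission_principle: str  # the condition governing the flow

# Example: a norm about medical information sharing.
norm = CITuple(
    subject="patient",
    sender="physician",
    recipient="insurer",
    information_type="diagnosis",
    transmission_principle="with patient consent",
)
```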
The `rule_tuples` dagspace applies the same CI framework to Reddit community rules:
- Classify — identifies rules governing privacy and information flows (vs. content moderation rules)
- Decompose — extracts CI tuples from relevant community governance rules
The `contextual_integrity_eval` dagspace evaluates how well LLMs understand contextual integrity through QA probing, agent action evaluation, judge calibration, active-prompting ablation, and context-collapse diagnostics. It is based on the PrivacyLens dataset.
```
dagspaces/
├── common/                      # Shared framework code
│   ├── orchestrator.py          # DAG execution, SLURM dispatch, artifact tracking
│   ├── vllm_inference.py        # Direct vLLM inference (GPU detection, NCCL config)
│   ├── wandb_logger.py          # W&B experiment tracking
│   ├── stage_utils.py           # Shared stage utilities
│   ├── config_schema.py         # Pipeline/node dataclasses
│   └── runners/base.py          # StageRunner protocol
├── uair/                        # AI risk analysis dagspace
├── historical_norms/            # Literature norm extraction dagspace
├── rule_tuples/                 # Reddit rule analysis dagspace
└── contextual_integrity_eval/   # CI evaluation dagspace
```
Each dagspace follows a consistent structure:
```
dagspaces/{name}/
├── cli.py               # Hydra entry point
├── orchestrator.py      # Pipeline-specific DAG logic + re-exports from common
├── wandb_logger.py      # Thin shim over common/wandb_logger.py
├── conf/
│   ├── config.yaml      # Base config (model, runtime, sampling, wandb)
│   ├── pipeline/        # DAG definitions (sources → nodes → outputs)
│   ├── prompt/          # LLM prompt templates
│   ├── model/           # vLLM engine configs (model path, TP size, memory)
│   └── hydra/launcher/  # SLURM launcher configs
├── runners/             # StageRunner implementations (one per stage)
└── stages/              # Stage logic (_pre/_post transforms + run function)
```
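The `StageRunner` protocol in `common/runners/base.py` might look like the following sketch; the real signature may differ, and the dict-based context is an assumption standing in for whatever the orchestrator actually passes:

```python
from typing import Any, Protocol, runtime_checkable

# Hedged sketch of the StageRunner protocol: each runner consumes
# upstream artifacts from a context and returns what it produces.
@runtime_checkable
class StageRunner(Protocol):
    def run(self, context: dict[str, Any]) -> dict[str, Any]: ...

class EchoRunner:
    """Trivial conforming runner, for illustration only."""

    def run(self, context: dict[str, Any]) -> dict[str, Any]:
        # Pass the input straight through as this stage's artifact.
        return {"echoed": context.get("input")}
```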
```bash
git clone <repo-url> && cd trawler

# Install with uv (recommended)
uv venv --python 3.12 && source .venv/bin/activate
uv pip install -e .
```

Requires CUDA GPUs. Models are loaded from local paths configured in `conf/model/*.yaml` (e.g., Qwen 2.5-72B-AWQ, Qwen 3-30B).
```bash
# UAIR: full news analysis pipeline
python -m dagspaces.uair.cli \
    pipeline=full_event_pipeline \
    data.parquet_path=/path/to/articles.parquet

# Historical norms: CI extraction from Gutenberg texts
python -m dagspaces.historical_norms.cli \
    pipeline=ci_extraction \
    data.parquet_path=/path/to/texts.parquet

# Rule tuples: classify + decompose Reddit rules
python -m dagspaces.rule_tuples.cli \
    runtime.stage=pipeline \
    data.parquet_path=/path/to/rules.parquet

# Debug mode: sample 100 rows
python -m dagspaces.uair.cli \
    pipeline=full_event_pipeline \
    runtime.debug=true runtime.sample_n=100 \
    data.parquet_path=/path/to/data.parquet
```

```bash
# Dispatch to SLURM via a launcher config
python -m dagspaces.uair.cli \
    pipeline=full_event_pipeline \
    hydra/launcher=g2_slurm_gpu_1x \
    data.parquet_path=/path/to/data.parquet
```

Launcher configs in `conf/hydra/launcher/` define GPU count, memory, partition, and setup commands.
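A hypothetical launcher config sketch, using field names from Hydra's submitit SLURM plugin; the file name matches the override above, but all values are illustrative, not the repository's actual settings:

```yaml
# conf/hydra/launcher/g2_slurm_gpu_1x.yaml (illustrative values)
# @package hydra.launcher
_target_: hydra_plugins.hydra_submitit_launcher.submitit_launcher.SlurmLauncher
partition: gpu
gpus_per_node: 1
mem_gb: 64
timeout_min: 240
setup:
  - module load cuda
```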
Pipelines are Hydra-composed YAML DAGs:
```yaml
# conf/pipeline/my_pipeline.yaml
pipeline:
  sources:
    articles:
      path: ${data.parquet_path}
  graph:
    nodes:
      classify:
        stage: classify_relevance
        inputs: {articles: articles}
        outputs: [classified]
      extract:
        stage: decompose_nbl
        depends_on: [classify]
        inputs: {articles: classified}
        outputs: [extracted]
```

Override anything from the CLI: `model=qwen2.5-72b-awq sampling_params.temperature=0.3 runtime.sample_n=500`
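Conceptually, the orchestrator resolves `depends_on` edges into an execution order via topological sort. A minimal sketch of that idea (not the framework's actual implementation):

```python
from graphlib import TopologicalSorter

# Map each node to its dependencies, mirroring the YAML above.
nodes = {
    "classify": [],           # no dependencies
    "extract": ["classify"],  # runs after classify
}

# static_order() yields nodes so every dependency precedes its dependents.
order = list(TopologicalSorter(nodes).static_order())
```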
- Implement in `dagspaces/{name}/stages/mystage.py`:

```python
from dagspaces.common.vllm_inference import run_vllm_inference

def _pre(row):
    row["messages"] = [{"role": "user", "content": row["text"]}]
    row["sampling_params"] = {"max_tokens": 512, "temperature": 0.0}
    return row

def _post(row):
    row["result"] = row["generated_text"]
    return row

def run_mystage(df, cfg):
    return run_vllm_inference(df, cfg, _pre, _post, "mystage")
```

- Create a runner in `dagspaces/{name}/runners/mystage.py` implementing `StageRunner.run(context)`.
- Register it in `runners/__init__.py`.
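The runner from the second step might look like this sketch; the shape of the context object (`"inputs"`/`"cfg"`/`"outputs"`) is an assumption, and `run_mystage` is stubbed here so the example is self-contained:

```python
# Hypothetical runner for dagspaces/{name}/runners/mystage.py.

def run_mystage(df, cfg):
    # Stand-in for the run function defined in stages/mystage.py.
    return [{"result": row} for row in df]

class MyStageRunner:
    name = "mystage"

    def run(self, context):
        df = context["inputs"]["articles"]   # upstream artifact
        cfg = context["cfg"]                 # composed Hydra config
        context["outputs"]["mystage_out"] = run_mystage(df, cfg)
        return context
```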
- vLLM — LLM inference with tensor parallelism (`distributed_executor_backend="mp"`)
- Hydra — hierarchical YAML configuration with CLI overrides
- Weights & Biases — experiment tracking, table logging, run grouping
- submitit — SLURM job submission from Hydra
MIT