joaopn/social-data-pipeline

Social Data Pipeline

Docker Python 3.10+ PostgreSQL 18 MongoDB 8 CUDA ONNX

A researcher-focused pipeline for processing, classifying, and ingesting large-scale JSON, CSV, and Parquet data dumps into analysis-ready databases. It offers interactive CLI setup and step-by-step execution with extensive configuration options. Built for the Reddit data dumps and configurable to any high-volume record-based dataset — including Hugging Face datasets.

TL;DR

# Configure databases
python sdp.py db setup                      # Configure PostgreSQL/MongoDB (one-time)
python sdp.py db start                      # Start database(s)

# Process and ingest data
python sdp.py source add reddit             # Add a data source (interactive setup)
python sdp.py run parse                     # Decompress dumps → parse to cleaned, structured files
python sdp.py run postgres_ingest           # Ingest parsed files into PostgreSQL
python sdp.py run mongo_ingest              # Ingest raw data into MongoDB

# Optional data-enrichment
python sdp.py run lingua                    # Adds language detection to parsed files
python sdp.py db setup-mcp                  # MCP servers for AI tool access
python sdp.py run ml                        # GPU classifiers (toxicity, emotions)
python sdp.py run postgres_ml               # Ingest classifier outputs

# Check status
python sdp.py db status                     # Database status
python sdp.py source status                 # Ingestion source status
python sdp.py source error-logs             # Show error details for failed datasets

Table of Contents

◾ Overview ◾ Requirements ◾ Quick Start ◾ CLI Reference ◾ Profiles ◾ Platform Support ◾ FAQ and Troubleshooting


◾ Overview

Social Data Pipeline provides a complete pipeline for working with large-scale social media data dumps:

  • Multi-platform support: Reddit (with specialized features), custom JSON/CSV/Parquet platforms, or Hugging Face datasets
  • Automatic detection and decompression of .zst, .gz, .xz, and .tar.gz dump files
  • Parsing JSON, CSV, and Parquet input to structured files (Parquet or CSV) with configurable field extraction
  • Modular classification — CPU-based (Lingua) and GPU-based (transformers) with multi-GPU parallelization and language filtering
  • PostgreSQL ingestion of parsed files with fine-tuned settings and duplicate handling
  • MongoDB ingestion of raw JSON, CSV, and Parquet directly after extraction, for raw data inspection
  • Optional authentication with admin and read-only database users
  • MCP servers for PostgreSQL and MongoDB, exposing read-only databases to AI tools (Claude Desktop, VS Code, Cursor)

Architecture

flowchart TB
    subgraph Input
        RAW[raw/compressed files]
    end

    subgraph Files
        direction LR
        PARSED[parsed files]
        LINGUA[+ language]
        ML_OUT[+ ML classifiers]
        PARSED -->|lingua| LINGUA
        PARSED -->|ml| ML_OUT
        LINGUA -.->|lang filter| ML_OUT
    end

    subgraph Databases
        direction LR
        PG[(PostgreSQL)]
        MONGO[(MongoDB)]
        MCP{{MCP servers}}
        PG & MONGO -.-> MCP
    end

    RAW -->|parse| PARSED
    RAW -->|mongo_ingest| MONGO
    PARSED & LINGUA -->|postgres_ingest| PG
    ML_OUT -->|postgres_ml| PG

◾ Requirements

Tip

GPU classification requires the NVIDIA Container Toolkit.

Recommended for optimal performance:

  • Flash-based storage (NVMe SSDs strongly recommended)
  • High core count CPU (8+)
  • 64GB+ RAM
  • NVIDIA GPU with 8GB+ VRAM (for ml profile)

Note

Social media datasets can be very large, and ML classification can take days to months on full datasets. Check the benchmarks at joaopn/encoder-optimization-guide to estimate runtimes on your hardware.
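A back-of-the-envelope estimate helps decide whether a classification run is feasible before launching it. The throughput figure below is hypothetical — use the number you measure from the benchmarks for your model and hardware:

```python
def estimate_runtime_days(n_records: int, records_per_sec: float, n_gpus: int = 1) -> float:
    """Rough classification runtime in days, assuming near-linear multi-GPU scaling."""
    return n_records / (records_per_sec * n_gpus) / 86_400  # seconds per day

# e.g. 1 billion comments at a hypothetical 2,000 records/s on one GPU
# comes out to roughly 5.8 days; real throughput depends on the model,
# sequence length, and batch size.
```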

◾ Quick Start

1. Configure

python sdp.py db setup              # Configure databases (PostgreSQL, MongoDB — one-time)
python sdp.py source add reddit     # Add a data source (interactive setup)

db setup configures database connections and data paths, and generates .env, config/db/*.yaml, and postgresql.local.conf (with optional PGTune integration). source add walks you through data types, file patterns, fields, indexes, and classifier configuration — generating per-source config in config/sources/<name>/.

For Reddit, download the data dumps from Arctic Shift and place them in the dumps directory configured during setup. For Hugging Face datasets, see Platform Support. For manual configuration or to understand what each setting does, see the Configuration Reference.

2. Run

python sdp.py db start                        # Start configured database(s)
python sdp.py run parse --source reddit       # Decompress dumps → parse to structured files
python sdp.py run postgres_ingest --source reddit  # Ingest parsed files into PostgreSQL

The --source flag selects the target source (optional when only one is configured). source add prints the recommended run commands for your setup. See the TL;DR for the full set of profiles, or python sdp.py source status to check progress.

3. Analyze

With an optimized PostgreSQL database running, you can send large-scale analytical queries through:

  • The terminal with psql
  • A GUI with pgAdmin or DBeaver
  • AI tools via the built-in MCP servers (see below)

MCP servers (optional) expose your databases to AI tools like Claude Desktop, VS Code, and Cursor:

python sdp.py db setup-mcp           # Configure MCP servers (ports, read-only mode)
python sdp.py db start               # Starts databases + MCP servers together

Database authentication (optional) adds password-protected admin access and a read-only user (used by the MCP servers):

# Enable during initial setup — or re-run to add auth to existing databases
python sdp.py db setup               # Select "Enable database authentication"

Note

By default, databases accept local, unauthenticated connections. Authentication is optional and can be enabled at any time. See the Database Profiles docs for details.


◾ CLI Reference

All operations go through sdp.py with three command groups:

python sdp.py <db|source|run> [options]

Database Management (sdp.py db)

| Command | Description |
| --- | --- |
| `sdp.py db setup` | Configure databases (PostgreSQL, MongoDB, optional auth) — global, one-time |
| `sdp.py db setup-mcp` | Configure MCP servers for AI tool access (ports, read-only mode) |
| `sdp.py db start [service]` | Start services: `postgres\|mongo\|postgres-mcp\|mongo-mcp` (all if unspecified) |
| `sdp.py db stop [service]` | Stop services: `postgres\|mongo\|postgres-mcp\|mongo-mcp` (all if unspecified) |
| `sdp.py db status` | Show database config, health, and MCP status |
| `sdp.py db recover-password` | Reset database admin password (requires auth enabled) |
| `sdp.py db create-indexes [--source <name>]` | Interactively create database indexes (PostgreSQL and/or MongoDB) |
| `sdp.py db unsetup` | Remove database config; data deletion behind double confirmation |
| `sdp.py db unsetup-mcp` | Remove MCP configuration and stop MCP containers |

db setup generates .env, config/db/*.yaml, and config/postgres/postgresql.local.conf. When authentication is enabled, it also generates pg_hba.local.conf and MCP credential files. Database deletion in db unsetup requires two separate confirmations.

Source Management (sdp.py source)

| Command | Description |
| --- | --- |
| `sdp.py source add <name>` | Add a new data source (interactive setup) |
| `sdp.py source add <name> --hf <dataset_id>` | Add from a Hugging Face dataset (fetches metadata as defaults) |
| `sdp.py source download <name>` | Download HF dataset files (mirrors to `data/dumps/`, organizes to `data/extracted/`) |
| `sdp.py source configure <name>` | Reconfigure existing source (platform-specific) |
| `sdp.py source add-classifiers <name>` | Add ML classifiers for a source |
| `sdp.py source remove <name>` | Remove source configuration |
| `sdp.py source list` | List configured sources |
| `sdp.py source status [name]` | Show source processing/ingestion status |
| `sdp.py source error-logs [name]` | Show database ingestion error logs |

source add walks you through platform selection, file patterns, fields, indexes, and optional classifier configuration. With --hf, it fetches HF dataset metadata (configs, fields, types) to pre-populate defaults — you still go through the full interactive setup. source download accepts --token for private datasets and --data-type to download selectively. All per-source config is written to config/sources/<name>/.

Pipeline (sdp.py run)

| Command | Description |
| --- | --- |
| `sdp.py run <profile>` | Run a pipeline profile |
| `sdp.py run <profile> --source <name>` | Run for a specific source (auto-selects if only one configured) |
| `sdp.py run <profile> --build` | Rebuild the Docker image before running |

Valid profiles: parse, lingua, ml, postgres_ingest, postgres_ml, mongo_ingest. --build rebuilds the Docker image before running (needed after code or dependency changes). The global --tag flag (e.g. python sdp.py --tag db setup) prefixes each interactive prompt with a [tag_id] for automation tools like pexpect.

source status reads pipeline state files to show ingestion progress (datasets processed, in-progress, failed) without querying the database. source error-logs shows the full error details and relevant mongoimport log output for failed datasets. Use --profile to filter by ingestion profile (postgres_ingest, postgres_ml, mongo_ingest).

◾ Profiles

| Profile | Description | Input | Output |
| --- | --- | --- | --- |
| `parse` | Decompress dumps, parse JSON/CSV/Parquet to Parquet/CSV | Compressed dump files, extracted JSON/CSV/Parquet | `PARSED_PATH/` |
| `lingua` | Lingua language detection (CPU) | Parsed files | `OUTPUT_PATH/lingua/` |
| `ml` | Transformer classifiers (GPU) | Parsed files + Lingua output | `OUTPUT_PATH/{classifier}/` |
| `postgres` | PostgreSQL database server | | |
| `postgres_ingest` | Ingest into PostgreSQL | Parsed files (or Lingua-enriched) | PostgreSQL tables |
| `postgres_ml` | Ingest ML outputs into PostgreSQL | Classifier output files | PostgreSQL tables |
| `mongo` | MongoDB database server | | |
| `mongo_ingest` | Ingest raw data into MongoDB | Extracted JSON/NDJSON/CSV/Parquet | MongoDB collections |

Note

The ml profile requires the NVIDIA Container Toolkit. All profiles track progress and resume automatically — rerun any profile safely without reprocessing completed files.

For detailed configuration and algorithm documentation, see the per-profile docs.

◾ Platform Support

| Platform | Description |
| --- | --- |
| `reddit` | Specialized Reddit features: waterfall deletion detection, base-36 ID conversion, format compatibility |
| `custom/<name>` | JSON, CSV, and Parquet parsing for arbitrary data: dot-notation, array indexing, type enforcement |

Platform is determined by source name: sdp.py source add reddit uses the Reddit platform, any other name uses custom/<name>. To process arbitrary JSON/NDJSON, CSV, or Parquet data, use any name other than reddit and configure your platform interactively.
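The dot-notation and array-indexing field extraction listed for custom platforms can be sketched as follows. This is an illustrative reimplementation — the record fields below are hypothetical, and the project's parser may handle missing keys differently:

```python
import re

def extract_field(record: dict, path: str):
    """Resolve a dot-notation path with array indexing, e.g. "author.names[1]"."""
    for part in path.split("."):
        m = re.fullmatch(r"(\w+)\[(\d+)\]", part)
        if m:  # key followed by an array index, e.g. "names[1]"
            record = record[m.group(1)][int(m.group(2))]
        else:  # plain nested key
            record = record[part]
    return record
```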

Hugging Face Datasets

Datasets hosted on Hugging Face can be added directly:

python sdp.py source add mydata --hf user/dataset-name   # Fetches HF metadata as setup defaults
python sdp.py source download mydata                      # Download parquet files
python sdp.py run parse --source mydata                   # Parse (column selection, type enforcement)
python sdp.py run mongo_ingest --source mydata            # Ingest raw parquet into MongoDB

The --hf flag fetches dataset metadata (configs, fields, types) from the HF API and uses it to pre-populate the interactive setup. HF configs are grouped interactively into data types (e.g., grouping yearly configs into comments and submissions). source download mirrors the HF repo to data/dumps/ and then organizes files into data/extracted/ by data type. Files flow through the standard pipeline — parse handles column selection and type enforcement, mongo_ingest can ingest raw parquet directly. No additional local dependencies beyond pyyaml.
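The interactive grouping of per-year HF configs into data types could be approximated by stripping a trailing year suffix. The config names below are hypothetical, and the CLI performs this grouping interactively rather than by a fixed rule:

```python
from collections import defaultdict

def group_configs(configs: list[str]) -> dict[str, list[str]]:
    """Group configs like "comments_2019" under their non-year prefix."""
    groups = defaultdict(list)
    for name in configs:
        head, _, tail = name.rpartition("_")
        prefix = head if head and tail.isdigit() else name
        groups[prefix].append(name)
    return dict(groups)
```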

Extending functionality

  • Add new platforms: Create config files and an optional custom parser. See Adding Platforms.
  • Add custom classifiers: Config-only (add a HuggingFace model via YAML) or custom Python. See Custom Classifiers.
  • Full configuration reference: All environment variables, YAML files, and the source override system. See Configuration.

◾ FAQ and Troubleshooting

Can I run classifiers without the database?

Yes! Use python sdp.py run lingua or python sdp.py run ml independently. The database profile is optional.

Can I use this for non-Reddit data?

Yes! Select custom during python sdp.py source add <name> to process arbitrary JSON/NDJSON, CSV, or Parquet data. For Hugging Face datasets, use python sdp.py source add <name> --hf <dataset_id> to fetch metadata and pre-populate setup defaults. See the Custom Platform setup guide.

How do I add support for a new platform?

See Adding New Platforms. Create a platform template in config/templates/ and optionally a custom parser.

How do I reprocess data?

Delete the relevant output directories and rerun the profile:

rm -rf data/output/<source>/toxic_roberta/          # Reprocess a specific classifier
rm -rf data/output/<source>/                        # Reprocess all classifiers
rm -rf data/output/<source>/ data/parsed/<source>/ data/extracted/<source>/  # Full reprocess

Why no table partitioning?

This project targets large-scale, Reddit-wide analysis. For queries not limited to a few months, partitioning would split indexes into 200+ partitions, hurting query performance. It would also interfere with ID deduplication during ingestion.

Troubleshooting

Pipeline fails:

docker compose logs parse
docker compose logs lingua
docker compose logs postgres-ingest
docker compose logs postgres-ml

Database connection issues:

docker compose ps
docker compose logs postgres
docker compose logs mongo

Out of disk space:

  • Use cleanup_temp: true in pipeline.yaml
  • Check temp directories for leftover files
  • Consider sequential mode to reduce intermediate storage

GPU not detected:

docker run --rm --gpus all nvidia/cuda:12.1.1-base-ubuntu22.04 nvidia-smi

AI disclaimer

Most of the orchestration and dockerization glue code was written by LLMs, under human planning and code review. The algorithms and ingestion structure merge code from several private repos developed over almost four years.

License

See LICENSE file.
