Production-grade cryptocurrency data pipeline orchestrated with Dagster. Extracts market data from exchanges, stores raw data in MinIO, transforms with Polars, and loads into PostgreSQL.
# Install dependencies
uv sync
# Start Dagster development server
uv run dagster dev
Visit http://localhost:3000 to access the Dagster UI.
Note: For Kubernetes deployments that need to access external services (MinIO, databases) on custom domains like
.lan, you may need to configure CoreDNS. See DNS Configuration for details.
Comprehensive documentation is available in the docs/ directory.
To view the documentation locally:
# Serve documentation with MkDocs
./serve-docs.sh
# Or manually
uv run mkdocs serve
Then open http://127.0.0.1:8000 in your browser.
- Home: Project overview and quick start
- Architecture: System design, data flow, and infrastructure
- Setup Guide: Environment setup and configuration
- Development Guide: Development workflow and best practices
- Deployment Guide: Kubernetes deployment and operations
- API Reference: Code documentation and data contracts
- Troubleshooting: Common issues and solutions
This Dagster code location implements a production-grade data pipeline:
- Extract: Fetch market data from cryptocurrency exchanges (Binance, ByBit, Gate.io)
- Load: Store raw JSON data in MinIO (S3-compatible object storage)
- Transform: Clean and validate data using Narwhals/Polars
- Load: Store structured data in PostgreSQL for analytics
- Type-Safe: Comprehensive Pydantic models with validation
- Tested: 150+ unit and integration tests with 82% coverage
- Observable: Structured logging with contextual information
- Resilient: Retry logic, error handling, and graceful degradation
- Scalable: Kubernetes-native with K8sRunLauncher
- Maintainable: Clean code, comprehensive documentation
- Flexible IO: Multiple storage backends (Filesystem, S3/MinIO, DuckDB, PostgreSQL, SQLite, KuzuDB)
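The "Resilient" point above deserves a concrete illustration. The pipeline's real retry handling lives in its Dagster assets and resources; the following is a standalone sketch of the retry-with-exponential-backoff pattern, where the names (`with_retries`, `fetch_ticker`, `max_attempts`) are illustrative and not taken from this codebase:

```python
import time

def with_retries(fn, max_attempts=3, base_delay=0.01):
    """Call fn(), retrying on failure with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # graceful degradation is handled by the caller
            time.sleep(base_delay * 2 ** (attempt - 1))

# Simulated flaky exchange call: fails twice, then succeeds.
calls = {"n": 0}
def fetch_ticker():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("exchange timeout")
    return {"symbol": "BTC/USDT", "last": 50000.0}

ticker = with_retries(fetch_ticker)
print(calls["n"], ticker["last"])  # 3 50000.0
```

In the actual assets, the same effect can also be achieved declaratively with Dagster's built-in retry policies rather than a hand-rolled loop.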
| Component | Technology |
|---|---|
| Orchestration | Dagster 1.12+ |
| Data Processing | Narwhals + Polars |
| Data Validation | Pydantic 2.12+ |
| Exchange API | CCXT 4.5+ |
| Object Storage | MinIO / S3 |
| Database | PostgreSQL 14+ |
| Testing | Pytest 9.0+ |
| Container Runtime | Kubernetes |
- Implement extract asset factory with Pydantic validation
- Implement transform asset factory with Narwhals/Polars
- Add comprehensive test suites (150 tests passing, 82% coverage)
- Create custom IO Managers (Filesystem, S3/MinIO, DuckDB, SQL, KuzuDB)
- Implement Narwhals-compatible DataFrame interfaces
- Implement load asset factory for PostgreSQL
- Wire up complete extract → transform → load pipeline in Definitions
- Add Soda Core data quality checks
- Define data contracts for ticker data
- Implement data validation rules (nulls, ranges, types)
- Add data quality tests between pipeline stages
- Create data lineage documentation
- Create Dockerfile for containerized deployment
- Set up Kubernetes manifests (Helm + Kustomize)
- Configure K8sRunLauncher for job execution
- Implement secrets management (SealedSecrets)
- Add CI/CD pipeline (GitHub Actions)
- Add pipeline monitoring and alerting
- Implement data freshness checks
- Create operational dashboards
- Add structured logging for all assets
- Set up error notification system
- Create architecture diagram (D2 format)
- Create comprehensive MkDocs documentation site
- Document IO managers and data flow
- Add API reference documentation
- Create deployment guide
- Document troubleshooting procedures
- Document data contracts and schemas
- Add pipeline runbooks for operations
- Create comprehensive test data fixtures
- Add unit tests for all IO managers (46 tests)
- Add unit tests for extract assets (23 tests)
- Add unit tests for transform assets (17 tests)
- Add unit tests for resources and utilities (64 tests)
- Achieve 82% overall code coverage
- Add integration tests for full pipeline
- Implement data quality regression tests
- Add performance benchmarks
- Add end-to-end pipeline tests
- Add support for additional exchanges (Kraken, Coinbase, etc.)
- Implement incremental data loading
- Add data versioning with lakeFS
- Create semantic layer with Cube
- Build self-service analytics with Superset
- Implement real-time streaming pipeline
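The data-quality items in the roadmap above (null, range, and type rules for ticker data) can be sketched in plain Python. The field names and rules here are illustrative, not the project's actual data contract:

```python
def validate_ticker(row: dict) -> list[str]:
    """Return the list of violated rules for one ticker row (empty = valid)."""
    errors = []
    # Null checks: required fields must be present and non-None.
    for field in ("symbol", "last", "timestamp"):
        if row.get(field) is None:
            errors.append(f"{field} is null")
    # Type checks.
    if not isinstance(row.get("symbol"), str):
        errors.append("symbol must be a string")
    # Range checks: prices must be strictly positive.
    last = row.get("last")
    if isinstance(last, (int, float)) and last <= 0:
        errors.append("last price must be > 0")
    return errors

print(validate_ticker({"symbol": "BTC/USDT", "last": 50000.0, "timestamp": 1}))  # []
print(validate_ticker({"symbol": "BTC/USDT", "last": -1, "timestamp": None}))
```

In the pipeline itself, equivalent rules would be expressed through Pydantic models and Soda Core checks rather than ad-hoc functions.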
This repository contains the Dagster Code Location for the Crypto Data Pipeline. It defines the assets and jobs required to:
- Extract market data from centralized exchanges (Binance, ByBit, etc.).
- Load raw JSON data into MinIO (Object Storage).
- Transform data into a relational format in PostgreSQL.
This project utilizes a hybrid deployment strategy:
- Containerization (Docker): The Dockerfile packages the user code and dependencies.
- Manifests (Helm + Kustomize): We leverage the official Dagster User Code Helm Chart as a base. We use Kustomize to "inflate" this chart and apply local configurations (overlays) without maintaining raw deployment manifests.
- Execution (K8sRunLauncher): When a job runs, the Dagster Daemon launches a separate, ephemeral pod using the image defined in this repository.
flowchart LR
Daemon[Dagster Daemon] -->|gRPC| CodeLoc[Code Location Pod]
Daemon -->|Launch| JobPod[Ephemeral Job Pod]
JobPod -->|Write| MinIO[(MinIO)]
JobPod -->|Write| Postgres[(PostgreSQL)]
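For reference, the K8sRunLauncher is configured on the Dagster platform side (in `dagster.yaml`, typically rendered by the Helm chart rather than written by hand). A minimal sketch, with illustrative values, looks like:

```yaml
run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    job_namespace: dagster
    image_pull_policy: Always
    # The job image itself is taken from DAGSTER_CURRENT_IMAGE,
    # which this repository sets on the code location pod.
```

The `job_namespace` and `image_pull_policy` values shown are assumptions for this sketch; consult the platform's Helm values for the real configuration.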
- Dagster Instance: A running Dagster platform (Webserver/Daemon).
- MinIO: Reachable at minio.lxc.svc.cluster.local.
- PostgreSQL: Reachable at postgresql.database.svc.cluster.local.
- Tools: kubectl, kustomize, docker, sops (optional but recommended).
Secrets are managed via SealedSecrets.
- dagster-crypto-secrets: Contains POSTGRESQL_PASSWORD and MINIO_PASSWORD.
Since this repo defines the execution environment for both the Code Location and the ephemeral Job Pods, you must build the image first.
# Build
docker build -t your-registry/dagster-crypto:latest .
# Push (Required for K8s to pull it)
docker push your-registry/dagster-crypto:latest
We use SealedSecrets to manage credentials.
Create a .env file (do not commit this):
DAGSTER_POSTGRESQL_PASSWORD=supersecret
MINIO_PASSWORD=anothersecret
Run the generation script:
sops exec-env .env "nu create_sealed_secrets.nu"
Update the image tag in apps/dagster/overlays/prod/kustomization.yaml if necessary, then deploy. This command uses Kustomize to inflate the Helm chart with your production patches.
kubectl apply -k apps/dagster/overlays/prod
Check if the Dagster Platform has picked up the new location:
- Open Dagster UI.
- Go to Deployment > Code Locations.
- Ensure dagster-crypto is "Loaded" and green.
project/
├── Dockerfile                  # Defines runtime environment
├── apps/
│   └── dagster/
│       ├── base/               # Base Helm-based Kustomize config
│       └── overlays/prod/      # Production patches & SealedSecrets
├── code/                       # Python business logic
└── create_sealed_secrets.nu
| Variable | Description |
|---|---|
| POSTGRESQL_HOST | Database hostname. |
| MINIO_ENDPOINT | MinIO API endpoint. |
| DAGSTER_CURRENT_IMAGE | Propagates the image version to job pods. |
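As a sketch of how a code location can consume these variables at startup, consider a small helper that fails fast on missing values. The helper name (`load_settings`) and the fallback image tag are assumptions for illustration, not the project's actual code:

```python
import os

def load_settings(env=os.environ):
    """Read pipeline connection settings, failing fast on missing values."""
    missing = [k for k in ("POSTGRESQL_HOST", "MINIO_ENDPOINT") if k not in env]
    if missing:
        raise RuntimeError(f"missing required env vars: {missing}")
    return {
        "postgres_host": env["POSTGRESQL_HOST"],
        "minio_endpoint": env["MINIO_ENDPOINT"],
        # Falls back to a local tag; in K8s this pins the ephemeral job pods.
        "job_image": env.get("DAGSTER_CURRENT_IMAGE", "dagster-crypto:latest"),
    }

settings = load_settings({
    "POSTGRESQL_HOST": "postgresql.database.svc.cluster.local",
    "MINIO_ENDPOINT": "minio.lxc.svc.cluster.local",
})
print(settings["job_image"])  # dagster-crypto:latest
```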
- Replicas: 1 (Stateless service).
- Resources: 100m CPU / 256Mi RAM (For the gRPC server only).
- Web Interface: http://dagster.homelab.lan (Served by the main Dagster platform).
- Internal gRPC: dagster-crypto.dagster.svc.cluster.local:3030.
An ingress-route.yaml file exists in base/ for compatibility but is not enabled for this code location, as it is an internal service.
- Raw Data: MinIO bucket crypto-raw.
- Processed Data: Postgres database crypto.
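As an illustration of how raw objects might be laid out in the crypto-raw bucket, a key-building helper could partition by exchange, symbol, and date. This scheme is hypothetical, not the pipeline's actual layout:

```python
from datetime import datetime, timezone

def raw_object_key(exchange: str, symbol: str, ts: datetime) -> str:
    """Build a partitioned object key for the raw-data bucket."""
    # Symbols like "BTC/USDT" contain "/", which would create extra prefixes.
    safe_symbol = symbol.replace("/", "-")
    return f"{exchange}/{safe_symbol}/{ts:%Y-%m-%d}/{int(ts.timestamp())}.json"

key = raw_object_key("binance", "BTC/USDT",
                     datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc))
print(key)  # binance/BTC-USDT/2024-01-15/1705320000.json
```

Date-based prefixes like this keep `mc mirror` backups and per-day reprocessing cheap, since both can target a single prefix.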
MinIO (Critical):
mc mirror minio-local/crypto-raw /backup-location/
PostgreSQL: Standard pg_dump.
- Job Stuck in "Starting": Usually indicates the Daemon cannot create the ephemeral pod. Check kubectl get events for ImagePullBackOff.
- Code Location Error: If the Python code fails to load (syntax error), the Deployment will crash loop. Check kubectl logs.
# View gRPC server logs
kubectl logs -n dagster -l app=dagster-crypto -f
# List running job pods
kubectl get pods -n dagster -l dagster/job
- Update the code and rebuild/push the Docker image.
- Update the image tag in kustomization.yaml.
- Apply the changes:
  kubectl apply -k apps/dagster/overlays/prod
- Restart the deployment to force a code reload:
  kubectl rollout restart deployment -n dagster dagster-crypto