A production-grade real-time trading pipeline built with Python, featuring high-frequency data processing, risk management, and analytics capabilities.
- Real-time Data Streaming: Apache Kafka for high-throughput message processing
- In-Memory Caching: Redis for ultra-fast state management and real-time metrics
- Persistent Storage: PostgreSQL with time-series optimizations
- WebSocket API: Real-time updates for trading dashboards and alerts
- Order Management: Full order lifecycle with validation and execution
- Risk Controls: VaR calculations, drawdown monitoring, position sizing
- Circuit Breakers: Automatic trading halts under extreme conditions
- Real-time Alerts: Configurable risk threshold monitoring
- Live P&L Tracking: Real-time profit/loss calculation and attribution
- Technical Indicators: Moving averages, RSI, Bollinger Bands
- Performance Metrics: Sharpe ratio, max drawdown, win rate analysis
- Backtesting Framework: Historical strategy simulation and optimization
- Low Latency: under 500ms from data ingestion to alert, under 100ms order processing
- High Throughput: Handles 10k+ events per second
- Monitoring: Prometheus metrics, structured logging, health checks
- Scalability: Horizontal scaling with Kafka partitioning
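The performance metrics above boil down to short calculations. A dependency-free sketch of two of them (function names are illustrative, not the project's actual API):

```python
import math

def sharpe_ratio(returns, risk_free_rate=0.0, periods_per_year=252):
    """Annualized Sharpe ratio from a list of per-period returns."""
    excess = [r - risk_free_rate / periods_per_year for r in returns]
    mean = sum(excess) / len(excess)
    variance = sum((r - mean) ** 2 for r in excess) / (len(excess) - 1)
    std = math.sqrt(variance)
    return (mean / std) * math.sqrt(periods_per_year) if std else 0.0

def max_drawdown(equity_curve):
    """Largest peak-to-trough decline, as a fraction of the peak."""
    peak = equity_curve[0]
    worst = 0.0
    for value in equity_curve:
        peak = max(peak, value)
        worst = max(worst, (peak - value) / peak)
    return worst
```

For example, an equity curve of 100, 120, 90, 130 has a max drawdown of 0.25 (the 120 → 90 decline).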
```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   Market Data   │────▶│  Apache Kafka   │────▶│  Risk Manager   │
│      Feeds      │     │   (Streaming)   │     │  & Validators   │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                 │                       │
                                 ▼                       ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   PostgreSQL    │◀────│      Redis      │────▶│     FastAPI     │
│  (Persistence)  │     │  (Cache/State)  │     │    (REST/WS)    │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                         │
                                                         ▼
                                                ┌─────────────────┐
                                                │    Dashboard    │
                                                │    & Clients    │
                                                └─────────────────┘
```
- Nix with flakes enabled
- devenv and direnv (alternative)
- Docker and Docker Compose (fallback)
- Python 3.11+
Choose one of the following methods. Run ./setup-dev.sh to see what's available on your system.
With Nix Flakes:

```bash
git clone <repository>
cd trading-pipeline

# Enter development shell
nix develop

# Start services manually
start-postgres
start-redis

# Or use Docker for Kafka
docker-compose up -d kafka
```

With devenv and direnv:

```bash
git clone <repository>
cd trading-pipeline
direnv allow  # Activates devenv automatically
```

With Docker Compose:

```bash
git clone <repository>
cd trading-pipeline

# Start all services
docker-compose up -d

# Install Python dependencies
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```

For detailed setup instructions and comparisons, see docs/development-setup.md
With Nix Flakes:

```bash
# Start individual services
start-postgres   # PostgreSQL on localhost:5432
start-redis      # Redis on localhost:6379

# Stop all services
stop-services

# Or use Docker for additional services
docker-compose up -d kafka
```

With devenv: Services (PostgreSQL and Redis) start automatically when entering the shell.
- Run the API:

  ```bash
  python -m trading_pipeline.api.main
  ```

- Access Services:
  - API Documentation: http://localhost:8000/docs
  - Health Check: http://localhost:8000/health/detailed
  - WebSocket: ws://localhost:8000/ws/live
```bash
# Start all services
docker-compose up -d

# Check service health
docker-compose ps

# View logs
docker-compose logs -f trading-api
```

Submit an order:

```python
import httpx

response = httpx.post("http://localhost:8000/api/v1/trading/orders", json={
    "symbol": "AAPL",
    "side": "buy",
    "order_type": "limit",
    "quantity": "100",
    "price": "150.50"
})
```

Fetch portfolio risk metrics:

```python
response = httpx.get("http://localhost:8000/api/v1/risk/metrics/portfolio_1")
risk_data = response.json()
```

Subscribe to live updates over WebSocket:

```javascript
const ws = new WebSocket('ws://localhost:8000/ws/live');

ws.onopen = () => {
  // Subscribe to P&L updates
  ws.send(JSON.stringify({
    type: 'subscribe',
    topic: 'pnl_updates'
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Real-time update:', data);
};
```

Key environment variables (see .env.example):
```bash
# Database
DATABASE_URL=postgresql://trading_user:trading_pass@localhost:5432/trading_pipeline

# Redis
REDIS_URL=redis://localhost:6379

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092

# Risk Management
MAX_POSITION_SIZE=100000.0
MAX_PORTFOLIO_RISK=0.02
CIRCUIT_BREAKER_THRESHOLD=0.05

# Performance Targets
MAX_LATENCY_MS=500
TARGET_THROUGHPUT=10000
```
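At startup these risk variables are typically loaded into a typed settings object. A minimal stdlib sketch of that pattern (the project's actual config layer lives under src/trading_pipeline/core/ and may differ):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class RiskSettings:
    """Risk limits read from the environment, with the documented defaults."""
    max_position_size: float
    max_portfolio_risk: float
    circuit_breaker_threshold: float

    @classmethod
    def from_env(cls):
        # Fall back to the defaults shown in .env.example above
        return cls(
            max_position_size=float(os.getenv("MAX_POSITION_SIZE", "100000.0")),
            max_portfolio_risk=float(os.getenv("MAX_PORTFOLIO_RISK", "0.02")),
            circuit_breaker_threshold=float(os.getenv("CIRCUIT_BREAKER_THRESHOLD", "0.05")),
        )
```

A frozen dataclass keeps the limits immutable once loaded, so a misbehaving strategy cannot loosen them at runtime.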
```bash
# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run with coverage
pytest --cov=trading_pipeline

# Integration tests
pytest tests/integration/
```

- Order processing latency
- Risk calculation performance
- Cache hit rates
- WebSocket connection counts
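The order-processing-latency metric above is the kind of thing a Prometheus histogram tracks. As a dependency-free illustration of the cumulative-bucket idea (the bucket bounds here are arbitrary, not the project's configuration):

```python
from bisect import bisect_left

class LatencyHistogram:
    """Prometheus-style bucketed histogram, stdlib sketch."""

    def __init__(self, buckets=(0.005, 0.01, 0.05, 0.1, 0.5)):
        self.buckets = sorted(buckets)
        self.counts = [0] * (len(self.buckets) + 1)  # last slot = +Inf bucket
        self.total = 0.0
        self.observations = 0

    def observe(self, seconds):
        # Count the observation in the first bucket whose bound covers it
        self.counts[bisect_left(self.buckets, seconds)] += 1
        self.total += seconds
        self.observations += 1

    def quantile_bucket(self, q):
        """Upper bound of the bucket containing the q-th quantile."""
        target = q * self.observations
        running = 0
        for bound, count in zip(self.buckets + [float("inf")], self.counts):
            running += count
            if running >= target:
                return bound
        return float("inf")
```

In production the same shape comes from prometheus_client; the point is that quantiles are estimated from cumulative bucket counts, not stored samples.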
```python
import structlog

logger = structlog.get_logger()

logger.info(
    "Order processed",
    order_id=order.id,
    symbol=order.symbol,
    latency_ms=processing_time
)
```

Health endpoints:

- /health/ - Basic service status
- /health/detailed - Full dependency health
- /health/ready - Kubernetes readiness probe
- /health/live - Kubernetes liveness probe
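A deployment script can gate on the readiness endpoint above before routing traffic. A stdlib sketch (the URL and timeouts are illustrative):

```python
import json
import time
import urllib.request

def wait_until_healthy(url="http://localhost:8000/health/ready",
                       timeout_s=30.0, interval_s=1.0):
    """Poll the readiness endpoint until it returns 200 or we time out."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                if resp.status == 200:
                    return json.loads(resp.read() or b"{}")
        except OSError:
            pass  # service not up yet; retry after the interval
        time.sleep(interval_s)
    raise TimeoutError(f"{url} not healthy after {timeout_s}s")
```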
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trading-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: trading-pipeline
  template:
    metadata:
      labels:
        app: trading-pipeline
    spec:
      containers:
        - name: api
          image: trading-pipeline:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: trading-secrets
                  key: database-url
```

- Database: Connection pooling, read replicas, partitioning
- Redis: Clustering, memory optimization, eviction policies
- Kafka: Partition tuning, batch size optimization
- API: Worker processes, async connection limits
- JWT authentication for API access
- Input validation with Pydantic
- SQL injection prevention with SQLAlchemy
- Rate limiting and request size limits
- Secrets management with environment variables
```
trading-pipeline/
├── src/trading_pipeline/
│   ├── api/              # FastAPI routes and WebSocket
│   ├── core/             # Configuration and models
│   ├── data/             # Database and cache layers
│   ├── streaming/        # Kafka integration
│   ├── risk/             # Risk management engine
│   ├── backtesting/      # Strategy backtesting
│   └── analytics/        # Performance calculations
├── tests/
│   ├── unit/             # Unit tests
│   └── integration/      # Integration tests
├── deployment/           # Kubernetes manifests
├── monitoring/           # Prometheus/Grafana config
├── docs/                 # Additional documentation
├── devenv.nix            # devenv configuration
├── flake.nix             # Nix flakes configuration
├── .envrc                # direnv configuration
└── setup-dev.sh          # Development setup helper
```
- Fork the repository
- Create a feature branch
- Make your changes with tests
- Run the test suite and linting
- Submit a pull request
- Follow PEP 8 style guide
- Add type hints to all functions
- Write comprehensive tests
- Update documentation for new features
- Use structured logging
MIT License - see LICENSE file for details.
- Issues: Create GitHub issues for bugs and feature requests
- Documentation: Check the /docs directory for detailed guides
- Performance: See monitoring dashboards for system health
- Machine learning-based risk models
- Multi-exchange connectivity
- Advanced order types (iceberg, TWAP, VWAP)
- Mobile dashboard application
- Enhanced backtesting with walk-forward analysis