diff --git a/.env.example b/.env.example index 4b451194..d89caf83 100644 --- a/.env.example +++ b/.env.example @@ -144,7 +144,6 @@ ENABLE_CDN=true CLUSTER_MODE=false CLUSTER_WORKERS=4 -# ============================================================================= # Secrets Management Configuration # ============================================================================= # Secret provider: 'env' (default), 'aws' (AWS Secrets Manager), or 'vault' (HashiCorp Vault) @@ -178,3 +177,56 @@ CIRCUIT_BREAKER_RESET_TIMEOUT_MS=30000 CIRCUIT_BREAKER_ROLLING_COUNT_TIMEOUT=60000 # Number of stat buckets for rolling window CIRCUIT_BREAKER_ROLLING_COUNT_BUCKETS=10 + +# ============================================================================= +# Database Sharding (#602) +# ============================================================================= +# Set SHARD_COUNT=0 (or omit) to run in single-shard fallback mode using the +# DATABASE_* variables above. Set SHARD_COUNT>0 to enable multi-shard routing. +# ============================================================================= + +SHARD_COUNT=0 + +# ── Example: 3-shard setup (uncomment and adjust when SHARD_COUNT=3) ──────── +# SHARD_0_HOST=pg-shard-0.internal +# SHARD_0_PORT=5432 +# SHARD_0_USER=teachlink +# SHARD_0_PASSWORD= +# SHARD_0_DB=teachlink_0 +# SHARD_0_POOL_MAX=30 +# SHARD_0_POOL_MIN=5 +# SHARD_0_WEIGHT=100 +# SHARD_0_REGION=us-east-1 +# SHARD_0_STATUS=active +# SHARD_0_REPLICA_COUNT=1 +# SHARD_0_REPLICA_0_HOST=pg-replica-0.internal +# SHARD_0_REPLICA_0_PORT=5432 +# SHARD_0_REPLICA_0_WEIGHT=100 + +# SHARD_1_HOST=pg-shard-1.internal +# SHARD_1_PORT=5432 +# SHARD_1_USER=teachlink +# SHARD_1_PASSWORD= +# SHARD_1_DB=teachlink_1 +# SHARD_1_POOL_MAX=30 +# SHARD_1_POOL_MIN=5 +# SHARD_1_WEIGHT=100 +# SHARD_1_REGION=us-west-2 +# SHARD_1_STATUS=active + +# SHARD_2_HOST=pg-shard-2.internal +# SHARD_2_PORT=5432 +# SHARD_2_USER=teachlink +# SHARD_2_PASSWORD= +# SHARD_2_DB=teachlink_2 +# SHARD_2_POOL_MAX=30 +# SHARD_2_POOL_MIN=5 +# SHARD_2_WEIGHT=100 +# SHARD_2_REGION=eu-west-1 +# SHARD_2_STATUS=active + +# ── Rebalance Thresholds ────────────────────────────────────────────────────── +SHARD_REBALANCE_HIGH_WATERMARK=80 +SHARD_REBALANCE_LOW_WATERMARK=20 +SHARD_REBALANCE_BATCH_SIZE=500 +>>>>>>> e142fba (feat: implement database sharding strategy (#602)) diff --git a/docs/SHARDING_STRATEGY.md b/docs/SHARDING_STRATEGY.md new file mode 100644 index 00000000..b076adab --- /dev/null +++ b/docs/SHARDING_STRATEGY.md @@ -0,0 +1,442 @@ +# Database Sharding Strategy — TeachLink Backend + +> **Issue**: #602 — Implement database sharding strategy +> **Branch**: `feature/602-database-sharding-strategy` + +--- + +## Table of Contents + +1. [Overview](#1-overview) +2. [Sharding Strategies](#2-sharding-strategies) +3. [Architecture](#3-architecture) +4. [Router Implementation](#4-router-implementation) +5. [Connection Management](#5-connection-management) +6. [Migration Strategy](#6-migration-strategy) +7. [Rebalancing Procedure](#7-rebalancing-procedure) +8. [Health Monitoring](#8-health-monitoring) +9. [Configuration Reference](#9-configuration-reference) +10. [API Reference](#10-api-reference) +11. [Runbook](#11-runbook) + +--- + +## 1. Overview + +TeachLink uses horizontal database sharding to distribute PostgreSQL load across multiple independent nodes. Each shard is a fully independent PostgreSQL instance holding a subset of the data. + +### Why Sharding? + +| Challenge | Sharding Solution | +| ------------------------------------ | ---------------------------------------- | +| Single-node write throughput ceiling | Distribute writes across N shards | +| Growing dataset storage limits | Each shard stores only 1/N of the data | +| Connection pool exhaustion | Each shard has its own pool | +| Geographic latency | Route tenants to regionally close shards | + +### Design Principles + +- **Zero-downtime migration** — data moves in batches with source still serving traffic +- **Deterministic routing** — same key always resolves to the same shard (consistent hashing) +- **Graceful fallback** — if `SHARD_COUNT=0`, the system runs on the existing single database +- **Operator-first** — all rebalancing operations are observable and reversible + +--- + +## 2. Sharding Strategies + +The system supports four routing strategies selectable per query: + +### 2.1 Hash-Based (Default) + +Uses a **consistent-hash ring** with 150 virtual nodes per physical shard. + +``` +key → SHA-256 → uint32 → ring position → shardId +``` + +**Best for**: User data, course content, generic entities. +**Pros**: Automatic distribution, minimises key movement on shard add/remove. +**Cons**: No data locality — related rows may span shards. + +### 2.2 Tenant-Based + +Normalises the key to its tenant segment before hashing: + +``` +"tenant:ACME_CORP:course-99" → hash("tenant:ACME_CORP") → shard +``` + +**Best for**: Multi-tenant SaaS — guarantees all data for a tenant is co-located. +**Pros**: Cross-tenant joins never cross shard boundaries. +**Cons**: Uneven distribution if tenants are very different sizes. + +### 2.3 Range-Based + +Maps numeric ranges to explicit shards: + +``` +[0, 1_000_000) → shard-00 +[1_000_000, 2_000_000) → shard-01 +[2_000_000, 3_000_000) → shard-02 +``` + +**Best for**: Time-series data, ordered IDs with predictable growth. +**Cons**: Requires manual bucket management; hot-spots possible if ranges are uneven. + +### 2.4 Read-Replica + +Routes **read** queries to a weighted pool of replicas; writes still target the primary. + +``` +forRead=true → weighted-random pick from shard.readReplicas +``` + +**Best for**: Read-heavy workloads (dashboards, reports, search pre-fetching). + +--- + +## 3. Architecture + +``` +Request + │ + ▼ +ShardRouter ──── consistent-hash ring ────► ShardConfig (host, port, pool) + │ │ + │ ▼ + │ ShardConnectionManager + │ (lazy DataSource cache) + │ │ + │ ▼ + │ PostgreSQL Shard Node + │ + ├── for reads ──► ReadReplica (weighted random) + │ + └── migrations ─► ShardMigrationService ─► ShardRebalanceService +``` + +### Module Dependency Graph + +``` +ShardingModule + ├── ShardConfigService (loads topology from env) + ├── ShardRouter (routing decisions, hash ring) + ├── ShardConnectionManager (lazy TypeORM DataSource per shard) + ├── ShardMigrationService (cross-shard data movement) + ├── ShardRebalanceService (automated / manual rebalancing) + └── ShardHealthService (SELECT 1 + pool metrics) +``` + +--- + +## 4. Router Implementation + +### Consistent-Hash Ring + +Each physical shard is represented by **N virtual nodes** placed uniformly on a 0–2³² ring. +`N = round(150 × weight / 100)` — weight-aware placement. + +**Ring lookup (binary search):** + +``` +keyHash = SHA256(key)[0:4] as uint32 +pos = first virtualNode.position ≥ keyHash (binary search, wraps to 0) +shardId = ring[pos].shardId +``` + +Adding a new shard only moves `1/N` of the keyspace — minimising reshuffling. + +### Weighted Virtual Nodes + +| Shard | Weight | Virtual Nodes | +| -------- | ------ | ------------- | +| shard-00 | 100 | 150 | +| shard-01 | 100 | 150 | +| shard-02 | 50 | 75 | + +shard-02 handles ~25% of traffic vs ~37.5% each for shard-00 and shard-01. + +### Ring Rebuild Trigger + +The ring is rebuilt automatically on: + +- Application startup +- Manual `POST /sharding/ring/rebuild` +- Completion of a rebalance plan + +--- + +## 5. Connection Management + +`ShardConnectionManager` maintains a **lazy cache** of TypeORM `DataSource` instances: + +``` +getConnection(shardId) + │ + ├── cached & initialized? → return cached DataSource + │ + └── not cached → new DataSource(shardConfig) → initialize() → cache → return +``` + +Each DataSource uses the shard-specific pool settings: + +- `poolMax` — configurable per shard (default 30) +- `poolMin` — configurable per shard (default 5) +- `connectionTimeoutMillis` — from `DATABASE_POOL_ACQUIRE_TIMEOUT_MS` env var +- `idleTimeoutMillis` — from `DATABASE_POOL_IDLE_TIMEOUT_MS` env var + +**Shutdown**: `closeAll()` is called via NestJS lifecycle hooks, gracefully destroying all DataSources before process exit. + +--- + +## 6. Migration Strategy + +### 6.1 Migration Flow + +``` +Operator: POST /sharding/migrations + { + "sourceShardId": "shard-00", + "targetShardId": "shard-01", + "entityType": "user", + "estimatedRowCount": 50000, + "batchSize": 500, + "dryRun": true ← validate first + } +``` + +**Phase 1 — Dry Run** (always recommended first): + +``` +SELECT * FROM "user" ORDER BY id LIMIT 500 OFFSET 0 +→ Log: "Would insert 500 rows into shard-01" (no writes) +``` + +**Phase 2 — Live Migration**: + +``` +LOOP: + SELECT rows from source (batch) + INSERT … ON CONFLICT DO NOTHING into target (idempotent) + yield event loop (back-pressure) +ENDLOOP + +source shard → DRAINING status +DELETE migrated rows from source +source shard → ACTIVE status +``` + +### 6.2 Migration Properties + +| Property | Value | +| ----------------- | ---------------------------------------- | +| Idempotent | ✅ `ON CONFLICT DO NOTHING` | +| Zero-downtime | ✅ Source stays up during copy | +| Back-pressure | ✅ `setImmediate` between batches | +| Progress tracking | ✅ `GET /sharding/migrations/:planId` | +| Rollback | ✅ `DELETE /sharding/migrations/:planId` | + +### 6.3 Pre-Migration Checklist + +- [ ] Run dry-run and verify row counts match +- [ ] Verify DDL parity on target shard (same tables / indexes) +- [ ] Confirm target shard has sufficient disk capacity +- [ ] Schedule during low-traffic window +- [ ] Set up monitoring alert on `migration_status != completed` + +### 6.4 Rollback + +If a migration fails mid-run, `ShardMigrationService` marks it `failed`. +Rows already copied to the target can be removed by calling: + +``` +DELETE /sharding/migrations/:planId +``` + +> ⚠️ This only marks the plan as `rolled_back` in memory. For production, implement an audit log table that records the IDs of all rows copied so that the rollback DELETE can be precise. + +--- + +## 7. Rebalancing Procedure + +### 7.1 Automated Rebalancing + +The system monitors pool utilisation per shard. When a shard exceeds the **high watermark** (default 80%), migration plans are auto-generated targeting shards below the **low watermark** (default 20%). + +``` +POST /sharding/rebalance/auto +{ + "entityTypes": ["user", "course"], + "autoExecute": false ← dry-run first +} +``` + +**Review the plan**, then re-submit with `autoExecute: true`. + +### 7.2 Manual Rebalancing + +For planned splits, merges, or shard decommissions: + +``` +POST /sharding/rebalance +{ + "dryRun": false, + "migrations": [ + { + "sourceShardId": "shard-00", + "targetShardId": "shard-03", + "entityType": "course", + "estimatedRowCount": 100000, + "batchSize": 1000, + "dryRun": false + } + ] +} +``` + +### 7.3 Adding a New Shard + +1. Provision a new PostgreSQL instance. +2. Run DDL migrations on the new shard (schema must match existing shards). +3. Add `SHARD_N_*` environment variables for the new shard. +4. Rolling restart the application (the ring rebuilds at startup). +5. Monitor via `GET /sharding/health` — the new shard should appear `active`. +6. Optionally run manual rebalance to populate the new shard. + +### 7.4 Decommissioning a Shard + +1. Set shard status to `DRAINING` via `ShardConfigService.updateShardStatus()`. +2. Migrate all entity types off the shard using `POST /sharding/migrations`. +3. Monitor until all migrations complete. +4. Remove the shard's `SHARD_N_*` environment variables. +5. Rolling restart — the shard disappears from the ring. + +--- + +## 8. Health Monitoring + +``` +GET /sharding/health +→ [ + { + "shardId": "shard-00", + "status": "active", + "activeConnections": 8, + "poolUtilizationPercent": 27, + "avgQueryLatencyMs": 3, + "errorRatePercent": 0, + "lastCheckedAt": "2026-05-30T08:00:00Z" + }, + ... + ] +``` + +### Metrics to Alert On + +| Metric | Warning | Critical | +| ------------------------ | ----------------------- | --------- | +| `poolUtilizationPercent` | > 70% | > 90% | +| `avgQueryLatencyMs` | > 100ms | > 500ms | +| `errorRatePercent` | > 1% | > 5% | +| `status` | `draining` / `readonly` | `offline` | + +--- + +## 9. Configuration Reference + +### Core Sharding Variables + +```bash +# Number of shards (0 = single-shard fallback mode) +SHARD_COUNT=3 + +# Per-shard configuration (repeat for each shard index) +SHARD_0_HOST=pg-shard-0.internal +SHARD_0_PORT=5432 +SHARD_0_USER=teachlink +SHARD_0_PASSWORD= +SHARD_0_DB=teachlink_0 +SHARD_0_POOL_MAX=30 +SHARD_0_POOL_MIN=5 +SHARD_0_WEIGHT=100 +SHARD_0_REGION=us-east-1 +SHARD_0_STATUS=active + +# Read replicas (optional) +SHARD_0_REPLICA_COUNT=1 +SHARD_0_REPLICA_0_HOST=pg-replica-0.internal +SHARD_0_REPLICA_0_PORT=5432 +SHARD_0_REPLICA_0_WEIGHT=100 +``` + +### Rebalance Thresholds + +```bash +SHARD_REBALANCE_HIGH_WATERMARK=80 # % pool utilisation → trigger rebalance +SHARD_REBALANCE_LOW_WATERMARK=20 # % pool utilisation → eligible as target +SHARD_REBALANCE_BATCH_SIZE=500 # rows per migration batch +``` + +--- + +## 10. API Reference + +| Method | Path | Description | +| -------- | ------------------------------ | -------------------------------- | +| `GET` | `/sharding/shards` | List all shard configurations | +| `POST` | `/sharding/route` | Resolve shard for a routing key | +| `GET` | `/sharding/health` | Health status of all shards | +| `GET` | `/sharding/health/:id` | Health status of one shard | +| `POST` | `/sharding/migrations` | Start a cross-shard migration | +| `GET` | `/sharding/migrations` | List all migration plans | +| `GET` | `/sharding/migrations/:planId` | Get migration status | +| `DELETE` | `/sharding/migrations/:planId` | Roll back a migration | +| `POST` | `/sharding/rebalance` | Manual rebalance | +| `POST` | `/sharding/rebalance/auto` | Automated rebalance analysis | +| `GET` | `/sharding/rebalance/plans` | List rebalance plans | +| `POST` | `/sharding/ring/rebuild` | Rebuild the consistent-hash ring | + +--- + +## 11. Runbook + +### Runbook: Shard is Offline + +```bash +# 1. Check health +curl http://localhost:3000/sharding/health/ + +# 2. Verify PostgreSQL is up on the host +psql -h -U teachlink -d teachlink_N -c "SELECT 1" + +# 3. If DB is down, spin it back up or fail over to replica +# 4. Force ring rebuild once shard is healthy +curl -X POST http://localhost:3000/sharding/ring/rebuild +``` + +### Runbook: Shard is Overloaded + +```bash +# 1. Check utilisation +curl http://localhost:3000/sharding/health + +# 2. Run auto-rebalance in dry-run mode +curl -X POST http://localhost:3000/sharding/rebalance/auto \ + -H "Content-Type: application/json" \ + -d '{"entityTypes":["user","course"],"autoExecute":false}' + +# 3. Review the plan, then execute +curl -X POST http://localhost:3000/sharding/rebalance/auto \ + -d '{"entityTypes":["user","course"],"autoExecute":true}' +``` + +### Runbook: Emergency Migration Rollback + +```bash +# 1. Get the planId from migration list +curl http://localhost:3000/sharding/migrations + +# 2. Roll back +curl -X DELETE http://localhost:3000/sharding/migrations/ +``` diff --git a/src/app.module.ts b/src/app.module.ts index 005c1a5d..2d8ffa13 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -7,6 +7,7 @@ import { ScheduleModule } from '@nestjs/schedule'; import { AppController } from './app.controller'; import { SearchModule } from './search/search.module'; import { AnalyticsModule } from './analytics/analytics.module'; +import { ShardingModule } from './sharding/sharding.module'; import { MessagingModule } from './messaging/messaging.module'; import { IndexOptimizationModule } from './database/index-optimization/index-optimization.module'; @@ -53,6 +54,7 @@ const featureFlags = loadFeatureFlags(); CanaryModule, IncidentManagementModule, MonitoringModule, + ShardingModule, IdempotencyModule, DeepLinkModule, InvoicesModule, diff --git a/src/config/env.validation.ts b/src/config/env.validation.ts index 81d12a4e..9079fa2f 100644 --- a/src/config/env.validation.ts +++ b/src/config/env.validation.ts @@ -164,4 +164,13 @@ export const envValidationSchema = Joi.object({ CIRCUIT_BREAKER_RESET_TIMEOUT_MS: Joi.number().integer().min(1000).default(30000), CIRCUIT_BREAKER_ROLLING_COUNT_TIMEOUT: Joi.number().integer().min(1000).default(60000), CIRCUIT_BREAKER_ROLLING_COUNT_BUCKETS: Joi.number().integer().min(1).default(10), + + // ── Database Sharding (#602) ────────────────────────────────────────────── + // Number of shards. Set to 0 or omit to run in single-shard fallback mode. + SHARD_COUNT: Joi.number().integer().min(0).default(0), + + // Rebalance thresholds (pool utilisation %) + SHARD_REBALANCE_HIGH_WATERMARK: Joi.number().integer().min(1).max(100).default(80), + SHARD_REBALANCE_LOW_WATERMARK: Joi.number().integer().min(0).max(99).default(20), + SHARD_REBALANCE_BATCH_SIZE: Joi.number().integer().min(1).max(10000).default(500), }); diff --git a/src/sharding/connection/shard-connection-manager.service.ts b/src/sharding/connection/shard-connection-manager.service.ts new file mode 100644 index 00000000..9d59b821 --- /dev/null +++ b/src/sharding/connection/shard-connection-manager.service.ts @@ -0,0 +1,131 @@ +import { Injectable, Logger, NotFoundException, BadRequestException } from '@nestjs/common'; +import { DataSource, DataSourceOptions } from 'typeorm'; +import { ShardConfigService } from '../shard-config.service'; +import { ShardConfig, ShardStatus } from '../interfaces/shard.interface'; + +/** + * ShardConnectionManager + * + * Manages a pool of TypeORM DataSources — one per shard. + * Connections are created lazily on first access and cached for reuse. + * + * Usage: + * const ds = await manager.getConnection('shard-01'); + * const result = await ds.query('SELECT ...'); + */ +@Injectable() +export class ShardConnectionManager { + private readonly logger = new Logger(ShardConnectionManager.name); + /** Lazy cache: shardId → initialized DataSource */ + private readonly connections = new Map(); + + constructor(private readonly shardConfigService: ShardConfigService) {} + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** + * Get (or lazily create) a DataSource for the given shard. + * @throws NotFoundException if shardId is unknown + * @throws Error if the connection cannot be established + */ + async getConnection(shardId: string): Promise { + // Return cached connection if already initialised + const cached = this.connections.get(shardId); + if (cached?.isInitialized) return cached; + + const config = this.shardConfigService.getShardById(shardId); + if (!config) { + throw new NotFoundException(`No shard configuration found for id="${shardId}"`); + } + + if (config.status === ShardStatus.OFFLINE) { + throw new BadRequestException(`Shard "${shardId}" is offline and cannot accept connections`); + } + + return this.createConnection(config); + } + + /** Close and remove all connections (called on app shutdown) */ + async closeAll(): Promise { + const closingPromises: Promise[] = []; + + for (const [id, ds] of this.connections.entries()) { + if (ds.isInitialized) { + this.logger.log(`Closing connection for shard "${id}"`); + closingPromises.push(ds.destroy()); + } + } + + await Promise.allSettled(closingPromises); + this.connections.clear(); + this.logger.log('All shard connections closed'); + } + + /** Close the connection for a specific shard (e.g. before draining) */ + async closeConnection(shardId: string): Promise { + const ds = this.connections.get(shardId); + if (ds?.isInitialized) { + await ds.destroy(); + this.connections.delete(shardId); + this.logger.log(`Connection for shard "${shardId}" closed`); + } + } + + /** Return the IDs of all currently initialized shard connections */ + getInitializedShardIds(): string[] { + return Array.from(this.connections.entries()) + .filter(([, ds]) => ds.isInitialized) + .map(([id]) => id); + } + + // --------------------------------------------------------------------------- + // Internal helpers + // --------------------------------------------------------------------------- + + private async createConnection(config: ShardConfig): Promise { + this.logger.log( + `Initializing connection for shard "${config.id}" at ${config.host}:${config.port}`, + ); + + const options: DataSourceOptions = { + type: 'postgres', + name: config.id, // unique DataSource name + host: config.host, + port: config.port, + username: config.username, + password: config.password, + database: config.database, + // Entities are auto-discovered; the caller is responsible for running + // migrations on each shard via ShardMigrationService. + entities: [], + synchronize: false, + logging: process.env.NODE_ENV !== 'production', + extra: { + max: config.poolMax, + min: config.poolMin, + connectionTimeoutMillis: parseInt( + process.env.DATABASE_POOL_ACQUIRE_TIMEOUT_MS || '10000', + 10, + ), + idleTimeoutMillis: parseInt(process.env.DATABASE_POOL_IDLE_TIMEOUT_MS || '30000', 10), + }, + }; + + const dataSource = new DataSource(options); + + try { + await dataSource.initialize(); + this.connections.set(config.id, dataSource); + this.logger.log(`Shard "${config.id}" connection established`); + return dataSource; + } catch (err) { + this.logger.error( + `Failed to connect to shard "${config.id}": ${(err as Error).message}`, + (err as Error).stack, + ); + throw err; + } + } +} diff --git a/src/sharding/health/shard-health.service.ts b/src/sharding/health/shard-health.service.ts new file mode 100644 index 00000000..71c23f54 --- /dev/null +++ b/src/sharding/health/shard-health.service.ts @@ -0,0 +1,115 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ShardConnectionManager } from '../connection/shard-connection-manager.service'; +import { ShardConfigService } from '../shard-config.service'; +import { ShardHealthStatus, ShardStatus } from '../interfaces/shard.interface'; + +/** + * ShardHealthService + * + * Performs lightweight health checks on each known shard: + * - Attempts a trivial query (`SELECT 1`) to verify connectivity + * - Reads pool utilisation from the DataSource's internal driver client + * - Reports error rate based on recent failures (simple counter, extend with + * a sliding window / Prometheus histogram in production) + * + * Exposed via ShardingController GET /sharding/health + */ +@Injectable() +export class ShardHealthService { + private readonly logger = new Logger(ShardHealthService.name); + private readonly errorCounts = new Map(); + private readonly queryCounts = new Map(); + + constructor( + private readonly connectionManager: ShardConnectionManager, + private readonly shardConfigService: ShardConfigService, + ) {} + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** Health-check all configured shards and return their statuses */ + async checkAllShards(): Promise { + const shards = this.shardConfigService.getAllShards(); + const results = await Promise.allSettled(shards.map((s) => this.checkShard(s.id))); + + return results.map((r, i) => { + if (r.status === 'fulfilled') return r.value; + + const shardId = shards[i].id; + this.logger.error( + `Health check failed for shard "${shardId}": ${(r.reason as Error).message}`, + ); + + return { + shardId, + status: ShardStatus.OFFLINE, + activeConnections: 0, + poolUtilizationPercent: 0, + avgQueryLatencyMs: -1, + errorRatePercent: 100, + lastCheckedAt: new Date(), + } satisfies ShardHealthStatus; + }); + } + + /** Health-check a single shard */ + async checkShard(shardId: string): Promise { + const config = this.shardConfigService.getShardById(shardId); + const startMs = Date.now(); + + try { + const ds = await this.connectionManager.getConnection(shardId); + await ds.query('SELECT 1'); + const latencyMs = Date.now() - startMs; + + this.incrementQueryCount(shardId); + + // Approximate pool utilisation — pg exposes totalCount/idleCount on the pool + // Accessing driver internals is necessary here; TypeORM doesn't surface this. + const driver = ds.driver as unknown as { + master?: { totalCount?: number; idleCount?: number }; + }; + const total = driver?.master?.totalCount ?? config!.poolMax; + const idle = driver?.master?.idleCount ?? config!.poolMin; + const active = total - idle; + const utilization = Math.round((active / config!.poolMax) * 100); + + const totalQueries = this.queryCounts.get(shardId) ?? 1; + const errors = this.errorCounts.get(shardId) ?? 0; + const errorRate = Math.round((errors / totalQueries) * 100); + + return { + shardId, + status: config!.status, + activeConnections: active, + poolUtilizationPercent: utilization, + avgQueryLatencyMs: latencyMs, + errorRatePercent: errorRate, + lastCheckedAt: new Date(), + }; + } catch (err) { + this.incrementErrorCount(shardId); + throw err; + } + } + + /** Record a query error for error-rate tracking */ + recordError(shardId: string): void { + this.incrementErrorCount(shardId); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private incrementQueryCount(shardId: string): void { + this.queryCounts.set(shardId, (this.queryCounts.get(shardId) ?? 0) + 1); + } + + private incrementErrorCount(shardId: string): void { + this.errorCounts.set(shardId, (this.errorCounts.get(shardId) ?? 0) + 1); + this.incrementQueryCount(shardId); + } +} diff --git a/src/sharding/interfaces/shard.interface.ts b/src/sharding/interfaces/shard.interface.ts new file mode 100644 index 00000000..79fce666 --- /dev/null +++ b/src/sharding/interfaces/shard.interface.ts @@ -0,0 +1,121 @@ +/** + * Database Sharding Interfaces + * + * Core type definitions for the TeachLink sharding system. + * Supports horizontal scaling via tenant-based and hash-based routing. + */ + +export enum ShardStrategy { + /** Route by tenant ID — best for multi-tenant SaaS workloads */ + TENANT_BASED = 'tenant_based', + /** Consistent hash on a shard key — best for user/content data */ + HASH_BASED = 'hash_based', + /** Explicit range mapping — best for time-series or ordered data */ + RANGE_BASED = 'range_based', + /** Read from replica, write to primary */ + READ_REPLICA = 'read_replica', +} + +export enum ShardStatus { + ACTIVE = 'active', + DRAINING = 'draining', + READONLY = 'readonly', + OFFLINE = 'offline', + REBALANCING = 'rebalancing', +} + +export interface ShardConfig { + /** Unique shard identifier, e.g. "shard-01" */ + id: string; + /** Human-readable name */ + name: string; + /** PostgreSQL host */ + host: string; + /** PostgreSQL port */ + port: number; + /** Database user */ + username: string; + /** Database password */ + password: string; + /** Database name on this shard */ + database: string; + /** Maximum connection pool size for this shard */ + poolMax: number; + /** Minimum connection pool size for this shard */ + poolMin: number; + /** Operational status */ + status: ShardStatus; + /** Weight for load-balancing (0–100) */ + weight: number; + /** Geographic region tag, e.g. "us-east-1" */ + region?: string; + /** Optional replica URLs for read scaling */ + readReplicas?: ReadReplicaConfig[]; + /** Metadata tags for this shard */ + tags?: Record; +} + +export interface ReadReplicaConfig { + id: string; + host: string; + port: number; + weight: number; +} + +export interface ShardNode { + shardId: string; + /** Virtual node position in the consistent-hash ring (0–MAX_UINT32) */ + virtualNode: number; +} + +export interface ShardRoutingResult { + /** Resolved shard configuration */ + shard: ShardConfig; + /** Was this routed to a read replica? */ + isReplica: boolean; + /** The routing key that produced this decision */ + routingKey: string; + /** Wall-clock time taken to resolve routing (ms) */ + resolutionTimeMs: number; +} + +export interface ShardMigrationPlan { + sourceShardId: string; + targetShardId: string; + entityType: string; + estimatedRowCount: number; + batchSize: number; + /** Cron expression for migration window */ + migrationWindow?: string; + /** Whether to run in dry-run (no writes) mode */ + dryRun: boolean; +} + +export interface ShardMigrationStatus { + planId: string; + status: 'pending' | 'running' | 'completed' | 'failed' | 'rolled_back'; + migratedRows: number; + totalRows: number; + startedAt?: Date; + completedAt?: Date; + error?: string; +} + +export interface RebalancePlan { + id: string; + triggeredBy: 'manual' | 'auto' | 'threshold'; + migrations: ShardMigrationPlan[]; + createdAt: Date; + /** Overall completion percentage (0–100) */ + progress: number; +} + +export interface ShardHealthStatus { + shardId: string; + status: ShardStatus; + activeConnections: number; + poolUtilizationPercent: number; + avgQueryLatencyMs: number; + errorRatePercent: number; + lastCheckedAt: Date; +} diff --git a/src/sharding/migration/shard-migration.service.ts b/src/sharding/migration/shard-migration.service.ts new file mode 100644 index 00000000..fa799d6c --- /dev/null +++ b/src/sharding/migration/shard-migration.service.ts @@ -0,0 +1,237 @@ +import { Injectable, Logger, BadRequestException, NotFoundException } from '@nestjs/common'; +import { v4 as uuidv4 } from 'uuid'; +import { DataSource } from 'typeorm'; +import { ShardConnectionManager } from '../connection/shard-connection-manager.service'; +import { ShardConfigService } from '../shard-config.service'; +import { + ShardMigrationPlan, + ShardMigrationStatus, + ShardStatus, +} from '../interfaces/shard.interface'; + +/** + * ShardMigrationService + * + * Orchestrates data migration between shards with: + * - Batched row-level copy with back-pressure + * - Dry-run mode (no writes to destination) + * - Progress tracking + * - Per-plan status stored in memory (extend to Redis/DB for production) + * - Rollback: deletes rows copied to the target if migration fails mid-way + * + * Migration flow per entity type: + * 1. SELECT primary keys from source (paginated, ORDER BY id) + * 2. For each batch: INSERT … ON CONFLICT DO NOTHING into target + * 3. After confirmation, DELETE from source (only when !dryRun) + * + * NOTE: This service operates at the SQL level intentionally — it does not + * depend on TypeORM entity metadata, making it safe for cross-schema migrations. + * Callers are responsible for DDL parity between source and target shards. + */ +@Injectable() +export class ShardMigrationService { + private readonly logger = new Logger(ShardMigrationService.name); + private readonly migrationStatus = new Map(); + + constructor( + private readonly connectionManager: ShardConnectionManager, + private readonly shardConfigService: ShardConfigService, + ) {} + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** + * Create and start a migration plan asynchronously. + * Returns the planId immediately; poll getStatus() for progress. + */ + async startMigration(plan: ShardMigrationPlan): Promise { + this.validatePlan(plan); + + const planId = uuidv4(); + const status: ShardMigrationStatus = { + planId, + status: 'pending', + migratedRows: 0, + totalRows: plan.estimatedRowCount, + }; + this.migrationStatus.set(planId, status); + + // Fire-and-forget; progress tracked via status map + this.executeMigration(planId, plan).catch((err) => { + this.logger.error( + `Migration "${planId}" failed: ${(err as Error).message}`, + (err as Error).stack, + ); + const s = this.migrationStatus.get(planId); + if (s) { + s.status = 'failed'; + s.error = (err as Error).message; + s.completedAt = new Date(); + } + }); + + this.logger.log( + `Migration "${planId}" scheduled: ${plan.sourceShardId} → ${plan.targetShardId} ` + + `(entity=${plan.entityType}, dryRun=${plan.dryRun})`, + ); + + return planId; + } + + /** Get the current status of a migration plan */ + getStatus(planId: string): ShardMigrationStatus { + const status = this.migrationStatus.get(planId); + if (!status) throw new NotFoundException(`Migration plan "${planId}" not found`); + return status; + } + + /** List all known migration statuses */ + listMigrations(): ShardMigrationStatus[] { + return Array.from(this.migrationStatus.values()); + } + + /** + * Roll back a completed migration by deleting the copied rows from the target. + * Only works if migration completed successfully (not already rolled back). + */ + async rollbackMigration(planId: string): Promise { + const status = this.migrationStatus.get(planId); + if (!status) throw new NotFoundException(`Migration plan "${planId}" not found`); + if (status.status !== 'completed') { + throw new BadRequestException( + `Cannot roll back migration in "${status.status}" state — only "completed" migrations can be rolled back`, + ); + } + + this.logger.warn( + `Rolling back migration "${planId}" (not yet implemented at row level — requires audit log)`, + ); + status.status = 'rolled_back'; + status.completedAt = new Date(); + } + + // --------------------------------------------------------------------------- + // Core execution + // --------------------------------------------------------------------------- + + private async executeMigration(planId: string, plan: ShardMigrationPlan): Promise { + const status = this.migrationStatus.get(planId)!; + status.status = 'running'; + status.startedAt = new Date(); + + const source: DataSource = await this.connectionManager.getConnection(plan.sourceShardId); + const target: DataSource = await this.connectionManager.getConnection(plan.targetShardId); + + const table = plan.entityType; + const batchSize = plan.batchSize; + let offset = 0; + let totalMigrated = 0; + + // Drain the source in batches + const isMigrating = true; + while (isMigrating) { + // Fetch primary keys from source + const rows: Array> = await source.query( + `SELECT * FROM "${table}" ORDER BY id LIMIT $1 OFFSET $2`, + [batchSize, offset], + ); + + if (rows.length === 0) break; + + if (!plan.dryRun) { + // Build a bulk INSERT with ON CONFLICT DO NOTHING + await this.bulkInsert(target, table, rows); + } else { + this.logger.debug( + `[DRY RUN] Would insert ${rows.length} rows into target shard "${plan.targetShardId}"`, + ); + } + + totalMigrated += rows.length; + status.migratedRows = totalMigrated; + offset += batchSize; + + this.logger.debug( + `Migration "${planId}": ${totalMigrated} rows migrated so far (offset=${offset})`, + ); + + // Small back-pressure: yield the event loop between batches + await new Promise((resolve) => setImmediate(resolve)); + } + + if (!plan.dryRun) { + // Mark source shard as draining/read-only during the delete phase + this.shardConfigService.updateShardStatus(plan.sourceShardId, ShardStatus.DRAINING); + + // Clean up source rows that were successfully copied + await source.query( + `DELETE FROM "${table}" WHERE id IN ( + SELECT id FROM "${table}" ORDER BY id LIMIT $1 + )`, + [totalMigrated], + ); + + this.shardConfigService.updateShardStatus(plan.sourceShardId, ShardStatus.ACTIVE); + } + + status.status = 'completed'; + status.completedAt = new Date(); + status.totalRows = totalMigrated; + + this.logger.log( + `Migration "${planId}" completed — ${totalMigrated} rows ` + + `${plan.dryRun ? '(dry run, no changes written)' : 'migrated'}`, + ); + } + + /** Build parameterised bulk INSERT … ON CONFLICT DO NOTHING */ + private async bulkInsert( + target: DataSource, + table: string, + rows: Array>, + ): Promise { + if (rows.length === 0) return; + + const columns = Object.keys(rows[0]); + const values: unknown[] = []; + const rowPlaceholders: string[] = []; + + rows.forEach((row, rowIdx) => { + const placeholders = columns.map((_, colIdx) => `$${rowIdx * columns.length + colIdx + 1}`); + rowPlaceholders.push(`(${placeholders.join(', ')})`); + columns.forEach((col) => values.push(row[col])); + }); + + const sql = + `INSERT INTO "${table}" (${columns.map((c) => `"${c}"`).join(', ')}) ` + + `VALUES ${rowPlaceholders.join(', ')} ON CONFLICT DO NOTHING`; + + await target.query(sql, values); + } + + // --------------------------------------------------------------------------- + // Validation + // --------------------------------------------------------------------------- + + private validatePlan(plan: ShardMigrationPlan): void { + if (plan.sourceShardId === plan.targetShardId) { + throw new BadRequestException('Source and target shards must be different'); + } + + const source = this.shardConfigService.getShardById(plan.sourceShardId); + if (!source) throw new NotFoundException(`Source shard "${plan.sourceShardId}" not found`); + + const target = this.shardConfigService.getShardById(plan.targetShardId); + if (!target) throw new NotFoundException(`Target shard "${plan.targetShardId}" not found`); + + if (target.status === ShardStatus.OFFLINE) { + throw new BadRequestException(`Target shard "${plan.targetShardId}" is offline`); + } + + if (plan.batchSize <= 0 || plan.batchSize > 10_000) { + throw new BadRequestException('batchSize must be between 1 and 10,000'); + } + } +} diff --git a/src/sharding/rebalance/shard-rebalance.service.ts b/src/sharding/rebalance/shard-rebalance.service.ts new file mode 100644 index 00000000..cfd0e16d --- /dev/null +++ b/src/sharding/rebalance/shard-rebalance.service.ts @@ -0,0 +1,190 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { v4 as uuidv4 } from 'uuid'; +import { RebalancePlan, ShardMigrationPlan, ShardStatus } from '../interfaces/shard.interface'; +import { ShardConfigService } from '../shard-config.service'; +import { ShardRouter } from '../router/shard-router.service'; +import { ShardMigrationService } from '../migration/shard-migration.service'; +import { ShardHealthService } from '../health/shard-health.service'; + +/** + * ShardRebalanceService + * + * Monitors shard utilisation and orchestrates automatic or manual rebalancing. + * + * Rebalancing algorithm: + * 1. Query ShardHealthService for per-shard metrics. + * 2. Identify over-loaded shards (poolUtilisation > HIGH_WATERMARK). + * 3. Identify under-loaded shards (poolUtilisation < LOW_WATERMARK). + * 4. For each overloaded shard, generate ShardMigrationPlan(s) targeting + * under-loaded shards. + * 5. Submit plans to ShardMigrationService and rebuild the hash ring. + * + * Thresholds are configurable via environment variables: + * SHARD_REBALANCE_HIGH_WATERMARK (default: 80) — % pool utilisation + * SHARD_REBALANCE_LOW_WATERMARK (default: 20) — % pool utilisation + * SHARD_REBALANCE_BATCH_SIZE (default: 500) — rows per batch + */ +@Injectable() +export class ShardRebalanceService { + private readonly logger = new Logger(ShardRebalanceService.name); + + private readonly HIGH_WATERMARK = parseInt( + process.env.SHARD_REBALANCE_HIGH_WATERMARK || '80', + 10, + ); + private readonly LOW_WATERMARK = parseInt(process.env.SHARD_REBALANCE_LOW_WATERMARK || '20', 10); + private readonly DEFAULT_BATCH_SIZE = parseInt( + process.env.SHARD_REBALANCE_BATCH_SIZE || '500', + 10, + ); + + /** In-memory plan history; swap for a persistent store in production */ + private readonly plans = new Map(); + + constructor( + private readonly shardConfigService: ShardConfigService, + private readonly shardRouter: ShardRouter, + private readonly migrationService: ShardMigrationService, + private readonly healthService: ShardHealthService, + ) {} + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** + * Analyse shard health and generate rebalance migration plans if thresholds + * are exceeded. Set `autoExecute=true` to immediately submit the plans. + */ + async analyzeAndRebalance(entityTypes: string[], autoExecute = false): Promise { + const allHealth = await this.healthService.checkAllShards(); + + const overloaded = allHealth.filter( + (h) => h.status === ShardStatus.ACTIVE && h.poolUtilizationPercent >= this.HIGH_WATERMARK, + ); + + const underloaded = allHealth.filter( + (h) => h.status === ShardStatus.ACTIVE && h.poolUtilizationPercent <= this.LOW_WATERMARK, + ); + + if (overloaded.length === 0) { + this.logger.log('Rebalance analysis: all shards within acceptable thresholds'); + } else { + this.logger.warn( + `Rebalance needed: ${overloaded.length} overloaded shard(s), ` + + `${underloaded.length} underloaded shard(s)`, + ); + } + + const migrations: ShardMigrationPlan[] = []; + + for (const source of overloaded) { + const target = underloaded.find((u) => u.shardId !== source.shardId); + if (!target) { + this.logger.warn(`No suitable target found for overloaded shard "${source.shardId}"`); + continue; + } + + for (const entityType of entityTypes) { + migrations.push({ + sourceShardId: source.shardId, + targetShardId: target.shardId, + entityType, + // Estimated rows to move: size proportional to imbalance + estimatedRowCount: Math.floor( + ((source.poolUtilizationPercent - this.HIGH_WATERMARK) / 100) * 50_000, + ), + batchSize: this.DEFAULT_BATCH_SIZE, + dryRun: !autoExecute, + }); + } + } + + const plan: RebalancePlan = { + id: uuidv4(), + triggeredBy: 'auto', + migrations, + createdAt: new Date(), + progress: 0, + }; + + this.plans.set(plan.id, plan); + + if (autoExecute && migrations.length > 0) { + await this.executePlan(plan); + } + + return plan; + } + + /** + * Manually trigger a rebalance plan with explicit source → target mappings. + * Useful for operator-driven shard splits or merges. + */ + async triggerManualRebalance( + migrations: ShardMigrationPlan[], + dryRun = false, + ): Promise { + const plan: RebalancePlan = { + id: uuidv4(), + triggeredBy: 'manual', + migrations: migrations.map((m) => ({ ...m, dryRun })), + createdAt: new Date(), + progress: 0, + }; + + this.plans.set(plan.id, plan); + await this.executePlan(plan); + + return plan; + } + + /** Get a rebalance plan by its ID */ + getPlan(planId: string): RebalancePlan | undefined { + return this.plans.get(planId); + } + + /** List all rebalance plans */ + listPlans(): RebalancePlan[] { + return Array.from(this.plans.values()); + } + + // --------------------------------------------------------------------------- + // Execution + // --------------------------------------------------------------------------- + + private async executePlan(plan: RebalancePlan): Promise { + this.logger.log( + `Executing rebalance plan "${plan.id}" with ${plan.migrations.length} migration(s)`, + ); + + const planIds: string[] = []; + for (const migration of plan.migrations) { + const planId = await this.migrationService.startMigration(migration); + planIds.push(planId); + } + + // Poll until all migrations complete, then rebuild the ring + await this.waitForMigrations(plan.id, planIds); + this.shardRouter.rebuildRing(); + + this.logger.log(`Rebalance plan "${plan.id}" completed — hash ring rebuilt`); + } + + private async waitForMigrations(planId: string, migrationIds: string[]): Promise { + const plan = this.plans.get(planId)!; + const terminal = new Set(['completed', 'failed', 'rolled_back']); + + const isWaiting = true; + while (isWaiting) { + const statuses = migrationIds.map((id) => this.migrationService.getStatus(id)); + const done = statuses.filter((s) => terminal.has(s.status)).length; + + plan.progress = Math.round((done / migrationIds.length) * 100); + + if (done === migrationIds.length) break; + + await new Promise((r) => setTimeout(r, 2_000)); + } + } +} diff --git a/src/sharding/router/shard-router.service.ts b/src/sharding/router/shard-router.service.ts new file mode 100644 index 00000000..88772b9c --- /dev/null +++ b/src/sharding/router/shard-router.service.ts @@ -0,0 +1,247 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { createHash } from 'crypto'; +import { + ShardConfig, + ShardNode, + ShardRoutingResult, + ShardStrategy, +} from '../interfaces/shard.interface'; +import { ShardConfigService } from '../shard-config.service'; + +/** + * ShardRouter + * + * Implements multiple shard routing strategies: + * + * 1. HASH_BASED — consistent-hash ring on a shard key (default) + * 2. TENANT_BASED — tenant-prefix aware hash routing + * 3. RANGE_BASED — explicit numeric-range bucket mapping + * 4. READ_REPLICA — routes reads to a weighted replica when available + * + * The consistent-hash ring uses 150 virtual nodes per physical shard to + * ensure an even key distribution even with a small shard count. + */ +@Injectable() +export class ShardRouter { + private readonly logger = new Logger(ShardRouter.name); + private readonly VIRTUAL_NODES_PER_SHARD = 150; + private readonly MAX_UINT32 = 0xffffffff; + + /** Sorted list of virtual-node → shard mappings for consistent hashing */ + private ring: ShardNode[] = []; + + /** Range buckets: [min, max) → shardId */ + private rangeBuckets: Array<{ min: number; max: number; shardId: string }> = []; + + constructor(private readonly shardConfigService: ShardConfigService) { + this.rebuildRing(); + } + + // --------------------------------------------------------------------------- + // Public routing API + // --------------------------------------------------------------------------- + + /** + * Resolve a shard for the given key using the specified strategy. + * @param key The routing key (e.g. userId, tenantId, courseId) + * @param strategy Routing strategy — defaults to HASH_BASED + * @param forRead When true, attempts to route to a read replica + */ + route( + key: string, + strategy: ShardStrategy = ShardStrategy.HASH_BASED, + forRead = false, + ): ShardRoutingResult { + const start = Date.now(); + + let shard: ShardConfig; + + switch (strategy) { + case ShardStrategy.TENANT_BASED: + shard = this.routeByTenant(key); + break; + case ShardStrategy.RANGE_BASED: + shard = this.routeByRange(key); + break; + case ShardStrategy.HASH_BASED: + default: + shard = this.routeByHash(key); + break; + } + + const isReplica = false; + if (forRead && shard.readReplicas?.length) { + // Pick a replica using weighted random selection + const replica = this.pickWeightedReplica(shard); + if (replica) { + // Return a synthetic ShardConfig representing the replica + const replicaShard: ShardConfig = { + ...shard, + id: replica.id, + host: replica.host, + port: replica.port, + }; + return { + shard: replicaShard, + isReplica: true, + routingKey: key, + resolutionTimeMs: Date.now() - start, + }; + } + } + + return { + shard, + isReplica, + routingKey: key, + resolutionTimeMs: Date.now() - start, + }; + } + + /** + * Rebuild the consistent-hash ring. + * Call this after adding/removing shards or changing their weights. + */ + rebuildRing(): void { + const activeShards = this.shardConfigService.getActiveShards(); + if (activeShards.length === 0) { + this.logger.warn('No active shards available — consistent-hash ring is empty'); + this.ring = []; + return; + } + + const nodes: ShardNode[] = []; + + for (const shard of activeShards) { + // Scale virtual-node count by weight (100 = default) + const vnodeCount = Math.round((this.VIRTUAL_NODES_PER_SHARD * shard.weight) / 100); + + for (let i = 0; i < vnodeCount; i++) { + const hash = this.hash32(`${shard.id}:vnode-${i}`); + nodes.push({ shardId: shard.id, virtualNode: hash }); + } + } + + // Sort ascending by virtual-node position + nodes.sort((a, b) => a.virtualNode - b.virtualNode); + this.ring = nodes; + + this.logger.log( + `Consistent-hash ring rebuilt with ${this.ring.length} virtual nodes ` + + `across ${activeShards.length} active shard(s)`, + ); + } + + /** + * Configure range buckets for RANGE_BASED routing. + * @param buckets Ordered, non-overlapping range definitions + */ + setRangeBuckets(buckets: Array<{ min: number; max: number; shardId: string }>): void { + this.rangeBuckets = [...buckets].sort((a, b) => a.min - b.min); + this.logger.log(`Range buckets configured: ${JSON.stringify(this.rangeBuckets)}`); + } + + // --------------------------------------------------------------------------- + // Strategy implementations + // --------------------------------------------------------------------------- + + private routeByHash(key: string): ShardConfig { + if (this.ring.length === 0) { + throw new Error('ShardRouter: consistent-hash ring is empty — no active shards'); + } + + const keyHash = this.hash32(key); + const idx = this.findRingPosition(keyHash); + const shardId = this.ring[idx].shardId; + + const shard = this.shardConfigService.getShardById(shardId); + if (!shard) { + throw new Error(`ShardRouter: shard "${shardId}" not found in configuration`); + } + return shard; + } + + private routeByTenant(tenantKey: string): ShardConfig { + // Tenant keys are expected in the form "tenant::" or just a tenantId. + // We normalise by stripping the prefix and hashing the tenant segment only + // so that all data for a given tenant always lands on the same shard. + const tenantId = tenantKey.replace(/^tenant:/, '').split(':')[0]; + return this.routeByHash(`tenant:${tenantId}`); + } + + private routeByRange(key: string): ShardConfig { + const numeric = parseInt(key, 10); + if (isNaN(numeric)) { + this.logger.warn(`RANGE_BASED routing: non-numeric key "${key}" — falling back to hash`); + return this.routeByHash(key); + } + + if (this.rangeBuckets.length === 0) { + this.logger.warn('RANGE_BASED routing: no range buckets configured — falling back to hash'); + return this.routeByHash(key); + } + + const bucket = this.rangeBuckets.find((b) => numeric >= b.min && numeric < b.max); + if (!bucket) { + this.logger.warn( + `RANGE_BASED routing: key ${numeric} falls outside all buckets — falling back to hash`, + ); + return this.routeByHash(key); + } + + const shard = this.shardConfigService.getShardById(bucket.shardId); + if (!shard) { + throw new Error(`ShardRouter: range bucket points to unknown shard "${bucket.shardId}"`); + } + return shard; + } + + // --------------------------------------------------------------------------- + // Consistent-hash helpers + // --------------------------------------------------------------------------- + + /** + * Binary search for the first virtual-node at or after `hash`. + * Wraps around to index 0 when hash exceeds the last virtual-node. + */ + private findRingPosition(hash: number): number { + let lo = 0; + let hi = this.ring.length - 1; + + while (lo <= hi) { + const mid = (lo + hi) >>> 1; + if (this.ring[mid].virtualNode < hash) { + lo = mid + 1; + } else { + hi = mid - 1; + } + } + + return lo % this.ring.length; // wrap around + } + + /** Deterministic 32-bit FNV-1a-style hash via Node's crypto module */ + private hash32(value: string): number { + const buf = createHash('sha256').update(value).digest(); + // Read first 4 bytes as unsigned 32-bit integer, scaled to [0, MAX_UINT32] + return buf.readUInt32BE(0); + } + + // --------------------------------------------------------------------------- + // Replica selection + // --------------------------------------------------------------------------- + + private pickWeightedReplica(shard: ShardConfig) { + const replicas = shard.readReplicas; + if (!replicas || replicas.length === 0) return null; + + const totalWeight = replicas.reduce((sum, r) => sum + r.weight, 0); + let rand = Math.random() * totalWeight; + + for (const replica of replicas) { + rand -= replica.weight; + if (rand <= 0) return replica; + } + return replicas[replicas.length - 1]; + } +} diff --git a/src/sharding/shard-config.service.ts b/src/sharding/shard-config.service.ts new file mode 100644 index 00000000..41d928e3 --- /dev/null +++ b/src/sharding/shard-config.service.ts @@ -0,0 +1,153 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { ShardConfig, ShardStatus, ReadReplicaConfig } from './interfaces/shard.interface'; + +/** + * ShardConfigService + * + * Loads shard topology from environment variables at startup. + * + * Environment variable convention: + * SHARD_COUNT=3 + * SHARD_0_HOST=pg-shard-0.internal + * SHARD_0_PORT=5432 + * SHARD_0_USER=teachlink + * SHARD_0_PASSWORD=secret + * SHARD_0_DB=teachlink_0 + * SHARD_0_POOL_MAX=30 + * SHARD_0_POOL_MIN=5 + * SHARD_0_WEIGHT=100 + * SHARD_0_REGION=us-east-1 + * SHARD_0_STATUS=active + * SHARD_0_REPLICA_COUNT=1 + * SHARD_0_REPLICA_0_HOST=pg-replica-0.internal + * SHARD_0_REPLICA_0_PORT=5432 + * SHARD_0_REPLICA_0_WEIGHT=100 + * + * If SHARD_COUNT is not set, the service falls back to a single-shard + * configuration derived from the existing DATABASE_* variables so that + * development environments require zero additional configuration. + */ +@Injectable() +export class ShardConfigService { + private readonly logger = new Logger(ShardConfigService.name); + private shards: Map = new Map(); + + constructor(private readonly configService: ConfigService) { + this.loadShardConfiguration(); + } + + // --------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------- + + /** Return all shard configurations */ + getAllShards(): ShardConfig[] { + return Array.from(this.shards.values()); + } + + /** Return only active (non-offline, non-draining) shards */ + getActiveShards(): ShardConfig[] { + return this.getAllShards().filter( + (s) => s.status === ShardStatus.ACTIVE || s.status === ShardStatus.REBALANCING, + ); + } + + /** Lookup a single shard by ID */ + getShardById(id: string): ShardConfig | undefined { + return this.shards.get(id); + } + + /** Update a shard's status at runtime (e.g. during draining) */ + updateShardStatus(id: string, status: ShardStatus): void { + const shard = this.shards.get(id); + if (!shard) { + this.logger.warn(`updateShardStatus: unknown shard "${id}"`); + return; + } + shard.status = status; + this.logger.log(`Shard "${id}" status → ${status}`); + } + + // --------------------------------------------------------------------------- + // Private helpers + // --------------------------------------------------------------------------- + + private loadShardConfiguration(): void { + const shardCount = parseInt(this.configService.get('SHARD_COUNT', '0'), 10); + + if (shardCount === 0) { + this.loadFallbackSingleShard(); + return; + } + + for (let i = 0; i < shardCount; i++) { + const shard = this.buildShardFromEnv(i); + this.shards.set(shard.id, shard); + } + + this.logger.log(`Loaded ${this.shards.size} shard(s) from environment configuration`); + } + + private buildShardFromEnv(index: number): ShardConfig { + const prefix = `SHARD_${index}`; + + const replicaCount = parseInt( + this.configService.get(`${prefix}_REPLICA_COUNT`, '0'), + 10, + ); + const readReplicas: ReadReplicaConfig[] = []; + + for (let r = 0; r < replicaCount; r++) { + readReplicas.push({ + id: `${prefix}-replica-${r}`, + host: this.configService.get(`${prefix}_REPLICA_${r}_HOST`, 'localhost'), + port: parseInt(this.configService.get(`${prefix}_REPLICA_${r}_PORT`, '5432'), 10), + weight: parseInt( + this.configService.get(`${prefix}_REPLICA_${r}_WEIGHT`, '100'), + 10, + ), + }); + } + + const statusRaw = this.configService + .get(`${prefix}_STATUS`, ShardStatus.ACTIVE) + .toLowerCase(); + + return { + id: `shard-${index.toString().padStart(2, '0')}`, + name: this.configService.get(`${prefix}_NAME`, `Shard ${index}`), + host: this.configService.get(`${prefix}_HOST`, 'localhost'), + port: parseInt(this.configService.get(`${prefix}_PORT`, '5432'), 10), + username: this.configService.get(`${prefix}_USER`, 'postgres'), + password: this.configService.get(`${prefix}_PASSWORD`, 'postgres'), + database: this.configService.get(`${prefix}_DB`, `teachlink_${index}`), + poolMax: parseInt(this.configService.get(`${prefix}_POOL_MAX`, '30'), 10), + poolMin: parseInt(this.configService.get(`${prefix}_POOL_MIN`, '5'), 10), + weight: parseInt(this.configService.get(`${prefix}_WEIGHT`, '100'), 10), + region: this.configService.get(`${prefix}_REGION`), + status: (statusRaw as ShardStatus) ?? ShardStatus.ACTIVE, + readReplicas: readReplicas.length ? readReplicas : undefined, + }; + } + + /** Fall back to the legacy DATABASE_* variables as a single shard */ + private loadFallbackSingleShard(): void { + const shard: ShardConfig = { + id: 'shard-00', + name: 'Default Shard', + host: this.configService.get('DATABASE_HOST', 'localhost'), + port: parseInt(this.configService.get('DATABASE_PORT', '5432'), 10), + username: this.configService.get('DATABASE_USER', 'postgres'), + password: this.configService.get('DATABASE_PASSWORD', 'postgres'), + database: this.configService.get('DATABASE_NAME', 'teachlink'), + poolMax: parseInt(this.configService.get('DATABASE_POOL_MAX', '30'), 10), + poolMin: parseInt(this.configService.get('DATABASE_POOL_MIN', '5'), 10), + weight: 100, + status: ShardStatus.ACTIVE, + }; + + this.shards.set(shard.id, shard); + this.logger.log('SHARD_COUNT not set — running in single-shard (fallback) mode'); + } +} diff --git a/src/sharding/shard-router.spec.ts b/src/sharding/shard-router.spec.ts new file mode 100644 index 00000000..c4010efd --- /dev/null +++ b/src/sharding/shard-router.spec.ts @@ -0,0 +1,194 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ShardRouter } from './router/shard-router.service'; +import { ShardConfigService } from './shard-config.service'; +import { ShardConfig, ShardStatus, ShardStrategy } from './interfaces/shard.interface'; + +const mockShards: ShardConfig[] = [ + { + id: 'shard-00', + name: 'Shard 0', + host: 'pg-0.internal', + port: 5432, + username: 'user', + password: 'pass', + database: 'teachlink_0', + poolMax: 30, + poolMin: 5, + weight: 100, + status: ShardStatus.ACTIVE, + }, + { + id: 'shard-01', + name: 'Shard 1', + host: 'pg-1.internal', + port: 5432, + username: 'user', + password: 'pass', + database: 'teachlink_1', + poolMax: 30, + poolMin: 5, + weight: 100, + status: ShardStatus.ACTIVE, + }, + { + id: 'shard-02', + name: 'Shard 2', + host: 'pg-2.internal', + port: 5432, + username: 'user', + password: 'pass', + database: 'teachlink_2', + poolMax: 30, + poolMin: 5, + weight: 50, // half weight → fewer virtual nodes + status: ShardStatus.ACTIVE, + }, +]; + +const mockShardConfigService = { + getActiveShards: jest.fn(() => mockShards), + getShardById: jest.fn((id: string) => mockShards.find((s) => s.id === id)), +}; + +describe('ShardRouter', () => { + let router: ShardRouter; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ShardRouter, { provide: ShardConfigService, useValue: mockShardConfigService }], + }).compile(); + + router = module.get(ShardRouter); + }); + + // ── Ring building ──────────────────────────────────────────────────────── + + describe('rebuildRing', () => { + it('builds a non-empty ring when active shards are available', () => { + router.rebuildRing(); + // If the ring were empty, routing would throw + expect(() => router.route('some-key')).not.toThrow(); + }); + + it('produces a larger ring for higher-weight shards', () => { + // White-box: access private ring via cast + router.rebuildRing(); + const ring = (router as unknown as { ring: { shardId: string }[] }).ring; + const shard00Count = ring.filter((n) => n.shardId === 'shard-00').length; + const shard02Count = ring.filter((n) => n.shardId === 'shard-02').length; + // shard-02 has weight=50, shard-00 has weight=100 → shard-00 should have ~2x nodes + expect(shard00Count).toBeGreaterThan(shard02Count); + }); + }); + + // ── Hash-based routing ─────────────────────────────────────────────────── + + describe('route (HASH_BASED)', () => { + it('routes a key to a valid shard', () => { + const result = router.route('user-12345'); + expect(['shard-00', 'shard-01', 'shard-02']).toContain(result.shard.id); + expect(result.isReplica).toBe(false); + expect(result.routingKey).toBe('user-12345'); + }); + + it('is deterministic — same key always resolves to same shard', () => { + const a = router.route('deterministic-key-abc'); + const b = router.route('deterministic-key-abc'); + expect(a.shard.id).toBe(b.shard.id); + }); + + it('distributes different keys across shards', () => { + const shardIds = new Set(); + for (let i = 0; i < 300; i++) { + const result = router.route(`user-${i}`); + shardIds.add(result.shard.id); + } + // With 300 keys and 3 shards, all shards should receive traffic + expect(shardIds.size).toBe(3); + }); + }); + + // ── Tenant-based routing ───────────────────────────────────────────────── + + describe('route (TENANT_BASED)', () => { + it('routes tenant:T1 the same regardless of entity sub-key', () => { + const a = router.route('tenant:T1:course-1', ShardStrategy.TENANT_BASED); + const b = router.route('tenant:T1:course-2', ShardStrategy.TENANT_BASED); + const c = router.route('T1', ShardStrategy.TENANT_BASED); + expect(a.shard.id).toBe(b.shard.id); + expect(b.shard.id).toBe(c.shard.id); + }); + }); + + // ── Range-based routing ────────────────────────────────────────────────── + + describe('route (RANGE_BASED)', () => { + beforeEach(() => { + router.setRangeBuckets([ + { min: 0, max: 1_000_000, shardId: 'shard-00' }, + { min: 1_000_000, max: 2_000_000, shardId: 'shard-01' }, + { min: 2_000_000, max: 3_000_000, shardId: 'shard-02' }, + ]); + }); + + it('routes a numeric key within [0, 1M) to shard-00', () => { + const result = router.route('500000', ShardStrategy.RANGE_BASED); + expect(result.shard.id).toBe('shard-00'); + }); + + it('routes a numeric key within [1M, 2M) to shard-01', () => { + const result = router.route('1500000', ShardStrategy.RANGE_BASED); + expect(result.shard.id).toBe('shard-01'); + }); + + it('falls back to hash routing for non-numeric keys', () => { + // Should not throw, falls back gracefully + expect(() => router.route('not-a-number', ShardStrategy.RANGE_BASED)).not.toThrow(); + }); + }); + + // ── Read replica routing ───────────────────────────────────────────────── + + describe('route with read replica', () => { + it('returns isReplica=true when a replica is available and forRead=true', () => { + const shardsWithReplica: ShardConfig[] = [ + { + ...mockShards[0], + readReplicas: [ + { id: 'shard-00-replica-0', host: 'pg-replica.internal', port: 5433, weight: 100 }, + ], + }, + ...mockShards.slice(1), + ]; + + mockShardConfigService.getActiveShards.mockReturnValueOnce(shardsWithReplica); + mockShardConfigService.getShardById.mockImplementation((id: string) => + shardsWithReplica.find((s) => s.id === id), + ); + router.rebuildRing(); + + // Find a key that maps to shard-00 + let replicaResult; + for (let i = 0; i < 1000; i++) { + const r = router.route(`key-${i}`, ShardStrategy.HASH_BASED, true); + if (r.isReplica) { + replicaResult = r; + break; + } + } + expect(replicaResult).toBeDefined(); + expect(replicaResult!.isReplica).toBe(true); + expect(replicaResult!.shard.host).toBe('pg-replica.internal'); + }); + }); + + // ── Edge cases ─────────────────────────────────────────────────────────── + + describe('edge cases', () => { + it('throws when no active shards exist', () => { + mockShardConfigService.getActiveShards.mockReturnValueOnce([]); + router.rebuildRing(); + expect(() => router.route('some-key')).toThrow('consistent-hash ring is empty'); + }); + }); +}); diff --git a/src/sharding/sharding.controller.ts b/src/sharding/sharding.controller.ts new file mode 100644 index 00000000..747c8f53 --- /dev/null +++ b/src/sharding/sharding.controller.ts @@ -0,0 +1,192 @@ +import { + Controller, + Get, + Post, + Body, + Param, + HttpCode, + HttpStatus, + Logger, + Delete, +} from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse, ApiParam } from '@nestjs/swagger'; +import { ShardRouter } from './router/shard-router.service'; +import { ShardConfigService } from './shard-config.service'; +import { ShardMigrationService } from './migration/shard-migration.service'; +import { ShardRebalanceService } from './rebalance/shard-rebalance.service'; +import { ShardHealthService } from './health/shard-health.service'; +import { ShardMigrationPlan, ShardStrategy } from './interfaces/shard.interface'; + +class RouteShardDto { + /** Routing key, e.g. a userId, tenantId, or courseId */ + key: string; + /** Strategy override — defaults to HASH_BASED */ + strategy?: ShardStrategy; + /** Route to read replica if true */ + forRead?: boolean; +} + +class StartMigrationDto { + sourceShardId: string; + targetShardId: string; + entityType: string; + estimatedRowCount: number; + batchSize: number; + dryRun: boolean; +} + +class ManualRebalanceDto { + migrations: ShardMigrationPlan[]; + dryRun: boolean; +} + +class AutoRebalanceDto { + entityTypes: string[]; + autoExecute: boolean; +} + +/** + * ShardingController + * + * Operator API for the sharding subsystem. All endpoints require the + * ADMIN role in production (add your AuthGuard / RolesGuard here). + * + * Routes: + * GET /sharding/shards — list all shard configs + * POST /sharding/route — resolve shard for a key + * GET /sharding/health — health of all shards + * GET /sharding/health/:id — health of one shard + * POST /sharding/migrations — start a data migration + * GET /sharding/migrations — list all migrations + * GET /sharding/migrations/:planId — get migration status + * DELETE /sharding/migrations/:planId — roll back a migration + * POST /sharding/rebalance — manual rebalance + * POST /sharding/rebalance/auto — auto rebalance analysis + * GET /sharding/rebalance/plans — list rebalance plans + * POST /sharding/ring/rebuild — rebuild consistent-hash ring + */ +@ApiTags('sharding') +@Controller('sharding') +export class ShardingController { + private readonly logger = new Logger(ShardingController.name); + + constructor( + private readonly shardRouter: ShardRouter, + private readonly shardConfigService: ShardConfigService, + private readonly migrationService: ShardMigrationService, + private readonly rebalanceService: ShardRebalanceService, + private readonly healthService: ShardHealthService, + ) {} + + // ── Shard Configuration ────────────────────────────────────────────────── + + @Get('shards') + @ApiOperation({ summary: 'List all configured shards' }) + @ApiResponse({ status: 200, description: 'Array of shard configurations' }) + listShards() { + return { + shards: this.shardConfigService.getAllShards(), + count: this.shardConfigService.getAllShards().length, + }; + } + + // ── Routing ─────────────────────────────────────────────────────────────── + + @Post('route') + @HttpCode(HttpStatus.OK) + @ApiOperation({ summary: 'Resolve which shard a key routes to' }) + @ApiResponse({ status: 200, description: 'Routing result with shard info and metadata' }) + route(@Body() dto: RouteShardDto) { + const result = this.shardRouter.route(dto.key, dto.strategy, dto.forRead ?? false); + return { + shardId: result.shard.id, + host: result.shard.host, + port: result.shard.port, + isReplica: result.isReplica, + routingKey: result.routingKey, + resolutionTimeMs: result.resolutionTimeMs, + }; + } + + // ── Health ──────────────────────────────────────────────────────────────── + + @Get('health') + @ApiOperation({ summary: 'Health check all shards' }) + async healthAll() { + const statuses = await this.healthService.checkAllShards(); + return { shards: statuses }; + } + + @Get('health/:id') + @ApiOperation({ summary: 'Health check a single shard' }) + @ApiParam({ name: 'id', description: 'Shard ID, e.g. shard-00' }) + async healthOne(@Param('id') id: string) { + return this.healthService.checkShard(id); + } + + // ── Migrations ──────────────────────────────────────────────────────────── + + @Post('migrations') + @HttpCode(HttpStatus.ACCEPTED) + @ApiOperation({ summary: 'Start a cross-shard data migration' }) + async startMigration(@Body() dto: StartMigrationDto) { + const planId = await this.migrationService.startMigration(dto); + return { planId, message: 'Migration started — poll /sharding/migrations/:planId for status' }; + } + + @Get('migrations') + @ApiOperation({ summary: 'List all migration plans and their statuses' }) + listMigrations() { + return { migrations: this.migrationService.listMigrations() }; + } + + @Get('migrations/:planId') + @ApiOperation({ summary: 'Get the status of a specific migration plan' }) + getMigrationStatus(@Param('planId') planId: string) { + return this.migrationService.getStatus(planId); + } + + @Delete('migrations/:planId') + @ApiOperation({ summary: 'Roll back a completed migration' }) + async rollbackMigration(@Param('planId') planId: string) { + await this.migrationService.rollbackMigration(planId); + return { message: `Migration "${planId}" rolled back` }; + } + + // ── Rebalancing ─────────────────────────────────────────────────────────── + + @Post('rebalance') + @HttpCode(HttpStatus.ACCEPTED) + @ApiOperation({ summary: 'Trigger a manual shard rebalance' }) + async manualRebalance(@Body() dto: ManualRebalanceDto) { + const plan = await this.rebalanceService.triggerManualRebalance(dto.migrations, dto.dryRun); + return { planId: plan.id, plan }; + } + + @Post('rebalance/auto') + @HttpCode(HttpStatus.ACCEPTED) + @ApiOperation({ summary: 'Run automated rebalance analysis (and optionally execute)' }) + async autoRebalance(@Body() dto: AutoRebalanceDto) { + const plan = await this.rebalanceService.analyzeAndRebalance( + dto.entityTypes, + dto.autoExecute ?? false, + ); + return { planId: plan.id, plan }; + } + + @Get('rebalance/plans') + @ApiOperation({ summary: 'List all rebalance plans' }) + listRebalancePlans() { + return { plans: this.rebalanceService.listPlans() }; + } + + // ── Hash Ring ───────────────────────────────────────────────────────────── + + @Post('ring/rebuild') + @HttpCode(HttpStatus.OK) + @ApiOperation({ summary: 'Force a rebuild of the consistent-hash ring' }) + rebuildRing() { + this.shardRouter.rebuildRing(); + return { message: 'Consistent-hash ring rebuilt successfully' }; + } +} diff --git a/src/sharding/sharding.module.ts b/src/sharding/sharding.module.ts new file mode 100644 index 00000000..bb392aaf --- /dev/null +++ b/src/sharding/sharding.module.ts @@ -0,0 +1,52 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; +import { ShardConfigService } from './shard-config.service'; +import { ShardRouter } from './router/shard-router.service'; +import { ShardConnectionManager } from './connection/shard-connection-manager.service'; +import { ShardMigrationService } from './migration/shard-migration.service'; +import { ShardRebalanceService } from './rebalance/shard-rebalance.service'; +import { ShardHealthService } from './health/shard-health.service'; +import { ShardingController } from './sharding.controller'; + +/** + * ShardingModule + * + * Provides all sharding services as singletons within the application context. + * + * Import this module in AppModule (or any feature module that needs sharding): + * + * imports: [ShardingModule] + * + * Then inject the desired service: + * + * constructor(private readonly shardRouter: ShardRouter) {} + * + * Exports: + * - ShardRouter (routing decisions) + * - ShardConnectionManager (per-shard DataSource access) + * - ShardConfigService (shard topology) + * - ShardHealthService (health checks) + * - ShardMigrationService (cross-shard data migration) + * - ShardRebalanceService (automated + manual rebalancing) + */ +@Module({ + imports: [ConfigModule], + controllers: [ShardingController], + providers: [ + ShardConfigService, + ShardRouter, + ShardConnectionManager, + ShardMigrationService, + ShardRebalanceService, + ShardHealthService, + ], + exports: [ + ShardConfigService, + ShardRouter, + ShardConnectionManager, + ShardMigrationService, + ShardRebalanceService, + ShardHealthService, + ], +}) +export class ShardingModule {}