diff --git a/README.md b/README.md index 5931b1c..a3eec6e 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@
  • Row Level Security (RLS) template for preventing data access on unauthorized rows
  • Point in Time Rollbacks for rolling back your database to any minute in the past 30 days
  • Data Replication to scale reads beyond the 1,000 RPS limitation
  • - Data Syncing between local source and your database
  • + Data Syncing — pull external Postgres/MySQL into internal SQLite (Issue #72)
  • Scheduled CRON Tasks to execute code at desired intervals
  • diff --git a/plugins/data-sync/README.md b/plugins/data-sync/README.md new file mode 100644 index 0000000..0045d0d --- /dev/null +++ b/plugins/data-sync/README.md @@ -0,0 +1,77 @@ +# Data Sync Plugin + +Implements **Issue #72**: incremental pull sync from an external relational database into StarbaseDB’s internal SQLite (Durable Object). + +## Features + +- **Pull sync** with cursor by monotonic `id` or `updated_at`-style timestamp +- **Batching** + pagination on the upstream query +- **Idempotent writes** via SQLite `INSERT … ON CONFLICT DO UPDATE` +- **Metadata** in `tmp_data_sync_meta`, **logs** in `tmp_data_sync_log` +- **Retries** with exponential backoff on read failures +- **Admin-only HTTP API** for status, manual sync, and debug probe +- **PostgreSQL** (TCP via `pg` / Outerbase SDK) and **Hyperdrive** (`postgres` package); **MySQL** uses the same job format with MySQL-specific paging SQL + +## Configuration + +Cloudflare Workers do not receive arbitrary TOML tables as `env`. Use **`[vars]`** (and **secrets** for passwords). + +Conceptually this matches a `[plugins.data-sync]` block like: + +```toml +# Not loaded automatically — document your intent; mirror with [vars] below. +# [plugins.data-sync] +# sync_interval = 300 +# tables = ["users", "products"] +``` + +### `wrangler.toml` example + +```toml +[vars] +DATA_SYNC_ENABLED = "true" +DATA_SYNC_INTERVAL_SECONDS = "300" +# JSON array of job objects (escape quotes in TOML or use wrangler secret / dashboard) +DATA_SYNC_JOBS = """[{"externalTable":"public.users","localTable":"users","cursorKind":"incremental_id","cursorColumn":"id","pkColumns":["id"]}]""" +DATA_SYNC_BATCH_SIZE = "250" +DATA_SYNC_MAX_RETRIES = "3" +``` + +Also set the existing Starbase **external DB** variables (`EXTERNAL_DB_TYPE`, `EXTERNAL_DB_HOST`, etc.) or **Hyperdrive** so `dataSource.external` is populated. 
+ +### Job object + +| Field | Description | +| --------------- | ---------------------------------------------------------------------------------------------- | +| `externalTable` | Upstream table, e.g. `public.users` | +| `localTable` | SQLite table in the DO (must already exist with compatible columns + PK/UNIQUE on `pkColumns`) | +| `cursorKind` | `incremental_id` or `timestamp` | +| `cursorColumn` | Column for paging (`id`, `updated_at`, …) | +| `pkColumns` | Primary key columns for upsert | +| `columnMap` | Optional map `{ "external_col": "sqlite_col" }` | + +## HTTP API (Bearer **admin** token) + +| Method | Path | Description | +| ------ | ------------------------ | ----------------------------------------------------------------------------------------- | +| `GET` | `/data-sync/sync-status` | Metadata rows + recent log | +| `POST` | `/data-sync/sync-data` | Run sync. Optional JSON body: `{ "tables": ["users"] }` to filter by **local** table name | +| `GET` | `/data-sync/debug` | Redacted external config + `SELECT 1` probe | + +## Scheduled sync (CRON) + +`DATA_SYNC_INTERVAL_SECONDS` is **informational** only. Schedule sync with: + +1. **Cloudflare Workers Cron Triggers** — add a cron in `wrangler.toml` and `fetch` your worker with a route that triggers `POST /data-sync/sync-data` (same host) using the admin token, **or** +2. **`CronPlugin`** — in `src/index.ts`, register a task that performs an HTTP callback to `/data-sync/sync-data`, **or** +3. External scheduler (GitHub Actions, etc.) calling the same endpoint. + +## Local demo + +See `example/README.md` and `example/docker-compose.yml`. + +## Edge notes + +- Keep **batch sizes** modest (default 250, max 1000) to respect CPU/time limits. +- **Hyperdrive** is recommended for Postgres from Workers in production. +- Ensure the SQLite side has a **UNIQUE** or **PRIMARY KEY** constraint matching `pkColumns` so `ON CONFLICT` works. 
diff --git a/plugins/data-sync/adapter.ts b/plugins/data-sync/adapter.ts new file mode 100644 index 0000000..4b63305 --- /dev/null +++ b/plugins/data-sync/adapter.ts @@ -0,0 +1,110 @@ +/** + * External database read adapter — PostgreSQL / Hyperdrive first; pluggable interface. + * Writes always go to SQLite via Durable Object RPC (not through this adapter). + */ +import postgres from 'postgres' +import type { DataSource, ExternalDatabaseSource } from '../../src/types' +import type { StarbaseDBConfiguration } from '../../src/handler' +import { executeSDKQuery } from '../../src/operation' + +export interface ExternalReadAdapter { + /** Run a read-only statement on the external database; returns row objects */ + query = Record>( + sql: string, + params?: unknown[] + ): Promise +} + +function isHyperdrivePostgres( + ext: ExternalDatabaseSource | undefined +): ext is Extract { + return ( + !!ext && + ext.dialect === 'postgresql' && + 'connectionString' in ext && + !!ext.connectionString + ) +} + +function isHostPostgres( + ext: ExternalDatabaseSource | undefined +): ext is Extract< + ExternalDatabaseSource, + { dialect: 'postgresql'; host: string } +> { + return !!ext && ext.dialect === 'postgresql' && 'host' in ext && !!ext.host +} + +/** + * Create a reader for the external DB configured on `dataSource.external`. + * Reuses StarbaseDB's existing drivers (pg via SDK) or `postgres` for Hyperdrive. 
+ */ +export function createExternalReadAdapter( + dataSource: DataSource, + config: StarbaseDBConfiguration, + ctx?: ExecutionContext +): ExternalReadAdapter | null { + const ext = dataSource.external + if (!ext) return null + + if (isHyperdrivePostgres(ext)) { + return { + async query(sql, params = []) { + const sqlConn = postgres(ext.connectionString, { + max: 1, + fetch_types: false, + }) + try { + return (await sqlConn.unsafe( + sql, + params as never[] + )) as Record[] + } finally { + if (ctx) ctx.waitUntil(sqlConn.end()) + else await sqlConn.end() + } + }, + } + } + + if (isHostPostgres(ext) || ext.dialect === 'mysql') { + const readSource: DataSource = { + ...dataSource, + source: 'external', + external: ext, + } + return { + async query(sql, params = []) { + const rows = await executeSDKQuery({ + sql, + params, + dataSource: readSource, + config, + }) + return (Array.isArray(rows) ? rows : []) as Record< + string, + unknown + >[] + }, + } + } + + return null +} + +/** Quote a PostgreSQL identifier (schema/table/column) — validates simple names */ +export function quotePgIdent(ident: string): string { + const parts = ident + .split('.') + .map((p) => p.trim()) + .filter(Boolean) + const safe = /^[a-zA-Z_][a-zA-Z0-9_]*$/ + for (const p of parts) { + if (!safe.test(p)) { + throw new Error( + `[data-sync] Invalid PostgreSQL identifier segment: ${p}` + ) + } + } + return parts.map((p) => `"${p.replace(/"/g, '""')}"`).join('.') +} diff --git a/plugins/data-sync/config.test.ts b/plugins/data-sync/config.test.ts new file mode 100644 index 0000000..282a63d --- /dev/null +++ b/plugins/data-sync/config.test.ts @@ -0,0 +1,32 @@ +import { describe, expect, it } from 'vitest' +import { loadDataSyncConfig } from './config' + +describe('loadDataSyncConfig', () => { + it('parses jobs JSON', () => { + const jobs = [ + { + externalTable: 'public.users', + localTable: 'users', + cursorKind: 'incremental_id', + cursorColumn: 'id', + pkColumns: ['id'], + }, + ] + const c = 
loadDataSyncConfig({ + DATA_SYNC_ENABLED: 'true', + DATA_SYNC_JOBS: JSON.stringify(jobs), + DATA_SYNC_BATCH_SIZE: '100', + }) + expect(c.enabled).toBe(true) + expect(c.jobs).toHaveLength(1) + expect(c.jobs[0].localTable).toBe('users') + expect(c.batchSize).toBe(100) + }) + + it('defaults when env empty', () => { + const c = loadDataSyncConfig({}) + expect(c.enabled).toBe(false) + expect(c.jobs).toEqual([]) + expect(c.syncIntervalSeconds).toBe(300) + }) +}) diff --git a/plugins/data-sync/config.ts b/plugins/data-sync/config.ts new file mode 100644 index 0000000..a3fe497 --- /dev/null +++ b/plugins/data-sync/config.ts @@ -0,0 +1,119 @@ +import type { DataSyncPluginConfig, TableSyncJob } from './types' + +/** Environment / wrangler vars consumed by the data-sync plugin */ +export interface DataSyncEnv { + DATA_SYNC_ENABLED?: string + /** Seconds — informational; use Worker CRON or CronPlugin to invoke sync */ + DATA_SYNC_INTERVAL_SECONDS?: string + /** + * JSON array of TableSyncJob objects. 
+ * Example: + * [{"externalTable":"public.users","localTable":"users","cursorKind":"incremental_id","cursorColumn":"id","pkColumns":["id"]}] + */ + DATA_SYNC_JOBS?: string + DATA_SYNC_BATCH_SIZE?: string + DATA_SYNC_MAX_RETRIES?: string + DATA_SYNC_RETRY_BASE_MS?: string +} + +const DEFAULT_BATCH = 250 +const DEFAULT_RETRIES = 3 +const DEFAULT_RETRY_BASE_MS = 400 + +function truthy(v: string | undefined): boolean { + if (!v) return false + const s = v.toLowerCase().trim() + return s === '1' || s === 'true' || s === 'yes' || s === 'on' +} + +function parseJobsJson(raw: string | undefined): TableSyncJob[] { + if (!raw?.trim()) return [] + try { + const parsed = JSON.parse(raw) as unknown + if (!Array.isArray(parsed)) { + console.error('[data-sync] DATA_SYNC_JOBS must be a JSON array') + return [] + } + const jobs: TableSyncJob[] = [] + for (const item of parsed) { + if (!item || typeof item !== 'object') continue + const j = item as Record + const externalTable = String(j.externalTable ?? '') + const localTable = String( + j.localTable ?? externalTable.split('.').pop() ?? '' + ) + const cursorKind = + j.cursorKind === 'timestamp' ? 'timestamp' : 'incremental_id' + const cursorColumn = String( + j.cursorColumn ?? + (cursorKind === 'timestamp' ? 'updated_at' : 'id') + ) + const pkColumns = Array.isArray(j.pkColumns) + ? j.pkColumns.map((x) => String(x)) + : j.pkColumns + ? [String(j.pkColumns)] + : ['id'] + const columnMap = + j.columnMap && typeof j.columnMap === 'object' + ? (j.columnMap as Record) + : undefined + if (!externalTable || !localTable) { + console.warn( + '[data-sync] Skipping job with missing externalTable/localTable', + item + ) + continue + } + jobs.push({ + externalTable, + localTable, + cursorKind, + cursorColumn, + pkColumns, + columnMap, + }) + } + return jobs + } catch (e) { + console.error('[data-sync] Failed to parse DATA_SYNC_JOBS:', e) + return [] + } +} + +/** + * Resolve plugin configuration from Worker environment variables. 
+ * (Wrangler does not pass arbitrary TOML tables into `env`; use `vars` or secrets.) + */ +export function loadDataSyncConfig(env: DataSyncEnv): DataSyncPluginConfig { + const interval = Number(env.DATA_SYNC_INTERVAL_SECONDS ?? '300') + return { + enabled: truthy(env.DATA_SYNC_ENABLED), + syncIntervalSeconds: + Number.isFinite(interval) && interval > 0 ? interval : 300, + jobs: parseJobsJson(env.DATA_SYNC_JOBS), + batchSize: Math.min( + 1000, + Math.max( + 1, + Number(env.DATA_SYNC_BATCH_SIZE ?? DEFAULT_BATCH) || + DEFAULT_BATCH + ) + ), + maxRetries: Math.min( + 10, + Math.max( + 0, + Number(env.DATA_SYNC_MAX_RETRIES ?? DEFAULT_RETRIES) || + DEFAULT_RETRIES + ) + ), + retryBaseMs: Math.min( + 10_000, + Math.max( + 50, + Number(env.DATA_SYNC_RETRY_BASE_MS ?? DEFAULT_RETRY_BASE_MS) || + DEFAULT_RETRY_BASE_MS + ) + ), + } +} diff --git a/plugins/data-sync/example/README.md b/plugins/data-sync/example/README.md new file mode 100644 index 0000000..e18b5bc --- /dev/null +++ b/plugins/data-sync/example/README.md @@ -0,0 +1,71 @@ +# Data Sync — PostgreSQL example + +## 1. Start Postgres + +```bash +docker compose -f plugins/data-sync/example/docker-compose.yml up -d +``` + +Connection string (from host): + +`postgres://postgres:postgres@127.0.0.1:5433/syncdemo` + +## 2. Seed upstream schema + +```bash +psql "postgres://postgres:postgres@127.0.0.1:5433/syncdemo" -c " +CREATE TABLE IF NOT EXISTS public.products ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +INSERT INTO public.products (name) VALUES ('Widget A'), ('Widget B') +ON CONFLICT DO NOTHING; +" +``` + +## 3. Create matching SQLite tables (via StarbaseDB) + +After `pnpm dev`, with admin token: + +```bash +curl -s -X POST "http://127.0.0.1:8787/query" \ + -H "Authorization: Bearer ABC123" \ + -H "Content-Type: application/json" \ + -d "{\"sql\":\"CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)\"}" +``` + +## 4. 
Configure `wrangler.toml` (dev) + +Uncomment/set external DB vars to point at `127.0.0.1:5433`, and set: + +```toml +[vars] +DATA_SYNC_ENABLED = "true" +DATA_SYNC_JOBS = "[{\"externalTable\":\"public.products\",\"localTable\":\"products\",\"cursorKind\":\"timestamp\",\"cursorColumn\":\"updated_at\",\"pkColumns\":[\"id\"]}]" +``` + +## 5. Run sync + +```bash +curl -s -X POST "http://127.0.0.1:8787/data-sync/sync-data" \ + -H "Authorization: Bearer ABC123" \ + -H "Content-Type: application/json" \ + -d "{}" +``` + +## 6. Verify + +```bash +curl -s -X POST "http://127.0.0.1:8787/query" \ + -H "Authorization: Bearer ABC123" \ + -H "Content-Type: application/json" \ + -d "{\"sql\":\"SELECT * FROM products\"}" +``` + +## Integration test + +```bash +set DATA_SYNC_TEST_PG_URL=postgres://postgres:postgres@127.0.0.1:5433/syncdemo +pnpm vitest run plugins/data-sync/integration.pg.test.ts +``` diff --git a/plugins/data-sync/example/docker-compose.yml b/plugins/data-sync/example/docker-compose.yml new file mode 100644 index 0000000..83544be --- /dev/null +++ b/plugins/data-sync/example/docker-compose.yml @@ -0,0 +1,14 @@ +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: syncdemo + ports: + - '5433:5432' + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U postgres'] + interval: 5s + timeout: 5s + retries: 5 diff --git a/plugins/data-sync/index.ts b/plugins/data-sync/index.ts new file mode 100644 index 0000000..b83c3ee --- /dev/null +++ b/plugins/data-sync/index.ts @@ -0,0 +1,253 @@ +/** + * Data Sync Plugin — pull-based incremental sync from external RDBMS into StarbaseDB SQLite (DO). 
+ * @see https://github.com/gittare/starbasedb — Issue #72 + */ +import { StarbaseApp, StarbaseDBConfiguration } from '../../src/handler' +import { StarbasePlugin } from '../../src/plugin' +import { createResponse } from '../../src/utils' +import type { DataSource } from '../../src/types' +import { loadDataSyncConfig, type DataSyncEnv } from './config' +import { createExternalReadAdapter } from './adapter' +import { ensureDataSyncTables, runFullSync } from './sync-engine' +import type { TableSyncJob } from './types' + +function getSqlDialect( + dataSource: DataSource +): 'postgresql' | 'mysql' | 'none' { + const ext = dataSource.external + if (!ext) return 'none' + if (ext.dialect === 'postgresql') return 'postgresql' + if (ext.dialect === 'mysql') return 'mysql' + return 'none' +} + +export class DataSyncPlugin extends StarbasePlugin { + pathPrefix = '/data-sync' + private dataSource?: DataSource + private honoConfig?: StarbaseDBConfiguration + private readonly workerEnv: DataSyncEnv + + constructor(workerEnv: DataSyncEnv) { + super('starbasedb:data-sync', { requiresAuth: true }) + this.workerEnv = workerEnv + } + + private requireAdmin() { + if (this.honoConfig?.role !== 'admin') { + return createResponse( + undefined, + 'Admin authorization required for data-sync routes.', + 403 + ) + } + return null + } + + override async register(app: StarbaseApp) { + app.use(async (c, next) => { + this.dataSource = c.get('dataSource') + this.honoConfig = c.get('config') + await next() + }) + + /** GET /data-sync/sync-status — metadata + recent log lines */ + app.get(`${this.pathPrefix}/sync-status`, async () => { + const denied = this.requireAdmin() + if (denied) return denied + if (!this.dataSource) { + return createResponse( + undefined, + 'Data source not initialized', + 500 + ) + } + + const pluginConfig = loadDataSyncConfig(this.workerEnv) + await ensureDataSyncTables(this.dataSource.rpc) + + const meta = (await this.dataSource.rpc.executeQuery({ + sql: `SELECT * FROM 
tmp_data_sync_meta ORDER BY table_name`, + params: [], + })) as Record[] + + const logs = (await this.dataSource.rpc.executeQuery({ + sql: `SELECT id, level, scope, message, created_at FROM tmp_data_sync_log ORDER BY id DESC LIMIT 100`, + params: [], + })) as Record[] + + return createResponse( + { + enabled: pluginConfig.enabled, + syncIntervalSeconds: pluginConfig.syncIntervalSeconds, + jobCount: pluginConfig.jobs.length, + meta, + logs, + }, + undefined, + 200 + ) + }) + + /** POST /data-sync/sync-data — run pull sync (optional body: { "tables": ["local_table_name"] }) */ + app.post(`${this.pathPrefix}/sync-data`, async (c) => { + const denied = this.requireAdmin() + if (denied) return denied + if (!this.dataSource || !this.honoConfig) { + return createResponse( + undefined, + 'Data source not initialized', + 500 + ) + } + + const pluginConfig = loadDataSyncConfig(this.workerEnv) + if (!pluginConfig.enabled) { + return createResponse( + { ok: false, message: 'DATA_SYNC_ENABLED is not set' }, + undefined, + 400 + ) + } + + if (!pluginConfig.jobs.length) { + return createResponse( + { + ok: false, + message: 'No jobs configured (DATA_SYNC_JOBS)', + }, + undefined, + 400 + ) + } + + let filter: string[] | undefined + try { + const body = await c.req.json().catch(() => ({})) + if (body && Array.isArray(body.tables)) { + filter = body.tables.map((t: unknown) => String(t)) + } + } catch { + filter = undefined + } + + let jobs: TableSyncJob[] = pluginConfig.jobs + if (filter?.length) { + jobs = jobs.filter((j) => filter!.includes(j.localTable)) + if (!jobs.length) { + return createResponse( + { + ok: false, + message: 'No jobs matched the tables filter', + }, + undefined, + 400 + ) + } + } + + const dialect = getSqlDialect(this.dataSource) + const adapter = createExternalReadAdapter( + this.dataSource, + this.honoConfig, + this.dataSource.executionContext + ) + + const summary = await runFullSync({ + jobs, + adapter, + dataSource: this.dataSource, + pluginConfig, + 
dialect, + }) + + return createResponse( + { ok: summary.overallStatus !== 'error', summary }, + undefined, + 200 + ) + }) + + /** GET /data-sync/debug — redacted config + connectivity probe */ + app.get(`${this.pathPrefix}/debug`, async () => { + const denied = this.requireAdmin() + if (denied) return denied + if (!this.dataSource || !this.honoConfig) { + return createResponse( + undefined, + 'Data source not initialized', + 500 + ) + } + + const pluginConfig = loadDataSyncConfig(this.workerEnv) + const dialect = getSqlDialect(this.dataSource) + const adapter = createExternalReadAdapter( + this.dataSource, + this.honoConfig + ) + + let probe: { ok: boolean; message?: string } = { + ok: false, + message: 'no adapter', + } + if (adapter) { + try { + const rows = await adapter.query<{ one: number }>( + dialect === 'mysql' + ? 'SELECT 1 AS one' + : 'SELECT 1 AS one' + ) + probe = { ok: rows.length > 0, message: 'SELECT 1 ok' } + } catch (e) { + probe = { + ok: false, + message: e instanceof Error ? e.message : String(e), + } + } + } + + const ext = this.dataSource.external + const redactedExternal = ext + ? { + dialect: ext.dialect, + host: 'host' in ext ? ext.host : undefined, + port: 'port' in ext ? ext.port : undefined, + database: 'database' in ext ? 
ext.database : undefined, + hasConnectionString: + 'connectionString' in ext && !!ext.connectionString, + } + : null + + return createResponse( + { + plugin: 'starbasedb:data-sync', + enabled: pluginConfig.enabled, + dialect, + batchSize: pluginConfig.batchSize, + jobsPreview: pluginConfig.jobs.map((j) => ({ + externalTable: j.externalTable, + localTable: j.localTable, + cursorKind: j.cursorKind, + cursorColumn: j.cursorColumn, + })), + external: redactedExternal, + probe, + }, + undefined, + 200 + ) + }) + } +} + +export { loadDataSyncConfig } from './config' +export type { DataSyncEnv } from './config' +export type * from './types' +export { createExternalReadAdapter } from './adapter' +export { + ensureDataSyncTables, + runFullSync, + mapValueForSqlite, + buildBatchUpsert, + assertSqliteIdent, +} from './sync-engine' diff --git a/plugins/data-sync/integration.pg.test.ts b/plugins/data-sync/integration.pg.test.ts new file mode 100644 index 0000000..8ffa860 --- /dev/null +++ b/plugins/data-sync/integration.pg.test.ts @@ -0,0 +1,37 @@ +/** + * Integration test: requires Docker PostgreSQL. 
+ * Run: docker compose -f plugins/data-sync/example/docker-compose.yml up -d + * DATA_SYNC_TEST_PG_URL=postgres://postgres:postgres@127.0.0.1:5433/syncdemo pnpm vitest run plugins/data-sync/integration.pg.test.ts + */ +import { describe, it, expect, beforeAll, afterAll } from 'vitest' +import pg from 'pg' + +const url = process.env.DATA_SYNC_TEST_PG_URL + +describe.skipIf(!url)('data-sync PostgreSQL integration', () => { + let client: pg.Client + + beforeAll(async () => { + client = new pg.Client({ connectionString: url }) + await client.connect() + await client.query(` + CREATE SCHEMA IF NOT EXISTS demo; + DROP TABLE IF EXISTS demo.products; + CREATE TABLE demo.products ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + ); + INSERT INTO demo.products (name) VALUES ('alpha'), ('beta'); + `) + }) + + afterAll(async () => { + await client?.end().catch(() => {}) + }) + + it('reads seeded rows', async () => { + const r = await client.query('SELECT * FROM demo.products ORDER BY id') + expect(r.rows.length).toBeGreaterThanOrEqual(2) + }) +}) diff --git a/plugins/data-sync/meta.json b/plugins/data-sync/meta.json new file mode 100644 index 0000000..cc524c0 --- /dev/null +++ b/plugins/data-sync/meta.json @@ -0,0 +1,5 @@ +{ + "name": "data-sync", + "title": "Data Sync", + "description": "Pull-based incremental sync from PostgreSQL/MySQL (or Hyperdrive) into StarbaseDB SQLite." 
+} diff --git a/plugins/data-sync/sync-engine.test.ts b/plugins/data-sync/sync-engine.test.ts new file mode 100644 index 0000000..a418685 --- /dev/null +++ b/plugins/data-sync/sync-engine.test.ts @@ -0,0 +1,57 @@ +import { describe, expect, it } from 'vitest' +import { + assertSqliteIdent, + buildBatchUpsert, + mapValueForSqlite, +} from './sync-engine' +import type { TableSyncJob } from './types' + +describe('mapValueForSqlite', () => { + it('serializes bigint and Date', () => { + expect(mapValueForSqlite(1n)).toBe('1') + const d = new Date('2024-01-02T03:04:05.000Z') + expect(mapValueForSqlite(d)).toBe('2024-01-02T03:04:05.000Z') + }) + + it('JSON-encodes plain objects', () => { + expect(mapValueForSqlite({ a: 1 })).toBe('{"a":1}') + }) +}) + +describe('assertSqliteIdent', () => { + it('accepts valid identifiers', () => { + expect(() => assertSqliteIdent('users', 't')).not.toThrow() + expect(() => assertSqliteIdent('_x1', 't')).not.toThrow() + }) + + it('rejects invalid identifiers', () => { + expect(() => assertSqliteIdent('bad-name', 't')).toThrow() + expect(() => assertSqliteIdent("'; DROP--", 't')).toThrow() + }) +}) + +describe('buildBatchUpsert', () => { + const job: TableSyncJob = { + externalTable: 'public.users', + localTable: 'users', + cursorKind: 'incremental_id', + cursorColumn: 'id', + pkColumns: ['id'], + } + + it('builds INSERT ... 
ON CONFLICT for one row', () => { + const q = buildBatchUpsert(job, [{ id: 1, name: 'a' }]) + expect(q).not.toBeNull() + expect(q!.sql).toContain('INSERT INTO "users"') + expect(q!.sql).toContain('ON CONFLICT("id")') + expect(q!.params).toEqual([1, 'a']) + }) + + it('builds multi-row batch', () => { + const q = buildBatchUpsert(job, [ + { id: 1, name: 'a' }, + { id: 2, name: 'b' }, + ]) + expect(q!.params).toEqual([1, 'a', 2, 'b']) + }) +}) diff --git a/plugins/data-sync/sync-engine.ts b/plugins/data-sync/sync-engine.ts new file mode 100644 index 0000000..1b2faf6 --- /dev/null +++ b/plugins/data-sync/sync-engine.ts @@ -0,0 +1,495 @@ +import type { DataSource } from '../../src/types' +import type { + DataSyncPluginConfig, + SyncRunSummary, + TableSyncJob, + TableSyncResult, + SyncStatusState, +} from './types' +import type { ExternalReadAdapter } from './adapter' +import { quotePgIdent } from './adapter' + +export const META_TABLE = 'tmp_data_sync_meta' +export const LOG_TABLE = 'tmp_data_sync_log' + +const SQLITE_IDENT = /^[a-zA-Z_][a-zA-Z0-9_]*$/ + +export function assertSqliteIdent(name: string, label: string): void { + if (!SQLITE_IDENT.test(name)) { + throw new Error(`[data-sync] Invalid SQLite ${label}: ${name}`) + } +} + +/** Normalize driver values into SQLite-friendly primitives */ +export function mapValueForSqlite(value: unknown): unknown { + if (value === null || value === undefined) return null + if (typeof value === 'bigint') return value.toString() + if (value instanceof Date) return value.toISOString() + if (typeof value === 'object') { + try { + return JSON.stringify(value) + } catch { + return String(value) + } + } + return value +} + +export function buildRowForLocal( + row: Record, + job: TableSyncJob +): Record { + const out: Record = {} + if (job.columnMap) { + for (const [extCol, locCol] of Object.entries(job.columnMap)) { + assertSqliteIdent(locCol, 'column name') + if (Object.prototype.hasOwnProperty.call(row, extCol)) { + out[locCol] = 
mapValueForSqlite(row[extCol]) + } + } + return out + } + for (const [k, v] of Object.entries(row)) { + const loc = k + assertSqliteIdent(loc, 'column name') + out[loc] = mapValueForSqlite(v) + } + return out +} + +function quoteSqliteIdent(name: string): string { + assertSqliteIdent(name, 'identifier') + return `"${name.replace(/"/g, '""')}"` +} + +/** + * Build a multi-row UPSERT for SQLite (idempotent when PK matches). + */ +export function buildBatchUpsert( + job: TableSyncJob, + rows: Record[] +): { sql: string; params: unknown[] } | null { + if (!rows.length) return null + + const columns = Object.keys(rows[0]) + for (const r of rows) { + const keys = Object.keys(r) + if ( + keys.length !== columns.length || + keys.some((k) => !columns.includes(k)) + ) { + throw new Error('[data-sync] Inconsistent row shape in batch') + } + } + + for (const pk of job.pkColumns) { + assertSqliteIdent(pk, 'pk column') + } + + const tableSql = quoteSqliteIdent(job.localTable) + const colSql = columns.map(quoteSqliteIdent).join(', ') + const pkSql = job.pkColumns.map(quoteSqliteIdent).join(', ') + + const valueTuples: string[] = [] + const params: unknown[] = [] + for (const r of rows) { + const ph = columns.map(() => '?').join(', ') + valueTuples.push(`(${ph})`) + for (const c of columns) { + params.push(r[c] ?? null) + } + } + + const nonPk = columns.filter((c) => !job.pkColumns.includes(c)) + const updateClause = + nonPk.length > 0 + ? nonPk + .map( + (c) => + `${quoteSqliteIdent(c)} = excluded.${quoteSqliteIdent(c)}` + ) + .join(', ') + : job.pkColumns[0] + ? 
`${quoteSqliteIdent(job.pkColumns[0])} = excluded.${quoteSqliteIdent(job.pkColumns[0])}` + : '1 = 1' + + const sql = `INSERT INTO ${tableSql} (${colSql}) VALUES ${valueTuples.join(', ')} + ON CONFLICT(${pkSql}) DO UPDATE SET ${updateClause}` + + return { sql, params } +} + +export async function ensureDataSyncTables( + rpc: DataSource['rpc'] +): Promise { + await rpc.executeQuery({ + sql: ` + CREATE TABLE IF NOT EXISTS ${META_TABLE} ( + table_name TEXT NOT NULL PRIMARY KEY, + last_synced_at TEXT, + last_cursor_id TEXT, + last_cursor_ts TEXT, + sync_status TEXT NOT NULL DEFAULT 'idle', + error_message TEXT, + rows_last_run INTEGER DEFAULT 0, + updated_at TEXT DEFAULT (datetime('now')) + )`, + params: [], + }) + await rpc.executeQuery({ + sql: ` + CREATE TABLE IF NOT EXISTS ${LOG_TABLE} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + level TEXT NOT NULL, + scope TEXT NOT NULL, + message TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')) + )`, + params: [], + }) +} + +export async function appendLog( + rpc: DataSource['rpc'], + level: 'info' | 'warn' | 'error', + scope: string, + message: string +): Promise { + try { + await rpc.executeQuery({ + sql: `INSERT INTO ${LOG_TABLE} (level, scope, message) VALUES (?, ?, ?)`, + params: [level, scope, message], + }) + await rpc.executeQuery({ + sql: `DELETE FROM ${LOG_TABLE} WHERE id NOT IN ( + SELECT id FROM ${LOG_TABLE} ORDER BY id DESC LIMIT 500 + )`, + params: [], + }) + } catch (e) { + console.error('[data-sync] log insert failed:', e) + } +} + +async function withRetry( + fn: () => Promise, + opts: { maxRetries: number; baseMs: number } +): Promise { + let last: unknown + for (let attempt = 0; attempt <= opts.maxRetries; attempt++) { + try { + return await fn() + } catch (e) { + last = e + if (attempt === opts.maxRetries) break + const delay = opts.baseMs * Math.pow(2, attempt) + await new Promise((r) => setTimeout(r, delay)) + } + } + throw last +} + +function buildSelectPageSql( + job: TableSyncJob, + batchSize: 
number, + lastId: string | null, + lastTs: string | null +): { sql: string; params: unknown[] } { + const tableRef = quotePgIdent(job.externalTable) + const cursorCol = quotePgIdent(job.cursorColumn) + + if (job.cursorKind === 'incremental_id') { + const idParam = lastId ?? '0' + const sql = `SELECT * FROM ${tableRef} WHERE ${cursorCol}::text > $1::text ORDER BY ${cursorCol} ASC LIMIT ${batchSize}` + return { sql, params: [idParam] } + } + + const tsParam = lastTs ?? '1970-01-01T00:00:00.000Z' + const sql = `SELECT * FROM ${tableRef} WHERE ${cursorCol} > $1::timestamptz ORDER BY ${cursorCol} ASC LIMIT ${batchSize}` + return { sql, params: [tsParam] } +} + +/** MySQL variant (identifiers only; cursor comparison simplified) */ +function buildSelectPageSqlMysql( + job: TableSyncJob, + batchSize: number, + lastId: string | null, + lastTs: string | null +): { sql: string; params: unknown[] } { + const tableRef = job.externalTable + .split('.') + .map((p) => `\`${p.replace(/`/g, '')}\``) + .join('.') + const cursorCol = `\`${job.cursorColumn.replace(/`/g, '')}\`` + + if (job.cursorKind === 'incremental_id') { + const idParam = lastId ?? '0' + return { + sql: `SELECT * FROM ${tableRef} WHERE CAST(${cursorCol} AS CHAR) > ? ORDER BY ${cursorCol} ASC LIMIT ${batchSize}`, + params: [idParam], + } + } + const tsParam = lastTs ?? '1970-01-01 00:00:00' + return { + sql: `SELECT * FROM ${tableRef} WHERE ${cursorCol} > ? ORDER BY ${cursorCol} ASC LIMIT ${batchSize}`, + params: [tsParam], + } +} + +async function loadMeta( + rpc: DataSource['rpc'], + localTable: string +): Promise<{ + last_cursor_id: string | null + last_cursor_ts: string | null +}> { + const rows = (await rpc.executeQuery({ + sql: `SELECT last_cursor_id, last_cursor_ts FROM ${META_TABLE} WHERE table_name = ?`, + params: [localTable], + })) as Record[] + const row = rows[0] + return { + last_cursor_id: + row?.last_cursor_id != null ? 
String(row.last_cursor_id) : null, + last_cursor_ts: + row?.last_cursor_ts != null ? String(row.last_cursor_ts) : null, + } +} + +async function saveMeta( + rpc: DataSource['rpc'], + job: TableSyncJob, + patch: { + last_cursor_id?: string | null + last_cursor_ts?: string | null + sync_status: SyncStatusState + error_message?: string | null + rows_last_run: number + } +): Promise { + await rpc.executeQuery({ + sql: ` + INSERT INTO ${META_TABLE} (table_name, last_synced_at, last_cursor_id, last_cursor_ts, sync_status, error_message, rows_last_run, updated_at) + VALUES (?, datetime('now'), ?, ?, ?, ?, ?, datetime('now')) + ON CONFLICT(table_name) DO UPDATE SET + last_synced_at = excluded.last_synced_at, + last_cursor_id = COALESCE(excluded.last_cursor_id, last_cursor_id), + last_cursor_ts = COALESCE(excluded.last_cursor_ts, last_cursor_ts), + sync_status = excluded.sync_status, + error_message = excluded.error_message, + rows_last_run = excluded.rows_last_run, + updated_at = excluded.updated_at + `, + params: [ + job.localTable, + patch.last_cursor_id ?? null, + patch.last_cursor_ts ?? null, + patch.sync_status, + patch.error_message ?? null, + patch.rows_last_run, + ], + }) +} + +function maxCursorFromRows( + rows: Record[], + job: TableSyncJob +): { id: string | null; ts: string | null } { + let maxId: string | null = null + let maxTs: string | null = null + const col = job.cursorColumn + for (const r of rows) { + const v = r[col] + if (job.cursorKind === 'incremental_id') { + const s = v != null ? String(v) : null + if (!s) continue + if (!maxId) { + maxId = s + continue + } + const bothNumeric = /^\d+$/.test(s) && /^\d+$/.test(maxId) + if (bothNumeric) { + if (BigInt(s) > BigInt(maxId)) maxId = s + } else if (s > maxId) { + maxId = s + } + } else { + const iso = + v instanceof Date + ? v.toISOString() + : v != null + ? 
String(v)
+                        : null
+            if (iso && (!maxTs || iso > maxTs)) maxTs = iso
+        }
+    }
+    return { id: maxId, ts: maxTs }
+}
+
+export async function syncOneTable(opts: {
+    job: TableSyncJob
+    adapter: ExternalReadAdapter
+    dataSource: DataSource
+    pluginConfig: DataSyncPluginConfig
+    isMysql: boolean
+}): Promise<TableSyncResult> {
+    const { job, adapter, dataSource, pluginConfig, isMysql } = opts
+    const rpc = dataSource.rpc
+    let totalFetched = 0
+    let totalWritten = 0
+
+    try {
+        await saveMeta(rpc, job, {
+            sync_status: 'running',
+            error_message: null,
+            rows_last_run: 0,
+        })
+
+        let { last_cursor_id, last_cursor_ts } = await loadMeta(
+            rpc,
+            job.localTable
+        )
+        let hasMore = true
+
+        while (hasMore) {
+            const { sql, params } = isMysql
+                ? buildSelectPageSqlMysql(
+                      job,
+                      pluginConfig.batchSize,
+                      last_cursor_id,
+                      last_cursor_ts
+                  )
+                : buildSelectPageSql(
+                      job,
+                      pluginConfig.batchSize,
+                      last_cursor_id,
+                      last_cursor_ts
+                  )
+
+            const page = await withRetry(
+                () => adapter.query<Record<string, unknown>>(sql, params),
+                {
+                    maxRetries: pluginConfig.maxRetries,
+                    baseMs: pluginConfig.retryBaseMs,
+                }
+            )
+
+            if (!page.length) {
+                hasMore = false
+                break
+            }
+
+            totalFetched += page.length
+            const mapped = page.map((r) => buildRowForLocal(r, job))
+            const upsert = buildBatchUpsert(job, mapped)
+            if (upsert) {
+                await rpc.executeQuery({
+                    sql: upsert.sql,
+                    params: upsert.params,
+                })
+                totalWritten += page.length
+            }
+
+            const { id, ts } = maxCursorFromRows(page, job)
+            if (job.cursorKind === 'incremental_id' && id) {
+                last_cursor_id = id
+            }
+            if (job.cursorKind === 'timestamp' && ts) {
+                last_cursor_ts = ts
+            }
+
+            await saveMeta(rpc, job, {
+                last_cursor_id:
+                    job.cursorKind === 'incremental_id' ? last_cursor_id : null,
+                last_cursor_ts:
+                    job.cursorKind === 'timestamp' ?
last_cursor_ts : null,
+                sync_status: 'running',
+                rows_last_run: totalWritten,
+            })
+
+            if (page.length < pluginConfig.batchSize) {
+                hasMore = false
+            }
+        }
+
+        await saveMeta(rpc, job, {
+            last_cursor_id:
+                job.cursorKind === 'incremental_id' ? last_cursor_id : null,
+            last_cursor_ts:
+                job.cursorKind === 'timestamp' ? last_cursor_ts : null,
+            sync_status: 'ok',
+            error_message: null,
+            rows_last_run: totalWritten,
+        })
+
+        await appendLog(
+            rpc,
+            'info',
+            job.localTable,
+            `Synced ${totalWritten} rows (fetched ${totalFetched})`
+        )
+
+        return { job, rowsFetched: totalFetched, rowsWritten: totalWritten }
+    } catch (e) {
+        const msg = e instanceof Error ? e.message : String(e)
+        await saveMeta(rpc, job, {
+            sync_status: 'error',
+            error_message: msg,
+            rows_last_run: totalWritten,
+        })
+        await appendLog(rpc, 'error', job.localTable, msg)
+        return {
+            job,
+            rowsFetched: totalFetched,
+            rowsWritten: totalWritten,
+            error: msg,
+        }
+    }
+}
+
+export async function runFullSync(opts: {
+    jobs: TableSyncJob[]
+    adapter: ExternalReadAdapter | null
+    dataSource: DataSource
+    pluginConfig: DataSyncPluginConfig
+    dialect: 'postgresql' | 'mysql' | 'none'
+}): Promise<SyncRunSummary> {
+    const startedAt = new Date().toISOString()
+    const results: TableSyncResult[] = []
+
+    if (!opts.adapter || opts.dialect === 'none') {
+        return {
+            startedAt,
+            finishedAt: new Date().toISOString(),
+            results: [],
+            overallStatus: 'error',
+        }
+    }
+
+    const isMysql = opts.dialect === 'mysql'
+
+    await ensureDataSyncTables(opts.dataSource.rpc)
+
+    for (const job of opts.jobs) {
+        assertSqliteIdent(job.localTable, 'table name')
+        const r = await syncOneTable({
+            job,
+            adapter: opts.adapter,
+            dataSource: opts.dataSource,
+            pluginConfig: opts.pluginConfig,
+            isMysql,
+        })
+        results.push(r)
+    }
+
+    const finishedAt = new Date().toISOString()
+    const hasErr = results.some((x) => x.error)
+    const partial = hasErr && results.some((x) => !x.error)
+    const overallStatus: SyncStatusState = hasErr
+        ? partial
+            ?
'partial'
+            : 'error'
+        : 'ok'
+
+    return { startedAt, finishedAt, results, overallStatus }
+}
diff --git a/plugins/data-sync/types.ts b/plugins/data-sync/types.ts
new file mode 100644
index 0000000..e9ad12a
--- /dev/null
+++ b/plugins/data-sync/types.ts
@@ -0,0 +1,65 @@
+/**
+ * Data Sync Plugin — shared types (Issue #72)
+ */
+
+/** How we page through the upstream table */
+export type CursorKind = 'incremental_id' | 'timestamp'
+
+export interface TableSyncJob {
+    /** Qualified or bare table on the external engine, e.g. "public.users" or "users" */
+    externalTable: string
+    /** Destination SQLite table inside the Durable Object */
+    localTable: string
+    cursorKind: CursorKind
+    /** Column used for incremental pull (e.g. "id", "updated_at") */
+    cursorColumn: string
+    /** Primary key column(s) in SQLite for idempotent UPSERT */
+    pkColumns: string[]
+    /** Optional explicit column map: external_col -> sqlite_col */
+    columnMap?: Record<string, string>
+}
+
+export interface DataSyncPluginConfig {
+    enabled: boolean
+    /** Suggested interval for external CRON / CronPlugin (seconds) */
+    syncIntervalSeconds: number
+    jobs: TableSyncJob[]
+    batchSize: number
+    maxRetries: number
+    retryBaseMs: number
+}
+
+export type SyncStatusState = 'idle' | 'running' | 'ok' | 'error' | 'partial'
+
+export interface TableSyncMetaRow {
+    table_name: string
+    last_synced_at: string | null
+    last_cursor_id: string | null
+    last_cursor_ts: string | null
+    sync_status: SyncStatusState
+    error_message: string | null
+    rows_last_run: number
+    updated_at: string | null
+}
+
+export interface SyncLogEntry {
+    id?: number
+    level: 'info' | 'warn' | 'error'
+    scope: string
+    message: string
+    created_at?: string
+}
+
+export interface TableSyncResult {
+    job: TableSyncJob
+    rowsFetched: number
+    rowsWritten: number
+    error?: string
+}
+
+export interface SyncRunSummary {
+    startedAt: string
+    finishedAt: string
+    results: TableSyncResult[]
+    overallStatus: SyncStatusState
+}
diff --git a/src/index.ts b/src/index.ts
index 4d08932..dc5431e 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -12,6 +12,7 @@ import { QueryLogPlugin } from '../plugins/query-log'
 import { StatsPlugin } from '../plugins/stats'
 import { CronPlugin } from '../plugins/cron'
 import { InterfacePlugin } from '../plugins/interface'
+import { DataSyncPlugin } from '../plugins/data-sync'
 
 export { StarbaseDBDurableObject } from './do'
 
@@ -54,7 +55,15 @@ export interface Env {
     AUTH_ALGORITHM?: string
     AUTH_JWKS_ENDPOINT?: string
 
-    HYPERDRIVE: Hyperdrive
+    HYPERDRIVE?: Hyperdrive
+
+    /** Data sync plugin (Issue #72) — configure via [vars] in wrangler.toml */
+    DATA_SYNC_ENABLED?: string
+    DATA_SYNC_INTERVAL_SECONDS?: string
+    DATA_SYNC_JOBS?: string
+    DATA_SYNC_BATCH_SIZE?: string
+    DATA_SYNC_MAX_RETRIES?: string
+    DATA_SYNC_RETRY_BASE_MS?: string
 
     // ## DO NOT REMOVE: TEMPLATE INTERFACE ##
 }
@@ -225,6 +234,7 @@
         cdcPlugin,
         cronPlugin,
         new StatsPlugin(),
+        new DataSyncPlugin(env),
         interfacePlugin,
     ] satisfies StarbasePlugin[]
diff --git a/wrangler.toml b/wrangler.toml
index 395c4ac..d092b97 100644
--- a/wrangler.toml
+++ b/wrangler.toml
@@ -75,6 +75,22 @@ ENABLE_RLS = 0
 AUTH_ALGORITHM = "RS256"
 AUTH_JWKS_ENDPOINT = ""
 
+# -----------------------------------------------------------------------------
+# Data Sync plugin (plugins/data-sync) — GitHub Issue #72
+# Workers env only receives flat vars; mirror any "plugins.data-sync" design here.
+# -----------------------------------------------------------------------------
+# DATA_SYNC_ENABLED = "false"
+# DATA_SYNC_INTERVAL_SECONDS = "300"
+# DATA_SYNC_BATCH_SIZE = "250"
+# DATA_SYNC_MAX_RETRIES = "3"
+# DATA_SYNC_RETRY_BASE_MS = "400"
+# Example DATA_SYNC_JOBS (single line JSON):
+# DATA_SYNC_JOBS = "[{\"externalTable\":\"public.users\",\"localTable\":\"users\",\"cursorKind\":\"incremental_id\",\"cursorColumn\":\"id\",\"pkColumns\":[\"id\"]}]"
+
 # [[hyperdrive]]
 # binding = "HYPERDRIVE"
 # id = ""
+
+# Optional: Worker cron — call POST /data-sync/sync-data with admin token from scheduled handler
+# [triggers]
+# crons = ["*/5 * * * *"]