Skip to content

feat: persist Dead Letter Queue to PostgreSQL (closes #221)#271

Open
whiteghost0001 wants to merge 1 commit into
Fluxora-Org:mainfrom
whiteghost0001:feature/persist-dlq-to-postgresql
Open

feat: persist Dead Letter Queue to PostgreSQL (closes #221)#271
whiteghost0001 wants to merge 1 commit into
Fluxora-Org:mainfrom
whiteghost0001:feature/persist-dlq-to-postgresql

Conversation

@whiteghost0001
Copy link
Copy Markdown

The Dead Letter Queue (DLQ) used by the indexer and webhook subsystems was stored entirely in-memory (a plain array dlqEntries in src/routes/dlq.ts). This meant every dead-lettered event was lost on process restart, making the DLQ useless for production reliability and incident investigation.

What was done

1. Database migration — migrations/1774715300000_dead-letter-queue.ts Created a new node-pg-migrate migration that adds the dead_letter_queue table with the following schema:

  • id TEXT PRIMARY KEY
  • topic TEXT NOT NULL
  • payload JSONB NOT NULL
  • error TEXT NOT NULL
  • attempts INTEGER NOT NULL DEFAULT 1
  • correlation_id TEXT (nullable)
  • first_failed_at TIMESTAMPTZ NOT NULL DEFAULT current_timestamp
  • last_failed_at TIMESTAMPTZ NOT NULL DEFAULT current_timestamp

Indexes on topic and first_failed_at cover the two most common query patterns (filter by topic, sort by age).

2. DLQ repository — src/db/repositories/dlqRepository.ts New repository following the same pattern as streamRepository.ts: uses getPool() and query() from src/db/pool.ts.

Exposed methods:

  • insert(entry) — persists a new DLQ entry
  • findAll({ limit, offset, topic? }) — paginated list with optional topic filter; returns { entries, total }
  • findById(id) — single entry lookup
  • update(id, patch) — partial update for replay (reset attempts / update lastFailedAt)
  • deleteById(id) — acknowledge/remove one entry; returns boolean
  • deleteAll(topic?) — bulk purge with optional topic filter; returns count of deleted rows

3. Route update — src/routes/dlq.ts

  • Removed the in-memory dlqEntries array, getDlqEntries(), and _resetDlq() helpers.
  • enqueueDeadLetter() is now async and calls dlqRepository.insert().
  • All five route handlers (GET /, GET /:id, POST /:id/replay, DELETE /:id, DELETE /) now delegate to the repository instead of mutating the in-memory array.
  • Audit events and structured logging are preserved unchanged.

How it was done

  • Followed the existing node-pg-migrate migration format (MigrationBuilder API, timestamp-prefixed filename).
  • Followed the existing repository pattern (getPool/query, row-to-type mapper, parameterised queries throughout — no string interpolation of user input).
  • TypeScript compiles cleanly (tsc --noEmit); the only pre-existing error is an unrelated syntax issue in tests/webhooks/retry.rateLimit.test.ts.

Closes #221

## Problem
The Dead Letter Queue (DLQ) used by the indexer and webhook subsystems
was stored entirely in-memory (a plain array `dlqEntries` in
`src/routes/dlq.ts`). This meant every dead-lettered event was lost on
process restart, making the DLQ useless for production reliability and
incident investigation.

## What was done

### 1. Database migration — `migrations/1774715300000_dead-letter-queue.ts`
Created a new node-pg-migrate migration that adds the
`dead_letter_queue` table with the following schema:
- `id` TEXT PRIMARY KEY
- `topic` TEXT NOT NULL
- `payload` JSONB NOT NULL
- `error` TEXT NOT NULL
- `attempts` INTEGER NOT NULL DEFAULT 1
- `correlation_id` TEXT (nullable)
- `first_failed_at` TIMESTAMPTZ NOT NULL DEFAULT current_timestamp
- `last_failed_at`  TIMESTAMPTZ NOT NULL DEFAULT current_timestamp

Indexes on `topic` and `first_failed_at` cover the two most common
query patterns (filter by topic, sort by age).

### 2. DLQ repository — `src/db/repositories/dlqRepository.ts`
New repository following the same pattern as `streamRepository.ts`:
uses `getPool()` and `query()` from `src/db/pool.ts`.

Exposed methods:
- `insert(entry)` — persists a new DLQ entry
- `findAll({ limit, offset, topic? })` — paginated list with optional
  topic filter; returns `{ entries, total }`
- `findById(id)` — single entry lookup
- `update(id, patch)` — partial update for replay (reset attempts /
  update lastFailedAt)
- `deleteById(id)` — acknowledge/remove one entry; returns boolean
- `deleteAll(topic?)` — bulk purge with optional topic filter; returns
  count of deleted rows

### 3. Route update — `src/routes/dlq.ts`
- Removed the in-memory `dlqEntries` array, `getDlqEntries()`, and
  `_resetDlq()` helpers.
- `enqueueDeadLetter()` is now `async` and calls
  `dlqRepository.insert()`.
- All five route handlers (GET /, GET /:id, POST /:id/replay,
  DELETE /:id, DELETE /) now delegate to the repository instead of
  mutating the in-memory array.
- Audit events and structured logging are preserved unchanged.

## How it was done
- Followed the existing node-pg-migrate migration format (MigrationBuilder
  API, timestamp-prefixed filename).
- Followed the existing repository pattern (getPool/query, row-to-type
  mapper, parameterised queries throughout — no string interpolation of
  user input).
- TypeScript compiles cleanly (`tsc --noEmit`); the only pre-existing
  error is an unrelated syntax issue in `tests/webhooks/retry.rateLimit.test.ts`.

Closes Fluxora-Org#221
@drips-wave
Copy link
Copy Markdown

drips-wave Bot commented May 29, 2026

@whiteghost0001 Great news! 🎉 Based on an automated assessment of this PR, the linked Wave issue(s) no longer count against your application limits.

You can now already apply to more issues while waiting for a review of this PR. Keep up the great work! 🚀

Learn more about application limits

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Persist Dead Letter Queue to PostgreSQL table instead of in-memory store

1 participant