From 172a1ee40409af5816d9cf7c66e43441b0292711 Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Fri, 19 Jun 2026 14:30:25 +0700 Subject: [PATCH] fix(webhook): deliver session lifecycle events and key them per occurrence session.qr, session.authenticated, session.disconnected and session.status were declared in the webhook event set but never dispatched, so subscribers (including the n8n trigger node) waited indefinitely. Each is now wired from its engine-callback emit site. To keep a spec-following consumer from collapsing legitimately-distinct lifecycle events on the X-OpenWA-Idempotency-Key, the recurring lifecycle keys (status/authenticated/disconnected) are salted with a per-dispatch occurrence timestamp that stays stable across retries; message keys remain content-based. The session.status webhook is also guarded against firing twice when an engine signals one transition via both onStateChanged and a dedicated callback. The n8n docs are corrected to the real event names (session.authenticated, session.qr) and group.* is marked reserved (accepted on subscribe but not emitted yet). --- CHANGELOG.md | 10 +++ docs/22-n8n-integration.md | 22 ++++-- src/modules/session/session.service.spec.ts | 71 ++++++++++++++++++- src/modules/session/session.service.ts | 21 +++++- src/modules/webhook/dto/webhook.dto.ts | 1 + .../webhook/utils/idempotency.util.spec.ts | 64 +++++++++++++++++ src/modules/webhook/utils/idempotency.util.ts | 28 +++++--- src/modules/webhook/webhook.service.ts | 7 +- 8 files changed, 203 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66bcc8f6..e8e6eb8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **Webhook subscriptions for session lifecycle events now deliver.** `session.status`, `session.qr`, + `session.authenticated` and `session.disconnected` were accepted on subscribe but were never dispatched, so + consumers (including the n8n trigger node) waited indefinitely. They now fire from the engine lifecycle. The + n8n integration docs are corrected to the real event names (`session.authenticated`, `session.qr` — + previously documented as the non-existent `session.connected`/`session.qr_ready`, which were rejected at + registration). `group.*` events remain accepted on subscribe but are documented as reserved (not emitted + yet). + ## [0.4.2] - 2026-06-19 Bug-fix and hardening release: access-control tightening, session-lifecycle resilience, data-migration diff --git a/docs/22-n8n-integration.md b/docs/22-n8n-integration.md index dc86bc11..81bab449 100644 --- a/docs/22-n8n-integration.md +++ b/docs/22-n8n-integration.md @@ -73,13 +73,21 @@ Start workflows when WhatsApp events occur. #### Supported Events -| Event | Description | Use Case | -| ---------------------- | ------------------------- | ------------------------ | -| `message.received` | New incoming message | Auto-reply, lead capture | -| `message.sent` | Message sent successfully | Delivery confirmation | -| `session.connected` | Session authenticated | Startup notifications | -| `session.disconnected` | Session lost connection | Alert monitoring | -| `session.qr_ready` | QR code generated | Reconnection alerts | +| Event | Description | Use Case | +| ----------------------- | ----------------------------------- | ------------------------- | +| `message.received` | New incoming message | Auto-reply, lead capture | +| `message.sent` | Message sent successfully | Delivery confirmation | +| `message.ack` | Delivery/read status advanced | Read receipts | +| `message.failed` | Outgoing message failed | Failure alerting | +| `message.revoked` | Message deleted for everyone | Deletion tracking | +| `session.status` | Session status changed | Lifecycle tracking | +| `session.qr` | QR code generated | Reconnection alerts | +| `session.authenticated` | Session logged in (phone available) | Startup notifications | +| `session.disconnected` | Session lost connection | Alert monitoring | + +> **Reserved:** `group.join`, `group.leave`, and `group.update` are accepted by the +> subscription API but are not emitted yet — don't depend on them until a release notes +> them as live. #### How It Works diff --git a/src/modules/session/session.service.spec.ts b/src/modules/session/session.service.spec.ts index 938b9b49..b35ee95a 100644 --- a/src/modules/session/session.service.spec.ts +++ b/src/modules/session/session.service.spec.ts @@ -9,7 +9,7 @@ import { EngineFactory } from '../../engine/engine.factory'; import { EventsGateway } from '../events/events.gateway'; import { WebhookService } from '../webhook/webhook.service'; import { HookManager } from '../../core/hooks'; -import { IncomingMessage, EngineEventCallbacks } from '../../engine/interfaces/whatsapp-engine.interface'; +import { IncomingMessage, EngineEventCallbacks, EngineStatus } from '../../engine/interfaces/whatsapp-engine.interface'; function createMockSession(overrides: Partial = {}): Session { return { @@ -760,6 +760,75 @@ describe('SessionService', () => { expect(dispatchedEvents('message.revoked')).toHaveLength(1); expect(eventsGateway.emitMessageRevoked as jest.Mock).toHaveBeenCalledWith('sess-uuid-1', expect.anything()); }); + + // ── session lifecycle events ────────────────────────────────────── + + it('dispatches session.qr with the QR payload when the engine emits a QR code', async () => { + const callbacks = await startAndCaptureCallbacks(); + expect(typeof callbacks.onQRCode).toBe('function'); + + callbacks.onQRCode!('qr-data-abc'); + await flush(); + + const qr = dispatchedEvents('session.qr'); + expect(qr).toHaveLength(1); + expect(qr[0][0]).toBe('sess-uuid-1'); + expect(qr[0][2]).toMatchObject({ sessionId: 'sess-uuid-1', qr: 'qr-data-abc' }); + }); + + it('dispatches session.authenticated with phone/pushName when the engine reports ready', async () => { + const callbacks = await startAndCaptureCallbacks(); + expect(typeof callbacks.onReady).toBe('function'); + + callbacks.onReady!('628123', 'Alice'); + await flush(); + + const auth = dispatchedEvents('session.authenticated'); + expect(auth).toHaveLength(1); + expect(auth[0][0]).toBe('sess-uuid-1'); + expect(auth[0][2]).toMatchObject({ sessionId: 'sess-uuid-1', phone: '628123', pushName: 'Alice' }); + }); + + it('dispatches session.disconnected with the reason when the engine disconnects', async () => { + const callbacks = await startAndCaptureCallbacks(); + expect(typeof callbacks.onDisconnected).toBe('function'); + // Isolate the dispatch from the reconnect scheduler, which would otherwise leave a live timer. + jest + .spyOn(service as unknown as { scheduleReconnect: (id: string, s: unknown) => void }, 'scheduleReconnect') + .mockImplementation(() => undefined); + + callbacks.onDisconnected!('logged out'); + await flush(); + + const disc = dispatchedEvents('session.disconnected'); + expect(disc).toHaveLength(1); + expect(disc[0][0]).toBe('sess-uuid-1'); + expect(disc[0][2]).toMatchObject({ sessionId: 'sess-uuid-1', reason: 'logged out' }); + }); + + it('dispatches session.status on a session status transition', async () => { + await startAndCaptureCallbacks(); + await flush(); + + // start() transitions the session to INITIALIZING via updateStatus(). + const status = dispatchedEvents('session.status'); + expect(status.length).toBeGreaterThanOrEqual(1); + expect(status[0][0]).toBe('sess-uuid-1'); + expect(status[0][2]).toMatchObject({ sessionId: 'sess-uuid-1', status: SessionStatus.INITIALIZING }); + }); + + it('does not double-dispatch session.status when onStateChanged and a dedicated callback report the same status', async () => { + const callbacks = await startAndCaptureCallbacks(); + // wwebjs signals a QR transition via BOTH onStateChanged(QR_READY) and onQRCode → updateStatus(QR_READY) twice. + callbacks.onStateChanged!(EngineStatus.QR_READY); + callbacks.onQRCode!('qr-data-abc'); + await flush(); + + const qrStatus = dispatchedEvents('session.status').filter( + c => (c[2] as { status?: string }).status === SessionStatus.QR_READY, + ); + expect(qrStatus).toHaveLength(1); + }); }); // ── stop ────────────────────────────────────────────────────────── diff --git a/src/modules/session/session.service.ts b/src/modules/session/session.service.ts index b5e2b1b2..73431b0b 100644 --- a/src/modules/session/session.service.ts +++ b/src/modules/session/session.service.ts @@ -91,6 +91,11 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat // Reconnection state per session private reconnectStates: Map = new Map(); + // Last session.status value dispatched to webhooks per session. Some engines signal one transition + // via BOTH onStateChanged and a dedicated callback (onQRCode/onDisconnected), so this guards the + // webhook POST against firing the same status twice. Cleared on delete(). + private readonly lastDispatchedStatus = new Map(); + // Sessions currently being stopped/deleted. An in-flight executeReconnect awaits // engine init, so a stop/delete during that window could re-register an engine AFTER // teardown (orphan). stop()/delete() add the id here; executeReconnect checks it after its @@ -345,6 +350,7 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat } finally { // Always clear the teardown mark so a later recreate/start with this id isn't suppressed. this.stoppingSessions.delete(id); + this.lastDispatchedStatus.delete(id); } } @@ -411,12 +417,14 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat await this.updateStatus(id, SessionStatus.INITIALIZING); await engine.initialize({ - onQRCode: (): void => { + onQRCode: (qr: string): void => { this.logger.log('QR code generated', { sessionId: id, action: 'qr_generated', }); + void this.webhookService.dispatch(id, 'session.qr', { sessionId: id, qr }); + // Execute hook for QR event void this.hookManager.execute( 'session:qr', @@ -437,6 +445,8 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat action: 'ready', }); + void this.webhookService.dispatch(id, 'session.authenticated', { sessionId: id, phone, pushName }); + // Execute hook for ready event void this.hookManager.execute( 'session:ready', @@ -696,6 +706,8 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat action: 'disconnected', }); + void this.webhookService.dispatch(id, 'session.disconnected', { sessionId: id, reason }); + // Execute hook for disconnected event void this.hookManager.execute( 'session:disconnected', @@ -1005,6 +1017,13 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat }); // Emit real-time event to connected WebSocket clients this.eventsGateway.emitSessionStatus(id, status); + // Mirror the status change to subscribed webhooks. Some engines signal one transition via both + // onStateChanged AND a dedicated callback (onQRCode/onDisconnected), which would POST the same + // status twice — dispatch only when the status actually changed from the last one we sent. + if (this.lastDispatchedStatus.get(id) !== status) { + this.lastDispatchedStatus.set(id, status); + void this.webhookService.dispatch(id, 'session.status', { sessionId: id, status }); + } } /** diff --git a/src/modules/webhook/dto/webhook.dto.ts b/src/modules/webhook/dto/webhook.dto.ts index aab2e33e..94d7229e 100644 --- a/src/modules/webhook/dto/webhook.dto.ts +++ b/src/modules/webhook/dto/webhook.dto.ts @@ -26,6 +26,7 @@ export const WEBHOOK_EVENTS = [ 'session.qr', 'session.authenticated', 'session.disconnected', + // Reserved: accepted on subscribe but not dispatched yet (no engine emit source). 'group.join', 'group.leave', 'group.update', diff --git a/src/modules/webhook/utils/idempotency.util.spec.ts b/src/modules/webhook/utils/idempotency.util.spec.ts index 0d56c730..062b9f41 100644 --- a/src/modules/webhook/utils/idempotency.util.spec.ts +++ b/src/modules/webhook/utils/idempotency.util.spec.ts @@ -47,6 +47,70 @@ describe('Idempotency Utils', () => { expect(key).toBe('sess_sess_1_CONNECTED'); }); + it('salts session.status keys with the occurrence time so repeated transitions to the same status stay distinct', () => { + const a = generateIdempotencyKey( + 'session.status', + { sessionId: 'A', status: 'DISCONNECTED' }, + '2026-06-19T00:00:00.000Z', + ); + const b = generateIdempotencyKey( + 'session.status', + { sessionId: 'A', status: 'DISCONNECTED' }, + '2026-06-19T02:00:00.000Z', + ); + expect(a).not.toBe(b); + }); + + it('salts session.authenticated keys so re-authentication (same phone, later time) is a distinct event', () => { + const a = generateIdempotencyKey( + 'session.authenticated', + { sessionId: 'A', phone: '628', pushName: 'Me' }, + '2026-06-19T00:00:00.000Z', + ); + const b = generateIdempotencyKey( + 'session.authenticated', + { sessionId: 'A', phone: '628', pushName: 'Me' }, + '2026-06-19T01:00:00.000Z', + ); + expect(a).not.toBe(b); + }); + + it('salts session.disconnected keys so repeat disconnects with the same reason stay distinct', () => { + const a = generateIdempotencyKey( + 'session.disconnected', + { sessionId: 'A', reason: 'logged out' }, + '2026-06-19T00:00:00.000Z', + ); + const b = generateIdempotencyKey( + 'session.disconnected', + { sessionId: 'A', reason: 'logged out' }, + '2026-06-19T03:00:00.000Z', + ); + expect(a).not.toBe(b); + }); + + it('is retry-stable: the same lifecycle occurrence regenerates the same key', () => { + const at = '2026-06-19T00:00:00.000Z'; + const a = generateIdempotencyKey('session.disconnected', { sessionId: 'A', reason: 'logged out' }, at); + const b = generateIdempotencyKey('session.disconnected', { sessionId: 'A', reason: 'logged out' }, at); + expect(a).toBe(b); + }); + + it('does not salt message-event keys with the occurrence time (content-based dedup preserved)', () => { + const a = generateIdempotencyKey( + 'message.ack', + { id: 'X', status: 'read', sessionId: 'A' }, + '2026-06-19T00:00:00.000Z', + ); + const b = generateIdempotencyKey( + 'message.ack', + { id: 'X', status: 'read', sessionId: 'A' }, + '2026-06-19T09:00:00.000Z', + ); + expect(a).toBe(b); + expect(a).toBe('ack_A_X_read'); + }); + it('should generate key for group.join', () => { const key = generateIdempotencyKey('group.join', { groupId: 'grp_1', diff --git a/src/modules/webhook/utils/idempotency.util.ts b/src/modules/webhook/utils/idempotency.util.ts index ec61405a..99140480 100644 --- a/src/modules/webhook/utils/idempotency.util.ts +++ b/src/modules/webhook/utils/idempotency.util.ts @@ -24,11 +24,17 @@ function hashData(data: Record): string { * Same event with same data will produce the same key (deterministic). * * @remarks - * Keys are content-based and do NOT include timestamps. - * This ensures that replayed/retried events with identical payloads - * produce the same key for proper deduplication. + * Message keys are content-based (keyed on the unique message id), so two deliveries of the same + * logical message dedupe. Lifecycle events (session.status/authenticated/disconnected) recur with + * identical content — the same phone on every reconnect, a constant disconnect reason — so they are + * salted with `occurredAt` (captured ONCE per dispatch and reused across retries): distinct + * occurrences get distinct keys while retries of the same occurrence stay stable. + * + * @param occurredAt - ISO timestamp captured once per dispatch; salts recurring lifecycle keys. */ -export function generateIdempotencyKey(event: string, data: Record): string { +export function generateIdempotencyKey(event: string, data: Record, occurredAt?: string): string { + // Salt applied only to the recurring lifecycle keys below; message/qr keys ignore it. + const occurrence = occurredAt ? `_${occurredAt}` : ''; switch (event) { case 'message.received': case 'message.sent': @@ -51,20 +57,22 @@ export function generateIdempotencyKey(event: string, data: Record w.events.includes(event) || w.events.includes('*')); - // Generate idempotency key (same for all webhooks receiving this event) - const idempotencyKey = generateIdempotencyKey(event, { ...data, sessionId }); + // Generate idempotency key (same for all webhooks receiving this event). occurredAt is captured + // once here and reused for every retry of this dispatch, so recurring lifecycle events get a + // distinct-per-occurrence key while retries of the same event stay stable. + const occurredAt = new Date().toISOString(); + const idempotencyKey = generateIdempotencyKey(event, { ...data, sessionId }, occurredAt); // Dispatch to all matching webhooks for (const webhook of matchingWebhooks) {