From 4c3b88cb12c71e00a8b973f3f85eef9f149014ce Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Fri, 19 Jun 2026 19:33:21 +0700 Subject: [PATCH] fix(session): reconcile late acks, serialize concurrent reactions, harden hook + ack error paths - A fast delivered/read/failed ack could match 0 rows when it arrived before the send's 2nd save committed waMessageId, leaving the message stuck at SENT. onMessageAck now retries the forward-only, session-scoped status advance once after a short delay (ACK_RECONCILE_DELAY_MS) to close that race. - Two reactions on the same message both read the same stored snapshot and the last full-row save won, dropping a reaction. Reaction writes are now serialized per (sessionId, waMessageId) via a self-cleaning promise-chain map, preserving every reaction. - The ack status UPDATE was the one message handler without a .catch, so a connection-level rejection escaped to the global unhandled-rejection backstop without per-message context; it now logs locally. - A plugin hook returning both mutated data and an error had the (possibly partial/corrupted) data applied before the error was thrown, propagating it to persistence/webhooks/WS as success. The data mutation of an errored hook is now discarded. --- CHANGELOG.md | 16 +++ src/core/hooks/hook-manager.service.spec.ts | 30 +++++ src/core/hooks/hook-manager.service.ts | 4 +- src/modules/session/session.service.spec.ts | 124 ++++++++++++++++++- src/modules/session/session.service.ts | 128 ++++++++++++++------ 5 files changed, 260 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66bcc8f6..d2476e6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **A delivery/read receipt that arrives before the send is fully recorded is no longer lost.** A fast + `delivered`/`read`/`failed` ack could reach the server before the just-sent message's id (`waMessageId`) + was committed, so the status update matched no row and the message stayed stuck at "sent" indefinitely. + The ack now retries once after a short delay (still forward-only and session-scoped, so it can't + downgrade a higher status), reconciling the stored status. +- **Concurrent reactions on the same message no longer overwrite each other.** Two reactions arriving + together both read the same stored snapshot and the last save won, silently dropping a reaction. + Reaction writes are now serialized per message, so every reaction is preserved. +- **A failed delivery-status database write is logged with per-message context** instead of escaping to the + global unhandled-rejection backstop (the ack update was the one message handler missing its `.catch`). +- **A plugin hook that reports an error no longer has its partial output applied.** A hook returning both + mutated data and an error had the (possibly half-transformed) data propagated to persistence, webhooks, + and the WebSocket as if it succeeded; an errored hook's data mutation is now discarded. + ## [0.4.2] - 2026-06-19 Bug-fix and hardening release: access-control tightening, session-lifecycle resilience, data-migration diff --git a/src/core/hooks/hook-manager.service.spec.ts b/src/core/hooks/hook-manager.service.spec.ts index 8725dd5e..919999f6 100644 --- a/src/core/hooks/hook-manager.service.spec.ts +++ b/src/core/hooks/hook-manager.service.spec.ts @@ -95,6 +95,36 @@ describe('HookManager', () => { expect(res.continue).toBe(true); }); + it('discards the mutated data of a handler that also returns an error (error means discard output)', async () => { + // A handler that signals an error must NOT have its (possibly partial/corrupted) mutation applied — + // otherwise a half-transformed payload reaches persistence/webhooks/WS presented as success. + hm.register( + 'bad', + 'message:received', + async () => ({ continue: true, data: 'CORRUPTED', error: new Error('partial failure') }), + 10, + ); + + const res = await hm.execute('message:received', 'original', { source: 'test' }); + + expect(res.continue).toBe(true); + expect(res.data).toBe('original'); // the errored handler's mutation is dropped + }); + + it('discards an errored handler mutation even on the stop path (continue:false + error)', async () => { + hm.register( + 'bad', + 'message:received', + async () => ({ continue: false, data: 'CORRUPTED', error: new Error('x') }), + 10, + ); + + const res = await hm.execute('message:received', 'original', { source: 'test' }); + + expect(res.continue).toBe(false); // the chain still stops + expect(res.data).toBe('original'); // but the errored mutation is not carried out on stop + }); + it('register/unregister/hasHooks/getHookCount track registrations', () => { expect(hm.hasHooks('session:created')).toBe(false); const id = hm.register('p', 'session:created', async ctx => ({ continue: true, data: ctx.data })); diff --git a/src/core/hooks/hook-manager.service.ts b/src/core/hooks/hook-manager.service.ts index 7a7bd6f6..c825b2e1 100644 --- a/src/core/hooks/hook-manager.service.ts +++ b/src/core/hooks/hook-manager.service.ts @@ -130,7 +130,9 @@ export class HookManager { ctx.data = currentData; const result = await registration.handler(ctx); - if (result.data !== undefined) { + // A handler that reports an error discards its output: do NOT apply its (possibly partial or + // corrupted) data mutation, even though HookResult allows returning data and error together. + if (result.error === undefined && result.data !== undefined) { currentData = result.data as T; } diff --git a/src/modules/session/session.service.spec.ts b/src/modules/session/session.service.spec.ts index 938b9b49..d179722a 100644 --- a/src/modules/session/session.service.spec.ts +++ b/src/modules/session/session.service.spec.ts @@ -2,7 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing'; import { getRepositoryToken, getDataSourceToken } from '@nestjs/typeorm'; import { Repository, DataSource, In } from 'typeorm'; import { NotFoundException, ConflictException, BadRequestException } from '@nestjs/common'; -import { SessionService } from './session.service'; +import { SessionService, ACK_RECONCILE_DELAY_MS } from './session.service'; import { Session, SessionStatus } from './entities/session.entity'; import { Message, MessageStatus } from '../message/entities/message.entity'; import { EngineFactory } from '../../engine/engine.factory'; @@ -641,6 +641,128 @@ describe('SessionService', () => { expect(dispatchedEvents('message.failed')).toHaveLength(0); }); + it('retries the ack update once after a delay when the row is not yet matchable (ack before commit)', async () => { + const callbacks = await startAndCaptureCallbacks(); + (messageRepository.update as jest.Mock) + .mockClear() + .mockResolvedValueOnce({ affected: 0 }) // send's 2nd save (waMessageId) not committed yet + .mockResolvedValueOnce({ affected: 1 }); // retry now matches the row + + jest.useFakeTimers(); + try { + callbacks.onMessageAck!('wa-out-1', 'delivered'); + await jest.advanceTimersByTimeAsync(0); // flush the first update's microtasks + expect(messageRepository.update as jest.Mock).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(ACK_RECONCILE_DELAY_MS); + expect(messageRepository.update as jest.Mock).toHaveBeenCalledTimes(2); + } finally { + jest.useRealTimers(); + } + }); + + it('does not schedule a retry when the first ack update advances a row', async () => { + const callbacks = await startAndCaptureCallbacks(); + (messageRepository.update as jest.Mock).mockClear().mockResolvedValue({ affected: 1 }); + + jest.useFakeTimers(); + try { + callbacks.onMessageAck!('wa-out-1', 'delivered'); + await jest.advanceTimersByTimeAsync(ACK_RECONCILE_DELAY_MS); + expect(messageRepository.update as jest.Mock).toHaveBeenCalledTimes(1); + } finally { + jest.useRealTimers(); + } + }); + + it('handles a rejected ack update without an unhandled rejection', async () => { + const callbacks = await startAndCaptureCallbacks(); + (messageRepository.update as jest.Mock).mockClear().mockRejectedValue(new Error('data DB down')); + + // Must not throw synchronously; the .catch keeps the rejection from escaping to the global backstop + // (a missing .catch here would surface as an unhandled rejection and fail the suite). + callbacks.onMessageAck!('wa-out-1', 'delivered'); + await flush(); + await flush(); + + expect(messageRepository.update as jest.Mock).toHaveBeenCalled(); + }); + + it('serializes concurrent reactions on the same message so neither sender is clobbered', async () => { + const callbacks = await startAndCaptureCallbacks(); + + // Simulate a real DB: each findOne returns a FRESH snapshot of the persisted row, and save + // writes it back. Without per-message serialization the two handlers read the same empty + // snapshot and the second save clobbers the first sender's reaction. + type Row = { metadata?: Record }; + const clone = (r: Row): Row => JSON.parse(JSON.stringify(r)) as Row; + let stored: Row = { metadata: {} }; + (messageRepository.findOne as jest.Mock).mockImplementation(() => Promise.resolve(clone(stored))); + (messageRepository.save as jest.Mock).mockImplementation((m: Row) => { + stored = clone(m); + return Promise.resolve(m); + }); + + callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '👍' }); + callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'bob', reaction: '🎉' }); + + for (let i = 0; i < 5; i++) await flush(); + + expect(stored.metadata?.reactions).toEqual({ alice: '👍', bob: '🎉' }); + }); + + it('removes a sender reaction on a cleared reaction event (delete branch)', async () => { + const callbacks = await startAndCaptureCallbacks(); + type Row = { metadata?: Record }; + const clone = (r: Row): Row => JSON.parse(JSON.stringify(r)) as Row; + let stored: Row = { metadata: { reactions: { alice: '👍', bob: '🎉' } } }; + (messageRepository.findOne as jest.Mock).mockImplementation(() => Promise.resolve(clone(stored))); + (messageRepository.save as jest.Mock).mockImplementation((m: Row) => { + stored = clone(m); + return Promise.resolve(m); + }); + + callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '' }); + + for (let i = 0; i < 3; i++) await flush(); + + expect(stored.metadata?.reactions).toEqual({ bob: '🎉' }); // alice removed, bob preserved + }); + + it('a failed reaction write does not block a later reaction on the same message', async () => { + const callbacks = await startAndCaptureCallbacks(); + type Row = { metadata?: Record }; + const clone = (r: Row): Row => JSON.parse(JSON.stringify(r)) as Row; + let stored: Row = { metadata: {} }; + (messageRepository.findOne as jest.Mock).mockImplementation(() => Promise.resolve(clone(stored))); + (messageRepository.save as jest.Mock) + .mockRejectedValueOnce(new Error('write blip')) // alice's write fails + .mockImplementation((m: Row) => { + stored = clone(m); + return Promise.resolve(m); + }); + + callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '👍' }); + callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'bob', reaction: '🎉' }); + + for (let i = 0; i < 5; i++) await flush(); + + expect(stored.metadata?.reactions).toEqual({ bob: '🎉' }); // bob applied despite alice's failure + }); + + it('cleans up the per-message serialization entry after the chain drains (no leak)', async () => { + const callbacks = await startAndCaptureCallbacks(); + (messageRepository.findOne as jest.Mock).mockResolvedValue({ metadata: {} }); + (messageRepository.save as jest.Mock).mockResolvedValue(undefined); + + callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '👍' }); + + for (let i = 0; i < 3; i++) await flush(); + + const chains = (service as unknown as { reactionChains: Map }).reactionChains; + expect(chains.size).toBe(0); + }); + it('dispatches message.received (not message.sent) on an incoming message event', async () => { const callbacks = await startAndCaptureCallbacks(); diff --git a/src/modules/session/session.service.ts b/src/modules/session/session.service.ts index b5e2b1b2..a4cd7b4a 100644 --- a/src/modules/session/session.service.ts +++ b/src/modules/session/session.service.ts @@ -20,6 +20,7 @@ import { ChatState, DeliveryStatus, IncomingMessage, + ReactionEvent, } from '../../engine/interfaces/whatsapp-engine.interface'; import { createLogger } from '../../common/services/logger.service'; import { EventsGateway } from '../events/events.gateway'; @@ -45,6 +46,12 @@ const RECONNECT_BASE_DELAY_MIN_MS = 1000; const RECONNECT_BASE_DELAY_MAX_MS = 300_000; const RECONNECT_MAX_ATTEMPTS_CAP = 20; const RECONNECT_DELAY_CAP_MS = 3_600_000; +/** + * Delay before retrying an ack UPDATE that matched 0 rows. A fast delivered/read ack can arrive before + * the send's 2nd save (which writes waMessageId) has committed, so the first UPDATE finds no row. One + * retry after this delay closes that race; the forward-only transition guard keeps it idempotent. + */ +export const ACK_RECONCILE_DELAY_MS = 750; const clampNumber = (n: number, min: number, max: number): number => Math.min(Math.max(n, min), max); @@ -102,6 +109,11 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat // awaited hook and orphan an engine the lifecycle could never destroy. private initializingSessions: Set = new Set(); + // Serializes the read-modify-write of a message's reactions map per `${sessionId}:${waMessageId}`, + // so two concurrent reaction events on the same message don't clobber each other (both read the + // same snapshot, both full-row save, last writer wins). Entries are deleted once their chain drains. + private reactionChains: Map> = new Map(); + constructor( @InjectRepository(Session, 'data') private readonly sessionRepository: Repository, @@ -591,25 +603,44 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat // fire-and-forget writes race-safe at the DB level. const messageStatus = deliveryStatusToMessageStatus(status); if (messageStatus) { - void this.messageRepository - // Scope by sessionId: waMessageId is unique per account/chat, not global — - // an ack on one session must never advance a same-id row in another session. - .update( - { sessionId: id, waMessageId: messageId, status: In(ackStatusTransitionFrom(messageStatus)) }, - { status: messageStatus }, - ) - .then(result => { - // affected:0 — the row was not advanced: either the send's 2nd save (which sets - // waMessageId) hasn't committed yet, or the status is already at/above the target. - if (result.affected === 0) { - this.logger.debug(`Message ack ${messageId}: no status row advanced to ${messageStatus} (${status})`, { - sessionId: id, - messageId, - status, - action: 'message_ack_noop', - }); - } + // Scope by sessionId: waMessageId is unique per account/chat, not global — an ack on one + // session must never advance a same-id row in another session. The In() guard makes the + // UPDATE forward-only (a late/out-of-order ack can't downgrade) and idempotent on retry. + const advanceAck = (): Promise => + this.messageRepository + .update( + { sessionId: id, waMessageId: messageId, status: In(ackStatusTransitionFrom(messageStatus)) }, + { status: messageStatus }, + ) + .then(result => result.affected ?? 0); + + const logNoop = (): void => + this.logger.debug(`Message ack ${messageId}: no status row advanced to ${messageStatus} (${status})`, { + sessionId: id, + messageId, + status, + action: 'message_ack_noop', }); + + const onAckError = (err: unknown): void => + this.logger.error(`Failed to advance ack for ${messageId}`, String(err)); + + void advanceAck() + .then(affected => { + if (affected > 0) return; + // affected:0 — most likely the send's 2nd save (which writes waMessageId) hasn't committed + // yet, so the row isn't matchable. Each ack is one-shot (WhatsApp won't necessarily resend), + // so retry ONCE after a short delay to close that race rather than leave it stuck at SENT. + const timer = setTimeout(() => { + void advanceAck() + .then(retried => { + if (retried === 0) logNoop(); + }) + .catch(onAckError); + }, ACK_RECONCILE_DELAY_MS); + timer.unref?.(); + }) + .catch(onAckError); } // Push the live delivery/read tick to the dashboard over the websocket (neutral status). @@ -666,28 +697,18 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat action: 'message_reaction_received', }); - void this.messageRepository - .findOne({ where: { sessionId: id, waMessageId: event.messageId } }) - .then(async msg => { - if (!msg) return; - const metadata = msg.metadata || {}; - const reactions = (metadata.reactions as Record) || {}; - - if (!event.reaction) { - delete reactions[event.senderId]; - } else { - reactions[event.senderId] = event.reaction; - } - - metadata.reactions = reactions; - msg.metadata = metadata; - await this.messageRepository.save(msg); - - this.eventsGateway.emitMessageReaction(id, { ...event, reactions }); - }) - .catch(err => { - this.logger.error(`Failed to update message reaction: ${event.messageId}`, String(err)); - }); + // Serialize per message so two concurrent reactions don't read the same snapshot and clobber + // each other on the full-row save. A prior chain's failure must not block later reactions. + const key = `${id}:${event.messageId}`; + const prior = this.reactionChains.get(key) ?? Promise.resolve(); + const next = prior.catch(() => undefined).then(() => this.applyReaction(id, event)); + this.reactionChains.set(key, next); + void next.finally(() => { + // Clean up only if no newer reaction chained after us, so the map can't leak per message. + if (this.reactionChains.get(key) === next) { + this.reactionChains.delete(key); + } + }); }, onDisconnected: (reason: string): void => { this.logger.warn(`Session disconnected: ${reason}`, { @@ -751,6 +772,33 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat }); } + /** + * Apply one reaction event to the stored message's reactions map (read-modify-write of the JSON + * column). Invoked through the per-message serialization chain in onMessageReaction, so concurrent + * reactions on the same message run sequentially and don't clobber each other. + */ + private async applyReaction(id: string, event: ReactionEvent): Promise { + try { + const msg = await this.messageRepository.findOne({ where: { sessionId: id, waMessageId: event.messageId } }); + if (!msg) return; + + const metadata = msg.metadata || {}; + const reactions = (metadata.reactions as Record) || {}; + if (!event.reaction) { + delete reactions[event.senderId]; + } else { + reactions[event.senderId] = event.reaction; + } + metadata.reactions = reactions; + msg.metadata = metadata; + await this.messageRepository.save(msg); + + this.eventsGateway.emitMessageReaction(id, { ...event, reactions }); + } catch (err) { + this.logger.error(`Failed to update message reaction: ${event.messageId}`, String(err)); + } + } + private scheduleReconnect(id: string, session: Session): void { const state = this.reconnectStates.get(id); if (!state) return;