Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- **A delivery/read receipt that arrives before the send is fully recorded is no longer lost.** A fast
`delivered`/`read`/`failed` ack could reach the server before the just-sent message's id (`waMessageId`)
was committed, so the status update matched no row and the message stayed stuck at "sent" indefinitely.
The ack now retries once after a short delay (still forward-only and session-scoped, so it can't
downgrade a higher status), reconciling the stored status.
- **Concurrent reactions on the same message no longer overwrite each other.** Two reactions arriving
together both read the same stored snapshot and the last save won, silently dropping a reaction.
Reaction writes are now serialized per message, so every reaction is preserved.
- **A failed delivery-status database write is logged with per-message context** instead of escaping to the
global unhandled-rejection backstop (the ack update was the one message handler missing its `.catch`).
- **A plugin hook that reports an error no longer has its partial output applied.** A hook returning both
mutated data and an error had the (possibly half-transformed) data propagated to persistence, webhooks,
and the WebSocket as if it succeeded; an errored hook's data mutation is now discarded.

## [0.4.2] - 2026-06-19

Bug-fix and hardening release: access-control tightening, session-lifecycle resilience, data-migration
Expand Down
30 changes: 30 additions & 0 deletions src/core/hooks/hook-manager.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,36 @@ describe('HookManager', () => {
expect(res.continue).toBe(true);
});

it('discards the mutated data of a handler that also returns an error (error means discard output)', async () => {
// A handler that signals an error must NOT have its (possibly partial/corrupted) mutation applied —
// otherwise a half-transformed payload reaches persistence/webhooks/WS presented as success.
hm.register(
'bad',
'message:received',
async () => ({ continue: true, data: 'CORRUPTED', error: new Error('partial failure') }),
10,
);

const res = await hm.execute('message:received', 'original', { source: 'test' });

expect(res.continue).toBe(true);
expect(res.data).toBe('original'); // the errored handler's mutation is dropped
});

it('discards an errored handler mutation even on the stop path (continue:false + error)', async () => {
hm.register(
'bad',
'message:received',
async () => ({ continue: false, data: 'CORRUPTED', error: new Error('x') }),
10,
);

const res = await hm.execute('message:received', 'original', { source: 'test' });

expect(res.continue).toBe(false); // the chain still stops
expect(res.data).toBe('original'); // but the errored mutation is not carried out on stop
});

it('register/unregister/hasHooks/getHookCount track registrations', () => {
expect(hm.hasHooks('session:created')).toBe(false);
const id = hm.register('p', 'session:created', async ctx => ({ continue: true, data: ctx.data }));
Expand Down
4 changes: 3 additions & 1 deletion src/core/hooks/hook-manager.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ export class HookManager {
ctx.data = currentData;
const result = await registration.handler(ctx);

if (result.data !== undefined) {
// A handler that reports an error discards its output: do NOT apply its (possibly partial or
// corrupted) data mutation, even though HookResult allows returning data and error together.
if (result.error === undefined && result.data !== undefined) {
currentData = result.data as T;
}

Expand Down
124 changes: 123 additions & 1 deletion src/modules/session/session.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken, getDataSourceToken } from '@nestjs/typeorm';
import { Repository, DataSource, In } from 'typeorm';
import { NotFoundException, ConflictException, BadRequestException } from '@nestjs/common';
import { SessionService } from './session.service';
import { SessionService, ACK_RECONCILE_DELAY_MS } from './session.service';
import { Session, SessionStatus } from './entities/session.entity';
import { Message, MessageStatus } from '../message/entities/message.entity';
import { EngineFactory } from '../../engine/engine.factory';
Expand Down Expand Up @@ -641,6 +641,128 @@ describe('SessionService', () => {
expect(dispatchedEvents('message.failed')).toHaveLength(0);
});

it('retries the ack update once after a delay when the row is not yet matchable (ack before commit)', async () => {
const callbacks = await startAndCaptureCallbacks();
(messageRepository.update as jest.Mock)
.mockClear()
.mockResolvedValueOnce({ affected: 0 }) // send's 2nd save (waMessageId) not committed yet
.mockResolvedValueOnce({ affected: 1 }); // retry now matches the row

jest.useFakeTimers();
try {
callbacks.onMessageAck!('wa-out-1', 'delivered');
await jest.advanceTimersByTimeAsync(0); // flush the first update's microtasks
expect(messageRepository.update as jest.Mock).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(ACK_RECONCILE_DELAY_MS);
expect(messageRepository.update as jest.Mock).toHaveBeenCalledTimes(2);
} finally {
jest.useRealTimers();
}
});

it('does not schedule a retry when the first ack update advances a row', async () => {
const callbacks = await startAndCaptureCallbacks();
(messageRepository.update as jest.Mock).mockClear().mockResolvedValue({ affected: 1 });

jest.useFakeTimers();
try {
callbacks.onMessageAck!('wa-out-1', 'delivered');
await jest.advanceTimersByTimeAsync(ACK_RECONCILE_DELAY_MS);
expect(messageRepository.update as jest.Mock).toHaveBeenCalledTimes(1);
} finally {
jest.useRealTimers();
}
});

it('handles a rejected ack update without an unhandled rejection', async () => {
const callbacks = await startAndCaptureCallbacks();
(messageRepository.update as jest.Mock).mockClear().mockRejectedValue(new Error('data DB down'));

// Must not throw synchronously; the .catch keeps the rejection from escaping to the global backstop
// (a missing .catch here would surface as an unhandled rejection and fail the suite).
callbacks.onMessageAck!('wa-out-1', 'delivered');
await flush();
await flush();

expect(messageRepository.update as jest.Mock).toHaveBeenCalled();
});

it('serializes concurrent reactions on the same message so neither sender is clobbered', async () => {
const callbacks = await startAndCaptureCallbacks();

// Simulate a real DB: each findOne returns a FRESH snapshot of the persisted row, and save
// writes it back. Without per-message serialization the two handlers read the same empty
// snapshot and the second save clobbers the first sender's reaction.
type Row = { metadata?: Record<string, unknown> };
const clone = (r: Row): Row => JSON.parse(JSON.stringify(r)) as Row;
let stored: Row = { metadata: {} };
(messageRepository.findOne as jest.Mock).mockImplementation(() => Promise.resolve(clone(stored)));
(messageRepository.save as jest.Mock).mockImplementation((m: Row) => {
stored = clone(m);
return Promise.resolve(m);
});

callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '👍' });
callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'bob', reaction: '🎉' });

for (let i = 0; i < 5; i++) await flush();

expect(stored.metadata?.reactions).toEqual({ alice: '👍', bob: '🎉' });
});

it('removes a sender reaction on a cleared reaction event (delete branch)', async () => {
const callbacks = await startAndCaptureCallbacks();
type Row = { metadata?: Record<string, unknown> };
const clone = (r: Row): Row => JSON.parse(JSON.stringify(r)) as Row;
let stored: Row = { metadata: { reactions: { alice: '👍', bob: '🎉' } } };
(messageRepository.findOne as jest.Mock).mockImplementation(() => Promise.resolve(clone(stored)));
(messageRepository.save as jest.Mock).mockImplementation((m: Row) => {
stored = clone(m);
return Promise.resolve(m);
});

callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '' });

for (let i = 0; i < 3; i++) await flush();

expect(stored.metadata?.reactions).toEqual({ bob: '🎉' }); // alice removed, bob preserved
});

it('a failed reaction write does not block a later reaction on the same message', async () => {
const callbacks = await startAndCaptureCallbacks();
type Row = { metadata?: Record<string, unknown> };
const clone = (r: Row): Row => JSON.parse(JSON.stringify(r)) as Row;
let stored: Row = { metadata: {} };
(messageRepository.findOne as jest.Mock).mockImplementation(() => Promise.resolve(clone(stored)));
(messageRepository.save as jest.Mock)
.mockRejectedValueOnce(new Error('write blip')) // alice's write fails
.mockImplementation((m: Row) => {
stored = clone(m);
return Promise.resolve(m);
});

callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '👍' });
callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'bob', reaction: '🎉' });

for (let i = 0; i < 5; i++) await flush();

expect(stored.metadata?.reactions).toEqual({ bob: '🎉' }); // bob applied despite alice's failure
});

it('cleans up the per-message serialization entry after the chain drains (no leak)', async () => {
const callbacks = await startAndCaptureCallbacks();
(messageRepository.findOne as jest.Mock).mockResolvedValue({ metadata: {} });
(messageRepository.save as jest.Mock).mockResolvedValue(undefined);

callbacks.onMessageReaction!({ messageId: 'wa-1', chatId: 'c', senderId: 'alice', reaction: '👍' });

for (let i = 0; i < 3; i++) await flush();

const chains = (service as unknown as { reactionChains: Map<string, unknown> }).reactionChains;
expect(chains.size).toBe(0);
});

it('dispatches message.received (not message.sent) on an incoming message event', async () => {
const callbacks = await startAndCaptureCallbacks();

Expand Down
128 changes: 88 additions & 40 deletions src/modules/session/session.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
ChatState,
DeliveryStatus,
IncomingMessage,
ReactionEvent,
} from '../../engine/interfaces/whatsapp-engine.interface';
import { createLogger } from '../../common/services/logger.service';
import { EventsGateway } from '../events/events.gateway';
Expand All @@ -45,6 +46,12 @@ const RECONNECT_BASE_DELAY_MIN_MS = 1000;
const RECONNECT_BASE_DELAY_MAX_MS = 300_000;
const RECONNECT_MAX_ATTEMPTS_CAP = 20;
const RECONNECT_DELAY_CAP_MS = 3_600_000;
/**
* Delay before retrying an ack UPDATE that matched 0 rows. A fast delivered/read ack can arrive before
* the send's 2nd save (which writes waMessageId) has committed, so the first UPDATE finds no row. One
* retry after this delay closes that race; the forward-only transition guard keeps it idempotent.
*/
export const ACK_RECONCILE_DELAY_MS = 750;

const clampNumber = (n: number, min: number, max: number): number => Math.min(Math.max(n, min), max);

Expand Down Expand Up @@ -102,6 +109,11 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat
// awaited hook and orphan an engine the lifecycle could never destroy.
private initializingSessions: Set<string> = new Set();

// Serializes the read-modify-write of a message's reactions map per `${sessionId}:${waMessageId}`,
// so two concurrent reaction events on the same message don't clobber each other (both read the
// same snapshot, both full-row save, last writer wins). Entries are deleted once their chain drains.
private reactionChains: Map<string, Promise<void>> = new Map();

constructor(
@InjectRepository(Session, 'data')
private readonly sessionRepository: Repository<Session>,
Expand Down Expand Up @@ -591,25 +603,44 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat
// fire-and-forget writes race-safe at the DB level.
const messageStatus = deliveryStatusToMessageStatus(status);
if (messageStatus) {
void this.messageRepository
// Scope by sessionId: waMessageId is unique per account/chat, not global —
// an ack on one session must never advance a same-id row in another session.
.update(
{ sessionId: id, waMessageId: messageId, status: In(ackStatusTransitionFrom(messageStatus)) },
{ status: messageStatus },
)
.then(result => {
// affected:0 — the row was not advanced: either the send's 2nd save (which sets
// waMessageId) hasn't committed yet, or the status is already at/above the target.
if (result.affected === 0) {
this.logger.debug(`Message ack ${messageId}: no status row advanced to ${messageStatus} (${status})`, {
sessionId: id,
messageId,
status,
action: 'message_ack_noop',
});
}
// Scope by sessionId: waMessageId is unique per account/chat, not global — an ack on one
// session must never advance a same-id row in another session. The In() guard makes the
// UPDATE forward-only (a late/out-of-order ack can't downgrade) and idempotent on retry.
const advanceAck = (): Promise<number> =>
this.messageRepository
.update(
{ sessionId: id, waMessageId: messageId, status: In(ackStatusTransitionFrom(messageStatus)) },
{ status: messageStatus },
)
.then(result => result.affected ?? 0);

const logNoop = (): void =>
this.logger.debug(`Message ack ${messageId}: no status row advanced to ${messageStatus} (${status})`, {
sessionId: id,
messageId,
status,
action: 'message_ack_noop',
});

const onAckError = (err: unknown): void =>
this.logger.error(`Failed to advance ack for ${messageId}`, String(err));

void advanceAck()
.then(affected => {
if (affected > 0) return;
// affected:0 — most likely the send's 2nd save (which writes waMessageId) hasn't committed
// yet, so the row isn't matchable. Each ack is one-shot (WhatsApp won't necessarily resend),
// so retry ONCE after a short delay to close that race rather than leave it stuck at SENT.
const timer = setTimeout(() => {
void advanceAck()
.then(retried => {
if (retried === 0) logNoop();
})
.catch(onAckError);
}, ACK_RECONCILE_DELAY_MS);
timer.unref?.();
})
.catch(onAckError);
}

// Push the live delivery/read tick to the dashboard over the websocket (neutral status).
Expand Down Expand Up @@ -666,28 +697,18 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat
action: 'message_reaction_received',
});

void this.messageRepository
.findOne({ where: { sessionId: id, waMessageId: event.messageId } })
.then(async msg => {
if (!msg) return;
const metadata = msg.metadata || {};
const reactions = (metadata.reactions as Record<string, string>) || {};

if (!event.reaction) {
delete reactions[event.senderId];
} else {
reactions[event.senderId] = event.reaction;
}

metadata.reactions = reactions;
msg.metadata = metadata;
await this.messageRepository.save(msg);

this.eventsGateway.emitMessageReaction(id, { ...event, reactions });
})
.catch(err => {
this.logger.error(`Failed to update message reaction: ${event.messageId}`, String(err));
});
// Serialize per message so two concurrent reactions don't read the same snapshot and clobber
// each other on the full-row save. A prior chain's failure must not block later reactions.
const key = `${id}:${event.messageId}`;
const prior = this.reactionChains.get(key) ?? Promise.resolve();
const next = prior.catch(() => undefined).then(() => this.applyReaction(id, event));
this.reactionChains.set(key, next);
void next.finally(() => {
// Clean up only if no newer reaction chained after us, so the map can't leak per message.
if (this.reactionChains.get(key) === next) {
this.reactionChains.delete(key);
}
});
},
onDisconnected: (reason: string): void => {
this.logger.warn(`Session disconnected: ${reason}`, {
Expand Down Expand Up @@ -751,6 +772,33 @@ export class SessionService implements OnModuleDestroy, OnModuleInit, OnApplicat
});
}

/**
* Apply one reaction event to the stored message's reactions map (read-modify-write of the JSON
* column). Invoked through the per-message serialization chain in onMessageReaction, so concurrent
* reactions on the same message run sequentially and don't clobber each other.
*/
private async applyReaction(id: string, event: ReactionEvent): Promise<void> {
try {
const msg = await this.messageRepository.findOne({ where: { sessionId: id, waMessageId: event.messageId } });
if (!msg) return;

const metadata = msg.metadata || {};
const reactions = (metadata.reactions as Record<string, string>) || {};
if (!event.reaction) {
delete reactions[event.senderId];
} else {
reactions[event.senderId] = event.reaction;
}
metadata.reactions = reactions;
msg.metadata = metadata;
await this.messageRepository.save(msg);

this.eventsGateway.emitMessageReaction(id, { ...event, reactions });
} catch (err) {
this.logger.error(`Failed to update message reaction: ${event.messageId}`, String(err));
}
}

private scheduleReconnect(id: string, session: Session): void {
const state = this.reconnectStates.get(id);
if (!state) return;
Expand Down
Loading