diff --git a/CHANGELOG.md b/CHANGELOG.md index 66bcc8f6..bf4822e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- **Bulk-sent messages are now recorded, their errors no longer leak internal addresses, and a running + batch can be cancelled across instances.** Messages sent via a bulk batch went straight to the engine + and were never written to the messages table, so they were invisible to chat history and statistics; a + blocked-destination (SSRF) failure stored the refused internal address verbatim in the batch result + (readable via the batch-status endpoint); and a cancellation was only honoured by the process that + created the batch. Bulk sends now persist like single sends, a blocked-destination error is reported as a + generic code, and the batch is re-checked against the database as it runs so a cancel issued by another + instance (or after a restart) stops it. + ## [0.4.2] - 2026-06-19 Bug-fix and hardening release: access-control tightening, session-lifecycle resilience, data-migration diff --git a/src/modules/message/bulk-message.service.spec.ts b/src/modules/message/bulk-message.service.spec.ts index ce579afb..7c72b30e 100644 --- a/src/modules/message/bulk-message.service.spec.ts +++ b/src/modules/message/bulk-message.service.spec.ts @@ -1,8 +1,11 @@ import { Test, TestingModule } from '@nestjs/testing'; import { getRepositoryToken } from '@nestjs/typeorm'; -import { BulkMessageService, resolveFinalBatchStatus } from './bulk-message.service'; +import { BulkMessageService, resolveFinalBatchStatus, sanitizeBatchError } from './bulk-message.service'; import { MessageBatch, BatchStatus } from './entities/message-batch.entity'; +import { MessageStatus } from './entities/message.entity'; import { SessionService } from '../session/session.service'; +import { MessageService } from './message.service'; +import { SsrfBlockedError } from '../../common/security/ssrf-guard'; /** Regression lock for the terminal-status decision (cancel-clobber + stopOnError overwrite bugs). */ describe('resolveFinalBatchStatus', () => { @@ -43,6 +46,7 @@ describe('BulkMessageService.onApplicationBootstrap', () => { BulkMessageService, { provide: getRepositoryToken(MessageBatch, 'data'), useValue: repo }, { provide: SessionService, useValue: { getEngine: jest.fn() } }, + { provide: MessageService, useValue: { saveOutgoingMessage: jest.fn() } }, ], }).compile(); service = module.get(BulkMessageService); @@ -65,3 +69,105 @@ describe('BulkMessageService.onApplicationBootstrap', () => { expect(repo.save).not.toHaveBeenCalled(); }); }); + +/** Regression lock: an SSRF block (which names the internal host/IP) must not be stored verbatim. */ +describe('sanitizeBatchError', () => { + it('replaces an SSRF block message with a generic one (no internal address leak)', () => { + const result = sanitizeBatchError( + new SsrfBlockedError('Host evil.example resolves to a blocked internal address: 169.254.169.254'), + ); + expect(result.message).not.toContain('169.254.169.254'); + expect(result.code).toBe('SEND_BLOCKED'); + }); + + it('passes through an ordinary error message under SEND_FAILED', () => { + const result = sanitizeBatchError(new Error('Session is not active')); + expect(result).toEqual({ code: 'SEND_FAILED', message: 'Session is not active' }); + }); +}); + +describe('BulkMessageService.processBatch', () => { + let service: BulkMessageService; + let repo: { findOne: jest.Mock; save: jest.Mock }; + let messageService: { saveOutgoingMessage: jest.Mock }; + let engine: { sendTextMessage: jest.Mock }; + let sessionService: { getEngine: jest.Mock; findOne: jest.Mock }; + + const makeBatch = (messageCount: number): MessageBatch => + ({ + id: 'b1', + batchId: 'bx', + sessionId: 's1', + status: BatchStatus.PENDING, + currentIndex: 0, + messages: Array.from({ length: messageCount }, (_, i) => ({ + chatId: `c${i}@c.us`, + type: 'text', + content: { text: 'hi' }, + })), + options: { delayBetweenMessages: 0, randomizeDelay: false, stopOnError: false }, + progress: { total: messageCount, sent: 0, failed: 0, pending: messageCount, cancelled: 0 }, + results: [], + }) as unknown as MessageBatch; + + beforeEach(async () => { + engine = { sendTextMessage: jest.fn().mockResolvedValue({ id: 'wa1', timestamp: 111 }) }; + sessionService = { + getEngine: jest.fn().mockReturnValue(engine), + findOne: jest.fn().mockResolvedValue({ phone: '628' }), + }; + messageService = { saveOutgoingMessage: jest.fn().mockResolvedValue(undefined) }; + repo = { findOne: jest.fn(), save: jest.fn().mockImplementation(b => Promise.resolve(b)) }; + const module: TestingModule = await Test.createTestingModule({ + providers: [ + BulkMessageService, + { provide: getRepositoryToken(MessageBatch, 'data'), useValue: repo }, + { provide: SessionService, useValue: sessionService }, + { provide: MessageService, useValue: messageService }, + ], + }).compile(); + service = module.get(BulkMessageService); + }); + + const runProcessBatch = (): Promise => + (service as unknown as { processBatch: (id: string) => Promise }).processBatch('b1'); + + it('persists every sent message so it appears in chat history / stats', async () => { + repo.findOne.mockResolvedValue(makeBatch(1)); + + await runProcessBatch(); + + expect(messageService.saveOutgoingMessage).toHaveBeenCalledWith( + 's1', + expect.objectContaining({ + waMessageId: 'wa1', + chatId: 'c0@c.us', + type: 'text', + status: MessageStatus.SENT, + }), + ); + }); + + it('stops sending when the batch is cancelled in the DB by another instance/restart', async () => { + // First load is the running batch; the cadence re-read reports a CANCELLED status. + repo.findOne.mockResolvedValueOnce(makeBatch(3)).mockResolvedValue({ status: BatchStatus.CANCELLED }); + + await runProcessBatch(); + + // Only the first message (before the cadence re-read saw CANCELLED) was sent. + expect(engine.sendTextMessage).toHaveBeenCalledTimes(1); + }); + + it('does not clobber a CANCELLED that landed after the last cadence read (final status stays CANCELLED)', async () => { + const batch = makeBatch(1); + repo.findOne + .mockResolvedValueOnce(batch) // processBatch initial load + .mockResolvedValueOnce(batch) // cadence re-read (i=0) — still PROCESSING + .mockResolvedValue({ status: BatchStatus.CANCELLED }); // FINAL pre-save re-read — cancel landed late + + await runProcessBatch(); + + const savedStatuses = (repo.save.mock.calls as [MessageBatch][]).map(c => c[0].status); + expect(savedStatuses[savedStatuses.length - 1]).toBe(BatchStatus.CANCELLED); + }); +}); diff --git a/src/modules/message/bulk-message.service.ts b/src/modules/message/bulk-message.service.ts index 103cf11b..c36b707e 100644 --- a/src/modules/message/bulk-message.service.ts +++ b/src/modules/message/bulk-message.service.ts @@ -10,8 +10,11 @@ import { BatchMessageResult, } from './entities/message-batch.entity'; import { SendBulkMessageDto } from './dto/bulk-message.dto'; +import { MessageStatus } from './entities/message.entity'; import { SessionService } from '../session/session.service'; -import { IWhatsAppEngine } from '../../engine/interfaces/whatsapp-engine.interface'; +import { MessageService } from './message.service'; +import { SsrfBlockedError } from '../../common/security/ssrf-guard'; +import { IWhatsAppEngine, MessageResult } from '../../engine/interfaces/whatsapp-engine.interface'; // Type definitions for bulk message content interface BulkMessageContent { @@ -40,6 +43,18 @@ export function resolveFinalBatchStatus( return progress.failed > 0 && progress.sent === 0 ? BatchStatus.FAILED : BatchStatus.COMPLETED; } +/** + * Build the error stored on a batch result. An SSRF block names the internal host/IP it refused, so + * it must never be persisted/returned verbatim — it would be readable via GET batch status. Map it to + * a generic, code-tagged message; ordinary errors keep their (non-sensitive) message. + */ +export function sanitizeBatchError(error: unknown): { code: string; message: string } { + if (error instanceof SsrfBlockedError) { + return { code: 'SEND_BLOCKED', message: 'Destination address is not allowed' }; + } + return { code: 'SEND_FAILED', message: error instanceof Error ? error.message : String(error) }; +} + @Injectable() export class BulkMessageService implements OnApplicationBootstrap { private readonly logger = new Logger(BulkMessageService.name); @@ -49,6 +64,7 @@ export class BulkMessageService implements OnApplicationBootstrap { @InjectRepository(MessageBatch, 'data') private readonly batchRepository: Repository, private readonly sessionService: SessionService, + private readonly messageService: MessageService, ) {} /** @@ -182,6 +198,7 @@ export class BulkMessageService implements OnApplicationBootstrap { const results: BatchMessageResult[] = batch.results || []; let stoppedOnError = false; + let cancelledByDb = false; for (let i = batch.currentIndex; i < batch.messages.length; i++) { // Check for cancellation @@ -209,17 +226,21 @@ export class BulkMessageService implements OnApplicationBootstrap { batch.progress.sent++; batch.progress.pending--; + // Persist like a single send so the message shows in chat history + stats. The engine echo + // (onMessageCreate) fires the webhook/WS but does NOT write the DB, so without this the + // bulk-sent message is invisible to the messages table. + await this.persistSentMessage(batch.sessionId, msg.chatId, msg.type, content, messageResult); + this.logger.debug(`Batch ${batch.batchId}: Sent message ${i + 1}/${batch.messages.length} to ${msg.chatId}`); } catch (error) { result.status = BatchMessageStatus.FAILED; - result.error = { - code: 'SEND_FAILED', - message: String(error), - }; + // Sanitize: an SSRF block names an internal address — never store/return/log it verbatim. + const sanitized = sanitizeBatchError(error); + result.error = sanitized; batch.progress.failed++; batch.progress.pending--; - this.logger.warn(`Batch ${batch.batchId}: Failed message ${i + 1} to ${msg.chatId}: ${String(error)}`); + this.logger.warn(`Batch ${batch.batchId}: Failed message ${i + 1} to ${msg.chatId}: ${sanitized.message}`); if (batch.options.stopOnError) { batch.status = BatchStatus.FAILED; @@ -235,6 +256,15 @@ export class BulkMessageService implements OnApplicationBootstrap { // Save progress periodically (every 10 messages or last message) if (i % 10 === 0 || i === batch.messages.length - 1) { + // Honor a cancellation issued by ANOTHER instance / after a restart — the in-memory Map only + // sees same-process cancels. Re-read the status BEFORE saving so we don't clobber a CANCELLED + // back to PROCESSING. + const fresh = await this.batchRepository.findOne({ where: { id: batch.id }, select: ['status'] }); + if (fresh?.status === BatchStatus.CANCELLED) { + cancelledByDb = true; + this.logger.log(`Batch ${batch.batchId} cancelled (DB) at index ${i}`); + break; + } await this.batchRepository.save(batch); } @@ -247,7 +277,16 @@ export class BulkMessageService implements OnApplicationBootstrap { // Final update. NOTE: `batch` still holds the in-memory PROCESSING status from the start, so a // cancellation persisted by cancelBatch would be overwritten if we saved without re-deriving it. - const cancelled = !this.processingBatches.get(batch.id); + // A cancel may also have landed AFTER the last cadence re-read (multi-replica / post-restart); the + // unconditional save below would clobber it back to a terminal non-cancelled status, so re-read + // once more here unless we already know the batch was cancelled. + if (!cancelledByDb) { + const fresh = await this.batchRepository.findOne({ where: { id: batch.id }, select: ['status'] }); + if (fresh?.status === BatchStatus.CANCELLED) { + cancelledByDb = true; + } + } + const cancelled = cancelledByDb || !this.processingBatches.get(batch.id); batch.status = resolveFinalBatchStatus(cancelled, stoppedOnError, batch.progress); if (cancelled) { // Reconcile the counters the same way cancelBatch does, so the persisted state is consistent. @@ -295,12 +334,48 @@ export class BulkMessageService implements OnApplicationBootstrap { return processValue(content) as BulkMessageContent; } + /** + * Persist a successfully-sent batch message via the shared single-send persistence path, so it + * shows up in chat history and stats like any other outgoing message. Best-effort: a persistence + * failure must never flip a message that actually went out to FAILED. + */ + private async persistSentMessage( + sessionId: string, + chatId: string, + type: string, + content: BulkMessageContent, + result: MessageResult, + ): Promise { + const media = content.image ?? content.video ?? content.audio ?? content.document; + try { + await this.messageService.saveOutgoingMessage(sessionId, { + waMessageId: result.id, + chatId, + body: content.text ?? content.caption ?? '', + type, + timestamp: result.timestamp, + status: MessageStatus.SENT, + metadata: media + ? { + media: { + mimetype: media.mimetype, + data: media.url ?? media.base64, + filename: content.document?.filename, + }, + } + : undefined, + }); + } catch (error) { + this.logger.warn(`Batch message persisted-after-send failed: ${String(error)}`); + } + } + private sendMessage( engine: IWhatsAppEngine, chatId: string, type: string, content: BulkMessageContent, - ): Promise<{ id: string }> { + ): Promise { switch (type) { case 'text': return engine.sendTextMessage(chatId, content.text || ''); diff --git a/src/modules/message/message.service.ts b/src/modules/message/message.service.ts index 24e6a5d1..4daca586 100644 --- a/src/modules/message/message.service.ts +++ b/src/modules/message/message.service.ts @@ -480,9 +480,10 @@ export class MessageService { /** * Save outgoing message to database. - * When called before sending, creates a record with PENDING status. + * When called before sending, creates a record with PENDING status; bulk send reuses this after a + * successful send (status SENT) so batch messages are persisted like single sends. */ - private async saveOutgoingMessage( + async saveOutgoingMessage( sessionId: string, data: { waMessageId?: string;