From e999918a116aff0bce84298246264b633acea0f5 Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 10:26:00 +0700 Subject: [PATCH 1/7] feat(engine): add baileys_stored_messages entity + migration --- src/app.module.ts | 1 + ...000000000-AddBaileysStoredMessages.spec.ts | 30 +++++++++++++++ .../1781000000000-AddBaileysStoredMessages.ts | 38 +++++++++++++++++++ .../adapters/baileys-stored-message.entity.ts | 26 +++++++++++++ 4 files changed, 95 insertions(+) create mode 100644 src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts create mode 100644 src/database/migrations/1781000000000-AddBaileysStoredMessages.ts create mode 100644 src/engine/adapters/baileys-stored-message.entity.ts diff --git a/src/app.module.ts b/src/app.module.ts index 4a86fb93..395b9839 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -90,6 +90,7 @@ if (process.env.QUEUE_ENABLED === 'true') { __dirname + '/modules/webhook/**/*.entity{.ts,.js}', __dirname + '/modules/message/**/*.entity{.ts,.js}', __dirname + '/modules/template/**/*.entity{.ts,.js}', + __dirname + '/engine/**/*.entity{.ts,.js}', ], migrations: [__dirname + '/database/migrations/*{.ts,.js}'], logging: configService.get('dataDatabase.logging', false), diff --git a/src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts b/src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts new file mode 100644 index 00000000..463b04fc --- /dev/null +++ b/src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts @@ -0,0 +1,30 @@ +import { DataSource } from 'typeorm'; +import { AddBaileysStoredMessages1781000000000 } from './1781000000000-AddBaileysStoredMessages'; + +describe('AddBaileysStoredMessages migration', () => { + let ds: DataSource; + + beforeEach(async () => { + // A `sessions` table must exist for the FK; create a minimal stand-in. + ds = new DataSource({ type: 'sqlite', database: ':memory:' }); + await ds.initialize(); + await ds.query(`CREATE TABLE "sessions" ("id" varchar PRIMARY KEY NOT NULL)`); + }); + + afterEach(async () => { + await ds.destroy(); + }); + + it('creates and drops the table + indexes', async () => { + const runner = ds.createQueryRunner(); + const migration = new AddBaileysStoredMessages1781000000000(); + + await migration.up(runner); + expect(await runner.hasTable('baileys_stored_messages')).toBe(true); + + await migration.down(runner); + expect(await runner.hasTable('baileys_stored_messages')).toBe(false); + + await runner.release(); + }); +}); diff --git a/src/database/migrations/1781000000000-AddBaileysStoredMessages.ts b/src/database/migrations/1781000000000-AddBaileysStoredMessages.ts new file mode 100644 index 00000000..cb6ebc77 --- /dev/null +++ b/src/database/migrations/1781000000000-AddBaileysStoredMessages.ts @@ -0,0 +1,38 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +/** + * Creates `baileys_stored_messages` — the persisted Baileys message store backing + * reply/forward/react/delete. CASCADE-deleted with its session. Hand-authored because + * `synchronize` is off for the `data` connection on Postgres (and optional on SQLite). + */ +export class AddBaileysStoredMessages1781000000000 implements MigrationInterface { + name = 'AddBaileysStoredMessages1781000000000'; + + public async up(queryRunner: QueryRunner): Promise { + if (await queryRunner.hasTable('baileys_stored_messages')) return; + const isPostgres = queryRunner.connection.options.type === 'postgres'; + + if (isPostgres) { + await queryRunner.query( + `CREATE TABLE "baileys_stored_messages" ("id" varchar PRIMARY KEY NOT NULL DEFAULT gen_random_uuid()::varchar, "sessionId" varchar NOT NULL, "waMessageId" varchar NOT NULL, "serializedMessage" text NOT NULL, "createdAt" timestamp NOT NULL DEFAULT NOW(), CONSTRAINT "FK_baileys_stored_messages_sessionId" FOREIGN KEY ("sessionId") REFERENCES "sessions" ("id") ON DELETE CASCADE ON UPDATE NO ACTION)`, + ); + } else { + await queryRunner.query( + `CREATE TABLE "baileys_stored_messages" ("id" varchar PRIMARY KEY NOT NULL, "sessionId" varchar NOT NULL, "waMessageId" varchar NOT NULL, "serializedMessage" text NOT NULL, "createdAt" datetime NOT NULL DEFAULT (datetime('now')), CONSTRAINT "FK_baileys_stored_messages_sessionId" FOREIGN KEY ("sessionId") REFERENCES "sessions" ("id") ON DELETE CASCADE ON UPDATE NO ACTION)`, + ); + } + + await queryRunner.query( + `CREATE UNIQUE INDEX "UQ_baileys_stored_messages_session_wamsg" ON "baileys_stored_messages" ("sessionId", "waMessageId")`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_baileys_stored_messages_session_created" ON "baileys_stored_messages" ("sessionId", "createdAt")`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "IDX_baileys_stored_messages_session_created"`); + await queryRunner.query(`DROP INDEX "UQ_baileys_stored_messages_session_wamsg"`); + await queryRunner.query(`DROP TABLE "baileys_stored_messages"`); + } +} diff --git a/src/engine/adapters/baileys-stored-message.entity.ts b/src/engine/adapters/baileys-stored-message.entity.ts new file mode 100644 index 00000000..55884631 --- /dev/null +++ b/src/engine/adapters/baileys-stored-message.entity.ts @@ -0,0 +1,26 @@ +import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm'; + +/** + * Persisted Baileys message store (the lib ships none). Holds the serialized WAMessage proto + * (via BufferJSON) so reply/forward/react/delete can resolve the original message/key by id across + * restarts. Engine-specific — lives in the engine layer, not the neutral `messages` table. + */ +@Entity('baileys_stored_messages') +@Index(['sessionId', 'waMessageId'], { unique: true }) // lookup + dedup (send-return vs upsert echo) +@Index(['sessionId', 'createdAt']) // eviction ordering +export class BaileysStoredMessage { + @PrimaryGeneratedColumn('uuid') + id: string; + + @Column() + sessionId: string; + + @Column() + waMessageId: string; + + @Column({ type: 'text' }) + serializedMessage: string; + + @CreateDateColumn() + createdAt: Date; +} From 1ecf6c63013ca2b6359d9bd00ca20f74091aa80e Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 10:34:14 +0700 Subject: [PATCH 2/7] feat(engine): add BaileysMessageStoreService (persisted, bounded) --- .../baileys-message-store.service.spec.ts | 66 +++++++++++++++++ .../adapters/baileys-message-store.service.ts | 70 +++++++++++++++++++ src/engine/engine.module.ts | 6 +- src/engine/types/baileys.types.ts | 17 +++++ test/__mocks__/@whiskeysockets/baileys.ts | 28 ++++++++ 5 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 src/engine/adapters/baileys-message-store.service.spec.ts create mode 100644 src/engine/adapters/baileys-message-store.service.ts diff --git a/src/engine/adapters/baileys-message-store.service.spec.ts b/src/engine/adapters/baileys-message-store.service.spec.ts new file mode 100644 index 00000000..040cc096 --- /dev/null +++ b/src/engine/adapters/baileys-message-store.service.spec.ts @@ -0,0 +1,66 @@ +import { DataSource, Repository } from 'typeorm'; +import { BaileysStoredMessage } from './baileys-stored-message.entity'; +import { BaileysMessageStoreService } from './baileys-message-store.service'; + +describe('BaileysMessageStoreService', () => { + let ds: DataSource; + let repo: Repository; + let service: BaileysMessageStoreService; + + beforeEach(async () => { + ds = new DataSource({ type: 'sqlite', database: ':memory:', entities: [BaileysStoredMessage], synchronize: true }); + await ds.initialize(); + repo = ds.getRepository(BaileysStoredMessage); + service = new BaileysMessageStoreService(repo); + }); + + afterEach(async () => { + await ds.destroy(); + delete process.env.BAILEYS_MESSAGE_STORE_LIMIT; + }); + + // Partial WAMessage fixture — cast through unknown so strict checks don't fire on the incomplete shape. + const msg = (id: string) => + ({ + key: { id, remoteJid: '1@s.whatsapp.net', fromMe: false }, + message: { conversation: id }, + }) as unknown as Parameters[1]; + + it('round-trips a WAMessage through BufferJSON', async () => { + await service.put('s1', msg('M1')); + const got = await service.getMessage('s1', 'M1'); + expect(got?.key?.id).toBe('M1'); + expect(got?.message?.conversation).toBe('M1'); + }); + + it('returns null for an unknown id and is session-scoped', async () => { + await service.put('s1', msg('M1')); + expect(await service.getMessage('s1', 'NOPE')).toBeNull(); + expect(await service.getMessage('s2', 'M1')).toBeNull(); + }); + + it('is idempotent on (sessionId, waMessageId)', async () => { + await service.put('s1', msg('M1')); + await service.put('s1', msg('M1')); + expect(await repo.count({ where: { sessionId: 's1' } })).toBe(1); + }); + + it('evicts oldest beyond the per-session cap', async () => { + process.env.BAILEYS_MESSAGE_STORE_LIMIT = '2'; + const s = new BaileysMessageStoreService(repo); + await s.put('s1', msg('M1')); + await s.put('s1', msg('M2')); + await s.put('s1', msg('M3')); + expect(await s.getMessage('s1', 'M1')).toBeNull(); // oldest evicted + expect(await s.getMessage('s1', 'M3')).not.toBeNull(); + expect(await repo.count({ where: { sessionId: 's1' } })).toBeLessThanOrEqual(2); + }); + + it('clearSession removes only that session', async () => { + await service.put('s1', msg('M1')); + await service.put('s2', msg('M2')); + await service.clearSession('s1'); + expect(await service.getMessage('s1', 'M1')).toBeNull(); + expect(await service.getMessage('s2', 'M2')).not.toBeNull(); + }); +}); diff --git a/src/engine/adapters/baileys-message-store.service.ts b/src/engine/adapters/baileys-message-store.service.ts new file mode 100644 index 00000000..2299e899 --- /dev/null +++ b/src/engine/adapters/baileys-message-store.service.ts @@ -0,0 +1,70 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { BufferJSON } from '@whiskeysockets/baileys'; +import type { WAMessage } from '@whiskeysockets/baileys'; +import { BaileysStoredMessage } from './baileys-stored-message.entity'; +import { BaileysMessageStore } from '../types/baileys.types'; + +function positiveIntFromEnv(name: string, fallback: number): number { + const parsed = Number.parseInt(process.env[name] ?? '', 10); + return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback; +} + +@Injectable() +export class BaileysMessageStoreService implements BaileysMessageStore { + constructor( + @InjectRepository(BaileysStoredMessage, 'data') + private readonly repo: Repository, + ) {} + + async put(sessionId: string, msg: WAMessage): Promise { + const waMessageId = msg.key?.id; + if (!waMessageId) { + return; + } + const serializedMessage = JSON.stringify(msg, BufferJSON.replacer); + // Idempotent: the same message arrives from the send return AND the messages.upsert echo. + await this.repo.upsert({ sessionId, waMessageId, serializedMessage }, ['sessionId', 'waMessageId']); + await this.enforceLimit(sessionId); + } + + async getMessage(sessionId: string, messageId: string): Promise { + const row = await this.repo.findOne({ where: { sessionId, waMessageId: messageId } }); + if (!row) { + return null; + } + return JSON.parse(row.serializedMessage, BufferJSON.reviver) as WAMessage; + } + + async clearSession(sessionId: string): Promise { + await this.repo.delete({ sessionId }); + } + + /** + * Per-session row cap: keep the newest N rows, delete the rest. + * Uses a NOT IN subquery over the N keep-IDs so ties in createdAt (fast inserts in tests) + * don't incorrectly evict recently added rows. + */ + private async enforceLimit(sessionId: string): Promise { + const limit = positiveIntFromEnv('BAILEYS_MESSAGE_STORE_LIMIT', 5000); + const total = await this.repo.count({ where: { sessionId } }); + if (total <= limit) { + return; + } + // Find the IDs of the N rows to keep (newest N by insertion order; waMessageId as stable tiebreaker). + const keepRows = await this.repo.find({ + where: { sessionId }, + order: { createdAt: 'DESC', waMessageId: 'DESC' }, + take: limit, + select: ['id'], + }); + const keepIds = keepRows.map(r => r.id); + await this.repo + .createQueryBuilder() + .delete() + .where('sessionId = :sessionId', { sessionId }) + .andWhere('id NOT IN (:...keepIds)', { keepIds }) + .execute(); + } +} diff --git a/src/engine/engine.module.ts b/src/engine/engine.module.ts index caf1485a..8ddf47d5 100644 --- a/src/engine/engine.module.ts +++ b/src/engine/engine.module.ts @@ -1,9 +1,13 @@ import { Global, Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; import { EngineFactory } from './engine.factory'; +import { BaileysStoredMessage } from './adapters/baileys-stored-message.entity'; +import { BaileysMessageStoreService } from './adapters/baileys-message-store.service'; @Global() @Module({ - providers: [EngineFactory], + imports: [TypeOrmModule.forFeature([BaileysStoredMessage], 'data')], + providers: [EngineFactory, BaileysMessageStoreService], exports: [EngineFactory], }) export class EngineModule {} diff --git a/src/engine/types/baileys.types.ts b/src/engine/types/baileys.types.ts index b380ce6d..c600c115 100644 --- a/src/engine/types/baileys.types.ts +++ b/src/engine/types/baileys.types.ts @@ -1,3 +1,18 @@ +import type { WAMessage } from '@whiskeysockets/baileys'; + +/** + * Persistence boundary for the Baileys engine's message store. The adapter depends on this narrow + * interface (not the concrete Nest service) so it stays unit-testable with a fake. + */ +export interface BaileysMessageStore { + /** Persist a message (idempotent on the same id) so it can be referenced by reply/forward/react/delete. */ + put(sessionId: string, msg: WAMessage): Promise; + /** Look up a previously-seen message by its id, or null. */ + getMessage(sessionId: string, messageId: string): Promise; + /** Remove all stored messages for a session (called on logout). */ + clearSession(sessionId: string): Promise; +} + /** * Per-call construction config for {@link BaileysAdapter}. Engine-neutral fields come from the * factory; `authDir` is the base multi-file auth directory from the opaque `engine.baileys.*` blob @@ -8,6 +23,8 @@ export interface BaileysAdapterConfig { authDir: string; proxyUrl?: string; proxyType?: 'http' | 'https' | 'socks4' | 'socks5'; + /** Persisted store for reply/forward/react/delete. Provided by the plugin; the four ops require it. */ + messageStore?: BaileysMessageStore; } /** diff --git a/test/__mocks__/@whiskeysockets/baileys.ts b/test/__mocks__/@whiskeysockets/baileys.ts index 16dc8b0e..e3df0b27 100644 --- a/test/__mocks__/@whiskeysockets/baileys.ts +++ b/test/__mocks__/@whiskeysockets/baileys.ts @@ -14,3 +14,31 @@ export const useMultiFileAuthState = jest.fn(); export const fetchLatestBaileysVersion = jest.fn(); export const getContentType = jest.fn(); export const DisconnectReason = { loggedOut: 401 }; + +// Inline implementation mirrored from @whiskeysockets/baileys/lib/Utils/generics.js +// (the package is pure ESM; ts-jest runs CJS, so the mock owns the serialisation helpers) + +type BufferLike = { type: 'Buffer'; data: string | number[] }; +type BufferJsonObject = { buffer?: boolean; type?: string; data?: string | number[]; value?: string | number[] }; + +export const BufferJSON = { + replacer: (_k: string, value: unknown): unknown => { + if (Buffer.isBuffer(value) || value instanceof Uint8Array) { + return { type: 'Buffer', data: Buffer.from(value).toString('base64') }; + } + if (typeof value === 'object' && value !== null && (value as BufferLike).type === 'Buffer') { + return { type: 'Buffer', data: Buffer.from((value as BufferLike).data).toString('base64') }; + } + return value; + }, + reviver: (_: string, value: unknown): unknown => { + if (typeof value === 'object' && value !== null) { + const obj = value as BufferJsonObject; + if (obj.buffer === true || obj.type === 'Buffer') { + const val = obj.data ?? obj.value; + return typeof val === 'string' ? Buffer.from(val, 'base64') : Buffer.from(val ?? []); + } + } + return value; + }, +}; From 6119919ce0c6766b2d7ce62a4cf8fde56e87f790 Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 10:40:30 +0700 Subject: [PATCH 3/7] perf(engine): baileys store eviction via createdAt+id cutoff, not NOT-IN(5000) --- .../baileys-message-store.service.spec.ts | 36 ++++++++++++++++--- .../adapters/baileys-message-store.service.ts | 27 ++++++-------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/src/engine/adapters/baileys-message-store.service.spec.ts b/src/engine/adapters/baileys-message-store.service.spec.ts index 040cc096..07e95508 100644 --- a/src/engine/adapters/baileys-message-store.service.spec.ts +++ b/src/engine/adapters/baileys-message-store.service.spec.ts @@ -48,12 +48,40 @@ describe('BaileysMessageStoreService', () => { it('evicts oldest beyond the per-session cap', async () => { process.env.BAILEYS_MESSAGE_STORE_LIMIT = '2'; const s = new BaileysMessageStoreService(repo); - await s.put('s1', msg('M1')); - await s.put('s1', msg('M2')); + // Use distinct createdAt values so ordering is deterministic regardless of UUID tiebreaker. + const t0 = new Date('2024-01-01T00:00:00.000Z'); + const t1 = new Date('2024-01-01T00:00:01.000Z'); + const t2 = new Date('2024-01-01T00:00:02.000Z'); + for (const [waMessageId, createdAt] of [ + ['M1', t0], + ['M2', t1], + ['M3', t2], + ] as [string, Date][]) { + await repo.save(repo.create({ sessionId: 's1', waMessageId, serializedMessage: '{}', createdAt })); + } + // Trigger eviction: put M3 again (idempotent upsert) so enforceLimit runs with 3 rows and cap=2. await s.put('s1', msg('M3')); - expect(await s.getMessage('s1', 'M1')).toBeNull(); // oldest evicted + expect(await s.getMessage('s1', 'M1')).toBeNull(); // oldest (t0) evicted + expect(await s.getMessage('s1', 'M2')).not.toBeNull(); expect(await s.getMessage('s1', 'M3')).not.toBeNull(); - expect(await repo.count({ where: { sessionId: 's1' } })).toBeLessThanOrEqual(2); + expect(await repo.count({ where: { sessionId: 's1' } })).toBe(2); + }); + + it('keeps exactly limit rows when multiple share the same createdAt (tiebreaker via id)', async () => { + process.env.BAILEYS_MESSAGE_STORE_LIMIT = '2'; + const s = new BaileysMessageStoreService(repo); + // Insert 3 rows with identical createdAt to stress the (createdAt, id) tiebreaker. + // With UUID primary keys, id ordering is lexicographic — we can only assert count, not which survive. + const sharedTs = new Date('2024-01-01T00:00:00.000Z'); + for (const waMessageId of ['T1', 'T2', 'T3']) { + await repo.save(repo.create({ sessionId: 's2', waMessageId, serializedMessage: '{}', createdAt: sharedTs })); + } + // Trigger eviction: put a 4th message (distinct, newer ts) through the service. + await s.put('s2', msg('T4')); + // Exactly limit rows must remain — no over- or under-deletion. + expect(await repo.count({ where: { sessionId: 's2' } })).toBe(2); + // T4 is the newest (distinct createdAt = now) and must survive. + expect(await s.getMessage('s2', 'T4')).not.toBeNull(); }); it('clearSession removes only that session', async () => { diff --git a/src/engine/adapters/baileys-message-store.service.ts b/src/engine/adapters/baileys-message-store.service.ts index 2299e899..54b44858 100644 --- a/src/engine/adapters/baileys-message-store.service.ts +++ b/src/engine/adapters/baileys-message-store.service.ts @@ -41,30 +41,25 @@ export class BaileysMessageStoreService implements BaileysMessageStore { await this.repo.delete({ sessionId }); } - /** - * Per-session row cap: keep the newest N rows, delete the rest. - * Uses a NOT IN subquery over the N keep-IDs so ties in createdAt (fast inserts in tests) - * don't incorrectly evict recently added rows. - */ + /** Per-session row cap: keep the newest N, delete the rest. Deterministic via (createdAt, id). */ private async enforceLimit(sessionId: string): Promise { const limit = positiveIntFromEnv('BAILEYS_MESSAGE_STORE_LIMIT', 5000); - const total = await this.repo.count({ where: { sessionId } }); - if (total <= limit) { - return; - } - // Find the IDs of the N rows to keep (newest N by insertion order; waMessageId as stable tiebreaker). - const keepRows = await this.repo.find({ + const cutoff = await this.repo.find({ where: { sessionId }, - order: { createdAt: 'DESC', waMessageId: 'DESC' }, - take: limit, - select: ['id'], + order: { createdAt: 'DESC', id: 'DESC' }, + skip: limit, + take: 1, + select: ['id', 'createdAt'], }); - const keepIds = keepRows.map(r => r.id); + if (cutoff.length === 0) { + return; // under the cap — nothing to evict + } + const { id, createdAt } = cutoff[0]; await this.repo .createQueryBuilder() .delete() .where('sessionId = :sessionId', { sessionId }) - .andWhere('id NOT IN (:...keepIds)', { keepIds }) + .andWhere('(createdAt < :createdAt OR (createdAt = :createdAt AND id <= :id))', { createdAt, id }) .execute(); } } From c7a2e041870023a957e4235a212c38f21a2f30e3 Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 10:44:35 +0700 Subject: [PATCH 4/7] feat(engine): thread the baileys message store into the adapter --- src/engine/engine.factory.spec.ts | 12 ++++++++---- src/engine/engine.factory.ts | 4 +++- src/plugins/engines/baileys/index.spec.ts | 7 +++++++ src/plugins/engines/baileys/index.ts | 4 ++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/engine/engine.factory.spec.ts b/src/engine/engine.factory.spec.ts index fc37c28a..2bdd7730 100644 --- a/src/engine/engine.factory.spec.ts +++ b/src/engine/engine.factory.spec.ts @@ -1,6 +1,7 @@ import { EngineFactory } from './engine.factory'; import { ConfigService } from '@nestjs/config'; import { PluginLoaderService, PluginType } from '../core/plugins'; +import { BaileysMessageStoreService } from './adapters/baileys-message-store.service'; describe('EngineFactory', () => { const engineBlob = { @@ -21,6 +22,9 @@ describe('EngineFactory', () => { return { get: jest.fn((key: string) => values[key]) } as unknown as ConfigService; }; + const buildMessageStore = (): BaileysMessageStoreService => + ({ put: jest.fn(), getMessage: jest.fn(), clearSession: jest.fn() }) as unknown as BaileysMessageStoreService; + it('passes ONLY engine-neutral fields to createEngine (no Puppeteer leak)', () => { const createEngine = jest.fn().mockReturnValue({}); const pluginInstance = { type: PluginType.ENGINE, createEngine }; @@ -28,7 +32,7 @@ describe('EngineFactory', () => { getPlugin: jest.fn().mockReturnValue({ instance: pluginInstance }), } as unknown as PluginLoaderService; - const factory = new EngineFactory(buildConfigService(), pluginLoader); + const factory = new EngineFactory(buildConfigService(), pluginLoader, buildMessageStore()); factory.create({ sessionId: 'sess-1', proxyUrl: 'http://p', proxyType: 'http' }); // Plain-object (not objectContaining) assertion: any browser key (headless/puppeteerArgs/ @@ -44,7 +48,7 @@ describe('EngineFactory', () => { getPlugin: jest.fn(), } as unknown as PluginLoaderService; - const factory = new EngineFactory(buildConfigService(), pluginLoader); + const factory = new EngineFactory(buildConfigService(), pluginLoader, buildMessageStore()); await factory.onModuleInit(); expect(registerBuiltInPlugin).toHaveBeenCalledWith( @@ -62,7 +66,7 @@ describe('EngineFactory', () => { getPlugin: jest.fn(), } as unknown as PluginLoaderService; - const factory = new EngineFactory(buildConfigService(), pluginLoader); + const factory = new EngineFactory(buildConfigService(), pluginLoader, buildMessageStore()); await factory.onModuleInit(); const registeredIds = registerBuiltInPlugin.mock.calls.map(call => (call as [{ id: string }])[0].id); @@ -75,7 +79,7 @@ describe('EngineFactory', () => { getPlugin: jest.fn().mockReturnValue(undefined), } as unknown as PluginLoaderService; - const factory = new EngineFactory(buildConfigService(), pluginLoader); + const factory = new EngineFactory(buildConfigService(), pluginLoader, buildMessageStore()); expect(() => factory.create({ sessionId: 'sess-2' })).not.toThrow(); }); }); diff --git a/src/engine/engine.factory.ts b/src/engine/engine.factory.ts index c8709c9d..9ea9397d 100644 --- a/src/engine/engine.factory.ts +++ b/src/engine/engine.factory.ts @@ -6,6 +6,7 @@ import { PluginLoaderService, PluginType, IEnginePlugin, PluginManifest } from ' import { WhatsAppWebJsPlugin } from '../plugins/engines/whatsapp-web-js'; import { BaileysPlugin } from '../plugins/engines/baileys'; import { createLogger } from '../common/services/logger.service'; +import { BaileysMessageStoreService } from './adapters/baileys-message-store.service'; export interface EngineCreateOptions { sessionId: string; @@ -21,6 +22,7 @@ export class EngineFactory implements OnModuleInit { constructor( private readonly configService: ConfigService, private readonly pluginLoader: PluginLoaderService, + private readonly baileysMessageStore: BaileysMessageStoreService, ) { this.engineType = this.configService.get('engine.type') ?? 'whatsapp-web.js'; } @@ -60,7 +62,7 @@ export class EngineFactory implements OnModuleInit { }; this.pluginLoader.registerBuiltInPlugin( baileysManifest, - new BaileysPlugin(), + new BaileysPlugin(this.baileysMessageStore), this.configService.get('engine') ?? {}, ); diff --git a/src/plugins/engines/baileys/index.spec.ts b/src/plugins/engines/baileys/index.spec.ts index ac5d3325..5a49c230 100644 --- a/src/plugins/engines/baileys/index.spec.ts +++ b/src/plugins/engines/baileys/index.spec.ts @@ -50,4 +50,11 @@ describe('BaileysPlugin.createEngine (opaque config)', () => { it('reports the baileys library name', () => { expect(new BaileysPlugin().getEngineLibrary().name).toBe('@whiskeysockets/baileys'); }); + + it('passes the message store to the adapter', () => { + const store = { put: jest.fn(), getMessage: jest.fn(), clearSession: jest.fn() }; + const plugin = new BaileysPlugin(store); + plugin.createEngine({ sessionId: 'sess-1' }); + expect(BaileysAdapter).toHaveBeenCalledWith(expect.objectContaining({ sessionId: 'sess-1', messageStore: store })); + }); }); diff --git a/src/plugins/engines/baileys/index.ts b/src/plugins/engines/baileys/index.ts index d0169f0a..a33ec851 100644 --- a/src/plugins/engines/baileys/index.ts +++ b/src/plugins/engines/baileys/index.ts @@ -6,11 +6,14 @@ import { PluginContext, PluginType, IEnginePlugin } from '../../../core/plugins'; import { IWhatsAppEngine } from '../../../engine/interfaces/whatsapp-engine.interface'; import { BaileysAdapter } from '../../../engine/adapters/baileys.adapter'; +import { BaileysMessageStore } from '../../../engine/types/baileys.types'; export class BaileysPlugin implements IEnginePlugin { type = PluginType.ENGINE as const; private context?: PluginContext; + constructor(private readonly messageStore?: BaileysMessageStore) {} + onLoad(context: PluginContext): Promise { this.context = context; context.logger.log('Baileys engine plugin loaded'); @@ -43,6 +46,7 @@ export class BaileysPlugin implements IEnginePlugin { authDir, proxyUrl, proxyType, + messageStore: this.messageStore, }); } From ffb38bd982015802baf69b496d4ca60cb8907b89 Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 10:53:53 +0700 Subject: [PATCH 5/7] feat(engine): baileys reply/forward/react/delete via the persisted store --- CHANGELOG.md | 1 + src/engine/adapters/baileys.adapter.spec.ts | 105 +++++++++++++++++++- src/engine/adapters/baileys.adapter.ts | 79 ++++++++++++--- src/plugins/engines/baileys/index.spec.ts | 6 +- src/plugins/engines/baileys/index.ts | 12 ++- test/baileys-engine.e2e-spec.ts | 4 + 6 files changed, 190 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cea16a1a..c736d862 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ plugins instead of in core (#265). it via `POST /plugins/auto-reply/enable` to exercise the capability layer end-to-end. (#294) - **Baileys engine (minimal slice)** — `ENGINE_TYPE=baileys` now selects a second, browser-free WhatsApp engine built on `@whiskeysockets/baileys` (WebSocket/Noise protocol, no Chromium). This first slice supports linking (QR + pairing code), sending and receiving **text**, recipient resolution, and typing presence; all other operations return HTTP 501 until later slices add a message store. Config: `BAILEYS_AUTH_DIR` (default `./data/baileys`). Proxy is not yet supported on this engine. (#299) - **Baileys engine — media/location/contact sends.** The Baileys engine (`ENGINE_TYPE=baileys`) can now send image/video/audio/document/sticker, location, and contact messages (slice 2a). URL media is fetched through the same SSRF-guarded path as the whatsapp-web.js engine (host guard + byte cap + timeout, no redirects). Reply/forward/react/delete remain unsupported (HTTP 501) until a later slice adds a message store. (#307) +- **Baileys engine — reply/forward/react/delete.** The Baileys engine can now reply to, forward, react to, and delete (revoke-for-everyone) messages, backed by a persisted per-session message store (`baileys_stored_messages`, bounded by `BAILEYS_MESSAGE_STORE_LIMIT`, default 5000). Delete-for-me and reading reactions remain unsupported (HTTP 501). (#PR) ### Changed diff --git a/src/engine/adapters/baileys.adapter.spec.ts b/src/engine/adapters/baileys.adapter.spec.ts index b6e72e11..59253739 100644 --- a/src/engine/adapters/baileys.adapter.spec.ts +++ b/src/engine/adapters/baileys.adapter.spec.ts @@ -45,7 +45,13 @@ import { EngineNotReadyError } from '../../common/errors/engine-not-ready.error' import { EngineNotSupportedError } from '../../common/errors/engine-not-supported.error'; import { loadRemoteMediaBuffer } from '../../common/media/load-remote-media'; -const newAdapter = (): BaileysAdapter => new BaileysAdapter({ sessionId: 'sess-1', authDir: './data/baileys' }); +const fakeStore = { + put: jest.fn().mockResolvedValue(undefined), + getMessage: jest.fn(), + clearSession: jest.fn().mockResolvedValue(undefined), +}; +const newAdapter = (): BaileysAdapter => + new BaileysAdapter({ sessionId: 'sess-1', authDir: './data/baileys', messageStore: fakeStore }); const noopCallbacks = (over: Partial = {}): EngineEventCallbacks => over; @@ -419,3 +425,100 @@ describe('BaileysAdapter media sends', () => { ).rejects.toBeInstanceOf(EngineNotReadyError); }); }); + +describe('BaileysAdapter store-backed ops', () => { + beforeEach(() => { + fakeSock.user = { id: '628999:1@s.whatsapp.net', name: 'Me' }; + jest.clearAllMocks(); + fakeSock.sendMessage.mockResolvedValue({ + key: { id: 'OUT', remoteJid: '628111@s.whatsapp.net', fromMe: true }, + messageTimestamp: 1700000009, + }); + }); + + const ready = async (): Promise => { + const adapter = newAdapter(); + await adapter.initialize({}); + fakeSock.fire('connection.update', { connection: 'open' }); + return adapter; + }; + + const stored = { + key: { id: 'TARGET', remoteJid: '628111@s.whatsapp.net', fromMe: false }, + message: { conversation: 'hi' }, + }; + + it('replyToMessage quotes the stored message', async () => { + fakeStore.getMessage.mockResolvedValue(stored); + const adapter = await ready(); + await adapter.replyToMessage('628111@s.whatsapp.net', 'TARGET', 'my reply'); + expect(fakeStore.getMessage).toHaveBeenCalledWith('sess-1', 'TARGET'); + expect(fakeSock.sendMessage).toHaveBeenCalledWith( + '628111@s.whatsapp.net', + { text: 'my reply' }, + { quoted: stored }, + ); + }); + + it('forwardMessage forwards the stored message', async () => { + fakeStore.getMessage.mockResolvedValue(stored); + const adapter = await ready(); + await adapter.forwardMessage('628111@s.whatsapp.net', '628222@s.whatsapp.net', 'TARGET'); + expect(fakeSock.sendMessage).toHaveBeenCalledWith('628222@s.whatsapp.net', { forward: stored }); + }); + + it('reactToMessage sends the stored key', async () => { + fakeStore.getMessage.mockResolvedValue(stored); + const adapter = await ready(); + await adapter.reactToMessage('628111@s.whatsapp.net', 'TARGET', '👍'); + expect(fakeSock.sendMessage).toHaveBeenCalledWith('628111@s.whatsapp.net', { + react: { text: '👍', key: stored.key }, + }); + }); + + it('deleteMessage revokes via the stored key', async () => { + fakeStore.getMessage.mockResolvedValue(stored); + const adapter = await ready(); + await adapter.deleteMessage('628111@s.whatsapp.net', 'TARGET', true); + expect(fakeSock.sendMessage).toHaveBeenCalledWith('628111@s.whatsapp.net', { delete: stored.key }); + }); + + it('throws when the referenced message is not in the store', async () => { + fakeStore.getMessage.mockResolvedValue(null); + const adapter = await ready(); + await expect(adapter.replyToMessage('c', 'GONE', 'x')).rejects.toThrow(/not found/i); + }); + + it('deleteMessage for-me (forEveryone=false) is not supported', async () => { + const adapter = await ready(); + await expect(adapter.deleteMessage('c', 'TARGET', false)).rejects.toBeInstanceOf(EngineNotSupportedError); + }); + + it('populates the store on an inbound message', async () => { + const adapter = newAdapter(); + await adapter.initialize({}); + fakeSock.fire('messages.upsert', { + type: 'notify', + messages: [ + { key: { remoteJid: '628111@s.whatsapp.net', fromMe: false, id: 'IN9' }, message: { conversation: 'hi' } }, + ], + }); + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const inboundMatcher = expect.objectContaining({ key: expect.objectContaining({ id: 'IN9' }) }); + expect(fakeStore.put).toHaveBeenCalledWith('sess-1', inboundMatcher); + }); + + it('populates the store on an outgoing send', async () => { + const adapter = await ready(); + await adapter.sendTextMessage('628111@s.whatsapp.net', 'hello'); + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + const outboundMatcher = expect.objectContaining({ key: expect.objectContaining({ id: 'OUT' }) }); + expect(fakeStore.put).toHaveBeenCalledWith('sess-1', outboundMatcher); + }); + + it('clears the store on logout', async () => { + const adapter = await ready(); + await adapter.logout(); + expect(fakeStore.clearSession).toHaveBeenCalledWith('sess-1'); + }); +}); diff --git a/src/engine/adapters/baileys.adapter.ts b/src/engine/adapters/baileys.adapter.ts index 715a2388..f90d01d0 100644 --- a/src/engine/adapters/baileys.adapter.ts +++ b/src/engine/adapters/baileys.adapter.ts @@ -5,7 +5,7 @@ import makeWASocket, { getContentType, useMultiFileAuthState, } from '@whiskeysockets/baileys'; -import type { AnyMessageContent, WAMessage, WASocket } from '@whiskeysockets/baileys'; +import type { AnyMessageContent, MiscMessageGenerationOptions, WAMessage, WASocket } from '@whiskeysockets/baileys'; import { buildIncomingMessageFromBaileys, mapBaileysStatus } from './baileys-message-mapper'; import type { ILogger } from '@whiskeysockets/baileys/lib/Utils/logger.js'; import { @@ -185,6 +185,7 @@ export class BaileysAdapter implements IWhatsAppEngine { } this.sock = null; this.setStatus(EngineStatus.DISCONNECTED); + await this.config.messageStore?.clearSession(this.config.sessionId).catch(() => undefined); // ponytail: leaves the multi-file auth dir on disk; a fresh link overwrites it. Add fs cleanup if // stale creds ever block re-linking. } @@ -227,6 +228,13 @@ export class BaileysAdapter implements IWhatsAppEngine { async sendTextMessage(chatId: string, text: string): Promise { this.ensureReady(); const sent = await this.sock!.sendMessage(chatId, { text }); + if (sent) { + void this.config.messageStore?.put(this.config.sessionId, sent).catch(err => + this.logger.warn('Failed to persist sent message to store', { + error: err instanceof Error ? err.message : String(err), + }), + ); + } return { id: sent?.key?.id ?? '', timestamp: this.toUnixSeconds(sent?.messageTimestamp), @@ -299,18 +307,37 @@ export class BaileysAdapter implements IWhatsAppEngine { }); } - // ----- Gated: not supported by this minimal slice (no store) ----- - /* eslint-disable @typescript-eslint/no-unused-vars */ + async replyToMessage(chatId: string, quotedMsgId: string, text: string): Promise { + this.ensureReady(); + const quoted = await this.requireStored(quotedMsgId); + return this.sendContent(chatId, { text }, { quoted }); + } - replyToMessage(_chatId: string, _quotedMsgId: string, _text: string): Promise { - return this.unsupported('replyToMessage'); + async forwardMessage(fromChatId: string, toChatId: string, messageId: string): Promise { + this.ensureReady(); + const forward = await this.requireStored(messageId); + return this.sendContent(toChatId, { forward }); } - forwardMessage(_fromChatId: string, _toChatId: string, _messageId: string): Promise { - return this.unsupported('forwardMessage'); + + async reactToMessage(chatId: string, messageId: string, emoji: string): Promise { + this.ensureReady(); + const target = await this.requireStored(messageId); + await this.sock!.sendMessage(chatId, { react: { text: emoji, key: target.key } }); } - reactToMessage(_chatId: string, _messageId: string, _emoji: string): Promise { - return this.unsupported('reactToMessage'); + + async deleteMessage(chatId: string, messageId: string, forEveryone = true): Promise { + this.ensureReady(); + if (!forEveryone) { + // Baileys only supports revoke-for-everyone via sendMessage; delete-for-me is not implemented. + throw new EngineNotSupportedError('deleteMessage (delete-for-me)'); + } + const target = await this.requireStored(messageId); + await this.sock!.sendMessage(chatId, { delete: target.key }); } + + // ----- Gated: not supported by this minimal slice (no store) ----- + /* eslint-disable @typescript-eslint/no-unused-vars */ + getMessageReactions(_chatId: string, _messageId: string): Promise { return this.unsupported('getMessageReactions'); } @@ -359,9 +386,6 @@ export class BaileysAdapter implements IWhatsAppEngine { revokeGroupInviteCode(_groupId: string): Promise { return this.unsupported('revokeGroupInviteCode'); } - deleteMessage(_chatId: string, _messageId: string, _forEveryone?: boolean): Promise { - return this.unsupported('deleteMessage'); - } getChatHistory(_chatId: string, _limit?: number, _includeMedia?: boolean): Promise { return this.unsupported('getChatHistory'); } @@ -465,6 +489,11 @@ export class BaileysAdapter implements IWhatsAppEngine { } else { this.callbacks.onMessage?.(incoming); } + void this.config.messageStore?.put(this.config.sessionId, msg).catch(err => + this.logger.warn('Failed to persist message to store', { + error: err instanceof Error ? err.message : String(err), + }), + ); } } @@ -539,11 +568,33 @@ export class BaileysAdapter implements IWhatsAppEngine { } /** Send a Baileys content object and shape the result like the other sends. */ - private async sendContent(chatId: string, content: AnyMessageContent): Promise { - const sent = await this.sock!.sendMessage(chatId, content); + private async sendContent( + chatId: string, + content: AnyMessageContent, + options?: MiscMessageGenerationOptions, + ): Promise { + const sent = options + ? await this.sock!.sendMessage(chatId, content, options) + : await this.sock!.sendMessage(chatId, content); + if (sent) { + void this.config.messageStore?.put(this.config.sessionId, sent).catch(err => + this.logger.warn('Failed to persist sent message to store', { + error: err instanceof Error ? err.message : String(err), + }), + ); + } return { id: sent?.key?.id ?? '', timestamp: this.toUnixSeconds(sent?.messageTimestamp) }; } + /** Resolve a previously-seen message from the store, or throw a clear not-found error. */ + private async requireStored(messageId: string): Promise { + const found = await this.config.messageStore?.getMessage(this.config.sessionId, messageId); + if (!found?.key) { + throw new Error(`Message ${messageId} not found`); + } + return found; + } + private unsupported(method: string): Promise { return Promise.reject(new EngineNotSupportedError(method)); } diff --git a/src/plugins/engines/baileys/index.spec.ts b/src/plugins/engines/baileys/index.spec.ts index 5a49c230..e73a6beb 100644 --- a/src/plugins/engines/baileys/index.spec.ts +++ b/src/plugins/engines/baileys/index.spec.ts @@ -37,13 +37,17 @@ describe('BaileysPlugin.createEngine (opaque config)', () => { ); }); - it('advertises the slice-2a supported feature set', () => { + it('advertises the slice-2b supported feature set', () => { expect(new BaileysPlugin().getFeatures()).toEqual([ 'text-messages', 'typing-indicator', 'media-messages', 'location-messages', 'contact-messages', + 'message-replies', + 'message-forwarding', + 'message-reactions', + 'message-deletion', ]); }); diff --git a/src/plugins/engines/baileys/index.ts b/src/plugins/engines/baileys/index.ts index a33ec851..90ed0346 100644 --- a/src/plugins/engines/baileys/index.ts +++ b/src/plugins/engines/baileys/index.ts @@ -51,7 +51,17 @@ export class BaileysPlugin implements IEnginePlugin { } getFeatures(): string[] { - return ['text-messages', 'typing-indicator', 'media-messages', 'location-messages', 'contact-messages']; + return [ + 'text-messages', + 'typing-indicator', + 'media-messages', + 'location-messages', + 'contact-messages', + 'message-replies', + 'message-forwarding', + 'message-reactions', + 'message-deletion', + ]; } getEngineLibrary(): { name: string; version: string } { diff --git a/test/baileys-engine.e2e-spec.ts b/test/baileys-engine.e2e-spec.ts index e48da300..bf396e85 100644 --- a/test/baileys-engine.e2e-spec.ts +++ b/test/baileys-engine.e2e-spec.ts @@ -60,6 +60,10 @@ describe('Baileys engine boot (e2e)', () => { 'media-messages', 'location-messages', 'contact-messages', + 'message-replies', + 'message-forwarding', + 'message-reactions', + 'message-deletion', ]); }); }); From 58b363ddee24bbef0830d62bc94580ddc94f766c Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 10:57:13 +0700 Subject: [PATCH 6/7] fix(engine): move baileys migration spec out of the migrations glob (e2e/dev boot crash) --- .../1781000000000-AddBaileysStoredMessages.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename src/database/migrations/{ => __tests__}/1781000000000-AddBaileysStoredMessages.spec.ts (89%) diff --git a/src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts b/src/database/migrations/__tests__/1781000000000-AddBaileysStoredMessages.spec.ts similarity index 89% rename from src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts rename to src/database/migrations/__tests__/1781000000000-AddBaileysStoredMessages.spec.ts index 463b04fc..b12da56b 100644 --- a/src/database/migrations/1781000000000-AddBaileysStoredMessages.spec.ts +++ b/src/database/migrations/__tests__/1781000000000-AddBaileysStoredMessages.spec.ts @@ -1,5 +1,5 @@ import { DataSource } from 'typeorm'; -import { AddBaileysStoredMessages1781000000000 } from './1781000000000-AddBaileysStoredMessages'; +import { AddBaileysStoredMessages1781000000000 } from '../1781000000000-AddBaileysStoredMessages'; describe('AddBaileysStoredMessages migration', () => { let ds: DataSource; From fe376fbd7ecf3a4490bcd3d41c80e54dc104cbc2 Mon Sep 17 00:00:00 2001 From: rmyndharis Date: Thu, 18 Jun 2026 11:05:49 +0700 Subject: [PATCH 7/7] docs(changelog): reference PR #308 + operator note for the baileys store --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c736d862..be05baaf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ plugins instead of in core (#265). it via `POST /plugins/auto-reply/enable` to exercise the capability layer end-to-end. (#294) - **Baileys engine (minimal slice)** — `ENGINE_TYPE=baileys` now selects a second, browser-free WhatsApp engine built on `@whiskeysockets/baileys` (WebSocket/Noise protocol, no Chromium). This first slice supports linking (QR + pairing code), sending and receiving **text**, recipient resolution, and typing presence; all other operations return HTTP 501 until later slices add a message store. Config: `BAILEYS_AUTH_DIR` (default `./data/baileys`). Proxy is not yet supported on this engine. (#299) - **Baileys engine — media/location/contact sends.** The Baileys engine (`ENGINE_TYPE=baileys`) can now send image/video/audio/document/sticker, location, and contact messages (slice 2a). URL media is fetched through the same SSRF-guarded path as the whatsapp-web.js engine (host guard + byte cap + timeout, no redirects). Reply/forward/react/delete remain unsupported (HTTP 501) until a later slice adds a message store. (#307) -- **Baileys engine — reply/forward/react/delete.** The Baileys engine can now reply to, forward, react to, and delete (revoke-for-everyone) messages, backed by a persisted per-session message store (`baileys_stored_messages`, bounded by `BAILEYS_MESSAGE_STORE_LIMIT`, default 5000). Delete-for-me and reading reactions remain unsupported (HTTP 501). (#PR) +- **Baileys engine — reply/forward/react/delete.** The Baileys engine can now reply to, forward, react to, and delete (revoke-for-everyone) messages, backed by a persisted per-session message store (`baileys_stored_messages`, bounded by `BAILEYS_MESSAGE_STORE_LIMIT`, default 5000). Delete-for-me and reading reactions remain unsupported (HTTP 501). Note: this engine persists recent message protos to the data database (so these operations survive restarts) — more data-at-rest than the whatsapp-web.js engine; the store is bounded per session, cleared on logout, and CASCADE-deleted with its session, and operators with retention requirements can tune `BAILEYS_MESSAGE_STORE_LIMIT`. (#308) ### Changed