Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ plugins instead of in core (#265).
it via `POST /plugins/auto-reply/enable` to exercise the capability layer end-to-end. (#294)
- **Baileys engine (minimal slice)** — `ENGINE_TYPE=baileys` now selects a second, browser-free WhatsApp engine built on `@whiskeysockets/baileys` (WebSocket/Noise protocol, no Chromium). This first slice supports linking (QR + pairing code), sending and receiving **text**, recipient resolution, and typing presence; all other operations return HTTP 501 until later slices add a message store. Config: `BAILEYS_AUTH_DIR` (default `./data/baileys`). Proxy is not yet supported on this engine. (#299)
- **Baileys engine — media/location/contact sends.** The Baileys engine (`ENGINE_TYPE=baileys`) can now send image/video/audio/document/sticker, location, and contact messages (slice 2a). URL media is fetched through the same SSRF-guarded path as the whatsapp-web.js engine (host guard + byte cap + timeout, no redirects). Reply/forward/react/delete remain unsupported (HTTP 501) until a later slice adds a message store. (#307)
- **Baileys engine — reply/forward/react/delete.** The Baileys engine can now reply to, forward, react to, and delete (revoke-for-everyone) messages, backed by a persisted per-session message store (`baileys_stored_messages`, bounded by `BAILEYS_MESSAGE_STORE_LIMIT`, default 5000). Delete-for-me and reading reactions remain unsupported (HTTP 501). Note: this engine persists recent message protos to the data database (so these operations survive restarts) — more data-at-rest than the whatsapp-web.js engine; the store is bounded per session, cleared on logout, and CASCADE-deleted with its session, and operators with retention requirements can tune `BAILEYS_MESSAGE_STORE_LIMIT`. (#308)

### Changed

Expand Down
1 change: 1 addition & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ if (process.env.QUEUE_ENABLED === 'true') {
__dirname + '/modules/webhook/**/*.entity{.ts,.js}',
__dirname + '/modules/message/**/*.entity{.ts,.js}',
__dirname + '/modules/template/**/*.entity{.ts,.js}',
__dirname + '/engine/**/*.entity{.ts,.js}',
],
migrations: [__dirname + '/database/migrations/*{.ts,.js}'],
logging: configService.get<boolean>('dataDatabase.logging', false),
Expand Down
38 changes: 38 additions & 0 deletions src/database/migrations/1781000000000-AddBaileysStoredMessages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

/**
* Creates `baileys_stored_messages` — the persisted Baileys message store backing
* reply/forward/react/delete. CASCADE-deleted with its session. Hand-authored because
* `synchronize` is off for the `data` connection on Postgres (and optional on SQLite).
*/
export class AddBaileysStoredMessages1781000000000 implements MigrationInterface {
name = 'AddBaileysStoredMessages1781000000000';

public async up(queryRunner: QueryRunner): Promise<void> {
if (await queryRunner.hasTable('baileys_stored_messages')) return;
const isPostgres = queryRunner.connection.options.type === 'postgres';

if (isPostgres) {
await queryRunner.query(
`CREATE TABLE "baileys_stored_messages" ("id" varchar PRIMARY KEY NOT NULL DEFAULT gen_random_uuid()::varchar, "sessionId" varchar NOT NULL, "waMessageId" varchar NOT NULL, "serializedMessage" text NOT NULL, "createdAt" timestamp NOT NULL DEFAULT NOW(), CONSTRAINT "FK_baileys_stored_messages_sessionId" FOREIGN KEY ("sessionId") REFERENCES "sessions" ("id") ON DELETE CASCADE ON UPDATE NO ACTION)`,
);
} else {
await queryRunner.query(
`CREATE TABLE "baileys_stored_messages" ("id" varchar PRIMARY KEY NOT NULL, "sessionId" varchar NOT NULL, "waMessageId" varchar NOT NULL, "serializedMessage" text NOT NULL, "createdAt" datetime NOT NULL DEFAULT (datetime('now')), CONSTRAINT "FK_baileys_stored_messages_sessionId" FOREIGN KEY ("sessionId") REFERENCES "sessions" ("id") ON DELETE CASCADE ON UPDATE NO ACTION)`,
);
}

await queryRunner.query(
`CREATE UNIQUE INDEX "UQ_baileys_stored_messages_session_wamsg" ON "baileys_stored_messages" ("sessionId", "waMessageId")`,
);
await queryRunner.query(
`CREATE INDEX "IDX_baileys_stored_messages_session_created" ON "baileys_stored_messages" ("sessionId", "createdAt")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "IDX_baileys_stored_messages_session_created"`);
await queryRunner.query(`DROP INDEX "UQ_baileys_stored_messages_session_wamsg"`);
await queryRunner.query(`DROP TABLE "baileys_stored_messages"`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { DataSource } from 'typeorm';
import { AddBaileysStoredMessages1781000000000 } from '../1781000000000-AddBaileysStoredMessages';

describe('AddBaileysStoredMessages migration', () => {
let ds: DataSource;

beforeEach(async () => {
// A `sessions` table must exist for the FK; create a minimal stand-in.
ds = new DataSource({ type: 'sqlite', database: ':memory:' });
await ds.initialize();
await ds.query(`CREATE TABLE "sessions" ("id" varchar PRIMARY KEY NOT NULL)`);
});

afterEach(async () => {
await ds.destroy();
});

it('creates and drops the table + indexes', async () => {
const runner = ds.createQueryRunner();
const migration = new AddBaileysStoredMessages1781000000000();

await migration.up(runner);
expect(await runner.hasTable('baileys_stored_messages')).toBe(true);

await migration.down(runner);
expect(await runner.hasTable('baileys_stored_messages')).toBe(false);

await runner.release();
});
});
94 changes: 94 additions & 0 deletions src/engine/adapters/baileys-message-store.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { DataSource, Repository } from 'typeorm';
import { BaileysStoredMessage } from './baileys-stored-message.entity';
import { BaileysMessageStoreService } from './baileys-message-store.service';

describe('BaileysMessageStoreService', () => {
let ds: DataSource;
let repo: Repository<BaileysStoredMessage>;
let service: BaileysMessageStoreService;

beforeEach(async () => {
ds = new DataSource({ type: 'sqlite', database: ':memory:', entities: [BaileysStoredMessage], synchronize: true });
await ds.initialize();
repo = ds.getRepository(BaileysStoredMessage);
service = new BaileysMessageStoreService(repo);
});

afterEach(async () => {
await ds.destroy();
delete process.env.BAILEYS_MESSAGE_STORE_LIMIT;
});

// Partial WAMessage fixture — cast through unknown so strict checks don't fire on the incomplete shape.
const msg = (id: string) =>
({
key: { id, remoteJid: '1@s.whatsapp.net', fromMe: false },
message: { conversation: id },
}) as unknown as Parameters<BaileysMessageStoreService['put']>[1];

it('round-trips a WAMessage through BufferJSON', async () => {
await service.put('s1', msg('M1'));
const got = await service.getMessage('s1', 'M1');
expect(got?.key?.id).toBe('M1');
expect(got?.message?.conversation).toBe('M1');
});

it('returns null for an unknown id and is session-scoped', async () => {
await service.put('s1', msg('M1'));
expect(await service.getMessage('s1', 'NOPE')).toBeNull();
expect(await service.getMessage('s2', 'M1')).toBeNull();
});

it('is idempotent on (sessionId, waMessageId)', async () => {
await service.put('s1', msg('M1'));
await service.put('s1', msg('M1'));
expect(await repo.count({ where: { sessionId: 's1' } })).toBe(1);
});

it('evicts oldest beyond the per-session cap', async () => {
process.env.BAILEYS_MESSAGE_STORE_LIMIT = '2';
const s = new BaileysMessageStoreService(repo);
// Use distinct createdAt values so ordering is deterministic regardless of UUID tiebreaker.
const t0 = new Date('2024-01-01T00:00:00.000Z');
const t1 = new Date('2024-01-01T00:00:01.000Z');
const t2 = new Date('2024-01-01T00:00:02.000Z');
for (const [waMessageId, createdAt] of [
['M1', t0],
['M2', t1],
['M3', t2],
] as [string, Date][]) {
await repo.save(repo.create({ sessionId: 's1', waMessageId, serializedMessage: '{}', createdAt }));
}
// Trigger eviction: put M3 again (idempotent upsert) so enforceLimit runs with 3 rows and cap=2.
await s.put('s1', msg('M3'));
expect(await s.getMessage('s1', 'M1')).toBeNull(); // oldest (t0) evicted
expect(await s.getMessage('s1', 'M2')).not.toBeNull();
expect(await s.getMessage('s1', 'M3')).not.toBeNull();
expect(await repo.count({ where: { sessionId: 's1' } })).toBe(2);
});

it('keeps exactly limit rows when multiple share the same createdAt (tiebreaker via id)', async () => {
process.env.BAILEYS_MESSAGE_STORE_LIMIT = '2';
const s = new BaileysMessageStoreService(repo);
// Insert 3 rows with identical createdAt to stress the (createdAt, id) tiebreaker.
// With UUID primary keys, id ordering is lexicographic — we can only assert count, not which survive.
const sharedTs = new Date('2024-01-01T00:00:00.000Z');
for (const waMessageId of ['T1', 'T2', 'T3']) {
await repo.save(repo.create({ sessionId: 's2', waMessageId, serializedMessage: '{}', createdAt: sharedTs }));
}
// Trigger eviction: put a 4th message (distinct, newer ts) through the service.
await s.put('s2', msg('T4'));
// Exactly limit rows must remain — no over- or under-deletion.
expect(await repo.count({ where: { sessionId: 's2' } })).toBe(2);
// T4 is the newest (distinct createdAt = now) and must survive.
expect(await s.getMessage('s2', 'T4')).not.toBeNull();
});

it('clearSession removes only that session', async () => {
await service.put('s1', msg('M1'));
await service.put('s2', msg('M2'));
await service.clearSession('s1');
expect(await service.getMessage('s1', 'M1')).toBeNull();
expect(await service.getMessage('s2', 'M2')).not.toBeNull();
});
});
65 changes: 65 additions & 0 deletions src/engine/adapters/baileys-message-store.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { BufferJSON } from '@whiskeysockets/baileys';
import type { WAMessage } from '@whiskeysockets/baileys';
import { BaileysStoredMessage } from './baileys-stored-message.entity';
import { BaileysMessageStore } from '../types/baileys.types';

function positiveIntFromEnv(name: string, fallback: number): number {
const parsed = Number.parseInt(process.env[name] ?? '', 10);
return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

@Injectable()
export class BaileysMessageStoreService implements BaileysMessageStore {
constructor(
@InjectRepository(BaileysStoredMessage, 'data')
private readonly repo: Repository<BaileysStoredMessage>,
) {}

async put(sessionId: string, msg: WAMessage): Promise<void> {
const waMessageId = msg.key?.id;
if (!waMessageId) {
return;
}
const serializedMessage = JSON.stringify(msg, BufferJSON.replacer);
// Idempotent: the same message arrives from the send return AND the messages.upsert echo.
await this.repo.upsert({ sessionId, waMessageId, serializedMessage }, ['sessionId', 'waMessageId']);
await this.enforceLimit(sessionId);
}

async getMessage(sessionId: string, messageId: string): Promise<WAMessage | null> {
const row = await this.repo.findOne({ where: { sessionId, waMessageId: messageId } });
if (!row) {
return null;
}
return JSON.parse(row.serializedMessage, BufferJSON.reviver) as WAMessage;
}

async clearSession(sessionId: string): Promise<void> {
await this.repo.delete({ sessionId });
}

/** Per-session row cap: keep the newest N, delete the rest. Deterministic via (createdAt, id). */
private async enforceLimit(sessionId: string): Promise<void> {
const limit = positiveIntFromEnv('BAILEYS_MESSAGE_STORE_LIMIT', 5000);
const cutoff = await this.repo.find({
where: { sessionId },
order: { createdAt: 'DESC', id: 'DESC' },
skip: limit,
take: 1,
select: ['id', 'createdAt'],
});
if (cutoff.length === 0) {
return; // under the cap — nothing to evict
}
const { id, createdAt } = cutoff[0];
await this.repo
.createQueryBuilder()
.delete()
.where('sessionId = :sessionId', { sessionId })
.andWhere('(createdAt < :createdAt OR (createdAt = :createdAt AND id <= :id))', { createdAt, id })
.execute();
}
}
26 changes: 26 additions & 0 deletions src/engine/adapters/baileys-stored-message.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm';

/**
* Persisted Baileys message store (the lib ships none). Holds the serialized WAMessage proto
* (via BufferJSON) so reply/forward/react/delete can resolve the original message/key by id across
* restarts. Engine-specific — lives in the engine layer, not the neutral `messages` table.
*/
@Entity('baileys_stored_messages')
@Index(['sessionId', 'waMessageId'], { unique: true }) // lookup + dedup (send-return vs upsert echo)
@Index(['sessionId', 'createdAt']) // eviction ordering
export class BaileysStoredMessage {
@PrimaryGeneratedColumn('uuid')
id: string;

@Column()
sessionId: string;

@Column()
waMessageId: string;

@Column({ type: 'text' })
serializedMessage: string;

@CreateDateColumn()
createdAt: Date;
}
105 changes: 104 additions & 1 deletion src/engine/adapters/baileys.adapter.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ import { EngineNotReadyError } from '../../common/errors/engine-not-ready.error'
import { EngineNotSupportedError } from '../../common/errors/engine-not-supported.error';
import { loadRemoteMediaBuffer } from '../../common/media/load-remote-media';

const newAdapter = (): BaileysAdapter => new BaileysAdapter({ sessionId: 'sess-1', authDir: './data/baileys' });
const fakeStore = {
put: jest.fn().mockResolvedValue(undefined),
getMessage: jest.fn(),
clearSession: jest.fn().mockResolvedValue(undefined),
};
const newAdapter = (): BaileysAdapter =>
new BaileysAdapter({ sessionId: 'sess-1', authDir: './data/baileys', messageStore: fakeStore });

const noopCallbacks = (over: Partial<EngineEventCallbacks> = {}): EngineEventCallbacks => over;

Expand Down Expand Up @@ -419,3 +425,100 @@ describe('BaileysAdapter media sends', () => {
).rejects.toBeInstanceOf(EngineNotReadyError);
});
});

describe('BaileysAdapter store-backed ops', () => {
beforeEach(() => {
fakeSock.user = { id: '628999:1@s.whatsapp.net', name: 'Me' };
jest.clearAllMocks();
fakeSock.sendMessage.mockResolvedValue({
key: { id: 'OUT', remoteJid: '628111@s.whatsapp.net', fromMe: true },
messageTimestamp: 1700000009,
});
});

const ready = async (): Promise<BaileysAdapter> => {
const adapter = newAdapter();
await adapter.initialize({});
fakeSock.fire('connection.update', { connection: 'open' });
return adapter;
};

const stored = {
key: { id: 'TARGET', remoteJid: '628111@s.whatsapp.net', fromMe: false },
message: { conversation: 'hi' },
};

it('replyToMessage quotes the stored message', async () => {
fakeStore.getMessage.mockResolvedValue(stored);
const adapter = await ready();
await adapter.replyToMessage('628111@s.whatsapp.net', 'TARGET', 'my reply');
expect(fakeStore.getMessage).toHaveBeenCalledWith('sess-1', 'TARGET');
expect(fakeSock.sendMessage).toHaveBeenCalledWith(
'628111@s.whatsapp.net',
{ text: 'my reply' },
{ quoted: stored },
);
});

it('forwardMessage forwards the stored message', async () => {
fakeStore.getMessage.mockResolvedValue(stored);
const adapter = await ready();
await adapter.forwardMessage('628111@s.whatsapp.net', '628222@s.whatsapp.net', 'TARGET');
expect(fakeSock.sendMessage).toHaveBeenCalledWith('628222@s.whatsapp.net', { forward: stored });
});

it('reactToMessage sends the stored key', async () => {
fakeStore.getMessage.mockResolvedValue(stored);
const adapter = await ready();
await adapter.reactToMessage('628111@s.whatsapp.net', 'TARGET', '👍');
expect(fakeSock.sendMessage).toHaveBeenCalledWith('628111@s.whatsapp.net', {
react: { text: '👍', key: stored.key },
});
});

it('deleteMessage revokes via the stored key', async () => {
fakeStore.getMessage.mockResolvedValue(stored);
const adapter = await ready();
await adapter.deleteMessage('628111@s.whatsapp.net', 'TARGET', true);
expect(fakeSock.sendMessage).toHaveBeenCalledWith('628111@s.whatsapp.net', { delete: stored.key });
});

it('throws when the referenced message is not in the store', async () => {
fakeStore.getMessage.mockResolvedValue(null);
const adapter = await ready();
await expect(adapter.replyToMessage('c', 'GONE', 'x')).rejects.toThrow(/not found/i);
});

it('deleteMessage for-me (forEveryone=false) is not supported', async () => {
const adapter = await ready();
await expect(adapter.deleteMessage('c', 'TARGET', false)).rejects.toBeInstanceOf(EngineNotSupportedError);
});

it('populates the store on an inbound message', async () => {
const adapter = newAdapter();
await adapter.initialize({});
fakeSock.fire('messages.upsert', {
type: 'notify',
messages: [
{ key: { remoteJid: '628111@s.whatsapp.net', fromMe: false, id: 'IN9' }, message: { conversation: 'hi' } },
],
});
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const inboundMatcher = expect.objectContaining({ key: expect.objectContaining({ id: 'IN9' }) });
expect(fakeStore.put).toHaveBeenCalledWith('sess-1', inboundMatcher);
});

it('populates the store on an outgoing send', async () => {
const adapter = await ready();
await adapter.sendTextMessage('628111@s.whatsapp.net', 'hello');
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const outboundMatcher = expect.objectContaining({ key: expect.objectContaining({ id: 'OUT' }) });
expect(fakeStore.put).toHaveBeenCalledWith('sess-1', outboundMatcher);
});

it('clears the store on logout', async () => {
const adapter = await ready();
await adapter.logout();
expect(fakeStore.clearSession).toHaveBeenCalledWith('sess-1');
});
});
Loading