diff --git a/trimsock.js/package.json b/trimsock.js/package.json index 72001fe..010dda2 100644 --- a/trimsock.js/package.json +++ b/trimsock.js/package.json @@ -1,7 +1,7 @@ { "name": "trimsock", "private": true, - "version": "0.18.2", + "version": "0.19.0", "type": "module", "scripts": { "docs": "typedoc", diff --git a/trimsock.js/packages/trimsock-bun/package.json b/trimsock.js/packages/trimsock-bun/package.json index f345475..099f1de 100644 --- a/trimsock.js/packages/trimsock-bun/package.json +++ b/trimsock.js/packages/trimsock-bun/package.json @@ -1,6 +1,6 @@ { "name": "@foxssake/trimsock-bun", - "version": "0.18.2", + "version": "0.19.0", "module": "dist/index.js", "type": "module", "author": "Tamás Gálffy", diff --git a/trimsock.js/packages/trimsock-js/lib/reactor.ts b/trimsock.js/packages/trimsock-js/lib/reactor.ts index 2ceb8d9..190c629 100644 --- a/trimsock.js/packages/trimsock-js/lib/reactor.ts +++ b/trimsock.js/packages/trimsock-js/lib/reactor.ts @@ -31,6 +31,11 @@ export type CommandErrorHandler = ( error: unknown, ) => void; +export type IngestErrorHandler = ( + error: unknown, + input: string | Buffer, +) => void; + /** * Callback type for generating exchange ID's * @@ -551,6 +556,7 @@ export abstract class Reactor { private handlers: Map> = new Map(); private defaultHandler: CommandHandler = () => {}; private errorHandler: CommandErrorHandler = () => {}; + private ingestErrorHandler: IngestErrorHandler = () => {}; private filters: CommandFilter[] = []; private exchanges = new ExchangeMap>(); @@ -563,6 +569,8 @@ export abstract class Reactor { /** * Register a command handler * + * Note that one command can only have one handler active at a time. Calling + * this method will replace the currently active handler, if it exists. * * @param commandName command name * @param handler callback function @@ -578,6 +586,9 @@ export abstract class Reactor { * Whenever a command is received that has no associated handler, the unknown * command handler is called. * + * Note that there's only one handler at any time, calling this method will + * replace the currently active handler. + * * @param handler callback function */ public onUnknown(handler: CommandHandler): this { @@ -591,6 +602,9 @@ export abstract class Reactor { * Whenever an error occurs during command processing ( e.g. in one of the * registered handlers ), the error handler is called. * + * Note that there's only one handler at any time, calling this method will + * replace the currently active handler. + * * @param handler callback function */ public onError(handler: CommandErrorHandler): this { @@ -598,6 +612,23 @@ export abstract class Reactor { return this; } + /** + * Register an ingest error handler + * + * If the reactor receives input that it can't manage ( e.g. it's malformed or + * the command is too long ), it rejects the command and calls the ingest error + * handler. + * + * Note that there's only one handler at any time, calling this method will + * replace the currently active handler. + * + * @param handler callback function + */ + public onIngestError(handler: IngestErrorHandler): this { + this.ingestErrorHandler = handler; + return this; + } + /** * Register a new command filter * @@ -683,15 +714,18 @@ export abstract class Reactor { * @param data incoming data * @param source source connection */ - public ingest(data: Buffer | string, source: T): void { - // TODO: Invoke error handler when ingest fails? - const reader = this.ensureReaderFor(source); - - if (typeof data === "string") reader.ingest(Buffer.from(data, "utf8")); - else reader.ingest(data); - - for (const item of reader.commands()) { - this.handle(new Command(item), source); + public async ingest(data: Buffer | string, source: T): Promise { + try { + const reader = this.ensureReaderFor(source); + + if (typeof data === "string") reader.ingest(Buffer.from(data, "utf8")); + else reader.ingest(data); + + await Promise.all( + reader.commands().map((it) => this.handle(new Command(it), source)), + ); + } catch (e) { + this.ingestErrorHandler(e, data); } } diff --git a/trimsock.js/packages/trimsock-js/package.json b/trimsock.js/packages/trimsock-js/package.json index 72d0196..a6ff895 100644 --- a/trimsock.js/packages/trimsock-js/package.json +++ b/trimsock.js/packages/trimsock-js/package.json @@ -1,6 +1,6 @@ { "name": "@foxssake/trimsock-js", - "version": "0.18.2", + "version": "0.19.0", "module": "dist/index.js", "type": "module", "author": "Tamás Gálffy", diff --git a/trimsock.js/packages/trimsock-js/spec/reactor/reactor.spec.ts b/trimsock.js/packages/trimsock-js/spec/reactor/reactor.spec.ts index 3ee9234..6e0948e 100644 --- a/trimsock.js/packages/trimsock-js/spec/reactor/reactor.spec.ts +++ b/trimsock.js/packages/trimsock-js/spec/reactor/reactor.spec.ts @@ -23,6 +23,33 @@ describe("Reactor", () => { // Outbox should be empty expect(reactor.outbox).toBeEmpty(); }); + + test("should not throw on unknown exchange", async () => { + expect( + async () => await reactor.ingest(".1234 foo\n", "0"), + ).not.toThrow(); + }); + + test("should never throw", async () => { + // Throw random strings at the reactor and see if it fails + const count = 1024; + const length = 32; + const charset = + "abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+-=,./<>?"; + + const inputs = [...new Array(count)].map( + () => + `${[...new Array(length)] + .map(() => ~~(Math.random() * charset.length)) + .map((idx) => charset.charAt(idx)) + .join("")}\n`, + ); + + const promise = Promise.all(inputs.map((it) => reactor.ingest(it, "0"))); + + console.log("Randomized inputs: ", inputs); + expect(async () => await promise).not.toThrow(); + }); }); describe("knownCommands", () => { diff --git a/trimsock.js/packages/trimsock-node/package.json b/trimsock.js/packages/trimsock-node/package.json index 2688c8d..0ac06ae 100644 --- a/trimsock.js/packages/trimsock-node/package.json +++ b/trimsock.js/packages/trimsock-node/package.json @@ -1,6 +1,6 @@ { "name": "@foxssake/trimsock-node", - "version": "0.18.2", + "version": "0.19.0", "module": "./dist/index.js", "type": "module", "author": "Tamás Gálffy",