Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion trimsock.js/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "trimsock",
"private": true,
"version": "0.18.2",
"version": "0.19.0",
"type": "module",
"scripts": {
"docs": "typedoc",
Expand Down
2 changes: 1 addition & 1 deletion trimsock.js/packages/trimsock-bun/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@foxssake/trimsock-bun",
"version": "0.18.2",
"version": "0.19.0",
"module": "dist/index.js",
"type": "module",
"author": "Tamás Gálffy",
Expand Down
52 changes: 43 additions & 9 deletions trimsock.js/packages/trimsock-js/lib/reactor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export type CommandErrorHandler<T> = (
error: unknown,
) => void;

export type IngestErrorHandler = (
error: unknown,
input: string | Buffer,
) => void;

/**
* Callback type for generating exchange ID's
*
Expand Down Expand Up @@ -551,6 +556,7 @@ export abstract class Reactor<T> {
private handlers: Map<string, CommandHandler<T>> = new Map();
private defaultHandler: CommandHandler<T> = () => {};
private errorHandler: CommandErrorHandler<T> = () => {};
private ingestErrorHandler: IngestErrorHandler = () => {};
private filters: CommandFilter<T>[] = [];

private exchanges = new ExchangeMap<T, ReactorExchange<T>>();
Expand All @@ -563,6 +569,8 @@ export abstract class Reactor<T> {
/**
* Register a command handler
*
* Note that one command can only have one handler active at a time. Calling
* this method will replace the currently active handler, if it exists.
*
* @param commandName command name
* @param handler callback function
Expand All @@ -578,6 +586,9 @@ export abstract class Reactor<T> {
* Whenever a command is received that has no associated handler, the unknown
* command handler is called.
*
* Note that there's only one handler at any time, calling this method will
* replace the currently active handler.
*
* @param handler callback function
*/
public onUnknown(handler: CommandHandler<T>): this {
Expand All @@ -591,13 +602,33 @@ export abstract class Reactor<T> {
* Whenever an error occurs during command processing ( e.g. in one of the
* registered handlers ), the error handler is called.
*
* Note that there's only one handler at any time, calling this method will
* replace the currently active handler.
*
* @param handler callback function
*/
public onError(handler: CommandErrorHandler<T>): this {
this.errorHandler = handler;
return this;
}

/**
* Register an ingest error handler
*
* If the reactor receives input that it can't manage ( e.g. it's malformed or
* the command is too long ), it rejects the command and calls the ingest error
* handler.
*
* Note that there's only one handler at any time, calling this method will
* replace the currently active handler.
*
* @param handler callback function
*/
public onIngestError(handler: IngestErrorHandler): this {
this.ingestErrorHandler = handler;
return this;
}

/**
* Register a new command filter
*
Expand Down Expand Up @@ -683,15 +714,18 @@ export abstract class Reactor<T> {
* @param data incoming data
* @param source source connection
*/
public ingest(data: Buffer | string, source: T): void {
// TODO: Invoke error handler when ingest fails?
const reader = this.ensureReaderFor(source);

if (typeof data === "string") reader.ingest(Buffer.from(data, "utf8"));
else reader.ingest(data);

for (const item of reader.commands()) {
this.handle(new Command(item), source);
public async ingest(data: Buffer | string, source: T): Promise<void> {
try {
const reader = this.ensureReaderFor(source);

if (typeof data === "string") reader.ingest(Buffer.from(data, "utf8"));
else reader.ingest(data);

await Promise.all(
reader.commands().map((it) => this.handle(new Command(it), source)),
);
} catch (e) {
this.ingestErrorHandler(e, data);
}
}

Expand Down
2 changes: 1 addition & 1 deletion trimsock.js/packages/trimsock-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@foxssake/trimsock-js",
"version": "0.18.2",
"version": "0.19.0",
"module": "dist/index.js",
"type": "module",
"author": "Tamás Gálffy",
Expand Down
27 changes: 27 additions & 0 deletions trimsock.js/packages/trimsock-js/spec/reactor/reactor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,33 @@ describe("Reactor", () => {
// Outbox should be empty
expect(reactor.outbox).toBeEmpty();
});

test("should not throw on unknown exchange", async () => {
expect(
async () => await reactor.ingest(".1234 foo\n", "0"),
).not.toThrow();
});

test("should never throw", async () => {
// Throw random strings at the reactor and see if it fails
const count = 1024;
const length = 32;
const charset =
"abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()_+-=,./<>?";

const inputs = [...new Array(count)].map(
() =>
`${[...new Array(length)]
.map(() => ~~(Math.random() * charset.length))
.map((idx) => charset.charAt(idx))
.join("")}\n`,
);

const promise = Promise.all(inputs.map((it) => reactor.ingest(it, "0")));

console.log("Randomized inputs: ", inputs);
expect(async () => await promise).not.toThrow();
});
});

describe("knownCommands", () => {
Expand Down
2 changes: 1 addition & 1 deletion trimsock.js/packages/trimsock-node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@foxssake/trimsock-node",
"version": "0.18.2",
"version": "0.19.0",
"module": "./dist/index.js",
"type": "module",
"author": "Tamás Gálffy",
Expand Down