Skip to content

Commit 3e4fb39

Browse files
Add channel. Finally
2 parents 8a7d327 + 2ad9753 commit 3e4fb39

12 files changed

Lines changed: 981 additions & 168 deletions

File tree

.github/workflows/main-commit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name: Test
22
on:
33
push:
4-
branches: [main]
4+
branches: [main, events]
55
jobs:
66
test:
77
name: Test

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,6 @@ But the "speed" of creating your app comes from so-called "abstractions". Their
4141
Though it IS performant, it handles only core features. Now there are solutions like "hyper-express", "ultimate-express", "uwebsockets-express", but they are just additional layers simulating "express" behaviour, while hiding the most important (and difficult) aspects of uWS.<br>
4242

4343
This library provides you with utilities but you still operate on the uWebSockets.js. This way server remains optimizable and rapid typing doesn't vanish.
44+
45+
# New feature - "Channel". In other words, "another event emitter".
46+
Channel proved to be the fastest among tseep, cozyevent and node:events. It is cross-platform and can be used with/without main µBlitz.js ecosystem. Import it using "@ublitzjs/core/channel"

USAGE.md

Lines changed: 104 additions & 31 deletions
Large diffs are not rendered by default.

bun.lock

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,45 @@
11
{
22
"name": "@ublitzjs/core",
3-
"version": "1.1.0",
3+
"version": "1.2.0",
44
"types": "./dist/types/index.d.ts",
55
"files": ["dist", "LICENSE"],
66
"typesVersions": {
77
"*": {
8-
".": ["./dist/types/index.d.ts"]
8+
".": ["./dist/types/index.d.ts"],
9+
"channel": ["./dist/types/channel.d.ts"]
910
}
1011
},
1112
"license": "MIT",
1213
"exports": {
13-
"types": "./dist/types/index.d.ts",
14-
"require": "./dist/cjs/index.js",
15-
"import": "./dist/esm/index.js"
14+
".": {
15+
"types": "./dist/types/index.d.ts",
16+
"require": "./dist/cjs/index.js",
17+
"import": "./dist/esm/index.js"
18+
},
19+
"./channel": {
20+
"types": "./dist/types/channel.d.ts",
21+
"require": "./dist/cjs/channel.js",
22+
"import": "./dist/esm/channel.js"
23+
}
1624
},
1725
"dependencies": {
18-
"tseep": "^1.3.1",
1926
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.57.0"
2027
},
2128
"publishConfig": {
2229
"access": "public"
2330
},
2431
"devDependencies": {
32+
"cozyevent": "^1.3.3",
2533
"@babel/cli": "^7.28.6",
2634
"@babel/core": "^7.29.0",
2735
"@babel/plugin-transform-modules-commonjs": "^7.28.6",
2836
"@types/node": "^25.2.1",
2937
"@types/ws": "^8.18.1",
3038
"@vitest/coverage-v8": "4.0.18",
3139
"esbuild": "^0.25.12",
40+
"tinybench": "^6.0.0",
3241
"tsd": "^0.33.0",
42+
"tseep": "^1.3.1",
3343
"vitest": "^4.0.8",
3444
"ws": "^8.18.3"
3545
},
@@ -39,7 +49,9 @@
3949
"build:cjs": "babel dist/esm --out-dir dist/cjs",
4050
"build:types": "tsc -p tsconfig.types.json",
4151
"build": "npm run build:types && npm run build:esm && npm run build:cjs",
42-
"test": "tsd && vitest run tests/index.test.ts --coverage"
52+
"test:base": "tsd && vitest run tests/index.test.ts --coverage",
53+
"test:channel": "node tests/ch-bench.mjs",
54+
"test": "bun run test:base && bun run test:channel"
4355
},
4456
"tsd": {
4557
"directory": "tests"
@@ -64,6 +76,7 @@
6476
"headers",
6577
"codes",
6678
"development",
79+
"fastest pub/sub event emitter",
6780
"asynchronous code",
6881
"µBlitz.js"
6982
]

src/channel.ts

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/**
2+
* This is a type for callbacks, passed into "Channel". Note that it has an "id" property for O(1) lookup when removing items. Please, don't touch it.
3+
* */
4+
export type ChannelCB<T> = ((data: T)=>void) & {id: number}
5+
6+
/**
7+
* An event channel, pub/sub pattern, replacement (for ordinary event emitter. Creation is faster, removal is O(1) (callbacks get "id" property for this, don't touch it), hence is scalable. When has one listener (not when it had more and then left 1) - has optimization. It is used in "HttpResponse.abortCh
8+
* If you want to have something like "emitter.emit("event", data)" just create an object with channels: {"event": new Channel()}.
9+
* It doesn't support "once" events, because all you need is just "clear" the channel for it.
10+
* It doesn't handle cases when you are deleting a listener within "pub" call. It can skip other listener. If you want both 'once' and 'on' listeners, create 2 separate channels for both handlers, for 'once' use "channel.clear()"
11+
* Took some inspiration from tseep.
12+
* @example
13+
* // any message you'd like to send. It can be anything
14+
* type MessageT = `hello, ${string}`
15+
* var channel = new Channel<MessageT>()
16+
* save function, but not anonymously (if you want then to remove it alone)
17+
* function subscriber1 (message: MessageT) {
18+
* console.log("Here is a message", message)
19+
* }
20+
* channel.sub(subscriber1)
21+
* function sub2 () {}
22+
* channel.sub(sub2)
23+
* // message is of MessageT
24+
* channel.pub(`hello, MISTER ANDERSON`)
25+
* // remove subscribers individually
26+
* channel.unsub(subscriber1)
27+
* // remove all subscribers at once
28+
* channel.clear()
29+
* */
30+
export class Channel<MessageType> {
31+
protected cbs: ChannelCB<MessageType>[] = [];
32+
/**find out if there are any active listeners*/
33+
get isEmpty() { return !this.cbs }
34+
/**subscribe to channel.*/
35+
sub(fn: (msg: MessageType) => void) {
36+
var cbs = this.cbs;
37+
(fn as any).id = cbs.length; cbs.push(fn as any);
38+
}
39+
/**unsubscribe from channel - remove only callbacks, that are definitely stored inside. Otherwise function throws an error*/
40+
unsub(fn: (msg: MessageType) => void) {
41+
var cbs = this.cbs
42+
let id: number = (fn as any).id
43+
if (id == cbs.length - 1)
44+
cbs.pop();
45+
else {
46+
let newCb = (cbs[id] = cbs.pop()!); newCb.id = id;
47+
}
48+
}
49+
/**publish a message to the whole channel. While the function is active, don't remove any of channel's listeners. */
50+
pub(data: MessageType){
51+
var cbs = this.cbs
52+
for (let i = 0; i < cbs.length; i++) {
53+
cbs[i]!(data);
54+
}
55+
}
56+
clear() { this.cbs = [] }
57+
}
58+
59+
/**
60+
* @description it is not written to be actively used. If you use "Channel" you eliminate unwanted overhead of using "too universal" tool like this EventEmitter. Its main purpose is to combine 'on' and 'once' listeners.
61+
**/
62+
export class EventEmitter<T extends Record<string|number|symbol, any>> {
63+
/**All events that you use. If you need both 'once' and 'on' but want to avoid built-in methods of 'EventEmitter', you can use 'events' manually.*/
64+
events: {
65+
[K in keyof T]: {
66+
on: Channel<T[K]>,
67+
once: Channel<T[K]>
68+
} | undefined
69+
} = {} as any
70+
on<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
71+
var obj = this.events[ev]
72+
if (!obj) this.events[ev] = obj = {
73+
on: new Channel(),
74+
once: new Channel()
75+
}
76+
obj.on.sub(handler)
77+
}
78+
once<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
79+
var obj = this.events[ev]
80+
if (!obj) this.events[ev] = obj = {
81+
on: new Channel(),
82+
once: new Channel()
83+
}
84+
obj.once.sub(handler)
85+
}
86+
/**Remove listener from event. For 'once' listeners you 'offOnce'*/
87+
off<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
88+
var obj = this.events[ev]
89+
if(!obj || !("id" in handler)) return;
90+
obj.on.unsub(handler)
91+
delete handler.id
92+
}
93+
/**Remove 'once' listener from event*/
94+
offOnce<K extends keyof T>(ev: K, handler: (this: Channel<T[K]>, data: T[K])=>void) {
95+
var obj = this.events[ev]
96+
if(!obj || !("id" in handler)) return;
97+
obj.once.unsub(handler)
98+
delete handler.id
99+
}
100+
/**Here message first goes to "once" listeners, then - 'on'*/
101+
emit<K extends keyof T>(ev: K, data: T[K]) {
102+
var obj = this.events[ev]
103+
if(!obj) return;
104+
obj.once.pub(data);
105+
obj.once.clear();
106+
obj.on.pub(data);
107+
}
108+
/**remove all listeners from specific event OR, if unspecificed, remove all events*/
109+
removeAllListeners(ev?: keyof T) {
110+
ev ? delete this.events[ev] : this.events = {} as any
111+
}
112+
}

src/index.ts

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { BaseHeaders, lowHeaders, RequiredBaseHeaders } from "./http-header
33
import uWS from "uWebSockets.js";
44
import { Buffer } from "node:buffer";
55
import { EventEmitter } from "tseep";
6+
import { Channel } from "./channel.js"
67
(uWS as any).DeclarativeResponse.prototype.writeHeaders = function (headers: any) {
78
for (const key in headers) this.writeHeader(key, headers[key]);
89
return this;
@@ -20,13 +21,36 @@ import type {
2021
} from "uWebSockets.js";
2122

2223
/**
23-
* function to effortlessly mark response as aborted AND to attach an event emitter, so that you can easily scale the handler. If you don't need event emitter and only some res.aborted - set it by yourself (no overkill for the handler)
24-
* If some utility expect import("@ublitzjs/core").HttpResponse, it means that they response to first go through this registerAbort
25-
* @param res
26-
* @example
27-
* console.log(Boolean(res.emitter)) // false
28-
* registerAbort(res)
29-
* console.log(Boolean(res.emitter)) // true
24+
* Simple utility proving fast (at least 6+ times) tools to handle "onAborted" using "pub/sub" pattern. Inside uses custom "event emitter", but for one single event. Properties created are "res.aborted (boolean, becomes "true" when res.onAborted fires)" and "res.abortCh (abort channel, imported from @ublitzjs/core/channel). However all callbacks you pass to "abortCh.sub" get "id" property. Don't touch it, ok? It assists with O(1) removal.
25+
* @example
26+
* server.get('/', (res)=>{
27+
* res.aborted === undefined // true
28+
* res.abortCh === undefined // true
29+
* regAbort(res); // no unwanted overhead
30+
* res.aborted === false;
31+
* function onAb() { console.log("aborted"); }
32+
* res.abortCh.sub(onAb);
33+
* setTimeout(()=>{
34+
* if(!res.aborted) { // you need to check, otherwise uWS drops server
35+
* res.abortCh.unsub(onAb); // O(1) lookup
36+
* res.end("HOORAY")
37+
* }
38+
* }, 1000)
39+
* })
40+
* */
41+
export function regAbort(res: uwsHttpResponse): HttpResponse {
42+
if ("aborted" in res) throw new Error("abort already registered");
43+
res.aborted = false;
44+
res.abortCh = new Channel<undefined>()
45+
return res.onAborted(() => {
46+
res.aborted = true;
47+
res.abortCh.pub(undefined);
48+
res.abortCh.clear()
49+
}) as HttpResponse;
50+
}
51+
/**
52+
* @deprecated this function uses "tseep" dependency, which adds an overhead while gets created and doesn't give just one "abort" channel - too much. Instead use "regAbort", which adds "abortCh" (abort channel, AT LEAST 6 TIMES FASTER)
53+
* this function adds "res.emitter and res.aborted=false".
3054
*/
3155
export function registerAbort(res: uwsHttpResponse): HttpResponse {
3256
if (typeof res.aborted === "boolean")
@@ -130,21 +154,34 @@ export interface HttpResponse<UserDataForWS = {}>
130154
*/
131155
collect: (...any: any[]) => any;
132156
/**
133-
* An event emitter, which lets you subscribe several listeners to "abort" event OR your own events, defined with Symbol() | other string
134-
* @example
135-
* res.emitter.once("abort", ()=>{
136-
* console.log("I have to clean up some random file descriptor")
137-
* // do cleanup
138-
* })
157+
* @deprecated This is an event emitter, but it is "too much" as just an "onAborted" extension. Don't use "registerAbort", and it won't appear. Instead use "regAbort" and you will get better "res.abortCh"
139158
*/
140159
emitter: EventEmitter<{
141160
abort: () => void;
142161
[k: symbol]: (...any: any[]) => void;
143162
}>;
144163
/**
145-
* changes when res.onAborted fires (you have to use registerAbort for this)
164+
* An event channel for onAborted. You can subscribe to it and unsubscribe. "pub/clear" are better to be avoided here
165+
* server.get('/', (res)=>{
166+
* res.aborted === undefined // true
167+
* res.abortCh === undefined // true
168+
* regAbort(res); // no unwanted overhead
169+
* res.aborted === false;
170+
* function onAb() { console.log("aborted"); }
171+
* res.abortCh.sub(onAb);
172+
* setTimeout(()=>{
173+
* if(!res.aborted) { // you need to check, otherwise uWS drops server
174+
* res.abortCh.unsub(onAb); // O(1) lookup
175+
* res.end("HOORAY")
176+
* }
177+
* }, 1000)
178+
* })
179+
* */
180+
abortCh: Channel<undefined>
181+
/**
182+
* changes when res.onAborted fires (you have to use regAbort (not registerAbort) for this)
146183
*/
147-
aborted?: boolean;
184+
aborted: boolean;
148185
/**
149186
* You should set it manually when ending the response. Particularly useful if some error has fired and you are doubting whether res.aborted is a sufficient flag.
150187
* @example

0 commit comments

Comments
 (0)