diff --git a/docs/runbooks/genesis-and-identities-endpoints.md b/docs/runbooks/genesis-and-identities-endpoints.md new file mode 100644 index 00000000..f88953cb --- /dev/null +++ b/docs/runbooks/genesis-and-identities-endpoints.md @@ -0,0 +1,174 @@ +--- +type: runbook +title: The /genesisBlock and /identities Endpoints +date: 2026-06-05 +status: active +--- + +# The /genesisBlock and /identities Endpoints + +Two read-only GET routes on the node's RPC HTTP server +(`src/libs/network/server_rpc.ts`), served behind the same +CORS + rate-limiter + JSON middleware stack as every other GET route. +Default port is `53550`. + +There are now three genesis-related GET routes — know which one you want: + +| Route | Returns | +|---|---| +| `/genesis` | Only the embedded `genesisData` (chain params, balances, validators) parsed out of the genesis block's `content.extra.genesisData`. Pre-existing. | +| `/genesisBlock` | The **entire** genesis block (block 0) as stored — full `Blocks` record including `hash`, `number`, `content`, signatures, etc. | +| `/identities` | Paginated listing of every account's linked identities. | + +--- + +## /genesisBlock + +Dumps the whole genesis block (block 0) exactly as persisted, via +`Chain.getGenesisBlock()` (which is `getBlockByNumber(0)`). + +### Quick check + +```bash +curl -fsS http://localhost:53550/genesisBlock | jq +``` + +### Responses + +- **200** — the full genesis `Blocks` object. +- **503** — `{ result: 503, response: "STATE_NOT_READY", extra: { message } }` + when the genesis block is not found yet (node still booting / chain not + initialized) or the read threw. + +Use `/genesisBlock` when you need block-level fields (hash, signatures, +the raw `content` envelope). Use `/genesis` when you only need the chain +parameters and pre-funded balances. + +Rate limit: `/genesisBlock` maps to the `genesisblock` method in the rate +limiter's `pathMethodMap`, so it gets the default per-IP GET allowance +(2000 req/60s) rather than falling through to the lax POST bucket. + +--- + +## /identities + +Lists the linked identities of every account, **paginated**. Each row is +just the account `pubkey` plus its `identities` jsonb blob — never +balance, nonce, or points. Backed by `GCR.listIdentities(limit, cursor)` +in `src/libs/blockchain/gcr/gcr.ts`. + +### Why only pubkey + identities + +The `gcr_main` table can hold a large number of jsonb-heavy rows, and the +`balance` column is a `bigint` that `JSON.stringify` cannot serialize. By +projecting only `pubkey` + `identities` at the SQL level +(`.select(["gcr.pubkey", "gcr.identities"])`) the payload stays focused +and the bigint serialization trap is never hit. + +### Pagination — keyset, not offset + +Pagination seeks on the `pubkey` primary key: +`WHERE pubkey > :cursor ORDER BY pubkey ASC LIMIT :n`. This stays +O(log n) on the PK index no matter how deep into the table you page — +unlike `OFFSET`, which scans and discards rows. An unbounded `find()` +over the whole table is deliberately avoided so a single request can +never load the entire account set into memory. + +### Query parameters + +| Param | Type | Default | Notes | +|---|---|---|---| +| `limit` | int | `100` | Clamped to `[1, 1000]`. Non-numeric / non-positive falls back to 100; values above 1000 are capped. | +| `cursor` | string | — | The `pubkey` of the last row from the previous page. Omit for the first page. | + +### Quick check + +```bash +# First page (default 100 per page) +curl -fsS "http://localhost:53550/identities" | jq + +# Custom page size +curl -fsS "http://localhost:53550/identities?limit=250" | jq + +# Next page — pass back the previous response's nextCursor +curl -fsS "http://localhost:53550/identities?cursor=" | jq +``` + +### Response shape + +```jsonc +{ + "success": true, + "identities": [ + { "pubkey": "0xabc…", "identities": { "xm": { … }, "web2": { … }, "pqc": { … }, "ud": [ … ] } } + // … + ], + "count": 100, // rows in THIS page + "limit": 100, // effective (clamped) page size + "nextCursor": "0xfff…" // pass as ?cursor= for the next page; null = end of table +} +``` + +### Paging to the end + +Keep calling with `?cursor=` until `nextCursor` is `null`. +A `null` cursor means the last page returned fewer rows than `limit`, +i.e. the table is exhausted. + +```bash +cursor="" +while :; do + url="http://localhost:53550/identities?limit=500" + [ -n "$cursor" ] && url="$url&cursor=$cursor" + resp=$(curl -fsS "$url") + echo "$resp" | jq -c '.identities[]' + cursor=$(echo "$resp" | jq -r '.nextCursor') + [ "$cursor" = "null" ] && break +done +``` + +### Errors + +- **500** — `{ success: false, error: "Failed to list identities", message }` + if the DB query throws. +- **503** + `Retry-After` header — `{ success: false, error: "Service busy", message }` + when the node is already serving its max concurrent `/identities` requests + and a slot did not free in time (see DDoS hardening below). + +### DDoS hardening + +`/identities` is the most expensive GET route (a paginated full-table read +over jsonb-heavy rows), so it has three independent brakes layered on top +of each other. Tunable via `src/utilities/constants.ts`. + +1. **Per-IP rate limit** — `identities` method limit + (`RATE_LIMIT_IDENTITIES_MAX_REQUESTS` = 30 per + `RATE_LIMIT_IDENTITIES_WINDOW_MS` = 60s), far below the default GET + allowance of 2000/60s. Registered in the rate limiter's `pathMethodMap` + (`src/libs/network/middleware/rateLimiter.ts`) and `methodLimits` + (`src/utilities/sharedState.ts`). Stops a single source from flooding. + Over the limit → standard 429 from the rate-limiter middleware. + +2. **Global concurrency gate** — at most `IDENTITIES_MAX_CONCURRENT` (= 3) + `/identities` handlers execute at once across **all** callers, so a + distributed burst (many IPs × 1 request each, which the per-IP limit + can't catch) still can't pile unbounded DB load. Overflow callers queue + up to `IDENTITIES_MAX_QUEUE` (= 12) deep and wait at most + `IDENTITIES_ACQUIRE_TIMEOUT_MS` (= 2000ms) for a slot; if none frees (or + the queue is already full) they get the 503 + `Retry-After` above + instead of waiting forever or deepening the load. Implemented by + `ConcurrencyGate` (`src/libs/network/utils/concurrencyGate.ts`), a pure + in-process counting semaphore with a bounded FIFO wait queue. + +3. **Bounded work per request** — `limit` is hard-capped at 1000, the query + projects only `pubkey` + `identities` columns, and pagination is keyset + (PK seek) not offset, so no single request can pull the whole table into + memory. + +**Note on Bun workers:** workers were considered and deliberately *not* +used here. Workers (as in `txValidatorPool`) offload **CPU-bound** crypto +off the event loop. `/identities` is **I/O-bound** (a Postgres query) — the +event loop is already free while awaiting the DB, so a worker would add IPC +copy overhead, would not reduce DB load, and would give an attacker more +threads/memory to exhaust. The concurrency gate caps DB load directly, +which is the actual bottleneck. diff --git a/src/libs/blockchain/gcr/gcr.ts b/src/libs/blockchain/gcr/gcr.ts index 6aada667..a1fdcee0 100644 --- a/src/libs/blockchain/gcr/gcr.ts +++ b/src/libs/blockchain/gcr/gcr.ts @@ -65,6 +65,10 @@ import TxValidatorPool from "../validation/txValidatorPool" import { GCRSubnetsTxs } from "@/model/entities/GCRv2/GCRSubnetsTxs" import { emptyResponse } from "@/libs/network" import { Validators } from "@/model/entities/Validators" +import { + IDENTITIES_DEFAULT_LIMIT, + IDENTITIES_MAX_LIMIT, +} from "@/utilities/constants" export type GetNativeSubnetsTxsOptions = { txData?: boolean @@ -1108,6 +1112,106 @@ export default class GCR { // return awardedAccounts } + /** + * List the linked identities of every account, paginated. + * + * Returns only `pubkey` + the `identities` jsonb blob per account — + * never balance/nonce/points — so the payload stays focused and the + * `bigint` balance column (which `JSON.stringify` cannot serialize) is + * never touched. + * + * Pagination is keyset (a.k.a. seek) on the `pubkey` primary key, not + * offset: each page seeks `WHERE pubkey > :cursor ORDER BY pubkey ASC + * LIMIT :n`, which stays O(log n) on the PK index regardless of how + * deep into the table the caller is. The `gcr_main` table can hold a + * large number of jsonb-heavy rows, so an unbounded `find()` is + * deliberately avoided here. + * + * @param limit Max rows per page. Clamped to [1, 1000]. Default 100. + * @param cursor The `pubkey` of the last row from the previous page. + * Omit for the first page. + * @returns `RPCResponse` whose `response` is + * `{ success, identities: [{ pubkey, identities }], count, limit, nextCursor }`. + * `nextCursor` is the last `pubkey` of this page when a full page was + * returned (more rows may exist), or `null` when the end was reached. + */ + static async listIdentities( + limit = 100, + cursor?: string, + ): Promise { + try { + // Clamp the page size into a sane bound. A non-numeric or + // non-positive limit falls back to the default; the hard cap + // protects the node from a single huge response. + const DEFAULT_LIMIT = IDENTITIES_DEFAULT_LIMIT + const MAX_LIMIT = IDENTITIES_MAX_LIMIT + const parsedLimit = Number(limit) + // Floor of a fractional limit in (0, 1) is 0, which would request + // an empty page; clamp to a minimum of 1 so pageSize is always a + // positive integer. + const pageSize = + Number.isFinite(parsedLimit) && parsedLimit > 0 + ? Math.max(1, Math.min(Math.floor(parsedLimit), MAX_LIMIT)) + : DEFAULT_LIMIT + + const db = await Datasource.getInstance() + const gcrMainRepository = db.getDataSource().getRepository(GCRMain) + + const qb = gcrMainRepository + .createQueryBuilder("gcr") + .select(["gcr.pubkey", "gcr.identities"]) + .orderBy("gcr.pubkey", "ASC") + .limit(pageSize) + + // Keyset seek: only rows strictly after the cursor pubkey. + if (cursor) { + qb.where("gcr.pubkey > :cursor", { cursor }) + } + + const accounts = await qb.getMany() + + const identities = accounts.map(account => ({ + pubkey: account.pubkey, + identities: account.identities, + })) + + // A full page means there may be more rows; hand back the last + // pubkey as the next cursor. A short page is the end of the table. + const nextCursor = + identities.length === pageSize && identities.length > 0 + ? identities[identities.length - 1].pubkey + : null + + return { + result: 200, + response: { + success: true, + identities, + count: identities.length, + limit: pageSize, + nextCursor, + }, + extra: null, + require_reply: false, + } + } catch (error) { + log.error("Error listing identities: " + error) + return { + result: 500, + response: { + success: false, + error: "Failed to list identities", + message: + error instanceof Error + ? error.message + : "Unknown error", + }, + extra: null, + require_reply: false, + } + } + } + // static async getFlaggedAccounts(start: number, end: number) { // const db = await Datasource.getInstance() // const gcrMainRepository = db.getDataSource().getRepository(GCRMain) diff --git a/src/libs/network/middleware/rateLimiter.ts b/src/libs/network/middleware/rateLimiter.ts index d7641f69..1ce860a1 100644 --- a/src/libs/network/middleware/rateLimiter.ts +++ b/src/libs/network/middleware/rateLimiter.ts @@ -529,6 +529,8 @@ export class RateLimiter { "/public_logs": "public_logs", "/diagnostics": "diagnostics", "/genesis": "genesis", + "/genesisBlock": "genesisblock", + "/identities": "identities", } if (req.method === "GET" && pathMethodMap[path]) { diff --git a/src/libs/network/server_rpc.ts b/src/libs/network/server_rpc.ts index dc8be215..9b584f23 100644 --- a/src/libs/network/server_rpc.ts +++ b/src/libs/network/server_rpc.ts @@ -5,6 +5,7 @@ import sharedState, { getSharedState } from "src/utilities/sharedState" import { PeerManager } from "../peer" import Chain from "../blockchain/chain" import Mempool from "../blockchain/mempool" +import GCR from "../blockchain/gcr/gcr" import { BunServer, cors, json, jsonResponse } from "./bunServer" import { RateLimiter } from "./middleware/rateLimiter" import { getAuthContext } from "./authContext" @@ -13,6 +14,16 @@ import { handleError } from "src/errors" import { isRPCRequest, processPayload, emptyResponse } from "./rpcDispatch" import { handleIdentityTxRateLimit } from "./rpcRateLimit" import { registerZkRoutes } from "./zkMerkle" +import { + ConcurrencyGate, + GateRejectedError, + GateTimeoutError, +} from "./utils/concurrencyGate" +import { + IDENTITIES_MAX_CONCURRENT, + IDENTITIES_MAX_QUEUE, + IDENTITIES_ACQUIRE_TIMEOUT_MS, +} from "src/utilities/constants" // Re-export for backward compatibility export { emptyResponse } @@ -27,6 +38,17 @@ export async function serverRpcBun() { // Initialize rate limiter const rateLimiter = RateLimiter.getInstance() + // Global concurrency gate for the expensive /identities full-table read. + // Per-IP rate limiting (above) stops a single source flooding; this gate + // caps TOTAL in-flight /identities work across ALL callers so a + // distributed (many-IP) burst still can't pile unbounded DB load. Overflow + // callers queue briefly, then get a fast 503 rather than waiting forever. + const identitiesGate = new ConcurrencyGate({ + maxConcurrent: IDENTITIES_MAX_CONCURRENT, + maxQueue: IDENTITIES_MAX_QUEUE, + acquireTimeoutMs: IDENTITIES_ACQUIRE_TIMEOUT_MS, + }) + // Apply middlewares // server.use(async (req, next) => { // const url = new URL(req.url) @@ -255,6 +277,93 @@ export async function serverRpcBun() { } }) + // Full genesis block (block 0) — the entire stored Blocks record, + // not just the embedded genesisData that /genesis returns. + server.get("/genesisBlock", async () => { + try { + const genesisBlock = await Chain.getGenesisBlock() + if (!genesisBlock) { + return jsonResponse( + { + result: 503, + response: "STATE_NOT_READY", + extra: { message: "Genesis block not found" }, + }, + 503, + ) + } + return jsonResponse(genesisBlock) + } catch (e) { + return jsonResponse( + { + result: 503, + response: "STATE_NOT_READY", + extra: { + message: e instanceof Error ? e.message : String(e), + }, + }, + 503, + ) + } + }) + + // Paginated listing of every account's linked identities (pubkey + + // identities blob only). Query params: ?limit=<1-1000>&cursor=. + // Keyset pagination — pass the response's `nextCursor` back as `cursor` + // to fetch the next page; a null nextCursor means the end of the table. + // + // Wrapped in the global concurrency gate: at most IDENTITIES_MAX_CONCURRENT + // of these run at once. Overflow callers queue up to IDENTITIES_MAX_QUEUE + // deep and wait at most IDENTITIES_ACQUIRE_TIMEOUT_MS for a slot, after + // which (or if the queue is already full) they get a 503 + Retry-After + // instead of adding to the DB load. + server.get("/identities", async req => { + const url = new URL(req.url) + const limitParam = url.searchParams.get("limit") + const cursorParam = url.searchParams.get("cursor") || undefined + const limit = limitParam ? Number(limitParam) : undefined + + try { + const result = await identitiesGate.run(() => + GCR.listIdentities(limit, cursorParam), + ) + const httpStatus = result.result === 200 ? 200 : 500 + return jsonResponse(result.response, httpStatus) + } catch (e) { + if ( + e instanceof GateTimeoutError || + e instanceof GateRejectedError + ) { + const retryAfterSecs = Math.ceil( + IDENTITIES_ACQUIRE_TIMEOUT_MS / 1000, + ) + return jsonResponse( + { + success: false, + error: "Service busy", + message: + "Too many concurrent /identities requests; retry shortly.", + }, + 503, + { "Retry-After": String(retryAfterSecs) }, + ) + } + // Anything else is a genuine failure, not backpressure. + log.error( + "[/identities] unexpected error: " + + (e instanceof Error ? e.message : String(e)), + ) + return jsonResponse( + { + success: false, + error: "Failed to list identities", + message: e instanceof Error ? e.message : String(e), + }, + 500, + ) + } + }) + server.get("/rate-limit/stats", () => { return jsonResponse(rateLimiter.getStats()) }) diff --git a/src/libs/network/utils/concurrencyGate.test.ts b/src/libs/network/utils/concurrencyGate.test.ts new file mode 100644 index 00000000..fbd20d22 --- /dev/null +++ b/src/libs/network/utils/concurrencyGate.test.ts @@ -0,0 +1,337 @@ +import { describe, expect, it, jest } from "bun:test" + +import { + ConcurrencyGate, + GateRejectedError, + GateTimeoutError, +} from "./concurrencyGate" + +describe("ConcurrencyGate", () => { + describe("constructor validation", () => { + it("rejects maxConcurrent < 1", () => { + expect( + () => + new ConcurrencyGate({ + maxConcurrent: 0, + maxQueue: 1, + acquireTimeoutMs: 10, + }), + ).toThrow(RangeError) + }) + + it("rejects negative maxQueue", () => { + expect( + () => + new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: -1, + acquireTimeoutMs: 10, + }), + ).toThrow(RangeError) + }) + + it("rejects negative acquireTimeoutMs", () => { + expect( + () => + new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 0, + acquireTimeoutMs: -5, + }), + ).toThrow(RangeError) + }) + + it("accepts maxQueue = 0 and acquireTimeoutMs = 0", () => { + expect( + () => + new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 0, + acquireTimeoutMs: 0, + }), + ).not.toThrow() + }) + }) + + describe("immediate acquire under capacity", () => { + it("hands out up to maxConcurrent slots without waiting", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 2, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + const r1 = await gate.acquire() + const r2 = await gate.acquire() + + expect(typeof r1).toBe("function") + expect(typeof r2).toBe("function") + expect(gate.active).toBe(2) + expect(gate.queued).toBe(0) + + r1() + r2() + expect(gate.active).toBe(0) + }) + }) + + describe("queueing past capacity", () => { + it("queues the overflow caller then serves it on release", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + const r1 = await gate.acquire() + expect(gate.active).toBe(1) + + const pending = gate.acquire() + // Allow the pending promise's executor to enqueue. + await Promise.resolve() + expect(gate.queued).toBe(1) + expect(gate.active).toBe(1) + + // Release the held slot; it is handed directly to the waiter. + r1() + const r2 = await pending + expect(gate.queued).toBe(0) + expect(gate.active).toBe(1) + + r2() + expect(gate.active).toBe(0) + }) + + it("serves multiple waiters in FIFO order", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 8, + acquireTimeoutMs: 1000, + }) + + const order: number[] = [] + const first = await gate.acquire() + + // Enqueue three waiters in a known order. + const p1 = gate.acquire().then((rel) => { + order.push(1) + return rel + }) + const p2 = gate.acquire().then((rel) => { + order.push(2) + return rel + }) + const p3 = gate.acquire().then((rel) => { + order.push(3) + return rel + }) + + await Promise.resolve() + expect(gate.queued).toBe(3) + + // Drain the chain: each release hands the slot to the next waiter. + first() + ;(await p1)() + ;(await p2)() + ;(await p3)() + + expect(order).toEqual([1, 2, 3]) + expect(gate.active).toBe(0) + expect(gate.queued).toBe(0) + }) + }) + + describe("queue-full rejection", () => { + it("rejects with GateRejectedError without enqueueing", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 1, + acquireTimeoutMs: 1000, + }) + + const r1 = await gate.acquire() + const queued = gate.acquire() // fills the single queue slot + await Promise.resolve() + expect(gate.queued).toBe(1) + + // Third acquire: no slot, queue full => immediate rejection. + await expect(gate.acquire()).rejects.toBeInstanceOf( + GateRejectedError, + ) + expect(gate.queued).toBe(1) + + // Cleanup so the queued promise settles. + r1() + ;(await queued)() + }) + }) + + describe("acquire timeout", () => { + it("rejects a waiter with GateTimeoutError after acquireTimeoutMs", async () => { + // Real (short) timer rather than fake timers: bun:test's fake-timer + // support does not reliably drive setTimeout callbacks, so a small + // real wait keeps this deterministic without flaking. + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 4, + acquireTimeoutMs: 30, + }) + + const r1 = await gate.acquire() + expect(gate.active).toBe(1) + + const pending = gate.acquire() + // Pre-empt an unhandled-rejection warning by attaching a catch now; + // the real assertion happens after the wait below. + pending.catch(() => {}) + + // Let the executor run and enqueue the waiter. + await Promise.resolve() + expect(gate.queued).toBe(1) + + // Wait past the acquire timeout; the waiter removes itself + rejects. + await new Promise(resolve => setTimeout(resolve, 60)) + await expect(pending).rejects.toBeInstanceOf(GateTimeoutError) + + expect(gate.queued).toBe(0) + // The held slot is untouched by the timeout. + expect(gate.active).toBe(1) + + r1() + expect(gate.active).toBe(0) + }) + }) + + describe("release idempotency", () => { + it("releases only one slot when called twice", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 2, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + const r1 = await gate.acquire() + const r2 = await gate.acquire() + expect(gate.active).toBe(2) + + r1() + r1() // second call is a no-op + expect(gate.active).toBe(1) + + r2() + expect(gate.active).toBe(0) + }) + + it("does not double-serve a waiter on a double release", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + const r1 = await gate.acquire() + const pending = gate.acquire() + await Promise.resolve() + expect(gate.queued).toBe(1) + + r1() + const r2 = await pending + r1() // idempotent: must NOT free the slot now held by the waiter + + expect(gate.active).toBe(1) + expect(gate.queued).toBe(0) + + r2() + expect(gate.active).toBe(0) + }) + }) + + describe("run()", () => { + it("releases the slot on success and returns the value", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + const result = await gate.run(async () => { + expect(gate.active).toBe(1) + return 42 + }) + + expect(result).toBe(42) + expect(gate.active).toBe(0) + }) + + it("releases the slot when fn throws and propagates the error", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + const boom = new Error("boom") + await expect( + gate.run(async () => { + throw boom + }), + ).rejects.toBe(boom) + + expect(gate.active).toBe(0) + }) + + it("never invokes fn when acquire is rejected (queue full)", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 1, + maxQueue: 0, + acquireTimeoutMs: 1000, + }) + + const r1 = await gate.acquire() + const fn = jest.fn(async () => "unreachable") + + await expect(gate.run(fn)).rejects.toBeInstanceOf(GateRejectedError) + expect(fn).not.toHaveBeenCalled() + + r1() + }) + }) + + describe("counters", () => { + it("tracks active and queued through a full cycle", async () => { + const gate = new ConcurrencyGate({ + maxConcurrent: 2, + maxQueue: 4, + acquireTimeoutMs: 1000, + }) + + expect(gate.active).toBe(0) + expect(gate.queued).toBe(0) + + const r1 = await gate.acquire() + const r2 = await gate.acquire() + expect(gate.active).toBe(2) + + const p3 = gate.acquire() + const p4 = gate.acquire() + await Promise.resolve() + expect(gate.queued).toBe(2) + expect(gate.active).toBe(2) + + r1() + const r3 = await p3 + expect(gate.queued).toBe(1) + expect(gate.active).toBe(2) + + r2() + const r4 = await p4 + expect(gate.queued).toBe(0) + expect(gate.active).toBe(2) + + r3() + r4() + expect(gate.active).toBe(0) + expect(gate.queued).toBe(0) + }) + }) +}) diff --git a/src/libs/network/utils/concurrencyGate.ts b/src/libs/network/utils/concurrencyGate.ts new file mode 100644 index 00000000..58433c60 --- /dev/null +++ b/src/libs/network/utils/concurrencyGate.ts @@ -0,0 +1,193 @@ +/** + * In-process async concurrency gate (counting semaphore). + * + * Bounds how many callers may hold a slot at once (`maxConcurrent`). Callers + * that arrive while all slots are taken wait in a bounded FIFO queue + * (`maxQueue`). Each waiter is subject to a per-acquire timeout + * (`acquireTimeoutMs`); if no slot frees before it elapses the waiter is + * removed from the queue and rejected. + * + * Designed for a single-thread Bun/Node event loop: no real locks are needed, + * correctness comes from careful ordering of the synchronous bookkeeping + * performed inside `acquire`, `release` and the timeout callback. + * + * @example + * const gate = new ConcurrencyGate({ maxConcurrent: 4, maxQueue: 16, acquireTimeoutMs: 250 }) + * const value = await gate.run(() => expensiveRead()) + */ +export class ConcurrencyGate { + private readonly maxConcurrent: number + private readonly maxQueue: number + private readonly acquireTimeoutMs: number + + /** Number of slots currently handed out (release functions outstanding). */ + private activeCount = 0 + + /** FIFO list of waiters blocked on a free slot. */ + private readonly waiters: Waiter[] = [] + + constructor(opts: { + maxConcurrent: number + maxQueue: number + acquireTimeoutMs: number + }) { + if (!Number.isInteger(opts.maxConcurrent) || opts.maxConcurrent < 1) { + throw new RangeError("maxConcurrent must be an integer >= 1") + } + if (!Number.isInteger(opts.maxQueue) || opts.maxQueue < 0) { + throw new RangeError("maxQueue must be an integer >= 0") + } + if ( + !Number.isFinite(opts.acquireTimeoutMs) || + opts.acquireTimeoutMs < 0 + ) { + throw new RangeError("acquireTimeoutMs must be a number >= 0") + } + this.maxConcurrent = opts.maxConcurrent + this.maxQueue = opts.maxQueue + this.acquireTimeoutMs = opts.acquireTimeoutMs + } + + /** Slots currently held. */ + get active(): number { + return this.activeCount + } + + /** Callers currently waiting in the queue. */ + get queued(): number { + return this.waiters.length + } + + /** + * Acquire a slot. + * + * Resolution semantics: + * - If a slot is free (`active < maxConcurrent`) it is taken synchronously + * and the returned promise resolves on the next microtask with a release + * function. + * - Otherwise, if the wait queue is already full (`queued >= maxQueue`) the + * promise rejects immediately with a {@link GateRejectedError} and the + * caller is NOT enqueued. + * - Otherwise the caller is enqueued. It resolves with a release function + * when an earlier holder releases and hands its slot directly to this + * waiter (preserving FIFO order), or rejects with a + * {@link GateTimeoutError} once `acquireTimeoutMs` elapses, whichever + * happens first. The pending timer is always cleared on settle. + * + * @returns a release function. Calling it returns the slot. It is + * idempotent: calling it more than once releases at most one slot. + */ + acquire(): Promise<() => void> { + // Fast path: a slot is free, take it without queueing. + if (this.activeCount < this.maxConcurrent) { + this.activeCount += 1 + return Promise.resolve(this.makeRelease()) + } + + // No slot free and the queue is saturated: reject without enqueueing. + if (this.waiters.length >= this.maxQueue) { + return Promise.reject( + new GateRejectedError( + "concurrency gate queue is full; request rejected", + ), + ) + } + + // Enqueue and wait for a slot or the timeout, whichever comes first. + return new Promise<() => void>((resolve, reject) => { + const waiter: Waiter = { + resolve, + reject, + timer: null, + } + + waiter.timer = setTimeout(() => { + // Remove self from the queue (still present => not yet served). + const idx = this.waiters.indexOf(waiter) + if (idx !== -1) { + this.waiters.splice(idx, 1) + } + waiter.timer = null + reject( + new GateTimeoutError( + "timed out while waiting for a concurrency gate slot", + ), + ) + }, this.acquireTimeoutMs) + + this.waiters.push(waiter) + }) + } + + /** + * Run `fn` while holding a slot, releasing it afterwards. + * + * Acquires a slot first; if acquisition rejects (timeout or queue full) + * the returned promise rejects with that error and `fn` is never invoked. + * Otherwise `fn` is awaited inside a try/finally so the slot is always + * released, whether `fn` resolves or throws. + */ + async run(fn: () => Promise): Promise { + const release = await this.acquire() + try { + return await fn() + } finally { + release() + } + } + + /** + * Build an idempotent release function bound to one held slot. + * + * On release, if a waiter is queued the freed slot is handed directly to + * the oldest waiter (its timer cleared, its promise resolved with a fresh + * release) instead of decrementing then re-incrementing `activeCount`. + * This keeps the slot count effectively unchanged across the handoff and + * prevents a brand-new `acquire` from stealing the slot ahead of a waiter. + */ + private makeRelease(): () => void { + let released = false + return () => { + if (released) { + return + } + released = true + + const next = this.waiters.shift() + if (next) { + if (next.timer !== null) { + clearTimeout(next.timer) + next.timer = null + } + // Slot is handed off: activeCount stays the same. + next.resolve(this.makeRelease()) + return + } + + // No waiter: actually free the slot. + this.activeCount -= 1 + } + } +} + +interface Waiter { + resolve: (release: () => void) => void + reject: (err: Error) => void + timer: ReturnType | null +} + +/** Thrown when a queued acquire exceeds `acquireTimeoutMs`. */ +export class GateTimeoutError extends Error { + constructor(message = "concurrency gate acquire timed out") { + super(message) + this.name = "GateTimeoutError" + } +} + +/** Thrown when an acquire arrives and the wait queue is already full. */ +export class GateRejectedError extends Error { + constructor(message = "concurrency gate queue is full") { + super(message) + this.name = "GateRejectedError" + } +} diff --git a/src/utilities/constants.ts b/src/utilities/constants.ts index 92fe1a4f..55502482 100644 --- a/src/utilities/constants.ts +++ b/src/utilities/constants.ts @@ -31,6 +31,21 @@ export const RATE_LIMIT_POST_MAX_REQUESTS = 200_000 export const RATE_LIMIT_POST_WINDOW_MS = 86_400_000 export const RATE_LIMIT_TX_PER_BLOCK = 4 +// /identities is an expensive full-table read; throttle it far below the +// default GET allowance (per-IP) and cap how many can run at once (global). +export const RATE_LIMIT_IDENTITIES_MAX_REQUESTS = 30 +export const RATE_LIMIT_IDENTITIES_WINDOW_MS = 60_000 +/** Max concurrent /identities executions across ALL callers. */ +export const IDENTITIES_MAX_CONCURRENT = 3 +/** Max callers allowed to wait for a free /identities slot before a fast 503. */ +export const IDENTITIES_MAX_QUEUE = 12 +/** How long a queued /identities caller waits for a slot before a 503. */ +export const IDENTITIES_ACQUIRE_TIMEOUT_MS = 2_000 +/** Default page size when /identities is called without a valid ?limit. */ +export const IDENTITIES_DEFAULT_LIMIT = 100 +/** Hard cap on the /identities page size so one request can't dump the table. */ +export const IDENTITIES_MAX_LIMIT = 1000 + /** Localhost IPs that always bypass rate limiting */ export const LOCALHOST_IPS = [ "127.0.0.1", diff --git a/src/utilities/sharedState.ts b/src/utilities/sharedState.ts index 7e8ac041..01248587 100644 --- a/src/utilities/sharedState.ts +++ b/src/utilities/sharedState.ts @@ -36,6 +36,8 @@ import { RATE_LIMIT_POST_MAX_REQUESTS, RATE_LIMIT_POST_WINDOW_MS, RATE_LIMIT_TX_PER_BLOCK, + RATE_LIMIT_IDENTITIES_MAX_REQUESTS, + RATE_LIMIT_IDENTITIES_WINDOW_MS, LOCALHOST_IPS, TWITTER_COOKIE_FILE, } from "./constants" @@ -442,6 +444,7 @@ export default class SharedState { ], methodLimits: { POST: { maxRequests: RATE_LIMIT_POST_MAX_REQUESTS, windowMs: RATE_LIMIT_POST_WINDOW_MS }, + identities: { maxRequests: RATE_LIMIT_IDENTITIES_MAX_REQUESTS, windowMs: RATE_LIMIT_IDENTITIES_WINDOW_MS }, }, txPerBlock: RATE_LIMIT_TX_PER_BLOCK, // Proxy-header trust — see RateLimiter constructor for resolution