From 1850ef01ac4c8511738cdfe8cca8cf0449462384 Mon Sep 17 00:00:00 2001 From: Colin B Date: Tue, 3 Mar 2026 15:53:34 -0800 Subject: [PATCH 1/3] Fix _internal_btql.limit and _internal_btql.cursor being silently overwritten by SDK defaults in the TypeScript SDK's ObjectFetcher.fetchRecordsFromApi(). --- js/src/logger.ts | 18 +++- js/src/object-fetcher.test.ts | 162 ++++++++++++++++++++++++++++++++++ 2 files changed, 178 insertions(+), 2 deletions(-) create mode 100644 js/src/object-fetcher.test.ts diff --git a/js/src/logger.ts b/js/src/logger.ts index f37855a47..52e87a953 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -5518,7 +5518,21 @@ export class ObjectFetcher implements AsyncIterable< ): AsyncGenerator> { const state = await this.getState(); const objectId = await this.id; - const limit = batchSize ?? DEFAULT_FETCH_BATCH_SIZE; + const batchLimit = batchSize ?? DEFAULT_FETCH_BATCH_SIZE; + const internalLimit = ( + this._internal_btql as { limit?: number } | undefined + )?.limit; + const limit = + batchSize !== undefined ? batchSize : (internalLimit ?? batchLimit); + const internalBtqlWithoutReservedQueryKeys = Object.fromEntries( + Object.entries(this._internal_btql ?? {}).filter( + ([key]) => + key !== "cursor" && + key !== "limit" && + key !== "select" && + key !== "from", + ), + ); let cursor = undefined; let iterations = 0; while (true) { @@ -5526,7 +5540,6 @@ export class ObjectFetcher implements AsyncIterable< `btql`, { query: { - ...this._internal_btql, select: [ { op: "star", @@ -5547,6 +5560,7 @@ export class ObjectFetcher implements AsyncIterable< }, cursor, limit, + ...internalBtqlWithoutReservedQueryKeys, }, use_columnstore: false, brainstore_realtime: true, diff --git a/js/src/object-fetcher.test.ts b/js/src/object-fetcher.test.ts new file mode 100644 index 000000000..931f2adcb --- /dev/null +++ b/js/src/object-fetcher.test.ts @@ -0,0 +1,162 @@ +import { describe, expect, test, vi } from "vitest"; +import { + DEFAULT_FETCH_BATCH_SIZE, + ObjectFetcher, + type BraintrustState, +} from "./logger"; +import { configureNode } from "./node/config"; + +configureNode(); + +type TestRecord = { id: string }; + +type MockBtqlResponse = { + data: Array>; + cursor?: string | null; +}; + +function createPostResponse(response: MockBtqlResponse) { + return { + json: vi.fn().mockResolvedValue(response), + }; +} + +function createPostMock( + response: MockBtqlResponse = { data: [], cursor: null }, +) { + return vi.fn().mockResolvedValue(createPostResponse(response)); +} + +class TestObjectFetcher extends ObjectFetcher { + constructor( + private readonly postMock: ReturnType, + internalBtql?: Record, + ) { + super("dataset", undefined, undefined, internalBtql); + } + + public get id(): Promise { + return Promise.resolve("test-dataset-id"); + } + + protected async getState(): Promise { + return { + apiConn: () => ({ + post: this.postMock, + }), + } as unknown as BraintrustState; + } +} + +async function triggerFetch( + fetcher: TestObjectFetcher, + options?: { batchSize?: number }, +) { + await fetcher.fetchedData(options); +} + +function getBtqlQuery( + postMock: ReturnType, + callIndex = 0, +) { + const call = postMock.mock.calls[callIndex]; + expect(call).toBeDefined(); + const requestBody = call[1] as { query: Record }; + return requestBody.query; +} + +describe("ObjectFetcher internal BTQL limit handling", () => { + test("preserves custom _internal_btql limit instead of default batch size", async () => { + const postMock = createPostMock(); + const fetcher = new TestObjectFetcher(postMock, { + limit: 50, + where: { op: "eq", left: "foo", right: "bar" }, + }); + + await triggerFetch(fetcher); + + expect(postMock).toHaveBeenCalledTimes(1); + const query = getBtqlQuery(postMock); + expect(query.limit).toBe(50); + expect(query.where).toEqual({ op: "eq", left: "foo", right: "bar" }); + }); + + test("uses default batch size when no _internal_btql limit is provided", async () => { + const postMock = createPostMock(); + const fetcher = new TestObjectFetcher(postMock); + + await triggerFetch(fetcher); + + const query = getBtqlQuery(postMock); + expect(query.limit).toBe(DEFAULT_FETCH_BATCH_SIZE); + }); + + test("uses explicit fetch batchSize when no _internal_btql limit is provided", async () => { + const postMock = createPostMock(); + const fetcher = new TestObjectFetcher(postMock); + + await triggerFetch(fetcher, { batchSize: 17 }); + + const query = getBtqlQuery(postMock); + expect(query.limit).toBe(17); + }); + + test("explicit batchSize overrides _internal_btql.limit", async () => { + const postMock = createPostMock(); + const fetcher = new TestObjectFetcher(postMock, { limit: 100 }); + + await triggerFetch(fetcher, { batchSize: 25 }); + + const query = getBtqlQuery(postMock); + expect(query.limit).toBe(25); + }); + + test("does not allow _internal_btql cursor to override pagination cursor", async () => { + const postMock = vi + .fn() + .mockResolvedValueOnce( + createPostResponse({ + data: [{ id: "record-1" }], + cursor: "next-page-cursor", + }), + ) + .mockResolvedValueOnce( + createPostResponse({ + data: [{ id: "record-2" }], + cursor: null, + }), + ); + const fetcher = new TestObjectFetcher(postMock, { + cursor: "stale-cursor", + limit: 1, + }); + + await triggerFetch(fetcher); + + expect(postMock).toHaveBeenCalledTimes(2); + const firstQuery = getBtqlQuery(postMock, 0); + const secondQuery = getBtqlQuery(postMock, 1); + expect(firstQuery.cursor).toBeUndefined(); + expect(secondQuery.cursor).toBe("next-page-cursor"); + }); + + test("does not allow _internal_btql select/from to override base object query", async () => { + const postMock = createPostMock(); + const fetcher = new TestObjectFetcher(postMock, { + where: { op: "eq", left: "foo", right: "bar" }, + select: [{ op: "literal", value: "malicious-select" }], + from: { op: "literal", value: "malicious-from" }, + }); + + await triggerFetch(fetcher); + + const query = getBtqlQuery(postMock); + expect(query.select).toEqual([{ op: "star" }]); + expect(query.from).toEqual({ + op: "function", + name: { op: "ident", name: ["dataset"] }, + args: [{ op: "literal", value: "test-dataset-id" }], + }); + expect(query.where).toEqual({ op: "eq", left: "foo", right: "bar" }); + }); +}); From 187e5b00b2055849d52736097525c0feda42b3d8 Mon Sep 17 00:00:00 2001 From: Colin B Date: Fri, 6 Mar 2026 13:55:07 -0800 Subject: [PATCH 2/3] Implement onFlushError callback handling in logger - Added `setOnFlushError` method to `BraintrustState` and `HTTPBackgroundLogger` to manage error callbacks during log flushing. - Updated `login`, `initLogger`, and `initExperiment` functions to accept and set the `onFlushError` callback. - Enhanced tests in `logger-misc.test.ts` to verify the correct wiring of the `onFlushError` callback during logger initialization and login processes. --- js/src/logger-misc.test.ts | 128 +++++++++++++++++++++++++++++++++++++ js/src/logger.ts | 23 +++++++ 2 files changed, 151 insertions(+) diff --git a/js/src/logger-misc.test.ts b/js/src/logger-misc.test.ts index 766ef2559..1d07ea7b9 100644 --- a/js/src/logger-misc.test.ts +++ b/js/src/logger-misc.test.ts @@ -13,6 +13,7 @@ import { initDataset, initExperiment, initLogger, + login, NOOP_SPAN, permalink, BraintrustState, @@ -27,6 +28,55 @@ configureNode(); const { extractAttachments, deepCopyEvent, validateTags } = _exportsForTestingOnly; +const TEST_API_KEY = "___TEST_API_KEY__THIS_IS_NOT_REAL___"; + +type HttpLogger = ReturnType; + +function getOnFlushError(bgLogger: HttpLogger) { + return ( + bgLogger as unknown as { + onFlushError?: (error: unknown) => void; + } + ).onFlushError; +} + +function mockFlushFailure(state: BraintrustState, error: Error): HttpLogger { + vi.spyOn(console, "warn").mockImplementation(() => {}); + vi.spyOn(state.apiConn(), "get_json").mockResolvedValue({ + logs3_payload_max_bytes: null, + }); + + const bgLogger = state.httpLogger(); + bgLogger.numTries = 1; + vi.spyOn( + bgLogger as unknown as { + submitLogsRequest: () => Promise; + }, + "submitLogsRequest", + ).mockRejectedValue(error); + return bgLogger; +} + +function mockExperimentRegister(state: BraintrustState) { + vi.spyOn(state.appConn(), "post_json").mockImplementation(async (path) => { + if (path === "api/experiment/register") { + return { + project: { + id: "test-project-id", + name: "test-project", + }, + experiment: { + id: "test-experiment-id", + name: "test-experiment", + created: "2024-01-01T00:00:00.000Z", + }, + }; + } + + throw new Error(`Unexpected app connection request: ${path}`); + }); +} + describe("validateTags", () => { test("accepts valid tags", () => { expect(() => validateTags(["foo", "bar", "baz"])).not.toThrow(); @@ -562,3 +612,81 @@ test("disable logging", async () => { expect(submitLogsRequestSpy).toHaveBeenCalledTimes(1); expect(submittedItems.length).toEqual(0); }); + +describe("onFlushError wiring", () => { + beforeEach(() => { + vi.restoreAllMocks(); + _exportsForTestingOnly.simulateLogoutForTests(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + _exportsForTestingOnly.simulateLogoutForTests(); + }); + + test("initLogger updates an existing background logger callback", async () => { + const state = await _exportsForTestingOnly.simulateLoginForTests(); + const flushError = new Error("initLogger flush failure"); + const onFlushError = vi.fn(); + + mockFlushFailure(state, flushError); + + const logger = initLogger({ + projectName: "test-project", + projectId: "test-project-id", + onFlushError, + }); + + const span = logger.startSpan({ name: "test-span" }); + span.end(); + await logger.flush(); + + expect(onFlushError).toHaveBeenCalledTimes(1); + expect(onFlushError).toHaveBeenCalledWith(flushError); + }); + + test("login fast path updates an existing background logger callback", async () => { + const state = await _exportsForTestingOnly.simulateLoginForTests(); + const onFlushError = vi.fn(); + const bgLogger = state.httpLogger(); + + await login({ + apiKey: state.loginToken!, + appUrl: state.appUrl!, + orgName: state.orgName!, + onFlushError, + }); + + expect(getOnFlushError(bgLogger)).toBe(onFlushError); + expect(getOnFlushError(state.httpLogger())).toBe(onFlushError); + }); + + test("initExperiment updates an existing background logger callback", async () => { + const state = new BraintrustState({}); + await state.login({ + apiKey: TEST_API_KEY, + appUrl: "https://braintrust.dev", + }); + const flushError = new Error("initExperiment flush failure"); + const onFlushError = vi.fn(); + + mockFlushFailure(state, flushError); + mockExperimentRegister(state); + + const experiment = initExperiment({ + project: "test-project", + projectId: "test-project-id", + experiment: "test-experiment", + onFlushError, + repoInfo: {}, + state, + }); + + const span = experiment.startSpan({ name: "test-span" }); + span.end(); + await experiment.flush(); + + expect(onFlushError).toHaveBeenCalledTimes(1); + expect(onFlushError).toHaveBeenCalledWith(flushError); + }); +}); diff --git a/js/src/logger.ts b/js/src/logger.ts index 52e87a953..54c567a21 100644 --- a/js/src/logger.ts +++ b/js/src/logger.ts @@ -805,7 +805,19 @@ export class BraintrustState { this.bgLogger().setMaskingFunction(maskingFunction); } + public setOnFlushError(onFlushError?: (error: unknown) => void): void { + if (onFlushError === undefined) { + return; + } + + this.loginParams.onFlushError = onFlushError; + if (this._bgLogger.hasSucceeded) { + this._bgLogger.get().setOnFlushError(onFlushError); + } + } + public async login(loginParams: LoginOptions & { forceLogin?: boolean }) { + this.setOnFlushError(loginParams.onFlushError); if (this.apiUrl && !loginParams.forceLogin) { return; } @@ -2759,6 +2771,14 @@ class HTTPBackgroundLogger implements BackgroundLogger { this.maskingFunction = maskingFunction; } + setOnFlushError(onFlushError?: (error: unknown) => void): void { + if (onFlushError === undefined) { + return; + } + + this.onFlushError = onFlushError; + } + pendingFlushBytes(): number { return this._pendingBytes; } @@ -3399,6 +3419,7 @@ export function init( const state = stateArg ?? _globalState; + state.setOnFlushError(options.onFlushError); // Ensure unlimited queue for init() calls (experiments) // Experiments should never drop data state.enforceQueueSizeLimit(false); @@ -3917,6 +3938,7 @@ export function initLogger( const state = stateArg ?? _globalState; + state.setOnFlushError(options.onFlushError); // Enable queue size limit enforcement for initLogger() calls // This ensures production observability doesn't OOM customer processes state.enforceQueueSizeLimit(true); @@ -4447,6 +4469,7 @@ export async function login( state.loginToken, ); checkUpdatedParam("orgName", options.orgName, state.orgName); + state.setOnFlushError(options.onFlushError); return state; } From 47affe96f92c58db58425e33f7f31bf9388e9e53 Mon Sep 17 00:00:00 2001 From: Colin B Date: Fri, 6 Mar 2026 15:14:07 -0800 Subject: [PATCH 3/3] Update logger-misc tests to use BraintrustState for login simulation --- js/src/logger-misc.test.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/js/src/logger-misc.test.ts b/js/src/logger-misc.test.ts index 1d07ea7b9..73ee64052 100644 --- a/js/src/logger-misc.test.ts +++ b/js/src/logger-misc.test.ts @@ -625,7 +625,11 @@ describe("onFlushError wiring", () => { }); test("initLogger updates an existing background logger callback", async () => { - const state = await _exportsForTestingOnly.simulateLoginForTests(); + const state = new BraintrustState({}); + await state.login({ + apiKey: TEST_API_KEY, + appUrl: "https://braintrust.dev", + }); const flushError = new Error("initLogger flush failure"); const onFlushError = vi.fn(); @@ -635,6 +639,7 @@ describe("onFlushError wiring", () => { projectName: "test-project", projectId: "test-project-id", onFlushError, + state, }); const span = logger.startSpan({ name: "test-span" });