From 5af478f903a62c08b07b7f16b350e6b6ab174f39 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Fri, 22 May 2026 10:14:24 -0700 Subject: [PATCH 1/6] parse datastream payloads through Fern serializers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Incoming WebSocket payloads (`rulesengine.Flags`, `Company`, `User`) were applied to the local cache via bare `message.data as Schematic.Foo` casts, with no transformation between the wire format and the in-memory type. Two stacked bugs fall out of that: 1. snake_case vs camelCase. The Go server emits snake_case JSON (`event_subtype`, `condition_type`, `credit_id`), and the Fern- generated TS types use camelCase. Without running incoming payloads through the serializer, every camelCase property read on a cached entity returns `undefined`. That silently breaks any downstream consumer that walks the cached structure — most painfully `findCreditCondition()` on the credit-lease check path, which loops over `condition.conditionType` and `condition.eventSubtype` looking for a match and never finds one. 2. Go nil-slice → JSON null. `json.Marshal` serializes a nil slice as `null`, but the Fern serializers declare these as required `list(...)` and reject null with an opaque "Expected list. Received null." error. The same flag tree that fails the camelCase reads above also has `condition_groups: null` on rules that don't carry any groups, so even after wiring the serializer in the parse rejects the whole payload. Fix: run flag / company / user payloads through `parseOrThrow` after walking the raw object to coerce known list-shaped wire fields from null to []. Parse failures degrade to a warn-log + skip rather than poisoning the whole connection. --- src/datastream/datastream-client.ts | 92 +++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 6 deletions(-) diff --git a/src/datastream/datastream-client.ts b/src/datastream/datastream-client.ts index 5d161793..db752e25 100644 --- a/src/datastream/datastream-client.ts +++ b/src/datastream/datastream-client.ts @@ -5,6 +5,55 @@ import { RulesEngineClient } from '../rules-engine'; import { Logger } from '../logger'; import { LazyEmitter } from './emitter'; import { partialCompany, partialUser, deepCopyCompany as deepCopyCompanyFn } from './merge'; +import * as serializers from '../serialization'; + +// Wire payloads from the schematic-api Go server use snake_case JSON +// (e.g. `event_subtype`, `condition_type`, `credit_id`). The TypeScript +// API types use camelCase (`eventSubtype`, `conditionType`, `creditId`), +// and downstream consumers (lease check, rules-engine WASM bridge, etc.) +// read camelCase. Without running incoming payloads through the Fern +// serializer the fields read as `undefined`, which silently breaks +// `findCreditCondition()` and any other camelCase access. +const PARSE_OPTS = { + allowUnrecognizedEnumValues: true, + allowUnrecognizedUnionMembers: true, + unrecognizedObjectKeys: 'passthrough' as const, +}; + +// Go json.Marshal serializes nil slices as `null`, but the Fern-generated +// serializers declare these fields as required `list(...)` and reject null +// outright (with an opaque "Expected list. Received null." error that +// silently disables every downstream feature). Walk the raw payload and +// coerce known list-shaped wire fields from null to []. Names below are +// the snake_case wire keys the Go server emits. +const NULLABLE_LIST_KEYS = new Set([ + 'rules', + 'conditions', + 'condition_groups', + 'resource_ids', + 'metrics', + 'traits', + 'plans', + 'billing_products', + 'subscriptions', + 'features', + 'keys', + 'company_plans', +]); + +function coerceNullArrays(value: unknown): unknown { + if (Array.isArray(value)) return value.map(coerceNullArrays); + if (value === null || typeof value !== 'object') return value; + const out: Record = {}; + for (const [k, v] of Object.entries(value as Record)) { + if (v === null && NULLABLE_LIST_KEYS.has(k)) { + out[k] = []; + } else { + out[k] = coerceNullArrays(v); + } + } + return out; +} // Import cache providers from the cache module import type { CacheProvider } from '../cache/types'; @@ -705,7 +754,12 @@ export class DataStreamClient extends LazyEmitter { return; } } else { - company = message.data as Schematic.RulesengineCompany; + try { + company = serializers.RulesengineCompany.parseOrThrow(coerceNullArrays(message.data), PARSE_OPTS); + } catch (error) { + this.logger.warn(`Failed to deserialize company payload: ${error}`); + return; + } } if (!company) { @@ -768,7 +822,12 @@ export class DataStreamClient extends LazyEmitter { return; } } else { - user = message.data as Schematic.RulesengineUser; + try { + user = serializers.RulesengineUser.parseOrThrow(coerceNullArrays(message.data), PARSE_OPTS); + } catch (error) { + this.logger.warn(`Failed to deserialize user payload: ${error}`); + return; + } } if (!user) { @@ -808,13 +867,28 @@ export class DataStreamClient extends LazyEmitter { * handleFlagsMessage processes bulk flags messages */ private async handleFlagsMessage(message: DataStreamResp): Promise { - const flags = message.data as Schematic.RulesengineFlag[]; - - if (!Array.isArray(flags)) { + const rawFlags = message.data as unknown[]; + + if (!Array.isArray(rawFlags)) { this.logger.warn('Expected flags array in bulk flags message'); return; } + const flags: Schematic.RulesengineFlag[] = []; + let parseFailureCount = 0; + let firstFailure: unknown = undefined; + for (const raw of rawFlags) { + try { + flags.push(serializers.RulesengineFlag.parseOrThrow(coerceNullArrays(raw), PARSE_OPTS)); + } catch (error) { + parseFailureCount++; + if (firstFailure === undefined) firstFailure = error; + } + } + if (parseFailureCount > 0) { + this.logger.warn(`Failed to deserialize ${parseFailureCount} flag(s) in bulk message: ${String(firstFailure)}`); + } + const results = await Promise.allSettled( flags .filter((flag) => flag?.key) @@ -854,7 +928,13 @@ export class DataStreamClient extends LazyEmitter { * handleFlagMessage processes single flag messages */ private async handleFlagMessage(message: DataStreamResp): Promise { - const flag = message.data as Schematic.RulesengineFlag; + let flag: Schematic.RulesengineFlag; + try { + flag = serializers.RulesengineFlag.parseOrThrow(coerceNullArrays(message.data), PARSE_OPTS); + } catch (error) { + this.logger.warn(`Failed to deserialize flag payload: ${error}`); + return; + } if (!flag?.key) { return; From 9997701be44b48880b792470a4c76d3c1ac96355 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Fri, 22 May 2026 13:42:49 -0700 Subject: [PATCH 2/6] canonicalize merge output to camelCase, update tests for parseOrThrow shape MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit partialCompany/partialUser were writing snake_case keys back onto a camelCase cached entity, leaving the cache in a hybrid shape after any partial update. Update both to write camelCase target keys consistently so the cache stays in one canonical form. Test fixture updates: - Add `plan_version_ids: []` to all Company mocks. Required by the Fern schema and previously absent — once payloads route through parseOrThrow, the missing field caused ParseError and downstream test timeouts (the cache update silently failed and getCompany() hung waiting for a never-arriving WS response). - Compute expected post-parse shapes via asCompany/asUser/asFlag helpers. Wire fixtures stay snake_case (matching what the server sends); assertions compare against the camelCase canonical shape. - Switch partial-merge assertions from snake_case field names to camelCase to match the canonicalized merge output. --- src/datastream/merge.ts | 83 +++++++++--------- .../unit/datastream/datastream-client.test.ts | 85 ++++++++++++------- 2 files changed, 92 insertions(+), 76 deletions(-) diff --git a/src/datastream/merge.ts b/src/datastream/merge.ts index 90599b88..1cd78afc 100644 --- a/src/datastream/merge.ts +++ b/src/datastream/merge.ts @@ -1,36 +1,16 @@ import type * as Schematic from "../api/types"; -/** - * Helper to read a property that may be in camelCase or snake_case form. - * Wire data from WebSocket uses snake_case; Fern-generated types use camelCase. - */ -function getProp(obj: Record, camel: string, snake: string): unknown { - return obj[camel] ?? obj[snake]; -} - -/** - * Creates a complete deep copy of a Company object. - */ export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany { return JSON.parse(JSON.stringify(c)); } -/** - * Creates a complete deep copy of a User object. - */ export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.RulesengineUser { return JSON.parse(JSON.stringify(u)); } -/** - * Merges a partial update into an existing Company. - * Deep-copies the existing company, then applies only the fields - * present in the partial object. - * - * Wire format uses snake_case keys. The existing company from cache - * may have either camelCase or snake_case keys depending on how it - * was stored. - */ +// Partial updates arrive as raw wire payloads (snake_case keys) and are merged +// into an existing camelCase-canonicalized entity. Each case writes the +// corresponding camelCase field so the cached entity stays in a single shape. export function partialCompany( existing: Schematic.RulesengineCompany, partial: Record, @@ -40,42 +20,54 @@ export function partialCompany( for (const key of Object.keys(partial)) { switch (key) { case "id": + merged.id = partial[key]; + break; case "account_id": + merged.accountId = partial[key]; + break; case "environment_id": - merged[key] = partial[key]; + merged.environmentId = partial[key]; break; case "base_plan_id": - merged[key] = partial[key] ?? null; + merged.basePlanId = partial[key] ?? null; break; case "billing_product_ids": + merged.billingProductIds = partial[key]; + break; case "plan_ids": + merged.planIds = partial[key]; + break; case "plan_version_ids": + merged.planVersionIds = partial[key]; + break; case "entitlements": + merged.entitlements = partial[key]; + break; case "rules": + merged.rules = partial[key]; + break; case "traits": + merged.traits = partial[key]; + break; case "subscription": - merged[key] = partial[key]; + merged.subscription = partial[key]; break; case "keys": { - const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record; + const existingKeys = (merged.keys ?? {}) as Record; const incomingKeys = partial[key] as Record; - merged[key] = { ...existingKeys, ...incomingKeys }; + merged.keys = { ...existingKeys, ...incomingKeys }; break; } case "credit_balances": { - const existingCB = (getProp(merged, "creditBalances", "credit_balances") ?? {}) as Record< - string, - number - >; + const existingCB = (merged.creditBalances ?? {}) as Record; const incomingCB = partial[key] as Record; - merged[key] = { ...existingCB, ...incomingCB }; + merged.creditBalances = { ...existingCB, ...incomingCB }; break; } case "metrics": { - const existingMetrics = ((getProp(merged, "metrics", "metrics") as unknown[]) ?? - []) as Schematic.RulesengineCompanyMetric[]; + const existingMetrics = (merged.metrics ?? []) as Schematic.RulesengineCompanyMetric[]; const incomingMetrics = partial[key] as Schematic.RulesengineCompanyMetric[]; - merged[key] = upsertMetrics(existingMetrics, incomingMetrics); + merged.metrics = upsertMetrics(existingMetrics, incomingMetrics); break; } // Ignore unknown keys silently @@ -85,11 +77,6 @@ export function partialCompany( return merged as unknown as Schematic.RulesengineCompany; } -/** - * Merges a partial update into an existing User. - * Deep-copies the existing user, then applies only the fields - * present in the partial object. - */ export function partialUser( existing: Schematic.RulesengineUser, partial: Record, @@ -99,19 +86,25 @@ export function partialUser( for (const key of Object.keys(partial)) { switch (key) { case "id": + merged.id = partial[key]; + break; case "account_id": + merged.accountId = partial[key]; + break; case "environment_id": - merged[key] = partial[key]; + merged.environmentId = partial[key]; break; case "keys": { - const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record; + const existingKeys = (merged.keys ?? {}) as Record; const incomingKeys = partial[key] as Record; - merged[key] = { ...existingKeys, ...incomingKeys }; + merged.keys = { ...existingKeys, ...incomingKeys }; break; } case "traits": + merged.traits = partial[key]; + break; case "rules": - merged[key] = partial[key]; + merged.rules = partial[key]; break; // Ignore unknown keys silently } diff --git a/tests/unit/datastream/datastream-client.test.ts b/tests/unit/datastream/datastream-client.test.ts index c0f180fb..f6dd4dc2 100644 --- a/tests/unit/datastream/datastream-client.test.ts +++ b/tests/unit/datastream/datastream-client.test.ts @@ -7,6 +7,23 @@ import { DatastreamWSClient } from '../../../src/datastream/websocket-client'; import { DataStreamResp, EntityType, MessageType } from '../../../src/datastream/types'; import { Logger } from '../../../src/logger'; import * as Schematic from '../../../src/api/types'; +import * as serializers from '../../../src/serialization'; + +const PARSE_OPTS = { + allowUnrecognizedEnumValues: true, + allowUnrecognizedUnionMembers: true, + unrecognizedObjectKeys: 'passthrough' as const, +}; +// The SUT runs incoming snake_case wire payloads through Fern's parseOrThrow +// to canonicalize them to camelCase before caching. Mock fixtures are written +// in wire format (snake_case), so we route them through the same serializer +// to compute the expected camelCase shape returned by getCompany/getUser/getFlag. +const asCompany = (c: unknown): Schematic.RulesengineCompany => + serializers.RulesengineCompany.parseOrThrow(c, PARSE_OPTS); +const asUser = (u: unknown): Schematic.RulesengineUser => + serializers.RulesengineUser.parseOrThrow(u, PARSE_OPTS); +const asFlag = (f: unknown): Schematic.RulesengineFlag => + serializers.RulesengineFlag.parseOrThrow(f, PARSE_OPTS); // Mock DatastreamWSClient const mockDatastreamWSClientInstance = { on: jest.fn(), @@ -52,6 +69,7 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], crm_product_ids: [], credit_balances: {}, @@ -251,7 +269,7 @@ describe('DataStreamClient', () => { // Verify company is cached and can be retrieved using the correct keys const retrievedCompany = await client.getCompany(mockCompany.keys!); - expect(retrievedCompany).toEqual(mockCompany); + expect(retrievedCompany).toEqual(asCompany(mockCompany)); }, 10000); test('should handle user messages and update cache', async () => { @@ -273,7 +291,7 @@ describe('DataStreamClient', () => { // Verify user is cached and can be retrieved using the correct keys const retrievedUser = await client.getUser(mockUser.keys!); - expect(retrievedUser).toEqual(mockUser); + expect(retrievedUser).toEqual(asUser(mockUser)); }, 10000); test('should handle flag messages and update cache', async () => { @@ -295,7 +313,7 @@ describe('DataStreamClient', () => { // Verify flag is cached and can be retrieved const retrievedFlag = await client.getFlag(mockFlag.key); - expect(retrievedFlag).toEqual(mockFlag); + expect(retrievedFlag).toEqual(asFlag(mockFlag)); }); test('should handle partial entity message merging', async () => { @@ -315,6 +333,7 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: ['plan-1'], + plan_version_ids: [], billing_product_ids: [], crm_product_ids: [], credit_balances: {}, @@ -328,7 +347,7 @@ describe('DataStreamClient', () => { // Verify the full company is cached const cachedFull = await client.getCompany({ name: 'Partial Corp' }); - expect(cachedFull).toEqual(fullCompany); + expect(cachedFull).toEqual(asCompany(fullCompany)); // Send a PARTIAL company message. Wire shape: data is the partial fields, // entity_id at the top level identifies the cached company to merge into. @@ -345,15 +364,17 @@ describe('DataStreamClient', () => { // Partial messages are now properly merged: fields in the partial update // the cached entity, while fields not present in the partial are preserved. + // Cached values are camelCase (canonicalized by parseOrThrow on the FULL + // message), and partialCompany writes camelCase keys, so assertions read + // camelCase regardless of whether the field was full-loaded or merged. const cachedAfterPartial = await client.getCompany({ name: 'Partial Corp' }); expect(cachedAfterPartial.id).toBe('company-partial'); expect((cachedAfterPartial as any).traits).toEqual([{ key: 'tier', value: 'enterprise' }]); - expect((cachedAfterPartial as any).plan_ids).toEqual(['plan-2']); - // Original fields not present in the partial message are preserved + expect((cachedAfterPartial as any).planIds).toEqual(['plan-2']); expect((cachedAfterPartial as any).metrics).toEqual([]); expect((cachedAfterPartial as any).rules).toEqual([]); - expect((cachedAfterPartial as any).account_id).toBe('account-123'); - expect((cachedAfterPartial as any).billing_product_ids).toEqual([]); + expect((cachedAfterPartial as any).accountId).toBe('account-123'); + expect((cachedAfterPartial as any).billingProductIds).toEqual([]); }, 10000); test('should skip partial company message when entity is not in cache', async () => { @@ -561,9 +582,9 @@ describe('DataStreamClient', () => { const cachedUser = await client.getUser(mockUser.keys!); const cachedFlag = await client.getFlag(mockFlag.key); - expect(cachedCompany).toEqual(mockCompany); - expect(cachedUser).toEqual(mockUser); - expect(cachedFlag).toEqual(mockFlag); + expect(cachedCompany).toEqual(asCompany(mockCompany)); + expect(cachedUser).toEqual(asUser(mockUser)); + expect(cachedFlag).toEqual(asFlag(mockFlag)); }); test('should handle error type messages from WebSocket', async () => { @@ -793,6 +814,7 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], crm_product_ids: [], credit_balances: {}, @@ -827,9 +849,9 @@ describe('DataStreamClient', () => { const bySlug = await client.getCompany({ slug: 'acme-corp' }); const byExtId = await client.getCompany({ external_id: 'ext-1' }); - expect(byName).toEqual(multiKeyCompany); - expect(bySlug).toEqual(multiKeyCompany); - expect(byExtId).toEqual(multiKeyCompany); + expect(byName).toEqual(asCompany(multiKeyCompany)); + expect(bySlug).toEqual(asCompany(multiKeyCompany)); + expect(byExtId).toEqual(asCompany(multiKeyCompany)); }); test('should retrieve user by any of its keys after caching', async () => { @@ -842,8 +864,8 @@ describe('DataStreamClient', () => { const byEmail = await client.getUser({ email: 'alice@example.com' }); const byUserId = await client.getUser({ user_id: 'u-1' }); - expect(byEmail).toEqual(multiKeyUser); - expect(byUserId).toEqual(multiKeyUser); + expect(byEmail).toEqual(asUser(multiKeyUser)); + expect(byUserId).toEqual(asUser(multiKeyUser)); }); test('should remove company from cache on DELETE for all keys', async () => { @@ -859,7 +881,7 @@ describe('DataStreamClient', () => { // Verify it's cached — returns from cache without sending a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); const cached = await client.getCompany({ name: 'acme' }); - expect(cached).toEqual(multiKeyCompany); + expect(cached).toEqual(asCompany(multiKeyCompany)); expect(mockDatastreamWSClientInstance.sendMessage).not.toHaveBeenCalled(); // Send DELETE @@ -904,7 +926,7 @@ describe('DataStreamClient', () => { // Verify it's cached — returns from cache without sending a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); const cached = await client.getUser({ email: 'alice@example.com' }); - expect(cached).toEqual(multiKeyUser); + expect(cached).toEqual(asUser(multiKeyUser)); expect(mockDatastreamWSClientInstance.sendMessage).not.toHaveBeenCalled(); // Send DELETE @@ -958,8 +980,8 @@ describe('DataStreamClient', () => { const byName = await client.getCompany({ name: 'acme' }); const bySlug = await client.getCompany({ slug: 'acme-corp' }); - expect(byName).toEqual(updatedCompany); - expect(bySlug).toEqual(updatedCompany); + expect(byName).toEqual(asCompany(updatedCompany)); + expect(bySlug).toEqual(asCompany(updatedCompany)); }); test('should handle deep copy to prevent mutation of cached entities', async () => { @@ -972,7 +994,7 @@ describe('DataStreamClient', () => { // Retrieve the company from cache const firstRetrieval = await client.getCompany({ name: 'acme' }); - expect(firstRetrieval).toEqual(multiKeyCompany); + expect(firstRetrieval).toEqual(asCompany(multiKeyCompany)); // Mutate a field on the returned object (firstRetrieval as any).traits = [{ key: 'mutated', value: 'yes' }]; @@ -999,9 +1021,9 @@ describe('DataStreamClient', () => { }); // Verify all three keys resolve from cache - expect(await client.getCompany({ name: 'acme' })).toEqual(multiKeyCompany); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(multiKeyCompany); - expect(await client.getCompany({ external_id: 'ext-1' })).toEqual(multiKeyCompany); + expect(await client.getCompany({ name: 'acme' })).toEqual(asCompany(multiKeyCompany)); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(multiKeyCompany)); + expect(await client.getCompany({ external_id: 'ext-1' })).toEqual(asCompany(multiKeyCompany)); // Update with only two keys — external_id has been removed const updatedCompany = { @@ -1016,8 +1038,8 @@ describe('DataStreamClient', () => { }); // Remaining keys should still resolve from cache - expect(await client.getCompany({ name: 'acme' })).toEqual(updatedCompany); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(updatedCompany); + expect(await client.getCompany({ name: 'acme' })).toEqual(asCompany(updatedCompany)); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(updatedCompany)); // Removed key should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1045,8 +1067,8 @@ describe('DataStreamClient', () => { }); // Verify both keys resolve from cache - expect(await client.getUser({ email: 'alice@example.com' })).toEqual(multiKeyUser); - expect(await client.getUser({ user_id: 'u-1' })).toEqual(multiKeyUser); + expect(await client.getUser({ email: 'alice@example.com' })).toEqual(asUser(multiKeyUser)); + expect(await client.getUser({ user_id: 'u-1' })).toEqual(asUser(multiKeyUser)); // Update with only email — user_id has been removed const updatedUser = { @@ -1061,7 +1083,7 @@ describe('DataStreamClient', () => { }); // Remaining key should still resolve from cache - expect(await client.getUser({ email: 'alice@example.com' })).toEqual(updatedUser); + expect(await client.getUser({ email: 'alice@example.com' })).toEqual(asUser(updatedUser)); // Removed key should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1088,7 +1110,7 @@ describe('DataStreamClient', () => { data: multiKeyCompany, }); - expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(multiKeyCompany); + expect(await client.getCompany({ slug: 'acme-corp' })).toEqual(asCompany(multiKeyCompany)); // Update: slug value changed from 'acme-corp' to 'acme-inc' const updatedCompany = { @@ -1103,7 +1125,7 @@ describe('DataStreamClient', () => { }); // New slug should resolve from cache - expect(await client.getCompany({ slug: 'acme-inc' })).toEqual(updatedCompany); + expect(await client.getCompany({ slug: 'acme-inc' })).toEqual(asCompany(updatedCompany)); // Old slug value should miss cache and trigger a WS request mockDatastreamWSClientInstance.sendMessage.mockClear(); @@ -1156,6 +1178,7 @@ describe('DataStreamClient', () => { rules: [], metrics: [], plan_ids: [], + plan_version_ids: [], billing_product_ids: [], crm_product_ids: [], credit_balances: {}, From 840d46b7cd3211285e629bf946095d6e1f01b96b Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Fri, 22 May 2026 14:57:40 -0700 Subject: [PATCH 3/6] chore(tests): fix merge.test/datastream-client.test for camelCase canonicalization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit merge.test.ts: cached entities now model the post-parseOrThrow shape (camelCase). Partial inputs stay snake_case (wire). Assertions check the camelCase output keys partialCompany/partialUser now write. datastream-client.test.ts: the two-level-caching metrics test was providing a metric with only { eventSubtype, value } — parseOrThrow rejects the company on the FULL message (RulesengineCompanyMetric requires account_id/company_id/created_at/environment_id/event_subtype/ month_reset/period/value), so the cache never lands and the subsequent getCompany hangs out the test timer. Provide a full metric. --- .../unit/datastream/datastream-client.test.ts | 11 ++- tests/unit/datastream/merge.test.ts | 82 ++++++++++--------- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/tests/unit/datastream/datastream-client.test.ts b/tests/unit/datastream/datastream-client.test.ts index f6dd4dc2..ab3b649f 100644 --- a/tests/unit/datastream/datastream-client.test.ts +++ b/tests/unit/datastream/datastream-client.test.ts @@ -1146,7 +1146,16 @@ describe('DataStreamClient', () => { const companyWithMetrics = { ...multiKeyCompany, metrics: [ - { eventSubtype: 'api-call', value: 10 }, + { + account_id: 'account-123', + company_id: 'company-multi', + created_at: '2026-01-01T00:00:00Z', + environment_id: 'env-123', + event_subtype: 'api-call', + month_reset: 'first_of_month', + period: 'all_time', + value: 10, + }, ], } as unknown as Schematic.RulesengineCompany; diff --git a/tests/unit/datastream/merge.test.ts b/tests/unit/datastream/merge.test.ts index 8b9216b4..8208e255 100644 --- a/tests/unit/datastream/merge.test.ts +++ b/tests/unit/datastream/merge.test.ts @@ -6,23 +6,25 @@ import { deepCopyUser, } from '../../../src/datastream/merge'; -// Helper: base company in snake_case wire format (matches WebSocket data) +// Helper: base company in camelCase (matches the cached, parseOrThrow-normalized +// shape that partialCompany sees in production). Partial payloads arrive in +// snake_case from the wire; the merge function canonicalizes to camelCase. function baseCompany(): Schematic.RulesengineCompany { return { id: 'co-1', - account_id: 'acc-1', - environment_id: 'env-1', - base_plan_id: 'plan-1', - billing_product_ids: ['bp-1'], - credit_balances: { 'credit-1': 100.0 }, + accountId: 'acc-1', + environmentId: 'env-1', + basePlanId: 'plan-1', + billingProductIds: ['bp-1'], + creditBalances: { 'credit-1': 100.0 }, keys: { domain: 'example.com' }, - plan_ids: ['plan-1'], - plan_version_ids: ['pv-1'], + planIds: ['plan-1'], + planVersionIds: ['pv-1'], traits: [ - { value: 'Enterprise', trait_definition: { id: 'plan', comparable_type: 'string', entity_type: 'company' } }, + { value: 'Enterprise', traitDefinition: { id: 'plan', comparableType: 'string', entityType: 'company' } }, ], entitlements: [ - { feature_id: 'feat-1', feature_key: 'feature-one', value_type: 'boolean' }, + { featureId: 'feat-1', featureKey: 'feature-one', valueType: 'boolean' }, ], metrics: [], rules: [], @@ -32,11 +34,11 @@ function baseCompany(): Schematic.RulesengineCompany { function baseUser(): Schematic.RulesengineUser { return { id: 'user-1', - account_id: 'acc-1', - environment_id: 'env-1', + accountId: 'acc-1', + environmentId: 'env-1', keys: { email: 'user@example.com' }, traits: [ - { value: 'Premium', trait_definition: { id: 'tier', comparable_type: 'string', entity_type: 'user' } }, + { value: 'Premium', traitDefinition: { id: 'tier', comparableType: 'string', entityType: 'user' } }, ], rules: [], } as unknown as Schematic.RulesengineUser; @@ -73,11 +75,11 @@ describe('partialCompany', () => { expect((m.traits as Record[])[0].value).toBe('Startup'); // Other fields preserved - expect(m.account_id).toBe('acc-1'); - expect(m.environment_id).toBe('env-1'); + expect(m.accountId).toBe('acc-1'); + expect(m.environmentId).toBe('env-1'); expect(m.keys).toEqual({ domain: 'example.com' }); - expect(m.billing_product_ids).toEqual(['bp-1']); - expect(m.base_plan_id).toBe('plan-1'); + expect(m.billingProductIds).toEqual(['bp-1']); + expect(m.basePlanId).toBe('plan-1'); }); test('merges keys - new key added, existing preserved', () => { @@ -98,7 +100,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); }); test('overwrites credit balance', () => { @@ -108,7 +110,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 50.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 50.0 }); }); test('upserts metrics - updates existing, appends new', () => { @@ -169,7 +171,7 @@ describe('partialCompany', () => { const m = merged as unknown as Record; expect(m.entitlements).toEqual([]); - expect(m.account_id).toBe('acc-1'); + expect(m.accountId).toBe('acc-1'); }); test('null base_plan_id sets to null', () => { @@ -179,8 +181,8 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.base_plan_id).toBeNull(); - expect(m.billing_product_ids).toEqual(['bp-1']); + expect(m.basePlanId).toBeNull(); + expect(m.billingProductIds).toEqual(['bp-1']); }); test('tolerates missing id - cache lookup uses envelope entity_id', () => { @@ -193,7 +195,7 @@ describe('partialCompany', () => { const merged = partialCompany(existing, partial); const m = merged as unknown as Record; - expect(m.credit_balances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 100.0, 'credit-2': 200.0 }); expect(m.id).toBe('co-1'); }); @@ -277,13 +279,13 @@ describe('partialCompany', () => { const m = merged as unknown as Record; expect(m.id).toBe('co-1'); - expect(m.account_id).toBe('acc-2'); - expect(m.environment_id).toBe('env-2'); - expect(m.base_plan_id).toBe('plan-99'); - expect(m.billing_product_ids).toEqual(['bp-10', 'bp-20']); + expect(m.accountId).toBe('acc-2'); + expect(m.environmentId).toBe('env-2'); + expect(m.basePlanId).toBe('plan-99'); + expect(m.billingProductIds).toEqual(['bp-10', 'bp-20']); // Credit balances merge: credit-1 overwritten, credit-new added - expect(m.credit_balances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); + expect(m.creditBalances).toEqual({ 'credit-1': 999.0, 'credit-new': 50.0 }); const entitlements = m.entitlements as Record[]; expect(entitlements.length).toBe(2); @@ -301,8 +303,8 @@ describe('partialCompany', () => { expect(metrics[1].event_subtype).toBe('event-new'); expect(metrics[1].value).toBe(7); - expect(m.plan_ids).toEqual(['plan-99', 'plan-100']); - expect(m.plan_version_ids).toEqual(['pv-99']); + expect(m.planIds).toEqual(['plan-99', 'plan-100']); + expect(m.planVersionIds).toEqual(['pv-99']); const rules = m.rules as Record[]; expect(rules.length).toBe(2); @@ -318,8 +320,8 @@ describe('partialCompany', () => { // Original not mutated const orig = existing as unknown as Record; - expect(orig.account_id).toBe('acc-1'); - expect(orig.base_plan_id).toBe('plan-1'); + expect(orig.accountId).toBe('acc-1'); + expect(orig.basePlanId).toBe('plan-1'); expect(orig.keys).toEqual({ domain: 'example.com' }); expect((orig.metrics as Record[])[0].value).toBe(10); }); @@ -422,12 +424,12 @@ describe('deepCopyCompany', () => { const origRaw = orig as unknown as Record; origRaw.metrics = [ { - account_id: 'acc-1', environment_id: 'env-1', company_id: 'co-1', - event_subtype: 'event-1', period: 'all_time', month_reset: 'first_of_month', - value: 42, created_at: '2026-01-01T00:00:00Z', + accountId: 'acc-1', environmentId: 'env-1', companyId: 'co-1', + eventSubtype: 'event-1', period: 'all_time', monthReset: 'first_of_month', + value: 42, createdAt: '2026-01-01T00:00:00Z', }, ]; - origRaw.subscription = { id: 'sub-1', period_start: '2026-01-01T00:00:00Z', period_end: '2027-01-01T00:00:00Z' }; + origRaw.subscription = { id: 'sub-1', periodStart: '2026-01-01T00:00:00Z', periodEnd: '2027-01-01T00:00:00Z' }; const cp = deepCopyCompany(orig); const cpRaw = cp as unknown as Record; @@ -437,8 +439,8 @@ describe('deepCopyCompany', () => { expect((origRaw.keys as Record).domain).toBe('example.com'); // Credit balances are independent - (cpRaw.credit_balances as Record)['credit-1'] = 999; - expect((origRaw.credit_balances as Record)['credit-1']).toBe(100.0); + (cpRaw.creditBalances as Record)['credit-1'] = 999; + expect((origRaw.creditBalances as Record)['credit-1']).toBe(100.0); // Metrics are independent ((cpRaw.metrics as Record[])[0]).value = 999; @@ -460,8 +462,8 @@ describe('deepCopyUser', () => { test('empty fields - user with only required fields', () => { const cp = deepCopyUser({ id: 'u1', - account_id: 'acc-1', - environment_id: 'env-1', + accountId: 'acc-1', + environmentId: 'env-1', keys: {}, traits: [], rules: [], From 858659628ef33253876d12051d94e7f97aeae409 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Fri, 22 May 2026 18:10:54 -0700 Subject: [PATCH 4/6] widen NULLABLE_LIST_KEYS to cover rulesengine.Company slice fields MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit rulesengine v0.1.16 migrated billing_product_ids, plan_ids, plan_version_ids, and entitlements on the wire Company struct to JSONSlice[T] — so they marshal as [] not null. But until the api ships v0.1.16 (api PR #5535) the WebSocket still ships null for these on any company without a plan/subscription, and parseOrThrow on the SDK side rejects the message. Existing NULLABLE_LIST_KEYS set missed the four names above; add them so the coerce helper keeps the SDK working against pre-bump APIs in the field. Once #5535 deploys the whole coerceNullArrays hack can be deleted in one followup. --- src/datastream/datastream-client.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/datastream/datastream-client.ts b/src/datastream/datastream-client.ts index db752e25..ebd1494e 100644 --- a/src/datastream/datastream-client.ts +++ b/src/datastream/datastream-client.ts @@ -39,6 +39,16 @@ const NULLABLE_LIST_KEYS = new Set([ 'features', 'keys', 'company_plans', + // rulesengine.Company slice fields. Pre-v0.1.16 these are []T in Go, + // post-v0.1.16 they're JSONSlice[T] (marshal as []). Once the schematic-api + // bump deploys (api PR #5535) the wire never ships null for these, and the + // whole coerceNullArrays helper can be deleted. Include them defensively + // in the meantime so a company without a plan/subscription doesn't fail + // parseOrThrow on the WS path. + 'billing_product_ids', + 'plan_ids', + 'plan_version_ids', + 'entitlements', ]); function coerceNullArrays(value: unknown): unknown { From e834f33db9e6f310957090973c5a07d26015c5c2 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Fri, 22 May 2026 20:31:15 -0700 Subject: [PATCH 5/6] drop coerceNullArrays now that schematic-api ships JSONSlice [] rulesengine v0.1.16 wire types use JSONSlice[T] so nil slices marshal as [] not null, and the api bump (schematic-api #5535) is now live on main. parseOrThrow on the WS payloads no longer encounters null where a list is expected, so the coerce helper and its key allowlist have nothing to do. Pass message.data straight into the serializers. --- src/datastream/datastream-client.ts | 53 +++-------------------------- 1 file changed, 4 insertions(+), 49 deletions(-) diff --git a/src/datastream/datastream-client.ts b/src/datastream/datastream-client.ts index ebd1494e..42d802c9 100644 --- a/src/datastream/datastream-client.ts +++ b/src/datastream/datastream-client.ts @@ -20,51 +20,6 @@ const PARSE_OPTS = { unrecognizedObjectKeys: 'passthrough' as const, }; -// Go json.Marshal serializes nil slices as `null`, but the Fern-generated -// serializers declare these fields as required `list(...)` and reject null -// outright (with an opaque "Expected list. Received null." error that -// silently disables every downstream feature). Walk the raw payload and -// coerce known list-shaped wire fields from null to []. Names below are -// the snake_case wire keys the Go server emits. -const NULLABLE_LIST_KEYS = new Set([ - 'rules', - 'conditions', - 'condition_groups', - 'resource_ids', - 'metrics', - 'traits', - 'plans', - 'billing_products', - 'subscriptions', - 'features', - 'keys', - 'company_plans', - // rulesengine.Company slice fields. Pre-v0.1.16 these are []T in Go, - // post-v0.1.16 they're JSONSlice[T] (marshal as []). Once the schematic-api - // bump deploys (api PR #5535) the wire never ships null for these, and the - // whole coerceNullArrays helper can be deleted. Include them defensively - // in the meantime so a company without a plan/subscription doesn't fail - // parseOrThrow on the WS path. - 'billing_product_ids', - 'plan_ids', - 'plan_version_ids', - 'entitlements', -]); - -function coerceNullArrays(value: unknown): unknown { - if (Array.isArray(value)) return value.map(coerceNullArrays); - if (value === null || typeof value !== 'object') return value; - const out: Record = {}; - for (const [k, v] of Object.entries(value as Record)) { - if (v === null && NULLABLE_LIST_KEYS.has(k)) { - out[k] = []; - } else { - out[k] = coerceNullArrays(v); - } - } - return out; -} - // Import cache providers from the cache module import type { CacheProvider } from '../cache/types'; import { LocalCache } from '../cache/local'; @@ -765,7 +720,7 @@ export class DataStreamClient extends LazyEmitter { } } else { try { - company = serializers.RulesengineCompany.parseOrThrow(coerceNullArrays(message.data), PARSE_OPTS); + company = serializers.RulesengineCompany.parseOrThrow(message.data, PARSE_OPTS); } catch (error) { this.logger.warn(`Failed to deserialize company payload: ${error}`); return; @@ -833,7 +788,7 @@ export class DataStreamClient extends LazyEmitter { } } else { try { - user = serializers.RulesengineUser.parseOrThrow(coerceNullArrays(message.data), PARSE_OPTS); + user = serializers.RulesengineUser.parseOrThrow(message.data, PARSE_OPTS); } catch (error) { this.logger.warn(`Failed to deserialize user payload: ${error}`); return; @@ -889,7 +844,7 @@ export class DataStreamClient extends LazyEmitter { let firstFailure: unknown = undefined; for (const raw of rawFlags) { try { - flags.push(serializers.RulesengineFlag.parseOrThrow(coerceNullArrays(raw), PARSE_OPTS)); + flags.push(serializers.RulesengineFlag.parseOrThrow(raw, PARSE_OPTS)); } catch (error) { parseFailureCount++; if (firstFailure === undefined) firstFailure = error; @@ -940,7 +895,7 @@ export class DataStreamClient extends LazyEmitter { private async handleFlagMessage(message: DataStreamResp): Promise { let flag: Schematic.RulesengineFlag; try { - flag = serializers.RulesengineFlag.parseOrThrow(coerceNullArrays(message.data), PARSE_OPTS); + flag = serializers.RulesengineFlag.parseOrThrow(message.data, PARSE_OPTS); } catch (error) { this.logger.warn(`Failed to deserialize flag payload: ${error}`); return; From 96ecf1582834efeb78e9265201ba933d20b871b9 Mon Sep 17 00:00:00 2001 From: Ben Papillon Date: Fri, 22 May 2026 21:02:53 -0700 Subject: [PATCH 6/6] drop PARSE_OPTS lenient flags; rely on Fern defaults --- src/datastream/datastream-client.ts | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/src/datastream/datastream-client.ts b/src/datastream/datastream-client.ts index 42d802c9..b395cdbe 100644 --- a/src/datastream/datastream-client.ts +++ b/src/datastream/datastream-client.ts @@ -7,19 +7,6 @@ import { LazyEmitter } from './emitter'; import { partialCompany, partialUser, deepCopyCompany as deepCopyCompanyFn } from './merge'; import * as serializers from '../serialization'; -// Wire payloads from the schematic-api Go server use snake_case JSON -// (e.g. `event_subtype`, `condition_type`, `credit_id`). The TypeScript -// API types use camelCase (`eventSubtype`, `conditionType`, `creditId`), -// and downstream consumers (lease check, rules-engine WASM bridge, etc.) -// read camelCase. Without running incoming payloads through the Fern -// serializer the fields read as `undefined`, which silently breaks -// `findCreditCondition()` and any other camelCase access. -const PARSE_OPTS = { - allowUnrecognizedEnumValues: true, - allowUnrecognizedUnionMembers: true, - unrecognizedObjectKeys: 'passthrough' as const, -}; - // Import cache providers from the cache module import type { CacheProvider } from '../cache/types'; import { LocalCache } from '../cache/local'; @@ -720,7 +707,7 @@ export class DataStreamClient extends LazyEmitter { } } else { try { - company = serializers.RulesengineCompany.parseOrThrow(message.data, PARSE_OPTS); + company = serializers.RulesengineCompany.parseOrThrow(message.data); } catch (error) { this.logger.warn(`Failed to deserialize company payload: ${error}`); return; @@ -788,7 +775,7 @@ export class DataStreamClient extends LazyEmitter { } } else { try { - user = serializers.RulesengineUser.parseOrThrow(message.data, PARSE_OPTS); + user = serializers.RulesengineUser.parseOrThrow(message.data); } catch (error) { this.logger.warn(`Failed to deserialize user payload: ${error}`); return; @@ -844,7 +831,7 @@ export class DataStreamClient extends LazyEmitter { let firstFailure: unknown = undefined; for (const raw of rawFlags) { try { - flags.push(serializers.RulesengineFlag.parseOrThrow(raw, PARSE_OPTS)); + flags.push(serializers.RulesengineFlag.parseOrThrow(raw)); } catch (error) { parseFailureCount++; if (firstFailure === undefined) firstFailure = error; @@ -895,7 +882,7 @@ export class DataStreamClient extends LazyEmitter { private async handleFlagMessage(message: DataStreamResp): Promise { let flag: Schematic.RulesengineFlag; try { - flag = serializers.RulesengineFlag.parseOrThrow(message.data, PARSE_OPTS); + flag = serializers.RulesengineFlag.parseOrThrow(message.data); } catch (error) { this.logger.warn(`Failed to deserialize flag payload: ${error}`); return;