Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions packages/vercel-flags-core/src/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,38 @@
import { describe, expect, it } from 'vitest';
import { createClient } from './client';
import { createRawClient } from './client';
import { InMemoryDataSource } from './data-source/in-memory-data-source';

describe('createClient', () => {
describe('createRawClient', () => {
it('should be a function', () => {
expect(typeof createClient).toBe('function');
expect(typeof createRawClient).toBe('function');
});

it('should allow a custom data source', () => {
const inlineDataSource = new InMemoryDataSource({ definitions: {} });
const flagsClient = createClient({
dataSource: inlineDataSource,
const inlineDataSource = new InMemoryDataSource({
data: { definitions: {}, segments: {} },
projectId: 'test',
environment: 'production',
});
const flagsClient = createRawClient({
dataSource: inlineDataSource,
});

expect(flagsClient.environment).toEqual('production');
expect(flagsClient.dataSource).toEqual(inlineDataSource);
});

it('should evaluate', async () => {
const customDataSource = new InMemoryDataSource({
definitions: {
'summer-sale': { environments: { production: 0 }, variants: [false] },
data: {
definitions: {
'summer-sale': { environments: { production: 0 }, variants: [false] },
},
segments: {},
},
projectId: 'test',
environment: 'production',
});
const flagsClient = createClient({
const flagsClient = createRawClient({
dataSource: customDataSource,
environment: 'production',
});

await expect(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import { HttpResponse, http } from 'msw';
import { setupServer } from 'msw/node';
import { afterAll, afterEach, beforeAll, describe, expect, it } from 'vitest';
import { FlagNetworkDataSource } from './flag-network-data-source';

const server = setupServer();

beforeAll(() => server.listen());
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

function createNdjsonStream(messages: object[], delayMs = 0): ReadableStream {
return new ReadableStream({
async start(controller) {
for (const message of messages) {
if (delayMs > 0) await new Promise((r) => setTimeout(r, delayMs));
controller.enqueue(
new TextEncoder().encode(JSON.stringify(message) + '\n'),
);
}
controller.close();
},
});
}

describe('FlagNetworkDataSource', () => {
it('should parse datafile messages from NDJSON stream', async () => {
const definitions = {
projectId: 'test-project',
definitions: { 'my-flag': { variants: [true, false] } },
};

server.use(
http.get('https://flags.vercel.com/v1/stream', () => {
return new HttpResponse(
createNdjsonStream([{ type: 'datafile', data: definitions }]),
{ headers: { 'Content-Type': 'application/x-ndjson' } },
);
}),
);

const dataSource = new FlagNetworkDataSource({ sdkKey: 'test-key' });
await dataSource.subscribe();

expect(dataSource.definitions).toEqual(definitions);
});

it('should ignore ping messages', async () => {
const definitions = {
projectId: 'test-project',
definitions: {},
};

server.use(
http.get('https://flags.vercel.com/v1/stream', () => {
return new HttpResponse(
createNdjsonStream([
{ type: 'ping' },
{ type: 'datafile', data: definitions },
{ type: 'ping' },
]),
{ headers: { 'Content-Type': 'application/x-ndjson' } },
);
}),
);

const dataSource = new FlagNetworkDataSource({ sdkKey: 'test-key' });
await dataSource.subscribe();

expect(dataSource.definitions).toEqual(definitions);
});

it('should stop reconnecting on terminate message', async () => {
const definitions = {
projectId: 'test-project',
definitions: {},
};

server.use(
http.get('https://flags.vercel.com/v1/stream', () => {
return new HttpResponse(
createNdjsonStream([
{ type: 'datafile', data: definitions },
{ type: 'terminate', reason: 'sdk-key-revoked' },
]),
{ headers: { 'Content-Type': 'application/x-ndjson' } },
);
}),
);

const dataSource = new FlagNetworkDataSource({ sdkKey: 'test-key' });
await dataSource.subscribe();

// Wait for the loop to process the terminate message
await dataSource._loopPromise;

expect(dataSource.breakLoop).toBe(true);
});

it('should handle messages split across chunks', async () => {
const definitions = {
projectId: 'test-project',
definitions: { flag: { variants: [1, 2, 3] } },
};

const fullMessage = JSON.stringify({ type: 'datafile', data: definitions });
const part1 = fullMessage.slice(0, 20);
const part2 = fullMessage.slice(20) + '\n';

server.use(
http.get('https://flags.vercel.com/v1/stream', () => {
return new HttpResponse(
new ReadableStream({
async start(controller) {
controller.enqueue(new TextEncoder().encode(part1));
await new Promise((r) => setTimeout(r, 10));
controller.enqueue(new TextEncoder().encode(part2));
controller.close();
},
}),
{ headers: { 'Content-Type': 'application/x-ndjson' } },
);
}),
);

const dataSource = new FlagNetworkDataSource({ sdkKey: 'test-key' });
await dataSource.subscribe();

expect(dataSource.definitions).toEqual(definitions);
});

it('should update definitions when new datafile messages arrive', async () => {
const definitions1 = { projectId: 'test', definitions: { v: 1 } };
const definitions2 = { projectId: 'test', definitions: { v: 2 } };

server.use(
http.get('https://flags.vercel.com/v1/stream', () => {
return new HttpResponse(
createNdjsonStream([
{ type: 'datafile', data: definitions1 },
{ type: 'datafile', data: definitions2 },
{ type: 'terminate', reason: 'sdk-key-revoked' },
]),
{ headers: { 'Content-Type': 'application/x-ndjson' } },
);
}),
);

const dataSource = new FlagNetworkDataSource({ sdkKey: 'test-key' });
await dataSource.subscribe();

// Wait for stream to complete
await dataSource._loopPromise;

expect(dataSource.definitions).toEqual(definitions2);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import type { BundledDefinitions } from '../types';
import { readBundledDefinitions } from '../utils/read-bundled-definitions';
import type { DataSource, DataSourceMetadata } from './interface';

const debugLog = (...args: any[]) => {
if (process.env.DEBUG !== '1') return;
console.log(...args);
};

async function* streamAsyncIterable(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader();
try {
Expand All @@ -16,7 +21,7 @@ async function* streamAsyncIterable(stream: ReadableStream<Uint8Array>) {
}

/**
* Implements the DataSource interface for Edge Config.
* Implements the DataSource interface for flags.vercel.com.
*/
export class FlagNetworkDataSource implements DataSource {
sdkKey?: string;
Expand Down Expand Up @@ -89,7 +94,7 @@ export class FlagNetworkDataSource implements DataSource {
async createLoop() {
while (!this.breakLoop) {
try {
console.log(process.pid, 'createLoop → MAKE STREAM');
debugLog(process.pid, 'createLoop → MAKE STREAM');
const response = await fetch(`${this.host}/v1/stream`, {
headers: {
Authorization: `Bearer ${this.sdkKey}`,
Expand All @@ -100,12 +105,20 @@ export class FlagNetworkDataSource implements DataSource {
const error = new Error(
`Failed to fetch stream: ${response.statusText}`,
);
// Stop retrying on 4xx client errors (won't fix itself on retry)
if (response.status >= 400 && response.status < 500) {
this.breakLoop = true;
if (!this.hasReceivedData) {
this.rejectStreamInitPromise!(error);
}
throw error;
}
// Only reject the init promise if we haven't received data yet
if (!this.hasReceivedData) {
this.rejectStreamInitPromise!(error);
throw error;
}
// Otherwise, throw to trigger retry
// Otherwise, throw to trigger retry (5xx errors, etc.)
throw error;
}

Expand All @@ -121,51 +134,35 @@ export class FlagNetworkDataSource implements DataSource {
// Reset retry count on successful connection
this.retryCount = 0;

const decoder = new TextDecoder();
let buffer = '';

// Wait for the server to push some data
for await (const chunk of streamAsyncIterable(response.body)) {
if (this.breakLoop) break;
buffer += new TextDecoder().decode(chunk);

// SSE events are separated by double newlines
let eventBoundary = buffer.indexOf('\n\n');
while (eventBoundary !== -1) {
const eventBlock = buffer.slice(0, eventBoundary);
buffer = buffer.slice(eventBoundary + 2);

// Parse the SSE event block
let eventType: string | null = null;
let eventData: string | null = null;

for (const line of eventBlock.split('\n')) {
// Skip empty lines and comment lines (like ": ping")
if (line === '' || line.startsWith(':')) continue;

if (line.startsWith('event: ')) {
eventType = line.slice(7);
} else if (line.startsWith('data: ')) {
eventData = line.slice(6);
}
}
buffer += decoder.decode(chunk, { stream: true });

// Only process datafile events
if (eventType === 'datafile' && eventData) {
const data = JSON.parse(eventData) as BundledDefinitions;
this.definitions = data;
const lines = buffer.split('\n');
buffer = lines.pop()!; // Keep incomplete line in buffer

for (const line of lines) {
if (line === '') continue;

const message = JSON.parse(line) as
| { type: 'datafile'; data: BundledDefinitions }
| { type: 'ping' };

if (message.type === 'datafile') {
this.definitions = message.data;
this.hasReceivedData = true;
console.log(process.pid, 'loop → data', data);
this.resolveStreamInitPromise!(data);
debugLog(process.pid, 'loop → data', message.data);
this.resolveStreamInitPromise!(message.data);
}

// Check for more events in the buffer
eventBoundary = buffer.indexOf('\n\n');
}
}

// Stream ended - if not intentional, retry
if (!this.breakLoop) {
console.log(process.pid, 'loop → stream closed, will retry');
debugLog(process.pid, 'loop → stream closed, will retry');
}
} catch (error) {
// If we haven't received data and this is the initial connection,
Expand All @@ -189,7 +186,7 @@ export class FlagNetworkDataSource implements DataSource {
}
}

console.log(process.pid, 'loop → done');
debugLog(process.pid, 'loop → done');
}

async fetchData(): Promise<BundledDefinitions> {
Expand Down Expand Up @@ -217,22 +214,22 @@ export class FlagNetworkDataSource implements DataSource {
// but it's okay since we only ever read from memory here
async getData() {
if (!this.initialized) {
console.log(process.pid, 'getData → init');
debugLog(process.pid, 'getData → init');
await this.subscribe();
}
if (this.streamInitPromise) {
console.log(process.pid, 'getData → await');
debugLog(process.pid, 'getData → await');
await this.streamInitPromise;
}
if (this.definitions) {
console.log(process.pid, 'getData → definitions');
debugLog(process.pid, 'getData → definitions');
return this.definitions;
}
if (this.bundledDefinitions) {
console.log(process.pid, 'getData → bundledDefinitions');
debugLog(process.pid, 'getData → bundledDefinitions');
return this.bundledDefinitions;
}
console.log(process.pid, 'getData → throw');
debugLog(process.pid, 'getData → throw');
throw new Error('No definitions found');
}
}
Loading
Loading