Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 113 additions & 50 deletions packages/vercel-flags-core/src/data-source/flag-network-data-source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ export class FlagNetworkDataSource implements DataSource {
resolveStreamInitPromise: undefined | ((value: BundledDefinitions) => void);
rejectStreamInitPromise: undefined | ((reason?: any) => void);
initialized?: boolean = false;
private hasReceivedData: boolean = false;
private retryCount: number = 0;
private readonly maxRetryDelay: number = 30000; // 30 seconds max delay
private readonly baseRetryDelay: number = 1000; // 1 second initial delay
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we retry once immediately?


readonly host = 'https://flags.vercel.com';

Expand All @@ -41,6 +45,19 @@ export class FlagNetworkDataSource implements DataSource {
this.bundledDefinitions = readBundledDefinitions(this.sdkKey);
}

private getRetryDelay(): number {
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (capped)
const delay = Math.min(
this.baseRetryDelay * Math.pow(2, this.retryCount),
this.maxRetryDelay,
);
return delay;
}

private async sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async subscribe() {
// only init lazily to prevent opening streams when a page
// has no flags anyhow and just the client is imported
Expand Down Expand Up @@ -70,63 +87,105 @@ export class FlagNetworkDataSource implements DataSource {
}

async createLoop() {
console.log(process.pid, 'createLoop → MAKE STREAM');
const response = await fetch(`${this.host}/v1/stream`, {
headers: {
Authorization: `Bearer ${this.sdkKey}`,
},
});

if (!response.ok) {
const error = new Error(`Failed to fetch stream: ${response.statusText}`);
this.rejectStreamInitPromise!(error);
throw error;
}

if (!response.body) {
const error = new Error(`No body found`);
this.rejectStreamInitPromise!(error);
throw error;
}

let buffer = '';

// Wait for the server to push some data
for await (const chunk of streamAsyncIterable(response.body)) {
if (this.breakLoop) break;
buffer += new TextDecoder().decode(chunk);

// SSE events are separated by double newlines
let eventBoundary = buffer.indexOf('\n\n');
while (eventBoundary !== -1) {
const eventBlock = buffer.slice(0, eventBoundary);
buffer = buffer.slice(eventBoundary + 2);

// Parse the SSE event block
let eventType: string | null = null;
let eventData: string | null = null;
while (!this.breakLoop) {
try {
console.log(process.pid, 'createLoop → MAKE STREAM');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leftover?

const response = await fetch(`${this.host}/v1/stream`, {
headers: {
Authorization: `Bearer ${this.sdkKey}`,
},
});

if (!response.ok) {
const error = new Error(
`Failed to fetch stream: ${response.statusText}`,
);
// Only reject the init promise if we haven't received data yet
if (!this.hasReceivedData) {
this.rejectStreamInitPromise!(error);
throw error;
}
// Otherwise, throw to trigger retry
throw error;
}

for (const line of eventBlock.split('\n')) {
// Skip empty lines and comment lines (like ": ping")
if (line === '' || line.startsWith(':')) continue;
if (!response.body) {
const error = new Error(`No body found`);
if (!this.hasReceivedData) {
this.rejectStreamInitPromise!(error);
throw error;
}
throw error;
}

if (line.startsWith('event: ')) {
eventType = line.slice(7);
} else if (line.startsWith('data: ')) {
eventData = line.slice(6);
// Reset retry count on successful connection
this.retryCount = 0;

let buffer = '';

// Wait for the server to push some data
for await (const chunk of streamAsyncIterable(response.body)) {
if (this.breakLoop) break;
buffer += new TextDecoder().decode(chunk);

// SSE events are separated by double newlines
let eventBoundary = buffer.indexOf('\n\n');
while (eventBoundary !== -1) {
const eventBlock = buffer.slice(0, eventBoundary);
buffer = buffer.slice(eventBoundary + 2);

// Parse the SSE event block
let eventType: string | null = null;
let eventData: string | null = null;

for (const line of eventBlock.split('\n')) {
// Skip empty lines and comment lines (like ": ping")
if (line === '' || line.startsWith(':')) continue;

if (line.startsWith('event: ')) {
eventType = line.slice(7);
} else if (line.startsWith('data: ')) {
eventData = line.slice(6);
}
}

// Only process datafile events
if (eventType === 'datafile' && eventData) {
const data = JSON.parse(eventData) as BundledDefinitions;
this.definitions = data;
this.hasReceivedData = true;
console.log(process.pid, 'loop → data', data);
this.resolveStreamInitPromise!(data);
}

// Check for more events in the buffer
eventBoundary = buffer.indexOf('\n\n');
}
}

// Only process datafile events
if (eventType === 'datafile' && eventData) {
const data = JSON.parse(eventData) as BundledDefinitions;
this.definitions = data;
console.log(process.pid, 'loop → data', data);
this.resolveStreamInitPromise!(data);
// Stream ended - if not intentional, retry
if (!this.breakLoop) {
console.log(process.pid, 'loop → stream closed, will retry');
}
} catch (error) {
// If we haven't received data and this is the initial connection,
// the error was already propagated via rejectStreamInitPromise
if (!this.hasReceivedData) {
throw error;
}

console.error(process.pid, 'loop → error, will retry', error);
}

// Check for more events in the buffer
eventBoundary = buffer.indexOf('\n\n');
// Retry logic with exponential backoff
if (!this.breakLoop) {
const delay = this.getRetryDelay();
this.retryCount++;
console.log(
process.pid,
`loop → retrying in ${delay}ms (attempt ${this.retryCount})`,
);
await this.sleep(delay);
}
}

Expand All @@ -150,6 +209,10 @@ export class FlagNetworkDataSource implements DataSource {
return { projectId: data.projectId };
}

shutdown(): void {
this.breakLoop = true;
}

// called once per flag rather than once per request,
// but it's okay since we only ever read from memory here
async getData() {
Expand Down
Loading