Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
f882b5e
feat: add chat server client and API client for realtime session sync…
janole Sep 24, 2025
266daca
feat: add onlineMode state and display it in chat UI
janole Sep 24, 2025
f987372
refactor: replace session.flush() with startChatServerClient(session)…
janole Sep 24, 2025
a66f112
feat: add optional onlineMode property to ChatSession and include it …
janole Sep 24, 2025
76049c3
refactor: simplify addError method and update its parameters in sessi…
janole Sep 24, 2025
00b8b91
refactor: remove unused addError method from ChatSession class
janole Sep 24, 2025
874b2bc
refactor: rename error parameters and improve error handling signatur…
janole Sep 24, 2025
9667104
refactor: update handleError to use session.setMessages with ErrorMes…
janole Sep 24, 2025
8745dff
feat: add debug mode to ChatServerClient for conditional error messag…
janole Sep 24, 2025
1ec7552
fix: pass debug option to startChatServerClient and clean export form…
janole Sep 24, 2025
cd2cb23
refactor: use strict inequality and clean up trailing commas in exports
janole Sep 24, 2025
fc12954
chore: update package-lock.json dependencies and remove optional @sup…
janole Sep 24, 2025
e2ea1bb
chore: update package-lock.json to add optional Supabase dependencies…
janole Oct 2, 2025
60dfd05
fix: remove unnecessary await from channel subscription call in api-c…
janole Oct 2, 2025
3a0a35e
chore: update package-lock.json with version bump and dependency chan…
janole Oct 2, 2025
27a2a29
chore: remove unnecessary blank line in utils.ts
janole Jan 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
469 changes: 285 additions & 184 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@
"ulid": "^3.0.1",
"write-file-atomic": "^6.0.0"
},
"optionalDependencies": {
"@supabase/supabase-js": "^2.57.4"
},
"devDependencies": {
"@sindresorhus/tsconfig": "^7.0.0",
"@stylistic/eslint-plugin": "^5.1.0",
Expand Down
354 changes: 354 additions & 0 deletions src/ai/session/chat-server/api-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
// A simple, configurable client library for the documents and links APIs.

import type { REALTIME_SUBSCRIBE_STATES, RealtimeChannel, SupabaseClient } from "@supabase/supabase-js";

import tryCatch from "../../../utils/try-catch.js";

export interface IAuthData
{
user_id: string;
access_token: string;
expires_at: number;
api_url: string;
api_key: string;
channel: string;
}

export type IClientStatusOptional =
| string
| number
| boolean
| null
| undefined
| { [key: string]: IClientStatusOptional | undefined }
| IClientStatusOptional[];

export interface IClientStatus
{
type: "consumer-client" | "super-client";
user_id: string;
external_id: string;
status: "idle" | "working";
optional?: { [key: string]: IClientStatusOptional };
}

export interface IApiClientOptions
{
baseUrl?: string;
token?: string;
}

export interface IApiClientCommandListener
{
handleCommand: (payload: any) => void;
handleConnection: (connected: boolean) => void;
handleError: (message: string, level?: "debug" | "warn" | "error", error?: Error) => void;
}

class ApiClient
{
private baseUrl: string;
private token?: string;

private authData?: IAuthData;

private supabase?: SupabaseClient;
private channel?: RealtimeChannel;

private commandListeners: IApiClientCommandListener[] = [];

private _initPromise?: Promise<void>;

// Reconnection/backoff state
private reconnectAttempt = 0;
private reconnectTimer?: NodeJS.Timeout;

constructor(options: IApiClientOptions = {})
{
this.baseUrl = options.baseUrl || "";
this.token = options.token;
}

private async _request<T>(path: string, options: RequestInit): Promise<T>
{
const url = new URL(path, this.baseUrl).href;
const headers = new Headers(options.headers || {});
headers.set("Content-Type", "application/json");

if (this.token)
{
headers.set("Authorization", `Bearer ${this.token}`);
}

const response = await fetch(url, { ...options, headers });

if (!response.ok)
{
const errorBody = await response.json().catch(() => ({ error: `Request failed with status ${response.status}` }));
throw new Error(errorBody.error || "An unknown error occurred");
}

const text = await response.text();
return text ? JSON.parse(text) as T : {} as T;
}

// --- ----
addCommandListener(listener: IApiClientCommandListener)
{
if (!this.commandListeners.includes(listener))
{
this.commandListeners.push(listener);
}
};

removeCommandListener(f: IApiClientCommandListener)
{
const i = this.commandListeners.indexOf(f);

if (i !== -1)
{
this.commandListeners.splice(i, 1);
}
};

private pushCommand(payload: any)
{
this.commandListeners.forEach(listener => tryCatch(() => listener.handleCommand(payload)));
}

private pushConnection(connected: boolean)
{
this.commandListeners.forEach(listener => tryCatch(() => listener.handleConnection(connected)));
}

private pushError(message: string, level: "debug" | "warn" | "error" = "error", error?: Error)
{
this.commandListeners.forEach(listener => tryCatch(() => listener.handleError(message, level, error)));
}

// --- Auth API ---

private getAuthData(): Promise<IAuthData>
{
return this._request<IAuthData>("api/auth/access-token", { method: "POST" });
}

// --- Realtime API ---

private getAccessToken = async () =>
{
if (!this.supabase || (this.authData?.expires_at || 0) > Date.now() / 1000 + 60)
{
return this.authData?.access_token || null;
}

const authData = await this.getAuthData();
this.authData = authData;

if (this.supabase?.realtime)
{
this.supabase.realtime.setAuth(authData.access_token);
}

return this.authData.access_token;
};

private createSupabaseClient = async () =>
{
this.authData = await this.getAuthData();

this.pushError("AUTHDATA\n\n" + JSON.stringify(this.authData, null, 3), "debug");

const { createClient } = await import("@supabase/supabase-js");

const supabase = createClient(this.authData.api_url, this.authData.api_key, {
accessToken: this.getAccessToken,
});

supabase.realtime.setAuth(this.authData.access_token);

this.supabase = supabase;

this.channel = supabase.channel(this.authData.channel)
.on(
"postgres_changes",
{
event: "*",
schema: "public",
table: "documents",
filter: `user_id=eq.${this.authData.user_id}`,
},
(payload: any) => this.pushCommand(payload),
)
// .on("broadcast", { event: "command" }, (args: any) => this.pushCommand({ args }))
.on("presence", { event: "sync" }, () => this.pushCommand({ presenceState: this.channel?.presenceState() }))
.on("presence", { event: "join" }, (args: any) => this.pushCommand({ args, presenceState: this.channel?.presenceState() }))
.on("presence", { event: "leave" }, (args: any) => this.pushCommand({ args, presenceState: this.channel?.presenceState() }));

let closed = false;

this.channel.subscribe((status: REALTIME_SUBSCRIBE_STATES, err) =>
{
if (closed)
{
// console.error("CHANNEL-ERROR: already closed!");
return;
}

if (["CHANNEL_ERROR", "TIMED_OUT", "CLOSED"].includes(status))
{
closed = true;

if (err)
{
this.pushError(err.message);
}

this.pushConnection(false);

this.closeSupabaseClient().then(() => this.scheduleReconnect(status));
}
else if (status === "SUBSCRIBED")
{
this.clearReconnectTimer();
this.reconnectAttempt = 0;
this.pushConnection(true);
}
});

return this.supabase;
};

private clearReconnectTimer()
{
if (this.reconnectTimer)
{
clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
}
}

private scheduleReconnect(reason?: string)
{
// If the client is already connected or a reconnect is pending, do nothing.
if (this.supabase || this.reconnectTimer)
{
return;
}

const attempt = ++this.reconnectAttempt;
const base = 1000; // 1s
const max = 30000; // 30s cap
const delay = Math.min(max, base * Math.pow(2, attempt - 1));
const jitter = Math.floor(Math.random() * 250);
const totalDelay = delay + jitter;

this.pushError(`Scheduling realtime reconnect in ${totalDelay}ms (attempt ${attempt}${reason ? ", reason: " + reason : ""})`, "debug");

this.reconnectTimer = setTimeout(async () =>
{
this.reconnectTimer = undefined;
try
{
await this.ensureSupabaseClient();
}
catch (e: any)
{
this.pushError("Reconnect failed.", "error", e);

// Schedule next attempt if still not connected
if (!this.supabase)
{
this.scheduleReconnect("ensureSupabaseClient failed");
}
}
}, totalDelay);
}

private closeSupabaseClient = async (): Promise<void> =>
{
if (!this.channel && !this.supabase && !this._initPromise)
{
return;
}

// Cancel any pending reconnect when we are explicitly closing
this.clearReconnectTimer();

await this.channel?.unsubscribe();
this.channel = undefined;

await this.supabase?.removeAllChannels();
await this.supabase?.realtime?.disconnect();

this.supabase = undefined;
this.authData = undefined;
};

private async ensureSupabaseClient()
{
if (this.supabase)
{
return;
}

if (!this._initPromise)
{
this._initPromise = this.createSupabaseClient()
.then((_supabase) =>
{
// supabase should be ready now!
})
.catch(error =>
{
this.pushError("Failed to create supabase client", "error", error);

// If creation fails before subscription, schedule a reconnect
this.scheduleReconnect("init-failed");
})
.finally(() =>
{
this._initPromise = undefined;
});
}

await this._initPromise;
}

async start()
{
await this.ensureSupabaseClient();
}

async setStatus(status: Pick<IClientStatus, "external_id" | "status" | "optional">)
{
await this.ensureSupabaseClient();

if (this.authData?.user_id)
{
await this.channel?.track({
user_id: this.authData!.user_id,
type: "super-client",
external_id: status.external_id,
status: status.status,
optional: status.optional,
} satisfies IClientStatus);
}
}

// --- Direct Supabase API ---

async directUpsertDocument(externalId: string, documentData: { data: Record<string, any>, schema_id?: string | null })
{
const rpcParams = {
p_external_id: externalId,
p_data: documentData.data,
p_schema_id: documentData.schema_id,
} as const;

await this.ensureSupabaseClient();
await this.supabase?.rpc("upsert_document", rpcParams).select().single();
}
}

export { ApiClient };
Loading