diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..a3eab28 --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,103 @@ +name: "Build and deploy" +on: + # push: + # branches: + # - master + # - main + workflow_dispatch: + inputs: + cf_env: + description: Choose environment + type: choice + required: true + options: + - dev + - staging + - prod + default: dev + branch: + description: Choose branch or tag, defaults to main + type: string + required: true + default: main + emsdk_version: + description: Emscripten SDK version, defaults to 4.0.20 + type: string + required: false + default: "4.0.20" + preCommand: + description: Provide a bash script to execute before running wrangler + type: string + required: false + default: echo "No script provided for execution before running Wrangler. Moving along." + postCommand: + description: Provide a bash script to execute after running wrangler + type: string + required: false + default: echo "Nothing to execute after running Wrangler. Finishing..." +jobs: + deploy: + runs-on: "ubuntu-latest" + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + ref: ${{ inputs.branch }} + submodules: 'true' + + - name: Set environment variables + run: | + if [[ "${{ inputs.cf_env }}" == "dev" ]]; then + echo "CF_ACCOUNT=DEV_ACCOUNT_ID" >> "$GITHUB_ENV" + echo "CF_TOKEN=JITSI_CF_DEV_TOKEN" >> "$GITHUB_ENV" + elif [[ "${{ inputs.cf_env }}" == "staging" ]]; then + echo "CF_ACCOUNT=STAGE_ACCOUNT_ID" >> "$GITHUB_ENV" + echo "CF_TOKEN=JITSI_CF_STAGING_TOKEN" >> "$GITHUB_ENV" + elif [[ "${{ inputs.cf_env }}" == "prod" ]]; then + echo "CF_ACCOUNT=PROD_ACCOUNT_ID" >> "$GITHUB_ENV" + echo "CF_TOKEN=JITSI_CF_PROD_TOKEN" >> "$GITHUB_ENV" + else + echo "Invalid environment specified: ${{ inputs.cf_env }}, exiting." + exit 1 + fi + + - name: Install Linux deps + run: | + sudo apt update + sudo apt -y install wget unzip + + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: npm + + - name: Install Node Dependencies + run: | + npm ci + + - name: Setup Emscripten SDK + run: | + wget https://github.com/emscripten-core/emsdk/archive/main.zip + unzip main.zip + cd emsdk-main + ./emsdk install ${{ inputs.emsdk_version }} + ./emsdk activate ${{ inputs.emsdk_version }} + source ./emsdk_env.sh + echo "EMSDK=$EMSDK" >> $GITHUB_ENV + echo "EM_CONFIG=$EM_CONFIG" >> $GITHUB_ENV + echo "$EMSDK:$EMSDK/upstream/emscripten" >> $GITHUB_PATH + + - name: Build TS App + run: | + source "$EMSDK/emsdk_env.sh" + npm run build + + - name: Wrangler Deploy + uses: cloudflare/wrangler-action@v3 + with: + wranglerVersion: "4.51.0" + apiToken: ${{ secrets[env.CF_TOKEN] }} + accountId: ${{ secrets[env.CF_ACCOUNT] }} + preCommands: ${{ inputs.preCommand }} + postCommands: ${{ inputs.postCommand }} + command: deploy diff --git a/Makefile b/Makefile index 5ecd1cd..b3eb956 100644 --- a/Makefile +++ b/Makefile @@ -7,15 +7,17 @@ OPUS_DECODER_DIST=./dist OPUS_DECODER_EMSCRIPTEN_BUILD=$(OPUS_DECODER_BUILD)/EmscriptenWasm.tmp.js OPUS_DECODER_EMSCRIPTEN_WASM=$(OPUS_DECODER_BUILD)/EmscriptenWasm.tmp.wasm +OPUS_DECODER_EMSCRIPTEN_WASM_MAP=$(OPUS_DECODER_BUILD)/EmscriptenWasm.tmp.wasm.map OPUS_DECODER_MODULE=$(OPUS_DECODER_DIST)/opus-decoder.js OPUS_DECODER_WASM=$(OPUS_DECODER_DIST)/opus-decoder.wasm +OPUS_DECODER_WASM_MAP=$(OPUS_DECODER_DIST)/opus-decoder.wasm.map LIBOPUS_SRC=$(OPUS_DECODER_SRC)/opus LIBOPUS_BUILD=$(OPUS_DECODER_BUILD)/build-opus-wasm LIBOPUS_WASM_LIB=$(OPUS_DECODER_BUILD)/libopus.a clean: - rm -rf $(OPUS_DECODER_EMSCRIPTEN_BUILD) $(OPUS_DECODER_EMSCRIPTEN_WASM) $(OPUS_DECODER_MODULE) $(OPUS_DECODER_WASM) $(LIBOPUS_WASM_LIB) + rm -rf $(OPUS_DECODER_EMSCRIPTEN_BUILD) $(OPUS_DECODER_EMSCRIPTEN_WASM) $(OPUS_DECODER_EMSCRIPTEN_WASM_MAP) $(OPUS_DECODER_MODULE) $(OPUS_DECODER_WASM) $(OPUS_DECODER_WASM_MAP) $(LIBOPUS_WASM_LIB) +emmake $(MAKE) -C $(LIBOPUS_BUILD) clean configure: libopus-configure @@ -35,6 +37,10 @@ opus-decoder: opus-wasmlib $(OPUS_DECODER_EMSCRIPTEN_BUILD) else \ echo "Warning: WASM file not found, you may need to adjust emscripten settings"; \ fi + @if [ -f "$(OPUS_DECODER_EMSCRIPTEN_WASM_MAP)" ]; then \ + cp $(OPUS_DECODER_EMSCRIPTEN_WASM_MAP) $(OPUS_DECODER_WASM_MAP); \ + echo "Copied WASM source map to $(OPUS_DECODER_WASM_MAP)"; \ + fi # libopus opus-wasmlib: $(LIBOPUS_WASM_LIB) @@ -44,12 +50,13 @@ define EMCC_OPTS -O2 \ -msimd128 \ --minify 0 \ +-gsource-map \ -s WASM=1 \ -s TEXTDECODER=2 \ -s SINGLE_FILE=0 \ -s MALLOC="emmalloc" \ -s NO_FILESYSTEM=1 \ --s ENVIRONMENT=web \ +-s ENVIRONMENT=node \ -s ASSERTIONS=1 \ -s ABORTING_MALLOC=0 \ -s EXIT_RUNTIME=0 \ diff --git a/package-lock.json b/package-lock.json index 91e61d4..241351e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,7 +11,7 @@ "devDependencies": { "@cloudflare/vitest-pool-workers": "^0.8.19", "@types/node": "^24.7.2", - "prettier": "3.6.2", + "prettier": "^3.6.2", "typescript": "^5.9.3", "vitest": "~3.2.0", "wrangler": "^4.38.0" diff --git a/package.json b/package.json index 3482876..d26c7d6 100644 --- a/package.json +++ b/package.json @@ -18,7 +18,7 @@ "devDependencies": { "@cloudflare/vitest-pool-workers": "^0.8.19", "@types/node": "^24.7.2", - "prettier": "3.6.2", + "prettier": "^3.6.2", "typescript": "^5.9.3", "vitest": "~3.2.0", "wrangler": "^4.38.0" diff --git a/src/MetricCache.ts b/src/MetricCache.ts new file mode 100644 index 0000000..4717642 --- /dev/null +++ b/src/MetricCache.ts @@ -0,0 +1,75 @@ +import { writeMetric, MetricEvent } from './metrics'; + +/** + * Aggregates metric counts and periodically flushes them to Analytics Engine. + * Reduces write frequency by batching metrics over a time interval. + */ +export class MetricCache { + private analytics: AnalyticsEngineDataset | undefined; + private intervalMs: number; + private metrics: Map; + + /** + * @param analytics - The Analytics Engine dataset to write to + * @param intervalMs - Time interval in milliseconds between metric writes (default: 1000ms) + */ + constructor(analytics: AnalyticsEngineDataset | undefined, intervalMs: number = 1000) { + this.analytics = analytics; + this.intervalMs = intervalMs; + this.metrics = new Map(); + } + + /** + * Increments the count for a metric. If the time interval has elapsed since + * the last write, flushes the accumulated count to Analytics Engine. + * + * @param event - The metric event to increment + */ + increment(event: MetricEvent): void { + const key = this.getKey(event); + const now = Date.now(); + const metric = this.metrics.get(key); + + if (!metric) { + // First time seeing this metric + this.metrics.set(key, { event, count: 1, lastWriteTime: now }); + } else { + // Increment existing metric + metric.count++; + + // Check if it's time to flush + if (now - metric.lastWriteTime >= this.intervalMs) { + writeMetric(this.analytics, metric.event, metric.count); + metric.count = 0; + metric.lastWriteTime = now; + } + } + } + + /** + * Flushes all accumulated metrics immediately, regardless of time interval. + * Useful for cleanup on shutdown or before long idle periods. + */ + flush(): void { + for (const [_, metric] of this.metrics) { + if (metric.count > 0) { + writeMetric(this.analytics, metric.event, metric.count); + metric.count = 0; + metric.lastWriteTime = Date.now(); + } + } + } + + /** + * Generates a unique key for a metric event based on its distinguishing properties. + * Does not include sessionId to allow aggregation across sessions. + */ + private getKey(event: MetricEvent): string { + return JSON.stringify({ + name: event.name, + worker: event.worker, + errorType: event.errorType ?? '', + targetName: event.targetName ?? '', + }); + } +} diff --git a/src/OpusDecoder/OpusDecoder.ts b/src/OpusDecoder/OpusDecoder.ts index 7923649..3f37172 100644 --- a/src/OpusDecoder/OpusDecoder.ts +++ b/src/OpusDecoder/OpusDecoder.ts @@ -5,6 +5,14 @@ // submodules and compiled into the dist files, may have different // licensing terms." +// Provide Node.js globals for emscripten module. HACK. +if (typeof globalThis.__filename === 'undefined') { + globalThis.__filename = './opus-decoder.js'; +} +if (typeof globalThis.__dirname === 'undefined') { + globalThis.__dirname = '.'; +} + import OpusDecoderModule from '../../dist/opus-decoder.js'; // @ts-ignore import wasm from '../../dist/opus-decoder.wasm'; @@ -68,22 +76,31 @@ export class OpusDecoder((resolve, reject) => { OpusDecoderModule({ instantiateWasm(info: WebAssembly.Imports, receive: (instance: WebAssembly.Instance) => void) { - let instance = new WebAssembly.Instance(wasm, info); - receive(instance); - return instance.exports; + try { + let instance = new WebAssembly.Instance(wasm, info); + receive(instance); + return instance.exports; + } catch (error) { + reject(error); + throw error; + } }, - }).then((module: any) => { - resolve({ - opus_frame_decoder_create: module._opus_frame_decoder_create, - opus_frame_decoder_destroy: module._opus_frame_decoder_destroy, - opus_frame_decoder_reset: module._opus_frame_decoder_reset, - opus_frame_decode: module._opus_frame_decode, - malloc: module._malloc, - free: module._free, - HEAP: module.wasmMemory.buffer, - module, + }) + .then((module: any) => { + resolve({ + opus_frame_decoder_create: module._opus_frame_decoder_create, + opus_frame_decoder_destroy: module._opus_frame_decoder_destroy, + opus_frame_decoder_reset: module._opus_frame_decoder_reset, + opus_frame_decode: module._opus_frame_decode, + malloc: module._malloc, + free: module._free, + HEAP: module.wasmMemory.buffer, + module, + }); + }) + .catch((error) => { + reject(error); }); - }); }); private _sampleRate: OpusDecoderSampleRate; @@ -98,7 +115,7 @@ export class OpusDecoder; private _output!: TypedArrayAllocation; - private _decoder!: number; + private _decoder: number | undefined; constructor( options: { @@ -136,6 +153,8 @@ export class OpusDecoder { const errors: DecodeError[] = []; + if (this._decoder === undefined) { + this.addError(errors, 'Decoder freed or not initialized', 0, 0, 0, 0); + console.error('Decoder freed or not initialized'); + return { + errors, + pcmData: new Int16Array(0), + channels: this._channels, + samplesDecoded: 0, + sampleRate: this._sampleRate, + } as OpusDecodedAudio; + } + this._input.buf.set(opusFrame); let samplesDecoded = this.wasm.opus_frame_decode( @@ -237,10 +274,23 @@ export class OpusDecoder { + const errors: DecodeError[] = []; + + if (this._decoder === undefined) { + this.addError(errors, 'Decoder freed or not initialized', 0, 0, 0, 0); + console.error('Decoder freed or not initialized'); + return { + errors, + pcmData: new Int16Array(0), + channels: this._channels, + samplesDecoded: 0, + sampleRate: this._sampleRate, + } as OpusDecodedAudio; + } + if (samplesToConceal > this._outputChannelSize) { samplesToConceal = this._outputChannelSize; } - const errors: DecodeError[] = []; let samplesDecoded: number; let inLength: number; if (opusFrame !== undefined) { diff --git a/src/OutgoingConnection.ts b/src/OutgoingConnection.ts index 9e33439..0009b41 100644 --- a/src/OutgoingConnection.ts +++ b/src/OutgoingConnection.ts @@ -1,5 +1,8 @@ import { OpusDecoder } from './OpusDecoder/OpusDecoder'; -import type { TranscriptionMessage } from './transcriberproxy'; +import type { TranscriptionMessage, TranscriberProxyOptions } from './transcriberproxy'; +import { getTurnDetectionConfig } from './utils'; +import { writeMetric } from './metrics'; +import { MetricCache } from './MetricCache'; // Type definition augmentation for Uint8Array - Cloudflare Worker's JS has these methods but TypeScript doesn't have // declarations for them as of version 5.9.3. @@ -90,20 +93,37 @@ export class OutgoingConnection { private lastTranscriptTime?: number = undefined; + // Idle commit timeout - forces transcription when audio stops + private idleCommitTimeout: ReturnType | null = null; + onInterimTranscription?: (message: TranscriptionMessage) => void = undefined; onCompleteTranscription?: (message: TranscriptionMessage) => void = undefined; onClosed?: (tag: string) => void = undefined; + onOpenAIError?: (errorType: string, errorMessage: string) => void = undefined; + onError?: (tag: string, error: any) => void = undefined; + + private env: Env; + private options: TranscriberProxyOptions; + private metricCache: MetricCache; - constructor(tag: string, env: Env) { + constructor(tag: string, env: Env, options: TranscriberProxyOptions) { this.setTag(tag); + this.env = env; + this.options = options; + this.metricCache = new MetricCache(env.METRICS); this.initializeOpusDecoder(); this.initializeOpenAIWebSocket(env); } reset(newTag: string) { + this.metricCache.flush(); if (this.connectionStatus == 'connected') { this.pendingTags.push(newTag); + + const commitMessage = { type: 'input_audio_buffer.commit' }; + this.openaiWebSocket?.send(JSON.stringify(commitMessage)); + const clearMessage = { type: 'input_audio_buffer.clear' }; this.openaiWebSocket?.send(JSON.stringify(clearMessage)); } else { @@ -115,6 +135,10 @@ export class OutgoingConnection { // Reset the pending audio buffer this.pendingAudioFrames = []; this.pendingAudioDataBuffer.resize(0); + + this.lastChunkNo = -1; + this.lastTimestamp = -1; + this.lastOpusFrameSize = -1; } private async initializeOpusDecoder(): Promise { @@ -132,6 +156,8 @@ export class OutgoingConnection { } catch (error) { console.error(`Failed to create Opus decoder for tag ${this._tag}:`, error); this.decoderStatus = 'failed'; + this.doClose(true); + this.onError?.(this._tag, `Error initializing Opus decoder: ${error instanceof Error ? error.message : String(error)}`); } } @@ -147,6 +173,13 @@ export class OutgoingConnection { console.log(`OpenAI WebSocket connected for tag: ${this._tag}`); this.connectionStatus = 'connected'; + const transcriptionConfig: { model: string; language?: string } = { + model: env.OPENAI_MODEL || 'gpt-4o-mini-transcribe', + }; + if (this.options.language !== null) { + transcriptionConfig.language = this.options.language; + } + const sessionConfig = { type: 'session.update', session: { @@ -160,18 +193,11 @@ export class OutgoingConnection { noise_reduction: { type: 'near_field', }, - transcription: { - model: 'gpt-4o-transcribe', - language: 'en', // TODO parameterize this - }, - turn_detection: { - type: 'server_vad', - threshold: 0.5, - prefix_padding_ms: 300, - silence_duration_ms: 500, - }, + transcription: transcriptionConfig, + turn_detection: getTurnDetectionConfig(env), }, }, + include: ['item.input_audio_transcription.logprobs'], }, }; @@ -188,19 +214,34 @@ export class OutgoingConnection { this.handleOpenAIMessage(event.data); }); - openaiWs.addEventListener('error', (error) => { - console.error(`OpenAI WebSocket error for tag ${this._tag}:`, error); + openaiWs.addEventListener('error', (event) => { + // Extract useful info from ErrorEvent (event.message is often empty for WebSocket errors) + const errorMessage = event instanceof ErrorEvent ? event.message || 'WebSocket error' : 'WebSocket error'; + console.error(`OpenAI WebSocket error for tag ${this._tag}: ${errorMessage}`); + writeMetric(this.env.METRICS, { + name: 'openai_api_error', + worker: 'opus-transcriber-proxy', + errorType: 'websocket_error', + }); + this.onOpenAIError?.('websocket_error', 'WebSocket connection error'); this.doClose(true); this.connectionStatus = 'failed'; + this.onError?.(this._tag, `Error connecting to OpenAI service: ${errorMessage}`); }); - openaiWs.addEventListener('close', () => { - console.log(`OpenAI WebSocket closed for tag: ${this._tag}`); + openaiWs.addEventListener('close', (event) => { + console.log(`OpenAI WebSocket closed for tag ${this._tag}: code=${event.code} reason=${event.reason || 'none'} wasClean=${event.wasClean}`); this.doClose(true); this.connectionStatus = 'failed'; }); } catch (error) { console.error(`Failed to create OpenAI WebSocket connection for tag ${this._tag}:`, error); + writeMetric(this.env.METRICS, { + name: 'openai_api_error', + worker: 'opus-transcriber-proxy', + errorType: 'connection_failed', + }); + this.onOpenAIError?.('connection_failed', error instanceof Error ? error.message : 'Unknown error'); this.connectionStatus = 'failed'; } } @@ -225,17 +266,27 @@ export class OutgoingConnection { return; } + this.metricCache.increment({ + name: 'opus_packet_received', + worker: 'opus-transcriber-proxy', + }); + if (Number.isInteger(mediaEvent.media?.chunk) && Number.isInteger(mediaEvent.media.timestamp)) { - if (this.lastChunkNo != -1 && mediaEvent.media.chunk != this.lastChunkNo - 1) { + if (this.lastChunkNo != -1 && mediaEvent.media.chunk != this.lastChunkNo + 1) { const chunkDelta = mediaEvent.media.chunk - this.lastChunkNo; - const timestampDelta = mediaEvent.media.timestamp - this.lastTimestamp; - if (chunkDelta <= 0 || timestampDelta <= 0) { - // Packets reordered, drop this packet + if (chunkDelta <= 0) { + // Packets reordered or replayed, drop this packet + writeMetric(this.env.METRICS, { + name: 'opus_packet_discarded', + worker: 'opus-transcriber-proxy', + }); + return; } // Packets lost, do concealment if (this.decoderStatus == 'ready') { + const timestampDelta = mediaEvent.media.timestamp - this.lastTimestamp; // TODO: enqueue concealment actions? Not sure this is needed in practice. this.doConcealment(opusFrame, chunkDelta, timestampDelta); } @@ -249,6 +300,10 @@ export class OutgoingConnection { } else if (this.decoderStatus === 'pending') { // Queue the binary data until decoder is ready this.pendingOpusFrames.push(opusFrame); + this.metricCache.increment({ + name: 'opus_packet_queued', + worker: 'opus-transcriber-proxy', + }); // console.log(`Queued opus frame for tag: ${this.tag} (queue size: ${this.pendingOpusFrames.length})`); } else { console.log(`Not queueing opus frame for tag: ${this._tag}: decoder ${this.decoderStatus}`); @@ -261,18 +316,39 @@ export class OutgoingConnection { return; } + const lostFrames = chunkDelta - 1; + if (lostFrames <= 0) { + return; + } + if (this.lastOpusFrameSize <= 0) { + // Not sure how we could have gotten here if we've never decoded anything + return; + } + /* Make sure numbers make sense */ - const chunkDeltaInSamples = chunkDelta * this.lastOpusFrameSize; - const timestampDeltaInSamples = (timestampDelta / 48000) * 24000; + const lostFramesInSamples = lostFrames * this.lastOpusFrameSize; + const timestampDeltaInSamples = timestampDelta > 0 ? (timestampDelta / 48000) * 24000 : Infinity; const maxConcealment = 120 * 24; /* 120 ms at 24 kHz */ - const samplesToConceal = Math.min(chunkDeltaInSamples, timestampDeltaInSamples, maxConcealment); + const samplesToConceal = Math.min(lostFramesInSamples, timestampDeltaInSamples, maxConcealment); try { const concealedAudio = this.opusDecoder.conceal(opusFrame, samplesToConceal); - this.sendOrEnqueueDecodedAudio(concealedAudio.pcmData); + if (concealedAudio.errors.length > 0) { + writeMetric(this.env.METRICS, { + name: 'opus_decode_failure', + worker: 'opus-transcriber-proxy', + }); + } else { + this.sendOrEnqueueDecodedAudio(concealedAudio.pcmData); + writeMetric(this.env.METRICS, { + name: 'opus_loss_concealment', + worker: 'opus-transcriber-proxy', + }); + } } catch (error) { console.error(`Error concealing ${samplesToConceal} samples for tag ${this._tag}:`, error); + // Don't call onError for concealment errors, as they may be transient } } @@ -285,6 +361,20 @@ export class OutgoingConnection { try { // Decode the Opus audio data const decodedAudio = this.opusDecoder.decodeFrame(opusFrame); + if (decodedAudio.errors.length > 0) { + console.error(`Opus decoding errors for tag ${this._tag}:`, decodedAudio.errors); + writeMetric(this.env.METRICS, { + name: 'opus_decode_failure', + worker: 'opus-transcriber-proxy', + }); + + // Don't call onError for decoding errors, as they may be transient + return; + } + this.metricCache.increment({ + name: 'opus_packet_decoded', + worker: 'opus-transcriber-proxy', + }); this.lastOpusFrameSize = decodedAudio.samplesDecoded; this.sendOrEnqueueDecodedAudio(decodedAudio.pcmData); } catch (error) { @@ -312,6 +402,10 @@ export class OutgoingConnection { this.pendingAudioDataBuffer.resize(uint8Data.byteLength); this.pendingAudioData.set(uint8Data); } + this.metricCache.increment({ + name: 'openai_audio_queued', + worker: 'opus-transcriber-proxy', + }); } else { console.log(`Not queueing audio data for tag: ${this._tag}: connection ${this.connectionStatus}`); } @@ -347,8 +441,14 @@ export class OutgoingConnection { const audioMessageString = JSON.stringify(audioMessage); this.openaiWebSocket.send(audioMessageString); + this.resetIdleCommitTimeout(); + this.metricCache.increment({ + name: 'openai_audio_sent', + worker: 'opus-transcriber-proxy', + }); } catch (error) { console.error(`Failed to send audio to OpenAI for tag ${this._tag}`, error); + // TODO should this call onError? } } @@ -375,11 +475,24 @@ export class OutgoingConnection { } } - private getTranscriptionMessage(transcript: string, timestamp: number, isInterim: boolean): TranscriptionMessage { + private getTranscriptionMessage( + transcript: string, + confidence: number | undefined, + timestamp: number, + message_id: string, + isInterim: boolean, + ): TranscriptionMessage { const message: TranscriptionMessage = { - transcript: [{ text: transcript }], + transcript: [ + { + ...(confidence !== undefined && { confidence }), + text: transcript, + }, + ], is_interim: isInterim, + message_id, type: 'transcription-result', + event: 'transcription-result', participant: this.participant, timestamp, }; @@ -397,10 +510,11 @@ export class OutgoingConnection { } if (parsedMessage.type === 'conversation.item.input_audio_transcription.delta') { const now = Date.now(); - if (this.lastTranscriptTime !== undefined) { + if (this.lastTranscriptTime === undefined) { this.lastTranscriptTime = now; } - const transcription = this.getTranscriptionMessage(parsedMessage.delta, now, true); + const confidence = parsedMessage.logprobs?.[0]?.logprob !== undefined ? Math.exp(parsedMessage.logprobs[0].logprob) : undefined; + const transcription = this.getTranscriptionMessage(parsedMessage.delta, confidence, now, parsedMessage.item_id, true); this.onInterimTranscription?.(transcription); } else if (parsedMessage.type === 'conversation.item.input_audio_transcription.completed') { let transcriptTime; @@ -410,16 +524,91 @@ export class OutgoingConnection { } else { transcriptTime = Date.now(); } - const transcription = this.getTranscriptionMessage(parsedMessage.transcript, transcriptTime, false); + const confidence = parsedMessage.logprobs?.[0]?.logprob !== undefined ? Math.exp(parsedMessage.logprobs[0].logprob) : undefined; + const transcription = this.getTranscriptionMessage( + parsedMessage.transcript, + confidence, + transcriptTime, + parsedMessage.item_id, + false, + ); + this.clearIdleCommitTimeout(); this.onCompleteTranscription?.(transcription); + } else if (parsedMessage.type === 'conversation.item.input_audio_transcription.failed') { + console.error(`OpenAI failed to transcribe audio for tag ${this._tag}: ${data}`); + writeMetric(this.env.METRICS, { + name: 'transcription_failure', + worker: 'opus-transcriber-proxy', + }); } else if (parsedMessage.type === 'input_audio_buffer.cleared') { // Reset completed - this.setTag(this.pendingTags.shift()!); + const nextTag = this.pendingTags.shift(); + if (nextTag !== undefined) { + this.setTag(nextTag); + } else { + console.error('Received cleared event but no pending tag available.'); + } } else if (parsedMessage.type === 'error') { - console.error(`OpenAI sent error message for ${this._tag}: ${parsedMessage}`); + if (parsedMessage.error?.type === 'invalid_request_error' && parsedMessage.error?.code === 'input_audio_buffer_commit_empty') { + // This error indicates that we tried to commit an empty audio buffer, which can happen + // if the VAD detected speech stopped just before we did a manual commit. Ignore. + // TODO should we log this at all? + console.log(`OpenAI reported empty audio buffer commit for ${this._tag}, ignoring.`); + return; + } + console.error(`OpenAI sent error message for ${this._tag}: ${data}`); + writeMetric(this.env.METRICS, { + name: 'openai_api_error', + worker: 'opus-transcriber-proxy', + errorType: 'api_error', + }); + this.onOpenAIError?.('api_error', parsedMessage.error?.message || data); this.doClose(true); + this.onError?.(this._tag, `OpenAI service sent error message: ${data}`); + } else if ( + parsedMessage.type !== 'session.created' && + parsedMessage.type !== 'session.updated' && + parsedMessage.type !== 'input_audio_buffer.committed' && + parsedMessage.type !== 'input_audio_buffer.speech_started' && + parsedMessage.type !== 'input_audio_buffer.speech_stopped' && + parsedMessage.type !== 'conversation.item.added' && + parsedMessage.type !== 'conversation.item.done' + ) { + // Log unexpected message types that might indicate issues + console.warn(`Unhandled OpenAI message type for ${this._tag}: ${parsedMessage.type}`); } - // TODO: are there any other messages we care about? + } + + private resetIdleCommitTimeout(): void { + this.clearIdleCommitTimeout(); + + const timeoutSeconds = parseInt(this.env.FORCE_COMMIT_TIMEOUT || '0', 10); + if (timeoutSeconds <= 0) { + return; + } + + this.idleCommitTimeout = setTimeout(() => { + this.forceCommit(); + }, timeoutSeconds * 1000); + } + + private clearIdleCommitTimeout(): void { + if (this.idleCommitTimeout !== null) { + clearTimeout(this.idleCommitTimeout); + this.idleCommitTimeout = null; + } + } + + private forceCommit(): void { + if (this.connectionStatus !== 'connected' || !this.openaiWebSocket) { + return; + } + + console.log(`Forcing commit for idle connection ${this._tag}`); + this.metricCache.flush(); + const commitMessage = { type: 'input_audio_buffer.commit' }; + this.openaiWebSocket.send(JSON.stringify(commitMessage)); + this.idleCommitTimeout = null; } close(): void { @@ -427,10 +616,16 @@ export class OutgoingConnection { } private doClose(notify: boolean): void { + this.clearIdleCommitTimeout(); + this.metricCache.flush(); this.opusDecoder?.free(); - this.openaiWebSocket?.close(); + this.opusDecoder = undefined; this.decoderStatus = 'closed'; + + this.openaiWebSocket?.close(); + this.openaiWebSocket = undefined; this.connectionStatus = 'closed'; + if (notify) { this.onClosed?.(this._tag); } diff --git a/src/index.ts b/src/index.ts index 1db7e82..39fabb4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,7 @@ import { extractSessionParameters } from './utils'; import { TranscriberProxy, type TranscriptionMessage } from './transcriberproxy'; import { Transcriptionator } from './transcriptionator'; import { WorkerEntrypoint } from 'cloudflare:workers'; +import { writeMetric } from './metrics'; export interface DispatcherTranscriptionMessage { sessionId: string; @@ -37,14 +38,14 @@ export default { const parameters = extractSessionParameters(request.url); console.log('Session parameters:', JSON.stringify(parameters)); - const { url, sessionId, transcribe, connect, useTranscriptionator, useDispatcher, sendBack } = parameters; + const { url, sessionId, transcribe, connect, useTranscriptionator, useDispatcher, sendBack, sendBackInterim, language } = parameters; if (!url.pathname.endsWith('/events') && !url.pathname.endsWith('/transcribe')) { return new Response('Bad URL', { status: 400 }); } if (transcribe) { - if (!useTranscriptionator && !useDispatcher && !sendBack && !connect) { + if (!useTranscriptionator && !useDispatcher && !sendBack && !sendBackInterim && !connect) { return new Response('No transcription output method specified', { status: 400 }); } @@ -53,7 +54,7 @@ export default { server.accept(); - const session = new TranscriberProxy(server, env); + const session = new TranscriberProxy(server, env, { language }); let outbound: WebSocket | undefined; let transcriptionator: DurableObjectStub | undefined; @@ -61,7 +62,7 @@ export default { if (connect) { try { - const outbound = new WebSocket(connect, ['transcription']); + outbound = new WebSocket(connect, ['transcription']); // TODO: pass auth info to this websocket outbound.addEventListener('close', () => { @@ -98,7 +99,22 @@ export default { server.close(); }); - if (outbound || transcriptionator || sendBack) { + session.on('error', (tag, error) => { + try { + const message = `Error in session ${tag}: ${error instanceof Error ? error.message : String(error)}`; + console.error(message); + outbound?.close(1001, message); + transcriptionator?.notifySessionClosed(); + server.close(1011, message); + } catch (closeError) { + // Error handlers do not themselves catch errors, so log to console + console.error( + `Failed to close connections after error in session ${tag}: ${closeError instanceof Error ? closeError.message : String(closeError)}`, + ); + } + }); + + if (outbound || transcriptionator || sendBackInterim) { session.on('interim_transcription', (data: TranscriptionMessage) => { const message = JSON.stringify(data); outbound?.send(message); @@ -110,6 +126,13 @@ export default { } session.on('transcription', (data: TranscriptionMessage) => { + // Track successful transcription + writeMetric(env.METRICS, { + name: 'transcription_success', + worker: 'opus-transcriber-proxy', + sessionId: sessionId ?? undefined, + }); + const message = outbound || transcriptionator || sendBack ? JSON.stringify(data) : ''; outbound?.send(message); transcriptionator?.broadcastMessage(message); @@ -124,23 +147,25 @@ export default { text: data.transcript.map((t) => t.text).join(' '), timestamp: data.timestamp, }; - ctx.waitUntil( - dispatcher - ?.dispatch(dispatcherMessage) - .then((response) => { - if (!response.success || response.errors) { - console.error('Dispatcher error:', { - message: response.message, - errors: response.errors, - dispatcherMessage, - }); - } - }) - .catch((error) => { - const message = error instanceof Error ? error.message : String(error); - console.error('Dispatcher RPC failed:', message, dispatcherMessage); - }), - ); + // Note: We intentionally don't use ctx.waitUntil() here because the + // ExecutionContext from the initial WebSocket upgrade request becomes + // stale after the response is sent. Using it would cause "IoContext + // timed out due to inactivity" errors when transcription events fire. + dispatcher + ?.dispatch(dispatcherMessage) + .then((response) => { + if (!response.success || response.errors) { + console.error('Dispatcher error:', { + message: response.message, + errors: response.errors, + dispatcherMessage, + }); + } + }) + .catch((error) => { + const message = error instanceof Error ? error.message : String(error); + console.error('Dispatcher RPC failed:', message, dispatcherMessage); + }); } }); diff --git a/src/metrics.ts b/src/metrics.ts new file mode 100644 index 0000000..2ed99e0 --- /dev/null +++ b/src/metrics.ts @@ -0,0 +1,59 @@ +/** + * Metrics service for writing to Cloudflare Analytics Engine + * Provides consistent metric structure across the transcription pipeline + * + * Note: Environment is not tracked since each Cloudflare account represents + * a separate environment (dev, staging, prod). + */ + +export type MetricName = + | 'ingester_success' + | 'ingester_failure' + | 'dispatcher_success' + | 'dispatcher_failure' + | 'transcription_success' + | 'transcription_failure' + | 'opus_packet_received' + | 'opus_packet_queued' + | 'opus_loss_concealment' + | 'opus_packet_decoded' + | 'opus_decode_failure' + | 'opus_packet_discarded' + | 'openai_audio_queued' + | 'openai_audio_sent' + | 'openai_api_error'; + +export interface MetricEvent { + name: MetricName; + worker: 'webhook-ingester' | 'transcription-dispatcher' | 'opus-transcriber-proxy'; + errorType?: string; + sessionId?: string; + targetName?: string; + latencyMs?: number; +} + +/** + * Writes a metric data point to Analytics Engine + * + * Schema: + * - blob1: metric_name (e.g., 'ingester_success', 'transcription_failure') + * - blob2: worker_name (e.g., 'webhook-ingester') + * - blob3: error_type (optional, for failures) + * - blob4: session_id (optional, for correlation) + * - blob5: target_name (optional, for dispatcher) + * - double1: count (default 1) + * - double2: latency_ms (optional) + * - index1: session_id (for sampling) + */ +export function writeMetric(analytics: AnalyticsEngineDataset | undefined, event: MetricEvent, count: number = 1): void { + if (!analytics) { + console.warn('Analytics Engine not configured, skipping metric:', event.name); + return; + } + + analytics.writeDataPoint({ + blobs: [event.name, event.worker, event.errorType ?? '', event.sessionId ?? '', event.targetName ?? ''], + doubles: [count, event.latencyMs ?? 0], + indexes: [event.sessionId ?? ''], + }); +} diff --git a/src/transcriberproxy.ts b/src/transcriberproxy.ts index 9a42d4d..49b2cb5 100644 --- a/src/transcriberproxy.ts +++ b/src/transcriberproxy.ts @@ -2,13 +2,20 @@ import { OutgoingConnection } from './OutgoingConnection'; import { EventEmitter } from 'node:events'; export interface TranscriptionMessage { - transcript: Array<{ text: string }>; + transcript: Array<{ confidence?: number; text: string }>; is_interim: boolean; + language?: string; + message_id: string; type: 'transcription-result'; + event: 'transcription-result'; participant: { id: string; ssrc?: string }; timestamp: number; } +export interface TranscriberProxyOptions { + language: string | null; +} + export class TranscriberProxy extends EventEmitter { private readonly ws: WebSocket; private outgoingConnections: Map; @@ -18,11 +25,13 @@ export class TranscriberProxy extends EventEmitter { // three concurrent speakers. private MAX_OUTGOING_CONNECTIONS = 4; private env: Env; + private options: TranscriberProxyOptions; - constructor(ws: WebSocket, env: Env) { - super(); + constructor(ws: WebSocket, env: Env, options: TranscriberProxyOptions) { + super({ captureRejections: true }); this.ws = ws; this.env = env; + this.options = options; this.outgoingConnections = new Map(); this.ws.addEventListener('close', () => { @@ -38,8 +47,14 @@ export class TranscriberProxy extends EventEmitter { console.error('Failed to parse message as JSON:', parseError); parsedMessage = { raw: event.data, parseError: true }; } - // TODO: are there any other events that need to be handled? - if (parsedMessage && parsedMessage.event === 'media') { + + if (parsedMessage && parsedMessage.event === 'ping') { + const pongMessage: { event: string; id?: number } = { event: 'pong' }; + if (typeof parsedMessage.id === 'number') { + pongMessage.id = parsedMessage.id; + } + this.ws.send(JSON.stringify(pongMessage)); + } else if (parsedMessage && parsedMessage.event === 'media') { this.handleMediaEvent(parsedMessage); } }); @@ -62,7 +77,7 @@ export class TranscriberProxy extends EventEmitter { } if (this.outgoingConnections.size < this.MAX_OUTGOING_CONNECTIONS) { - const newConnection = new OutgoingConnection(tag, this.env); + const newConnection = new OutgoingConnection(tag, this.env, this.options); newConnection.onInterimTranscription = (message) => { this.emit('interim_transcription', message); @@ -73,6 +88,9 @@ export class TranscriberProxy extends EventEmitter { newConnection.onClosed = (tag) => { this.outgoingConnections.delete(tag); }; + newConnection.onError = (tag, error) => { + this.emit('error', tag, error); + }; this.outgoingConnections.set(tag, newConnection); console.log(`Created outgoing connection entry for tag: ${tag}`); diff --git a/src/transcriptionator.ts b/src/transcriptionator.ts index da5fd2a..65eb416 100644 --- a/src/transcriptionator.ts +++ b/src/transcriptionator.ts @@ -22,7 +22,11 @@ export class Transcriptionator extends DurableObject { this.observers.add(server); server.addEventListener('close', () => { this.observers.delete(server); - server.close(); + }); + + server.addEventListener('error', () => { + this.observers.delete(server); + console.error(`Observer connection closed with error.`); }); return new Response(null, { diff --git a/src/utils.ts b/src/utils.ts index 15a43a3..d9907e5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -5,7 +5,9 @@ export interface ISessionParameters { connect: string | null; useTranscriptionator: boolean; useDispatcher: boolean; - sendBack?: boolean; + sendBack: boolean; + sendBackInterim: boolean; + language: string | null; } export function extractSessionParameters(url: string): ISessionParameters { @@ -16,14 +18,49 @@ export function extractSessionParameters(url: string): ISessionParameters { const useTranscriptionator = parsedUrl.searchParams.get('useTranscriptionator'); const useDispatcher = parsedUrl.searchParams.get('useDispatcher'); const sendBack = parsedUrl.searchParams.get('sendBack'); + const sendBackInterim = parsedUrl.searchParams.get('sendBackInterim'); + const lang = parsedUrl.searchParams.get('lang'); return { url: parsedUrl, sessionId, transcribe, connect, - useTranscriptionator: !!useTranscriptionator, - useDispatcher: !!useDispatcher, - sendBack: !!sendBack, + useTranscriptionator: useTranscriptionator === 'true', + useDispatcher: useDispatcher === 'true', + sendBack: sendBack === 'true', + sendBackInterim: sendBackInterim === 'true', + language: lang, }; } + +export function getTurnDetectionConfig(env: Env) { + const defaultTurnDetection = { + type: 'server_vad', + threshold: 0.85, + prefix_padding_ms: 300, + silence_duration_ms: 300, + }; + + let turnDetection = defaultTurnDetection; + + if (env.OPENAI_TURN_DETECTION) { + if (typeof env.OPENAI_TURN_DETECTION === 'string') { + try { + turnDetection = JSON.parse(env.OPENAI_TURN_DETECTION); + } catch (error) { + console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults: ${error}`); + return defaultTurnDetection; + } + } + // JSON object from CF + turnDetection = env.OPENAI_TURN_DETECTION; + } + + if (typeof turnDetection !== 'object' || typeof turnDetection.type !== 'string') { + console.warn(`Invalid OPENAI_TURN_DETECTION JSON, using defaults`); + return defaultTurnDetection; + } + + return turnDetection; +} diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index f419889..fff7341 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -8,8 +8,13 @@ declare namespace Cloudflare { } interface Env { OPENAI_API_KEY: string; + OPENAI_MODEL?: string; + OPENAI_TURN_DETECTION?: any; + DEBUG?: string; + FORCE_COMMIT_TIMEOUT?: string; TRANSCRIPTIONATOR: DurableObjectNamespace; TRANSCRIPTION_DISPATCHER: Fetcher /* transcription-dispatcher */; + METRICS?: AnalyticsEngineDataset; } } interface Env extends Cloudflare.Env {} diff --git a/wrangler.jsonc b/wrangler.jsonc index e3b60a3..61fb071 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -13,6 +13,8 @@ "observability": { "enabled": true }, + "logpush": true, + "workers_dev": false, /** * Smart Placement * Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement @@ -28,7 +30,13 @@ * Environment Variables * https://developers.cloudflare.com/workers/wrangler/configuration/#environment-variables */ - // "vars": { "MY_VARIABLE": "production_value" } + "vars": { + /** + After how many seconds of no received audio and no final transcription + a flush is triggered. + **/ + "FORCE_COMMIT_TIMEOUT": "2" + }, /** * Note: Use secrets to store sensitive data. * https://developers.cloudflare.com/workers/configuration/secrets/ @@ -43,6 +51,16 @@ * https://developers.cloudflare.com/workers/wrangler/configuration/#service-bindings */ "services": [{ "binding": "TRANSCRIPTION_DISPATCHER", "service": "transcription-dispatcher" }], + "limits": { + "cpu_ms": 300000, + }, + /** + * Analytics Engine binding for transcription metrics + * https://developers.cloudflare.com/analytics/analytics-engine/get-started/ + */ + "analytics_engine_datasets": [ + { "binding": "METRICS", "dataset": "transcription-metrics" } + ], "durable_objects": { "bindings": [ {