diff --git a/pom.xml b/pom.xml index be72cf7..3816c7a 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,13 @@ + + + com.deepgram + deepgram-java-sdk + 0.2.0 + + io.javalin @@ -37,14 +44,7 @@ com.fasterxml.jackson.dataformat jackson-dataformat-toml - 2.18.2 - - - - - com.fasterxml.jackson.core - jackson-databind - 2.18.2 + 2.18.6 @@ -67,13 +67,6 @@ kotlin-stdlib 1.9.25 - - - - org.eclipse.jetty.websocket - websocket-jetty-client - 11.0.24 - diff --git a/src/main/java/com/deepgram/starter/App.java b/src/main/java/com/deepgram/starter/App.java index 8b61e34..1739333 100644 --- a/src/main/java/com/deepgram/starter/App.java +++ b/src/main/java/com/deepgram/starter/App.java @@ -1,15 +1,16 @@ /** * Java Flux Starter - Backend Server * - * Simple WebSocket proxy to Deepgram's Flux API using Javalin 6.4.0. - * Forwards all messages (JSON and binary) bidirectionally between client and Deepgram. + * Simple WebSocket proxy to Deepgram's Flux API using the Deepgram Java SDK. + * Forwards audio from the browser to the SDK's V2 WebSocket client, and + * forwards transcription events back to the browser. * * Key Features: - * - WebSocket proxy endpoint: /api/flux -> Deepgram wss://api.deepgram.com/v2/listen + * - WebSocket proxy endpoint: /api/flux -> Deepgram V2 Listen WebSocket * - JWT session auth via access_token. subprotocol * - Session endpoint: GET /api/session * - Metadata endpoint: GET /api/metadata - * - No Deepgram SDK -- direct WebSocket connections via Jetty WebSocket client + * - Uses Deepgram Java SDK for WebSocket connection management */ package com.deepgram.starter; @@ -31,24 +32,19 @@ import io.javalin.websocket.WsConfig; import io.javalin.websocket.WsContext; -import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; -import org.eclipse.jetty.websocket.client.WebSocketClient; +import com.deepgram.DeepgramClient; +import com.deepgram.resources.listen.v2.websocket.V2WebSocketClient; +import com.deepgram.resources.listen.v2.websocket.V2ConnectOptions; +import com.deepgram.types.ListenV2Encoding; +import com.deepgram.types.ListenV2SampleRate; + +import okio.ByteString; import java.io.File; -import java.net.URI; -import java.nio.ByteBuffer; import java.security.SecureRandom; import java.time.Instant; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; // ============================================================================ @@ -80,15 +76,6 @@ public class App { /** Server host, configurable via HOST env var (default 0.0.0.0) */ private static final String HOST = getEnv("HOST", "0.0.0.0"); - /** Deepgram Flux WebSocket URL (v2 endpoint) */ - private static final String DEEPGRAM_FLUX_URL = "wss://api.deepgram.com/v2/listen"; - - /** - * Reserved WebSocket close codes that cannot be set by applications. - * If Deepgram sends one of these, we fall back to 1000 (normal closure). - */ - private static final Set RESERVED_CLOSE_CODES = Set.of(1004, 1005, 1006, 1015); - // ======================================================================== // SECTION 4: SESSION AUTH - JWT tokens for production security // ======================================================================== @@ -117,6 +104,7 @@ private static String initSessionSecret() { if (secret != null && !secret.isEmpty()) { return secret; } + // Generate random 32 bytes as hex byte[] bytes = new byte[32]; new SecureRandom().nextBytes(bytes); StringBuilder hex = new StringBuilder(); @@ -139,28 +127,25 @@ private static String createSessionToken() { } /** - * Validates JWT from WebSocket subprotocol: access_token. - * Returns the full protocol string if valid, null if invalid. + * Validates a JWT token from WebSocket subprotocol header. + * The client sends "access_token." as a subprotocol. * - * @param protocols The Sec-WebSocket-Protocol header value - * @return The matching protocol string, or null + * @param subprotocols The list of subprotocol strings from the upgrade request + * @return true if token is valid, false otherwise */ - private static String validateWsToken(String protocols) { - if (protocols == null || protocols.isEmpty()) return null; - String[] list = protocols.split(","); - for (String proto : list) { - String trimmed = proto.trim(); - if (trimmed.startsWith("access_token.")) { - String token = trimmed.substring("access_token.".length()); + private static boolean validateWsToken(List subprotocols) { + for (String proto : subprotocols) { + if (proto.startsWith("access_token.")) { + String token = proto.substring("access_token.".length()); try { jwtVerifier.verify(token); - return trimmed; + return true; } catch (JWTVerificationException e) { - return null; + return false; } } } - return null; + return false; } // ======================================================================== @@ -198,17 +183,7 @@ private static String loadApiKey() { } // ======================================================================== - // SECTION 6: SETUP - Track connections and Jetty WebSocket client - // ======================================================================== - - /** Track all active client WebSocket contexts for graceful shutdown */ - private static final Set activeConnections = ConcurrentHashMap.newKeySet(); - - /** Jetty WebSocket client for outbound connections to Deepgram */ - private static WebSocketClient wsClient; - - // ======================================================================== - // SECTION 7: HELPER FUNCTIONS + // SECTION 6: HELPER FUNCTIONS // ======================================================================== /** @@ -236,73 +211,13 @@ private static String getEnv(String key, String defaultValue) { return defaultValue; } - /** - * Returns a safe close code that can be sent over WebSocket. - * Reserved codes (1004, 1005, 1006, 1015) are mapped to 1000 (normal closure). - * - * @param code The close code to check - * @return A safe close code - */ - private static int getSafeCloseCode(int code) { - if (code >= 1000 && code <= 4999 && !RESERVED_CLOSE_CODES.contains(code)) { - return code; - } - return 1000; - } - - /** - * Builds the Deepgram Flux WebSocket URL with query parameters forwarded from the client. - * - * @param ctx The client WebSocket context - * @return The fully constructed Deepgram URL string - */ - private static String buildDeepgramUrl(WsContext ctx) { - String model = ctx.queryParam("model"); - if (model == null || model.isEmpty()) model = "flux-general-en"; - - String sampleRate = ctx.queryParam("sample_rate"); - if (sampleRate == null || sampleRate.isEmpty()) sampleRate = "16000"; - - String encoding = ctx.queryParam("encoding"); - if (encoding == null || encoding.isEmpty()) encoding = "linear16"; - - String channels = ctx.queryParam("channels"); - if (channels == null || channels.isEmpty()) channels = "1"; - - StringBuilder url = new StringBuilder(DEEPGRAM_FLUX_URL); - url.append("?model=").append(model); - url.append("&sample_rate=").append(sampleRate); - url.append("&encoding=").append(encoding); - url.append("&channels=").append(channels); - - // Forward optional parameters - String eotThreshold = ctx.queryParam("eot_threshold"); - if (eotThreshold != null && !eotThreshold.isEmpty()) { - url.append("&eot_threshold=").append(eotThreshold); - } - - String eagerEotThreshold = ctx.queryParam("eager_eot_threshold"); - if (eagerEotThreshold != null && !eagerEotThreshold.isEmpty()) { - url.append("&eager_eot_threshold=").append(eagerEotThreshold); - } - - String eotTimeoutMs = ctx.queryParam("eot_timeout_ms"); - if (eotTimeoutMs != null && !eotTimeoutMs.isEmpty()) { - url.append("&eot_timeout_ms=").append(eotTimeoutMs); - } - - // Handle keyterm: can appear multiple times, forward each one - List keyterms = ctx.queryParams("keyterm"); - if (keyterms != null) { - for (String term : keyterms) { - if (term != null && !term.isEmpty()) { - url.append("&keyterm=").append(term); - } - } - } + // ======================================================================== + // SECTION 7: TRACK ACTIVE WEBSOCKET CONNECTIONS + // ======================================================================== - return url.toString(); - } + /** Map of browser WsContext -> SDK V2WebSocketClient for cleanup */ + private static final ConcurrentHashMap activeConnections = + new ConcurrentHashMap<>(); // ======================================================================== // SECTION 8: SESSION ROUTES @@ -319,7 +234,7 @@ private static void handleSession(Context ctx) { } // ======================================================================== - // SECTION 9: API ROUTES & WEBSOCKET PROXY + // SECTION 9: METADATA ROUTE // ======================================================================== /** @@ -355,241 +270,138 @@ private static void handleMetadata(Context ctx) { } } + // ======================================================================== + // SECTION 10: WEBSOCKET ROUTE - Flux Proxy + // ======================================================================== + /** - * Configures the /api/flux WebSocket endpoint. - * Validates JWT from subprotocol on upgrade, then creates a bidirectional - * proxy to Deepgram's Flux API. + * Configures the WebSocket endpoint for Flux transcription. + * Acts as a bidirectional proxy: browser <-> Javalin <-> Deepgram SDK V2 WebSocket. + * + * The SDK handles the outbound connection to Deepgram, authentication, + * and WebSocket lifecycle. This handler bridges the browser's WebSocket + * connection to the SDK's WebSocket client. * * @param ws Javalin WebSocket config */ - private static void handleFluxWebSocket(WsConfig ws) { + private static void handleFlux(WsConfig ws) { ws.onConnect(ctx -> { - // Validate JWT from access_token. subprotocol - String protocols = ctx.header("Sec-WebSocket-Protocol"); - String validProto = validateWsToken(protocols); - if (validProto == null) { - System.out.println("WebSocket auth failed: invalid or missing token"); - ctx.closeSession(4401, "Unauthorized"); + // Authenticate via subprotocol header + List subprotocols = ctx.header("Sec-WebSocket-Protocol") != null + ? List.of(ctx.header("Sec-WebSocket-Protocol").split(",\\s*")) + : List.of(); + + if (!validateWsToken(subprotocols)) { + ctx.closeSession(4001, "Unauthorized"); return; } - System.out.println("Client connected to /api/flux (authenticated)"); - activeConnections.add(ctx); + // Parse query params with defaults + String model = ctx.queryParam("model") != null ? ctx.queryParam("model") : "flux-general-en"; + String encoding = ctx.queryParam("encoding") != null ? ctx.queryParam("encoding") : "linear16"; + String sampleRate = ctx.queryParam("sample_rate") != null ? ctx.queryParam("sample_rate") : "16000"; - // Build the Deepgram URL with forwarded query parameters - String deepgramUrl = buildDeepgramUrl(ctx); - System.out.println("Connecting to Deepgram Flux: " + deepgramUrl); + // Create SDK client for this connection + DeepgramClient dgClient = DeepgramClient.builder() + .apiKey(apiKey) + .build(); - // Create outbound WebSocket connection to Deepgram - try { - ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); - upgradeRequest.setHeader("Authorization", "Token " + apiKey); + V2WebSocketClient dgWs = dgClient.listen().v2().v2WebSocket(); - DeepgramSocket dgSocket = new DeepgramSocket(ctx); - // Store the Deepgram socket on the ctx attribute map for forwarding - ctx.attribute("deepgramSocket", dgSocket); + // Forward all text messages (transcripts/events) from Deepgram to browser + dgWs.onMessage(json -> { + try { + if (ctx.session.isOpen()) { + ctx.send(json); + } + } catch (Exception e) { + System.err.println("Error forwarding transcript to browser: " + e.getMessage()); + } + }); - wsClient.connect(dgSocket, URI.create(deepgramUrl), upgradeRequest); - } catch (Exception e) { - System.err.println("Failed to connect to Deepgram: " + e.getMessage()); - ctx.closeSession(1011, "Failed to connect to Deepgram"); - } - }); + dgWs.onError(e -> { + System.err.println("Deepgram WebSocket error: " + e.getMessage()); + try { + if (ctx.session.isOpen()) { + ctx.closeSession(1011, "Deepgram connection error"); + } + } catch (Exception ignored) {} + }); - // Forward client messages to Deepgram - ws.onMessage(ctx -> { - DeepgramSocket dgSocket = ctx.attribute("deepgramSocket"); - if (dgSocket != null && dgSocket.isOpen()) { - String text = ctx.message(); - dgSocket.sendText(text); - } + dgWs.onDisconnected(reason -> { + try { + if (ctx.session.isOpen()) { + ctx.closeSession(1000, "Deepgram disconnected"); + } + } catch (Exception ignored) {} + activeConnections.remove(ctx); + }); + + // Build connection options using SDK builder + V2ConnectOptions.Builder optionsBuilder = (V2ConnectOptions.Builder) + V2ConnectOptions.builder() + .model(model); + optionsBuilder + .encoding(ListenV2Encoding.valueOf(encoding)) + .sampleRate(ListenV2SampleRate.of(Integer.parseInt(sampleRate))); + V2ConnectOptions options = optionsBuilder.build(); + + // Connect to Deepgram via SDK + dgWs.connect(options).thenRun(() -> { + activeConnections.put(ctx, dgWs); + System.out.println("Flux session started (model=" + model + ")"); + }).exceptionally(e -> { + System.err.println("Failed to connect to Deepgram: " + e.getMessage()); + try { + ctx.closeSession(1011, "Failed to connect to Deepgram"); + } catch (Exception ignored) {} + return null; + }); }); - // Forward client binary messages to Deepgram + // Forward audio binary from browser to Deepgram via SDK ws.onBinaryMessage(ctx -> { - DeepgramSocket dgSocket = ctx.attribute("deepgramSocket"); - if (dgSocket != null && dgSocket.isOpen()) { + V2WebSocketClient dgWs = activeConnections.get(ctx); + if (dgWs != null) { byte[] data = ctx.data(); int offset = ctx.offset(); int length = ctx.length(); - byte[] bytes = new byte[length]; - System.arraycopy(data, offset, bytes, 0, length); - dgSocket.sendBinary(ByteBuffer.wrap(bytes)); + byte[] audioData = new byte[length]; + System.arraycopy(data, offset, audioData, 0, length); + dgWs.sendMedia(ByteString.of(audioData)); } }); - // Handle client disconnect + // Handle client disconnect - clean up Deepgram connection ws.onClose(ctx -> { - int code = ctx.status(); - String reason = ctx.reason() != null ? ctx.reason() : ""; - System.out.println("Client disconnected: " + code + " " + reason); - - DeepgramSocket dgSocket = ctx.attribute("deepgramSocket"); - if (dgSocket != null && dgSocket.isOpen()) { - dgSocket.close(1000, "Client disconnected"); + V2WebSocketClient dgWs = activeConnections.remove(ctx); + if (dgWs != null) { + try { + dgWs.disconnect(); + } catch (Exception ignored) {} } - activeConnections.remove(ctx); + System.out.println("Flux session ended"); }); // Handle client errors ws.onError(ctx -> { - Throwable error = ctx.error(); - if (error != null) { - System.err.println("Client WebSocket error: " + error.getMessage()); - } - DeepgramSocket dgSocket = ctx.attribute("deepgramSocket"); - if (dgSocket != null && dgSocket.isOpen()) { - dgSocket.close(1011, "Client error"); - } - activeConnections.remove(ctx); - }); - } - - /** - * Jetty WebSocket endpoint for the outbound connection to Deepgram. - * Forwards all messages from Deepgram back to the connected client. - */ - @WebSocket - public static class DeepgramSocket { - - /** Reference to the client-side WsContext for forwarding messages */ - private final WsContext clientCtx; - - /** The Deepgram-side Jetty WebSocket session */ - private volatile Session deepgramSession; - - /** Message counters for debug logging */ - private int clientMessageCount = 0; - private int deepgramMessageCount = 0; - - public DeepgramSocket(WsContext clientCtx) { - this.clientCtx = clientCtx; - } - - @OnWebSocketConnect - public void onOpen(Session session) { - this.deepgramSession = session; - System.out.println("Connected to Deepgram Flux API"); - } - - /** - * Forwards text messages from Deepgram to the client. - */ - @OnWebSocketMessage - public void onTextMessage(Session session, String message) { - deepgramMessageCount++; - if (deepgramMessageCount % 10 == 0) { - System.out.println("<- Deepgram text message #" + deepgramMessageCount - + " (size: " + message.length() + ")"); - } - try { - if (clientCtx.session.isOpen()) { - clientCtx.send(message); - } - } catch (Exception e) { - System.err.println("Error forwarding Deepgram text to client: " + e.getMessage()); - } - } - - /** - * Forwards binary messages from Deepgram to the client. - */ - @OnWebSocketMessage - public void onBinaryMessage(byte[] payload, int offset, int len) { - deepgramMessageCount++; - if (deepgramMessageCount % 10 == 0) { - System.out.println("<- Deepgram binary message #" + deepgramMessageCount - + " (size: " + len + ")"); - } - try { - if (clientCtx.session.isOpen()) { - byte[] data = new byte[len]; - System.arraycopy(payload, offset, data, 0, len); - clientCtx.send(ByteBuffer.wrap(data)); - } - } catch (Exception e) { - System.err.println("Error forwarding Deepgram binary to client: " + e.getMessage()); - } - } - - @OnWebSocketClose - public void onClose(int statusCode, String reason) { - System.out.println("Deepgram connection closed: " + statusCode + " " + reason); - try { - if (clientCtx.session.isOpen()) { - int safeCode = getSafeCloseCode(statusCode); - clientCtx.closeSession(safeCode, reason != null ? reason : ""); - } - } catch (Exception e) { - System.err.println("Error closing client after Deepgram close: " + e.getMessage()); - } - } - - @OnWebSocketError - public void onError(Throwable cause) { - System.err.println("Deepgram WebSocket error: " + cause.getMessage()); - try { - if (clientCtx.session.isOpen()) { - clientCtx.closeSession(1011, "Deepgram connection error"); - } - } catch (Exception e) { - System.err.println("Error closing client after Deepgram error: " + e.getMessage()); - } - } - - /** Check if the Deepgram connection is open */ - public boolean isOpen() { - return deepgramSession != null && deepgramSession.isOpen(); - } - - /** Send text data to Deepgram */ - public void sendText(String text) { - clientMessageCount++; - if (clientMessageCount % 100 == 0) { - System.out.println("-> Client text message #" + clientMessageCount - + " (size: " + text.length() + ")"); - } - if (isOpen()) { + V2WebSocketClient dgWs = activeConnections.remove(ctx); + if (dgWs != null) { try { - deepgramSession.getRemote().sendString(text); - } catch (Exception e) { - System.err.println("Error sending text to Deepgram: " + e.getMessage()); - } + dgWs.disconnect(); + } catch (Exception ignored) {} } - } - - /** Send binary data to Deepgram */ - public void sendBinary(ByteBuffer data) { - clientMessageCount++; - if (clientMessageCount % 100 == 0) { - System.out.println("-> Client binary message #" + clientMessageCount - + " (size: " + data.remaining() + ")"); - } - if (isOpen()) { - try { - deepgramSession.getRemote().sendBytes(data); - } catch (Exception e) { - System.err.println("Error sending binary to Deepgram: " + e.getMessage()); - } - } - } - - /** Close the Deepgram connection */ - public void close(int code, String reason) { - if (isOpen()) { - deepgramSession.close(code, reason); - } - } + }); } // ======================================================================== - // SECTION 10: SERVER START + // SECTION 11: SERVER START // ======================================================================== /** * Application entry point. Loads configuration, validates the API key, - * initializes the Jetty WebSocket client, and starts the Javalin server. + * and starts the Javalin HTTP server with WebSocket support. * * @param args Command-line arguments (unused) */ @@ -597,15 +409,6 @@ public static void main(String[] args) { // Load API key (exits if missing) apiKey = loadApiKey(); - // Initialize Jetty WebSocket client for outbound Deepgram connections - wsClient = new WebSocketClient(); - try { - wsClient.start(); - } catch (Exception e) { - System.err.println("Failed to start WebSocket client: " + e.getMessage()); - System.exit(1); - } - // Create Javalin app with CORS enabled Javalin app = Javalin.create(config -> { config.bundledPlugins.enableCors(cors -> { @@ -626,32 +429,8 @@ public static void main(String[] args) { ctx.json(Map.of("status", "ok")); }); - // WebSocket proxy route (authenticated via subprotocol) - app.ws("/api/flux", App::handleFluxWebSocket); - - // Graceful shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("Shutting down..."); - - // Close all active client WebSocket connections - System.out.println("Closing " + activeConnections.size() + " active connection(s)..."); - for (WsContext wsCtx : activeConnections) { - try { - wsCtx.closeSession(1001, "Server shutting down"); - } catch (Exception e) { - System.err.println("Error closing WebSocket: " + e.getMessage()); - } - } - - // Stop the Jetty WebSocket client - try { - wsClient.stop(); - } catch (Exception e) { - System.err.println("Error stopping WebSocket client: " + e.getMessage()); - } - - System.out.println("Shutdown complete"); - })); + // WebSocket route for Flux transcription (auth via subprotocol) + app.ws("/api/flux", App::handleFlux); // Start the server app.start(HOST, PORT);