From 85cf434591788adc1672d9cb2114546d42c8b6f1 Mon Sep 17 00:00:00 2001 From: njbrake Date: Fri, 22 May 2026 14:36:36 +0000 Subject: [PATCH] ds4-server: emit SSE keepalive on wall-clock timer The existing keepalive only fired from inside the prefill_chunk progress callback, so a stall inside a single chunk could leave the socket silent for many minutes (observed: 962s spent on the last 107 tokens of a 1046 token prefill). Clients with body-idle timeouts drop the connection, and the final stream write fails with "client stream write failed". Add a small wall-clock keepalive thread that runs alongside prefill, ticks roughly once a second, and writes ": prefill\n\n" whenever 5s have elapsed since the last keepalive event, regardless of whether progress callbacks are firing. The thread also sends the SSE headers if the callback has not done so yet. The thread and the callback share a mutex so fd writes and shared state (headers_sent, stream_failed, last_keepalive) never interleave. Attached and detached around every ds4_session_sync call site: the main prefill, the cold-prefix prefill, and the tool checkpoint rebuild. Fixes antirez/ds4#222 --- ds4_server.c | 212 +++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 196 insertions(+), 16 deletions(-) diff --git a/ds4_server.c b/ds4_server.c index 435491fe..99539c58 100644 --- a/ds4_server.c +++ b/ds4_server.c @@ -9208,15 +9208,103 @@ typedef struct { /* SSE keepalive during long prefill: send HTTP/SSE headers ahead of * generation and emit a `:` comment line every few seconds so HTTP/TCP * idle timeouts on the client side don't close the connection while the - * server is busy doing prefill. */ + * server is busy doing prefill. The wall-clock keepalive thread (see + * server_prefill_keepalive) emits comments independently of the prefill + * progress callback so a stall inside a single prefill chunk still keeps + * the connection alive. */ int fd; bool stream; bool enable_cors; bool headers_sent; bool stream_failed; double last_keepalive; + struct server_prefill_keepalive *keepalive; } server_prefill_progress; +/* Wall-clock keepalive thread state. Decoupled from the prefill_chunk + * progress callback because internal stalls inside a single chunk (slow + * matmul tile, KV disk fetch, etc.) prevent the callback from firing for + * minutes at a time; the server would then sit silent on the socket and the + * client would hit its body-idle timeout. The thread ticks roughly once a + * second and writes `: prefill\n\n` whenever 5s have elapsed since the last + * keepalive event. Both the thread and the progress callback acquire `mu` + * before touching the fd or any of the shared keepalive fields on + * server_prefill_progress. */ +typedef struct server_prefill_keepalive { + server_prefill_progress *progress; + pthread_t thread; + pthread_mutex_t mu; + pthread_cond_t cv; + bool active; + bool stop; +} server_prefill_keepalive; + +static void *server_prefill_keepalive_thread(void *ud) { + server_prefill_keepalive *ka = ud; + server_prefill_progress *p = ka->progress; + pthread_mutex_lock(&ka->mu); + while (!ka->stop) { + struct timeval tv; + gettimeofday(&tv, NULL); + struct timespec ts; + ts.tv_sec = tv.tv_sec + 1; + ts.tv_nsec = (long)tv.tv_usec * 1000; + pthread_cond_timedwait(&ka->cv, &ka->mu, &ts); + if (ka->stop) break; + if (!p->stream || p->fd < 0 || p->stream_failed) continue; + double now = now_sec(); + if (!p->headers_sent) { + p->headers_sent = true; + if (sse_headers(p->fd, p->enable_cors)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } + } else if (now - p->last_keepalive >= 5.0) { + static const char ka_msg[] = ": prefill\n\n"; + if (send_all(p->fd, ka_msg, sizeof(ka_msg) - 1)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } + } + } + pthread_mutex_unlock(&ka->mu); + return NULL; +} + +static void server_prefill_keepalive_attach(server_prefill_keepalive *ka, + server_prefill_progress *p) { + memset(ka, 0, sizeof(*ka)); + ka->progress = p; + if (!p || !p->stream || p->fd < 0) return; + pthread_mutex_init(&ka->mu, NULL); + pthread_cond_init(&ka->cv, NULL); + if (pthread_create(&ka->thread, NULL, + server_prefill_keepalive_thread, ka) != 0) { + pthread_mutex_destroy(&ka->mu); + pthread_cond_destroy(&ka->cv); + return; + } + ka->active = true; + p->keepalive = ka; +} + +static void server_prefill_keepalive_detach(server_prefill_keepalive *ka) { + if (!ka->active) return; + pthread_mutex_lock(&ka->mu); + ka->stop = true; + pthread_cond_signal(&ka->cv); + pthread_mutex_unlock(&ka->mu); + pthread_join(ka->thread, NULL); + pthread_mutex_destroy(&ka->mu); + pthread_cond_destroy(&ka->cv); + if (ka->progress && ka->progress->keepalive == ka) { + ka->progress->keepalive = NULL; + } + ka->active = false; +} + static void request_ctx_span(char *buf, size_t len, int cached, int prompt) { int suffix = prompt - cached; if (suffix < 0) suffix = 0; @@ -9354,23 +9442,30 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot * comment line (`:` prefix, ignored by SSE clients) every few seconds. * Best-effort: if the client has already gone away, the writes fail * silently and the outer code will discover the closed socket the next - * time it tries to stream a real event. */ - if (p->stream && p->fd >= 0 && !p->stream_failed) { - if (!p->headers_sent) { - p->headers_sent = true; - if (sse_headers(p->fd, p->enable_cors)) { - p->last_keepalive = now; - } else { - p->stream_failed = true; - } - } else if (now - p->last_keepalive >= 5.0) { - static const char ka[] = ": prefill\n\n"; - if (send_all(p->fd, ka, sizeof(ka) - 1)) { - p->last_keepalive = now; - } else { - p->stream_failed = true; + * time it tries to stream a real event. The wall-clock keepalive thread + * (when attached) races with this callback on the same shared state, so + * the critical section is serialized through the keepalive mutex. */ + if (p->stream && p->fd >= 0) { + server_prefill_keepalive *ka = p->keepalive; + if (ka) pthread_mutex_lock(&ka->mu); + if (!p->stream_failed) { + if (!p->headers_sent) { + p->headers_sent = true; + if (sse_headers(p->fd, p->enable_cors)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } + } else if (now - p->last_keepalive >= 5.0) { + static const char ka_msg[] = ": prefill\n\n"; + if (send_all(p->fd, ka_msg, sizeof(ka_msg) - 1)) { + p->last_keepalive = now; + } else { + p->stream_failed = true; + } } } + if (ka) pthread_mutex_unlock(&ka->mu); } double elapsed = now - p->t0; if (p->seen && current == p->last_current) { @@ -9639,9 +9734,12 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct .headers_sent = true, }; snprintf(rebuild_progress.ctx, sizeof(rebuild_progress.ctx), "%s", rebuild_ctx); + server_prefill_keepalive rebuild_ka; + server_prefill_keepalive_attach(&rebuild_ka, &rebuild_progress); ds4_session_set_progress(s->session, server_progress_cb, &rebuild_progress); if (ds4_session_sync(s->session, sync_prompt, sync_err, sizeof(sync_err)) == 0) { ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&rebuild_ka); const double rebuild_sec = now_sec() - rebuild_t0; if (loaded > 0) { server_log(DS4_LOG_KVCACHE, @@ -9660,6 +9758,7 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct } } else { ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&rebuild_ka); server_log(DS4_LOG_KVCACHE, "ds4-server: tool checkpoint rebuild failed ctx=%s request_ctx=%s source=%s cached=%d replay=%d target=%d error=\"%s\"", rebuild_ctx, ctx, source, loaded, replay_tokens, @@ -9906,6 +10005,8 @@ static void generate_job(server *s, job *j) { ctx_span, req_flags[0] ? " " : "", req_flags); + server_prefill_keepalive keepalive; + server_prefill_keepalive_attach(&keepalive, &progress); ds4_session_set_progress(s->session, server_progress_cb, &progress); int cold_store_len = 0; @@ -9943,6 +10044,7 @@ static void generate_job(server *s, job *j) { ds4_tokens_free(&prefix); ds4_tokens_free(&effective_prompt); ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&keepalive); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); trace_event(s, trace_id, "prefill failed: %s", err); @@ -9963,6 +10065,7 @@ static void generate_job(server *s, job *j) { if (ds4_session_sync(s->session, prompt_for_sync, err, sizeof(err)) != 0) { ds4_tokens_free(&effective_prompt); ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&keepalive); kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last, cold_store_len); trace_event(s, trace_id, "prefill failed: %s", err); @@ -9975,6 +10078,7 @@ static void generate_job(server *s, job *j) { if (!anthropic_live_continuation) anthropic_live_clear(s); if (!thinking_live_continuation) thinking_live_clear(s); ds4_session_set_progress(s->session, NULL, NULL); + server_prefill_keepalive_detach(&keepalive); kv_cache_maybe_store_continued(s); server_log(DS4_LOG_PREFILL, "ds4-server: %s ctx=%s%s%s prompt done %.3fs", @@ -11755,6 +11859,80 @@ static void test_cors_sse_headers(void) { close(sv[1]); } +/* Without a wall-clock keepalive thread, a prefill stall that lasts longer + * than the client's idle timeout drops the socket because the progress + * callback never fires during the stall. The thread must emit + * `: prefill\n\n` even when nothing calls back. */ +static void test_prefill_keepalive_thread_emits_without_progress_callback(void) { + int sv[2]; + TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + if (sv[0] < 0 || sv[1] < 0) return; + + server_prefill_progress progress = { + .fd = sv[0], + .stream = true, + .enable_cors = false, + .headers_sent = true, + /* Force the very first tick to clear the 5s guard. */ + .last_keepalive = now_sec() - 10.0, + }; + server_prefill_keepalive ka; + server_prefill_keepalive_attach(&ka, &progress); + TEST_ASSERT(ka.active); + + /* The thread waits up to one second between ticks; sleep long enough + * for at least one tick to fire without invoking server_progress_cb. */ + struct timespec ts = {.tv_sec = 1, .tv_nsec = 500 * 1000 * 1000L}; + nanosleep(&ts, NULL); + + server_prefill_keepalive_detach(&ka); + shutdown(sv[0], SHUT_WR); + char *out = read_socket_text(sv[1]); + TEST_ASSERT(out != NULL && strstr(out, ": prefill\n\n") != NULL); + TEST_ASSERT(!progress.stream_failed); + + free(out); + close(sv[0]); + close(sv[1]); +} + +/* When the progress callback never fires (stall before the first chunk), + * the thread is still responsible for sending SSE headers so the client + * sees a live response. */ +static void test_prefill_keepalive_thread_sends_headers_when_callback_silent(void) { + int sv[2]; + TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + if (sv[0] < 0 || sv[1] < 0) return; + + server_prefill_progress progress = { + .fd = sv[0], + .stream = true, + .enable_cors = true, + .headers_sent = false, + .last_keepalive = 0.0, + }; + server_prefill_keepalive ka; + server_prefill_keepalive_attach(&ka, &progress); + TEST_ASSERT(ka.active); + + struct timespec ts = {.tv_sec = 1, .tv_nsec = 500 * 1000 * 1000L}; + nanosleep(&ts, NULL); + + server_prefill_keepalive_detach(&ka); + shutdown(sv[0], SHUT_WR); + char *out = read_socket_text(sv[1]); + TEST_ASSERT(out != NULL); + TEST_ASSERT(strstr(out, "HTTP/1.1 200 OK") != NULL); + TEST_ASSERT(strstr(out, "Content-Type: text/event-stream") != NULL); + TEST_ASSERT(strstr(out, "Access-Control-Allow-Origin: *") != NULL); + TEST_ASSERT(progress.headers_sent); + TEST_ASSERT(!progress.stream_failed); + + free(out); + close(sv[0]); + close(sv[1]); +} + static void test_anthropic_live_stream_sends_incremental_blocks(void) { int sv[2]; TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); @@ -14715,6 +14893,8 @@ static void ds4_server_unit_tests_run(void) { test_cors_headers_are_opt_in(); test_cors_preflight_response_is_no_content(); test_cors_sse_headers(); + test_prefill_keepalive_thread_emits_without_progress_callback(); + test_prefill_keepalive_thread_sends_headers_when_callback_silent(); test_anthropic_live_stream_sends_incremental_blocks(); test_anthropic_usage_reports_cache_details(); test_anthropic_tool_stream_sends_live_tool_use();