Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 196 additions & 16 deletions ds4_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -9208,15 +9208,103 @@ typedef struct {
/* SSE keepalive during long prefill: send HTTP/SSE headers ahead of
* generation and emit a `:` comment line every few seconds so HTTP/TCP
* idle timeouts on the client side don't close the connection while the
* server is busy doing prefill. */
* server is busy doing prefill. The wall-clock keepalive thread (see
* server_prefill_keepalive) emits comments independently of the prefill
* progress callback so a stall inside a single prefill chunk still keeps
* the connection alive. */
int fd;
bool stream;
bool enable_cors;
bool headers_sent;
bool stream_failed;
double last_keepalive;
struct server_prefill_keepalive *keepalive;
} server_prefill_progress;

/* Wall-clock keepalive thread state. Decoupled from the prefill_chunk
 * progress callback because internal stalls inside a single chunk (slow
 * matmul tile, KV disk fetch, etc.) prevent the callback from firing for
 * minutes at a time; the server would then sit silent on the socket and the
 * client would hit its body-idle timeout. The thread wakes roughly once a
 * second; on each wake it sends the SSE headers if they have not gone out
 * yet, otherwise it writes `: prefill\n\n` whenever 5s have elapsed since
 * the last keepalive event. Both the thread and the progress callback
 * acquire `mu` before touching the fd or any of the shared keepalive fields
 * on server_prefill_progress.
 *
 * Lifecycle: server_prefill_keepalive_attach() zeroes this struct and, for a
 * streaming request with a valid fd, starts the thread;
 * server_prefill_keepalive_detach() stops and joins it. */
typedef struct server_prefill_keepalive {
/* Progress record whose fd and keepalive fields the thread reads/writes. */
server_prefill_progress *progress;
/* Handle of the keepalive thread; only meaningful while `active` is true. */
pthread_t thread;
/* Serializes the thread and the progress callback on the shared state. */
pthread_mutex_t mu;
/* Waited on with a one-second timeout by the thread; signaled by detach so
 * the thread notices `stop` without waiting out the full tick. */
pthread_cond_t cv;
/* True once the thread was created successfully; detach is a no-op
 * otherwise and clears it again after joining. */
bool active;
/* Set under `mu` by detach to ask the thread to exit its loop. */
bool stop;
} server_prefill_keepalive;

/* Body of the wall-clock keepalive thread.
 *
 * `ud` is the server_prefill_keepalive that owns this thread. The mutex is
 * held for the whole loop except while parked in pthread_cond_timedwait(),
 * which releases it for up to one second per iteration. After every wake-up
 * (timeout, signal or spurious) the thread re-checks `stop`, then, if the
 * stream is still writable, either sends the SSE headers (when they have not
 * been sent yet) or emits a `: prefill` comment once at least 5 seconds have
 * passed since the last keepalive. A failed write marks the stream as failed,
 * which silences all further writes from this thread. Always returns NULL. */
static void *server_prefill_keepalive_thread(void *ud) {
    server_prefill_keepalive *state = ud;
    server_prefill_progress *prog = state->progress;

    pthread_mutex_lock(&state->mu);
    for (;;) {
        if (state->stop) break;

        /* Absolute deadline one second from now on the wall clock, which is
         * what pthread_cond_timedwait() expects by default. */
        struct timeval wall;
        gettimeofday(&wall, NULL);
        struct timespec deadline;
        deadline.tv_sec = wall.tv_sec + 1;
        deadline.tv_nsec = (long)wall.tv_usec * 1000;
        pthread_cond_timedwait(&state->cv, &state->mu, &deadline);

        if (state->stop) break;

        bool writable = prog->stream && prog->fd >= 0 && !prog->stream_failed;
        if (!writable) continue;

        double now = now_sec();
        bool delivered;
        if (!prog->headers_sent) {
            /* Flag the headers as sent before attempting the write so they
             * are never attempted twice, even if the write fails. */
            prog->headers_sent = true;
            delivered = sse_headers(prog->fd, prog->enable_cors);
        } else if (now - prog->last_keepalive >= 5.0) {
            static const char ka_msg[] = ": prefill\n\n";
            delivered = send_all(prog->fd, ka_msg, sizeof(ka_msg) - 1);
        } else {
            /* Last keepalive is recent enough; nothing to write this tick. */
            continue;
        }

        if (delivered) {
            prog->last_keepalive = now;
        } else {
            prog->stream_failed = true;
        }
    }
    pthread_mutex_unlock(&state->mu);
    return NULL;
}

/* Start the wall-clock keepalive thread for a streaming prefill.
 *
 * `ka` is caller-owned storage. It is zeroed here, so it is always safe to
 * hand to server_prefill_keepalive_detach() afterwards whether or not a
 * thread was started. No thread is started, and ka->active stays false, when
 * `p` is NULL, the request is not streaming, the fd is invalid, or any of the
 * pthread objects cannot be created; keepalives are then left to the progress
 * callback alone. On success p->keepalive is pointed at `ka` so other code
 * holding `p` can find the mutex that guards the shared keepalive fields. */
static void server_prefill_keepalive_attach(server_prefill_keepalive *ka,
                                            server_prefill_progress *p) {
    memset(ka, 0, sizeof(*ka));
    ka->progress = p;
    if (!p || !p->stream || p->fd < 0) return;

    /* A mutex or condition variable whose init failed must not be used:
     * locking or waiting on it from the thread would be undefined. Bail out
     * before creating the thread and unwind whatever was initialized. */
    if (pthread_mutex_init(&ka->mu, NULL) != 0) return;
    if (pthread_cond_init(&ka->cv, NULL) != 0) {
        pthread_mutex_destroy(&ka->mu);
        return;
    }
    if (pthread_create(&ka->thread, NULL,
                       server_prefill_keepalive_thread, ka) != 0) {
        pthread_cond_destroy(&ka->cv);
        pthread_mutex_destroy(&ka->mu);
        return;
    }
    ka->active = true;
    p->keepalive = ka;
}

/* Stop the keepalive thread started by server_prefill_keepalive_attach().
 *
 * Does nothing when no thread is active. Otherwise it raises the stop flag
 * under the mutex, signals the condition variable so the thread does not have
 * to wait out its current tick, joins the thread, destroys the pthread
 * objects, and clears the back-pointer in the progress record if it still
 * refers to `ka`. Once this returns the thread has exited. */
static void server_prefill_keepalive_detach(server_prefill_keepalive *ka) {
    if (ka->active) {
        /* Ask the thread to exit; the flag is written under the lock so the
         * thread cannot miss it between its check and its wait. */
        pthread_mutex_lock(&ka->mu);
        ka->stop = true;
        pthread_cond_signal(&ka->cv);
        pthread_mutex_unlock(&ka->mu);

        pthread_join(ka->thread, NULL);

        /* The thread is gone, so the primitives can be torn down. */
        pthread_mutex_destroy(&ka->mu);
        pthread_cond_destroy(&ka->cv);

        server_prefill_progress *owner = ka->progress;
        if (owner != NULL && owner->keepalive == ka) {
            owner->keepalive = NULL;
        }
        ka->active = false;
    }
}

static void request_ctx_span(char *buf, size_t len, int cached, int prompt) {
int suffix = prompt - cached;
if (suffix < 0) suffix = 0;
Expand Down Expand Up @@ -9354,23 +9442,30 @@ static void server_progress_cb(void *ud, const char *event, int current, int tot
* comment line (`:` prefix, ignored by SSE clients) every few seconds.
* Best-effort: if the client has already gone away, the writes fail
* silently and the outer code will discover the closed socket the next
* time it tries to stream a real event. */
if (p->stream && p->fd >= 0 && !p->stream_failed) {
if (!p->headers_sent) {
p->headers_sent = true;
if (sse_headers(p->fd, p->enable_cors)) {
p->last_keepalive = now;
} else {
p->stream_failed = true;
}
} else if (now - p->last_keepalive >= 5.0) {
static const char ka[] = ": prefill\n\n";
if (send_all(p->fd, ka, sizeof(ka) - 1)) {
p->last_keepalive = now;
} else {
p->stream_failed = true;
* time it tries to stream a real event. The wall-clock keepalive thread
* (when attached) races with this callback on the same shared state, so
* the critical section is serialized through the keepalive mutex. */
if (p->stream && p->fd >= 0) {
server_prefill_keepalive *ka = p->keepalive;
if (ka) pthread_mutex_lock(&ka->mu);
if (!p->stream_failed) {
if (!p->headers_sent) {
p->headers_sent = true;
if (sse_headers(p->fd, p->enable_cors)) {
p->last_keepalive = now;
} else {
p->stream_failed = true;
}
} else if (now - p->last_keepalive >= 5.0) {
static const char ka_msg[] = ": prefill\n\n";
if (send_all(p->fd, ka_msg, sizeof(ka_msg) - 1)) {
p->last_keepalive = now;
} else {
p->stream_failed = true;
}
}
}
if (ka) pthread_mutex_unlock(&ka->mu);
}
double elapsed = now - p->t0;
if (p->seen && current == p->last_current) {
Expand Down Expand Up @@ -9639,9 +9734,12 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct
.headers_sent = true,
};
snprintf(rebuild_progress.ctx, sizeof(rebuild_progress.ctx), "%s", rebuild_ctx);
server_prefill_keepalive rebuild_ka;
server_prefill_keepalive_attach(&rebuild_ka, &rebuild_progress);
ds4_session_set_progress(s->session, server_progress_cb, &rebuild_progress);
if (ds4_session_sync(s->session, sync_prompt, sync_err, sizeof(sync_err)) == 0) {
ds4_session_set_progress(s->session, NULL, NULL);
server_prefill_keepalive_detach(&rebuild_ka);
const double rebuild_sec = now_sec() - rebuild_t0;
if (loaded > 0) {
server_log(DS4_LOG_KVCACHE,
Expand All @@ -9660,6 +9758,7 @@ static void canonicalize_tool_checkpoint(server *s, const job *j, const char *ct
}
} else {
ds4_session_set_progress(s->session, NULL, NULL);
server_prefill_keepalive_detach(&rebuild_ka);
server_log(DS4_LOG_KVCACHE,
"ds4-server: tool checkpoint rebuild failed ctx=%s request_ctx=%s source=%s cached=%d replay=%d target=%d error=\"%s\"",
rebuild_ctx, ctx, source, loaded, replay_tokens,
Expand Down Expand Up @@ -9906,6 +10005,8 @@ static void generate_job(server *s, job *j) {
ctx_span,
req_flags[0] ? " " : "",
req_flags);
server_prefill_keepalive keepalive;
server_prefill_keepalive_attach(&keepalive, &progress);
ds4_session_set_progress(s->session, server_progress_cb, &progress);

int cold_store_len = 0;
Expand Down Expand Up @@ -9943,6 +10044,7 @@ static void generate_job(server *s, job *j) {
ds4_tokens_free(&prefix);
ds4_tokens_free(&effective_prompt);
ds4_session_set_progress(s->session, NULL, NULL);
server_prefill_keepalive_detach(&keepalive);
kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last,
cold_store_len);
trace_event(s, trace_id, "prefill failed: %s", err);
Expand All @@ -9963,6 +10065,7 @@ static void generate_job(server *s, job *j) {
if (ds4_session_sync(s->session, prompt_for_sync, err, sizeof(err)) != 0) {
ds4_tokens_free(&effective_prompt);
ds4_session_set_progress(s->session, NULL, NULL);
server_prefill_keepalive_detach(&keepalive);
kv_cache_restore_suppressed_continued(&s->kv, suppressed_continued_last,
cold_store_len);
trace_event(s, trace_id, "prefill failed: %s", err);
Expand All @@ -9975,6 +10078,7 @@ static void generate_job(server *s, job *j) {
if (!anthropic_live_continuation) anthropic_live_clear(s);
if (!thinking_live_continuation) thinking_live_clear(s);
ds4_session_set_progress(s->session, NULL, NULL);
server_prefill_keepalive_detach(&keepalive);
kv_cache_maybe_store_continued(s);
server_log(DS4_LOG_PREFILL,
"ds4-server: %s ctx=%s%s%s prompt done %.3fs",
Expand Down Expand Up @@ -11755,6 +11859,80 @@ static void test_cors_sse_headers(void) {
close(sv[1]);
}

/* Without a wall-clock keepalive thread, a prefill stall that lasts longer
 * than the client's idle timeout drops the socket because the progress
 * callback never fires during the stall. The thread must emit
 * `: prefill\n\n` even when nothing calls back.
 *
 * The progress record starts with headers already marked as sent and a
 * last_keepalive 10 seconds in the past, so the first tick of the thread is
 * expected to write the keepalive comment. */
static void test_prefill_keepalive_thread_emits_without_progress_callback(void) {
    /* Start from -1 so the guard below reads defined values when
     * socketpair() fails and leaves the array untouched. */
    int sv[2] = {-1, -1};
    TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
    if (sv[0] < 0 || sv[1] < 0) return;

    server_prefill_progress progress = {
        .fd = sv[0],
        .stream = true,
        .enable_cors = false,
        .headers_sent = true,
        /* Force the very first tick to clear the 5s guard. */
        .last_keepalive = now_sec() - 10.0,
    };
    server_prefill_keepalive ka;
    server_prefill_keepalive_attach(&ka, &progress);
    TEST_ASSERT(ka.active);

    /* The thread waits up to one second between ticks; sleep long enough
     * for at least one tick to fire without invoking server_progress_cb. */
    struct timespec ts = {.tv_sec = 1, .tv_nsec = 500 * 1000 * 1000L};
    nanosleep(&ts, NULL);

    /* Join the thread before inspecting the shared state, then half-close
     * the writer side before collecting what was written. */
    server_prefill_keepalive_detach(&ka);
    shutdown(sv[0], SHUT_WR);
    char *out = read_socket_text(sv[1]);
    TEST_ASSERT(out != NULL && strstr(out, ": prefill\n\n") != NULL);
    TEST_ASSERT(!progress.stream_failed);

    free(out);
    close(sv[0]);
    close(sv[1]);
}

/* When the progress callback never fires (stall before the first chunk),
 * the thread is still responsible for sending SSE headers so the client
 * sees a live response.
 *
 * The progress record starts with headers_sent = false and CORS enabled, so
 * the output is checked for the status line, the event-stream content type
 * and the CORS header. */
static void test_prefill_keepalive_thread_sends_headers_when_callback_silent(void) {
    /* Start from -1 so the guard below reads defined values when
     * socketpair() fails and leaves the array untouched. */
    int sv[2] = {-1, -1};
    TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
    if (sv[0] < 0 || sv[1] < 0) return;

    server_prefill_progress progress = {
        .fd = sv[0],
        .stream = true,
        .enable_cors = true,
        .headers_sent = false,
        .last_keepalive = 0.0,
    };
    server_prefill_keepalive ka;
    server_prefill_keepalive_attach(&ka, &progress);
    TEST_ASSERT(ka.active);

    /* One and a half seconds covers at least one of the thread's
     * one-second ticks; server_progress_cb is never invoked here. */
    struct timespec ts = {.tv_sec = 1, .tv_nsec = 500 * 1000 * 1000L};
    nanosleep(&ts, NULL);

    /* Join the thread before inspecting the shared state, then half-close
     * the writer side before collecting what was written. */
    server_prefill_keepalive_detach(&ka);
    shutdown(sv[0], SHUT_WR);
    char *out = read_socket_text(sv[1]);
    TEST_ASSERT(out != NULL);
    TEST_ASSERT(strstr(out, "HTTP/1.1 200 OK") != NULL);
    TEST_ASSERT(strstr(out, "Content-Type: text/event-stream") != NULL);
    TEST_ASSERT(strstr(out, "Access-Control-Allow-Origin: *") != NULL);
    TEST_ASSERT(progress.headers_sent);
    TEST_ASSERT(!progress.stream_failed);

    free(out);
    close(sv[0]);
    close(sv[1]);
}

static void test_anthropic_live_stream_sends_incremental_blocks(void) {
int sv[2];
TEST_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
Expand Down Expand Up @@ -14715,6 +14893,8 @@ static void ds4_server_unit_tests_run(void) {
test_cors_headers_are_opt_in();
test_cors_preflight_response_is_no_content();
test_cors_sse_headers();
test_prefill_keepalive_thread_emits_without_progress_callback();
test_prefill_keepalive_thread_sends_headers_when_callback_silent();
test_anthropic_live_stream_sends_incremental_blocks();
test_anthropic_usage_reports_cache_details();
test_anthropic_tool_stream_sends_live_tool_use();
Expand Down