Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,29 @@ impl Host {
self.host_controller.get_module_host(self.replica_id).await
}

/// Wait for the module host to become available, retrying with backoff.
///
/// This is useful for routes like `/schema` that may be called while the
/// database is still loading. Instead of returning an immediate 500, we
/// poll for up to `timeout` before giving up.
pub async fn wait_for_module(&self, timeout: std::time::Duration) -> Result<ModuleHost, NoSuchModule> {
let deadline = tokio::time::Instant::now() + timeout;
let mut interval = tokio::time::Duration::from_millis(100);
loop {
match self.host_controller.get_module_host(self.replica_id).await {
Ok(module) => return Ok(module),
Err(NoSuchModule) => {
if tokio::time::Instant::now() >= deadline {
return Err(NoSuchModule);
}
tokio::time::sleep(interval).await;
// Exponential backoff: 100ms, 200ms, 400ms, 800ms, 1s, 1s, ...
interval = (interval * 2).min(tokio::time::Duration::from_secs(1));
}
}
}
}

pub async fn module_watcher(&self) -> Result<watch::Receiver<ModuleHost>, NoSuchModule> {
self.host_controller.watch_module_host(self.replica_id).await
}
Expand Down
9 changes: 8 additions & 1 deletion crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,14 @@ pub async fn schema<S>(
where
S: ControlStateDelegate + NodeDelegate,
{
let (module, _) = find_module_and_database(&worker_ctx, name_or_identity).await?;
let (leader, _) = find_leader_and_database(&worker_ctx, name_or_identity).await?;
// Wait for the module to finish loading rather than returning an immediate
// 500 error. The database may still be initializing (replaying the log,
// running init reducers, etc.).
let module = leader
.wait_for_module(std::time::Duration::from_secs(10))
.await
.map_err(log_and_500)?;

let module_def = &module.info.module_def;
let response_json = match version {
Expand Down
Loading