Skip to content
This repository was archived by the owner on Feb 6, 2026. It is now read-only.

Commit 69ff778

Browse files
authored
Merge pull request #8322 from systeminit/victor/eng-3338-migrate-func-runs-and-func-run-logs-back-to-the-si-database
feat: add func runs and func run logs migration to sdf
2 parents e81eefb + e4d015a commit 69ff778

77 files changed

Lines changed: 3064 additions & 423 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/sdf/src/args.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ pub(crate) struct Args {
164164
#[arg(long, env = "SI_BACKFILL_CACHE_TYPES")]
165165
pub(crate) backfill_cache_types: Option<String>,
166166

167-
/// Key batch size for PostgreSQL queries during backfill
167+
/// Batch size for PostgreSQL queries during backfill (layer cache and func runs)
168168
#[arg(long, default_value = "1000", env = "SI_BACKFILL_KEY_BATCH_SIZE")]
169169
pub(crate) backfill_key_batch_size: usize,
170170

@@ -180,6 +180,14 @@ pub(crate) struct Args {
180180
#[arg(long, default_value = "5", env = "SI_BACKFILL_MAX_CONCURRENT_UPLOADS")]
181181
pub(crate) backfill_max_concurrent_uploads: usize,
182182

183+
/// Cutoff ID for func runs backfill (optional, start from a specific func run ID)
184+
#[arg(long, env = "SI_BACKFILL_FUNC_RUNS_CUTOFF_ID")]
185+
pub(crate) backfill_func_runs_cutoff_id: Option<String>,
186+
187+
/// Cutoff ID for func run logs backfill (optional, start from a specific func run log ID)
188+
#[arg(long, env = "SI_BACKFILL_FUNC_RUN_LOGS_CUTOFF_ID")]
189+
pub(crate) backfill_func_run_logs_cutoff_id: Option<String>,
190+
183191
/// Veritech encryption key file location [default: /run/sdf/veritech_encryption.key]
184192
#[arg(long)]
185193
pub(crate) veritech_encryption_key_path: Option<PathBuf>,
@@ -502,6 +510,17 @@ fn build_config_map(args: Args, config_map: &mut ConfigMap) -> &ConfigMap {
502510
i64::try_from(args.backfill_max_concurrent_uploads).unwrap_or(5),
503511
);
504512

513+
if let Some(backfill_func_runs_cutoff_id) = args.backfill_func_runs_cutoff_id {
514+
config_map.set("backfill_func_runs_cutoff_id", backfill_func_runs_cutoff_id);
515+
}
516+
517+
if let Some(backfill_func_run_logs_cutoff_id) = args.backfill_func_run_logs_cutoff_id {
518+
config_map.set(
519+
"backfill_func_run_logs_cutoff_id",
520+
backfill_func_run_logs_cutoff_id,
521+
);
522+
}
523+
505524
config_map.set("nats.connection_name", NAME);
506525
config_map.set("pg.application_name", NAME);
507526
config_map.set("layer_db_config.pg_pool_config.application_name", NAME);

bin/sdf/src/main.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use innit_client::InnitClient;
99
use sdf_server::{
1010
BackfillConfig,
1111
Config,
12+
FuncRunsBackfiller,
1213
LayerCacheBackfiller,
1314
Migrator,
1415
Server,
@@ -157,6 +158,18 @@ async fn async_main() -> Result<()> {
157158
telemetry_shutdown,
158159
)
159160
.await
161+
} else if config.migration_mode().is_backfill_func_runs() {
162+
backfill_func_runs(
163+
config,
164+
main_tracker,
165+
main_token,
166+
helping_tasks_tracker,
167+
helping_tasks_token,
168+
telemetry_tracker,
169+
telemetry_token,
170+
telemetry_shutdown,
171+
)
172+
.await
160173
} else {
161174
run_server(
162175
config,
@@ -396,3 +409,47 @@ async fn generate_symmetric_key(
396409
.await
397410
.map_err(Into::into)
398411
}
412+
413+
#[inline]
414+
#[allow(clippy::too_many_arguments)]
415+
async fn backfill_func_runs(
416+
config: Config,
417+
main_tracker: TaskTracker,
418+
main_token: CancellationToken,
419+
helping_tasks_tracker: TaskTracker,
420+
helping_tasks_token: CancellationToken,
421+
telemetry_tracker: TaskTracker,
422+
telemetry_token: CancellationToken,
423+
telemetry_shutdown: TelemetryShutdownGuard,
424+
) -> Result<()> {
425+
// Extract parameters before config is moved
426+
let func_run_cutoff_id = config
427+
.backfill_func_runs_cutoff_id()
428+
.and_then(|id_str| id_str.parse().ok());
429+
let func_run_log_cutoff_id = config
430+
.backfill_func_run_logs_cutoff_id()
431+
.and_then(|id_str| id_str.parse().ok());
432+
let batch_size = config.backfill_key_batch_size() as i64;
433+
434+
let handle = main_tracker.spawn(
435+
FuncRunsBackfiller::upload_all_func_runs_and_logs_concurrently(
436+
config,
437+
helping_tasks_tracker.clone(),
438+
helping_tasks_token.clone(),
439+
main_token.clone(),
440+
func_run_cutoff_id,
441+
func_run_log_cutoff_id,
442+
batch_size,
443+
),
444+
);
445+
446+
shutdown::graceful_with_handle(handle)
447+
.group(main_tracker, main_token)
448+
.group(helping_tasks_tracker, helping_tasks_token)
449+
.group(telemetry_tracker, telemetry_token)
450+
.telemetry_guard(telemetry_shutdown.into_future())
451+
.timeout(GRACEFUL_SHUTDOWN_TIMEOUT)
452+
.wait()
453+
.await
454+
.map_err(Into::into)
455+
}

0 commit comments

Comments
 (0)