From abf277b9be4b330dba49d4d28302092360fbbf01 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 5 Mar 2026 12:48:14 -0700 Subject: [PATCH 1/2] fix(compaction): add immediate_compaction option, default false When immediate_compaction is false (the new default), segments within the cooldown period cannot be compacted. This prevents compaction from locking in data from an orphaned blockchain fork before a reorg settles. Previously, "hot" segments (within cooldown) could still compact with same-generation segments, which could mix finalized and non-finalized blocks and create fragmentation that adds dignificant delays to reorg resolution. --- crates/config/src/worker_core.rs | 6 ++++++ crates/core/worker-core/src/compaction/algorithm.rs | 8 +++++++- crates/core/worker-core/src/config.rs | 3 +++ docs/config.sample.toml | 1 + docs/config/ampd.spec.json | 5 +++++ 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/crates/config/src/worker_core.rs b/crates/config/src/worker_core.rs index 47c823ab9..336059989 100644 --- a/crates/config/src/worker_core.rs +++ b/crates/config/src/worker_core.rs @@ -295,6 +295,10 @@ impl From<&CompactorConfig> for amp_worker_core::CompactorConfig { pub struct CompactionAlgorithmConfig { /// Base cooldown duration in seconds between compaction runs (default: 1024). pub cooldown_duration: ConfigDuration<1024>, + /// Allow compaction of recently created segments (default: false). + /// When false, segments must wait for the cooldown period before compaction. + /// When true, segments can be compacted immediately with same-generation segments. + pub immediate_compaction: bool, /// Eager compaction limits (flattened fields: `overflow`, `bytes`, `rows`). #[serde( flatten, @@ -309,6 +313,7 @@ impl Default for CompactionAlgorithmConfig { fn default() -> Self { Self { cooldown_duration: ConfigDuration::default(), + immediate_compaction: false, eager_compaction_limit: SizeLimitConfig::default_eager_limit(), } } @@ -318,6 +323,7 @@ impl From<&CompactionAlgorithmConfig> for amp_worker_core::CompactionAlgorithmCo fn from(config: &CompactionAlgorithmConfig) -> Self { Self { cooldown_duration: (&config.cooldown_duration).into(), + immediate_compaction: config.immediate_compaction, eager_compaction_limit: (&config.eager_compaction_limit).into(), } } diff --git a/crates/core/worker-core/src/compaction/algorithm.rs b/crates/core/worker-core/src/compaction/algorithm.rs index 23b28efff..d038e02ad 100644 --- a/crates/core/worker-core/src/compaction/algorithm.rs +++ b/crates/core/worker-core/src/compaction/algorithm.rs @@ -160,6 +160,7 @@ use crate::{ /// ## Fields /// - `cooldown_duration`: The base duration used to calculate /// the cooldown period for files based on their generation. +/// - `immediate_compaction`: Whether to allow compaction of recently created segments. /// - `target_partition_size`: The upper bound for segment size limits. /// Files exceeding this limit will not be compacted together. This /// value must be non-unbounded. @@ -170,6 +171,8 @@ pub struct CompactionAlgorithm { /// The amount of time a file must wait before it can be /// compacted with files of different generations. pub cooldown_duration: Duration, + /// Whether to allow compaction of recently created segments. + pub immediate_compaction: bool, /// The upper bound for segment size limits. Files exceeding this limit /// will not be compacted together. This value must be non-unbounded. pub target_partition_size: SegmentSizeLimit, @@ -246,7 +249,9 @@ impl CompactionAlgorithm { } else if state == FileState::Hot { // For hot files, only compact if size limit is not exceeded, // and both files share the same generation. - group.size.generation == candidate.size.generation && !*size_exceeded + self.immediate_compaction + && group.size.generation == candidate.size.generation + && !*size_exceeded } else { // For cold files, compact regardless of generation, // as long as size limit is not exceeded. @@ -301,6 +306,7 @@ impl<'a> From<&'a ParquetConfig> for CompactionAlgorithm { fn from(config: &'a ParquetConfig) -> Self { CompactionAlgorithm { cooldown_duration: config.compactor.algorithm.cooldown_duration.clone().into(), + immediate_compaction: config.compactor.algorithm.immediate_compaction, target_partition_size: SegmentSizeLimit::from(&config.target_size), eager_compaction_limit: SegmentSizeLimit::from( &config.compactor.algorithm.eager_compaction_limit, diff --git a/crates/core/worker-core/src/config.rs b/crates/core/worker-core/src/config.rs index 23cea9519..39178f40d 100644 --- a/crates/core/worker-core/src/config.rs +++ b/crates/core/worker-core/src/config.rs @@ -107,6 +107,8 @@ impl Default for CompactorConfig { pub struct CompactionAlgorithmConfig { /// Base cooldown duration in seconds (default: 1024.0) pub cooldown_duration: ConfigDuration<1024>, + /// Allow compaction of recently created segments (default: false) + pub immediate_compaction: bool, /// Eager compaction limits pub eager_compaction_limit: SizeLimitConfig, } @@ -115,6 +117,7 @@ impl Default for CompactionAlgorithmConfig { fn default() -> Self { Self { cooldown_duration: ConfigDuration::default(), + immediate_compaction: false, eager_compaction_limit: SizeLimitConfig { bytes: 0, ..Default::default() diff --git a/docs/config.sample.toml b/docs/config.sample.toml index a0784dfb9..39a2678b3 100644 --- a/docs/config.sample.toml +++ b/docs/config.sample.toml @@ -94,6 +94,7 @@ url = "postgres://" # write_concurrency = 2 # Max concurrent compaction write operations (default: 2) # min_interval = 1.0 # Interval in seconds to run the compactor (default: 1.0) # cooldown_duration = 1024.0 # Base cooldown duration in seconds (default: 1024.0) +# immediate_compaction = false # Allow compaction of recently created segments (default: false) # overflow = "1" # Eager compaction overflow (default: "1") # bytes = 0 # Eager compaction byte threshold (default: 0) # rows = 0 # Eager compaction row threshold (default: 0) diff --git a/docs/config/ampd.spec.json b/docs/config/ampd.spec.json index 650923348..68d12a5d1 100644 --- a/docs/config/ampd.spec.json +++ b/docs/config/ampd.spec.json @@ -143,6 +143,11 @@ "description": "Base cooldown duration in seconds between compaction runs (default: 1024).", "$ref": "#/$defs/ConfigDuration" }, + "immediate_compaction": { + "description": "Allow compaction of recently created segments (default: false).\nWhen false, segments must wait for the cooldown period before compaction.\nWhen true, segments can be compacted immediately with same-generation segments.", + "type": "boolean", + "default": false + }, "metadata_concurrency": { "description": "Maximum concurrent metadata operations (default: 2).", "type": "integer", From e096d6b71baedc0608f102a408465b16d5f48920 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 5 Mar 2026 13:25:01 -0700 Subject: [PATCH 2/2] fix test --- crates/core/worker-core/src/compaction/algorithm.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/worker-core/src/compaction/algorithm.rs b/crates/core/worker-core/src/compaction/algorithm.rs index d038e02ad..02f4e530a 100644 --- a/crates/core/worker-core/src/compaction/algorithm.rs +++ b/crates/core/worker-core/src/compaction/algorithm.rs @@ -220,7 +220,7 @@ impl CompactionAlgorithm { let is_hot = self.is_hot(segment); match is_hot { - TestResult::Skipped => FileState::Hot, + TestResult::Skipped => FileState::Cold, TestResult::Activated(true) => FileState::Hot, TestResult::Activated(false) => FileState::Cold, }