diff --git a/crates/config/src/worker_core.rs b/crates/config/src/worker_core.rs index 47c823ab9..336059989 100644 --- a/crates/config/src/worker_core.rs +++ b/crates/config/src/worker_core.rs @@ -295,6 +295,10 @@ impl From<&CompactorConfig> for amp_worker_core::CompactorConfig { pub struct CompactionAlgorithmConfig { /// Base cooldown duration in seconds between compaction runs (default: 1024). pub cooldown_duration: ConfigDuration<1024>, + /// Allow compaction of recently created segments (default: false). + /// When false, segments must wait for the cooldown period before compaction. + /// When true, segments can be compacted immediately with same-generation segments. + pub immediate_compaction: bool, /// Eager compaction limits (flattened fields: `overflow`, `bytes`, `rows`). #[serde( flatten, @@ -309,6 +313,7 @@ impl Default for CompactionAlgorithmConfig { fn default() -> Self { Self { cooldown_duration: ConfigDuration::default(), + immediate_compaction: false, eager_compaction_limit: SizeLimitConfig::default_eager_limit(), } } @@ -318,6 +323,7 @@ impl From<&CompactionAlgorithmConfig> for amp_worker_core::CompactionAlgorithmCo fn from(config: &CompactionAlgorithmConfig) -> Self { Self { cooldown_duration: (&config.cooldown_duration).into(), + immediate_compaction: config.immediate_compaction, eager_compaction_limit: (&config.eager_compaction_limit).into(), } } diff --git a/crates/core/worker-core/src/compaction/algorithm.rs b/crates/core/worker-core/src/compaction/algorithm.rs index 23b28efff..02f4e530a 100644 --- a/crates/core/worker-core/src/compaction/algorithm.rs +++ b/crates/core/worker-core/src/compaction/algorithm.rs @@ -160,6 +160,7 @@ use crate::{ /// ## Fields /// - `cooldown_duration`: The base duration used to calculate /// the cooldown period for files based on their generation. +/// - `immediate_compaction`: Whether to allow compaction of recently created segments. /// - `target_partition_size`: The upper bound for segment size limits. 
/// Files exceeding this limit will not be compacted together. This /// value must be non-unbounded. @@ -170,6 +171,8 @@ pub struct CompactionAlgorithm { /// The amount of time a file must wait before it can be /// compacted with files of different generations. pub cooldown_duration: Duration, + /// Whether to allow compaction of recently created segments. + pub immediate_compaction: bool, /// The upper bound for segment size limits. Files exceeding this limit /// will not be compacted together. This value must be non-unbounded. pub target_partition_size: SegmentSizeLimit, @@ -217,7 +220,7 @@ impl CompactionAlgorithm { let is_hot = self.is_hot(segment); match is_hot { - TestResult::Skipped => FileState::Hot, + TestResult::Skipped => FileState::Cold, TestResult::Activated(true) => FileState::Hot, TestResult::Activated(false) => FileState::Cold, } @@ -246,7 +249,9 @@ impl CompactionAlgorithm { } else if state == FileState::Hot { - // For hot files, only compact if size limit is not exceeded, - // and both files share the same generation. - group.size.generation == candidate.size.generation && !*size_exceeded + // For hot files, only compact if immediate compaction is enabled, + // size limit is not exceeded, and both files share the same generation. + self.immediate_compaction + && group.size.generation == candidate.size.generation + && !*size_exceeded } else { // For cold files, compact regardless of generation, // as long as size limit is not exceeded. 
@@ -301,6 +306,7 @@ impl<'a> From<&'a ParquetConfig> for CompactionAlgorithm { fn from(config: &'a ParquetConfig) -> Self { CompactionAlgorithm { cooldown_duration: config.compactor.algorithm.cooldown_duration.clone().into(), + immediate_compaction: config.compactor.algorithm.immediate_compaction, target_partition_size: SegmentSizeLimit::from(&config.target_size), eager_compaction_limit: SegmentSizeLimit::from( &config.compactor.algorithm.eager_compaction_limit, diff --git a/crates/core/worker-core/src/config.rs b/crates/core/worker-core/src/config.rs index 23cea9519..39178f40d 100644 --- a/crates/core/worker-core/src/config.rs +++ b/crates/core/worker-core/src/config.rs @@ -107,6 +107,8 @@ impl Default for CompactorConfig { pub struct CompactionAlgorithmConfig { /// Base cooldown duration in seconds (default: 1024.0) pub cooldown_duration: ConfigDuration<1024>, + /// Allow compaction of recently created segments (default: false) + pub immediate_compaction: bool, /// Eager compaction limits pub eager_compaction_limit: SizeLimitConfig, } @@ -115,6 +117,7 @@ impl Default for CompactionAlgorithmConfig { fn default() -> Self { Self { cooldown_duration: ConfigDuration::default(), + immediate_compaction: false, eager_compaction_limit: SizeLimitConfig { bytes: 0, ..Default::default() diff --git a/docs/config.sample.toml b/docs/config.sample.toml index a0784dfb9..39a2678b3 100644 --- a/docs/config.sample.toml +++ b/docs/config.sample.toml @@ -94,6 +94,7 @@ url = "postgres://" # write_concurrency = 2 # Max concurrent compaction write operations (default: 2) # min_interval = 1.0 # Interval in seconds to run the compactor (default: 1.0) # cooldown_duration = 1024.0 # Base cooldown duration in seconds (default: 1024.0) +# immediate_compaction = false # Allow compaction of recently created segments (default: false) # overflow = "1" # Eager compaction overflow (default: "1") # bytes = 0 # Eager compaction byte threshold (default: 0) # rows = 0 # Eager compaction row threshold 
(default: 0) diff --git a/docs/config/ampd.spec.json b/docs/config/ampd.spec.json index 650923348..68d12a5d1 100644 --- a/docs/config/ampd.spec.json +++ b/docs/config/ampd.spec.json @@ -143,6 +143,11 @@ "description": "Base cooldown duration in seconds between compaction runs (default: 1024).", "$ref": "#/$defs/ConfigDuration" }, + "immediate_compaction": { + "description": "Allow compaction of recently created segments (default: false).\nWhen false, segments must wait for the cooldown period before compaction.\nWhen true, segments can be compacted immediately with same-generation segments.", + "type": "boolean", + "default": false + }, "metadata_concurrency": { "description": "Maximum concurrent metadata operations (default: 2).", "type": "integer",