Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/config/src/worker_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ impl From<&CompactorConfig> for amp_worker_core::CompactorConfig {
pub struct CompactionAlgorithmConfig {
/// Base cooldown duration in seconds between compaction runs (default: 1024).
pub cooldown_duration: ConfigDuration<1024>,
/// Allow compaction of recently created segments (default: false).
/// When false, segments must wait for the cooldown period before compaction.
/// When true, segments can be compacted immediately with same-generation segments.
pub immediate_compaction: bool,
/// Eager compaction limits (flattened fields: `overflow`, `bytes`, `rows`).
#[serde(
flatten,
Expand All @@ -309,6 +313,7 @@ impl Default for CompactionAlgorithmConfig {
fn default() -> Self {
Self {
cooldown_duration: ConfigDuration::default(),
immediate_compaction: false,
eager_compaction_limit: SizeLimitConfig::default_eager_limit(),
}
}
Expand All @@ -318,6 +323,7 @@ impl From<&CompactionAlgorithmConfig> for amp_worker_core::CompactionAlgorithmCo
fn from(config: &CompactionAlgorithmConfig) -> Self {
Self {
cooldown_duration: (&config.cooldown_duration).into(),
immediate_compaction: config.immediate_compaction,
eager_compaction_limit: (&config.eager_compaction_limit).into(),
}
}
Expand Down
10 changes: 8 additions & 2 deletions crates/core/worker-core/src/compaction/algorithm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ use crate::{
/// ## Fields
/// - `cooldown_duration`: The base duration used to calculate
/// the cooldown period for files based on their generation.
/// - `immediate_compaction`: Whether to allow compaction of recently created segments.
/// - `target_partition_size`: The upper bound for segment size limits.
/// Files exceeding this limit will not be compacted together. This
/// value must be non-unbounded.
Expand All @@ -170,6 +171,8 @@ pub struct CompactionAlgorithm {
/// The amount of time a file must wait before it can be
/// compacted with files of different generations.
pub cooldown_duration: Duration,
/// Whether to allow compaction of recently created segments.
pub immediate_compaction: bool,
/// The upper bound for segment size limits. Files exceeding this limit
/// will not be compacted together. This value must be non-unbounded.
pub target_partition_size: SegmentSizeLimit,
Expand Down Expand Up @@ -217,7 +220,7 @@ impl CompactionAlgorithm {
let is_hot = self.is_hot(segment);

match is_hot {
TestResult::Skipped => FileState::Hot,
TestResult::Skipped => FileState::Cold,
TestResult::Activated(true) => FileState::Hot,
TestResult::Activated(false) => FileState::Cold,
}
Expand Down Expand Up @@ -246,7 +249,9 @@ impl CompactionAlgorithm {
} else if state == FileState::Hot {
// For hot files, only compact if size limit is not exceeded,
// and both files share the same generation.
group.size.generation == candidate.size.generation && !*size_exceeded
self.immediate_compaction
&& group.size.generation == candidate.size.generation
&& !*size_exceeded
} else {
// For cold files, compact regardless of generation,
// as long as size limit is not exceeded.
Expand Down Expand Up @@ -301,6 +306,7 @@ impl<'a> From<&'a ParquetConfig> for CompactionAlgorithm {
fn from(config: &'a ParquetConfig) -> Self {
CompactionAlgorithm {
cooldown_duration: config.compactor.algorithm.cooldown_duration.clone().into(),
immediate_compaction: config.compactor.algorithm.immediate_compaction,
target_partition_size: SegmentSizeLimit::from(&config.target_size),
eager_compaction_limit: SegmentSizeLimit::from(
&config.compactor.algorithm.eager_compaction_limit,
Expand Down
3 changes: 3 additions & 0 deletions crates/core/worker-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ impl Default for CompactorConfig {
pub struct CompactionAlgorithmConfig {
/// Base cooldown duration in seconds (default: 1024.0)
pub cooldown_duration: ConfigDuration<1024>,
/// Allow compaction of recently created segments (default: false)
pub immediate_compaction: bool,
/// Eager compaction limits
pub eager_compaction_limit: SizeLimitConfig,
}
Expand All @@ -115,6 +117,7 @@ impl Default for CompactionAlgorithmConfig {
fn default() -> Self {
Self {
cooldown_duration: ConfigDuration::default(),
immediate_compaction: false,
eager_compaction_limit: SizeLimitConfig {
bytes: 0,
..Default::default()
Expand Down
1 change: 1 addition & 0 deletions docs/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ url = "postgres://<pg_url>"
# write_concurrency = 2 # Max concurrent compaction write operations (default: 2)
# min_interval = 1.0 # Interval in seconds to run the compactor (default: 1.0)
# cooldown_duration = 1024.0 # Base cooldown duration in seconds (default: 1024.0)
# immediate_compaction = false # Allow compaction of recently created segments (default: false)
# overflow = "1" # Eager compaction overflow (default: "1")
# bytes = 0 # Eager compaction byte threshold (default: 0)
# rows = 0 # Eager compaction row threshold (default: 0)
Expand Down
5 changes: 5 additions & 0 deletions docs/config/ampd.spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@
"description": "Base cooldown duration in seconds between compaction runs (default: 1024).",
"$ref": "#/$defs/ConfigDuration"
},
"immediate_compaction": {
"description": "Allow compaction of recently created segments (default: false).\nWhen false, segments must wait for the cooldown period before compaction.\nWhen true, segments can be compacted immediately with same-generation segments.",
"type": "boolean",
"default": false
},
"metadata_concurrency": {
"description": "Maximum concurrent metadata operations (default: 2).",
"type": "integer",
Expand Down