diff --git a/arrow-rs/catalog/src/catalog.rs b/arrow-rs/catalog/src/catalog.rs index 6b0bf4a..127bb96 100644 --- a/arrow-rs/catalog/src/catalog.rs +++ b/arrow-rs/catalog/src/catalog.rs @@ -32,6 +32,7 @@ pub struct Config { #[serde(with = "humantime_serde")] pub checkpoint_interval: Duration, pub retain: Retention, + pub retention: CatalogRetention, #[serde(default = "Catalog::default_headroom")] pub headroom: ByteSize, pub partition: partition::Config, @@ -46,6 +47,7 @@ impl Default for Config { Self { checkpoint_interval: Duration::from_millis(1000), retain: Default::default(), + retention: CatalogRetention::default(), headroom: Catalog::default_headroom(), partition: Default::default(), storage: Default::default(), @@ -55,6 +57,42 @@ impl Default for Config { } } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub enum CatalogRetention { + Fixed(Retention), + RootMountTotal, +} + +impl CatalogRetention { + async fn resolve(&self, root: &Path) -> anyhow::Result { + match self { + Self::Fixed(r) => Ok(r.clone()), + Self::RootMountTotal => { + let mount_size = storage::path_mount_stat(root.to_owned()).await?.total; + info!(?root, %mount_size, "resolved size for retention"); + + Ok(Retention { + max_bytes: mount_size, + ..Default::default() + }) + } + } + } + + fn prior_default() -> Retention { + Retention { + max_bytes: ByteSize::gib(99), + ..Default::default() + } + } +} + +impl Default for CatalogRetention { + fn default() -> Self { + Self::Fixed(Self::prior_default()) + } +} + #[derive(Debug)] #[must_use = "close() explicitly to flush writes"] pub struct Catalog { @@ -73,7 +111,7 @@ struct State { } impl Catalog { - pub async fn attach(root: PathBuf, config: Config) -> anyhow::Result { + pub async fn attach(root: PathBuf, mut config: Config) -> anyhow::Result { let manifest = Manifest::current_prior_attach( root.join("manifest.sqlite"), root.join("manifest.json"), @@ -87,6 +125,14 @@ impl Catalog { } Self::migrate_topics(&manifest, &root, &topic_root).await?; + if config.retain != CatalogRetention::prior_default() { + warn!( + "catalog.config.retain is obsolete and has no effect. see catalog.config.retention" + ) + } + + config.retain = config.retention.resolve(&root).await?; + Ok(Self::attach_v0(manifest, root, topic_root, config).await) } @@ -195,14 +241,7 @@ impl Catalog { #[tracing::instrument(skip_all, level = "debug")] pub async fn gauge_topics(&self) { - // collect a list of active topic names and then drop the read lock in - // case the `get_size` calls below block for a substantial amount of - // time - let names: Vec = { - let topics = &self.state.read().await.topics; - topics.keys().cloned().collect() - }; - + let names = self.list_topics().await; for name in names { let bytes = self.manifest.get_size(Scope::Topic(&name)).await; let labels = [("topic", name)]; @@ -275,7 +314,13 @@ impl Catalog { } pub fn total_byte_limit(&self) -> ByteSize { - self.config.retain.max_bytes + self.config.headroom + ByteSize( + self.config + .retain + .max_bytes + .0 + .saturating_sub(self.config.headroom.0), + ) } async fn over_retention_limit(&self) -> bool { @@ -549,8 +594,8 @@ mod test { #[test_log::test(tokio::test)] async fn test_retain() -> Result<()> { let (_root, mut catalog) = catalog().await; - catalog.config.retain.max_bytes = ByteSize::b(8000); - catalog.config.headroom = ByteSize::b(catalog.manifest.db_bytes() as u64); + catalog.config.retain.max_bytes = ByteSize::b(8000 + catalog.manifest.db_bytes() as u64); + catalog.config.headroom = ByteSize::b(0); let data = "x".to_string().repeat(500); @@ -587,7 +632,7 @@ mod test { Ok(()) } - #[test(tokio::test)] + #[test_log::test(tokio::test)] async fn test_max_open_topics() -> Result<()> { let (_root, catalog) = catalog_config(Config { max_open_topics: 1, diff --git a/arrow-rs/catalog/src/storage.rs b/arrow-rs/catalog/src/storage.rs index 2861c91..1532398 100644 --- a/arrow-rs/catalog/src/storage.rs +++ b/arrow-rs/catalog/src/storage.rs @@ -18,6 +18,15 @@ const MAX_WRITES: usize = 10_000; const MAX_DURATION: Duration = Duration::from_secs(10); const MIN_AVAILABLE: ByteSize = ByteSize::gb(1); +pub(crate) async fn path_mount_stat(path: PathBuf) -> anyhow::Result { + let stat = System::new(); + spawn_blocking(move || { + let fs = stat.mount_at(path)?; + Ok::<_, anyhow::Error>(fs) + }) + .await? +} + /// Periodically monitors the disk space available for logs /// and switches its state between read-only and writable /// based on configured thresholds. @@ -80,13 +89,7 @@ impl DiskMonitor { if self.write_count.load(Ordering::SeqCst) >= config.max_writes || self.last_write.load(Ordering::SeqCst) < deadline { - let stat = System::new(); - let path = path.clone(); - let avail = spawn_blocking(move || { - let fs = stat.mount_at(path)?; - Ok::<_, anyhow::Error>(fs.avail) - }) - .await??; + let avail = path_mount_stat(path.clone()).await?.avail; self.readonly .store(avail < config.min_available, Ordering::SeqCst); diff --git a/arrow-rs/data/src/limit.rs b/arrow-rs/data/src/limit.rs index 5791808..1cf4876 100644 --- a/arrow-rs/data/src/limit.rs +++ b/arrow-rs/data/src/limit.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default)] pub struct Retention { pub max_segment_count: Option,