Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 58 additions & 13 deletions arrow-rs/catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct Config {
#[serde(with = "humantime_serde")]
pub checkpoint_interval: Duration,
pub retain: Retention,
pub retention: CatalogRetention,
#[serde(default = "Catalog::default_headroom")]
pub headroom: ByteSize,
pub partition: partition::Config,
Expand All @@ -46,6 +47,7 @@ impl Default for Config {
Self {
checkpoint_interval: Duration::from_millis(1000),
retain: Default::default(),
retention: CatalogRetention::default(),
headroom: Catalog::default_headroom(),
partition: Default::default(),
storage: Default::default(),
Expand All @@ -55,6 +57,42 @@ impl Default for Config {
}
}

/// Selects how the catalog's effective `Retention` limits are determined.
///
/// `CatalogRetention::resolve` turns the selected variant into a concrete
/// `Retention` value.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum CatalogRetention {
/// Use the wrapped `Retention` as-is; `resolve` returns a clone of it.
Fixed(Retention),
/// Use the total size of the mount reported for the catalog root path as
/// `max_bytes`; every other `Retention` field takes its default value.
RootMountTotal,
}

impl CatalogRetention {
    /// Produces the concrete [`Retention`] described by this setting.
    ///
    /// * `Fixed` yields a clone of the wrapped value.
    /// * `RootMountTotal` looks up the mount for `root` and uses its total
    ///   size as `max_bytes`, with all other fields left at their defaults.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `storage::path_mount_stat`.
    async fn resolve(&self, root: &Path) -> anyhow::Result<Retention> {
        let resolved = match self {
            Self::Fixed(fixed) => fixed.clone(),
            Self::RootMountTotal => {
                let mount = storage::path_mount_stat(root.to_path_buf()).await?;
                let mount_size = mount.total;
                info!(?root, %mount_size, "resolved size for retention");

                Retention {
                    max_bytes: mount_size,
                    ..Default::default()
                }
            }
        };

        Ok(resolved)
    }

    /// The fixed retention used as the default: a 99 GiB `max_bytes` limit
    /// with every other field at its default.
    fn prior_default() -> Retention {
        let max_bytes = ByteSize::gib(99);
        Retention {
            max_bytes,
            ..Default::default()
        }
    }
}

impl Default for CatalogRetention {
    /// Defaults to the `Fixed` variant wrapping `prior_default()`.
    fn default() -> Self {
        let fixed = Self::prior_default();
        Self::Fixed(fixed)
    }
}

#[derive(Debug)]
#[must_use = "close() explicitly to flush writes"]
pub struct Catalog {
Expand All @@ -73,7 +111,7 @@ struct State {
}

impl Catalog {
pub async fn attach(root: PathBuf, config: Config) -> anyhow::Result<Self> {
pub async fn attach(root: PathBuf, mut config: Config) -> anyhow::Result<Self> {
let manifest = Manifest::current_prior_attach(
root.join("manifest.sqlite"),
root.join("manifest.json"),
Expand All @@ -87,6 +125,14 @@ impl Catalog {
}
Self::migrate_topics(&manifest, &root, &topic_root).await?;

if config.retain != CatalogRetention::prior_default() {
warn!(
"catalog.config.retain is obsolete and has no effect. see catalog.config.retention"
)
}

config.retain = config.retention.resolve(&root).await?;

Ok(Self::attach_v0(manifest, root, topic_root, config).await)
}

Expand Down Expand Up @@ -195,14 +241,7 @@ impl Catalog {

#[tracing::instrument(skip_all, level = "debug")]
pub async fn gauge_topics(&self) {
// collect a list of active topic names and then drop the read lock in
// case the `get_size` calls below block for a substantial amount of
// time
let names: Vec<String> = {
let topics = &self.state.read().await.topics;
topics.keys().cloned().collect()
};

let names = self.list_topics().await;
for name in names {
let bytes = self.manifest.get_size(Scope::Topic(&name)).await;
let labels = [("topic", name)];
Expand Down Expand Up @@ -275,7 +314,13 @@ impl Catalog {
}

pub fn total_byte_limit(&self) -> ByteSize {
self.config.retain.max_bytes + self.config.headroom
ByteSize(
self.config
.retain
.max_bytes
.0
.saturating_sub(self.config.headroom.0),
)
}

async fn over_retention_limit(&self) -> bool {
Expand Down Expand Up @@ -549,8 +594,8 @@ mod test {
#[test_log::test(tokio::test)]
async fn test_retain() -> Result<()> {
let (_root, mut catalog) = catalog().await;
catalog.config.retain.max_bytes = ByteSize::b(8000);
catalog.config.headroom = ByteSize::b(catalog.manifest.db_bytes() as u64);
catalog.config.retain.max_bytes = ByteSize::b(8000 + catalog.manifest.db_bytes() as u64);
catalog.config.headroom = ByteSize::b(0);

let data = "x".to_string().repeat(500);

Expand Down Expand Up @@ -587,7 +632,7 @@ mod test {
Ok(())
}

#[test(tokio::test)]
#[test_log::test(tokio::test)]
async fn test_max_open_topics() -> Result<()> {
let (_root, catalog) = catalog_config(Config {
max_open_topics: 1,
Expand Down
17 changes: 10 additions & 7 deletions arrow-rs/catalog/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ const MAX_WRITES: usize = 10_000;
const MAX_DURATION: Duration = Duration::from_secs(10);
const MIN_AVAILABLE: ByteSize = ByteSize::gb(1);

/// Looks up the filesystem entry mounted at `path`.
///
/// The `mount_at` call runs on the blocking thread pool via `spawn_blocking`
/// rather than directly on the async task.
///
/// # Errors
///
/// Returns an error if the blocking task fails to join or if `mount_at`
/// itself fails.
pub(crate) async fn path_mount_stat(path: PathBuf) -> anyhow::Result<systemstat::Filesystem> {
    let system = System::new();
    let lookup = spawn_blocking(move || -> anyhow::Result<systemstat::Filesystem> {
        let filesystem = system.mount_at(path)?;
        Ok(filesystem)
    });

    // The first `?` surfaces a join failure; the inner result is returned as-is.
    lookup.await?
}

/// Periodically monitors the disk space available for logs
/// and switches its state between read-only and writable
/// based on configured thresholds.
Expand Down Expand Up @@ -80,13 +89,7 @@ impl DiskMonitor {
if self.write_count.load(Ordering::SeqCst) >= config.max_writes
|| self.last_write.load(Ordering::SeqCst) < deadline
{
let stat = System::new();
let path = path.clone();
let avail = spawn_blocking(move || {
let fs = stat.mount_at(path)?;
Ok::<_, anyhow::Error>(fs.avail)
})
.await??;
let avail = path_mount_stat(path.clone()).await?.avail;

self.readonly
.store(avail < config.min_available, Ordering::SeqCst);
Expand Down
2 changes: 1 addition & 1 deletion arrow-rs/data/src/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};

use std::time::Duration;

#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct Retention {
pub max_segment_count: Option<usize>,
Expand Down
Loading