From 3cde72a8fe6eb3c0564bdf691da0d42b97e4c8b9 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 17 Jun 2026 16:29:20 +0200 Subject: [PATCH 1/6] WIP: Update ocidir-rs to latest commit and follow-on issues Latest ocidir-rs has the oci-archive support. Unfortunately it also updates the oci-spec dependency to 0.10, so we need to bump oci-spec to 0.10, and containers-image-proxy to latest main commit to sync the oci-spec version everywhere. This is marked WIP because these should really get releases and we should bump dependencies to that release version. Signed-off-by: Alexander Larsson --- Cargo.toml | 3 ++- crates/composefs-oci/Cargo.toml | 2 +- crates/composefs-oci/src/oci_layout.rs | 1 + crates/composefs-storage/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 077c9461..9d2d1ec9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ composefs-oci = { version = "0.4.0", path = "crates/composefs-oci", default-feat composefs-boot = { version = "0.4.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.4.0", path = "crates/composefs-http", default-features = false } cap-std-ext = "5.1.2" -ocidir = "0.7.2" +containers-image-proxy = { git = "https://github.com/containers/containers-image-proxy-rs", rev = "e339fc1c", default-features = false } +ocidir = { git = "https://github.com/containers/ocidir-rs", rev = "e137648a" } # JSON-RPC with FD passing for userns helper jsonrpc-fdpass = { version = "0.1.0", default-features = false } diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 9c136bd7..e04e3567 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -27,7 +27,7 @@ composefs = { workspace = true } zlink-core = { workspace = true, optional = true } composefs-boot = { workspace = true, optional = true } flate2 = { version = "1.0", default-features = false, features = ["zlib"] } -containers-image-proxy = { version = "0.10", default-features = false } +containers-image-proxy = { workspace = true } cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", optional = true } hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.17.0", default-features = false } diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index 9ff37446..c5a6d2e8 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -24,6 +24,7 @@ use anyhow::{Context, Result}; use cap_std_ext::cap_std; use containers_image_proxy::oci_spec::image::{Descriptor, Digest as OciDigest, MediaType}; use fn_error_context::context; +use ocidir::prelude::*; use ocidir::{OciDir, ResolvedManifest}; use tokio::sync::Semaphore; use tokio::task::JoinSet; diff --git a/crates/composefs-storage/Cargo.toml b/crates/composefs-storage/Cargo.toml index 904551a5..51a1c91a 100644 --- a/crates/composefs-storage/Cargo.toml +++ b/crates/composefs-storage/Cargo.toml @@ -18,7 +18,7 @@ cap-std-ext = { version = "4.0", default-features = false } crc = { version = "3.0", default-features = false } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } jsonrpc-fdpass = { workspace = true, optional = true } -oci-spec = { version = "0.9", default-features = false, features = ["image"] } +oci-spec = { version = "0.10", default-features = false, features = ["image"] } rustix = { version = "1.0", default-features = false, features = ["fs", "std", "process", "thread"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } From 853578a32aca54ff19bb531d4ccc546c83b1dc2d Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 17 Jun 2026 16:39:47 +0200 Subject: [PATCH 2/6] deltas: Address delta objects using a Descriptor, not just a digest This will be useful later when we start using OciDir for this, as it needs the full descriptor, including the size. Signed-off-by: Alexander Larsson Assisted-by: Claude Code (Opus 4.6) --- crates/composefs-oci/src/delta.rs | 59 +++++++++++--------------- crates/composefs-oci/src/oci_layout.rs | 3 +- crates/composefs-oci/src/skopeo.rs | 25 +++-------- 3 files changed, 33 insertions(+), 54 deletions(-) diff --git a/crates/composefs-oci/src/delta.rs b/crates/composefs-oci/src/delta.rs index 361b9b09..27e82b9d 100644 --- a/crates/composefs-oci/src/delta.rs +++ b/crates/composefs-oci/src/delta.rs @@ -22,7 +22,7 @@ use composefs::fsverity::FsVerityHashValue; use composefs::repository::Repository; use composefs::tree::RegularFile; use containers_image_proxy::oci_spec::image::{ - Digest as OciDigest, DigestAlgorithm, ImageConfiguration, ImageManifest, MediaType, + Descriptor, Digest as OciDigest, DigestAlgorithm, ImageConfiguration, ImageManifest, MediaType, }; use tokio::sync::Semaphore; @@ -66,7 +66,7 @@ pub(crate) trait DeltaBlobReader: Send + Sync { /// this fetches the blob to a local temp file first. fn open_blob( &self, - digest: &OciDigest, + desc: &Descriptor, ) -> Pin> + Send + '_>>; } @@ -385,15 +385,14 @@ fn reconstruct_layer( // ─── Delta manifest parsing ───────────────────────────────────────────────── struct DeltaLayer { - blob_digest: OciDigest, - media_type: MediaType, + descriptor: Descriptor, } struct ParsedDelta { target_manifest: ImageManifest, - target_manifest_digest: OciDigest, + target_manifest_descriptor: Descriptor, target_manifest_raw: Vec, - target_config_digest: OciDigest, + target_config_descriptor: Descriptor, target_config_raw: Vec, source_config_digest: OciDigest, delta_layer_by_to: HashMap, @@ -416,8 +415,8 @@ async fn parse_delta_manifest( .parse() .context("Invalid source config digest")?; - let mut target_manifest_digest = None; - let mut target_config_digest = None; + let mut target_manifest_descriptor = None; + let mut target_config_descriptor = None; let mut delta_layer_by_to = HashMap::new(); for layer in delta_manifest.layers() { @@ -430,10 +429,10 @@ async fn parse_delta_manifest( match content { "image-manifest" => { - target_manifest_digest = Some(layer.digest().clone()); + target_manifest_descriptor = Some(layer.clone()); } "image-config" => { - target_config_digest = Some(layer.digest().clone()); + target_config_descriptor = Some(layer.clone()); } "image-layer" => { if let Some(to_str) = layer_annotations @@ -445,8 +444,7 @@ async fn parse_delta_manifest( delta_layer_by_to.insert( to_digest, DeltaLayer { - blob_digest: layer.digest().clone(), - media_type: layer.media_type().clone(), + descriptor: layer.clone(), }, ); } @@ -455,14 +453,14 @@ async fn parse_delta_manifest( } } - let target_manifest_digest = - target_manifest_digest.context("Delta manifest has no embedded image manifest")?; - let target_config_digest = - target_config_digest.context("Delta manifest has no embedded image config")?; + let target_manifest_descriptor = + target_manifest_descriptor.context("Delta manifest has no embedded image manifest")?; + let target_config_descriptor = + target_config_descriptor.context("Delta manifest has no embedded image config")?; let mut target_manifest_raw = Vec::new(); blob_reader - .open_blob(&target_manifest_digest) + .open_blob(&target_manifest_descriptor) .await .context("Fetching embedded image manifest")? .read_to_end(&mut target_manifest_raw)?; @@ -471,7 +469,7 @@ async fn parse_delta_manifest( let mut target_config_raw = Vec::new(); blob_reader - .open_blob(&target_config_digest) + .open_blob(&target_config_descriptor) .await .context("Fetching embedded image config")? .read_to_end(&mut target_config_raw)?; @@ -481,9 +479,9 @@ async fn parse_delta_manifest( Ok(ParsedDelta { target_manifest, - target_manifest_digest, + target_manifest_descriptor, target_manifest_raw, - target_config_digest, + target_config_descriptor, target_config_raw, source_config_digest, delta_layer_by_to, @@ -507,8 +505,8 @@ pub(crate) async fn import_delta( ) -> Result<(PullResult, ImportStats)> { let parsed = parse_delta_manifest(delta_manifest, &*blob_reader).await?; - let manifest_digest = &parsed.target_manifest_digest; - let config_digest = &parsed.target_config_digest; + let manifest_digest = parsed.target_manifest_descriptor.digest(); + let config_digest = parsed.target_config_descriptor.digest(); // Check if the target image already exists if let Some(manifest_verity) = oci_image::has_manifest(repo, manifest_digest)? { @@ -615,8 +613,7 @@ pub(crate) async fn import_delta( } let diff_id = diff_id.clone(); - let blob_digest = dl.blob_digest.clone(); - let media_type = dl.media_type.clone(); + let descriptor = dl.descriptor.clone(); let repo = Arc::clone(repo); let source_image = Arc::clone(&source_image); let blob_reader = Arc::clone(&blob_reader); @@ -628,16 +625,17 @@ pub(crate) async fn import_delta( layer_tasks.spawn(async move { let _permit = permit; - let blob_file = blob_reader - .open_blob(&blob_digest) + let blob = blob_reader + .open_blob(&descriptor) .await .with_context(|| format!("Fetching delta blob for layer {diff_id}"))?; + let media_type = descriptor.media_type().clone(); let reconstructed = tokio::task::spawn_blocking({ let diff_id = diff_id.clone(); let repo = Arc::clone(&repo); move || -> Result { - reconstruct_layer(&repo, &source_image, blob_file, &media_type, &diff_id) + reconstruct_layer(&repo, &source_image, blob, &media_type, &diff_id) .with_context(|| format!("Reconstructing layer {diff_id}")) } }) @@ -740,13 +738,6 @@ pub(crate) async fn import_delta( )) } -/// Return references to all layer descriptors in a delta manifest. -pub(crate) fn delta_layer_descriptors( - manifest: &ImageManifest, -) -> &[containers_image_proxy::oci_spec::image::Descriptor] { - manifest.layers() -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index c5a6d2e8..5d14a118 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -418,9 +418,10 @@ struct OciDirBlobReader(OciDir); impl crate::delta::DeltaBlobReader for OciDirBlobReader { fn open_blob( &self, - digest: &OciDigest, + desc: &Descriptor, ) -> std::pin::Pin> + Send + '_>> { + let digest = desc.digest(); let blob_path = format!("blobs/{}/{}", digest.algorithm(), digest.digest()); Box::pin(std::future::ready( self.0 diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index 39ea1391..b35e48a8 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -514,15 +514,8 @@ impl ImageOp { self.reporter .report(ProgressEvent::Message("Detected delta artifact...".into())); - let descriptors: std::collections::HashMap<_, _> = - crate::delta::delta_layer_descriptors(manifest) - .iter() - .map(|d| (d.digest().clone(), d.clone())) - .collect(); - let blob_reader = Arc::new(ProxyBlobReader { image_op: Arc::clone(self), - descriptors, }); // Limit to 2 concurrent layers: download the next while applying the previous, @@ -534,27 +527,21 @@ impl ImageOp { /// Blob reader that fetches from a skopeo image proxy on demand. struct ProxyBlobReader { image_op: Arc>, - descriptors: - std::collections::HashMap, } impl crate::delta::DeltaBlobReader for ProxyBlobReader { fn open_blob( &self, - digest: &OciDigest, - ) -> std::pin::Pin> + Send + '_>> - { - let digest = digest.clone(); + desc: &Descriptor, + ) -> std::pin::Pin< + Box> + Send + '_>, + > { + let desc = desc.clone(); Box::pin(async move { - let desc = self - .descriptors - .get(&digest) - .with_context(|| format!("No descriptor for blob {digest}"))?; - let (reader, driver) = self .image_op .proxy - .get_blob(&self.image_op.img, &digest, desc.size()) + .get_blob(&self.image_op.img, desc.digest(), desc.size()) .await?; let tmpfile = self From 0f727406aee1e71635fd9a10b19e111127ef4887 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 17 Jun 2026 17:10:36 +0200 Subject: [PATCH 3/6] oci: Add BlockingReader helper class This is similar to tokio::fs::File, but just converts a sync Read to an AsyncRead, doing each actual read in spawn_blocking. We will need this to handle async reading from the OciDir reader we get from a oci-archive, as that is not a fs::File. Signed-off-by: Alexander Larsson Assisted-by: Claude Code (Opus 4.6) --- crates/composefs-oci/src/oci_layout.rs | 83 ++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index 5d14a118..4a63fd34 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -17,7 +17,9 @@ use std::cmp::Reverse; use std::collections::HashMap; use std::io::Read; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context as TaskContext, Poll}; use std::thread::available_parallelism; use anyhow::{Context, Result}; @@ -42,6 +44,87 @@ use crate::{ImportStats, config_identifier, layer_identifier}; use crate::skopeo::PullResult; +const READ_BUF_SIZE: usize = 128 * 1024; + +/// Adapts a synchronous `Read` into a `tokio::io::AsyncRead` by dispatching +/// reads to [`tokio::task::spawn_blocking`], similar to `tokio::fs::File`. +struct BlockingReader { + reader: Option>, + pending: + Option, std::io::Result, Vec)>>, + buf: Vec, + pos: usize, +} + +impl BlockingReader { + fn new(reader: Box) -> Self { + Self { + reader: Some(reader), + pending: None, + buf: Vec::new(), + pos: 0, + } + } +} + +impl tokio::io::AsyncRead for BlockingReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + // Drain buffered data first. + if self.pos < self.buf.len() { + let remaining = &self.buf[self.pos..]; + let n = remaining.len().min(buf.remaining()); + buf.put_slice(&remaining[..n]); + self.pos += n; + return Poll::Ready(Ok(())); + } + + // Poll an in-flight blocking read. + if let Some(handle) = &mut self.pending { + let (reader, result, data) = match Pin::new(handle).poll(cx) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(e)) => { + self.pending = None; + return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))); + } + Poll::Pending => return Poll::Pending, + }; + self.pending = None; + self.reader = Some(reader); + match result { + Ok(n) => { + self.buf = data; + self.buf.truncate(n); + self.pos = 0; + if n == 0 { + return Poll::Ready(Ok(())); + } + let copy = n.min(buf.remaining()); + buf.put_slice(&self.buf[..copy]); + self.pos = copy; + return Poll::Ready(Ok(())); + } + Err(e) => return Poll::Ready(Err(e)), + } + } + + // Spawn a new blocking read, reusing the existing buffer. + let mut reader = self.reader.take().expect("reader taken without pending"); + let mut data = std::mem::take(&mut self.buf); + let handle = tokio::task::spawn_blocking(move || { + data.resize(READ_BUF_SIZE, 0); + let result = reader.read(&mut data); + (reader, result, data) + }); + self.pending = Some(handle); + cx.waker().wake_by_ref(); + Poll::Pending + } +} + /// Parse an OCI layout reference like "/path/to/dir:tag" or "/path/to/dir". /// /// Returns (path, optional_tag). From 0725a9d294fab8eae0425a6d76090ebe22298e52 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 17 Jun 2026 17:56:35 +0200 Subject: [PATCH 4/6] oci pull: Use OciDir also for oci-archives This applies to both pulling deltas and regular images and uses OciArchive::open() if the source is a file. Also, we have to convert the code to be more generic on the source data stream, as it is no longer always fs::File. We use the new BlockingReader helper to convert the sync Read to an async stream. Signed-off-by: Alexander Larsson --- crates/composefs-oci/src/delta.rs | 13 ++- crates/composefs-oci/src/oci_layout.rs | 137 ++++++++++++++----------- crates/composefs-oci/src/skopeo.rs | 10 +- 3 files changed, 93 insertions(+), 67 deletions(-) diff --git a/crates/composefs-oci/src/delta.rs b/crates/composefs-oci/src/delta.rs index 27e82b9d..15d6194b 100644 --- a/crates/composefs-oci/src/delta.rs +++ b/crates/composefs-oci/src/delta.rs @@ -67,7 +67,7 @@ pub(crate) trait DeltaBlobReader: Send + Sync { fn open_blob( &self, desc: &Descriptor, - ) -> Pin> + Send + '_>>; + ) -> Pin>> + Send + '_>>; } /// Check whether an OCI manifest is a delta artifact. @@ -326,7 +326,10 @@ fn tar_patch_apply( /// Reconstruct a single layer's uncompressed tar from a delta blob. /// Returns a seeked-to-start temp file with diff_id already verified. -fn decompress_layer(reader: File, media_type: &MediaType) -> Result> { +fn decompress_layer( + reader: impl Read + Send + 'static, + media_type: &MediaType, +) -> Result> { let buf = BufReader::new(reader); match media_type { MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => Ok(Box::new(buf)), @@ -343,7 +346,7 @@ fn decompress_layer(reader: File, media_type: &MediaType) -> Result( repo: &Repository, source_image: &Arc>, - blob_file: File, + blob_reader: impl Read + Send + 'static, media_type: &MediaType, expected_diff_id: &OciDigest, ) -> Result { @@ -362,9 +365,9 @@ fn reconstruct_layer( inner: &mut tmpfile, hasher: &mut hasher, }; - tar_patch_apply(blob_file, &mut data_source, &mut hashing_writer)?; + tar_patch_apply(blob_reader, &mut data_source, &mut hashing_writer)?; } else { - let mut decoder = decompress_layer(blob_file, media_type)?; + let mut decoder = decompress_layer(blob_reader, media_type)?; let mut hashing_writer = HashingWriter { inner: &mut tmpfile, hasher: &mut hasher, diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index 4a63fd34..39728e85 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -24,10 +24,12 @@ use std::thread::available_parallelism; use anyhow::{Context, Result}; use cap_std_ext::cap_std; -use containers_image_proxy::oci_spec::image::{Descriptor, Digest as OciDigest, MediaType}; +use containers_image_proxy::oci_spec::image::{ + Descriptor, Digest as OciDigest, ImageManifest, MediaType, +}; use fn_error_context::context; use ocidir::prelude::*; -use ocidir::{OciDir, ResolvedManifest}; +use ocidir::{OciArchive, OciDir, ResolvedManifest}; use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::debug; @@ -125,6 +127,21 @@ impl tokio::io::AsyncRead for BlockingReader { } } +/// Check if an OCI layout contains a single delta artifact manifest. +fn detect_delta_manifest(oci: &impl OciRead) -> Result> { + let index = oci.read_index()?; + if index.manifests().len() == 1 { + let desc = &index.manifests()[0]; + let mut manifest_data = Vec::new(); + oci.read_blob(desc)?.read_to_end(&mut manifest_data)?; + let manifest = ImageManifest::from_reader(&manifest_data[..])?; + if crate::delta::is_delta_artifact(&manifest) { + return Ok(Some(manifest)); + } + } + Ok(None) +} + /// Parse an OCI layout reference like "/path/to/dir:tag" or "/path/to/dir". /// /// Returns (path, optional_tag). @@ -147,13 +164,15 @@ pub(crate) fn parse_oci_layout_ref(imgref: &str) -> (&str, Option<&str>) { } /// Resolve a manifest from an OCI layout directory for the current platform. -fn resolve_manifest(ocidir: &OciDir, tag: Option<&str>) -> Result { - ocidir - .open_image_this_platform(tag) +fn resolve_manifest( + oci: &T, + tag: Option<&str>, +) -> Result { + oci.open_image_this_platform(tag) .context("Resolving manifest for platform") } -/// Import an image from a local OCI layout directory. +/// Import an image from a local OCI layout directory or archive. /// /// This is the fast path for `oci:` transport references. It reads the OCI /// layout directly without going through skopeo. Progress events are emitted @@ -170,33 +189,40 @@ pub async fn import_oci_layout( // a clear "not writable" error rather than a misleading source-open error. repo.ensure_writable()?; - // Open the OCI layout directory - let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()) - .with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?; - let ocidir = OciDir::open(dir).context("Opening OCI directory")?; - - // Check for delta artifact before platform resolution (deltas lack - // platform info and would fail the platform filter). Only check - // single-manifest layouts since deltas are always single-manifest. - if let Ok(index) = ocidir.read_index() - && index.manifests().len() == 1 - { - let desc = &index.manifests()[0]; - let mut manifest_data = Vec::new(); - ocidir.read_blob(desc)?.read_to_end(&mut manifest_data)?; - let manifest = containers_image_proxy::oci_spec::image::ImageManifest::from_reader( - &manifest_data[..], - )?; - if crate::delta::is_delta_artifact(&manifest) { - let blob_reader = Arc::new(OciDirBlobReader(ocidir)); + // Open the OCI layout directory or archive + if layout_path.is_file() { + let oci = OciArchive::open(layout_path) + .with_context(|| format!("Opening OCI archive {}", layout_path.display()))?; + if let Some(manifest) = detect_delta_manifest(&oci)? { + let blob_reader = Arc::new(OwnedBlobReader(oci)); let (result, stats) = crate::delta::import_delta(repo, &manifest, blob_reader, &reporter, None).await?; return Ok((result, stats)); } + import_oci_image(repo, &oci, layout_tag, reporter).await + } else { + let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()) + .with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?; + let oci = OciDir::open(dir).context("Opening OCI directory")?; + if let Some(manifest) = detect_delta_manifest(&oci)? { + let blob_reader = Arc::new(OwnedBlobReader(oci)); + let (result, stats) = + crate::delta::import_delta(repo, &manifest, blob_reader, &reporter, None).await?; + return Ok((result, stats)); + } + import_oci_image(repo, &oci, layout_tag, reporter).await } +} +/// Generic import from any [`OciRead`] backend (non-delta path). +async fn import_oci_image( + repo: &Arc>, + oci: &T, + tag: Option<&str>, + reporter: SharedReporter, +) -> Result<(PullResult, ImportStats)> { // Resolve the manifest, with fallback for images lacking platform annotations - let resolved = resolve_manifest(&ocidir, layout_tag)?; + let resolved = resolve_manifest(oci, tag)?; let manifest = resolved.manifest; let manifest_descriptor = &resolved.manifest_descriptor; @@ -210,7 +236,7 @@ pub async fn import_oci_layout( layers.len() ))); let (config_digest, config_verity, layer_refs, stats) = - import_config_and_layers(repo, &ocidir, layers, config_descriptor, &reporter) + import_config_and_layers(repo, oci, layers, config_descriptor, &reporter) .await .with_context(|| format!("Failed to import config {}", config_descriptor.digest()))?; @@ -235,8 +261,7 @@ pub async fn import_oci_layout( } let mut raw_manifest = Vec::with_capacity(manifest_descriptor.size() as usize); - ocidir - .read_blob(manifest_descriptor) + oci.read_blob(manifest_descriptor) .context("Reading raw manifest bytes")? .read_to_end(&mut raw_manifest)?; splitstream.write_external(&raw_manifest)?; @@ -259,14 +284,14 @@ pub async fn import_oci_layout( /// Returns (config_digest, config_verity, layer_refs, stats). /// `layer_refs` is an ordered Vec of (diff_id, verity) pairs preserving the /// order from the config (or manifest for artifacts). -async fn import_config_and_layers( +async fn import_config_and_layers( repo: &Arc>, - ocidir: &OciDir, + oci: &T, manifest_layers: &[Descriptor], config_descriptor: &Descriptor, reporter: &SharedReporter, ) -> Result<(OciDigest, ObjectID, Vec<(OciDigest, ObjectID)>, ImportStats)> { - let config_digest: OciDigest = config_descriptor.digest().clone(); + let config_digest = config_descriptor.digest().clone(); let content_id = config_identifier(&config_digest); if let Some(config_id) = repo.has_stream(&content_id)? { @@ -320,8 +345,7 @@ async fn import_config_and_layers( // and parse diff_ids from the same buffer via as_slice(). debug!("Reading config {config_digest}"); let mut raw_config = Vec::with_capacity(config_descriptor.size() as usize); - ocidir - .read_blob(config_descriptor) + oci.read_blob(config_descriptor) .context("Reading config blob")? .read_to_end(&mut raw_config)?; let diff_ids = crate::extract_diff_ids( @@ -344,19 +368,20 @@ async fn import_config_and_layers( let permit = Arc::clone(&sem).acquire_owned().await?; let reporter = Arc::clone(reporter); - let layer_file = ocidir - .read_blob(descriptor) - .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?; + let layer_reader: Box = Box::new( + oci.read_blob(descriptor) + .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?, + ); let media_type = descriptor.media_type().clone(); let layer_size = descriptor.size(); layer_tasks.spawn(async move { let _permit = permit; - let (verity, layer_stats) = import_layer_from_file( + let (verity, layer_stats) = import_layer_from_blob( &repo, &diff_id, - layer_file, + layer_reader, &media_type, layer_size, &reporter, @@ -404,13 +429,13 @@ async fn import_config_and_layers( Ok((config_digest, config_id, layer_refs, stats)) } -/// Import a single layer by streaming from a file handle. +/// Import a single layer by streaming from a blob reader. /// /// Emits `Started`/`Done` (or `Skipped`) progress events via `reporter`. -async fn import_layer_from_file( +async fn import_layer_from_blob( repo: &Arc>, diff_id: &OciDigest, - layer_file: std::fs::File, + layer_reader: Box, media_type: &MediaType, layer_size: u64, reporter: &SharedReporter, @@ -438,7 +463,7 @@ async fn import_layer_from_file( // The watch channel provides backpressure: if the renderer is slow, intermediate // byte counts are coalesced rather than queued, keeping the I/O path non-blocking. let (async_file, progress_driver) = ProgressRead::new( - tokio::fs::File::from_std(layer_file), + BlockingReader::new(layer_reader), Arc::clone(reporter), id.clone(), Some(layer_size), @@ -495,24 +520,20 @@ async fn import_layer_from_file( Ok((object_id, layer_stats)) } -/// Blob reader backed by an OCI layout directory. -struct OciDirBlobReader(OciDir); +/// Blob reader that owns an [`OciRead`] backend, for use with delta imports. +struct OwnedBlobReader(T); -impl crate::delta::DeltaBlobReader for OciDirBlobReader { +impl crate::delta::DeltaBlobReader for OwnedBlobReader { fn open_blob( &self, desc: &Descriptor, - ) -> std::pin::Pin> + Send + '_>> - { - let digest = desc.digest(); - let blob_path = format!("blobs/{}/{}", digest.algorithm(), digest.digest()); - Box::pin(std::future::ready( - self.0 - .dir() - .open(&blob_path) - .map(|f| f.into_std()) - .with_context(|| format!("Opening blob {digest} from OCI layout")), - )) + ) -> Pin>> + Send + '_>> { + let result = self + .0 + .read_blob(desc) + .map(|r| Box::new(r) as Box) + .with_context(|| format!("Reading blob {}", desc.digest())); + Box::pin(std::future::ready(result)) } } @@ -554,7 +575,7 @@ mod tests { async fn test_wrong_platform_rejected() { use cap_std_ext::cap_std; use composefs::fsverity::Sha256HashValue; - use containers_image_proxy::oci_spec::image::{ + use ocidir::oci_spec::image::{ Arch, ConfigBuilder, ImageConfigurationBuilder, Os, PlatformBuilder, RootFsBuilder, }; diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index b35e48a8..66009c04 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -534,7 +534,7 @@ impl crate::delta::DeltaBlobReader for ProxyBlobRea &self, desc: &Descriptor, ) -> std::pin::Pin< - Box> + Send + '_>, + Box>> + Send + '_>, > { let desc = desc.clone(); Box::pin(async move { @@ -556,7 +556,7 @@ impl crate::delta::DeltaBlobReader for ProxyBlobRea let mut std_file = async_dst.into_std().await; use std::io::Seek; std_file.seek(std::io::SeekFrom::Start(0))?; - anyhow::Ok(std_file) + anyhow::Ok(Box::new(std_file) as Box) }; let (file_result, driver_result) = tokio::join!(copy_fut, driver); let _: () = driver_result?; @@ -588,8 +588,10 @@ pub async fn pull_image( let image_ref = ImageReference::try_from(imgref).context("Parsing image reference transport")?; - // Fast path: read local OCI layout directories directly without skopeo - let (result, stats) = if image_ref.transport == Transport::OciDir { + // Fast path: read local OCI layout directories and archives directly without skopeo + let (result, stats) = if image_ref.transport == Transport::OciDir + || image_ref.transport == Transport::OciArchive + { let (path_str, layout_tag) = crate::oci_layout::parse_oci_layout_ref(&image_ref.name); let layout_path = std::path::Path::new(path_str); crate::oci_layout::import_oci_layout(repo, layout_path, layout_tag, reporter).await? From 231a0695153b1b8fb58a6979c31e4a4a7a82c248 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Thu, 18 Jun 2026 15:03:39 +0200 Subject: [PATCH 5/6] oci pull: Replace BlockingRead with tokio-utils helpers We now create a single blocking thread and move the entire stream to it, we then use tokio::io::duplex() stream that we copy the read to, and a SyncIoBridge to create an AsyncRead from this stream. Signed-off-by: Alexander Larsson --- crates/composefs-oci/Cargo.toml | 2 +- crates/composefs-oci/src/oci_layout.rs | 91 ++++---------------------- 2 files changed, 13 insertions(+), 80 deletions(-) diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index e04e3567..7dd1b0d7 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -40,7 +40,7 @@ rand = { version = "0.10.0", default-features = false, optional = true } tar = { version = "0.4.38", default-features = false, optional = true } tar-core = "0.1.0" tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] } -tokio-util = { version = "0.7", default-features = false, features = ["io"] } +tokio-util = { version = "0.7", default-features = false, features = ["io", "io-util"] } tracing = { version = "0.1", default-features = false } zstd = { version = "0.13.0", default-features = false } cap-std-ext = { workspace = true } diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index 39728e85..4ba5b30b 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -19,7 +19,6 @@ use std::io::Read; use std::path::Path; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context as TaskContext, Poll}; use std::thread::available_parallelism; use anyhow::{Context, Result}; @@ -30,8 +29,10 @@ use containers_image_proxy::oci_spec::image::{ use fn_error_context::context; use ocidir::prelude::*; use ocidir::{OciArchive, OciDir, ResolvedManifest}; +use tokio::io::DuplexStream; use tokio::sync::Semaphore; use tokio::task::JoinSet; +use tokio_util::io::SyncIoBridge; use tracing::debug; use composefs::fsverity::FsVerityHashValue; @@ -48,83 +49,15 @@ use crate::skopeo::PullResult; const READ_BUF_SIZE: usize = 128 * 1024; -/// Adapts a synchronous `Read` into a `tokio::io::AsyncRead` by dispatching -/// reads to [`tokio::task::spawn_blocking`], similar to `tokio::fs::File`. -struct BlockingReader { - reader: Option>, - pending: - Option, std::io::Result, Vec)>>, - buf: Vec, - pos: usize, -} - -impl BlockingReader { - fn new(reader: Box) -> Self { - Self { - reader: Some(reader), - pending: None, - buf: Vec::new(), - pos: 0, - } - } -} - -impl tokio::io::AsyncRead for BlockingReader { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut TaskContext<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - // Drain buffered data first. - if self.pos < self.buf.len() { - let remaining = &self.buf[self.pos..]; - let n = remaining.len().min(buf.remaining()); - buf.put_slice(&remaining[..n]); - self.pos += n; - return Poll::Ready(Ok(())); - } - - // Poll an in-flight blocking read. - if let Some(handle) = &mut self.pending { - let (reader, result, data) = match Pin::new(handle).poll(cx) { - Poll::Ready(Ok(v)) => v, - Poll::Ready(Err(e)) => { - self.pending = None; - return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))); - } - Poll::Pending => return Poll::Pending, - }; - self.pending = None; - self.reader = Some(reader); - match result { - Ok(n) => { - self.buf = data; - self.buf.truncate(n); - self.pos = 0; - if n == 0 { - return Poll::Ready(Ok(())); - } - let copy = n.min(buf.remaining()); - buf.put_slice(&self.buf[..copy]); - self.pos = copy; - return Poll::Ready(Ok(())); - } - Err(e) => return Poll::Ready(Err(e)), - } - } - - // Spawn a new blocking read, reusing the existing buffer. - let mut reader = self.reader.take().expect("reader taken without pending"); - let mut data = std::mem::take(&mut self.buf); - let handle = tokio::task::spawn_blocking(move || { - data.resize(READ_BUF_SIZE, 0); - let result = reader.read(&mut data); - (reader, result, data) - }); - self.pending = Some(handle); - cx.waker().wake_by_ref(); - Poll::Pending - } +/// Adapts a synchronous `Read` into a [`tokio::io::AsyncRead`] by copying data +/// through a [`tokio::io::duplex`] channel from a single blocking thread. +fn blocking_reader(mut reader: Box) -> DuplexStream { + let (async_read, async_write) = tokio::io::duplex(READ_BUF_SIZE); + tokio::task::spawn_blocking(move || { + let mut writer = SyncIoBridge::new(async_write); + std::io::copy(&mut reader, &mut writer) + }); + async_read } /// Check if an OCI layout contains a single delta artifact manifest. @@ -463,7 +396,7 @@ async fn import_layer_from_blob( // The watch channel provides backpressure: if the renderer is slow, intermediate // byte counts are coalesced rather than queued, keeping the I/O path non-blocking. let (async_file, progress_driver) = ProgressRead::new( - BlockingReader::new(layer_reader), + blocking_reader(layer_reader), Arc::clone(reporter), id.clone(), Some(layer_size), From a9577de0818a6d247b1a372548dc5926e9a66b81 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Thu, 18 Jun 2026 15:26:00 +0200 Subject: [PATCH 6/6] oci: Add a type alias for the oci Read type Signed-off-by: Alexander Larsson --- crates/composefs-oci/src/delta.rs | 5 +++-- crates/composefs-oci/src/oci_layout.rs | 13 ++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/crates/composefs-oci/src/delta.rs b/crates/composefs-oci/src/delta.rs index 15d6194b..2add6dfd 100644 --- a/crates/composefs-oci/src/delta.rs +++ b/crates/composefs-oci/src/delta.rs @@ -29,6 +29,7 @@ use tokio::sync::Semaphore; use tokio::task::JoinSet; use crate::oci_image; +use crate::oci_layout::OciBlobReader; use crate::progress::{ComponentId, ProgressEvent, ProgressUnit, SharedReporter}; use crate::skopeo::PullResult; use crate::{ImportStats, layer_identifier}; @@ -67,7 +68,7 @@ pub(crate) trait DeltaBlobReader: Send + Sync { fn open_blob( &self, desc: &Descriptor, - ) -> Pin>> + Send + '_>>; + ) -> Pin> + Send + '_>>; } /// Check whether an OCI manifest is a delta artifact. @@ -329,7 +330,7 @@ fn tar_patch_apply( fn decompress_layer( reader: impl Read + Send + 'static, media_type: &MediaType, -) -> Result> { +) -> Result { let buf = BufReader::new(reader); match media_type { MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => Ok(Box::new(buf)), diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index 4ba5b30b..f6ea6b18 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -49,9 +49,12 @@ use crate::skopeo::PullResult; const READ_BUF_SIZE: usize = 128 * 1024; +/// A boxed synchronous byte reader that can be sent across threads. +pub(crate) type OciBlobReader = Box; + /// Adapts a synchronous `Read` into a [`tokio::io::AsyncRead`] by copying data /// through a [`tokio::io::duplex`] channel from a single blocking thread. -fn blocking_reader(mut reader: Box) -> DuplexStream { +fn blocking_reader(mut reader: OciBlobReader) -> DuplexStream { let (async_read, async_write) = tokio::io::duplex(READ_BUF_SIZE); tokio::task::spawn_blocking(move || { let mut writer = SyncIoBridge::new(async_write); @@ -301,7 +304,7 @@ async fn import_config_and_layers = Box::new( + let layer_reader: OciBlobReader = Box::new( oci.read_blob(descriptor) .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?, ); @@ -368,7 +371,7 @@ async fn import_config_and_layers( repo: &Arc>, diff_id: &OciDigest, - layer_reader: Box, + layer_reader: OciBlobReader, media_type: &MediaType, layer_size: u64, reporter: &SharedReporter, @@ -460,11 +463,11 @@ impl crate::delta::DeltaBlobReader for OwnedBlobReader fn open_blob( &self, desc: &Descriptor, - ) -> Pin>> + Send + '_>> { + ) -> Pin> + Send + '_>> { let result = self .0 .read_blob(desc) - .map(|r| Box::new(r) as Box) + .map(|r| Box::new(r) as OciBlobReader) .with_context(|| format!("Reading blob {}", desc.digest())); Box::pin(std::future::ready(result)) }