diff --git a/Cargo.toml b/Cargo.toml index 077c9461..9d2d1ec9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ composefs-oci = { version = "0.4.0", path = "crates/composefs-oci", default-feat composefs-boot = { version = "0.4.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.4.0", path = "crates/composefs-http", default-features = false } cap-std-ext = "5.1.2" -ocidir = "0.7.2" +containers-image-proxy = { git = "https://github.com/containers/containers-image-proxy-rs", rev = "e339fc1c", default-features = false } +ocidir = { git = "https://github.com/containers/ocidir-rs", rev = "e137648a" } # JSON-RPC with FD passing for userns helper jsonrpc-fdpass = { version = "0.1.0", default-features = false } diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index 9c136bd7..7dd1b0d7 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -27,7 +27,7 @@ composefs = { workspace = true } zlink-core = { workspace = true, optional = true } composefs-boot = { workspace = true, optional = true } flate2 = { version = "1.0", default-features = false, features = ["zlib"] } -containers-image-proxy = { version = "0.10", default-features = false } +containers-image-proxy = { workspace = true } cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", optional = true } hex = { version = "0.4.0", default-features = false } indicatif = { version = "0.17.0", default-features = false } @@ -40,7 +40,7 @@ rand = { version = "0.10.0", default-features = false, optional = true } tar = { version = "0.4.38", default-features = false, optional = true } tar-core = "0.1.0" tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] } -tokio-util = { version = "0.7", default-features = false, features = ["io"] } +tokio-util = { version = "0.7", default-features = false, features = ["io", "io-util"] } tracing = { version = "0.1", default-features = false } zstd = { version = "0.13.0", default-features = false } cap-std-ext = { workspace = true } diff --git a/crates/composefs-oci/src/delta.rs b/crates/composefs-oci/src/delta.rs index 361b9b09..2add6dfd 100644 --- a/crates/composefs-oci/src/delta.rs +++ b/crates/composefs-oci/src/delta.rs @@ -22,13 +22,14 @@ use composefs::fsverity::FsVerityHashValue; use composefs::repository::Repository; use composefs::tree::RegularFile; use containers_image_proxy::oci_spec::image::{ - Digest as OciDigest, DigestAlgorithm, ImageConfiguration, ImageManifest, MediaType, + Descriptor, Digest as OciDigest, DigestAlgorithm, ImageConfiguration, ImageManifest, MediaType, }; use tokio::sync::Semaphore; use tokio::task::JoinSet; use crate::oci_image; +use crate::oci_layout::OciBlobReader; use crate::progress::{ComponentId, ProgressEvent, ProgressUnit, SharedReporter}; use crate::skopeo::PullResult; use crate::{ImportStats, layer_identifier}; @@ -66,8 +67,8 @@ pub(crate) trait DeltaBlobReader: Send + Sync { /// this fetches the blob to a local temp file first. fn open_blob( &self, - digest: &OciDigest, - ) -> Pin> + Send + '_>>; + desc: &Descriptor, + ) -> Pin> + Send + '_>>; } /// Check whether an OCI manifest is a delta artifact. @@ -326,7 +327,10 @@ fn tar_patch_apply( /// Reconstruct a single layer's uncompressed tar from a delta blob. /// Returns a seeked-to-start temp file with diff_id already verified. -fn decompress_layer(reader: File, media_type: &MediaType) -> Result> { +fn decompress_layer( + reader: impl Read + Send + 'static, + media_type: &MediaType, +) -> Result { let buf = BufReader::new(reader); match media_type { MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => Ok(Box::new(buf)), @@ -343,7 +347,7 @@ fn decompress_layer(reader: File, media_type: &MediaType) -> Result( repo: &Repository, source_image: &Arc>, - blob_file: File, + blob_reader: impl Read + Send + 'static, media_type: &MediaType, expected_diff_id: &OciDigest, ) -> Result { @@ -362,9 +366,9 @@ fn reconstruct_layer( inner: &mut tmpfile, hasher: &mut hasher, }; - tar_patch_apply(blob_file, &mut data_source, &mut hashing_writer)?; + tar_patch_apply(blob_reader, &mut data_source, &mut hashing_writer)?; } else { - let mut decoder = decompress_layer(blob_file, media_type)?; + let mut decoder = decompress_layer(blob_reader, media_type)?; let mut hashing_writer = HashingWriter { inner: &mut tmpfile, hasher: &mut hasher, @@ -385,15 +389,14 @@ fn reconstruct_layer( // ─── Delta manifest parsing ───────────────────────────────────────────────── struct DeltaLayer { - blob_digest: OciDigest, - media_type: MediaType, + descriptor: Descriptor, } struct ParsedDelta { target_manifest: ImageManifest, - target_manifest_digest: OciDigest, + target_manifest_descriptor: Descriptor, target_manifest_raw: Vec, - target_config_digest: OciDigest, + target_config_descriptor: Descriptor, target_config_raw: Vec, source_config_digest: OciDigest, delta_layer_by_to: HashMap, @@ -416,8 +419,8 @@ async fn parse_delta_manifest( .parse() .context("Invalid source config digest")?; - let mut target_manifest_digest = None; - let mut target_config_digest = None; + let mut target_manifest_descriptor = None; + let mut target_config_descriptor = None; let mut delta_layer_by_to = HashMap::new(); for layer in delta_manifest.layers() { @@ -430,10 +433,10 @@ async fn parse_delta_manifest( match content { "image-manifest" => { - target_manifest_digest = Some(layer.digest().clone()); + target_manifest_descriptor = Some(layer.clone()); } "image-config" => { - target_config_digest = Some(layer.digest().clone()); + target_config_descriptor = Some(layer.clone()); } "image-layer" => { if let Some(to_str) = layer_annotations @@ -445,8 +448,7 @@ async fn parse_delta_manifest( delta_layer_by_to.insert( to_digest, DeltaLayer { - blob_digest: layer.digest().clone(), - media_type: layer.media_type().clone(), + descriptor: layer.clone(), }, ); } @@ -455,14 +457,14 @@ async fn parse_delta_manifest( } } - let target_manifest_digest = - target_manifest_digest.context("Delta manifest has no embedded image manifest")?; - let target_config_digest = - target_config_digest.context("Delta manifest has no embedded image config")?; + let target_manifest_descriptor = + target_manifest_descriptor.context("Delta manifest has no embedded image manifest")?; + let target_config_descriptor = + target_config_descriptor.context("Delta manifest has no embedded image config")?; let mut target_manifest_raw = Vec::new(); blob_reader - .open_blob(&target_manifest_digest) + .open_blob(&target_manifest_descriptor) .await .context("Fetching embedded image manifest")? .read_to_end(&mut target_manifest_raw)?; @@ -471,7 +473,7 @@ async fn parse_delta_manifest( let mut target_config_raw = Vec::new(); blob_reader - .open_blob(&target_config_digest) + .open_blob(&target_config_descriptor) .await .context("Fetching embedded image config")? .read_to_end(&mut target_config_raw)?; @@ -481,9 +483,9 @@ async fn parse_delta_manifest( Ok(ParsedDelta { target_manifest, - target_manifest_digest, + target_manifest_descriptor, target_manifest_raw, - target_config_digest, + target_config_descriptor, target_config_raw, source_config_digest, delta_layer_by_to, @@ -507,8 +509,8 @@ pub(crate) async fn import_delta( ) -> Result<(PullResult, ImportStats)> { let parsed = parse_delta_manifest(delta_manifest, &*blob_reader).await?; - let manifest_digest = &parsed.target_manifest_digest; - let config_digest = &parsed.target_config_digest; + let manifest_digest = parsed.target_manifest_descriptor.digest(); + let config_digest = parsed.target_config_descriptor.digest(); // Check if the target image already exists if let Some(manifest_verity) = oci_image::has_manifest(repo, manifest_digest)? { @@ -615,8 +617,7 @@ pub(crate) async fn import_delta( } let diff_id = diff_id.clone(); - let blob_digest = dl.blob_digest.clone(); - let media_type = dl.media_type.clone(); + let descriptor = dl.descriptor.clone(); let repo = Arc::clone(repo); let source_image = Arc::clone(&source_image); let blob_reader = Arc::clone(&blob_reader); @@ -628,16 +629,17 @@ pub(crate) async fn import_delta( layer_tasks.spawn(async move { let _permit = permit; - let blob_file = blob_reader - .open_blob(&blob_digest) + let blob = blob_reader + .open_blob(&descriptor) .await .with_context(|| format!("Fetching delta blob for layer {diff_id}"))?; + let media_type = descriptor.media_type().clone(); let reconstructed = tokio::task::spawn_blocking({ let diff_id = diff_id.clone(); let repo = Arc::clone(&repo); move || -> Result { - reconstruct_layer(&repo, &source_image, blob_file, &media_type, &diff_id) + reconstruct_layer(&repo, &source_image, blob, &media_type, &diff_id) .with_context(|| format!("Reconstructing layer {diff_id}")) } }) @@ -740,13 +742,6 @@ pub(crate) async fn import_delta( )) } -/// Return references to all layer descriptors in a delta manifest. -pub(crate) fn delta_layer_descriptors( - manifest: &ImageManifest, -) -> &[containers_image_proxy::oci_spec::image::Descriptor] { - manifest.layers() -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs index 9ff37446..f6ea6b18 100644 --- a/crates/composefs-oci/src/oci_layout.rs +++ b/crates/composefs-oci/src/oci_layout.rs @@ -17,16 +17,22 @@ use std::cmp::Reverse; use std::collections::HashMap; use std::io::Read; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; use std::thread::available_parallelism; use anyhow::{Context, Result}; use cap_std_ext::cap_std; -use containers_image_proxy::oci_spec::image::{Descriptor, Digest as OciDigest, MediaType}; +use containers_image_proxy::oci_spec::image::{ + Descriptor, Digest as OciDigest, ImageManifest, MediaType, +}; use fn_error_context::context; -use ocidir::{OciDir, ResolvedManifest}; +use ocidir::prelude::*; +use ocidir::{OciArchive, OciDir, ResolvedManifest}; +use tokio::io::DuplexStream; use tokio::sync::Semaphore; use tokio::task::JoinSet; +use tokio_util::io::SyncIoBridge; use tracing::debug; use composefs::fsverity::FsVerityHashValue; @@ -41,6 +47,37 @@ use crate::{ImportStats, config_identifier, layer_identifier}; use crate::skopeo::PullResult; +const READ_BUF_SIZE: usize = 128 * 1024; + +/// A boxed synchronous byte reader that can be sent across threads. +pub(crate) type OciBlobReader = Box; + +/// Adapts a synchronous `Read` into a [`tokio::io::AsyncRead`] by copying data +/// through a [`tokio::io::duplex`] channel from a single blocking thread. +fn blocking_reader(mut reader: OciBlobReader) -> DuplexStream { + let (async_read, async_write) = tokio::io::duplex(READ_BUF_SIZE); + tokio::task::spawn_blocking(move || { + let mut writer = SyncIoBridge::new(async_write); + std::io::copy(&mut reader, &mut writer) + }); + async_read +} + +/// Check if an OCI layout contains a single delta artifact manifest. +fn detect_delta_manifest(oci: &impl OciRead) -> Result> { + let index = oci.read_index()?; + if index.manifests().len() == 1 { + let desc = &index.manifests()[0]; + let mut manifest_data = Vec::new(); + oci.read_blob(desc)?.read_to_end(&mut manifest_data)?; + let manifest = ImageManifest::from_reader(&manifest_data[..])?; + if crate::delta::is_delta_artifact(&manifest) { + return Ok(Some(manifest)); + } + } + Ok(None) +} + /// Parse an OCI layout reference like "/path/to/dir:tag" or "/path/to/dir". /// /// Returns (path, optional_tag). @@ -63,13 +100,15 @@ pub(crate) fn parse_oci_layout_ref(imgref: &str) -> (&str, Option<&str>) { } /// Resolve a manifest from an OCI layout directory for the current platform. -fn resolve_manifest(ocidir: &OciDir, tag: Option<&str>) -> Result { - ocidir - .open_image_this_platform(tag) +fn resolve_manifest( + oci: &T, + tag: Option<&str>, +) -> Result { + oci.open_image_this_platform(tag) .context("Resolving manifest for platform") } -/// Import an image from a local OCI layout directory. +/// Import an image from a local OCI layout directory or archive. /// /// This is the fast path for `oci:` transport references. It reads the OCI /// layout directly without going through skopeo. Progress events are emitted @@ -86,33 +125,40 @@ pub async fn import_oci_layout( // a clear "not writable" error rather than a misleading source-open error. repo.ensure_writable()?; - // Open the OCI layout directory - let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()) - .with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?; - let ocidir = OciDir::open(dir).context("Opening OCI directory")?; - - // Check for delta artifact before platform resolution (deltas lack - // platform info and would fail the platform filter). Only check - // single-manifest layouts since deltas are always single-manifest. - if let Ok(index) = ocidir.read_index() - && index.manifests().len() == 1 - { - let desc = &index.manifests()[0]; - let mut manifest_data = Vec::new(); - ocidir.read_blob(desc)?.read_to_end(&mut manifest_data)?; - let manifest = containers_image_proxy::oci_spec::image::ImageManifest::from_reader( - &manifest_data[..], - )?; - if crate::delta::is_delta_artifact(&manifest) { - let blob_reader = Arc::new(OciDirBlobReader(ocidir)); + // Open the OCI layout directory or archive + if layout_path.is_file() { + let oci = OciArchive::open(layout_path) + .with_context(|| format!("Opening OCI archive {}", layout_path.display()))?; + if let Some(manifest) = detect_delta_manifest(&oci)? { + let blob_reader = Arc::new(OwnedBlobReader(oci)); + let (result, stats) = + crate::delta::import_delta(repo, &manifest, blob_reader, &reporter, None).await?; + return Ok((result, stats)); + } + import_oci_image(repo, &oci, layout_tag, reporter).await + } else { + let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()) + .with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?; + let oci = OciDir::open(dir).context("Opening OCI directory")?; + if let Some(manifest) = detect_delta_manifest(&oci)? { + let blob_reader = Arc::new(OwnedBlobReader(oci)); let (result, stats) = crate::delta::import_delta(repo, &manifest, blob_reader, &reporter, None).await?; return Ok((result, stats)); } + import_oci_image(repo, &oci, layout_tag, reporter).await } +} +/// Generic import from any [`OciRead`] backend (non-delta path). +async fn import_oci_image( + repo: &Arc>, + oci: &T, + tag: Option<&str>, + reporter: SharedReporter, +) -> Result<(PullResult, ImportStats)> { // Resolve the manifest, with fallback for images lacking platform annotations - let resolved = resolve_manifest(&ocidir, layout_tag)?; + let resolved = resolve_manifest(oci, tag)?; let manifest = resolved.manifest; let manifest_descriptor = &resolved.manifest_descriptor; @@ -126,7 +172,7 @@ pub async fn import_oci_layout( layers.len() ))); let (config_digest, config_verity, layer_refs, stats) = - import_config_and_layers(repo, &ocidir, layers, config_descriptor, &reporter) + import_config_and_layers(repo, oci, layers, config_descriptor, &reporter) .await .with_context(|| format!("Failed to import config {}", config_descriptor.digest()))?; @@ -151,8 +197,7 @@ pub async fn import_oci_layout( } let mut raw_manifest = Vec::with_capacity(manifest_descriptor.size() as usize); - ocidir - .read_blob(manifest_descriptor) + oci.read_blob(manifest_descriptor) .context("Reading raw manifest bytes")? .read_to_end(&mut raw_manifest)?; splitstream.write_external(&raw_manifest)?; @@ -175,14 +220,14 @@ pub async fn import_oci_layout( /// Returns (config_digest, config_verity, layer_refs, stats). /// `layer_refs` is an ordered Vec of (diff_id, verity) pairs preserving the /// order from the config (or manifest for artifacts). -async fn import_config_and_layers( +async fn import_config_and_layers( repo: &Arc>, - ocidir: &OciDir, + oci: &T, manifest_layers: &[Descriptor], config_descriptor: &Descriptor, reporter: &SharedReporter, ) -> Result<(OciDigest, ObjectID, Vec<(OciDigest, ObjectID)>, ImportStats)> { - let config_digest: OciDigest = config_descriptor.digest().clone(); + let config_digest = config_descriptor.digest().clone(); let content_id = config_identifier(&config_digest); if let Some(config_id) = repo.has_stream(&content_id)? { @@ -236,8 +281,7 @@ async fn import_config_and_layers( // and parse diff_ids from the same buffer via as_slice(). debug!("Reading config {config_digest}"); let mut raw_config = Vec::with_capacity(config_descriptor.size() as usize); - ocidir - .read_blob(config_descriptor) + oci.read_blob(config_descriptor) .context("Reading config blob")? .read_to_end(&mut raw_config)?; let diff_ids = crate::extract_diff_ids( @@ -260,19 +304,20 @@ async fn import_config_and_layers( let permit = Arc::clone(&sem).acquire_owned().await?; let reporter = Arc::clone(reporter); - let layer_file = ocidir - .read_blob(descriptor) - .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?; + let layer_reader: OciBlobReader = Box::new( + oci.read_blob(descriptor) + .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?, + ); let media_type = descriptor.media_type().clone(); let layer_size = descriptor.size(); layer_tasks.spawn(async move { let _permit = permit; - let (verity, layer_stats) = import_layer_from_file( + let (verity, layer_stats) = import_layer_from_blob( &repo, &diff_id, - layer_file, + layer_reader, &media_type, layer_size, &reporter, @@ -320,13 +365,13 @@ async fn import_config_and_layers( Ok((config_digest, config_id, layer_refs, stats)) } -/// Import a single layer by streaming from a file handle. +/// Import a single layer by streaming from a blob reader. /// /// Emits `Started`/`Done` (or `Skipped`) progress events via `reporter`. -async fn import_layer_from_file( +async fn import_layer_from_blob( repo: &Arc>, diff_id: &OciDigest, - layer_file: std::fs::File, + layer_reader: OciBlobReader, media_type: &MediaType, layer_size: u64, reporter: &SharedReporter, @@ -354,7 +399,7 @@ async fn import_layer_from_file( // The watch channel provides backpressure: if the renderer is slow, intermediate // byte counts are coalesced rather than queued, keeping the I/O path non-blocking. let (async_file, progress_driver) = ProgressRead::new( - tokio::fs::File::from_std(layer_file), + blocking_reader(layer_reader), Arc::clone(reporter), id.clone(), Some(layer_size), @@ -411,23 +456,20 @@ async fn import_layer_from_file( Ok((object_id, layer_stats)) } -/// Blob reader backed by an OCI layout directory. -struct OciDirBlobReader(OciDir); +/// Blob reader that owns an [`OciRead`] backend, for use with delta imports. +struct OwnedBlobReader(T); -impl crate::delta::DeltaBlobReader for OciDirBlobReader { +impl crate::delta::DeltaBlobReader for OwnedBlobReader { fn open_blob( &self, - digest: &OciDigest, - ) -> std::pin::Pin> + Send + '_>> - { - let blob_path = format!("blobs/{}/{}", digest.algorithm(), digest.digest()); - Box::pin(std::future::ready( - self.0 - .dir() - .open(&blob_path) - .map(|f| f.into_std()) - .with_context(|| format!("Opening blob {digest} from OCI layout")), - )) + desc: &Descriptor, + ) -> Pin> + Send + '_>> { + let result = self + .0 + .read_blob(desc) + .map(|r| Box::new(r) as OciBlobReader) + .with_context(|| format!("Reading blob {}", desc.digest())); + Box::pin(std::future::ready(result)) } } @@ -469,7 +511,7 @@ mod tests { async fn test_wrong_platform_rejected() { use cap_std_ext::cap_std; use composefs::fsverity::Sha256HashValue; - use containers_image_proxy::oci_spec::image::{ + use ocidir::oci_spec::image::{ Arch, ConfigBuilder, ImageConfigurationBuilder, Os, PlatformBuilder, RootFsBuilder, }; diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index 39ea1391..66009c04 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -514,15 +514,8 @@ impl ImageOp { self.reporter .report(ProgressEvent::Message("Detected delta artifact...".into())); - let descriptors: std::collections::HashMap<_, _> = - crate::delta::delta_layer_descriptors(manifest) - .iter() - .map(|d| (d.digest().clone(), d.clone())) - .collect(); - let blob_reader = Arc::new(ProxyBlobReader { image_op: Arc::clone(self), - descriptors, }); // Limit to 2 concurrent layers: download the next while applying the previous, @@ -534,27 +527,21 @@ impl ImageOp { /// Blob reader that fetches from a skopeo image proxy on demand. struct ProxyBlobReader { image_op: Arc>, - descriptors: - std::collections::HashMap, } impl crate::delta::DeltaBlobReader for ProxyBlobReader { fn open_blob( &self, - digest: &OciDigest, - ) -> std::pin::Pin> + Send + '_>> - { - let digest = digest.clone(); + desc: &Descriptor, + ) -> std::pin::Pin< + Box>> + Send + '_>, + > { + let desc = desc.clone(); Box::pin(async move { - let desc = self - .descriptors - .get(&digest) - .with_context(|| format!("No descriptor for blob {digest}"))?; - let (reader, driver) = self .image_op .proxy - .get_blob(&self.image_op.img, &digest, desc.size()) + .get_blob(&self.image_op.img, desc.digest(), desc.size()) .await?; let tmpfile = self @@ -569,7 +556,7 @@ impl crate::delta::DeltaBlobReader for ProxyBlobRea let mut std_file = async_dst.into_std().await; use std::io::Seek; std_file.seek(std::io::SeekFrom::Start(0))?; - anyhow::Ok(std_file) + anyhow::Ok(Box::new(std_file) as Box) }; let (file_result, driver_result) = tokio::join!(copy_fut, driver); let _: () = driver_result?; @@ -601,8 +588,10 @@ pub async fn pull_image( let image_ref = ImageReference::try_from(imgref).context("Parsing image reference transport")?; - // Fast path: read local OCI layout directories directly without skopeo - let (result, stats) = if image_ref.transport == Transport::OciDir { + // Fast path: read local OCI layout directories and archives directly without skopeo + let (result, stats) = if image_ref.transport == Transport::OciDir + || image_ref.transport == Transport::OciArchive + { let (path_str, layout_tag) = crate::oci_layout::parse_oci_layout_ref(&image_ref.name); let layout_path = std::path::Path::new(path_str); crate::oci_layout::import_oci_layout(repo, layout_path, layout_tag, reporter).await? diff --git a/crates/composefs-storage/Cargo.toml b/crates/composefs-storage/Cargo.toml index 904551a5..51a1c91a 100644 --- a/crates/composefs-storage/Cargo.toml +++ b/crates/composefs-storage/Cargo.toml @@ -18,7 +18,7 @@ cap-std-ext = { version = "4.0", default-features = false } crc = { version = "3.0", default-features = false } flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] } jsonrpc-fdpass = { workspace = true, optional = true } -oci-spec = { version = "0.9", default-features = false, features = ["image"] } +oci-spec = { version = "0.10", default-features = false, features = ["image"] } rustix = { version = "1.0", default-features = false, features = ["fs", "std", "process", "thread"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] }