Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ composefs-oci = { version = "0.4.0", path = "crates/composefs-oci", default-feat
composefs-boot = { version = "0.4.0", path = "crates/composefs-boot", default-features = false }
composefs-http = { version = "0.4.0", path = "crates/composefs-http", default-features = false }
cap-std-ext = "5.1.2"
ocidir = "0.7.2"
containers-image-proxy = { git = "https://github.com/containers/containers-image-proxy-rs", rev = "e339fc1c", default-features = false }
ocidir = { git = "https://github.com/containers/ocidir-rs", rev = "e137648a" }

# JSON-RPC with FD passing for userns helper
jsonrpc-fdpass = { version = "0.1.0", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions crates/composefs-oci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ composefs = { workspace = true }
zlink-core = { workspace = true, optional = true }
composefs-boot = { workspace = true, optional = true }
flate2 = { version = "1.0", default-features = false, features = ["zlib"] }
containers-image-proxy = { version = "0.10", default-features = false }
containers-image-proxy = { workspace = true }
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.4.0", optional = true }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.17.0", default-features = false }
Expand All @@ -40,7 +40,7 @@ rand = { version = "0.10.0", default-features = false, optional = true }
tar = { version = "0.4.38", default-features = false, optional = true }
tar-core = "0.1.0"
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }
tokio-util = { version = "0.7", default-features = false, features = ["io", "io-util"] }
tracing = { version = "0.1", default-features = false }
zstd = { version = "0.13.0", default-features = false }
cap-std-ext = { workspace = true }
Expand Down
73 changes: 34 additions & 39 deletions crates/composefs-oci/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use composefs::fsverity::FsVerityHashValue;
use composefs::repository::Repository;
use composefs::tree::RegularFile;
use containers_image_proxy::oci_spec::image::{
Digest as OciDigest, DigestAlgorithm, ImageConfiguration, ImageManifest, MediaType,
Descriptor, Digest as OciDigest, DigestAlgorithm, ImageConfiguration, ImageManifest, MediaType,
};

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

use crate::oci_image;
use crate::oci_layout::OciBlobReader;
use crate::progress::{ComponentId, ProgressEvent, ProgressUnit, SharedReporter};
use crate::skopeo::PullResult;
use crate::{ImportStats, layer_identifier};
Expand Down Expand Up @@ -66,8 +67,8 @@ pub(crate) trait DeltaBlobReader: Send + Sync {
/// this fetches the blob to a local temp file first.
fn open_blob(
&self,
digest: &OciDigest,
) -> Pin<Box<dyn Future<Output = Result<File>> + Send + '_>>;
desc: &Descriptor,
) -> Pin<Box<dyn Future<Output = Result<OciBlobReader>> + Send + '_>>;
}

/// Check whether an OCI manifest is a delta artifact.
Expand Down Expand Up @@ -326,7 +327,10 @@ fn tar_patch_apply<ObjectID: FsVerityHashValue>(

/// Reconstruct a single layer's uncompressed tar from a delta blob.
/// Returns a seeked-to-start temp file with diff_id already verified.
fn decompress_layer(reader: File, media_type: &MediaType) -> Result<Box<dyn Read + Send>> {
fn decompress_layer(
reader: impl Read + Send + 'static,
media_type: &MediaType,
) -> Result<OciBlobReader> {
let buf = BufReader::new(reader);
match media_type {
MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => Ok(Box::new(buf)),
Expand All @@ -343,7 +347,7 @@ fn decompress_layer(reader: File, media_type: &MediaType) -> Result<Box<dyn Read
fn reconstruct_layer<ObjectID: FsVerityHashValue>(
repo: &Repository<ObjectID>,
source_image: &Arc<SourceImage<ObjectID>>,
blob_file: File,
blob_reader: impl Read + Send + 'static,
media_type: &MediaType,
expected_diff_id: &OciDigest,
) -> Result<File> {
Expand All @@ -362,9 +366,9 @@ fn reconstruct_layer<ObjectID: FsVerityHashValue>(
inner: &mut tmpfile,
hasher: &mut hasher,
};
tar_patch_apply(blob_file, &mut data_source, &mut hashing_writer)?;
tar_patch_apply(blob_reader, &mut data_source, &mut hashing_writer)?;
} else {
let mut decoder = decompress_layer(blob_file, media_type)?;
let mut decoder = decompress_layer(blob_reader, media_type)?;
let mut hashing_writer = HashingWriter {
inner: &mut tmpfile,
hasher: &mut hasher,
Expand All @@ -385,15 +389,14 @@ fn reconstruct_layer<ObjectID: FsVerityHashValue>(
// ─── Delta manifest parsing ─────────────────────────────────────────────────

struct DeltaLayer {
blob_digest: OciDigest,
media_type: MediaType,
descriptor: Descriptor,
}

struct ParsedDelta {
target_manifest: ImageManifest,
target_manifest_digest: OciDigest,
target_manifest_descriptor: Descriptor,
target_manifest_raw: Vec<u8>,
target_config_digest: OciDigest,
target_config_descriptor: Descriptor,
target_config_raw: Vec<u8>,
source_config_digest: OciDigest,
delta_layer_by_to: HashMap<OciDigest, DeltaLayer>,
Expand All @@ -416,8 +419,8 @@ async fn parse_delta_manifest(
.parse()
.context("Invalid source config digest")?;

let mut target_manifest_digest = None;
let mut target_config_digest = None;
let mut target_manifest_descriptor = None;
let mut target_config_descriptor = None;
let mut delta_layer_by_to = HashMap::new();

for layer in delta_manifest.layers() {
Expand All @@ -430,10 +433,10 @@ async fn parse_delta_manifest(

match content {
"image-manifest" => {
target_manifest_digest = Some(layer.digest().clone());
target_manifest_descriptor = Some(layer.clone());
}
"image-config" => {
target_config_digest = Some(layer.digest().clone());
target_config_descriptor = Some(layer.clone());
}
"image-layer" => {
if let Some(to_str) = layer_annotations
Expand All @@ -445,8 +448,7 @@ async fn parse_delta_manifest(
delta_layer_by_to.insert(
to_digest,
DeltaLayer {
blob_digest: layer.digest().clone(),
media_type: layer.media_type().clone(),
descriptor: layer.clone(),
},
);
}
Expand All @@ -455,14 +457,14 @@ async fn parse_delta_manifest(
}
}

let target_manifest_digest =
target_manifest_digest.context("Delta manifest has no embedded image manifest")?;
let target_config_digest =
target_config_digest.context("Delta manifest has no embedded image config")?;
let target_manifest_descriptor =
target_manifest_descriptor.context("Delta manifest has no embedded image manifest")?;
let target_config_descriptor =
target_config_descriptor.context("Delta manifest has no embedded image config")?;

let mut target_manifest_raw = Vec::new();
blob_reader
.open_blob(&target_manifest_digest)
.open_blob(&target_manifest_descriptor)
.await
.context("Fetching embedded image manifest")?
.read_to_end(&mut target_manifest_raw)?;
Expand All @@ -471,7 +473,7 @@ async fn parse_delta_manifest(

let mut target_config_raw = Vec::new();
blob_reader
.open_blob(&target_config_digest)
.open_blob(&target_config_descriptor)
.await
.context("Fetching embedded image config")?
.read_to_end(&mut target_config_raw)?;
Expand All @@ -481,9 +483,9 @@ async fn parse_delta_manifest(

Ok(ParsedDelta {
target_manifest,
target_manifest_digest,
target_manifest_descriptor,
target_manifest_raw,
target_config_digest,
target_config_descriptor,
target_config_raw,
source_config_digest,
delta_layer_by_to,
Expand All @@ -507,8 +509,8 @@ pub(crate) async fn import_delta<ObjectID: FsVerityHashValue>(
) -> Result<(PullResult<ObjectID>, ImportStats)> {
let parsed = parse_delta_manifest(delta_manifest, &*blob_reader).await?;

let manifest_digest = &parsed.target_manifest_digest;
let config_digest = &parsed.target_config_digest;
let manifest_digest = parsed.target_manifest_descriptor.digest();
let config_digest = parsed.target_config_descriptor.digest();

// Check if the target image already exists
if let Some(manifest_verity) = oci_image::has_manifest(repo, manifest_digest)? {
Expand Down Expand Up @@ -615,8 +617,7 @@ pub(crate) async fn import_delta<ObjectID: FsVerityHashValue>(
}

let diff_id = diff_id.clone();
let blob_digest = dl.blob_digest.clone();
let media_type = dl.media_type.clone();
let descriptor = dl.descriptor.clone();
let repo = Arc::clone(repo);
let source_image = Arc::clone(&source_image);
let blob_reader = Arc::clone(&blob_reader);
Expand All @@ -628,16 +629,17 @@ pub(crate) async fn import_delta<ObjectID: FsVerityHashValue>(
layer_tasks.spawn(async move {
let _permit = permit;

let blob_file = blob_reader
.open_blob(&blob_digest)
let blob = blob_reader
.open_blob(&descriptor)
.await
.with_context(|| format!("Fetching delta blob for layer {diff_id}"))?;

let media_type = descriptor.media_type().clone();
let reconstructed = tokio::task::spawn_blocking({
let diff_id = diff_id.clone();
let repo = Arc::clone(&repo);
move || -> Result<File> {
reconstruct_layer(&repo, &source_image, blob_file, &media_type, &diff_id)
reconstruct_layer(&repo, &source_image, blob, &media_type, &diff_id)
.with_context(|| format!("Reconstructing layer {diff_id}"))
}
})
Expand Down Expand Up @@ -740,13 +742,6 @@ pub(crate) async fn import_delta<ObjectID: FsVerityHashValue>(
))
}

/// Return references to all layer descriptors in a delta manifest.
pub(crate) fn delta_layer_descriptors(
manifest: &ImageManifest,
) -> &[containers_image_proxy::oci_spec::image::Descriptor] {
manifest.layers()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading