From 31dbb99d36cd2ff45a8c85699f9f140bf6be2b5d Mon Sep 17 00:00:00 2001 From: Priyanshu Kumar Date: Fri, 19 Dec 2025 18:19:51 +0000 Subject: [PATCH 1/3] Add get_blob_stream (prefer GetRawBlob) Signed-off-by: Priyanshu Kumar --- Cargo.toml | 2 + src/imageproxy.rs | 327 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 325 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5ee696c..01cdbbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ rust-version = "1.70.0" [dependencies] futures-util = "0.3.13" +hex = "0.4.3" # NOTE when bumping this in a semver-incompatible way, because we re-export it you # must also bump the semver of this project. # See also https://github.com/youki-dev/oci-spec-rs/pull/288 @@ -18,6 +19,7 @@ rustix = { version = "1.0", features = ["process", "fs", "net"] } serde = { features = ["derive"], version = "1.0.125" } serde_json = "1.0.64" semver = "1.0.4" +sha2 = "0.10.9" thiserror = "2" tokio = { features = ["fs", "io-util", "macros", "process", "rt", "sync"], version = "1" } tracing = "0.1" diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 5ceba9f..c71d92f 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -20,6 +20,7 @@ use cap_std_ext::{cap_std, cap_tempfile}; use futures_util::{Future, FutureExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use sha2::Digest as _; use std::fs::File; use std::iter::FusedIterator; use std::num::NonZeroU32; @@ -31,8 +32,8 @@ use std::pin::Pin; use std::process::{Command, Stdio}; use std::sync::{Arc, Mutex, OnceLock}; use thiserror::Error; -use tokio::io::{AsyncBufRead, AsyncReadExt}; -use tokio::sync::Mutex as AsyncMutex; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf}; +use tokio::sync::{oneshot, Mutex as AsyncMutex}; use tokio::task::JoinError; use tracing::instrument; @@ -55,6 +56,9 @@ pub enum Error { /// An error returned from the remote proxy #[error("proxy request returned error: {0}")] RequestReturned(Box), + /// An error returned via the `GetRawBlob` error pipe. + #[error(transparent)] + BlobError(#[from] GetBlobError), #[error("semantic version error: {0}")] SemanticVersion(#[from] semver::Error), #[error("proxy too old (requested={requested_version} found={found_version}) error")] @@ -98,6 +102,187 @@ impl From for Error { /// The error type returned from this crate. pub type Result = std::result::Result; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BlobStreamSource { + /// Used `GetRawBlob`. + GetRawBlob, + /// Fell back to `GetBlob`. + GetBlob, +} + +/// A streaming blob reader and "driver" future. +pub struct BlobStream<'a> { + pub source: BlobStreamSource, + pub expected_size: u64, + pub reported_size: Option, + pub reader: Box, + pub driver: futures_util::future::BoxFuture<'a, Result<()>>, +} + +impl std::fmt::Debug for BlobStream<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlobStream") + .field("source", &self.source) + .field("expected_size", &self.expected_size) + .field("reported_size", &self.reported_size) + .finish_non_exhaustive() + } +} + +#[derive(Debug)] +enum VerifiedBlobReadResult { + Complete { nbytes: u64, digest_hex: String }, + Incomplete, +} + +#[derive(Debug)] +enum Hasher { + Sha256(sha2::Sha256), + Sha384(sha2::Sha384), + Sha512(sha2::Sha512), +} + +impl Hasher { + fn new_for_digest(digest: &Digest) -> Result { + use oci_spec::image::DigestAlgorithm; + Ok(match digest.algorithm() { + DigestAlgorithm::Sha256 => Self::Sha256(sha2::Sha256::new()), + DigestAlgorithm::Sha384 => Self::Sha384(sha2::Sha384::new()), + DigestAlgorithm::Sha512 => Self::Sha512(sha2::Sha512::new()), + DigestAlgorithm::Other(a) => { + return Err(Error::Other( + format!("Unsupported digest algorithm for blob verification: {a}").into(), + )); + } + _ => { + return Err(Error::Other( + format!( + "Unsupported digest algorithm for blob verification: {}", + digest.algorithm().as_ref() + ) + .into(), + )); + } + }) + } + + fn update(&mut self, chunk: &[u8]) { + match self { + Self::Sha256(h) => h.update(chunk), + Self::Sha384(h) => h.update(chunk), + Self::Sha512(h) => h.update(chunk), + } + } + + fn finalize_hex(self) -> String { + match self { + Self::Sha256(h) => hex::encode(h.finalize()), + Self::Sha384(h) => hex::encode(h.finalize()), + Self::Sha512(h) => hex::encode(h.finalize()), + } + } +} + +/// Wraps an [`AsyncRead`] and computes a digest; reports the result on EOF. +#[derive(Debug)] +struct VerifiedBlobReader { + inner: R, + nbytes: u64, + hasher: Option, + completion: Option>, +} + +impl VerifiedBlobReader { + fn new( + inner: R, + expected: Digest, + completion: oneshot::Sender, + ) -> Result { + let hasher = Hasher::new_for_digest(&expected)?; + Ok(Self { + inner, + nbytes: 0, + hasher: Some(hasher), + completion: Some(completion), + }) + } +} + +impl AsyncRead for VerifiedBlobReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> std::task::Poll> { + if buf.remaining() == 0 { + return std::task::Poll::Ready(Ok(())); + } + let before = buf.filled().len(); + match Pin::new(&mut self.inner).poll_read(cx, buf) { + v @ std::task::Poll::Ready(Ok(_)) => { + let after = buf.filled().len(); + debug_assert!(after >= before); + let delta = after - before; + if delta > 0 { + let chunk = &buf.filled()[before..after]; + if let Some(hasher) = self.hasher.as_mut() { + hasher.update(chunk); + } + self.nbytes += delta as u64; + } else { + // EOF reached + let Some(tx) = self.completion.take() else { + return v; + }; + let Some(hasher) = self.hasher.take() else { + return v; + }; + let _ = tx.send(VerifiedBlobReadResult::Complete { + nbytes: self.nbytes, + digest_hex: hasher.finalize_hex(), + }); + } + v + } + o => o, + } + } +} + +impl Drop for VerifiedBlobReader { + fn drop(&mut self) { + if let Some(tx) = self.completion.take() { + let _ = tx.send(VerifiedBlobReadResult::Incomplete); + } + } +} + +fn verify_blob_bytes_read( + expected: &Digest, + expected_size: u64, + r: VerifiedBlobReadResult, +) -> Result<()> { + match r { + VerifiedBlobReadResult::Incomplete => Ok(()), + VerifiedBlobReadResult::Complete { nbytes, digest_hex } => { + if nbytes != expected_size { + return Err(Error::Other( + format!( + "Blob size mismatch for {expected}: expected {expected_size} bytes, read {nbytes} bytes" + ) + .into(), + )); + } + if digest_hex != expected.digest() { + return Err(Error::Other( + format!("Blob digest mismatch for {expected}: computed {digest_hex}").into(), + )); + } + Ok(()) + } + } +} + /// File descriptor range which is reserved for passing data down into the proxy; /// avoid configuring the command to use files in this range. (Also, stdin is /// reserved) @@ -123,6 +308,11 @@ fn layer_info_piped_proto_version() -> &'static semver::VersionReq { LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap()) } +fn raw_blob_proto_version() -> &'static semver::VersionReq { + static RAW_BLOB_PROTO_VERSION: OnceLock = OnceLock::new(); + RAW_BLOB_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.8").unwrap()) +} + #[derive(Serialize)] struct Request { method: String, @@ -498,8 +688,8 @@ impl ImageProxy { let supported = base_proto_version(); if !supported.matches(&protover) { return Err(Error::ProxyTooOld { - requested_version: protover.to_string().into(), - found_version: supported.to_string().into(), + requested_version: supported.to_string().into(), + found_version: protover.to_string().into(), }); } r.protover = protover; @@ -507,6 +697,14 @@ impl ImageProxy { Ok(r) } + pub fn protocol_version(&self) -> &semver::Version { + &self.protover + } + + pub fn supports_get_raw_blob(&self) -> bool { + raw_blob_proto_version().matches(&self.protover) + } + /// Create and send a request. Should only be used by impl_request. async fn impl_request_raw( sockfd: Arc>, @@ -773,6 +971,69 @@ impl ImageProxy { Ok((bloblen, fd, err)) } + /// Fetch a blob as a stream, preferring `GetRawBlob` and falling back to `GetBlob`. + /// + /// The returned `driver` future completes only after proxy-side processing finishes; it also + /// verifies `expected_size` and `digest` for the `GetRawBlob` path. + #[instrument] + pub async fn get_blob_stream<'a>( + &'a self, + img: &OpenedImage, + digest: &Digest, + expected_size: u64, + ) -> Result> { + let fallback_to_get_blob = || async move { + let (reader, driver) = self.get_blob(img, digest, expected_size).await?; + let driver = async move { driver.await }.boxed(); + Ok(BlobStream { + source: BlobStreamSource::GetBlob, + expected_size, + reported_size: Some(expected_size), + reader: Box::new(reader), + driver, + }) + }; + + if !self.supports_get_raw_blob() { + return fallback_to_get_blob().await; + } + + match self.get_raw_blob(img, digest).await { + Ok((reported_size, fd, err)) => { + if let Some(sz) = reported_size { + if sz != expected_size { + return Err(Error::Other( + format!( + "Blob size mismatch for {digest}: expected {expected_size} bytes, proxy reported {sz} bytes" + ) + .into(), + )); + } + } + + let expected = digest.clone(); + let (tx, rx) = oneshot::channel(); + let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?; + let driver = async move { + err.await.map_err(Error::from)?; + match rx.await { + Ok(r) => verify_blob_bytes_read(&expected, expected_size, r), + Err(_) => Ok(()), + } + } + .boxed(); + Ok(BlobStream { + source: BlobStreamSource::GetRawBlob, + expected_size, + reported_size, + reader: Box::new(verified), + driver, + }) + } + Err(e) => Err(e), + } + } + /// Fetch a descriptor. The requested size and digest are verified (by the proxy process). #[instrument] pub async fn get_descriptor( @@ -1039,6 +1300,64 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_verified_blob_reader_ok() -> Result<()> { + use std::str::FromStr; + use tokio::io::AsyncReadExt; + + let data = b"hello world"; + let mut tmp = tempfile::NamedTempFile::new()?; + tmp.as_file_mut().write_all(data)?; + tmp.as_file_mut().sync_all()?; + + let digest = { + let mut h = sha2::Sha256::new(); + h.update(data); + Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() + }; + + let fd = tokio::fs::File::open(tmp.path()).await?; + let (tx, rx) = oneshot::channel(); + let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?; + + let mut out = Vec::new(); + reader.read_to_end(&mut out).await?; + assert_eq!(&out, data); + + let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?; + verify_blob_bytes_read(&digest, data.len() as u64, result)?; + Ok(()) + } + + #[tokio::test] + async fn test_verified_blob_reader_digest_mismatch() -> Result<()> { + use std::str::FromStr; + use tokio::io::AsyncReadExt; + + let data = b"hello world"; + let mut tmp = tempfile::NamedTempFile::new()?; + tmp.as_file_mut().write_all(data)?; + tmp.as_file_mut().sync_all()?; + + let digest = { + let mut h = sha2::Sha256::new(); + h.update(b"not the content"); + Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() + }; + + let fd = tokio::fs::File::open(tmp.path()).await?; + let (tx, rx) = oneshot::channel(); + let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?; + + let mut out = Vec::new(); + reader.read_to_end(&mut out).await?; + assert_eq!(&out, data); + + let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?; + assert!(verify_blob_bytes_read(&digest, data.len() as u64, result).is_err()); + Ok(()) + } + // Helper to create a dummy OwnedFd using memfd_create for testing. fn create_dummy_fd() -> OwnedFd { memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap() From ad9dafa47de913019460dc51435fac0fb9c647ea Mon Sep 17 00:00:00 2001 From: Priyanshu Kumar Date: Sat, 20 Dec 2025 05:23:34 +0000 Subject: [PATCH 2/3] ci: test get_blob_stream against old/new skopeo Run tests in containers with skopeo 1.18 and >=1.19 to exercise both GetBlob fallback and GetRawBlob paths. Signed-off-by: Priyanshu Kumar --- .github/workflows/ci.yaml | 50 ++++++++++++++- src/imageproxy.rs | 126 +++++++++++++++++++++++++++++++++++++- 2 files changed, 174 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 26d6753..7ac90da 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -67,9 +67,57 @@ jobs: - name: Test ostree-rs-ext run: ./ci/test-ostree-rs-ext.sh + test-old-skopeo: + name: Test (old skopeo) + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 20 + - name: Test in AlmaLinux bootc container (skopeo 1.18) + run: | + set -euxo pipefail + rm -rf .ci-cargo-home + mkdir -p .ci-cargo-home + docker run --rm \ + -e CARGO_TARGET_DIR=/tmp/target \ + -e EXPECT_BLOB_STREAM_SOURCE=GetBlob \ + -v "$PWD:/src:ro" \ + -v "$PWD/.ci-cargo-home:/root/.cargo" \ + -w /src \ + quay.io/almalinuxorg/almalinux-bootc:10.0 \ + sh -lc 'dnf -y install cargo rustc >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' + sudo rm -rf .ci-cargo-home + + test-new-skopeo: + name: Test (new skopeo) + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 20 + - name: Test in CentOS Stream 10 container (skopeo >= 1.19) + run: | + set -euxo pipefail + rm -rf .ci-cargo-home + mkdir -p .ci-cargo-home + docker run --rm \ + -e CARGO_TARGET_DIR=/tmp/target \ + -e EXPECT_BLOB_STREAM_SOURCE=GetRawBlob \ + -v "$PWD:/src:ro" \ + -v "$PWD/.ci-cargo-home:/root/.cargo" \ + -w /src \ + quay.io/centos/centos:stream10 \ + sh -lc 'dnf -y install cargo rustc skopeo >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' + sudo rm -rf .ci-cargo-home + required-checks: if: always() - needs: [semver-checks, build-test] + needs: [semver-checks, build-test, test-old-skopeo, test-new-skopeo] runs-on: ubuntu-latest steps: - name: Check all jobs diff --git a/src/imageproxy.rs b/src/imageproxy.rs index c71d92f..550d66d 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -984,7 +984,7 @@ impl ImageProxy { ) -> Result> { let fallback_to_get_blob = || async move { let (reader, driver) = self.get_blob(img, digest, expected_size).await?; - let driver = async move { driver.await }.boxed(); + let driver = driver.boxed(); Ok(BlobStream { source: BlobStreamSource::GetBlob, expected_size, @@ -1358,6 +1358,130 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_get_blob_stream_oci_dir() -> Result<()> { + use std::str::FromStr; + + if !check_skopeo() { + return Ok(()); + } + + fn sha256_digest(bytes: &[u8]) -> Digest { + let mut h = sha2::Sha256::new(); + h.update(bytes); + Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() + } + + fn write_blob(root: &std::path::Path, bytes: &[u8]) -> Result<(Digest, u64)> { + let digest = sha256_digest(bytes); + let size = bytes.len() as u64; + let dir = root.join("blobs").join("sha256"); + std::fs::create_dir_all(&dir)?; + std::fs::write(dir.join(digest.digest()), bytes)?; + Ok((digest, size)) + } + + let td = tempfile::tempdir()?; + std::fs::write( + td.path().join("oci-layout"), + serde_json::to_vec(&serde_json::json!({"imageLayoutVersion":"1.0.0"}))?, + )?; + + let layer_bytes = b"layer bytes"; + let (layer_digest, layer_size) = write_blob(td.path(), layer_bytes)?; + + let config_bytes = serde_json::to_vec(&serde_json::json!({ + "architecture": "amd64", + "os": "linux", + "rootfs": { + "type": "layers", + "diff_ids": [layer_digest.to_string()], + }, + "config": {}, + }))?; + let (config_digest, config_size) = write_blob(td.path(), &config_bytes)?; + + let manifest_bytes = serde_json::to_vec(&serde_json::json!({ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "config": { + "mediaType": "application/vnd.oci.image.config.v1+json", + "digest": config_digest.to_string(), + "size": config_size, + }, + "layers": [{ + "mediaType": "application/vnd.oci.image.layer.v1.tar", + "digest": layer_digest.to_string(), + "size": layer_size, + }], + }))?; + let (manifest_digest, manifest_size) = write_blob(td.path(), &manifest_bytes)?; + + std::fs::write( + td.path().join("index.json"), + serde_json::to_vec(&serde_json::json!({ + "schemaVersion": 2, + "mediaType": "application/vnd.oci.image.index.v1+json", + "manifests": [{ + "mediaType": "application/vnd.oci.image.manifest.v1+json", + "digest": manifest_digest.to_string(), + "size": manifest_size, + "annotations": { + "org.opencontainers.image.ref.name": "test", + } + }] + }))?, + )?; + + let proxy = ImageProxy::new().await?; + let imgref = format!("oci:{}:test", td.path().display()); + let img = proxy.open_image(&imgref).await?; + + let expected_source = match std::env::var("EXPECT_BLOB_STREAM_SOURCE").ok().as_deref() { + Some("GetRawBlob") => BlobStreamSource::GetRawBlob, + Some("GetBlob") => BlobStreamSource::GetBlob, + Some(v) => { + return Err(Error::Other( + format!( + "Invalid EXPECT_BLOB_STREAM_SOURCE={v}; expected GetRawBlob or GetBlob" + ) + .into(), + )); + } + None => { + if proxy.supports_get_raw_blob() { + BlobStreamSource::GetRawBlob + } else { + BlobStreamSource::GetBlob + } + } + }; + + let BlobStream { + source, + reader, + driver, + .. + } = proxy + .get_blob_stream(&img, &layer_digest, layer_size) + .await?; + assert_eq!(source, expected_source); + + let mut reader = reader; + let mut sink = tokio::io::sink(); + let read = async move { + let n = tokio::io::copy(&mut *reader, &mut sink).await?; + Result::Ok(n) + }; + let (n, driver) = tokio::join!(read, driver); + assert_eq!(n?, layer_size); + driver?; + + proxy.close_image(&img).await?; + proxy.finalize().await?; + Ok(()) + } + // Helper to create a dummy OwnedFd using memfd_create for testing. fn create_dummy_fd() -> OwnedFd { memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap() From 7ed1a8d8d5ead8c6302cb3ee0e9d5c5196e1a934 Mon Sep 17 00:00:00 2001 From: Priyanshu Kumar Date: Thu, 18 Jun 2026 09:39:37 +0000 Subject: [PATCH 3/3] Address blob stream review feedback Assisted-by: Codex (GPT-5) Signed-off-by: Priyanshu Kumar Signed-off-by: Colin Walters --- .github/workflows/ci.yaml | 8 +- Cargo.toml | 1 + src/imageproxy.rs | 250 +++++++++++++++++--------------------- 3 files changed, 119 insertions(+), 140 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7ac90da..49a6519 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -84,11 +84,11 @@ jobs: docker run --rm \ -e CARGO_TARGET_DIR=/tmp/target \ -e EXPECT_BLOB_STREAM_SOURCE=GetBlob \ - -v "$PWD:/src:ro" \ + -v "$PWD:/src" \ -v "$PWD/.ci-cargo-home:/root/.cargo" \ -w /src \ quay.io/almalinuxorg/almalinux-bootc:10.0 \ - sh -lc 'dnf -y install cargo rustc >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' + sh -lc 'dnf -y install cargo rustc openssl-devel pkg-config >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' sudo rm -rf .ci-cargo-home test-new-skopeo: @@ -108,11 +108,11 @@ jobs: docker run --rm \ -e CARGO_TARGET_DIR=/tmp/target \ -e EXPECT_BLOB_STREAM_SOURCE=GetRawBlob \ - -v "$PWD:/src:ro" \ + -v "$PWD:/src" \ -v "$PWD/.ci-cargo-home:/root/.cargo" \ -w /src \ quay.io/centos/centos:stream10 \ - sh -lc 'dnf -y install cargo rustc skopeo >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' + sh -lc 'dnf -y install cargo rustc skopeo openssl-devel pkg-config >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' sudo rm -rf .ci-cargo-home required-checks: diff --git a/Cargo.toml b/Cargo.toml index 01cdbbf..976845f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ itertools = "0.14.0" anyhow = "1.0" bytes = "1.5" clap = { version = "4.4", features = ["derive"] } +ocidir = "0.7.2" tempfile = "3.20.0" [lib] diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 550d66d..d81b7ee 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -30,6 +30,7 @@ use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::pin::Pin; use std::process::{Command, Stdio}; +use std::str::FromStr; use std::sync::{Arc, Mutex, OnceLock}; use thiserror::Error; use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf}; @@ -112,11 +113,29 @@ pub enum BlobStreamSource { /// A streaming blob reader and "driver" future. pub struct BlobStream<'a> { - pub source: BlobStreamSource, - pub expected_size: u64, - pub reported_size: Option, - pub reader: Box, - pub driver: futures_util::future::BoxFuture<'a, Result<()>>, + source: BlobStreamSource, + expected_size: u64, + reader: Box, + driver: futures_util::future::BoxFuture<'a, Result<()>>, +} + +impl<'a> BlobStream<'a> { + pub fn source(&self) -> BlobStreamSource { + self.source + } + + pub fn expected_size(&self) -> u64 { + self.expected_size + } + + pub fn into_parts( + self, + ) -> ( + Box, + futures_util::future::BoxFuture<'a, Result<()>>, + ) { + (self.reader, self.driver) + } } impl std::fmt::Debug for BlobStream<'_> { @@ -124,14 +143,13 @@ impl std::fmt::Debug for BlobStream<'_> { f.debug_struct("BlobStream") .field("source", &self.source) .field("expected_size", &self.expected_size) - .field("reported_size", &self.reported_size) .finish_non_exhaustive() } } #[derive(Debug)] enum VerifiedBlobReadResult { - Complete { nbytes: u64, digest_hex: String }, + Complete { nbytes: u64, digest: Digest }, Incomplete, } @@ -174,16 +192,18 @@ impl Hasher { } } - fn finalize_hex(self) -> String { - match self { - Self::Sha256(h) => hex::encode(h.finalize()), - Self::Sha384(h) => hex::encode(h.finalize()), - Self::Sha512(h) => hex::encode(h.finalize()), - } + fn finalize_digest(self) -> Digest { + let (algorithm, hex) = match self { + Self::Sha256(h) => ("sha256", hex::encode(h.finalize())), + Self::Sha384(h) => ("sha384", hex::encode(h.finalize())), + Self::Sha512(h) => ("sha512", hex::encode(h.finalize())), + }; + Digest::from_str(&format!("{algorithm}:{hex}")).expect("valid digest") } } -/// Wraps an [`AsyncRead`] and computes a digest; reports the result on EOF. +/// Wraps an [`AsyncRead`] and computes a digest; sends the result on EOF so the +/// driver future can verify the stream without re-reading it. #[derive(Debug)] struct VerifiedBlobReader { inner: R, @@ -217,17 +237,16 @@ impl AsyncRead for VerifiedBlobReader { if buf.remaining() == 0 { return std::task::Poll::Ready(Ok(())); } + // ReadBuf may already have data; only hash the newly appended bytes. let before = buf.filled().len(); match Pin::new(&mut self.inner).poll_read(cx, buf) { - v @ std::task::Poll::Ready(Ok(_)) => { + v @ std::task::Poll::Ready(Ok(())) => { let after = buf.filled().len(); - debug_assert!(after >= before); - let delta = after - before; + let delta = after.checked_sub(before).unwrap(); if delta > 0 { let chunk = &buf.filled()[before..after]; - if let Some(hasher) = self.hasher.as_mut() { - hasher.update(chunk); - } + let hasher = self.hasher.as_mut().expect("hasher missing before EOF"); + hasher.update(chunk); self.nbytes += delta as u64; } else { // EOF reached @@ -239,7 +258,7 @@ impl AsyncRead for VerifiedBlobReader { }; let _ = tx.send(VerifiedBlobReadResult::Complete { nbytes: self.nbytes, - digest_hex: hasher.finalize_hex(), + digest: hasher.finalize_digest(), }); } v @@ -264,7 +283,7 @@ fn verify_blob_bytes_read( ) -> Result<()> { match r { VerifiedBlobReadResult::Incomplete => Ok(()), - VerifiedBlobReadResult::Complete { nbytes, digest_hex } => { + VerifiedBlobReadResult::Complete { nbytes, digest } => { if nbytes != expected_size { return Err(Error::Other( format!( @@ -273,9 +292,9 @@ fn verify_blob_bytes_read( .into(), )); } - if digest_hex != expected.digest() { + if digest != *expected { return Err(Error::Other( - format!("Blob digest mismatch for {expected}: computed {digest_hex}").into(), + format!("Blob digest mismatch for {expected}: computed {digest}").into(), )); } Ok(()) @@ -982,56 +1001,48 @@ impl ImageProxy { digest: &Digest, expected_size: u64, ) -> Result> { - let fallback_to_get_blob = || async move { + if !self.supports_get_raw_blob() { let (reader, driver) = self.get_blob(img, digest, expected_size).await?; let driver = driver.boxed(); - Ok(BlobStream { + return Ok(BlobStream { source: BlobStreamSource::GetBlob, expected_size, - reported_size: Some(expected_size), reader: Box::new(reader), driver, - }) - }; - - if !self.supports_get_raw_blob() { - return fallback_to_get_blob().await; + }); } - match self.get_raw_blob(img, digest).await { - Ok((reported_size, fd, err)) => { - if let Some(sz) = reported_size { - if sz != expected_size { - return Err(Error::Other( - format!( - "Blob size mismatch for {digest}: expected {expected_size} bytes, proxy reported {sz} bytes" - ) - .into(), - )); - } - } + let (reported_size, fd, err) = self.get_raw_blob(img, digest).await?; + if let Some(sz) = reported_size { + if sz != expected_size { + return Err(Error::Other( + format!( + "Blob size mismatch for {digest}: expected {expected_size} bytes, got {sz} bytes" + ) + .into(), + )); + } + } - let expected = digest.clone(); - let (tx, rx) = oneshot::channel(); - let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?; - let driver = async move { - err.await.map_err(Error::from)?; - match rx.await { - Ok(r) => verify_blob_bytes_read(&expected, expected_size, r), - Err(_) => Ok(()), - } - } - .boxed(); - Ok(BlobStream { - source: BlobStreamSource::GetRawBlob, - expected_size, - reported_size, - reader: Box::new(verified), - driver, - }) + let expected = digest.clone(); + let (tx, rx) = oneshot::channel(); + let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?; + let driver = async move { + err.await.map_err(Error::from)?; + match rx.await { + Ok(r) => verify_blob_bytes_read(&expected, expected_size, r), + Err(e) => Err(Error::Other( + format!("Blob stream verification channel error: {e}").into(), + )), } - Err(e) => Err(e), } + .boxed(); + Ok(BlobStream { + source: BlobStreamSource::GetRawBlob, + expected_size, + reader: Box::new(verified), + driver, + }) } /// Fetch a descriptor. The requested size and digest are verified (by the proxy process). @@ -1360,78 +1371,50 @@ mod tests { #[tokio::test] async fn test_get_blob_stream_oci_dir() -> Result<()> { - use std::str::FromStr; + use ocidir::{oci_spec as ocidir_spec, OciDir}; if !check_skopeo() { return Ok(()); } - fn sha256_digest(bytes: &[u8]) -> Digest { - let mut h = sha2::Sha256::new(); - h.update(bytes); - Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() - } - - fn write_blob(root: &std::path::Path, bytes: &[u8]) -> Result<(Digest, u64)> { - let digest = sha256_digest(bytes); - let size = bytes.len() as u64; - let dir = root.join("blobs").join("sha256"); - std::fs::create_dir_all(&dir)?; - std::fs::write(dir.join(digest.digest()), bytes)?; - Ok((digest, size)) - } - let td = tempfile::tempdir()?; - std::fs::write( - td.path().join("oci-layout"), - serde_json::to_vec(&serde_json::json!({"imageLayoutVersion":"1.0.0"}))?, - )?; - + fn to_other(e: E) -> Error { + Error::Other(e.to_string().into()) + } let layer_bytes = b"layer bytes"; - let (layer_digest, layer_size) = write_blob(td.path(), layer_bytes)?; - - let config_bytes = serde_json::to_vec(&serde_json::json!({ - "architecture": "amd64", - "os": "linux", - "rootfs": { - "type": "layers", - "diff_ids": [layer_digest.to_string()], - }, - "config": {}, - }))?; - let (config_digest, config_size) = write_blob(td.path(), &config_bytes)?; - - let manifest_bytes = serde_json::to_vec(&serde_json::json!({ - "schemaVersion": 2, - "mediaType": "application/vnd.oci.image.manifest.v1+json", - "config": { - "mediaType": "application/vnd.oci.image.config.v1+json", - "digest": config_digest.to_string(), - "size": config_size, - }, - "layers": [{ - "mediaType": "application/vnd.oci.image.layer.v1.tar", - "digest": layer_digest.to_string(), - "size": layer_size, - }], - }))?; - let (manifest_digest, manifest_size) = write_blob(td.path(), &manifest_bytes)?; - - std::fs::write( - td.path().join("index.json"), - serde_json::to_vec(&serde_json::json!({ - "schemaVersion": 2, - "mediaType": "application/vnd.oci.image.index.v1+json", - "manifests": [{ - "mediaType": "application/vnd.oci.image.manifest.v1+json", - "digest": manifest_digest.to_string(), - "size": manifest_size, - "annotations": { - "org.opencontainers.image.ref.name": "test", - } - }] - }))?, - )?; + let dir = ocidir::cap_std::fs::Dir::open_ambient_dir( + td.path(), + ocidir::cap_std::ambient_authority(), + ) + .map_err(to_other)?; + let oci_dir = OciDir::ensure(dir).map_err(to_other)?; + let mut layerw = oci_dir.create_gzip_layer(None).map_err(to_other)?; + layerw.write_all(layer_bytes)?; + let layer = layerw.complete().map_err(to_other)?; + let layer_desc = layer.descriptor().build().unwrap(); + let layer_digest = Digest::from_str(layer_desc.digest().as_ref()).map_err(to_other)?; + let layer_size = layer_desc.size(); + + let mut manifest = oci_dir + .new_empty_manifest() + .map_err(to_other)? + .build() + .map_err(to_other)?; + let mut config = ocidir_spec::image::ImageConfigurationBuilder::default() + .architecture("amd64") + .os("linux") + .build() + .unwrap(); + oci_dir.push_layer(&mut manifest, &mut config, layer, "layer", None); + let config_desc = oci_dir.write_config(config).map_err(to_other)?; + manifest.set_config(config_desc); + oci_dir + .insert_manifest( + manifest, + Some("test"), + ocidir_spec::image::Platform::default(), + ) + .map_err(to_other)?; let proxy = ImageProxy::new().await?; let imgref = format!("oci:{}:test", td.path().display()); @@ -1457,17 +1440,12 @@ mod tests { } }; - let BlobStream { - source, - reader, - driver, - .. - } = proxy + let stream = proxy .get_blob_stream(&img, &layer_digest, layer_size) .await?; - assert_eq!(source, expected_source); + assert_eq!(stream.source(), expected_source); + let (mut reader, driver) = stream.into_parts(); - let mut reader = reader; let mut sink = tokio::io::sink(); let read = async move { let n = tokio::io::copy(&mut *reader, &mut sink).await?;