diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 26d6753..49a6519 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -67,9 +67,57 @@ jobs: - name: Test ostree-rs-ext run: ./ci/test-ostree-rs-ext.sh + test-old-skopeo: + name: Test (old skopeo) + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 20 + - name: Test in AlmaLinux bootc container (skopeo 1.18) + run: | + set -euxo pipefail + rm -rf .ci-cargo-home + mkdir -p .ci-cargo-home + docker run --rm \ + -e CARGO_TARGET_DIR=/tmp/target \ + -e EXPECT_BLOB_STREAM_SOURCE=GetBlob \ + -v "$PWD:/src" \ + -v "$PWD/.ci-cargo-home:/root/.cargo" \ + -w /src \ + quay.io/almalinuxorg/almalinux-bootc:10.0 \ + sh -lc 'dnf -y install cargo rustc openssl-devel pkg-config >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' + sudo rm -rf .ci-cargo-home + + test-new-skopeo: + name: Test (new skopeo) + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 20 + - name: Test in CentOS Stream 10 container (skopeo >= 1.19) + run: | + set -euxo pipefail + rm -rf .ci-cargo-home + mkdir -p .ci-cargo-home + docker run --rm \ + -e CARGO_TARGET_DIR=/tmp/target \ + -e EXPECT_BLOB_STREAM_SOURCE=GetRawBlob \ + -v "$PWD:/src" \ + -v "$PWD/.ci-cargo-home:/root/.cargo" \ + -w /src \ + quay.io/centos/centos:stream10 \ + sh -lc 'dnf -y install cargo rustc skopeo openssl-devel pkg-config >/dev/null && skopeo --version && cargo test --all-features -- --nocapture --quiet' + sudo rm -rf .ci-cargo-home + required-checks: if: always() - needs: [semver-checks, build-test] + needs: [semver-checks, build-test, test-old-skopeo, test-new-skopeo] runs-on: ubuntu-latest steps: - name: Check all jobs diff --git a/Cargo.toml b/Cargo.toml index 5ee696c..976845f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ rust-version = "1.70.0" [dependencies] futures-util = "0.3.13" +hex = "0.4.3" # NOTE when bumping this in a semver-incompatible way, because we re-export it you # must also bump the semver of this project. # See also https://github.com/youki-dev/oci-spec-rs/pull/288 @@ -18,6 +19,7 @@ rustix = { version = "1.0", features = ["process", "fs", "net"] } serde = { features = ["derive"], version = "1.0.125" } serde_json = "1.0.64" semver = "1.0.4" +sha2 = "0.10.9" thiserror = "2" tokio = { features = ["fs", "io-util", "macros", "process", "rt", "sync"], version = "1" } tracing = "0.1" @@ -29,6 +31,7 @@ itertools = "0.14.0" anyhow = "1.0" bytes = "1.5" clap = { version = "4.4", features = ["derive"] } +ocidir = "0.7.2" tempfile = "3.20.0" [lib] diff --git a/src/imageproxy.rs b/src/imageproxy.rs index 5ceba9f..d81b7ee 100644 --- a/src/imageproxy.rs +++ b/src/imageproxy.rs @@ -20,6 +20,7 @@ use cap_std_ext::{cap_std, cap_tempfile}; use futures_util::{Future, FutureExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; +use sha2::Digest as _; use std::fs::File; use std::iter::FusedIterator; use std::num::NonZeroU32; @@ -29,10 +30,11 @@ use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::pin::Pin; use std::process::{Command, Stdio}; +use std::str::FromStr; use std::sync::{Arc, Mutex, OnceLock}; use thiserror::Error; -use tokio::io::{AsyncBufRead, AsyncReadExt}; -use tokio::sync::Mutex as AsyncMutex; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf}; +use tokio::sync::{oneshot, Mutex as AsyncMutex}; use tokio::task::JoinError; use tracing::instrument; @@ -55,6 +57,9 @@ pub enum Error { /// An error returned from the remote proxy #[error("proxy request returned error: {0}")] RequestReturned(Box), + /// An error returned via the `GetRawBlob` error pipe. + #[error(transparent)] + BlobError(#[from] GetBlobError), #[error("semantic version error: {0}")] SemanticVersion(#[from] semver::Error), #[error("proxy too old (requested={requested_version} found={found_version}) error")] @@ -98,6 +103,205 @@ impl From for Error { /// The error type returned from this crate. pub type Result = std::result::Result; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BlobStreamSource { + /// Used `GetRawBlob`. + GetRawBlob, + /// Fell back to `GetBlob`. + GetBlob, +} + +/// A streaming blob reader and "driver" future. +pub struct BlobStream<'a> { + source: BlobStreamSource, + expected_size: u64, + reader: Box, + driver: futures_util::future::BoxFuture<'a, Result<()>>, +} + +impl<'a> BlobStream<'a> { + pub fn source(&self) -> BlobStreamSource { + self.source + } + + pub fn expected_size(&self) -> u64 { + self.expected_size + } + + pub fn into_parts( + self, + ) -> ( + Box, + futures_util::future::BoxFuture<'a, Result<()>>, + ) { + (self.reader, self.driver) + } +} + +impl std::fmt::Debug for BlobStream<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlobStream") + .field("source", &self.source) + .field("expected_size", &self.expected_size) + .finish_non_exhaustive() + } +} + +#[derive(Debug)] +enum VerifiedBlobReadResult { + Complete { nbytes: u64, digest: Digest }, + Incomplete, +} + +#[derive(Debug)] +enum Hasher { + Sha256(sha2::Sha256), + Sha384(sha2::Sha384), + Sha512(sha2::Sha512), +} + +impl Hasher { + fn new_for_digest(digest: &Digest) -> Result { + use oci_spec::image::DigestAlgorithm; + Ok(match digest.algorithm() { + DigestAlgorithm::Sha256 => Self::Sha256(sha2::Sha256::new()), + DigestAlgorithm::Sha384 => Self::Sha384(sha2::Sha384::new()), + DigestAlgorithm::Sha512 => Self::Sha512(sha2::Sha512::new()), + DigestAlgorithm::Other(a) => { + return Err(Error::Other( + format!("Unsupported digest algorithm for blob verification: {a}").into(), + )); + } + _ => { + return Err(Error::Other( + format!( + "Unsupported digest algorithm for blob verification: {}", + digest.algorithm().as_ref() + ) + .into(), + )); + } + }) + } + + fn update(&mut self, chunk: &[u8]) { + match self { + Self::Sha256(h) => h.update(chunk), + Self::Sha384(h) => h.update(chunk), + Self::Sha512(h) => h.update(chunk), + } + } + + fn finalize_digest(self) -> Digest { + let (algorithm, hex) = match self { + Self::Sha256(h) => ("sha256", hex::encode(h.finalize())), + Self::Sha384(h) => ("sha384", hex::encode(h.finalize())), + Self::Sha512(h) => ("sha512", hex::encode(h.finalize())), + }; + Digest::from_str(&format!("{algorithm}:{hex}")).expect("valid digest") + } +} + +/// Wraps an [`AsyncRead`] and computes a digest; sends the result on EOF so the +/// driver future can verify the stream without re-reading it. +#[derive(Debug)] +struct VerifiedBlobReader { + inner: R, + nbytes: u64, + hasher: Option, + completion: Option>, +} + +impl VerifiedBlobReader { + fn new( + inner: R, + expected: Digest, + completion: oneshot::Sender, + ) -> Result { + let hasher = Hasher::new_for_digest(&expected)?; + Ok(Self { + inner, + nbytes: 0, + hasher: Some(hasher), + completion: Some(completion), + }) + } +} + +impl AsyncRead for VerifiedBlobReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> std::task::Poll> { + if buf.remaining() == 0 { + return std::task::Poll::Ready(Ok(())); + } + // ReadBuf may already have data; only hash the newly appended bytes. + let before = buf.filled().len(); + match Pin::new(&mut self.inner).poll_read(cx, buf) { + v @ std::task::Poll::Ready(Ok(())) => { + let after = buf.filled().len(); + let delta = after.checked_sub(before).unwrap(); + if delta > 0 { + let chunk = &buf.filled()[before..after]; + let hasher = self.hasher.as_mut().expect("hasher missing before EOF"); + hasher.update(chunk); + self.nbytes += delta as u64; + } else { + // EOF reached + let Some(tx) = self.completion.take() else { + return v; + }; + let Some(hasher) = self.hasher.take() else { + return v; + }; + let _ = tx.send(VerifiedBlobReadResult::Complete { + nbytes: self.nbytes, + digest: hasher.finalize_digest(), + }); + } + v + } + o => o, + } + } +} + +impl Drop for VerifiedBlobReader { + fn drop(&mut self) { + if let Some(tx) = self.completion.take() { + let _ = tx.send(VerifiedBlobReadResult::Incomplete); + } + } +} + +fn verify_blob_bytes_read( + expected: &Digest, + expected_size: u64, + r: VerifiedBlobReadResult, +) -> Result<()> { + match r { + VerifiedBlobReadResult::Incomplete => Ok(()), + VerifiedBlobReadResult::Complete { nbytes, digest } => { + if nbytes != expected_size { + return Err(Error::Other( + format!( + "Blob size mismatch for {expected}: expected {expected_size} bytes, read {nbytes} bytes" + ) + .into(), + )); + } + if digest != *expected { + return Err(Error::Other( + format!("Blob digest mismatch for {expected}: computed {digest}").into(), + )); + } + Ok(()) + } + } +} + /// File descriptor range which is reserved for passing data down into the proxy; /// avoid configuring the command to use files in this range. (Also, stdin is /// reserved) @@ -123,6 +327,11 @@ fn layer_info_piped_proto_version() -> &'static semver::VersionReq { LAYER_INFO_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.7").unwrap()) } +fn raw_blob_proto_version() -> &'static semver::VersionReq { + static RAW_BLOB_PROTO_VERSION: OnceLock = OnceLock::new(); + RAW_BLOB_PROTO_VERSION.get_or_init(|| semver::VersionReq::parse("0.2.8").unwrap()) +} + #[derive(Serialize)] struct Request { method: String, @@ -498,8 +707,8 @@ impl ImageProxy { let supported = base_proto_version(); if !supported.matches(&protover) { return Err(Error::ProxyTooOld { - requested_version: protover.to_string().into(), - found_version: supported.to_string().into(), + requested_version: supported.to_string().into(), + found_version: protover.to_string().into(), }); } r.protover = protover; @@ -507,6 +716,14 @@ impl ImageProxy { Ok(r) } + pub fn protocol_version(&self) -> &semver::Version { + &self.protover + } + + pub fn supports_get_raw_blob(&self) -> bool { + raw_blob_proto_version().matches(&self.protover) + } + /// Create and send a request. Should only be used by impl_request. async fn impl_request_raw( sockfd: Arc>, @@ -773,6 +990,61 @@ impl ImageProxy { Ok((bloblen, fd, err)) } + /// Fetch a blob as a stream, preferring `GetRawBlob` and falling back to `GetBlob`. + /// + /// The returned `driver` future completes only after proxy-side processing finishes; it also + /// verifies `expected_size` and `digest` for the `GetRawBlob` path. + #[instrument] + pub async fn get_blob_stream<'a>( + &'a self, + img: &OpenedImage, + digest: &Digest, + expected_size: u64, + ) -> Result> { + if !self.supports_get_raw_blob() { + let (reader, driver) = self.get_blob(img, digest, expected_size).await?; + let driver = driver.boxed(); + return Ok(BlobStream { + source: BlobStreamSource::GetBlob, + expected_size, + reader: Box::new(reader), + driver, + }); + } + + let (reported_size, fd, err) = self.get_raw_blob(img, digest).await?; + if let Some(sz) = reported_size { + if sz != expected_size { + return Err(Error::Other( + format!( + "Blob size mismatch for {digest}: expected {expected_size} bytes, got {sz} bytes" + ) + .into(), + )); + } + } + + let expected = digest.clone(); + let (tx, rx) = oneshot::channel(); + let verified = VerifiedBlobReader::new(fd, expected.clone(), tx)?; + let driver = async move { + err.await.map_err(Error::from)?; + match rx.await { + Ok(r) => verify_blob_bytes_read(&expected, expected_size, r), + Err(e) => Err(Error::Other( + format!("Blob stream verification channel error: {e}").into(), + )), + } + } + .boxed(); + Ok(BlobStream { + source: BlobStreamSource::GetRawBlob, + expected_size, + reader: Box::new(verified), + driver, + }) + } + /// Fetch a descriptor. The requested size and digest are verified (by the proxy process). #[instrument] pub async fn get_descriptor( @@ -1039,6 +1311,155 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_verified_blob_reader_ok() -> Result<()> { + use std::str::FromStr; + use tokio::io::AsyncReadExt; + + let data = b"hello world"; + let mut tmp = tempfile::NamedTempFile::new()?; + tmp.as_file_mut().write_all(data)?; + tmp.as_file_mut().sync_all()?; + + let digest = { + let mut h = sha2::Sha256::new(); + h.update(data); + Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() + }; + + let fd = tokio::fs::File::open(tmp.path()).await?; + let (tx, rx) = oneshot::channel(); + let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?; + + let mut out = Vec::new(); + reader.read_to_end(&mut out).await?; + assert_eq!(&out, data); + + let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?; + verify_blob_bytes_read(&digest, data.len() as u64, result)?; + Ok(()) + } + + #[tokio::test] + async fn test_verified_blob_reader_digest_mismatch() -> Result<()> { + use std::str::FromStr; + use tokio::io::AsyncReadExt; + + let data = b"hello world"; + let mut tmp = tempfile::NamedTempFile::new()?; + tmp.as_file_mut().write_all(data)?; + tmp.as_file_mut().sync_all()?; + + let digest = { + let mut h = sha2::Sha256::new(); + h.update(b"not the content"); + Digest::from_str(&format!("sha256:{}", hex::encode(h.finalize()))).unwrap() + }; + + let fd = tokio::fs::File::open(tmp.path()).await?; + let (tx, rx) = oneshot::channel(); + let mut reader = VerifiedBlobReader::new(fd, digest.clone(), tx)?; + + let mut out = Vec::new(); + reader.read_to_end(&mut out).await?; + assert_eq!(&out, data); + + let result = rx.await.map_err(|e| Error::Other(e.to_string().into()))?; + assert!(verify_blob_bytes_read(&digest, data.len() as u64, result).is_err()); + Ok(()) + } + + #[tokio::test] + async fn test_get_blob_stream_oci_dir() -> Result<()> { + use ocidir::{oci_spec as ocidir_spec, OciDir}; + + if !check_skopeo() { + return Ok(()); + } + + let td = tempfile::tempdir()?; + fn to_other(e: E) -> Error { + Error::Other(e.to_string().into()) + } + let layer_bytes = b"layer bytes"; + let dir = ocidir::cap_std::fs::Dir::open_ambient_dir( + td.path(), + ocidir::cap_std::ambient_authority(), + ) + .map_err(to_other)?; + let oci_dir = OciDir::ensure(dir).map_err(to_other)?; + let mut layerw = oci_dir.create_gzip_layer(None).map_err(to_other)?; + layerw.write_all(layer_bytes)?; + let layer = layerw.complete().map_err(to_other)?; + let layer_desc = layer.descriptor().build().unwrap(); + let layer_digest = Digest::from_str(layer_desc.digest().as_ref()).map_err(to_other)?; + let layer_size = layer_desc.size(); + + let mut manifest = oci_dir + .new_empty_manifest() + .map_err(to_other)? + .build() + .map_err(to_other)?; + let mut config = ocidir_spec::image::ImageConfigurationBuilder::default() + .architecture("amd64") + .os("linux") + .build() + .unwrap(); + oci_dir.push_layer(&mut manifest, &mut config, layer, "layer", None); + let config_desc = oci_dir.write_config(config).map_err(to_other)?; + manifest.set_config(config_desc); + oci_dir + .insert_manifest( + manifest, + Some("test"), + ocidir_spec::image::Platform::default(), + ) + .map_err(to_other)?; + + let proxy = ImageProxy::new().await?; + let imgref = format!("oci:{}:test", td.path().display()); + let img = proxy.open_image(&imgref).await?; + + let expected_source = match std::env::var("EXPECT_BLOB_STREAM_SOURCE").ok().as_deref() { + Some("GetRawBlob") => BlobStreamSource::GetRawBlob, + Some("GetBlob") => BlobStreamSource::GetBlob, + Some(v) => { + return Err(Error::Other( + format!( + "Invalid EXPECT_BLOB_STREAM_SOURCE={v}; expected GetRawBlob or GetBlob" + ) + .into(), + )); + } + None => { + if proxy.supports_get_raw_blob() { + BlobStreamSource::GetRawBlob + } else { + BlobStreamSource::GetBlob + } + } + }; + + let stream = proxy + .get_blob_stream(&img, &layer_digest, layer_size) + .await?; + assert_eq!(stream.source(), expected_source); + let (mut reader, driver) = stream.into_parts(); + + let mut sink = tokio::io::sink(); + let read = async move { + let n = tokio::io::copy(&mut *reader, &mut sink).await?; + Result::Ok(n) + }; + let (n, driver) = tokio::join!(read, driver); + assert_eq!(n?, layer_size); + driver?; + + proxy.close_image(&img).await?; + proxy.finalize().await?; + Ok(()) + } + // Helper to create a dummy OwnedFd using memfd_create for testing. fn create_dummy_fd() -> OwnedFd { memfd_create(c"test-fd", MemfdFlags::CLOEXEC).unwrap()