Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ default-members = [
"crates/composefs-setup-root",
"crates/composefs-storage",
"crates/composefs-erofs-debug",
"crates/composefs-splitfdstream",
"crates/composefs-splitdirfdstream",
]
resolver = "2"

Expand Down Expand Up @@ -43,8 +43,6 @@ composefs-ostree = { version = "0.7.0", path = "crates/composefs-ostree", defaul
cap-std-ext = "5.1.2"
ocidir = "0.7.2"

# JSON-RPC with FD passing for userns helper
jsonrpc-fdpass = { version = "0.1.0", default-features = false }
zlink = { version = "0.5", default-features = false, features = ["tokio", "introspection", "proxy", "server", "service", "tracing"] }
zlink-core = { version = "0.5", default-features = false, features = ["introspection", "std", "tracing"] }

Expand Down
15 changes: 11 additions & 4 deletions crates/composefs-ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ path = "src/main.rs"
[features]
default = ['pre-6.15', 'oci', 'containers-storage', 'ostree']
http = ['composefs-http']
oci = ['composefs-oci', 'composefs-oci/varlink']
oci = ['composefs-oci', 'composefs-oci/varlink', 'composefs-splitdirfdstream']
containers-storage = ['composefs-oci/containers-storage', 'cstorage']
ostree = ['composefs-ostree']
rhel9 = ['composefs/rhel9']
Expand All @@ -33,19 +33,26 @@ comfy-table = { version = "7.1", default-features = false }
composefs = { workspace = true, features = ["varlink"] }
composefs-boot = { workspace = true }
composefs-oci = { workspace = true, optional = true, features = ["boot"] }
composefs-splitdirfdstream = { path = "../composefs-splitdirfdstream", version = "0.7.0", optional = true }
composefs-http = { workspace = true, optional = true }
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.7.0", features = ["userns-helper"], optional = true }
cstorage = { package = "composefs-storage", path = "../composefs-storage", version = "0.7.0", features = ["layer-transfer"], optional = true }
composefs-ostree = { workspace = true, optional = true }
env_logger = { version = "0.11.0", default-features = false }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.17.0", default-features = false }
libsystemd = { version = "0.7" }
log = { version = "0.4", default-features = false }
rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] }
rustix = { version = "1.0.0", default-features = false, features = ["fs", "net", "pipe", "process"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.24.2", default-features = false, features = ["io-std", "io-util", "net", "rt", "sync"] }
tokio = { version = "1.24.2", default-features = false, features = ["io-std", "io-util", "net", "rt", "rt-multi-thread", "sync"] }
zlink = { workspace = true }

[dev-dependencies]
similar-asserts = "1.7.0"
tar = { version = "0.4.38", default-features = false }
tempfile = "3.8.0"
tokio = { version = "1.24.2", default-features = false, features = ["macros", "rt-multi-thread"] }

[lints]
workspace = true
225 changes: 219 additions & 6 deletions crates/composefs-ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl From<ErofsVersion> for composefs::erofs::format::FormatVersion {
/// start with `@`.
#[cfg(feature = "oci")]
#[derive(Debug, Clone)]
pub(crate) enum OciReference {
pub enum OciReference {
/// A content-addressable digest such as `sha256:abcdef…`.
Digest(composefs_oci::OciDigest),
/// A named ref resolved through the repository's ref tree, typically
Expand Down Expand Up @@ -377,6 +377,31 @@ enum OciCommand {
#[arg(long, value_enum, default_value_t = LocalFetchCli::Disabled)]
local_fetch: LocalFetchCli,
},
/// Copy an OCI image (and its layers) from another composefs repository
/// into this repository.
///
/// The destination repository is selected by the global `--repo`/`--user`/
/// `--system` flags. The source is `--from`.
///
/// Pass `--zerocopy` to attempt reflink (then hardlink) instead of copying
/// object data. This requires both repositories to be on the same
/// filesystem, to use the same hash algorithm, and the caller to have
/// `CAP_DAC_READ_SEARCH` (i.e. root).
/// Without `--zerocopy`, objects are always copied, which is safe on any
/// filesystem and across repositories using different hash algorithms.
Copy {
/// Image to copy (tag name or `@digest`).
image: OciReference,
/// Path to the source composefs repository.
#[clap(long)]
from: PathBuf,
/// Tag to assign to the image in the destination repository.
#[clap(long)]
name: Option<String>,
/// Use reflink/hardlink zero-copy transfer (requires same filesystem, same hash algorithm, and root).
#[clap(long)]
zerocopy: bool,
},

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you split out the oci copy cli support into a separate commit?

/// List all tagged OCI images in the repository
#[clap(name = "images")]
ListImages {
Expand Down Expand Up @@ -940,12 +965,18 @@ pub async fn run_if_socket_activated() -> Result<bool> {
if std::env::args_os().len() != 1 {
return Ok(false);
}
let Some(listener) = crate::varlink::try_activated_listener()? else {
return Ok(false);
};
let service = crate::varlink::CfsctlService::activated();
crate::varlink::serve_activated(service, listener).await?;
Ok(true)
match crate::varlink::try_activated_listener()? {
Some(crate::varlink::ActivatedSocket::Connected(l)) => {
crate::varlink::serve_activated(service, l).await?;
Ok(true)
}
Some(crate::varlink::ActivatedSocket::Listening(listener)) => {
crate::varlink::serve_on_listener(service, listener).await?;
Ok(true)
}
None => Ok(false),
}
}

/// Top-level dispatch: handle init specially, otherwise open repo and run.
Expand Down Expand Up @@ -1147,6 +1178,111 @@ where
Ok(repo)
}

/// Copy an OCI image (and all its layers) from one repository to another using varlink connections.
#[cfg(feature = "oci")]
pub async fn copy_image(
conn_src: &mut zlink::unix::Connection,
conn_dest: &mut zlink::unix::Connection,
handle_src: u64,
handle_dest: u64,
image: &OciReference,
name: Option<&str>,
zerocopy: bool,
) -> Result<crate::varlink::layer_sync::FinalizeImageReply> {
use crate::varlink::layer_sync::LayerRef;
use crate::varlink::oci::OciError;
use crate::varlink::proxy::{GetLayerParams, OciProxy};
use anyhow::ensure;
use zlink::futures_util::StreamExt as _;

let image_str = image.to_string();
let inspect = conn_src
.inspect(handle_src, &image_str)
.await
.context("zlink transport error calling Inspect")?
.map_err(|e: OciError| anyhow::anyhow!("Inspect failed: {e:?}"))?;

ensure!(
!inspect.manifest.is_empty(),
"inspect returned empty manifest"
);
ensure!(!inspect.config.is_empty(), "inspect returned empty config");

// Extract ordered layer identifiers via the shared helper that handles
// both container images (rootfs.diff_ids) and OCI artifacts (manifest
// layer digests).
let diff_ids_ordered = composefs_oci::extract_layer_ids(&inspect.manifest, &inspect.config)
.context("extracting layer identifiers")?;

let mut layer_refs: Vec<LayerRef> = Vec::with_capacity(diff_ids_ordered.len());

for diff_id in &diff_ids_ordered {
let has = conn_dest
.has_layer(handle_dest, diff_id)
.await
.context("zlink transport error calling HasLayer")?
.map_err(|e: OciError| anyhow::anyhow!("HasLayer failed: {e:?}"))?;

let layer_verity = if has.present {
has.layer_verity
.context("HasLayer returned present=true but no layer_verity")?
} else {
let get_params = GetLayerParams {
diff_id: Some(diff_id.to_string()),
storage: None,
};
let mut get_stream = std::pin::pin!(
conn_src
.get_layer(handle_src, get_params)
.await
.context("zlink transport error calling GetLayer")?
);
let mut all_fds: Vec<std::os::fd::OwnedFd> = Vec::new();
let mut get_reply = None;
while let Some(item) = get_stream.next().await {
let (result, fds) = item.context("GetLayer stream frame error")?;
let reply =
result.map_err(|e: OciError| anyhow::anyhow!("GetLayer failed: {e:?}"))?;
get_reply = Some(reply);
all_fds.extend(fds);
}
let get_reply = get_reply.context("GetLayer returned empty stream")?;
let dir_count = get_reply.dir_count as usize;

let pipe_and_dirfds_len = 1 + dir_count;
let lifetime_fds = all_fds.split_off(pipe_and_dirfds_len);

let put_reply = conn_dest
.put_layer(handle_dest, diff_id, zerocopy, all_fds)
.await
.context("zlink transport error calling PutLayer")?
.map_err(|e: OciError| anyhow::anyhow!("PutLayer failed: {e:?}"))?;
drop(lifetime_fds);

put_reply.layer_verity
};

layer_refs.push(LayerRef {
diff_id: diff_id.clone(),
layer_verity,
});
}

let finalize = conn_dest
.finalize_image(
handle_dest,
&inspect.manifest,
&inspect.config,
layer_refs,
name,
)
.await
.context("zlink transport error calling FinalizeImage")?
.map_err(|e: OciError| anyhow::anyhow!("FinalizeImage failed: {e:?}"))?;

Ok(finalize)
}

/// Resolve an [`OciReference`] to an [`OciImage`].
#[cfg(feature = "oci")]
pub(crate) fn resolve_oci_image<ObjectID: FsVerityHashValue>(
Expand Down Expand Up @@ -1350,6 +1486,8 @@ where
ObjectID: FsVerityHashValue,
{
let repo = Arc::new(repo);
#[cfg(feature = "oci")]
let dest_path = resolve_repo_path(&args)?;
match args.cmd {
Command::Init { .. } => {
// Handled in run_app before we get here
Expand Down Expand Up @@ -1453,6 +1591,81 @@ where
println!("Boot image: {}", image_verity.to_hex());
}
}
OciCommand::Copy {
ref image,
ref from,
ref name,
zerocopy,
} => {
use crate::varlink::proxy::RepositoryProxy;

let src_hash = resolve_hash_type(from, args.hash, !args.no_upgrade)
.with_context(|| format!("opening source repository {}", from.display()))?;
let dest_hash = resolve_hash_type(&dest_path, args.hash, !args.no_upgrade)
.with_context(|| {
format!("opening destination repository {}", dest_path.display())
})?;

if zerocopy && src_hash != dest_hash {
anyhow::bail!(
"--zerocopy requires matching hash algorithms; \
source uses {src_hash:?} but destination uses {dest_hash:?}"
);
}

let from_str = from.to_str().context("source path is not valid UTF-8")?;
let dest_str = dest_path
.to_str()
.context("destination path is not valid UTF-8")?;

let service_src = crate::varlink::CfsctlService::new();
let service_dest = crate::varlink::CfsctlService::new();

let (mut conn_src, _srv_src) = crate::varlink::spawn_in_process(service_src)
.context("spawning source in-process service")?;
let (mut conn_dest, _srv_dest) = crate::varlink::spawn_in_process(service_dest)
.context("spawning destination in-process service")?;

let handle_src = conn_src
.open_repository(Some(from_str), None, None)
.await
.context("zlink transport error calling OpenRepository on source")?
.map_err(|e| anyhow::anyhow!("OpenRepository failed on source: {e:?}"))?
.handle;

let handle_dest = conn_dest
.open_repository(Some(dest_str), None, None)
.await
.context("zlink transport error calling OpenRepository on destination")?
.map_err(|e| anyhow::anyhow!("OpenRepository failed on destination: {e:?}"))?
.handle;

let finalize_reply = copy_image(
&mut conn_src,
&mut conn_dest,
handle_src,
handle_dest,
image,
name.as_deref(),
zerocopy,
)
.await?;

let tag_info = if let Some(n) = name {
format!(", tagged as {n}")
} else {
String::new()
};
println!(
"Copied image {image} from {} to destination repo{}",
from.display(),
tag_info
);
println!("Manifest digest: {}", finalize_reply.manifest_digest);
println!("Manifest verity: {}", finalize_reply.manifest_verity);
println!("Config digest: {}", finalize_reply.config_digest);
println!("Config verity: {}", finalize_reply.config_verity);
}
OciCommand::ListImages { json } => {
let images = composefs_oci::oci_image::list_images(&repo)?;

Expand Down
Loading
Loading