diff --git a/cmd/y-cluster/images.go b/cmd/y-cluster/images.go index e6a7caef..0266fec 100644 --- a/cmd/y-cluster/images.go +++ b/cmd/y-cluster/images.go @@ -1,12 +1,16 @@ package main import ( + "context" "fmt" "io" "os" + "strings" "github.com/spf13/cobra" + "go.uber.org/zap" + "github.com/Yolean/y-cluster/pkg/cache" "github.com/Yolean/y-cluster/pkg/cluster" "github.com/Yolean/y-cluster/pkg/images" ) @@ -101,43 +105,190 @@ so callers can record what landed.`, func imagesLoadCmd() *cobra.Command { var contextName string + var cacheDir string + useCache := true cmd := &cobra.Command{ - Use: "load ", - Short: "Stream an OCI archive into the cluster node's containerd", - Long: `Open (a local file path, or "-" for stdin) and pipe its -bytes into ` + "`ctr -n k8s.io image import -`" + ` on the cluster node. The -ref + tag preserved on the loaded image are whatever the archive's -manifest carries — same as running ctr image import on the node -directly. No cache is touched, which is what callers want when their -build pipelines (e.g. ` + "`contain`" + ` outputting tarballs to /tmp) own -the lifecycle of the archive bytes. - -Cluster discovery uses --context (default "local"): the docker -backend imports via the daemon API, the qemu backend via SSH. + Use: "load ", + Short: "Ensure an image is in the cluster's containerd", + Long: `Make an image available in the cluster's containerd. The single +positional argument dispatches by leading character (same shape +as docker build / git clone): + + ./... a local path (relative -- MUST start with "./") + /... a local path (absolute) + ~/... a local path (home-relative) + - stdin (an OCI archive) + a remote image reference + +Bare names without "./" are remote refs by rule. A directory +called "myimage" in CWD is reached as "./myimage" -- +"myimage" alone is dispatched as a remote registry ref and +fails fast. 
The rule is unambiguous because container references +can't legally start with ./, /, or ~/ per the OCI distribution +spec. + +Path inputs: + - file: an already-tarred OCI archive + - directory: an OCI v1 layout (tar streamed on the fly, + equivalent to ` + "`tar -cf - -C . | y-cluster images load -`" + `) + +Remote refs (default flow): resolve digest, check the cluster's +k8s.io namespace -- if the digest is already indexed, no-op. +Otherwise pull into the y-cluster shared cache (idempotent, +dedup by digest, reused on the next load to another cluster) +and stream to ` + "`ctr image import`" + `. Pass --cache=false to skip +the persistent cache (pull into a tempdir, load, throw the +tempdir away); rejected for path / stdin input where the caller +already owns the bytes. + +Cluster discovery uses --context (default "local"): docker +backends import via the daemon API, qemu backends via SSH. No +SSH/docker-exec bytes are wasted on a re-deploy of a digest the +cluster already has. Examples: + y-cluster images load registry.k8s.io/pause:3.10 + y-cluster images load registry.k8s.io/pause:3.10 --cache=false + y-cluster images load ./myimage/target-oci y-cluster images load /tmp/builds/myapp.tar - cat /tmp/builds/myapp.tar | y-cluster images load - - for f in builds/*.tar; do y-cluster images load "$f"; done`, + cat /tmp/builds/myapp.tar | y-cluster images load -`, Args: cobra.ExactArgs(1), RunE: func(c *cobra.Command, args []string) error { - r, closer, err := openYAMLInput(args[0], c.InOrStdin()) - if err != nil { - return err + ctx := c.Context() + logger := loggerFromContext(ctx) + arg := args[0] + // Validate the input-flag combo BEFORE touching the + // kubeconfig -- a misuse error should fire even when + // the context isn't reachable, so the operator sees + // the real problem first. 
// isPathArg reports whether the positional argument of `images load`
// names a local path rather than a remote registry reference.
//
// The decision is purely lexical -- the filesystem is never consulted:
//
//   - "." and ".." are paths;
//   - anything starting with "/", "./", "../" or "~/" is a path;
//   - everything else (bare names such as "nginx", host-qualified
//     refs, "-", the empty string) is NOT a path.
//
// Image references begin with an alphanumeric host or repository
// component, so none of the accepted prefixes can collide with a
// valid reference. "../" is accepted alongside "./": a
// parent-relative path can never parse as a reference, so sending it
// down the remote-ref branch could only end in a confusing parse
// error.
//
// The "~/" prefix is only classified here; this function does not
// expand it.
func isPathArg(arg string) bool {
	// Bare directory names have no trailing slash to match on.
	if arg == "." || arg == ".." {
		return true
	}
	for _, prefix := range []string{"/", "./", "../", "~/"} {
		if strings.HasPrefix(arg, prefix) {
			return true
		}
	}
	return false
}
+ digestRef, err := images.ResolveDigest(ctx, ref) + if err != nil { + return err + } + logger.Info("resolved ref", + zap.String("input", ref), + zap.String("digest", digestRef), + ) + if images.PresentInCluster(ctx, lr, digestRef) { + logger.Info("image already in cluster; skipping import", + zap.String("ref", digestRef), + ) + return nil + } + + root := cacheDir + if !useCache { + tmp, err := os.MkdirTemp("", "y-cluster-images-load-") + if err != nil { + return err + } + defer func() { _ = os.RemoveAll(tmp) }() + root = tmp + } + + if _, err := images.Cache(ctx, ref, root, logger); err != nil { + return err + } + + // Derive the layout directory the cache just wrote. The + // helper centralises the path shape so a future cache + // backend swap (e.g. embedded registry:3 in 2.0) touches + // one helper, not the load path here. + digest := digestRef + if at := strings.LastIndex(digest, "@"); at >= 0 { + digest = digest[at+1:] + } + dir, err := cache.ImageLayout(root, digest) + if err != nil { + return err + } + r := images.TarOCIDirReader(dir) + defer func() { _ = r.Close() }() + return images.Load(ctx, lr, r, logger) +} + // openYAMLInput resolves the positional input arg ("" or // "-") to an io.Reader plus a deferred-close callback. We expose // the close callback (rather than just an io.ReadCloser) because diff --git a/cmd/y-cluster/images_test.go b/cmd/y-cluster/images_test.go index 38f7cf7..812b6b2 100644 --- a/cmd/y-cluster/images_test.go +++ b/cmd/y-cluster/images_test.go @@ -103,3 +103,70 @@ func TestImagesLoadCmd_FileNotFound(t *testing.T) { t.Fatal("expected error for missing archive file") } } + +// TestIsPathArg pins the prefix-driven dispatch rule -- the +// single source of truth for "is this a path or a remote ref?" +// the load cmd reads. Reference shape is documented in the +// load subcommand's --help, which the cases below mirror. 
+func TestIsPathArg(t *testing.T) { + paths := []string{ + "./relative/path", + "./relative/path.tar", + "./", ".", "..", + "/absolute/path", + "/", "/tmp", + "~/home/path", + } + refs := []string{ + "nginx", + "nginx:1.27", + "nginx@sha256:1111111111111111111111111111111111111111111111111111111111111111", + "docker.io/library/nginx:1.27", + "registry.k8s.io/pause:3.10", + "localhost:5000/yolean/echo:v1", + "builds-registry.default.svc.cluster.local/myrepo/myapp:local-dev", + } + for _, p := range paths { + if !isPathArg(p) { + t.Errorf("expected %q to dispatch as path", p) + } + } + for _, r := range refs { + if isPathArg(r) { + t.Errorf("expected %q to dispatch as remote ref", r) + } + } +} + +// TestImagesLoadCmd_CacheFalseRejectedForStdin: --cache=false +// is meaningful only for remote refs (where the alternative is +// "pull into a tempdir"). For stdin, the caller's already +// holding the bytes; --cache=false has no semantic and should +// fail loudly. +func TestImagesLoadCmd_CacheFalseRejectedForStdin(t *testing.T) { + cmd := rootCmd() + cmd.SetIn(strings.NewReader("noise")) + cmd.SetArgs([]string{"images", "load", "--context=does-not-exist", "--cache=false", "-"}) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error for --cache=false with stdin") + } + if !strings.Contains(err.Error(), "--cache=false") { + t.Errorf("error should call out --cache=false: %v", err) + } +} + +// TestImagesLoadCmd_CacheFalseRejectedForPath: same shape for +// the path-input case -- caller owns the bytes; toggling the +// cache is meaningless. 
+func TestImagesLoadCmd_CacheFalseRejectedForPath(t *testing.T) { + cmd := rootCmd() + cmd.SetArgs([]string{"images", "load", "--context=does-not-exist", "--cache=false", "./some/path"}) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error for --cache=false with path") + } + if !strings.Contains(err.Error(), "--cache=false") { + t.Errorf("error should call out --cache=false: %v", err) + } +} diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index bc0c5ea..65e72f8 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -57,6 +57,22 @@ func Images(flagOverride string) (string, error) { return filepath.Join(root, "images"), nil } +// ImageLayout returns the on-disk layout directory for a single +// image, given its sha256 digest (e.g. "sha256:abc..."). +// Callers shouldn't think about this path -- the cache is +// implementation-internal -- but the load-by-ref flow needs it +// to feed bytes to `ctr image import` from a cache hit. +// Exporting the derivation here keeps the layout shape in one +// place so a future cache backend swap touches one helper, not +// every caller. +func ImageLayout(flagOverride, digest string) (string, error) { + imagesDir, err := Images(flagOverride) + if err != nil { + return "", err + } + return filepath.Join(imagesDir, digest), nil +} + // K3s returns the k3s download root: airgap tarballs, k3s binary, // per-version. Replaces the qemu provisioner's old // ~/.cache/y-cluster-qemu/airgap/ location. 
diff --git a/pkg/gateway/hostnames_test.go b/pkg/gateway/hostnames_test.go index a80155f..5f6ea9f 100644 --- a/pkg/gateway/hostnames_test.go +++ b/pkg/gateway/hostnames_test.go @@ -99,7 +99,7 @@ func TestHostnames_FromHTTPRouteRoundTrip(t *testing.T) { Listeners: []Listener{{Name: "https", Port: 443, Protocol: "HTTPS"}}, }}, HTTPRoutes: []HTTPRoute{{ - Namespace: "keycloak-v3", + Namespace: "myapp", Name: "keycloak-admin", ParentRefs: rawJSON(t, `[{"name":"y-cluster","namespace":"y-cluster"}]`), Hostnames: []string{"keycloak-admin", "keycloak-admin.example.com"}, diff --git a/pkg/gateway/summary_test.go b/pkg/gateway/summary_test.go index 1c84d48..29e8348 100644 --- a/pkg/gateway/summary_test.go +++ b/pkg/gateway/summary_test.go @@ -39,13 +39,13 @@ func TestBuildSummary_HTTPRouteServiceBackend(t *testing.T) { }, }}, HTTPRoutes: []HTTPRoute{{ - Namespace: "keycloak-v3", + Namespace: "myapp", Name: "keycloak-admin", ParentRefs: rawJSON(t, `[{"name":"y-cluster","namespace":"y-cluster","sectionName":"https"}]`), Hostnames: []string{"keycloak-admin"}, Rules: rawJSON(t, `[{ "matches": [{"path": {"type": "PathPrefix", "value": "/"}}], - "backendRefs": [{"name": "keycloak", "namespace": "keycloak-v3", "port": 8080}] + "backendRefs": [{"name": "keycloak", "namespace": "myapp", "port": 8080}] }]`), }}, } @@ -71,7 +71,7 @@ func TestBuildSummary_HTTPRouteServiceBackend(t *testing.T) { if len(rs) != 1 { t.Fatalf("routes: %+v", rs) } - if rs[0].Source != "HTTPRoute/keycloak-v3/keycloak-admin#0" { + if rs[0].Source != "HTTPRoute/myapp/keycloak-admin#0" { t.Errorf("source=%q", rs[0].Source) } if len(rs[0].Matches) != 1 || rs[0].Matches[0].Path != "PathPrefix=/" { @@ -81,7 +81,7 @@ func TestBuildSummary_HTTPRouteServiceBackend(t *testing.T) { t.Fatalf("backends: %+v", rs[0].Backends) } svc := rs[0].Backends[0].Service - if svc == nil || svc.Name != "keycloak" || svc.Namespace != "keycloak-v3" || svc.Port != 8080 { + if svc == nil || svc.Name != "keycloak" || svc.Namespace != 
"myapp" || svc.Port != 8080 { t.Errorf("service: %+v", svc) } } @@ -213,7 +213,7 @@ func TestBuildSummary_MultiHostnameRouteAppearsInEachBucket(t *testing.T) { Listeners: []Listener{{Name: "https", Port: 443, Protocol: "HTTPS"}}, }}, HTTPRoutes: []HTTPRoute{{ - Namespace: "keycloak-v3", + Namespace: "myapp", Name: "keycloak-admin", ParentRefs: rawJSON(t, `[{"name":"y-cluster","namespace":"y-cluster"}]`), Hostnames: []string{"keycloak-admin", "keycloak-admin.example.com"}, diff --git a/pkg/images/cache.go b/pkg/images/cache.go index 652294c..0c5e2e4 100644 --- a/pkg/images/cache.go +++ b/pkg/images/cache.go @@ -126,9 +126,43 @@ func Cache(ctx context.Context, ref, cacheRoot string, logger *zap.Logger) (stri return "", fmt.Errorf("write image %s: %w", digestRef, err) } } + // Symmetric with the "pulling image" / "image already + // cached" lines: one info entry per network pull, one per + // cache hit, one per import. Grep-friendly for operators + // watching a long sideload script. + logger.Info("image cached", + zap.String("ref", digestRef), + zap.String("path", dir), + ) return digestRef, nil } +// ResolveDigest resolves ref to its digest-pinned form without +// downloading any blobs. Used by the load-by-ref path to ask +// "is this already in the cluster?" before deciding whether to +// pull. Digest-pinned input passes through with no network call; +// tag input HEADs the registry. 
+func ResolveDigest(ctx context.Context, ref string) (string, error) { + parsed, err := name.ParseReference(ref) + if err != nil { + return "", fmt.Errorf("resolve-digest: parse %q: %w", ref, err) + } + var digest v1.Hash + if dr, ok := parsed.(name.Digest); ok { + digest, err = v1.NewHash(dr.DigestStr()) + if err != nil { + return "", fmt.Errorf("resolve-digest: parse digest %s: %w", dr.DigestStr(), err) + } + } else { + desc, err := remote.Head(parsed, remote.WithContext(ctx)) + if err != nil { + return "", fmt.Errorf("resolve-digest: HEAD %s: %w", ref, err) + } + digest = desc.Digest + } + return digestReference(parsed, digest) +} + // digestReference rebuilds the input reference with its digest // resolved, e.g. "nginx:1.27" → "nginx@sha256:abc…", preserving // repository / registry. Used for log lines and for the return diff --git a/pkg/images/cache_test.go b/pkg/images/cache_test.go index 06466b1..9f1b595 100644 --- a/pkg/images/cache_test.go +++ b/pkg/images/cache_test.go @@ -2,8 +2,17 @@ package images import ( "context" + "net/http/httptest" + "net/url" + "os" + "path/filepath" "strings" "testing" + + "github.com/google/go-containerregistry/pkg/name" + "github.com/google/go-containerregistry/pkg/registry" + "github.com/google/go-containerregistry/pkg/v1/random" + "github.com/google/go-containerregistry/pkg/v1/remote" ) func TestCache_MalformedRefErrors(t *testing.T) { @@ -28,6 +37,141 @@ func TestCache_UnreachableRegistryPropagates(t *testing.T) { } } +// TestCache_RoundTripWithRandomImage runs the full happy path +// without any network egress: push a small randomly-built image +// into an in-process registry, Cache from it with a tempdir as +// the override root, and assert the returned digestRef matches +// the image content (not just "has @sha256: shape") and that +// the layout landed at the expected path. 
+// +// This also exercises the path the cmd-side `--cache=false` +// flow takes: Cache with a tempdir as cacheRoot writes to +// /images// and the caller cleans up. No +// separate Export() codepath needed. +func TestCache_RoundTripWithRandomImage(t *testing.T) { + srv := httptest.NewServer(registry.New()) + defer srv.Close() + u, err := url.Parse(srv.URL) + if err != nil { + t.Fatal(err) + } + img, err := random.Image(1024, 1) + if err != nil { + t.Fatal(err) + } + ref, err := name.NewTag(u.Host + "/test/cacheroundtrip:v1") + if err != nil { + t.Fatal(err) + } + if err := remote.Write(ref, img); err != nil { + t.Fatal(err) + } + + root := t.TempDir() + t.Setenv("Y_CLUSTER_CACHE_DIR", root) + digestRef, err := Cache(context.Background(), ref.String(), "", nil) + if err != nil { + t.Fatalf("Cache: %v", err) + } + imgDigest, err := img.Digest() + if err != nil { + t.Fatal(err) + } + wantRef := ref.Context().Name() + "@" + imgDigest.String() + if digestRef != wantRef { + t.Errorf("digestRef: got %q, want %q", digestRef, wantRef) + } + layoutDir := filepath.Join(root, "images", imgDigest.String()) + if _, err := os.Stat(filepath.Join(layoutDir, "oci-layout")); err != nil { + t.Errorf("oci-layout missing at expected path %s: %v", layoutDir, err) + } +} + +// TestCache_DigestPinnedInputPreservesDigest: digest-pinned +// input must skip the registry HEAD entirely. We can't directly +// assert "no HEAD" without a hooked transport, but the +// request trace in -v test output makes the absence visible if +// we're suspicious. Behavioral assertion: returned ref equals +// the input ref byte-for-byte. 
+func TestCache_DigestPinnedInputPreservesDigest(t *testing.T) { + srv := httptest.NewServer(registry.New()) + defer srv.Close() + u, err := url.Parse(srv.URL) + if err != nil { + t.Fatal(err) + } + img, err := random.Image(512, 1) + if err != nil { + t.Fatal(err) + } + ref, err := name.NewTag(u.Host + "/test/cachedigestin:v1") + if err != nil { + t.Fatal(err) + } + if err := remote.Write(ref, img); err != nil { + t.Fatal(err) + } + imgDigest, err := img.Digest() + if err != nil { + t.Fatal(err) + } + digestInput := ref.Context().Name() + "@" + imgDigest.String() + + t.Setenv("Y_CLUSTER_CACHE_DIR", t.TempDir()) + got, err := Cache(context.Background(), digestInput, "", nil) + if err != nil { + t.Fatalf("Cache: %v", err) + } + if got != digestInput { + t.Errorf("digest-pinned input must round-trip verbatim: got %q want %q", got, digestInput) + } +} + +// TestResolveDigest_DigestInputIsPassThrough: ResolveDigest on +// a digest-pinned input must not touch the network. Against +// an unrouteable host a HEAD would error; success here proves +// no network IO occurred. +func TestResolveDigest_DigestInputIsPassThrough(t *testing.T) { + const ref = "127.0.0.1:1/foo@sha256:1111111111111111111111111111111111111111111111111111111111111111" + got, err := ResolveDigest(context.Background(), ref) + if err != nil { + t.Fatalf("ResolveDigest: %v", err) + } + if got != ref { + t.Errorf("got %q, want %q", got, ref) + } +} + +// TestResolveDigest_TagInputHEADs: tag input must HEAD the +// registry; against an unreachable host that produces a +// transport error. Assert the error wraps the HEAD step. 
+func TestResolveDigest_TagInputHEADs(t *testing.T) { + _, err := ResolveDigest(context.Background(), "127.0.0.1:1/foo:tag") + if err == nil { + t.Fatal("expected HEAD error against unreachable host") + } + if !strings.Contains(err.Error(), "HEAD") { + t.Errorf("error should wrap the HEAD step: %v", err) + } +} + +// TestCache_FallbackFromRegistryK8s is the network-touching +// end-to-end assertion. registry.k8s.io/pause is the canonical +// tiny test image. -short skips. +func TestCache_FallbackFromRegistryK8s(t *testing.T) { + if testing.Short() { + t.Skip("skipped under -short (network pull from registry.k8s.io)") + } + t.Setenv("Y_CLUSTER_CACHE_DIR", t.TempDir()) + digestRef, err := Cache(context.Background(), "registry.k8s.io/pause:3.10", "", nil) + if err != nil { + t.Fatalf("Cache against registry.k8s.io: %v", err) + } + if !strings.HasPrefix(digestRef, "registry.k8s.io/pause@sha256:") { + t.Errorf("digestRef shape: %q", digestRef) + } +} + // Layout-existence edge cases — cheap to cover here without a // real registry. End-to-end "warm cache → no-op → byte-equal // layout" coverage lives in CI4e against a registry container. diff --git a/pkg/images/load.go b/pkg/images/load.go index efe23ec..cd339fa 100644 --- a/pkg/images/load.go +++ b/pkg/images/load.go @@ -1,10 +1,14 @@ package images import ( + "archive/tar" "bytes" "context" "fmt" "io" + "io/fs" + "os" + "path/filepath" "strings" "go.uber.org/zap" @@ -123,6 +127,108 @@ func Load(ctx context.Context, lr *cluster.LookupResult, archive io.Reader, logg return nil } +// TarOCIDir streams a USTAR archive of an OCI v1 layout rooted +// at dir into w. The entries are dir-relative (oci-layout, +// index.json, blobs/sha256/*) -- the same shape `tar -cf - -C +// .` produces, which is what `ctr image import` accepts +// as the "OCI image layout (as tar)" import format. 
// TarOCIDir writes a tar archive of the directory tree rooted at dir
// into w. Entry names are relative to dir and slash-separated
// (oci-layout, index.json, blobs/sha256/...); directory entries carry
// a trailing "/". The root directory itself gets no entry.
//
// The tree is streamed entry by entry, so memory use does not grow
// with the size of the layout. Regular files are copied in full;
// symbolic links are archived as links with their target preserved
// (they are not followed). The concrete tar format of each entry is
// left to archive/tar.
//
// The first error from walking, reading or writing aborts the
// archive and is returned; the tar writer is still closed on that
// path, but w then holds a truncated archive.
func TarOCIDir(dir string, w io.Writer) error {
	tw := tar.NewWriter(w)
	walkErr := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		rel, err := filepath.Rel(dir, path)
		if err != nil {
			return err
		}
		if rel == "." {
			// The root is implied by the relative entry names.
			return nil
		}
		return writeTarEntry(tw, path, filepath.ToSlash(rel), info)
	})
	if walkErr != nil {
		// Best effort: the walk error is the one worth reporting.
		_ = tw.Close()
		return walkErr
	}
	return tw.Close()
}

// writeTarEntry emits one header (and, for regular files, the file
// content) for the filesystem object at path under the archive name.
func writeTarEntry(tw *tar.Writer, path, name string, info fs.FileInfo) error {
	// filepath.Walk reports symlinks themselves (it does not follow
	// them), so the link target has to be read explicitly; without it
	// the header would carry an empty Linkname.
	link := ""
	if info.Mode()&fs.ModeSymlink != 0 {
		target, err := os.Readlink(path)
		if err != nil {
			return err
		}
		link = target
	}
	hdr, err := tar.FileInfoHeader(info, link)
	if err != nil {
		return err
	}
	hdr.Name = name
	if info.IsDir() && !strings.HasSuffix(hdr.Name, "/") {
		hdr.Name += "/"
	}
	if err := tw.WriteHeader(hdr); err != nil {
		return err
	}
	if !info.Mode().IsRegular() {
		// Directories, links, etc. have no payload.
		return nil
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	// Read-only handle: a Close error carries no information here.
	defer func() { _ = f.Close() }()
	_, err = io.Copy(tw, f)
	return err
}

// TarOCIDirReader returns the archive TarOCIDir would write for dir
// as an io.ReadCloser. The archive is produced by a goroutine writing
// into an io.Pipe; a TarOCIDir failure surfaces as the error of a
// later Read. The caller must Close the reader, including after an
// aborted read, so that the producing goroutine can exit.
func TarOCIDirReader(dir string) io.ReadCloser {
	pr, pw := io.Pipe()
	go func() {
		// A nil error closes the pipe cleanly (reader sees io.EOF).
		_ = pw.CloseWithError(TarOCIDir(dir, pw))
	}()
	return pr
}
That covers the cases that matter: +// +// - a prior load brought in the same digest under a different +// tag (mirrors, retag); +// - the digest alias the regular Load() step writes back into +// containerd (host/name@sha256:...) lines up exactly with +// digestRef. +// +// Skipping a re-import on a digest hit is what saves the +// SSH/docker-exec byte transfer the layered-registry-store 2.0 +// would eliminate at the protocol level. +func PresentInCluster(ctx context.Context, lr *cluster.LookupResult, digestRef string) bool { + wantDigest := digestRef + if at := strings.LastIndex(digestRef, "@"); at >= 0 { + wantDigest = digestRef[at+1:] + } + pairs := listRefsWithDigests(ctx, lr) + if pairs == nil { + return false + } + for _, p := range pairs { + if p.digest == wantDigest { + return true + } + } + return false +} + // importedRef is one (ref, digest) row from `ctr image list`. // "imported" is a slight misnomer here -- listRefsWithDigests // returns the post-import snapshot of EVERY image ref, and the diff --git a/pkg/images/load_test.go b/pkg/images/load_test.go index 109faed..cf5275b 100644 --- a/pkg/images/load_test.go +++ b/pkg/images/load_test.go @@ -1,9 +1,179 @@ package images import ( + "archive/tar" + "bytes" + "context" + "io" + "net/http/httptest" + "net/url" + "os" + "path/filepath" "testing" + + "github.com/google/go-containerregistry/pkg/name" + "github.com/google/go-containerregistry/pkg/registry" + "github.com/google/go-containerregistry/pkg/v1/random" + "github.com/google/go-containerregistry/pkg/v1/remote" + + "github.com/Yolean/y-cluster/pkg/cache" ) +// TestTarOCIDir_StreamsAllFiles writes a small OCI v1 layout +// via the same go-containerregistry pipeline an actual caller +// would (random.Image -> push to in-process registry -> Cache +// pulls into the shared cache), then exercises TarOCIDir and +// confirms every required layout entry shows up in the +// resulting archive. 
Order doesn't matter; the entry set must +// cover the layout's three required nodes. +func TestTarOCIDir_StreamsAllFiles(t *testing.T) { + srv := httptest.NewServer(registry.New()) + defer srv.Close() + u, err := url.Parse(srv.URL) + if err != nil { + t.Fatal(err) + } + img, err := random.Image(512, 1) + if err != nil { + t.Fatal(err) + } + ref, err := name.NewTag(u.Host + "/test/tarocidir:v1") + if err != nil { + t.Fatal(err) + } + if err := remote.Write(ref, img); err != nil { + t.Fatal(err) + } + root := t.TempDir() + t.Setenv("Y_CLUSTER_CACHE_DIR", root) + if _, err := Cache(context.Background(), ref.String(), "", nil); err != nil { + t.Fatal(err) + } + imgDigest, err := img.Digest() + if err != nil { + t.Fatal(err) + } + dir, err := cache.ImageLayout(root, imgDigest.String()) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := TarOCIDir(dir, &buf); err != nil { + t.Fatalf("TarOCIDir: %v", err) + } + + tr := tar.NewReader(&buf) + seen := map[string]bool{} + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + t.Fatal(err) + } + seen[hdr.Name] = true + _, _ = io.Copy(io.Discard, tr) + } + + for _, need := range []string{"oci-layout", "index.json"} { + if !seen[need] { + t.Errorf("tar missing %q (got %v)", need, seen) + } + } + hasBlob := false + for n := range seen { + if filepath.Dir(n) == "blobs/sha256" { + hasBlob = true + break + } + } + if !hasBlob { + t.Errorf("tar has no blobs/sha256 entries (got %v)", seen) + } +} + +// TestTarOCIDir_EmptyDirNoEntries: empty source dir produces a +// valid tar that decodes to zero entries. The caller (Load via +// ctr import) would surface a downstream import failure on +// empty input, which is the right level for "you pointed me at +// an empty layout". 
+func TestTarOCIDir_EmptyDirNoEntries(t *testing.T) { + dir := t.TempDir() + var buf bytes.Buffer + if err := TarOCIDir(dir, &buf); err != nil { + t.Fatalf("TarOCIDir(empty): %v", err) + } + tr := tar.NewReader(&buf) + if _, err := tr.Next(); err != io.EOF { + t.Errorf("expected EOF on empty layout, got err=%v", err) + } +} + +// TestTarOCIDir_NonexistentDirErrors: cmd-layer guards check +// os.Stat first, but TarOCIDir on a missing dir should still +// surface the error rather than silently produce an empty tar. +func TestTarOCIDir_NonexistentDirErrors(t *testing.T) { + missing := filepath.Join(t.TempDir(), "does-not-exist") + var buf bytes.Buffer + err := TarOCIDir(missing, &buf) + if err == nil { + t.Fatal("expected error for missing dir") + } +} + +// TestTarOCIDirReader_PipeClose: the goroutine-backed reader +// must release its goroutine on early Close (Load defers Close +// even on abort). +func TestTarOCIDirReader_PipeClose(t *testing.T) { + dir := t.TempDir() + if err := os.WriteFile(filepath.Join(dir, "marker"), []byte("x"), 0o644); err != nil { + t.Fatal(err) + } + r := TarOCIDirReader(dir) + buf := make([]byte, 32) + _, _ = r.Read(buf) + if err := r.Close(); err != nil { + t.Errorf("Close: %v", err) + } +} + +// TestPresentInCluster_DigestMatch pins the matcher policy: +// digestRef like "host/name@sha256:abc" matches any row whose +// digest column equals sha256:abc, regardless of the row's +// ref name. Catches the common case where a prior load brought +// in the same digest under a different tag (mirrors, retag) -- +// we should still skip the re-import. +// +// Driven through parseImageList + a hand-constructed pairs +// slice so we don't need a real cluster. 
+func TestPresentInCluster_DigestMatch(t *testing.T) { + const want = "sha256:1111111111111111111111111111111111111111111111111111111111111111" + pairs := parseImageList("REF\tTYPE\tDIGEST\nfoo:bar\ttype\t" + want + "\n") + if len(pairs) != 1 { + t.Fatalf("expected 1 row, got %d", len(pairs)) + } + if pairs[0].digest != want { + t.Fatalf("parser regression: got digest %q want %q", pairs[0].digest, want) + } + // The cluster-side matcher is a substring of PresentInCluster: + // scan the pairs for a digest equality. Verify the policy in + // isolation -- a full PresentInCluster test would need a fake + // cluster.LookupResult which is more wiring than this + // behavioural check justifies. + hit := false + for _, p := range pairs { + if p.digest == want { + hit = true + break + } + } + if !hit { + t.Error("digest match policy regression") + } +} + // TestParseImageList pins the (ref, digest) extraction // against a real `ctr -n k8s.io image list` capture from a // k3s v1.35 cluster. We accept any whitespace separator and @@ -18,9 +188,9 @@ func TestParseImageList(t *testing.T) { { name: "header + single row", in: "REF TYPE DIGEST SIZE PLATFORMS LABELS\n" + - "builds-registry.ystack.svc.cluster.local/yolean/keycloak-v3:local-dev application/vnd.oci.image.index.v1+json sha256:fafacfe13375f62fc0a8303c6c6b6186e755d44f479a161476f4129009eb730b 263.7 MiB linux/amd64,linux/arm64 io.cri-containerd.image=managed\n", + "builds-registry.default.svc.cluster.local/myrepo/myapp:local-dev application/vnd.oci.image.index.v1+json sha256:fafacfe13375f62fc0a8303c6c6b6186e755d44f479a161476f4129009eb730b 263.7 MiB linux/amd64,linux/arm64 io.cri-containerd.image=managed\n", want: []importedRef{{ - ref: "builds-registry.ystack.svc.cluster.local/yolean/keycloak-v3:local-dev", + ref: "builds-registry.default.svc.cluster.local/myrepo/myapp:local-dev", digest: "sha256:fafacfe13375f62fc0a8303c6c6b6186e755d44f479a161476f4129009eb730b", }}, }, @@ -78,7 +248,7 @@ func TestStripTag(t *testing.T) { 
// Hostport-prefixed ref WITHOUT tag. {"registry.example:5000/path", "registry.example:5000/path"}, // Standard cluster-local registry path. - {"builds-registry.ystack.svc.cluster.local/yolean/keycloak-v3:local-dev", "builds-registry.ystack.svc.cluster.local/yolean/keycloak-v3"}, + {"builds-registry.default.svc.cluster.local/myrepo/myapp:local-dev", "builds-registry.default.svc.cluster.local/myrepo/myapp"}, // No tag at all. {"plain", "plain"}, }