Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 171 additions & 20 deletions cmd/y-cluster/images.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package main

import (
"context"
"fmt"
"io"
"os"
"strings"

"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/Yolean/y-cluster/pkg/cache"
"github.com/Yolean/y-cluster/pkg/cluster"
"github.com/Yolean/y-cluster/pkg/images"
)
Expand Down Expand Up @@ -101,43 +105,190 @@ so callers can record what landed.`,

func imagesLoadCmd() *cobra.Command {
var contextName string
var cacheDir string
useCache := true

cmd := &cobra.Command{
Use: "load <archive|->",
Short: "Stream an OCI archive into the cluster node's containerd",
Long: `Open <archive> (a local file path, or "-" for stdin) and pipe its
bytes into ` + "`ctr -n k8s.io image import -`" + ` on the cluster node. The
ref + tag preserved on the loaded image are whatever the archive's
manifest carries — same as running ctr image import on the node
directly. No cache is touched, which is what callers want when their
build pipelines (e.g. ` + "`contain`" + ` outputting tarballs to /tmp) own
the lifecycle of the archive bytes.

Cluster discovery uses --context (default "local"): the docker
backend imports via the daemon API, the qemu backend via SSH.
Use: "load <ref|path|->",
Short: "Ensure an image is in the cluster's containerd",
Long: `Make an image available in the cluster's containerd. The single
positional argument dispatches by leading character (same shape
as docker build / git clone):

./... a local path (relative -- MUST start with "./")
/... a local path (absolute)
~/... a local path (home-relative)
- stdin (an OCI archive)
<other> a remote image reference

Bare names without "./" are remote refs by rule. A directory
called "myimage" in CWD is reached as "./myimage" --
"myimage" alone is dispatched as a remote registry ref and
fails fast. The rule is unambiguous because container references
can't legally start with ./, /, or ~/ per the OCI distribution
spec.

Path inputs:
- file: an already-tarred OCI archive
- directory: an OCI v1 layout (tar streamed on the fly,
equivalent to ` + "`tar -cf - -C <dir> . | y-cluster images load -`" + `)

Remote refs (default flow): resolve digest, check the cluster's
k8s.io namespace -- if the digest is already indexed, no-op.
Otherwise pull into the y-cluster shared cache (idempotent,
dedup by digest, reused on the next load to another cluster)
and stream to ` + "`ctr image import`" + `. Pass --cache=false to skip
the persistent cache (pull into a tempdir, load, throw the
tempdir away); rejected for path / stdin input where the caller
already owns the bytes.

Cluster discovery uses --context (default "local"): docker
backends import via the daemon API, qemu backends via SSH. No
SSH/docker-exec bytes are wasted on a re-deploy of a digest the
cluster already has.

Examples:
y-cluster images load registry.k8s.io/pause:3.10
y-cluster images load registry.k8s.io/pause:3.10 --cache=false
y-cluster images load ./myimage/target-oci
y-cluster images load /tmp/builds/myapp.tar
cat /tmp/builds/myapp.tar | y-cluster images load -
for f in builds/*.tar; do y-cluster images load "$f"; done`,
cat /tmp/builds/myapp.tar | y-cluster images load -`,
Args: cobra.ExactArgs(1),
RunE: func(c *cobra.Command, args []string) error {
r, closer, err := openYAMLInput(args[0], c.InOrStdin())
if err != nil {
return err
ctx := c.Context()
logger := loggerFromContext(ctx)
arg := args[0]
// Validate the input-flag combo BEFORE touching the
// kubeconfig -- a misuse error should fire even when
// the context isn't reachable, so the operator sees
// the real problem first.
switch {
case arg == "-":
if !useCache {
return fmt.Errorf("--cache=false is incompatible with stdin input")
}
case isPathArg(arg):
if !useCache {
return fmt.Errorf("--cache=false is incompatible with path input %q (caller already owns the bytes)", arg)
}
}
defer closer()
lr, err := cluster.Lookup(c.Context(), "", contextName)
lr, err := cluster.Lookup(ctx, "", contextName)
if err != nil {
return err
}
return images.Load(c.Context(), lr, r, loggerFromContext(c.Context()))
switch {
case arg == "-":
return images.Load(ctx, lr, c.InOrStdin(), logger)
case isPathArg(arg):
return loadFromPath(ctx, lr, arg, logger)
default:
return loadFromRef(ctx, lr, arg, cacheDir, useCache, logger)
}
},
}
cmd.Flags().StringVar(&contextName, "context", cluster.DefaultContext, "kubeconfig context name")
cmd.Flags().StringVar(&cacheDir, "cache-dir", "", "override cache root (also: $Y_CLUSTER_CACHE_DIR) -- only honoured for remote-ref input")
cmd.Flags().BoolVar(&useCache, "cache", true, "for remote-ref input, keep the pulled layout in the shared cache for reuse on the next load to another cluster")
return cmd
}

// isPathArg classifies the positional argument as a path
// (vs. a remote registry ref). Container references can't
// legally start with these characters per the OCI distribution
// spec -- hosts never lead with `.`, `/`, or `~` -- so the rule
// is unambiguous. Bare names like "nginx" are remote refs;
// callers who want the local "nginx" directory write "./nginx",
// matching standard shell hygiene. The `~` rule is mostly
// documentation -- the shell expands tilde before exec so the
// program usually sees an absolute path -- but we honour it
// when the shell hasn't expanded for any reason.
func isPathArg(arg string) bool {
return strings.HasPrefix(arg, "./") ||
strings.HasPrefix(arg, "/") ||
strings.HasPrefix(arg, "~/") ||
arg == "." || arg == ".."
}

// loadFromPath dispatches an existing-path input to either the
// archive (file) or OCI-layout (dir) variant of Load. Errors
// clearly when the path doesn't exist so the operator doesn't
// have to guess whether they typo'd.
func loadFromPath(ctx context.Context, lr *cluster.LookupResult, path string, logger *zap.Logger) error {
st, err := os.Stat(path)
if err != nil {
return err
}
if st.IsDir() {
r := images.TarOCIDirReader(path)
defer func() { _ = r.Close() }()
return images.Load(ctx, lr, r, logger)
}
f, err := os.Open(path)
if err != nil {
return err
}
defer func() { _ = f.Close() }()
return images.Load(ctx, lr, f, logger)
}

// loadFromRef implements the "ref -> cluster" flow: resolve
// digest, skip if the cluster already has it, otherwise pull
// and stream to containerd. The cache root is either the
// shared y-cluster cache (useCache=true) or a tempdir cleaned
// up after the load (useCache=false). Same code path for both
// -- the only difference is the root and whether we
// RemoveAll at the end. No second cache structure.
func loadFromRef(ctx context.Context, lr *cluster.LookupResult, ref, cacheDir string, useCache bool, logger *zap.Logger) error {
// Resolve digest first so we can ask the cluster whether
// it already has the image -- and skip the pull entirely
// when the answer is yes. Tag input HEADs; digest input
// is offline-safe.
digestRef, err := images.ResolveDigest(ctx, ref)
if err != nil {
return err
}
logger.Info("resolved ref",
zap.String("input", ref),
zap.String("digest", digestRef),
)
if images.PresentInCluster(ctx, lr, digestRef) {
logger.Info("image already in cluster; skipping import",
zap.String("ref", digestRef),
)
return nil
}

root := cacheDir
if !useCache {
tmp, err := os.MkdirTemp("", "y-cluster-images-load-")
if err != nil {
return err
}
defer func() { _ = os.RemoveAll(tmp) }()
root = tmp
}

if _, err := images.Cache(ctx, ref, root, logger); err != nil {
return err
}

// Derive the layout directory the cache just wrote. The
// helper centralises the path shape so a future cache
// backend swap (e.g. embedded registry:3 in 2.0) touches
// one helper, not the load path here.
digest := digestRef
if at := strings.LastIndex(digest, "@"); at >= 0 {
digest = digest[at+1:]
}
dir, err := cache.ImageLayout(root, digest)
if err != nil {
return err
}
r := images.TarOCIDirReader(dir)
defer func() { _ = r.Close() }()
return images.Load(ctx, lr, r, logger)
}

// openYAMLInput resolves the positional input arg ("<path>" or
// "-") to an io.Reader plus a deferred-close callback. We expose
// the close callback (rather than just an io.ReadCloser) because
Expand Down
67 changes: 67 additions & 0 deletions cmd/y-cluster/images_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,70 @@ func TestImagesLoadCmd_FileNotFound(t *testing.T) {
t.Fatal("expected error for missing archive file")
}
}

// TestIsPathArg pins the prefix-driven dispatch rule -- the
// single source of truth for "is this a path or a remote ref?"
// the load cmd reads. Reference shape is documented in the
// load subcommand's --help, which the cases below mirror.
func TestIsPathArg(t *testing.T) {
paths := []string{
"./relative/path",
"./relative/path.tar",
"./", ".", "..",
"/absolute/path",
"/", "/tmp",
"~/home/path",
}
refs := []string{
"nginx",
"nginx:1.27",
"nginx@sha256:1111111111111111111111111111111111111111111111111111111111111111",
"docker.io/library/nginx:1.27",
"registry.k8s.io/pause:3.10",
"localhost:5000/yolean/echo:v1",
"builds-registry.default.svc.cluster.local/myrepo/myapp:local-dev",
}
for _, p := range paths {
if !isPathArg(p) {
t.Errorf("expected %q to dispatch as path", p)
}
}
for _, r := range refs {
if isPathArg(r) {
t.Errorf("expected %q to dispatch as remote ref", r)
}
}
}

// TestImagesLoadCmd_CacheFalseRejectedForStdin: --cache=false
// is meaningful only for remote refs (where the alternative is
// "pull into a tempdir"). For stdin, the caller's already
// holding the bytes; --cache=false has no semantic and should
// fail loudly.
func TestImagesLoadCmd_CacheFalseRejectedForStdin(t *testing.T) {
cmd := rootCmd()
cmd.SetIn(strings.NewReader("noise"))
cmd.SetArgs([]string{"images", "load", "--context=does-not-exist", "--cache=false", "-"})
err := cmd.Execute()
if err == nil {
t.Fatal("expected error for --cache=false with stdin")
}
if !strings.Contains(err.Error(), "--cache=false") {
t.Errorf("error should call out --cache=false: %v", err)
}
}

// TestImagesLoadCmd_CacheFalseRejectedForPath: same shape for
// the path-input case -- caller owns the bytes; toggling the
// cache is meaningless.
func TestImagesLoadCmd_CacheFalseRejectedForPath(t *testing.T) {
cmd := rootCmd()
cmd.SetArgs([]string{"images", "load", "--context=does-not-exist", "--cache=false", "./some/path"})
err := cmd.Execute()
if err == nil {
t.Fatal("expected error for --cache=false with path")
}
if !strings.Contains(err.Error(), "--cache=false") {
t.Errorf("error should call out --cache=false: %v", err)
}
}
16 changes: 16 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ func Images(flagOverride string) (string, error) {
return filepath.Join(root, "images"), nil
}

// ImageLayout returns the on-disk layout directory for a single
// image, given its sha256 digest (e.g. "sha256:abc...").
// Callers shouldn't think about this path -- the cache is
// implementation-internal -- but the load-by-ref flow needs it
// to feed bytes to `ctr image import` from a cache hit.
// Exporting the derivation here keeps the layout shape in one
// place so a future cache backend swap touches one helper, not
// every caller.
func ImageLayout(flagOverride, digest string) (string, error) {
imagesDir, err := Images(flagOverride)
if err != nil {
return "", err
}
return filepath.Join(imagesDir, digest), nil
}

// K3s returns the k3s download root: airgap tarballs, k3s binary,
// per-version. Replaces the qemu provisioner's old
// ~/.cache/y-cluster-qemu/airgap/<version> location.
Expand Down
2 changes: 1 addition & 1 deletion pkg/gateway/hostnames_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestHostnames_FromHTTPRouteRoundTrip(t *testing.T) {
Listeners: []Listener{{Name: "https", Port: 443, Protocol: "HTTPS"}},
}},
HTTPRoutes: []HTTPRoute{{
Namespace: "keycloak-v3",
Namespace: "myapp",
Name: "keycloak-admin",
ParentRefs: rawJSON(t, `[{"name":"y-cluster","namespace":"y-cluster"}]`),
Hostnames: []string{"keycloak-admin", "keycloak-admin.example.com"},
Expand Down
10 changes: 5 additions & 5 deletions pkg/gateway/summary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ func TestBuildSummary_HTTPRouteServiceBackend(t *testing.T) {
},
}},
HTTPRoutes: []HTTPRoute{{
Namespace: "keycloak-v3",
Namespace: "myapp",
Name: "keycloak-admin",
ParentRefs: rawJSON(t, `[{"name":"y-cluster","namespace":"y-cluster","sectionName":"https"}]`),
Hostnames: []string{"keycloak-admin"},
Rules: rawJSON(t, `[{
"matches": [{"path": {"type": "PathPrefix", "value": "/"}}],
"backendRefs": [{"name": "keycloak", "namespace": "keycloak-v3", "port": 8080}]
"backendRefs": [{"name": "keycloak", "namespace": "myapp", "port": 8080}]
}]`),
}},
}
Expand All @@ -71,7 +71,7 @@ func TestBuildSummary_HTTPRouteServiceBackend(t *testing.T) {
if len(rs) != 1 {
t.Fatalf("routes: %+v", rs)
}
if rs[0].Source != "HTTPRoute/keycloak-v3/keycloak-admin#0" {
if rs[0].Source != "HTTPRoute/myapp/keycloak-admin#0" {
t.Errorf("source=%q", rs[0].Source)
}
if len(rs[0].Matches) != 1 || rs[0].Matches[0].Path != "PathPrefix=/" {
Expand All @@ -81,7 +81,7 @@ func TestBuildSummary_HTTPRouteServiceBackend(t *testing.T) {
t.Fatalf("backends: %+v", rs[0].Backends)
}
svc := rs[0].Backends[0].Service
if svc == nil || svc.Name != "keycloak" || svc.Namespace != "keycloak-v3" || svc.Port != 8080 {
if svc == nil || svc.Name != "keycloak" || svc.Namespace != "myapp" || svc.Port != 8080 {
t.Errorf("service: %+v", svc)
}
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestBuildSummary_MultiHostnameRouteAppearsInEachBucket(t *testing.T) {
Listeners: []Listener{{Name: "https", Port: 443, Protocol: "HTTPS"}},
}},
HTTPRoutes: []HTTPRoute{{
Namespace: "keycloak-v3",
Namespace: "myapp",
Name: "keycloak-admin",
ParentRefs: rawJSON(t, `[{"name":"y-cluster","namespace":"y-cluster"}]`),
Hostnames: []string{"keycloak-admin", "keycloak-admin.example.com"},
Expand Down
Loading