diff --git a/core/mount/mount_linux.go b/core/mount/mount_linux.go index 4dc8f0d4d2d8c..d38a2c7c161ae 100644 --- a/core/mount/mount_linux.go +++ b/core/mount/mount_linux.go @@ -92,7 +92,10 @@ func (m *Mount) mount(target string) (err error) { options = m.Options ) - opt := parseMountOptions(options) + opt, err := parseMountOptions(options) + if err != nil { + return err + } // The only remapping of both GID and UID is supported if opt.uidmap != "" && opt.gidmap != "" { if usernsFd, err = GetUsernsFD(opt.uidmap, opt.gidmap); err != nil { @@ -116,7 +119,10 @@ func (m *Mount) mount(target string) (err error) { if optionsSize(options) >= pagesize-512 { recalcOpt = true } else { - opt = parseMountOptions(options) + opt, err = parseMountOptions(options) + if err != nil { + return err + } } } } @@ -129,7 +135,10 @@ func (m *Mount) mount(target string) (err error) { // recalculate opt in case of lowerdirs have been replaced // by idmapped ones OR idmapped mounts' not used/supported. if recalcOpt || (opt.uidmap == "" || opt.gidmap == "") { - opt = parseMountOptions(options) + opt, err = parseMountOptions(options) + if err != nil { + return err + } } } @@ -325,7 +334,7 @@ func buildIDMappedPaths(lowerDirs []string, commonDir, idMappedDir string) []str // parseMountOptions takes fstab style mount options and parses them for // use with a standard mount() syscall -func parseMountOptions(options []string) (opt mountOpt) { +func parseMountOptions(options []string) (opt mountOpt, err error) { loopOpt := "loop" flagsMap := map[string]struct { clear bool @@ -358,6 +367,11 @@ func parseMountOptions(options []string) (opt mountOpt) { "sync": {false, unix.MS_SYNCHRONOUS}, } for _, o := range options { + // X-containerd.* options are internal mount options that should be processed + // by the mount manager before reaching this layer. 
+ if strings.HasPrefix(o, "X-containerd.") { + return opt, fmt.Errorf("internal mount option %q was not consumed by the mount manager", o) + } // If the option does not exist in the flags table or the flag // is not supported on the platform, // then it is a data value for a specific fs type diff --git a/core/mount/mount_linux_test.go b/core/mount/mount_linux_test.go index 8f45bdff4b974..2c6047d0a0d6b 100644 --- a/core/mount/mount_linux_test.go +++ b/core/mount/mount_linux_test.go @@ -697,3 +697,23 @@ func TestGetCommonDirectory(t *testing.T) { }) } } + +func TestXContainerdOptionsFiltered(t *testing.T) { + testutil.RequiresRoot(t) + + target := filepath.Join(t.TempDir(), "mnt") + require.NoError(t, os.MkdirAll(target, 0755)) + + m := Mount{ + Type: "tmpfs", + Source: "tmpfs", + Options: []string{ + "size=10M", + "X-containerd.custom=test-value", + "mode=0755", + }, + } + + err := m.Mount(target) + require.Error(t, err, "X-containerd.* options should cause an error") +} diff --git a/core/remotes/docker/pusher.go b/core/remotes/docker/pusher.go index 4c8e4f1ba835c..4ab15bba8378d 100644 --- a/core/remotes/docker/pusher.go +++ b/core/remotes/docker/pusher.go @@ -60,6 +60,12 @@ func (p dockerPusher) Writer(ctx context.Context, opts ...content.WriterOpt) (co if wOpts.Ref == "" { return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument) } + if wOpts.Desc.Digest == "" { + return nil, fmt.Errorf("descriptor digest must not be empty: %w", errdefs.ErrInvalidArgument) + } + if wOpts.Desc.MediaType == "" { + return nil, fmt.Errorf("descriptor media type must not be empty: %w", errdefs.ErrInvalidArgument) + } return p.push(ctx, wOpts.Desc, wOpts.Ref, true) } diff --git a/core/remotes/docker/resolver.go b/core/remotes/docker/resolver.go index 451b07496e76f..f40ee962cc9ca 100644 --- a/core/remotes/docker/resolver.go +++ b/core/remotes/docker/resolver.go @@ -292,6 +292,14 @@ func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocisp } 
for _, u := range paths { + // Falling back to the /blobs endpoint should happen only in extreme cases, i.e. to + // support legacy registries. We want to limit the fallback to when the /manifests endpoint + // returned 404. Falling back on transient errors could do more harm, like polluting + // the local content store with incorrectly typed descriptors, as the /blobs endpoint tends + // to always return application/octet-stream. + if firstErrPriority > 2 { + break + } for i, host := range hosts { req := base.request(host, http.MethodHead, u...) if err := req.addNamespace(base.refspec.Hostname()); err != nil { diff --git a/core/remotes/docker/resolver_test.go b/core/remotes/docker/resolver_test.go index b553101b70b48..83fbc4d507b60 100644 --- a/core/remotes/docker/resolver_test.go +++ b/core/remotes/docker/resolver_test.go @@ -1104,6 +1104,138 @@ func (m testManifest) RegisterHandler(r *http.ServeMux, name string) { } } +// TestResolveTransientManifestError verifies that a transient server error (5xx) +// from the /manifests/ endpoint does NOT cause containerd to fall back to the +// /blobs/ endpoint. Before this fix, a 500 from /manifests/ would cause Resolve() +// to silently retry via /blobs/, which returns "application/octet-stream" instead +// of a proper manifest media type — poisoning the descriptor and corrupting the +// local content store. +// +// The correct behavior is: 5xx from /manifests/ → return the server error, do NOT +// try /blobs/. 
+func TestResolveTransientManifestError(t *testing.T) { + var ( + manifestCalled int + blobsCalled bool + repo = "test-repo" + dgst = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" // empty sha + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/manifests/"+dgst) { + manifestCalled++ + w.WriteHeader(http.StatusInternalServerError) + return + } + if strings.HasSuffix(r.URL.Path, "/blobs/"+dgst) { + blobsCalled = true + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Docker-Content-Digest", dgst) + w.WriteHeader(http.StatusOK) + return + } + if r.URL.Path == "/v2/" { + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusNotFound) + })) + defer ts.Close() + + resolver := NewResolver(ResolverOptions{ + Hosts: func(string) ([]RegistryHost, error) { + return []RegistryHost{ + { + Host: ts.URL[len("http://"):], + Scheme: "http", + Capabilities: HostCapabilityPull | HostCapabilityResolve, + }, + }, nil + }, + }) + + ref := fmt.Sprintf("%s/%s@%s", ts.URL[len("http://"):], repo, dgst) + _, _, err := resolver.Resolve(context.Background(), ref) + + if manifestCalled == 0 { + t.Fatal("manifests endpoint was not called") + } + if blobsCalled { + t.Error("blobs endpoint was called, but should not have been after a 500 on /manifests/") + } + if err == nil { + t.Fatal("expected error from Resolve, but got nil") + } + + // The error should surface the unexpected 500 status, not a generic "not found". + var unexpectedStatus remoteerrors.ErrUnexpectedStatus + if !errors.As(err, &unexpectedStatus) { + t.Errorf("expected ErrUnexpectedStatus (from 500), got %T: %v", err, err) + } else if unexpectedStatus.StatusCode != http.StatusInternalServerError { + t.Errorf("expected status 500, got %d", unexpectedStatus.StatusCode) + } +} + +// TestResolve404ManifestFallback verifies that a 404 from /manifests/ DOES +// allow fallback to /blobs/. 
This preserves backward compatibility with +// non-standard registries that may only serve certain digests via /blobs/. +func TestResolve404ManifestFallback(t *testing.T) { + var ( + manifestCalled bool + blobsCalled bool + repo = "test-repo" + dgst = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/manifests/"+dgst) { + manifestCalled = true + w.WriteHeader(http.StatusNotFound) + return + } + if strings.HasSuffix(r.URL.Path, "/blobs/"+dgst) { + blobsCalled = true + w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json") + w.Header().Set("Docker-Content-Digest", dgst) + w.WriteHeader(http.StatusOK) + return + } + if r.URL.Path == "/v2/" { + w.WriteHeader(http.StatusOK) + return + } + })) + defer ts.Close() + + resolver := NewResolver(ResolverOptions{ + Hosts: func(string) ([]RegistryHost, error) { + return []RegistryHost{ + { + Host: ts.URL[len("http://"):], + Scheme: "http", + Capabilities: HostCapabilityPull | HostCapabilityResolve, + }, + }, nil + }, + }) + + ref := fmt.Sprintf("%s/%s@%s", ts.URL[len("http://"):], repo, dgst) + _, desc, err := resolver.Resolve(context.Background(), ref) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !manifestCalled { + t.Error("manifests endpoint was not called") + } + if !blobsCalled { + t.Error("blobs endpoint was not called on 404") + } + if desc.MediaType != "application/vnd.docker.distribution.manifest.v2+json" { + t.Errorf("unexpected media type: %s", desc.MediaType) + } +} + func newRefreshTokenServer(t testing.TB, name string, disablePOST bool, onFetchRefreshToken OnFetchRefreshToken) *refreshTokenServer { return &refreshTokenServer{ T: t, diff --git a/core/transfer/streaming/stream.go b/core/transfer/streaming/stream.go index f571313920f00..967d40e889554 100644 --- a/core/transfer/streaming/stream.go +++ 
b/core/transfer/streaming/stream.go @@ -111,27 +111,29 @@ func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) { max = remaining } b := (*buf)[:max] - n, err := r.Read(b) - if err != nil { - if !errors.Is(err, io.EOF) { - log.G(ctx).WithError(err).Errorf("failed to read stream source") + n, readErr := r.Read(b) + if n > 0 { + remaining = remaining - int32(n) + + data := &transferapi.Data{ + Data: b[:n], + } + anyType, err := typeurl.MarshalAny(data) + if err != nil { + log.G(ctx).WithError(err).Errorf("failed to marshal data for send") // TODO: Send error message on stream before close to allow remote side to return error + return + } + if err := stream.Send(anyType); err != nil { + log.G(ctx).WithError(err).Errorf("send failed") + return } - return - } - remaining = remaining - int32(n) - - data := &transferapi.Data{ - Data: b[:n], - } - anyType, err := typeurl.MarshalAny(data) - if err != nil { - log.G(ctx).WithError(err).Errorf("failed to marshal data for send") - // TODO: Send error message on stream before close to allow remote side to return error - return } - if err := stream.Send(anyType); err != nil { - log.G(ctx).WithError(err).Errorf("send failed") + if readErr != nil { + if !errors.Is(readErr, io.EOF) { + log.G(ctx).WithError(readErr).Errorf("failed to read stream source") + // TODO: Send error message on stream before close to allow remote side to return error + } return } } diff --git a/core/transfer/streaming/stream_test.go b/core/transfer/streaming/stream_test.go index 533f4f4edfe31..32b4956183f8a 100644 --- a/core/transfer/streaming/stream_test.go +++ b/core/transfer/streaming/stream_test.go @@ -105,6 +105,45 @@ func runWriterFuzz(ctx context.Context, t *testing.T, expected []byte) { } } +// TestSendReceiveEOFWithData verifies that data is not dropped when a reader +// returns both n > 0 and io.EOF from the same Read call. 
+func TestSendReceiveEOFWithData(t *testing.T) { + expected := []byte("one and only chunk with eof") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rs, ws := pipeStream() + SendStream(ctx, &eofReader{data: expected}, ws) + or := ReceiveStream(ctx, rs) + + actual, err := io.ReadAll(or) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(expected, actual) { + t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected: %v", actual, expected) + } +} + +// eofReader returns all remaining data together with io.EOF on the final Read, +// exercising the io.Reader contract where n > 0 && err == io.EOF. +type eofReader struct { + data []byte +} + +func (r *eofReader) Read(p []byte) (int, error) { + if len(r.data) == 0 { + return 0, io.EOF + } + n := copy(p, r.data) + r.data = r.data[n:] + if len(r.data) == 0 { + return n, io.EOF + } + return n, nil +} + func chainStreams(ctx context.Context, r io.Reader) io.Reader { rs, ws := pipeStream() SendStream(ctx, r, ws) diff --git a/core/unpack/unpacker.go b/core/unpack/unpacker.go index f396903ac8d07..67f2dcb7c4587 100644 --- a/core/unpack/unpacker.go +++ b/core/unpack/unpacker.go @@ -530,6 +530,15 @@ func (u *Unpacker) unpack( case <-fetchC[i-fetchOffset]: } + // In case of parallel unpack, the parent snapshot isn't provided to the snapshotter. + // The overlayfs will return bind mounts for all layers, we need to convert them + // to overlay mounts for the applier to perform whiteout conversion correctly. + // TODO: this is a temporary workaround until #13053 lands. + // See: https://github.com/containerd/containerd/issues/13030 + if i > 0 && parallel && unpack.SnapshotterKey == "overlayfs" { + mounts = bindToOverlay(mounts) + } + diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...) 
if err != nil { cleanup.Do(ctx, abort) @@ -753,3 +762,23 @@ func uniquePart() string { rand.Read(b[:]) return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:])) } + +// bindToOverlay rewrites a lone "bind" mount as an "overlay" mount that keeps the bind options except "rbind" and appends upperdir=<bind source>; any other input (zero or several mounts, or a non-bind mount) is returned unchanged. TODO: this is a temporary workaround until #13053 lands. +func bindToOverlay(mounts []mount.Mount) []mount.Mount { + if len(mounts) != 1 || mounts[0].Type != "bind" { + return mounts + } + + m := mount.Mount{ + Type: "overlay", + Source: "overlay", + } + for _, o := range mounts[0].Options { + if o != "rbind" { + m.Options = append(m.Options, o) + } + } + m.Options = append(m.Options, "upperdir="+mounts[0].Source) + + return []mount.Mount{m} +} diff --git a/core/unpack/unpacker_test.go b/core/unpack/unpacker_test.go index 66da7b585c8c1..bdd08348f56f7 100644 --- a/core/unpack/unpacker_test.go +++ b/core/unpack/unpacker_test.go @@ -19,8 +19,10 @@ package unpack import ( "crypto/rand" "fmt" + "reflect" "testing" + "github.com/containerd/containerd/v2/core/mount" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" ) @@ -91,3 +93,87 @@ func BenchmarkUnpackWithChainIDs(b *testing.B) { }) } } + +func TestBindToOverlay(t *testing.T) { + testCases := []struct { + name string + mounts []mount.Mount + expect []mount.Mount + }{ + { + name: "single bind mount", + mounts: []mount.Mount{ + { + Type: "bind", + Source: "/path/to/source", + Options: []string{"ro", "rbind"}, + }, + }, + expect: []mount.Mount{ + { + Type: "overlay", + Source: "overlay", + Options: []string{ + "ro", + "upperdir=/path/to/source", + }, + }, + }, + }, + { + name: "overlay mount", + mounts: []mount.Mount{ + { + Type: "overlay", + Source: "overlay", + Options: []string{ + "lowerdir=/path/to/lower", + "upperdir=/path/to/upper", + }, + }, + }, + expect: []mount.Mount{ + { + Type: "overlay", + Source: "overlay", + Options: []string{ + "lowerdir=/path/to/lower", + "upperdir=/path/to/upper", + }, + }, + }, + }, + { + name: "multiple mounts", + mounts: []mount.Mount{ + { + Type: "bind", 
+ Source: "/path/to/source1", + }, + { + Type: "bind", + Source: "/path/to/source2", + }, + }, + expect: []mount.Mount{ + { + Type: "bind", + Source: "/path/to/source1", + }, + { + Type: "bind", + Source: "/path/to/source2", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := bindToOverlay(tc.mounts) + if !reflect.DeepEqual(result, tc.expect) { + t.Errorf("unexpected result: got %v, want %v", result, tc.expect) + } + }) + } +} diff --git a/integration/client/container_linux_test.go b/integration/client/container_linux_test.go index 7f44c06cd19a6..1de8776cc9ac7 100644 --- a/integration/client/container_linux_test.go +++ b/integration/client/container_linux_test.go @@ -35,6 +35,7 @@ import ( cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2" "github.com/containerd/containerd/api/types/runc/options" "github.com/containerd/errdefs" + "github.com/containerd/platforms" "github.com/stretchr/testify/assert" . "github.com/containerd/containerd/v2/client" @@ -50,6 +51,7 @@ import ( "github.com/opencontainers/runtime-spec/specs-go" "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" "golang.org/x/sys/unix" ) @@ -1812,3 +1814,70 @@ func TestIssue10589(t *testing.T) { require.NoError(t, err, "container status") assert.Equal(t, Stopped, status.Status) } + +// TestIssue13030 is a regression test for parallel image unpacking. +// The test validates that, when multiple layers are unpacked in parallel, +// whiteout files are properly processed and do not cause files to +// be unexpectedly present in the final rootfs. 
+// +// https://github.com/containerd/containerd/issues/13030 +func TestIssue13030(t *testing.T) { + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { client.Close() }) + + ctx, cancel := testContext(t) + t.Cleanup(cancel) + + image, err := client.Pull(ctx, + images.Get(images.Whiteout), + WithPlatformMatcher(platforms.Default()), + WithPullUnpack, + WithUnpackOpts([]UnpackOpt{WithUnpackLimiter(semaphore.NewWeighted(3))}), + ) + t.Cleanup(func() { + client.ImageService().Delete(ctx, images.Get(images.Whiteout)) + }) + if err != nil { + t.Fatal(err) + } + + container, err := client.NewContainer(ctx, t.Name(), + WithNewSnapshot(t.Name(), image), + WithNewSpec(oci.WithImageConfig(image), + withProcessArgs("/bin/sh", "-e", "-c", "test ! -e /file-to-delete && test ! -e /dir-to-delete")), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + container.Delete(ctx, WithSnapshotCleanup) + }) + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + task.Delete(ctx) + }) + + statusC, err := task.Wait(ctx) + if err != nil { + t.Fatal(err) + } + err = task.Start(ctx) + if err != nil { + t.Fatal(err) + } + status := <-statusC + code, _, err := status.Result() + if err != nil { + t.Fatal(err) + } + if code != 0 { + t.Errorf("expected status 0 from wait but received %d", code) + } +} diff --git a/integration/images/image_list.go b/integration/images/image_list.go index 47cbb1b1e1e0c..2d851f68da61a 100644 --- a/integration/images/image_list.go +++ b/integration/images/image_list.go @@ -38,6 +38,7 @@ type ImageList struct { VolumeOwnership string ArgsEscaped string Nginx string + Whiteout string } var ( @@ -57,6 +58,7 @@ func initImages(imageListFile string) { VolumeOwnership: "ghcr.io/containerd/volume-ownership:2.1", ArgsEscaped: "cplatpublic.azurecr.io/args-escaped-test-image-ns:1.0", Nginx: "ghcr.io/containerd/nginx:1.27.0", + Whiteout: 
"ghcr.io/containerd/whiteout-test:1.0", } if imageListFile != "" { @@ -96,6 +98,8 @@ const ( ArgsEscaped // Nginx image Nginx + // Whiteout image + Whiteout ) func initImageMap(imageList ImageList) map[int]string { @@ -108,6 +112,7 @@ func initImageMap(imageList ImageList) map[int]string { images[VolumeOwnership] = imageList.VolumeOwnership images[ArgsEscaped] = imageList.ArgsEscaped images[Nginx] = imageList.Nginx + images[Whiteout] = imageList.Whiteout return images }