Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions core/mount/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ func (m *Mount) mount(target string) (err error) {
options = m.Options
)

opt := parseMountOptions(options)
opt, err := parseMountOptions(options)
if err != nil {
return err
}
// The only remapping of both GID and UID is supported
if opt.uidmap != "" && opt.gidmap != "" {
if usernsFd, err = GetUsernsFD(opt.uidmap, opt.gidmap); err != nil {
Expand All @@ -116,7 +119,10 @@ func (m *Mount) mount(target string) (err error) {
if optionsSize(options) >= pagesize-512 {
recalcOpt = true
} else {
opt = parseMountOptions(options)
opt, err = parseMountOptions(options)
if err != nil {
return err
}
}
}
}
Expand All @@ -129,7 +135,10 @@ func (m *Mount) mount(target string) (err error) {
// recalculate opt in case of lowerdirs have been replaced
// by idmapped ones OR idmapped mounts' not used/supported.
if recalcOpt || (opt.uidmap == "" || opt.gidmap == "") {
opt = parseMountOptions(options)
opt, err = parseMountOptions(options)
if err != nil {
return err
}
}
}

Expand Down Expand Up @@ -325,7 +334,7 @@ func buildIDMappedPaths(lowerDirs []string, commonDir, idMappedDir string) []str

// parseMountOptions takes fstab style mount options and parses them for
// use with a standard mount() syscall
func parseMountOptions(options []string) (opt mountOpt) {
func parseMountOptions(options []string) (opt mountOpt, err error) {
loopOpt := "loop"
flagsMap := map[string]struct {
clear bool
Expand Down Expand Up @@ -358,6 +367,11 @@ func parseMountOptions(options []string) (opt mountOpt) {
"sync": {false, unix.MS_SYNCHRONOUS},
}
for _, o := range options {
// X-containerd.* options are internal mount options that should be processed
// by the mount manager before reaching this layer.
if strings.HasPrefix(o, "X-containerd.") {
return opt, fmt.Errorf("internal mount option %q was not consumed by the mount manager", o)
}
// If the option does not exist in the flags table or the flag
// is not supported on the platform,
// then it is a data value for a specific fs type
Expand Down
20 changes: 20 additions & 0 deletions core/mount/mount_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,3 +697,23 @@ func TestGetCommonDirectory(t *testing.T) {
})
}
}

func TestXContainerdOptionsFiltered(t *testing.T) {
testutil.RequiresRoot(t)

target := filepath.Join(t.TempDir(), "mnt")
require.NoError(t, os.MkdirAll(target, 0755))

m := Mount{
Type: "tmpfs",
Source: "tmpfs",
Options: []string{
"size=10M",
"X-containerd.custom=test-value",
"mode=0755",
},
}

err := m.Mount(target)
require.Error(t, err, "X-containerd.* options should cause an error")
}
6 changes: 6 additions & 0 deletions core/remotes/docker/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (p dockerPusher) Writer(ctx context.Context, opts ...content.WriterOpt) (co
if wOpts.Ref == "" {
return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
}
if wOpts.Desc.Digest == "" {
return nil, fmt.Errorf("descriptor digest must not be empty: %w", errdefs.ErrInvalidArgument)
}
if wOpts.Desc.MediaType == "" {
return nil, fmt.Errorf("descriptor media type must not be empty: %w", errdefs.ErrInvalidArgument)
}
return p.push(ctx, wOpts.Desc, wOpts.Ref, true)
}

Expand Down
8 changes: 8 additions & 0 deletions core/remotes/docker/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,14 @@ func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocisp
}

for _, u := range paths {
// falling back to /blobs endpoint should happen in extreme cases - those to
// support legacy registries. we want to limit the fallback to when /manifests endpoint
// returned 404. Falling back on transient errors could do more harm, like polluting
// the local content store with incorrectly typed descriptors as /blobs endpoint tends
// always return with application/octet-stream.
if firstErrPriority > 2 {
break
}
for i, host := range hosts {
req := base.request(host, http.MethodHead, u...)
if err := req.addNamespace(base.refspec.Hostname()); err != nil {
Expand Down
132 changes: 132 additions & 0 deletions core/remotes/docker/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,138 @@ func (m testManifest) RegisterHandler(r *http.ServeMux, name string) {
}
}

// TestResolveTransientManifestError verifies that a transient server error (5xx)
// from the /manifests/ endpoint does NOT cause containerd to fall back to the
// /blobs/ endpoint. Before this fix, a 500 from /manifests/ would cause Resolve()
// to silently retry via /blobs/, which returns "application/octet-stream" instead
// of a proper manifest media type — poisoning the descriptor and corrupting the
// local content store.
//
// The correct behavior is: 5xx from /manifests/ → return the server error, do NOT
// try /blobs/.
func TestResolveTransientManifestError(t *testing.T) {
var (
manifestCalled int
blobsCalled bool
repo = "test-repo"
dgst = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" // empty sha
)

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/manifests/"+dgst) {
manifestCalled++
w.WriteHeader(http.StatusInternalServerError)
return
}
if strings.HasSuffix(r.URL.Path, "/blobs/"+dgst) {
blobsCalled = true
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Docker-Content-Digest", dgst)
w.WriteHeader(http.StatusOK)
return
}
if r.URL.Path == "/v2/" {
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusNotFound)
}))
defer ts.Close()

resolver := NewResolver(ResolverOptions{
Hosts: func(string) ([]RegistryHost, error) {
return []RegistryHost{
{
Host: ts.URL[len("http://"):],
Scheme: "http",
Capabilities: HostCapabilityPull | HostCapabilityResolve,
},
}, nil
},
})

ref := fmt.Sprintf("%s/%s@%s", ts.URL[len("http://"):], repo, dgst)
_, _, err := resolver.Resolve(context.Background(), ref)

if manifestCalled == 0 {
t.Fatal("manifests endpoint was not called")
}
if blobsCalled {
t.Error("blobs endpoint was called, but should not have been after a 500 on /manifests/")
}
if err == nil {
t.Fatal("expected error from Resolve, but got nil")
}

// The error should surface the unexpected 500 status, not a generic "not found".
var unexpectedStatus remoteerrors.ErrUnexpectedStatus
if !errors.As(err, &unexpectedStatus) {
t.Errorf("expected ErrUnexpectedStatus (from 500), got %T: %v", err, err)
} else if unexpectedStatus.StatusCode != http.StatusInternalServerError {
t.Errorf("expected status 500, got %d", unexpectedStatus.StatusCode)
}
}

// TestResolve404ManifestFallback verifies that a 404 from /manifests/ DOES
// allow fallback to /blobs/. This preserves backward compatibility with
// non-standard registries that may only serve certain digests via /blobs/.
func TestResolve404ManifestFallback(t *testing.T) {
var (
manifestCalled bool
blobsCalled bool
repo = "test-repo"
dgst = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/manifests/"+dgst) {
manifestCalled = true
w.WriteHeader(http.StatusNotFound)
return
}
if strings.HasSuffix(r.URL.Path, "/blobs/"+dgst) {
blobsCalled = true
w.Header().Set("Content-Type", "application/vnd.docker.distribution.manifest.v2+json")
w.Header().Set("Docker-Content-Digest", dgst)
w.WriteHeader(http.StatusOK)
return
}
if r.URL.Path == "/v2/" {
w.WriteHeader(http.StatusOK)
return
}
}))
defer ts.Close()

resolver := NewResolver(ResolverOptions{
Hosts: func(string) ([]RegistryHost, error) {
return []RegistryHost{
{
Host: ts.URL[len("http://"):],
Scheme: "http",
Capabilities: HostCapabilityPull | HostCapabilityResolve,
},
}, nil
},
})

ref := fmt.Sprintf("%s/%s@%s", ts.URL[len("http://"):], repo, dgst)
_, desc, err := resolver.Resolve(context.Background(), ref)

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !manifestCalled {
t.Error("manifests endpoint was not called")
}
if !blobsCalled {
t.Error("blobs endpoint was not called on 404")
}
if desc.MediaType != "application/vnd.docker.distribution.manifest.v2+json" {
t.Errorf("unexpected media type: %s", desc.MediaType)
}
}

func newRefreshTokenServer(t testing.TB, name string, disablePOST bool, onFetchRefreshToken OnFetchRefreshToken) *refreshTokenServer {
return &refreshTokenServer{
T: t,
Expand Down
38 changes: 20 additions & 18 deletions core/transfer/streaming/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,29 @@ func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) {
max = remaining
}
b := (*buf)[:max]
n, err := r.Read(b)
if err != nil {
if !errors.Is(err, io.EOF) {
log.G(ctx).WithError(err).Errorf("failed to read stream source")
n, readErr := r.Read(b)
if n > 0 {
remaining = remaining - int32(n)

data := &transferapi.Data{
Data: b[:n],
}
anyType, err := typeurl.MarshalAny(data)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to marshal data for send")
// TODO: Send error message on stream before close to allow remote side to return error
return
}
if err := stream.Send(anyType); err != nil {
log.G(ctx).WithError(err).Errorf("send failed")
return
}
return
}
remaining = remaining - int32(n)

data := &transferapi.Data{
Data: b[:n],
}
anyType, err := typeurl.MarshalAny(data)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to marshal data for send")
// TODO: Send error message on stream before close to allow remote side to return error
return
}
if err := stream.Send(anyType); err != nil {
log.G(ctx).WithError(err).Errorf("send failed")
if readErr != nil {
if !errors.Is(readErr, io.EOF) {
log.G(ctx).WithError(readErr).Errorf("failed to read stream source")
// TODO: Send error message on stream before close to allow remote side to return error
}
return
}
}
Expand Down
39 changes: 39 additions & 0 deletions core/transfer/streaming/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,45 @@ func runWriterFuzz(ctx context.Context, t *testing.T, expected []byte) {
}
}

// TestSendReceiveEOFWithData verifies that data is not dropped when a reader
// returns both n > 0 and io.EOF from the same Read call.
func TestSendReceiveEOFWithData(t *testing.T) {
expected := []byte("one and only chunk with eof")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rs, ws := pipeStream()
SendStream(ctx, &eofReader{data: expected}, ws)
or := ReceiveStream(ctx, rs)

actual, err := io.ReadAll(or)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(expected, actual) {
t.Fatalf("received bytes are not equal\n\tactual: %v\n\texpected: %v", actual, expected)
}
}

// eofReader returns all remaining data together with io.EOF on the final Read,
// exercising the io.Reader contract where n > 0 && err == io.EOF.
type eofReader struct {
data []byte
}

func (r *eofReader) Read(p []byte) (int, error) {
if len(r.data) == 0 {
return 0, io.EOF
}
n := copy(p, r.data)
r.data = r.data[n:]
if len(r.data) == 0 {
return n, io.EOF
}
return n, nil
}

func chainStreams(ctx context.Context, r io.Reader) io.Reader {
rs, ws := pipeStream()
SendStream(ctx, r, ws)
Expand Down
29 changes: 29 additions & 0 deletions core/unpack/unpacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,15 @@ func (u *Unpacker) unpack(
case <-fetchC[i-fetchOffset]:
}

// In case of parallel unpack, the parent snapshot isn't provided to the snapshotter.
// The overlayfs will return bind mounts for all layers, we need to convert them
// to overlay mounts for the applier to perform whiteout conversion correctly.
// TODO: this is a temporary workaround until #13053 lands.
// See: https://github.com/containerd/containerd/issues/13030
if i > 0 && parallel && unpack.SnapshotterKey == "overlayfs" {
mounts = bindToOverlay(mounts)
}

diff, err := a.Apply(ctx, desc, mounts, unpack.ApplyOpts...)
if err != nil {
cleanup.Do(ctx, abort)
Expand Down Expand Up @@ -753,3 +762,23 @@ func uniquePart() string {
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}

// TODO: this is a temporary workaround until #13053 lands.
func bindToOverlay(mounts []mount.Mount) []mount.Mount {
if len(mounts) != 1 || mounts[0].Type != "bind" {
return mounts
}

m := mount.Mount{
Type: "overlay",
Source: "overlay",
}
for _, o := range mounts[0].Options {
if o != "rbind" {
m.Options = append(m.Options, o)
}
}
m.Options = append(m.Options, "upperdir="+mounts[0].Source)

return []mount.Mount{m}
}
Loading
Loading