Skip to content

Commit 6fcb822

Browse files
authored
Merge pull request containerd#11921 from dmcgowan/unpack-progress
Tar unpack progress through transfer service
2 parents acc74b3 + 479cf42 commit 6fcb822

6 files changed

Lines changed: 164 additions & 13 deletions

File tree

cmd/ctr/commands/images/pull.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,14 @@ func ProgressHandler(ctx context.Context, out io.Writer) (transfer.ProgressFunc,
265265
statuses = map[string]*progressNode{}
266266
roots = []*progressNode{}
267267
progress transfer.ProgressFunc
268-
pc = make(chan transfer.Progress, 1)
269-
status string
270-
closeC = make(chan struct{})
268+
// Use a buffered channel for progress to allow multiple completed
269+
// progress updates to be processed before shutting down the progress
270+
// handler. Currently the progress stream does not have an explicit
271+
// end, however, done indicates the server has already commpleted
272+
// sending all progress.
273+
pc = make(chan transfer.Progress, 5)
274+
status string
275+
closeC = make(chan struct{})
271276
)
272277

273278
progress = func(p transfer.Progress) {
@@ -409,7 +414,7 @@ func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 {
409414
name := prefix + pf + displayName(status.Name)
410415

411416
switch status.Event {
412-
case "downloading", "uploading":
417+
case "downloading", "uploading", "extracting":
413418
var bar progress.Bar
414419
if status.Total > 0.0 {
415420
bar = progress.Bar(float64(status.Progress) / float64(status.Total))
@@ -425,7 +430,7 @@ func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 {
425430
name,
426431
status.Event,
427432
bar)
428-
case "complete":
433+
case "complete", "extracted":
429434
bar := progress.Bar(1.0)
430435
fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n",
431436
name,

core/diff/apply/apply.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
7171
if err != nil {
7272
return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err)
7373
}
74-
defer ra.Close()
74+
var r io.ReadCloser
75+
if config.Progress != nil {
76+
r = newProgressReader(ra, config.Progress)
77+
} else {
78+
r = newReadCloser(ra)
79+
}
80+
defer r.Close()
7581

7682
var processors []diff.StreamProcessor
77-
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
83+
processor := diff.NewProcessorChain(desc.MediaType, r)
7884
processors = append(processors, processor)
7985
for {
8086
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
@@ -110,6 +116,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
110116
}
111117
}
112118
}
119+
113120
return ocispec.Descriptor{
114121
MediaType: ocispec.MediaTypeImageLayer,
115122
Size: rc.c,
@@ -129,3 +136,46 @@ func (rc *readCounter) Read(p []byte) (n int, err error) {
129136
}
130137
return
131138
}
139+
140+
type progressReader struct {
141+
rc *readCounter
142+
c io.Closer
143+
p func(int64)
144+
}
145+
146+
func newProgressReader(ra content.ReaderAt, p func(int64)) io.ReadCloser {
147+
return &progressReader{
148+
rc: &readCounter{
149+
r: content.NewReader(ra),
150+
c: 0,
151+
},
152+
c: ra,
153+
p: p,
154+
}
155+
}
156+
157+
func (pr *progressReader) Read(p []byte) (n int, err error) {
158+
// Call the progress function with the current count, indicating
159+
// the previously read content has been processed. Initial
160+
// progress of 0 indicates start of processing.
161+
pr.p(pr.rc.c)
162+
n, err = pr.rc.Read(p)
163+
return
164+
}
165+
166+
func (pr *progressReader) Close() error {
167+
pr.p(pr.rc.c)
168+
return pr.c.Close()
169+
}
170+
171+
type readCloser struct {
172+
io.Reader
173+
io.Closer
174+
}
175+
176+
func newReadCloser(ra content.ReaderAt) io.ReadCloser {
177+
return &readCloser{
178+
Reader: content.NewReader(ra),
179+
Closer: ra,
180+
}
181+
}

core/diff/diff.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type ApplyConfig struct {
6969
ProcessorPayloads map[string]typeurl.Any
7070
// SyncFs is to synchronize the underlying filesystem containing files
7171
SyncFs bool
72+
// Progress is a function which reports status of processed read data
73+
Progress func(int64)
7274
}
7375

7476
// ApplyOpt is used to configure an Apply operation
@@ -135,6 +137,18 @@ func WithSyncFs(sync bool) ApplyOpt {
135137
}
136138
}
137139

140+
// WithProgress is used to indicate process of the apply operation, should
141+
// atleast expect a progress of 0 and of the final size. It is up to the applier
142+
// how much progress it reports in between.
143+
func WithProgress(f func(ocispec.Descriptor, int64)) ApplyOpt {
144+
return func(_ context.Context, desc ocispec.Descriptor, c *ApplyConfig) error {
145+
c.Progress = func(state int64) {
146+
f(desc, state)
147+
}
148+
return nil
149+
}
150+
}
151+
138152
// WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility.
139153
// See also https://reproducible-builds.org/docs/source-date-epoch/ .
140154
//

core/diff/proxy/differ.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
5656
for k, v := range config.ProcessorPayloads {
5757
payloads[k] = typeurl.MarshalProto(v)
5858
}
59-
59+
if config.Progress != nil {
60+
config.Progress(0)
61+
}
6062
req := &diffapi.ApplyRequest{
6163
Diff: oci.DescriptorToProto(desc),
6264
Mounts: mount.ToProto(mounts),
@@ -67,6 +69,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
6769
if err != nil {
6870
return ocispec.Descriptor{}, errgrpc.ToNative(err)
6971
}
72+
if config.Progress != nil {
73+
config.Progress(desc.Size)
74+
}
7075
return oci.DescriptorFromProto(resp.Applied), nil
7176
}
7277

core/transfer/local/progress.go

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type ProgressTracker struct {
3535
root string
3636
transferState string
3737
added chan jobUpdate
38+
extraction chan extractionUpdate
3839
waitC chan struct{}
3940

4041
parents map[digest.Digest][]ocispec.Descriptor
@@ -47,6 +48,8 @@ const (
4748
jobAdded jobState = iota
4849
jobInProgress
4950
jobComplete
51+
jobExtracting
52+
jobExtracted
5053
)
5154

5255
type jobStatus struct {
@@ -72,12 +75,18 @@ type StatusTracker interface {
7275
Check(context.Context, digest.Digest) (bool, error)
7376
}
7477

78+
type extractionUpdate struct {
79+
desc ocispec.Descriptor
80+
progress int64
81+
}
82+
7583
// NewProgressTracker tracks content download progress
7684
func NewProgressTracker(root, transferState string) *ProgressTracker {
7785
return &ProgressTracker{
7886
root: root,
7987
transferState: transferState,
8088
added: make(chan jobUpdate, 1),
89+
extraction: make(chan extractionUpdate, 1),
8190
waitC: make(chan struct{}),
8291
parents: map[digest.Digest][]ocispec.Descriptor{},
8392
}
@@ -97,7 +106,8 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
97106
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
98107
}
99108
for dgst, job := range jobs {
100-
if job.state != jobComplete {
109+
switch job.state {
110+
case jobAdded, jobInProgress:
101111
status, ok := active.Status(job.name)
102112
if ok {
103113
if status.Offset > job.progress {
@@ -130,6 +140,30 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
130140
jobs[dgst] = job
131141
}
132142
}
143+
case jobExtracting:
144+
if job.progress == job.desc.Size {
145+
pf(transfer.Progress{
146+
Event: "extracted",
147+
Name: job.name,
148+
Parents: job.parents,
149+
Progress: job.desc.Size,
150+
Total: job.desc.Size,
151+
Desc: &job.desc,
152+
})
153+
job.state = jobExtracted
154+
jobs[dgst] = job
155+
} else {
156+
pf(transfer.Progress{
157+
Event: "extracting",
158+
Name: job.name,
159+
Parents: job.parents,
160+
Progress: job.progress,
161+
Total: job.desc.Size,
162+
Desc: &job.desc,
163+
})
164+
}
165+
case jobComplete, jobExtracted:
166+
// No progress to send
133167
}
134168
}
135169
}
@@ -161,10 +195,9 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
161195
}
162196
jobs[update.desc.Digest] = job
163197
pf(transfer.Progress{
164-
Event: "waiting",
165-
Name: name,
166-
Parents: parents,
167-
//Digest: desc.Digest.String(),
198+
Event: "waiting",
199+
Name: name,
200+
Parents: parents,
168201
Progress: 0,
169202
Total: update.desc.Size,
170203
Desc: &job.desc,
@@ -181,7 +214,37 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
181214
job.state = jobComplete
182215
job.progress = job.desc.Size
183216
}
217+
case extraction := <-j.extraction:
218+
job, ok := jobs[extraction.desc.Digest]
219+
if !ok {
220+
// Only captures the parents defined before,
221+
// could handle parent updates in same thread
222+
// if there is a synchronization issue
223+
var parents []string
224+
j.parentL.Lock()
225+
for _, parent := range j.parents[extraction.desc.Digest] {
226+
parents = append(parents, remotes.MakeRefKey(ctx, parent))
227+
}
228+
j.parentL.Unlock()
229+
if len(parents) == 0 {
230+
parents = []string{j.root}
231+
}
232+
name := remotes.MakeRefKey(ctx, extraction.desc)
184233

234+
job = &jobStatus{
235+
state: jobExtracting,
236+
name: name,
237+
parents: parents,
238+
progress: extraction.progress,
239+
desc: extraction.desc,
240+
}
241+
jobs[extraction.desc.Digest] = job
242+
} else {
243+
if job.state != jobExtracting {
244+
job.state = jobExtracting
245+
}
246+
job.progress = extraction.progress
247+
}
185248
case <-tc.C:
186249
update()
187250
// Next timer?
@@ -226,6 +289,16 @@ func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispe
226289

227290
}
228291

292+
func (j *ProgressTracker) ExtractProgress(desc ocispec.Descriptor, progress int64) {
293+
if j == nil {
294+
return
295+
}
296+
j.extraction <- extractionUpdate{
297+
desc: desc,
298+
progress: progress,
299+
}
300+
}
301+
229302
func (j *ProgressTracker) Wait() {
230303
// timeout rather than rely on cancel
231304
timeout := time.After(10 * time.Second)

core/transfer/local/pull.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2727

2828
"github.com/containerd/containerd/v2/core/content"
29+
"github.com/containerd/containerd/v2/core/diff"
2930
"github.com/containerd/containerd/v2/core/images"
3031
"github.com/containerd/containerd/v2/core/remotes"
3132
"github.com/containerd/containerd/v2/core/remotes/docker"
@@ -200,6 +201,9 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
200201
if v, ok := mu.SnapshotterExports["enable_remote_snapshot_annotations"]; ok && v == "true" {
201202
enableRemoteSnapshotAnnotations = true
202203
}
204+
if progressTracker != nil {
205+
mu.ApplyOpts = append(mu.ApplyOpts, diff.WithProgress(progressTracker.ExtractProgress))
206+
}
203207
uopts = append(uopts, unpack.WithUnpackPlatform(mu))
204208
} else {
205209
log.G(ctx).WithFields(log.Fields{

0 commit comments

Comments
 (0)