diff --git a/pkg/v1/remote/write.go b/pkg/v1/remote/write.go index fb3dd7974..7b9f31e19 100644 --- a/pkg/v1/remote/write.go +++ b/pkg/v1/remote/write.go @@ -337,16 +337,29 @@ func (r *progressReader) Close() error { return r.rc.Close() } // streamBlob streams the contents of the blob to the specified location. // On failure, this will return an error. On success, this will return the location // header indicating how to commit the streamed blob. -func (w *writer) streamBlob(ctx context.Context, blob io.ReadCloser, streamLocation string) (commitLocation string, rerr error) { +func (w *writer) streamBlob(ctx context.Context, layer v1.Layer, streamLocation string) (commitLocation string, rerr error) { reset := func() {} defer func() { if rerr != nil { reset() } }() + blob, err := layer.Compressed() + if err != nil { + return "", err + } + + getBody := layer.Compressed if w.updates != nil { var count int64 blob = &progressReader{rc: blob, updates: w.updates, lastUpdate: w.lastUpdate, count: &count} + getBody = func() (io.ReadCloser, error) { + blob, err := layer.Compressed() + if err != nil { + return nil, err + } + return &progressReader{rc: blob, updates: w.updates, lastUpdate: w.lastUpdate, count: &count}, nil + } reset = func() { atomic.AddInt64(&w.lastUpdate.Complete, -count) w.updates <- *w.lastUpdate @@ -357,6 +370,10 @@ func (w *writer) streamBlob(ctx context.Context, blob io.ReadCloser, streamLocat if err != nil { return "", err } + if _, ok := layer.(*stream.Layer); !ok { + // We can't retry streaming layers. + req.GetBody = getBody + } req.Header.Set("Content-Type", "application/octet-stream") resp, err := w.client.Do(req.WithContext(ctx)) @@ -467,11 +484,7 @@ func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error { ctx = redact.NewContext(ctx, "omitting binary blobs from logs") } - blob, err := l.Compressed() - if err != nil { - return err - } - location, err = w.streamBlob(ctx, blob, location) + location, err = w.streamBlob(ctx, l, location) if err != nil { return err } diff --git a/pkg/v1/remote/write_test.go b/pkg/v1/remote/write_test.go index 3d031c1a8..092465e9c 100644 --- a/pkg/v1/remote/write_test.go +++ b/pkg/v1/remote/write_test.go @@ -586,12 +586,8 @@ func TestStreamBlob(t *testing.T) { if err != nil { t.Fatalf("ConfigLayer: %v", err) } - blob, err := l.Compressed() - if err != nil { - t.Fatalf("layer.Compressed: %v", err) - } - commitLocation, err := w.streamBlob(context.Background(), blob, streamLocation.String()) + commitLocation, err := w.streamBlob(context.Background(), l, streamLocation.String()) if err != nil { t.Errorf("streamBlob() = %v", err) } @@ -638,12 +634,8 @@ func TestStreamLayer(t *testing.T) { streamLocation := w.url(expectedPath) sl := stream.NewLayer(newBlob()) - blob, err := sl.Compressed() - if err != nil { - t.Fatalf("layer.Compressed: %v", err) - } - commitLocation, err := w.streamBlob(context.Background(), blob, streamLocation.String()) + commitLocation, err := w.streamBlob(context.Background(), sl, streamLocation.String()) if err != nil { t.Errorf("streamBlob: %v", err) }