storage destination: commit layer in PutBlob()
Commit a layer to the storage directly in PutBlob(). This will allow
future commits to implement blob locking without leaking or deferring
parts of the logic to callers.

Note that the APIs of PutBlob() and TryReusingBlob() are extended with an
index corresponding to the layer's index in the image.  Currently,
only the storage destination makes use of the index.

Signed-off-by: Valentin Rothberg <[email protected]>
vrothberg committed Apr 15, 2019
1 parent 3313dcd commit cdbac2d
Showing 15 changed files with 339 additions and 182 deletions.
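
For orientation, the two ImageDestination methods whose signatures this commit extends are sketched below, assembled from the hunks that follow. This is an illustrative excerpt, not the full interface from types/types.go: layer blobs pass their index within the image, while non-layer blobs pass -1, as the config upload in copy.go and the synthesized empty layer in docker_schema2.go do.

package imagedest

import (
	"context"
	"io"

	"github.com/containers/image/types"
)

// imageDestination excerpts the containers/image ImageDestination
// interface after this commit; only the two extended methods are shown.
type imageDestination interface {
	// PutBlob writes the blob in stream to the destination.
	// layerIndexInImage is the new parameter: the layer's index within
	// the image, or -1 if the blob is not a layer (e.g. the config).
	PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo,
		layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error)

	// TryReusingBlob checks whether the destination already holds the
	// blob and, if so, reuses it; the index is threaded through the same
	// way. Per the commit message, only the storage destination
	// currently acts on it.
	TryReusingBlob(ctx context.Context, info types.BlobInfo,
		layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error)
}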
91 changes: 57 additions & 34 deletions copy/copy.go
@@ -454,10 +454,6 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
err error
}

// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}
copyGroup.Add(numLayers)

// copySemaphore is used to limit the number of parallel downloads to
// avoid malicious images causing troubles and to be nice to servers.
var copySemaphore *semaphore.Weighted
@@ -467,43 +463,60 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
copySemaphore = semaphore.NewWeighted(int64(1))
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
// copyGroup is used to determine if all layers are copied
copyGroup := sync.WaitGroup{}

cancelCtx, cancelFunc := context.WithCancel(ctx)
progressPool, progressCleanup := ic.c.newProgressPool(cancelCtx)
defer progressCleanup()

layerIndex := 0
digestToCopyData := make(map[digest.Digest]*copyLayerData)
for _, srcLayer := range srcInfos {
// Do not copy a layer twice
if _, ok := digestToCopyData[srcLayer.Digest]; ok {
continue
}

cld := copyLayerData{}
digestToCopyData[srcLayer.Digest] = &cld
if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
// does not support them.
if ic.diffIDsAreNeeded {
cld.err = errors.New("getting DiffID for foreign layers is unimplemented")
cancelFunc()
return errors.New("getting DiffID for foreign layers is unimplemented")
} else {
logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name())
cld.destInfo = srcLayer
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool)
}
data[index] = cld
}

func() { // A scope for defer
progressPool, progressCleanup := ic.c.newProgressPool(ctx)
defer progressCleanup()

for i, srcLayer := range srcInfos {
copySemaphore.Acquire(ctx, 1)
go copyLayerHelper(i, srcLayer, progressPool)
}
copySemaphore.Acquire(cancelCtx, 1)
copyGroup.Add(1)
go func(index int, srcLayer types.BlobInfo, cld *copyLayerData) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, index, progressPool)
if cld.err != nil {
logrus.Errorf("copying layer %d failed: %v", index, cld.err)
cancelFunc()
}
}(layerIndex, srcLayer, &cld)
layerIndex++
}

// Wait for all layers to be copied
copyGroup.Wait()
}()
// Wait for all layers to be copied
copyGroup.Wait()

destInfos := make([]types.BlobInfo, numLayers)
diffIDs := make([]digest.Digest, numLayers)
for i, cld := range data {
for i, srcLayer := range srcInfos {
cld, ok := digestToCopyData[srcLayer.Digest]
if !ok {
return errors.Errorf("no copy data for layer %q", srcLayer.Digest)
}
if cld.err != nil {
return cld.err
}
@@ -516,6 +529,16 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
ic.manifestUpdates.InformationOnly.LayerDiffIDs = diffIDs
}
if srcInfosUpdated || layerDigestsDiffer(srcInfos, destInfos) {
logrus.Errorf("DIGESTS DIFFER!")
for _, b := range srcInfos {
logrus.Errorf("%q", b.Digest)
}
logrus.Errorf("----------------------------")
for _, b := range destInfos {
logrus.Errorf("%q", b.Digest)
}
logrus.Errorf("----------------------------")

ic.manifestUpdates.LayerInfos = destInfos
}
return nil
@@ -625,7 +648,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar)
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar)
if err != nil {
return types.BlobInfo{}, err
}
@@ -651,13 +674,13 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
@@ -678,7 +701,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar)
if err != nil {
return types.BlobInfo{}, "", err
}
@@ -709,7 +732,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
layerIndexInImage int, diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

@@ -733,7 +756,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
return pipeWriter
}
}
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
@@ -768,7 +791,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
@@ -847,7 +870,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig)
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
}
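
The copyLayers rewrite above swaps the fixed-size result slice for a digest-keyed map so each distinct layer is copied only once, and pairs the download semaphore with a cancelable context so the first failing layer aborts the rest. A minimal, self-contained sketch of that pattern follows; copyLayer is a stand-in for ic.copyLayer, and every name here is illustrative rather than taken from the codebase.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Stand-in for ic.copyLayer; the real version streams the blob to the
	// destination and reports the first error.
	copyLayer := func(ctx context.Context, digest string, index int) error {
		return nil
	}

	layers := []string{"sha256:aaa", "sha256:bbb", "sha256:aaa"} // last is a duplicate
	sem := semaphore.NewWeighted(2)                              // bound parallel copies
	var wg sync.WaitGroup

	type result struct{ err error }
	copied := make(map[string]*result) // one entry per distinct digest

	index := 0
	for _, dgst := range layers {
		if _, ok := copied[dgst]; ok {
			continue // do not copy a layer twice
		}
		res := &result{}
		copied[dgst] = res
		if err := sem.Acquire(ctx, 1); err != nil {
			break // ctx was canceled by an earlier failure
		}
		wg.Add(1)
		go func(index int, dgst string, res *result) {
			defer sem.Release(1)
			defer wg.Done()
			if res.err = copyLayer(ctx, dgst, index); res.err != nil {
				cancel() // abort copies still waiting on Acquire
			}
		}(index, dgst, res)
		index++
	}
	wg.Wait() // all in-flight copies finished

	// Collect results in input order, as the second loop over srcInfos does.
	for _, dgst := range layers {
		fmt.Println(dgst, copied[dgst].err)
	}
}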
4 changes: 2 additions & 2 deletions directory/directory_dest.go
@@ -136,7 +136,7 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob")
if err != nil {
return types.BlobInfo{}, err
@@ -182,7 +182,7 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
4 changes: 2 additions & 2 deletions directory/directory_test.go
@@ -65,7 +65,7 @@ func TestGetPutBlob(t *testing.T) {
require.NoError(t, err)
defer dest.Close()
assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression())
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false)
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, 0, cache, false)
assert.NoError(t, err)
err = dest.Commit(context.Background())
assert.NoError(t, err)
@@ -123,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) {
dest, err := ref.NewImageDestination(context.Background(), nil)
require.NoError(t, err)
defer dest.Close()
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false)
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false)
assert.Error(t, err)
assert.Contains(t, digestErrorString, err.Error())
err = dest.Commit(context.Background())
8 changes: 4 additions & 4 deletions docker/docker_image_dest.go
@@ -19,7 +19,7 @@ import (
"github.com/containers/image/pkg/blobinfocache/none"
"github.com/containers/image/types"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
v2 "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/client"
"github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
@@ -124,12 +124,12 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if inputInfo.Digest.String() != "" {
// This should not really be necessary, at least the copy code calls TryReusingBlob automatically.
// Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value.
// But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_.
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false)
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false)
if err != nil {
return types.BlobInfo{}, err
}
@@ -271,7 +271,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
6 changes: 3 additions & 3 deletions docker/tarfile/dest.go
@@ -94,7 +94,7 @@ func (d *Destination) HasThreadSafePutBlob() bool {
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
// Ouch, we need to stream the blob into a temporary file just to determine the size.
// When the layer is decompressed, we also have to generate the digest on uncompressed data.
if inputInfo.Size == -1 || inputInfo.Digest.String() == "" {
@@ -126,7 +126,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
}

// Maybe the blob has been already sent
ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false)
ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false)
if err != nil {
return types.BlobInfo{}, err
}
@@ -164,7 +164,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest")
}
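
The tarfile destination above has to learn a blob's size (and a digest over the data) before it can write a tar entry, so when inputInfo.Size == -1 or the digest is unknown it spools the stream to a temporary file first. A self-contained sketch of that spooling pattern, under the assumption that digest.Canonical is the desired algorithm; bufferToTempFile is an illustrative helper, not the actual docker/tarfile code.

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"

	digest "github.com/opencontainers/go-digest"
)

// bufferToTempFile spools stream to a temporary file while computing its
// digest, so both size and digest are known before the blob is written
// into the tar archive.
func bufferToTempFile(stream io.Reader) (path string, dgst digest.Digest, size int64, err error) {
	f, err := ioutil.TempFile("", "blob-")
	if err != nil {
		return "", "", -1, err
	}
	digester := digest.Canonical.Digester()
	// Tee the stream through the digester on its way to disk.
	size, err = io.Copy(f, io.TeeReader(stream, digester.Hash()))
	f.Close()
	if err != nil {
		os.Remove(f.Name())
		return "", "", -1, err
	}
	return f.Name(), digester.Digest(), size, nil
}

func main() {
	path, dgst, size, err := bufferToTempFile(strings.NewReader("pretend layer data"))
	if err != nil {
		panic(err)
	}
	defer os.Remove(path)
	fmt.Printf("%s %s %d\n", path, dgst, size)
}

When size and digest are already supplied by the caller, the real PutBlob skips this detour and streams directly, avoiding the extra copy to disk.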
2 changes: 1 addition & 1 deletion image/docker_schema2.go
@@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ
logrus.Debugf("Uploading empty layer during conversion to schema 1")
// Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here,
// and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it.
info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false)
info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false)
if err != nil {
return nil, errors.Wrap(err, "Error uploading empty layer")
}
4 changes: 2 additions & 2 deletions image/docker_schema2_test.go
@@ -401,7 +401,7 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool {
func (d *memoryImageDest) HasThreadSafePutBlob() bool {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if d.storedBlobs == nil {
d.storedBlobs = make(map[digest.Digest][]byte)
}
@@ -415,7 +415,7 @@ func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputIn
d.storedBlobs[inputInfo.Digest] = contents
return types.BlobInfo{Digest: inputInfo.Digest, Size: int64(len(contents))}, nil
}
func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) {
func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, int, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) {
panic("Unexpected call to a mock function")
}
func (d *memoryImageDest) PutManifest(ctx context.Context, m []byte) error {
(remaining changed files not shown)
