From 15e1750895d9f5a46faab8bb0ea376d506adf70a Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Thu, 4 Apr 2019 16:31:38 +0200 Subject: [PATCH] storage destination: commit layer in PutBlob() Commit a layer directly in PutBlob() to the storage. This will allow future commits to implement blob locking without leaking and deferring parts of the logic to callers. Signed-off-by: Valentin Rothberg --- copy/copy.go | 11 +- storage/storage_image.go | 304 ++++++++++++++++++++++----------------- 2 files changed, 184 insertions(+), 131 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 3ed8a2b824..b02e170cc5 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -468,7 +468,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { } data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { + copyLayerHelper := func(index int, srcLayer types.BlobInfo, previousLayer string, pool *mpb.Progress) { defer copySemaphore.Release(1) defer copyGroup.Done() cld := copyLayerData{} @@ -483,7 +483,8 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) } } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) + c := context.WithValue(ctx, "previousLayer", previousLayer) + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(c, srcLayer, pool) } data[index] = cld } @@ -493,8 +494,12 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { defer progressCleanup() for i, srcLayer := range srcInfos { + previousLayer := "" + if i >= 1 { + previousLayer = srcInfos[i-1].Digest.String() + } copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + go copyLayerHelper(i, srcLayer, previousLayer, progressPool) } // Wait for all layers to be copied diff --git a/storage/storage_image.go b/storage/storage_image.go index b39d2bcc04..bc907dacfd 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -18,7 +18,6 @@ import ( "github.com/containers/image/image" "github.com/containers/image/internal/tmpdir" "github.com/containers/image/manifest" - "github.com/containers/image/pkg/blobinfocache/none" "github.com/containers/image/types" "github.com/containers/storage" "github.com/containers/storage/pkg/archive" @@ -54,16 +53,18 @@ type storageImageSource struct { } type storageImageDestination struct { - imageRef storageReference - directory string // Temporary directory where we store blobs until Commit() time - nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs - manifest []byte // Manifest contents, temporary - signatures []byte // Signature contents, temporary - putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + directory string // Temporary directory where we store blobs until Commit() time + nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs + manifest []byte // Manifest contents, temporary + signatures []byte // Signature contents, temporary + putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + layerIDs map[string]string // Mapping from layer blobsums to the storage ID + previousLayerResult map[string]chan error // Used to serialize concurrent PutBlob calls } type storageImageCloser struct { @@ -323,16 +324,30 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + previousLayerResult: make(map[string]chan error), + layerIDs: make(map[string]string), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layer string) chan error { + s.putBlobMutex.Lock() + channel, ok := s.previousLayerResult[layer] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan error, 1) + s.previousLayerResult[layer] = channel + } + s.putBlobMutex.Unlock() + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -425,21 +440,28 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + if !isConfig { + channel := s.getChannelForLayer(blobinfo.Digest.String()) + id, err := s.commitBlob(ctx, blob) + s.putBlobMutex.Lock() + s.layerIDs[blobinfo.Digest.String()] = id + s.putBlobMutex.Unlock() + channel <- err + if err != nil { + return errorBlobInfo, err + } + } + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { // lock the entire method as it executes fairly quickly s.putBlobMutex.Lock() defer s.putBlobMutex.Unlock() @@ -467,6 +489,7 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Save this for completeness. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.layerIDs[blobinfo.Digest.String()] = layers[0].ID return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].UncompressedSize, @@ -482,6 +505,7 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest + s.layerIDs[blobinfo.Digest.String()] = layers[0].ID return true, types.BlobInfo{ Digest: blobinfo.Digest, Size: layers[0].CompressedSize, @@ -500,6 +524,7 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t } if len(layers) > 0 { s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest + s.layerIDs[blobinfo.Digest.String()] = layers[0].ID return true, types.BlobInfo{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -513,6 +538,22 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t return false, types.BlobInfo{}, nil } +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. +// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. +func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + reusable, blob, err := s.tryReusingBlob(ctx, blobinfo, cache, canSubstitute) + if err != nil || reusable { + channel := s.getChannelForLayer(blobinfo.Digest.String()) + channel <- err + } + return reusable, blob, err +} + // computeID computes a recommended image ID based on information we have so far. If // the manifest is not of a type that we recognize, we return an empty value, indicating // that since we don't have a recommendation, a random ID should be used if one needs @@ -572,6 +613,109 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo) (string, error) { + previousID := "" + if val := ctx.Value("previousLayer"); val != nil { + previousLayer := val.(string) + if previousLayer != "" { + channel := s.getChannelForLayer(previousLayer) + if err := <-channel; err != nil { + return "", errors.Wrapf(err, "error committing previous layer") + } + var ok bool + s.putBlobMutex.Lock() + previousID, ok = s.layerIDs[previousLayer] + s.putBlobMutex.Unlock() + if !ok { + return "", fmt.Errorf("error comitting layer %q: missing parent %q", blob.Digest.String(), previousLayer) + } + } + } + + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + return id, nil + } + // Check if we previously cached a file with that blob's contents. If we didn't, + // then we need to read the desired contents from a layer. + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + if !ok { + // Try to find the layer with contents matching that blobsum. + layer := "" + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } else { + layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err2 == nil && len(layers) > 0 { + layer = layers[0].ID + } + } + if layer == "" { + return "", errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) + } + // Read the layer's contents. + noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err2 != nil { + return "", errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file. Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", errors.Wrapf(err, "error storing blob to file %q", filename) + } + // Make sure that we can find this file later, should we need the layer's + // contents again. + s.putBlobMutex.Lock() + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + } + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", errors.Wrapf(err, "error opening file %q", filename) + } + defer file.Close() + // Build the new layer using the diff, regardless of where it came from. + // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + } + return layer.ID, nil +} + func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -583,107 +727,10 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { } layerBlobs := man.LayerInfos() // Extract or find the layers. - lastLayer := "" - for _, blob := range layerBlobs { - if blob.EmptyLayer { - continue - } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - diffID, haveDiffID := s.blobDiffIDs[blob.Digest] - if !haveDiffID { - // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), - // or to even check if we had it. - // Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller - // that relies on using a blob digest that has never been seeen by the store had better call - // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only - // so far we are going to accommodate that (if we should be doing that at all). - logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) - if err != nil { - return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) - } - if !has { - return errors.Errorf("error determining uncompressed digest for blob %q", blob.Digest.String()) - } - diffID, haveDiffID = s.blobDiffIDs[blob.Digest] - if !haveDiffID { - return errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) - } - } - id := diffID.Hex() - if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() - } - if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { - // There's already a layer that should have the right contents, just reuse it. - lastLayer = layer.ID - continue - } - // Check if we previously cached a file with that blob's contents. If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. - noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) - if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). - layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) - } - lastLayer = layer.ID + lastBlob := layerBlobs[len(layerBlobs)-1] + lastLayer, ok := s.layerIDs[lastBlob.Digest.String()] + if !ok { + return fmt.Errorf("could not find top layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image @@ -724,6 +771,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { for blob := range s.filenames { dataBlobs[blob] = struct{}{} } + for _, layerBlob := range layerBlobs { delete(dataBlobs, layerBlob.Digest) }