Skip to content

Commit

Permalink
storage: support partial storage with zstd:chunked
Browse files Browse the repository at this point in the history
Signed-off-by: Giuseppe Scrivano <[email protected]>
  • Loading branch information
giuseppe committed Jul 2, 2021
1 parent 93477e6 commit 67a4c99
Showing 1 changed file with 92 additions and 6 deletions.
98 changes: 92 additions & 6 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/containers/image/v5/pkg/blobinfocache/none"
"github.com/containers/image/v5/types"
"github.com/containers/storage"
"github.com/containers/storage/drivers"
"github.com/containers/storage/pkg/archive"
"github.com/containers/storage/pkg/chunked"
"github.com/containers/storage/pkg/ioutils"
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -77,12 +79,13 @@ type storageImageDestination struct {
indexToStorageID map[int]*string
// All accesses to below data are protected by `lock` which is made
// *explicit* in the code.
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed)
indexToPulledLayerInfo map[int]*manifest.LayerInfo // Mapping from layer (by index) to pulled down blob
blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed)
indexToPulledLayerInfo map[int]*manifest.LayerInfo // Mapping from layer (by index) to pulled down blob
blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer
diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output
}

type storageImageCloser struct {
Expand Down Expand Up @@ -404,6 +407,7 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
SignaturesSizes: make(map[digest.Digest][]int),
indexToStorageID: make(map[int]*string),
indexToPulledLayerInfo: make(map[int]*manifest.LayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
}
return image, nil
}
Expand All @@ -419,6 +423,11 @@ func (s *storageImageDestination) Close() error {
for _, al := range s.blobAdditionalLayer {
al.Release()
}
for _, v := range s.diffOutputs {
if v.Target != "" {
_ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target)
}
}
return os.RemoveAll(s.directory)
}

Expand Down Expand Up @@ -573,6 +582,61 @@ func (s *storageImageDestination) tryReusingBlobWithSrcRef(ctx context.Context,
return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute)
}

type zstdFetcher struct {
stream internalTypes.ImageSourceSeekable
ctx context.Context
blobInfo types.BlobInfo
}

// GetBlobAt converts from chunked.GetBlobAt to ImageSourceSeekable.GetBlobAt.
func (f *zstdFetcher) GetBlobAt(chunks []chunked.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
var newChunks []internalTypes.ImageSourceChunk
for _, v := range chunks {
i := internalTypes.ImageSourceChunk{
Offset: v.Offset,
Length: v.Length,
}
newChunks = append(newChunks, i)
}
rc, errs, err := f.stream.GetBlobAt(f.ctx, f.blobInfo, newChunks)
if _, ok := err.(internalTypes.BadPartialRequestError); ok {
err = chunked.ErrBadRequest{}
}
return rc, errs, err

}

// PutBlobPartial attempts to create a blob using the data that is already present at the destination storage. stream is accessed
// in a non-sequential way to retrieve the missing chunks.
func (s *storageImageDestination) PutBlobPartial(ctx context.Context, stream internalTypes.ImageSourceSeekable, srcInfo types.BlobInfo, cache types.BlobInfoCache) (types.BlobInfo, error) {
fetcher := zstdFetcher{
stream: stream,
ctx: ctx,
blobInfo: srcInfo,
}

differ, err := chunked.GetDiffer(ctx, s.imageRef.transport.store, srcInfo.Size, srcInfo.Annotations, &fetcher)
if err != nil {
return srcInfo, err
}

out, err := s.imageRef.transport.store.ApplyDiffWithDiffer("", nil, differ)
if err != nil {
return srcInfo, err
}

blobDigest := srcInfo.Digest

s.lock.Lock()
s.blobDiffIDs[blobDigest] = blobDigest
s.fileSizes[blobDigest] = 0
s.filenames[blobDigest] = ""
s.diffOutputs[blobDigest] = out
s.lock.Unlock()

return srcInfo, nil
}

// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
Expand Down Expand Up @@ -844,6 +908,27 @@ func (s *storageImageDestination) commitLayer(ctx context.Context, blob manifest
return nil
}

s.lock.Lock()
diffOutput, ok := s.diffOutputs[blob.Digest]
s.lock.Unlock()
if ok {
layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
if err != nil {
return err
}

// FIXME: what to do with the uncompressed digest?
diffOutput.UncompressedDigest = blob.Digest

if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil {
_ = s.imageRef.transport.store.Delete(layer.ID)
return err
}

s.indexToStorageID[index] = &layer.ID
return nil
}

s.lock.Lock()
al, ok := s.blobAdditionalLayer[blob.Digest]
s.lock.Unlock()
Expand Down Expand Up @@ -969,6 +1054,7 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
return errors.Wrapf(err, "parsing manifest")
}
layerBlobs := man.LayerInfos()

// Extract, commit, or find the layers.
for i, blob := range layerBlobs {
if err := s.commitLayer(ctx, blob, i); err != nil {
Expand Down

0 comments on commit 67a4c99

Please sign in to comment.