Skip to content

Commit

Permalink
Support layer deltas
Browse files Browse the repository at this point in the history
Deltas are a way to avoid downloading a full copy of a layer tar file
if you have a previous version of the layer available locally. In
testing these deltas have been shown to be around 10x to 100x smaller
than the .tar.gz files for typical Linux base images.

In the typical client-side case we have some previous version of the
image stored in container-storage somewhere, which means that we have
the uncompressed files available, but not the actual tarball
(compressed or not).

This means we can use github.com/alexlarsson/tar-diff which takes
two tar files and produces a delta file which when applied on the
untar:ed content of the first tarfile produces the (bitwise identical)
content of the uncompressed second tarfile. It just happens that the
uncompressed tarfile is exactly what we need to reproduce, because that
is how the layers are referred to in the image config (the DiffIDs).

How this works is that we use OCI artifacts to store, for each regular
image a manifest with information about the available deltas for the
image.  This image looks like a regular manifest, except each layer
contains a tar-diff (as a blob) and uses the existing annotations key
to record which DiffIDs the layer applies to.

For example, a manifest would look like this:

```
{
  "schemaVersion": 2,
  "config": {
    "mediaType": "application/vnd.oci.image.config.v1+json",
    "digest": "sha256:ca3d163bab055381827226140568f3bef7eaac187cebd76878e0b63e9e442356",
    "size": 3
  },
  "layers": [
    {
      "mediaType": "application/vnd.redhat.tardiff",
      "digest":
"sha256:49402288de20a465616174a38aca4746f46be2c3f9519fe4d14fc7f83f44a32a",
      "size": 7059734,
      "annotations": {
          "com.redhat.deltaFrom":
"sha256:b9137868142acd7ce4d62216e2b03e63e9800e2b647bf682492d3e9c5e66277c",
          "com.redhat.deltaTo":
"sha256:c88d2d437799c2879fded33ee358429e1eb954968a25f3153e2e0e26fef7ef28"
      }
    }
  ]
}
```

The config blob is just a JSON file containing "{}". Ideally it
should not be of type application/vnd.oci.image.config.v1+json,
because that is reserved for docker-style images. However, as
explained in oras-project/oras#129, docker hub
doesn't currently support any other type. For registries that support
OCI artifacts we should instead use some other type so that tooling
can know that this is not a regular image.

The way we attach the delta manifest to the image is that we store it
in the same repo and a tag name based on the manifest digest like
"delta-${shortid}".

The delta layers record which DiffID they apply to, which is what we
want to use to look up the pre-existing layers to use as delta source
material, and it is what the delta apply will generate. This means
however that using the deltas only works if we're allowed to
substitute blobs, but this doesn't seem to be an issue in the typical
case.

Signed-off-by: Alexander Larsson <[email protected]>
  • Loading branch information
alexlarsson committed Apr 23, 2020
1 parent d1bf7c5 commit faabfc6
Show file tree
Hide file tree
Showing 23 changed files with 397 additions and 6 deletions.
144 changes: 139 additions & 5 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"io/ioutil"
"os"
"reflect"
"sort"
"strings"
"sync"
"time"

"github.com/alexlarsson/tar-diff/pkg/tar-patch"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/image"
"github.com/containers/image/v5/internal/pkg/platform"
Expand Down Expand Up @@ -790,6 +792,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
srcInfosUpdated = true
}

deltaLayers, err := ic.src.DeltaLayers(ctx)
if err != nil {
return err
}

type copyLayerData struct {
destInfo types.BlobInfo
diffID digest.Digest
Expand All @@ -810,7 +817,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
Expand All @@ -825,7 +832,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool)
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, index, srcLayer, toEncrypt, pool, deltaLayers)
}
data[index] = cld
}
Expand Down Expand Up @@ -857,7 +864,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
if err != nil {
return errors.Wrapf(err, "Can't acquire semaphore")
}
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool, deltaLayers)
}

// Wait for all layers to be copied
Expand Down Expand Up @@ -1040,9 +1047,83 @@ type diffIDResult struct {
err error
}

// getMatchingDeltaLayers returns the DiffID of the source layer at srcIndex,
// together with pointers to every entry of deltaLayers whose
// "com.redhat.deltaTo" annotation equals that DiffID.
//
// It returns ("", nil) when deltaLayers is empty, or when the source's OCI
// config is unavailable or holds no DiffID at srcIndex. The returned pointers
// alias the elements of deltaLayers.
func (ic *imageCopier) getMatchingDeltaLayers(ctx context.Context, srcIndex int, deltaLayers []types.BlobInfo) (digest.Digest, []*types.BlobInfo) {
	if len(deltaLayers) == 0 {
		return "", nil
	}
	// The error is dropped on purpose: without a usable config we simply report
	// that no delta matches, which is what the nil check below does.
	config, _ := ic.src.OCIConfig(ctx)
	if config == nil || srcIndex < 0 || srcIndex >= len(config.RootFS.DiffIDs) {
		return "", nil
	}

	layerDiffID := config.RootFS.DiffIDs[srcIndex]
	wanted := layerDiffID.String() // loop-invariant, so compute it once

	var matchingLayers []*types.BlobInfo
	for i := range deltaLayers {
		if deltaLayers[i].Annotations["com.redhat.deltaTo"] == wanted {
			matchingLayers = append(matchingLayers, &deltaLayers[i])
		}
	}

	return layerDiffID, matchingLayers
}

// resolveDeltaLayer looks at which of matchingDeltas have their
// "com.redhat.deltaFrom" layer data available in the destination and opens the
// smallest such delta.
//
// matchingDeltas is sorted in place by ascending Size. Entries whose deltaFrom
// annotation does not parse as a digest are skipped.
//
// On success it returns the delta blob stream, the data source for the "from"
// layer and the BlobInfo of the chosen delta; the caller must close both the
// stream and the data source. If no delta is usable, it returns nil values and
// a nil error. A non-nil error is returned when querying the destination or
// reading the delta blob fails.
func (ic *imageCopier) resolveDeltaLayer(ctx context.Context, matchingDeltas []*types.BlobInfo) (io.ReadCloser, tar_patch.DataSource, types.BlobInfo, error) {
	// Sort by size so that we favour the smallest usable delta
	sort.Slice(matchingDeltas, func(i, j int) bool {
		return matchingDeltas[i].Size < matchingDeltas[j].Size
	})

	for _, matchingDelta := range matchingDeltas {
		from := matchingDelta.Annotations["com.redhat.deltaFrom"]
		fromDigest, err := digest.Parse(from)
		if err != nil {
			continue // Silently ignore if the server specified a weird format
		}

		dataSource, err := ic.c.dest.GetLayerDeltaData(ctx, fromDigest)
		if err != nil {
			return nil, nil, types.BlobInfo{}, err // Internal error
		}
		if dataSource == nil {
			continue // from layer doesn't exist
		}

		logrus.Debugf("Using delta %v for DiffID %v", matchingDelta.Digest, fromDigest)

		deltaStream, _, err := ic.c.rawSource.GetBlob(ctx, *matchingDelta, ic.c.blobInfoCache)
		if err != nil {
			// The data source is not handed to the caller on this path, so it
			// must be released here or it leaks.
			dataSource.Close()
			return nil, nil, types.BlobInfo{}, errors.Wrapf(err, "Error reading delta blob %s", matchingDelta.Digest)
		}
		return deltaStream, dataSource, *matchingDelta, nil
	}
	return nil, nil, types.BlobInfo{}, nil
}

// canUseDeltas reports whether a delta may stand in for the layer described by
// srcInfo. When it may, the second result is the media type to use for the
// layer produced by applying the delta; otherwise it is "".
func (ic *imageCopier) canUseDeltas(srcInfo types.BlobInfo) (bool, string) {
	// Applying a delta rewrites the manifest to refer to the uncompressed
	// digest, so we must be allowed to substitute blobs.
	if !ic.canSubstituteBlobs {
		return false, ""
	}

	mediaType := srcInfo.MediaType

	// Docker schema 2 layers map to the uncompressed Docker layer type.
	if mediaType == manifest.DockerV2Schema2LayerMediaType ||
		mediaType == manifest.DockerV2SchemaLayerMediaTypeUncompressed {
		return true, manifest.DockerV2SchemaLayerMediaTypeUncompressed
	}

	// OCI layers, whatever their compression, map to the plain OCI layer type.
	if mediaType == imgspecv1.MediaTypeImageLayer ||
		mediaType == imgspecv1.MediaTypeImageLayerGzip ||
		mediaType == imgspecv1.MediaTypeImageLayerZstd {
		return true, imgspecv1.MediaTypeImageLayer
	}

	return false, ""
}

// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcIndex int, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, deltaLayers []types.BlobInfo) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
// Diffs are needed if we are encrypting an image or trying to decrypt an image
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" || toEncrypt || (isOciEncrypted(srcInfo.MediaType) && ic.ociDecryptConfig != nil)
Expand All @@ -1061,6 +1142,52 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}

// First look for a delta that matches this layer and substitute the result of that
if ok, deltaResultMediaType := ic.canUseDeltas(srcInfo); ok {
// Get deltas going TO this layer
deltaDiffID, matchingDeltas := ic.getMatchingDeltaLayers(ctx, srcIndex, deltaLayers)
// Get best possible FROM delta
deltaStream, deltaDataSource, matchingDelta, err := ic.resolveDeltaLayer(ctx, matchingDeltas)
if err != nil {
return types.BlobInfo{}, "", err
}
if deltaStream != nil {
bar := ic.c.createProgressBar(pool, matchingDelta, "delta", "done")

wrappedDeltaStream := bar.ProxyReader(deltaStream)

// Convert deltaStream to uncompressed tar layer stream
pr, pw := io.Pipe()
go func() {
if err := tar_patch.Apply(deltaStream, deltaDataSource, pw); err != nil {
// We will notice this error when failing to verify the digest, so leave it be
logrus.Infof("Failed to apply layer delta: %v", err)
}
deltaDataSource.Close()
deltaStream.Close()
wrappedDeltaStream.Close()
pw.Close()
}()
defer pr.Close()

// Copy uncompressed tar layer to destination, verifying the diffID
blobInfo, err := ic.c.copyBlobFromStream(ctx, pr, types.BlobInfo{Digest: deltaDiffID, Size: -1, MediaType: deltaResultMediaType, Annotations: srcInfo.Annotations}, nil, ic.canModifyManifest, false, toEncrypt, nil)
if err != nil {
return types.BlobInfo{}, "", err
}

bar.SetTotal(matchingDelta.Size, true)

// We verified this when streaming the applied delta above
diffID := deltaDiffID

// Record the fact that this blob is uncompressed
ic.c.blobInfoCache.RecordDigestUncompressedPair(diffID, diffID)

return blobInfo, diffID, err
}
}

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
Expand Down Expand Up @@ -1210,7 +1337,9 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
isCompressed := decompressor != nil
destStream = bar.ProxyReader(destStream)
if bar != nil {
destStream = bar.ProxyReader(destStream)
}

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
Expand All @@ -1229,6 +1358,11 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
logrus.Debugf("Using original blob without modification for encrypted blob")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
} else if canModifyBlob && manifest.IsNoCompressType(srcInfo.MediaType) {
// This is a blob we should not repack, such as a delta
logrus.Debugf("Using original blob without modification for no-compress type")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && !isCompressed {
logrus.Debugf("Compressing blob on the fly")
compressionOperation = types.Compress
Expand Down
10 changes: 10 additions & 0 deletions copy/manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -77,6 +78,15 @@ func (f fakeImageSource) SupportsEncryption(ctx context.Context) bool {
func (f fakeImageSource) Size() (int64, error) {
panic("Unexpected call to a mock function")
}
// GetDeltaManifest is not expected to be called on this fake; it panics if it is.
func (f fakeImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
panic("Unexpected call to a mock function")
}
// GetDeltaManifestDestination is not expected to be called on this fake; it panics if it is.
func (f fakeImageSource) GetDeltaManifestDestination(ctx context.Context, instanceDigest *digest.Digest) (types.ImageDestination, error) {
panic("Unexpected call to a mock function")
}
// DeltaLayers is not expected to be called on this fake; it panics if it is.
func (f fakeImageSource) DeltaLayers(ctx context.Context) ([]types.BlobInfo, error) {
panic("Unexpected call to a mock function")
}

func TestDetermineManifestConversion(t *testing.T) {
supportS1S2OCI := []string{
Expand Down
5 changes: 5 additions & 0 deletions directory/directory_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"runtime"

"github.com/alexlarsson/tar-diff/pkg/tar-patch"
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
Expand Down Expand Up @@ -245,6 +246,10 @@ func (d *dirImageDestination) Commit(context.Context, types.UnparsedImage) error
return nil
}

// GetLayerDeltaData always returns a nil DataSource and a nil error, i.e. this
// destination never provides layer content to apply a delta against.
func (d *dirImageDestination) GetLayerDeltaData(ctx context.Context, diffID digest.Digest) (tar_patch.DataSource, error) {
return nil, nil
}

// returns true if path exists
func pathExists(path string) (bool, error) {
_, err := os.Stat(path)
Expand Down
8 changes: 8 additions & 0 deletions directory/directory_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ func (s *dirImageSource) GetManifest(ctx context.Context, instanceDigest *digest
return m, manifest.GuessMIMEType(m), err
}

// GetDeltaManifest always returns a nil manifest, an empty MIME type and a nil error.
// NOTE(review): presumably this signals that this source has no delta manifest — confirm against the ImageSource contract.
func (s *dirImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
return nil, "", nil
}

// GetDeltaManifestDestination always returns a nil destination and a nil error.
// NOTE(review): presumably this means delta manifests cannot be stored for this source — confirm against the ImageSource contract.
func (s *dirImageSource) GetDeltaManifestDestination(ctx context.Context, instanceDigest *digest.Digest) (types.ImageDestination, error) {
return nil, nil
}

// HasThreadSafeGetBlob indicates whether GetBlob can be executed concurrently.
func (s *dirImageSource) HasThreadSafeGetBlob() bool {
return false
Expand Down
6 changes: 6 additions & 0 deletions docker/archive/dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"io"
"os"

"github.com/alexlarsson/tar-diff/pkg/tar-patch"
"github.com/containers/image/v5/docker/tarfile"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -70,3 +72,7 @@ func (d *archiveImageDestination) Close() error {
func (d *archiveImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error {
return d.Destination.Commit(ctx)
}

// GetLayerDeltaData always returns a nil DataSource and a nil error, i.e. this
// destination never provides layer content to apply a delta against.
func (d *archiveImageDestination) GetLayerDeltaData(ctx context.Context, diffID digest.Digest) (tar_patch.DataSource, error) {
return nil, nil
}
6 changes: 6 additions & 0 deletions docker/daemon/daemon_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"io"

"github.com/alexlarsson/tar-diff/pkg/tar-patch"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/docker/tarfile"
"github.com/containers/image/v5/types"
"github.com/docker/docker/client"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -146,3 +148,7 @@ func (d *daemonImageDestination) Commit(ctx context.Context, unparsedToplevel ty
return err
}
}

// GetLayerDeltaData always returns a nil DataSource and a nil error, i.e. this
// destination never provides layer content to apply a delta against.
func (d *daemonImageDestination) GetLayerDeltaData(ctx context.Context, diffID digest.Digest) (tar_patch.DataSource, error) {
return nil, nil
}
5 changes: 5 additions & 0 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path/filepath"
"strings"

"github.com/alexlarsson/tar-diff/pkg/tar-patch"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/internal/iolimits"
"github.com/containers/image/v5/manifest"
Expand Down Expand Up @@ -642,3 +643,7 @@ sigExists:
func (d *dockerImageDestination) Commit(context.Context, types.UnparsedImage) error {
return nil
}

// GetLayerDeltaData always returns a nil DataSource and a nil error, i.e. this
// destination never provides layer content to apply a delta against.
func (d *dockerImageDestination) GetLayerDeltaData(ctx context.Context, diffID digest.Digest) (tar_patch.DataSource, error) {
return nil, nil
}
37 changes: 37 additions & 0 deletions docker/docker_image_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,43 @@ func (s *dockerImageSource) fetchManifest(ctx context.Context, tagOrDigest strin
return manblob, simplifyContentType(res.Header.Get("Content-Type")), nil
}

// getDeltaManifestTagName returns the tag name used for the delta manifest of
// the given image instance: "delta-" followed by the first 12 characters of the
// encoded manifest digest (or the whole encoded digest if it is shorter).
// It returns an error if the manifest digest cannot be determined.
func (s *dockerImageSource) getDeltaManifestTagName(ctx context.Context, instanceDigest *digest.Digest) (string, error) {
	// Deliberately not named "digest", which would shadow the imported package.
	dgst, err := s.manifestDigest(ctx, instanceDigest)
	if err != nil {
		return "", err
	}

	const shortIDLen = 12
	shortID := dgst.Encoded()
	// Guard the slice so an unexpectedly short encoded digest cannot panic.
	if len(shortID) > shortIDLen {
		shortID = shortID[:shortIDLen]
	}
	return "delta-" + shortID, nil
}

// GetDeltaManifest fetches the delta manifest stored under the delta tag of
// the given image instance and returns it together with its MIME type.
//
// Deltas are an optional optimization, so failing to fetch the manifest
// (typically because no such tag exists) is not an error: the result is then
// (nil, "", nil). Errors are returned only when the tag name cannot be
// computed or when ctx has been cancelled or has expired.
func (s *dockerImageSource) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
	tagname, err := s.getDeltaManifestTagName(ctx, instanceDigest)
	if err != nil {
		return nil, "", err
	}

	mb, mt, err := s.fetchManifest(ctx, tagname)
	if err != nil {
		// A cancelled or expired context is not "no delta available"; report it
		// so the caller can stop promptly instead of continuing the copy.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, "", ctxErr
		}
		// Any other failure is treated as "no delta manifest". Never hand back
		// result values from a failed fetch.
		return nil, "", nil
	}
	return mb, mt, nil
}

// GetDeltaManifestDestination returns an image destination that refers to the
// delta-manifest tag of the given image instance, in the same repository as
// this source. Any error from computing the tag name, building the tagged
// reference, or creating the destination is returned unchanged.
func (s *dockerImageSource) GetDeltaManifestDestination(ctx context.Context, instanceDigest *digest.Digest) (types.ImageDestination, error) {
	deltaTag, err := s.getDeltaManifestTagName(ctx, instanceDigest)
	if err != nil {
		return nil, err
	}

	// Same repository as the source image, but with the delta tag.
	taggedRef, err := reference.WithTag(s.ref.ref, deltaTag)
	if err != nil {
		return nil, err
	}

	destRef, err := newReference(taggedRef)
	if err != nil {
		return nil, err
	}

	return newImageDestination(s.c.sys, destRef)
}

// ensureManifestIsLoaded sets s.cachedManifest and s.cachedManifestMIMEType
//
// ImageSource implementations are not required or expected to do any caching,
Expand Down
8 changes: 8 additions & 0 deletions docker/tarfile/src.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,14 @@ func (s *Source) GetManifest(ctx context.Context, instanceDigest *digest.Digest)
return s.generatedManifest, manifest.DockerV2Schema2MediaType, nil
}

// GetDeltaManifest always returns a nil manifest, an empty MIME type and a nil error.
// NOTE(review): presumably this signals that this source has no delta manifest — confirm against the ImageSource contract.
func (s *Source) GetDeltaManifest(ctx context.Context, instanceDigest *digest.Digest) ([]byte, string, error) {
return nil, "", nil
}

// GetDeltaManifestDestination always returns a nil destination and a nil error.
// NOTE(review): presumably this means delta manifests cannot be stored for this source — confirm against the ImageSource contract.
func (s *Source) GetDeltaManifestDestination(ctx context.Context, instanceDigest *digest.Digest) (types.ImageDestination, error) {
return nil, nil
}

// uncompressedReadCloser is an io.ReadCloser that closes both the uncompressed stream and the underlying input.
type uncompressedReadCloser struct {
io.Reader
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/14rcole/gopopulate v0.0.0-20180821133914-b175b219e774 // indirect
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/alexlarsson/tar-diff v0.0.0-20200420105158-0c605fa029cc
github.com/containers/libtrust v0.0.0-20190913040956-14b96171aa3b
github.com/containers/ocicrypt v1.0.2
github.com/containers/storage v1.19.0
Expand Down
Loading

0 comments on commit faabfc6

Please sign in to comment.