Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: properly delete invalid layers during image pull #3358

Merged
merged 11 commits into from
Jan 9, 2025
38 changes: 22 additions & 16 deletions src/internal/packager/images/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/zarf-dev/zarf/src/pkg/logger"
"io/fs"
"maps"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/zarf-dev/zarf/src/pkg/logger"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
Expand All @@ -34,8 +36,6 @@ import (
"github.com/google/go-containerregistry/pkg/v1/types"
"github.com/moby/moby/client"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/pkg/layout"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/pkg/transform"
"github.com/zarf-dev/zarf/src/pkg/utils"
Expand Down Expand Up @@ -249,7 +249,7 @@ func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, er
toPull := maps.Clone(fetched)

err = retry.Do(func() error {
saved, err := SaveConcurrent(ctx, cranePath, toPull)
saved, err := SaveConcurrent(ctx, cranePath, toPull, cfg.CacheDirectory)
// Done save, remove from download list.
for k := range saved {
delete(toPull, k)
Expand All @@ -264,7 +264,7 @@ func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, er
message.Warnf("Failed to save images in parallel, falling back to sequential save: %s", err.Error())
l.Warn("failed to save images in parallel, falling back to sequential save", "error", err.Error())
err = retry.Do(func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
saved, err := SaveSequential(ctx, cranePath, toPull, cfg.CacheDirectory)
for k := range saved {
delete(toPull, k)
}
Expand Down Expand Up @@ -313,8 +313,19 @@ func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, er
return fetched, nil
}

// layerCachePath builds the location of the cached layer identified by h
// inside the cache directory path.
//
// On Windows the file is named "<algorithm>-<hex>"; on every other OS the file
// is named h.String(). NOTE(review): presumably the Windows form exists
// because h.String() contains a character that is not valid in Windows file
// names — confirm against the upstream implementation linked below.
//
// Adapted from https://github.com/google/go-containerregistry/blob/6bce25ecf0297c1aa9072bc665b5cf58d53e1c54/pkg/v1/cache/fs.go#L143
func layerCachePath(path string, h v1.Hash) string {
	if runtime.GOOS != "windows" {
		return filepath.Join(path, h.String())
	}
	return filepath.Join(path, fmt.Sprintf("%s-%s", h.Algorithm, h.Hex))
}

// CleanupInProgressLayers removes incomplete layers from the cache.
func CleanupInProgressLayers(ctx context.Context, img v1.Image) error {
func CleanupInProgressLayers(ctx context.Context, img v1.Image, cacheDirectory string) error {
layers, err := img.Layers()
if err != nil {
return err
Expand All @@ -331,12 +342,7 @@ func CleanupInProgressLayers(ctx context.Context, img v1.Image) error {
if err != nil {
return err
}
absPath, err := config.GetAbsCachePath()
if err != nil {
return err
}
cacheDir := filepath.Join(absPath, layout.ImagesDir)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue was that we were expecting the contents to be under an "images" folder, which does not exist in the crane cache.

location := filepath.Join(cacheDir, digest.String())
location := layerCachePath(cacheDirectory, digest)
info, err := os.Stat(location)
if errors.Is(err, fs.ErrNotExist) {
return nil
Expand All @@ -356,7 +362,7 @@ func CleanupInProgressLayers(ctx context.Context, img v1.Image) error {
}

// SaveSequential saves images sequentially.
func SaveSequential(ctx context.Context, cl clayout.Path, m map[transform.Image]v1.Image) (map[transform.Image]v1.Image, error) {
func SaveSequential(ctx context.Context, cl clayout.Path, m map[transform.Image]v1.Image, cacheDirectory string) (map[transform.Image]v1.Image, error) {
l := logger.From(ctx)
saved := map[transform.Image]v1.Image{}
for info, img := range m {
Expand All @@ -370,7 +376,7 @@ func SaveSequential(ctx context.Context, cl clayout.Path, m map[transform.Image]
}
l.Info("saving image", "ref", info.Reference, "size", size, "method", "sequential")
if err := cl.AppendImage(img, clayout.WithAnnotations(annotations)); err != nil {
if err := CleanupInProgressLayers(ctx, img); err != nil {
if err := CleanupInProgressLayers(ctx, img, cacheDirectory); err != nil {
message.WarnErr(err, "failed to clean up in-progress layers, please run `zarf tools clear-cache`")
l.Error("failed to clean up in-progress layers. please run `zarf tools clear-cache`")
}
Expand All @@ -388,7 +394,7 @@ func SaveSequential(ctx context.Context, cl clayout.Path, m map[transform.Image]
}

// SaveConcurrent saves images in a concurrent, bounded manner.
func SaveConcurrent(ctx context.Context, cl clayout.Path, m map[transform.Image]v1.Image) (map[transform.Image]v1.Image, error) {
func SaveConcurrent(ctx context.Context, cl clayout.Path, m map[transform.Image]v1.Image, cacheDirectory string) (map[transform.Image]v1.Image, error) {
l := logger.From(ctx)
saved := map[transform.Image]v1.Image{}

Expand Down Expand Up @@ -416,7 +422,7 @@ func SaveConcurrent(ctx context.Context, cl clayout.Path, m map[transform.Image]
wStart := time.Now()
l.Info("saving image", "ref", info.Reference, "size", size, "method", "concurrent")
if err := cl.WriteImage(img); err != nil {
if err := CleanupInProgressLayers(ectx, img); err != nil {
if err := CleanupInProgressLayers(ectx, img, cacheDirectory); err != nil {
message.WarnErr(err, "failed to clean up in-progress layers, please run `zarf tools clear-cache`")
l.Error("failed to clean up in-progress layers. please run `zarf tools clear-cache`")
}
Expand Down
42 changes: 42 additions & 0 deletions src/internal/packager/images/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package images

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -104,6 +105,7 @@ func TestPull(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ref, err := transform.ParseImageRef(tc.ref)
require.NoError(t, err)
destDir := t.TempDir()
Expand Down Expand Up @@ -133,6 +135,7 @@ func TestPull(t *testing.T) {
}

t.Run("pulling a cosign image is successful and doesn't add anything to the cache", func(t *testing.T) {
t.Parallel()
ref, err := transform.ParseImageRef("ghcr.io/stefanprodan/podinfo:sha256-57a654ace69ec02ba8973093b6a786faa15640575fbf0dbb603db55aca2ccec8.sig")
require.NoError(t, err)
destDir := t.TempDir()
Expand All @@ -153,4 +156,43 @@ func TestPull(t *testing.T) {
require.NoError(t, err)
require.Empty(t, dir)
})

t.Run("pulling an image with an invalid layer in the cache should still pull the image", func(t *testing.T) {
t.Parallel()
ref, err := transform.ParseImageRef("ghcr.io/fluxcd/image-automation-controller@sha256:48a89734dc82c3a2d4138554b3ad4acf93230f770b3a582f7f48be38436d031c")
require.NoError(t, err)
destDir := t.TempDir()
cacheDir := t.TempDir()
invalidContent := []byte("this mimics a corrupted file")
// This is the sha of a layer of the image. Crane will make a file using this sha in the cache.
// We intentionally put junk data into the cache with this layer to test that it will get cleaned up.
correctLayerSha := "d94c8059c3cffb9278601bf9f8be070d50c84796401a4c5106eb8a4042445bbc"
hash, err := v1.NewHash(fmt.Sprintf("sha256:%s", correctLayerSha))
require.NoError(t, err)
invalidLayerPath := layerCachePath(cacheDir, hash)
err = os.WriteFile(invalidLayerPath, invalidContent, 0777)
require.NoError(t, err)

pullConfig := PullConfig{
DestinationDirectory: destDir,
CacheDirectory: cacheDir,
ImageList: []transform.Image{
ref,
},
}
_, err = Pull(context.Background(), pullConfig)
require.NoError(t, err)

// Verify the cache layer has the correct sha
nowValidCachedLayer, err := os.ReadFile(invalidLayerPath)
cachedLayerSha := sha256.Sum256(nowValidCachedLayer)
require.Equal(t, correctLayerSha, fmt.Sprintf("%x", cachedLayerSha))
require.NoError(t, err)
// Verify the pulled layer has the correct sha
pulledLayerPath := filepath.Join(destDir, "blobs", "sha256", hash.Hex)
pulledLayer, err := os.ReadFile(pulledLayerPath)
require.NoError(t, err)
pulledLayerSha := sha256.Sum256(pulledLayer)
require.Equal(t, correctLayerSha, fmt.Sprintf("%x", pulledLayerSha))
})
}
Loading