Skip to content

Commit

Permalink
Implement caching for module's OCI layers
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Prodan <[email protected]>
  • Loading branch information
stefanprodan committed Nov 1, 2023
1 parent 66dc2b6 commit 40ce311
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 22 deletions.
26 changes: 14 additions & 12 deletions internal/engine/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,23 @@ import (

// Fetcher downloads a module and extracts it locally.
type Fetcher struct {
ctx context.Context
src string
dst string
version string
creds string
ctx context.Context
src string
dst string
cacheDir string
version string
creds string
}

// NewFetcher creates a Fetcher for the given module.
func NewFetcher(ctx context.Context, src, version, dst, creds string) *Fetcher {
func NewFetcher(ctx context.Context, src, version, dst, cacheDir, creds string) *Fetcher {
return &Fetcher{
ctx: ctx,
src: src,
dst: dst,
version: version,
creds: creds,
ctx: ctx,
src: src,
dst: dst,
version: version,
cacheDir: cacheDir,
creds: creds,
}
}

Expand Down Expand Up @@ -103,5 +105,5 @@ func (f *Fetcher) fetchRemoteModule(dstDir string) (*apiv1.ModuleReference, erro
}

opts := oci.Options(f.ctx, f.creds)
return oci.PullModule(ociURL, dstDir, opts)
return oci.PullModule(ociURL, dstDir, f.cacheDir, opts)
}
7 changes: 6 additions & 1 deletion internal/oci/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package oci
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"

Expand Down Expand Up @@ -90,7 +91,8 @@ func TestModuleOperations(t *testing.T) {
}

dstPath := filepath.Join(tmpDir, "artifact")
modRef, err := PullModule(digestURL, dstPath, opts)
cacheDir := t.TempDir()
modRef, err := PullModule(digestURL, dstPath, cacheDir, opts)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(modRef.Version).To(BeEquivalentTo(imgVersion))
g.Expect(filepath.Join(dstPath, "timoni.ignore")).ToNot(BeAnExistingFile())
Expand All @@ -106,4 +108,7 @@ func TestModuleOperations(t *testing.T) {
} {
g.Expect(filepath.Join(dstPath, entry)).To(Or(BeAnExistingFile(), BeADirectory()))
}
cachedLayers, err := os.ReadDir(cacheDir)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(len(cachedLayers)).To(BeEquivalentTo(2))
}
69 changes: 60 additions & 9 deletions internal/oci/pull_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package oci
import (
"bytes"
"fmt"
"io"
"os"
"path"

"github.com/fluxcd/pkg/tar"
"github.com/google/go-containerregistry/pkg/crane"
Expand All @@ -31,9 +34,10 @@ import (
// - determines the artifact digest corresponding to the module version
// - fetches the manifest of the remote artifact
// - verifies that artifact config matches Timoni's media type
// - download all the compressed layer matching Timoni's media type
// - downloads all the compressed layer matching Timoni's media type (if not cached)
// - stores the compressed layers in the local cache (if caching is enabled)
// - extracts the module contents to the destination directory
func PullModule(ociURL, dstPath string, opts []crane.Option) (*apiv1.ModuleReference, error) {
func PullModule(ociURL, dstPath, cacheDir string, opts []crane.Option) (*apiv1.ModuleReference, error) {
ref, err := parseArtifactRef(ociURL)
if err != nil {
return nil, err
Expand All @@ -43,7 +47,7 @@ func PullModule(ociURL, dstPath string, opts []crane.Option) (*apiv1.ModuleRefer

digest, err := crane.Digest(ref.String(), opts...)
if err != nil {
return nil, fmt.Errorf("resolving the digest for '%s' failed: %w", ociURL, err)
return nil, fmt.Errorf("resolving digest of '%s' failed: %w", ociURL, err)
}

manifestJSON, err := crane.Manifest(ref.String(), opts...)
Expand Down Expand Up @@ -76,25 +80,72 @@ func PullModule(ociURL, dstPath string, opts []crane.Option) (*apiv1.ModuleRefer
Digest: digest,
}

// If caching is disable, download the compressed layers to an ephemeral tmp dir.
if cacheDir == "" {
tmpDir, err := os.MkdirTemp("", apiv1.FieldManager)
if err != nil {
return nil, err
}
defer os.RemoveAll(tmpDir)
cacheDir = tmpDir
}

var foundLayer bool
for _, layer := range manifest.Layers {
if layer.MediaType == apiv1.ContentMediaType {
foundLayer = true
layerDigest := layer.Digest.String()
blobURL := fmt.Sprintf("%s@%s", repoURL, layerDigest)
layer, err := crane.PullLayer(blobURL, opts...)
if err != nil {
return nil, fmt.Errorf("pulling layer %s failed: %w", layerDigest, err)

isCached := false
cachedLayer := path.Join(cacheDir, fmt.Sprintf("%s.tgz", layer.Digest.Hex))
if _, err := os.Stat(cachedLayer); err == nil {
isCached = true
}

blob, err := layer.Compressed()
// Pull the compressed layer from the registry and persist the gzip tarball
// in the cache at '<cache-dir>/<layer-digest-hex>.tgz'.
if !isCached {
layer, err := crane.PullLayer(blobURL, opts...)
if err != nil {
return nil, fmt.Errorf("pulling layer %s failed: %w", layerDigest, err)
}

remote, err := layer.Compressed()
if err != nil {
return nil, fmt.Errorf("pulling layer %s failed: %w", layerDigest, err)
}

local, err := os.Create(cachedLayer)
if err != nil {
return nil, fmt.Errorf("writing layer to storage failed: %w", err)
}

if _, err := io.Copy(local, remote); err != nil {
return nil, fmt.Errorf("writing layer to storage failed: %w", err)
}

if err := local.Close(); err != nil {
return nil, fmt.Errorf("writing layer to storage failed: %w", err)
}
}

reader, err := os.Open(cachedLayer)
if err != nil {
return nil, fmt.Errorf("extracting layer %s failed: %w", layerDigest, err)
return nil, fmt.Errorf("reading layer from storage failed: %w", err)
}

if err = tar.Untar(blob, dstPath, tar.WithMaxUntarSize(-1)); err != nil {
// Extract the contents from the gzip tarball stored in cache.
// If extraction fails, the gzip tarball is removed from cache.
if err = tar.Untar(reader, dstPath, tar.WithMaxUntarSize(-1)); err != nil {
_ = reader.Close()
_ = os.Remove(cachedLayer)
return nil, fmt.Errorf("extracting layer %s failed: %w", layerDigest, err)
}

if err := reader.Close(); err != nil {
return nil, fmt.Errorf("reading layer from storage failed: %w", err)
}
}
}

Expand Down

0 comments on commit 40ce311

Please sign in to comment.