Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ociindex utility package refactor #3409

Merged
merged 2 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 99 additions & 52 deletions client/ociindex/ociindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,76 +4,94 @@ import (
"encoding/json"
"io"
"os"
"path"

"github.com/gofrs/flock"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

const (
// IndexJSONLockFileSuffix is the suffix of the lock file
IndexJSONLockFileSuffix = ".lock"
// indexFile is the name of the index file
indexFile = "index.json"

// lockFileSuffix is the suffix of the lock file
lockFileSuffix = ".lock"
)

// PutDescToIndex puts desc to index with tag.
// Existing manifests with the same tag will be removed from the index.
func PutDescToIndex(index *ocispecs.Index, desc ocispecs.Descriptor, tag string) error {
if index == nil {
index = &ocispecs.Index{}
type StoreIndex struct {
indexPath string
lockPath string
}

func NewStoreIndex(storePath string) StoreIndex {
indexPath := path.Join(storePath, indexFile)
return StoreIndex{
indexPath: indexPath,
lockPath: indexPath + lockFileSuffix,
}
if index.SchemaVersion == 0 {
index.SchemaVersion = 2
}

func (s StoreIndex) Read() (*ocispecs.Index, error) {
lock := flock.New(s.lockPath)
locked, err := lock.TryRLock()
if err != nil {
return nil, errors.Wrapf(err, "could not lock %s", s.lockPath)
}
if tag != "" {
if desc.Annotations == nil {
desc.Annotations = make(map[string]string)
}
desc.Annotations[ocispecs.AnnotationRefName] = tag
// remove existing manifests with the same tag
var manifests []ocispecs.Descriptor
for _, m := range index.Manifests {
if m.Annotations[ocispecs.AnnotationRefName] != tag {
manifests = append(manifests, m)
}
}
index.Manifests = manifests
if !locked {
return nil, errors.Errorf("could not lock %s", s.lockPath)
}
index.Manifests = append(index.Manifests, desc)
return nil
defer func() {
lock.Unlock()
os.RemoveAll(s.lockPath)
}()

b, err := os.ReadFile(s.indexPath)
if err != nil {
return nil, errors.Wrapf(err, "could not read %s", s.indexPath)
}
var idx ocispecs.Index
if err := json.Unmarshal(b, &idx); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal %s (%q)", s.indexPath, string(b))
}
return &idx, nil
}

func PutDescToIndexJSONFileLocked(indexJSONPath string, desc ocispecs.Descriptor, tag string) error {
lockPath := indexJSONPath + IndexJSONLockFileSuffix
lock := flock.New(lockPath)
func (s StoreIndex) Put(tag string, desc ocispecs.Descriptor) error {
lock := flock.New(s.lockPath)
locked, err := lock.TryLock()
if err != nil {
return errors.Wrapf(err, "could not lock %s", lockPath)
return errors.Wrapf(err, "could not lock %s", s.lockPath)
}
if !locked {
return errors.Errorf("could not lock %s", lockPath)
return errors.Errorf("could not lock %s", s.lockPath)
}
defer func() {
lock.Unlock()
os.RemoveAll(lockPath)
os.RemoveAll(s.lockPath)
}()
f, err := os.OpenFile(indexJSONPath, os.O_RDWR|os.O_CREATE, 0644)

f, err := os.OpenFile(s.indexPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return errors.Wrapf(err, "could not open %s", indexJSONPath)
return errors.Wrapf(err, "could not open %s", s.indexPath)
}
defer f.Close()

var idx ocispecs.Index
b, err := io.ReadAll(f)
if err != nil {
return errors.Wrapf(err, "could not read %s", indexJSONPath)
return errors.Wrapf(err, "could not read %s", s.indexPath)
}
if len(b) > 0 {
if err := json.Unmarshal(b, &idx); err != nil {
return errors.Wrapf(err, "could not unmarshal %s (%q)", indexJSONPath, string(b))
return errors.Wrapf(err, "could not unmarshal %s (%q)", s.indexPath, string(b))
}
}
if err = PutDescToIndex(&idx, desc, tag); err != nil {

if err = insertDesc(&idx, desc, tag); err != nil {
return err
}

b, err = json.Marshal(idx)
if err != nil {
return err
Expand All @@ -87,27 +105,56 @@ func PutDescToIndexJSONFileLocked(indexJSONPath string, desc ocispecs.Descriptor
return nil
}

func ReadIndexJSONFileLocked(indexJSONPath string) (*ocispecs.Index, error) {
lockPath := indexJSONPath + IndexJSONLockFileSuffix
lock := flock.New(lockPath)
locked, err := lock.TryRLock()
func (s StoreIndex) Get(tag string) (*ocispecs.Descriptor, error) {
idx, err := s.Read()
if err != nil {
return nil, errors.Wrapf(err, "could not lock %s", lockPath)
return nil, err
}
if !locked {
return nil, errors.Errorf("could not lock %s", lockPath)

for _, m := range idx.Manifests {
if t, ok := m.Annotations[ocispecs.AnnotationRefName]; ok && t == tag {
return &m, nil
}
}
defer func() {
lock.Unlock()
os.RemoveAll(lockPath)
}()
b, err := os.ReadFile(indexJSONPath)
return nil, nil
}

func (s StoreIndex) GetSingle() (*ocispecs.Descriptor, error) {
idx, err := s.Read()
if err != nil {
return nil, errors.Wrapf(err, "could not read %s", indexJSONPath)
return nil, err
}
var idx ocispecs.Index
if err := json.Unmarshal(b, &idx); err != nil {
return nil, errors.Wrapf(err, "could not unmarshal %s (%q)", indexJSONPath, string(b))

if len(idx.Manifests) == 1 {
return &idx.Manifests[0], nil
}
return &idx, nil
return nil, nil
}

// insertDesc puts desc to index with tag.
// Existing manifests with the same tag will be removed from the index.
func insertDesc(index *ocispecs.Index, desc ocispecs.Descriptor, tag string) error {
if index == nil {
return nil
}

if index.SchemaVersion == 0 {
index.SchemaVersion = 2
}
if tag != "" {
if desc.Annotations == nil {
desc.Annotations = make(map[string]string)
}
desc.Annotations[ocispecs.AnnotationRefName] = tag
// remove existing manifests with the same tag
var manifests []ocispecs.Descriptor
for _, m := range index.Manifests {
if m.Annotations[ocispecs.AnnotationRefName] != tag {
manifests = append(manifests, m)
}
}
index.Manifests = manifests
}
index.Manifests = append(index.Manifests, desc)
return nil
}
60 changes: 30 additions & 30 deletions client/solve.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
ex = opt.Exports[0]
}

indicesToUpdate := []string{}
storesToUpdate := []string{}

if !opt.SessionPreInitialized {
if len(syncedDirs) > 0 {
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
return nil, err
}
contentStores["export"] = cs
indicesToUpdate = append(indicesToUpdate, filepath.Join(ex.OutputDir, "index.json"))
storesToUpdate = append(storesToUpdate, ex.OutputDir)
default:
s.Allow(filesync.NewFSSyncTargetDir(ex.OutputDir))
}
Expand Down Expand Up @@ -327,8 +327,9 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
if err = json.Unmarshal([]byte(manifestDescJSON), &manifestDesc); err != nil {
return nil, err
}
for indexJSONPath, tag := range cacheOpt.indicesToUpdate {
if err = ociindex.PutDescToIndexJSONFileLocked(indexJSONPath, manifestDesc, tag); err != nil {
for storePath, tag := range cacheOpt.storesToUpdate {
idx := ociindex.NewStoreIndex(storePath)
if err := idx.Put(tag, manifestDesc); err != nil {
return nil, err
}
}
Expand All @@ -342,12 +343,13 @@ func (c *Client) solve(ctx context.Context, def *llb.Definition, runGateway runG
if err = json.Unmarshal([]byte(manifestDescDt), &manifestDesc); err != nil {
return nil, err
}
for _, indexJSONPath := range indicesToUpdate {
for _, storePath := range storesToUpdate {
tag := "latest"
if t, ok := res.ExporterResponse["image.name"]; ok {
tag = t
}
if err = ociindex.PutDescToIndexJSONFileLocked(indexJSONPath, manifestDesc, tag); err != nil {
idx := ociindex.NewStoreIndex(storePath)
if err := idx.Put(tag, manifestDesc); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -406,10 +408,10 @@ func defaultSessionName() string {
}

type cacheOptions struct {
options controlapi.CacheOptions
contentStores map[string]content.Store // key: ID of content store ("local:" + csDir)
indicesToUpdate map[string]string // key: index.JSON file name, value: tag
frontendAttrs map[string]string
options controlapi.CacheOptions
contentStores map[string]content.Store // key: ID of content store ("local:" + csDir)
storesToUpdate map[string]string // key: path to content store, value: tag
frontendAttrs map[string]string
}

func parseCacheOptions(ctx context.Context, isGateway bool, opt SolveOpt) (*cacheOptions, error) {
Expand All @@ -418,7 +420,7 @@ func parseCacheOptions(ctx context.Context, isGateway bool, opt SolveOpt) (*cach
cacheImports []*controlapi.CacheOptionsEntry
)
contentStores := make(map[string]content.Store)
indicesToUpdate := make(map[string]string) // key: index.JSON file name, value: tag
storesToUpdate := make(map[string]string)
frontendAttrs := make(map[string]string)
for _, ex := range opt.CacheExports {
if ex.Type == "local" {
Expand All @@ -440,8 +442,7 @@ func parseCacheOptions(ctx context.Context, isGateway bool, opt SolveOpt) (*cach
tag = t
}
// TODO(AkihiroSuda): support custom index JSON path and tag
indexJSONPath := filepath.Join(csDir, "index.json")
indicesToUpdate[indexJSONPath] = tag
storesToUpdate[csDir] = tag
}
if ex.Type == "registry" {
regRef := ex.Attrs["ref"]
Expand All @@ -465,27 +466,26 @@ func parseCacheOptions(ctx context.Context, isGateway bool, opt SolveOpt) (*cach
bklog.G(ctx).Warning("local cache import at " + csDir + " not found due to err: " + err.Error())
continue
}
// if digest is not specified, load from "latest" tag
// if digest is not specified, attempt to load from tag
if im.Attrs["digest"] == "" {
idx, err := ociindex.ReadIndexJSONFileLocked(filepath.Join(csDir, "index.json"))
tag := "latest"
if t, ok := im.Attrs["tag"]; ok {
tag = t
}

idx := ociindex.NewStoreIndex(csDir)
desc, err := idx.Get(tag)
if err != nil {
bklog.G(ctx).Warning("local cache import at " + csDir + " not found due to err: " + err.Error())
continue
}
for _, m := range idx.Manifests {
tag := "latest"
if t, ok := im.Attrs["tag"]; ok {
tag = t
}
if m.Annotations[ocispecs.AnnotationRefName] == tag {
im.Attrs["digest"] = string(m.Digest)
break
}
}
if im.Attrs["digest"] == "" {
return nil, errors.New("local cache importer requires either explicit digest, \"latest\" tag or custom tag on index.json")
if desc != nil {
im.Attrs["digest"] = desc.Digest.String()
}
}
if im.Attrs["digest"] == "" {
return nil, errors.New("local cache importer requires either explicit digest, \"latest\" tag or custom tag on index.json")
}
contentStores["local:"+csDir] = cs
}
if im.Type == "registry" {
Expand Down Expand Up @@ -513,9 +513,9 @@ func parseCacheOptions(ctx context.Context, isGateway bool, opt SolveOpt) (*cach
Exports: cacheExports,
Imports: cacheImports,
},
contentStores: contentStores,
indicesToUpdate: indicesToUpdate,
frontendAttrs: frontendAttrs,
contentStores: contentStores,
storesToUpdate: storesToUpdate,
frontendAttrs: frontendAttrs,
}
return &res, nil
}