diff --git a/cache/instructioncache/cache.go b/cache/instructioncache/cache.go new file mode 100644 index 000000000000..f1180b6caf9a --- /dev/null +++ b/cache/instructioncache/cache.go @@ -0,0 +1,112 @@ +package instructioncache + +import ( + "github.com/boltdb/bolt" + "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/metadata" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +const cacheKey = "buildkit.instructioncache" + +type cacheGroup struct { + Snapshots []string `json:"snapshots"` +} + +type LocalStore struct { + MetadataStore *metadata.Store + Cache cache.Accessor +} + +func (ls *LocalStore) Set(key string, refsAny []interface{}) error { + refs, err := toReferenceArray(refsAny) + if err != nil { + return err + } + cg := cacheGroup{} + for _, r := range refs { + cg.Snapshots = append(cg.Snapshots, r.ID()) + } + v, err := metadata.NewValue(cg) + if err != nil { + return err + } + v.Index = index(key) + for _, r := range refs { + si, _ := ls.MetadataStore.Get(r.ID()) + if err := si.Update(func(b *bolt.Bucket) error { // TODO: should share transaction + return si.SetValue(b, index(key), *v) + }); err != nil { + return err + } + } + return nil +} + +func (ls *LocalStore) Lookup(ctx context.Context, key string) ([]interface{}, error) { + snaps, err := ls.MetadataStore.Search(index(key)) + if err != nil { + return nil, err + } + refs := make([]cache.ImmutableRef, 0) + var retErr error +loop0: + for _, s := range snaps { + retErr = nil + for _, r := range refs { + r.Release(context.TODO()) + } + refs = nil + + v := s.Get(index(key)) + if v != nil { + var cg cacheGroup + if err = v.Unmarshal(&cg); err != nil { + retErr = err + continue + } + for _, id := range cg.Snapshots { + r, err := ls.Cache.Get(ctx, id) + if err != nil { + retErr = err + continue loop0 + } + refs = append(refs, r) + } + retErr = nil + break + } + } + if retErr != nil { + for _, r := range refs { + r.Release(context.TODO()) + } + refs = nil + } + return toAny(refs), retErr +} + +func index(k string) string { + return cacheKey + "::" + k +} + +func toReferenceArray(in []interface{}) ([]cache.ImmutableRef, error) { + out := make([]cache.ImmutableRef, 0, len(in)) + for _, i := range in { + r, ok := i.(cache.ImmutableRef) + if !ok { + return nil, errors.Errorf("invalid reference") + } + out = append(out, r) + } + return out, nil +} + +func toAny(in []cache.ImmutableRef) []interface{} { + out := make([]interface{}, 0, len(in)) + for _, i := range in { + out = append(out, i) + } + return out +} diff --git a/cache/metadata/metadata.go b/cache/metadata/metadata.go index 7be93477cefd..5035b7b22c8d 100644 --- a/cache/metadata/metadata.go +++ b/cache/metadata/metadata.go @@ -210,6 +210,14 @@ func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error { return s.storage.Update(s.id, fn) } +func (s *StorageItem) Keys() []string { + keys := make([]string, 0, len(s.values)) + for k := range s.values { + keys = append(keys, k) + } + return keys +} + func (s *StorageItem) Get(k string) *Value { return s.values[k] } diff --git a/client/solve.go b/client/solve.go index c901a14258d8..ddfb82fe0795 100644 --- a/client/solve.go +++ b/client/solve.go @@ -69,6 +69,7 @@ func (c *Client) Solve(ctx context.Context, r io.Reader, statusChan chan *SolveS Started: v.Started, Completed: v.Completed, Error: v.Error, + Cached: v.Cached, }) } for _, v := range resp.Statuses { diff --git a/control/control.go b/control/control.go index dcde822dfefd..5832f20ecfe2 100644 --- a/control/control.go +++ b/control/control.go @@ -15,10 +15,11 @@ import ( ) type Opt struct { - Snapshotter snapshot.Snapshotter - CacheManager cache.Manager - Worker worker.Worker - SourceManager *source.Manager + Snapshotter snapshot.Snapshotter + CacheManager cache.Manager + Worker worker.Worker + SourceManager *source.Manager + InstructionCache solver.InstructionCache } type Controller struct { // TODO: ControlService @@ -30,9 +31,10 @@ func NewController(opt Opt) (*Controller, error) { c := &Controller{ opt: opt, solver: solver.NewLLBSolver(solver.LLBOpt{ - SourceManager: opt.SourceManager, - CacheManager: opt.CacheManager, - Worker: opt.Worker, + SourceManager: opt.SourceManager, + CacheManager: opt.CacheManager, + Worker: opt.Worker, + InstructionCache: opt.InstructionCache, }), } return c, nil @@ -98,6 +100,7 @@ func (c *Controller) Status(req *controlapi.StatusRequest, stream controlapi.Con Started: v.Started, Completed: v.Completed, Error: v.Error, + Cached: v.Cached, }) } for _, v := range ss.Statuses { diff --git a/control/control_default.go b/control/control_default.go index b9dd0ff84f03..7aa8d28e2c33 100644 --- a/control/control_default.go +++ b/control/control_default.go @@ -9,6 +9,7 @@ import ( "github.com/containerd/containerd/rootfs" ctdsnapshot "github.com/containerd/containerd/snapshot" "github.com/moby/buildkit/cache" + "github.com/moby/buildkit/cache/instructioncache" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/snapshot/blobmapping" "github.com/moby/buildkit/source" @@ -44,6 +45,11 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) { return nil, err } + ic := &instructioncache.LocalStore{ + MetadataStore: md, + Cache: cm, + } + sm, err := source.NewManager() if err != nil { return nil, err @@ -62,8 +68,9 @@ func defaultControllerOpts(root string, pd pullDeps) (*Opt, error) { sm.Register(is) return &Opt{ - Snapshotter: snapshotter, - CacheManager: cm, - SourceManager: sm, + Snapshotter: snapshotter, + CacheManager: cm, + SourceManager: sm, + InstructionCache: ic, }, nil } diff --git a/control/control_standalone_test.go b/control/control_standalone_test.go index 2989200adbd9..c68466b2a62e 100644 --- a/control/control_standalone_test.go +++ b/control/control_standalone_test.go @@ -65,7 +65,10 @@ func TestControl(t *testing.T) { img, err := source.NewImageIdentifier("docker.io/library/busybox:latest") assert.NoError(t, err) - snap, err := sm.Pull(ctx, img) + src, err := sm.Resolve(ctx, img) + assert.NoError(t, err) + + snap, err := src.Snapshot(ctx) assert.NoError(t, err) mounts, err := snap.Mount(ctx) diff --git a/solver/exec.go b/solver/exec.go index d45bf43b2244..eee0915906ad 100644 --- a/solver/exec.go +++ b/solver/exec.go @@ -1,6 +1,7 @@ package solver import ( + "encoding/json" "io" "os" @@ -10,24 +11,39 @@ import ( "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/util/progress" "github.com/moby/buildkit/worker" + digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "golang.org/x/net/context" ) type execOp struct { - op *pb.Op_Exec + op *pb.ExecOp cm cache.Manager w worker.Worker } func newExecOp(op *pb.Op_Exec, cm cache.Manager, w worker.Worker) (Op, error) { return &execOp{ - op: op, + op: op.Exec, cm: cm, w: w, }, nil } +func (e *execOp) CacheKey(ctx context.Context, inputs []string) (string, error) { + dt, err := json.Marshal(struct { + Inputs []string + Exec *pb.ExecOp + }{ + Inputs: inputs, + Exec: e.op, + }) + if err != nil { + return "", err + } + return digest.FromBytes(dt).String(), nil +} + func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, error) { mounts := make(map[string]cache.Mountable) @@ -44,7 +60,7 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro } }() - for _, m := range e.op.Exec.Mounts { + for _, m := range e.op.Mounts { var mountable cache.Mountable if int(m.Input) > len(inputs) { return nil, errors.Errorf("missing input %d", m.Input) @@ -72,9 +88,9 @@ func (e *execOp) Run(ctx context.Context, inputs []Reference) ([]Reference, erro } meta := worker.Meta{ - Args: e.op.Exec.Meta.Args, - Env: e.op.Exec.Meta.Env, - Cwd: e.op.Exec.Meta.Cwd, + Args: e.op.Meta.Args, + Env: e.op.Meta.Env, + Cwd: e.op.Meta.Cwd, } stdout := newStreamWriter(ctx, 1) diff --git a/solver/refcache.go b/solver/refcache.go deleted file mode 100644 index 50411f6c858f..000000000000 --- a/solver/refcache.go +++ /dev/null @@ -1,159 +0,0 @@ -package solver - -import ( - "sync" - - "github.com/moby/buildkit/util/flightcontrol" - "github.com/moby/buildkit/util/progress" - digest "github.com/opencontainers/go-digest" - "github.com/pkg/errors" - "golang.org/x/net/context" -) - -// refCache holds the references to snapshots what are currently activve -// and allows sharing them between jobs - -type refCache struct { - mu sync.Mutex - cache map[digest.Digest]*cachedReq - flightcontrol.Group -} - -type cachedReq struct { - jobs map[*job]struct{} - value []*sharedRef - progressCtx context.Context -} - -func (c *refCache) probe(j *job, key digest.Digest) bool { - c.mu.Lock() - if c.cache == nil { - c.cache = make(map[digest.Digest]*cachedReq) - } - cr, ok := c.cache[key] - if !ok { - cr = &cachedReq{jobs: make(map[*job]struct{})} - c.cache[key] = cr - } - cr.jobs[j] = struct{}{} - if ok && cr.value != nil { - c.mu.Unlock() - return true - } - c.mu.Unlock() - return false -} -func (c *refCache) get(key digest.Digest) ([]Reference, error) { - c.mu.Lock() - defer c.mu.Unlock() - v, ok := c.cache[key] - // these errors should not be reached - if !ok { - return nil, errors.Errorf("no ref cache found") - } - if v.value == nil { - return nil, errors.Errorf("no ref cache value set") - } - refs := make([]Reference, 0, len(v.value)) - for _, r := range v.value { - refs = append(refs, r.Clone()) - } - return refs, nil -} -func (c *refCache) set(ctx context.Context, key digest.Digest, refs []Reference) { - c.mu.Lock() - sharedRefs := make([]*sharedRef, 0, len(refs)) - for _, r := range refs { - sharedRefs = append(sharedRefs, newSharedRef(r)) - } - c.cache[key].value = sharedRefs - c.cache[key].progressCtx = ctx - c.mu.Unlock() -} -func (c *refCache) cancel(j *job) { - c.mu.Lock() - for k, r := range c.cache { - if _, ok := r.jobs[j]; ok { - delete(r.jobs, j) - } - if len(r.jobs) == 0 { - for _, r := range r.value { - go r.Release(context.TODO()) - } - delete(c.cache, k) - } - } - c.mu.Unlock() -} - -func (c *refCache) writeProgressSnapshot(ctx context.Context, key digest.Digest) error { - pw, ok, _ := progress.FromContext(ctx) - if ok { - c.mu.Lock() - v, ok := c.cache[key] - if !ok { - c.mu.Unlock() - return errors.Errorf("no ref cache found") - } - pctx := v.progressCtx - c.mu.Unlock() - if pctx != nil { - return flightcontrol.WriteProgress(pctx, pw) - } - } - return nil -} - -// sharedRef is a wrapper around releasable that allows you to make new -// releasable child objects -type sharedRef struct { - mu sync.Mutex - refs map[*sharedRefInstance]struct{} - main Reference - Reference -} - -func newSharedRef(main Reference) *sharedRef { - mr := &sharedRef{ - refs: make(map[*sharedRefInstance]struct{}), - Reference: main, - } - mr.main = mr.Clone() - return mr -} - -func (mr *sharedRef) Clone() Reference { - mr.mu.Lock() - r := &sharedRefInstance{sharedRef: mr} - mr.refs[r] = struct{}{} - mr.mu.Unlock() - return r -} - -func (mr *sharedRef) Release(ctx context.Context) error { - return mr.main.Release(ctx) -} - -func (mr *sharedRef) Sys() Reference { - sys := mr.Reference - if s, ok := sys.(interface { - Sys() Reference - }); ok { - return s.Sys() - } - return sys -} - -type sharedRefInstance struct { - *sharedRef -} - -func (r *sharedRefInstance) Release(ctx context.Context) error { - r.sharedRef.mu.Lock() - defer r.sharedRef.mu.Unlock() - delete(r.sharedRef.refs, r) - if len(r.sharedRef.refs) == 0 { - return r.sharedRef.Reference.Release(ctx) - } - return nil -} diff --git a/solver/solver.go b/solver/solver.go index ebfb774bb17a..f2bddc1ac9d5 100644 --- a/solver/solver.go +++ b/solver/solver.go @@ -1,6 +1,7 @@ package solver import ( + "github.com/Sirupsen/logrus" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/solver/pb" @@ -13,9 +14,10 @@ import ( ) type LLBOpt struct { - SourceManager *source.Manager - CacheManager cache.Manager // TODO: this shouldn't be needed before instruction cache - Worker worker.Worker + SourceManager *source.Manager + CacheManager cache.Manager // TODO: this shouldn't be needed before instruction cache + Worker worker.Worker + InstructionCache InstructionCache } func NewLLBSolver(opt LLBOpt) *Solver { @@ -28,7 +30,7 @@ func NewLLBSolver(opt LLBOpt) *Solver { default: return nil, errors.Errorf("invalid op type %T", op) } - }) + }, opt.InstructionCache) } // ResolveOpFunc finds an Op implementation for a vertex @@ -41,22 +43,24 @@ type Reference interface { // Op is an implementation for running a vertex type Op interface { - // CacheKeys(context.Context, [][]string) ([]string, error) + CacheKey(context.Context, []string) (string, error) Run(ctx context.Context, inputs []Reference) (outputs []Reference, err error) } -// type Cache interface { -// Lookup(context.Context, string) ([]Reference, error) -// } +type InstructionCache interface { + Lookup(ctx context.Context, key string) ([]interface{}, error) // TODO: regular ref + Set(key string, refs []interface{}) error +} type Solver struct { - resolve ResolveOpFunc - jobs *jobList - active refCache + resolve ResolveOpFunc + jobs *jobList + activeState activeState + cache InstructionCache } -func New(resolve ResolveOpFunc) *Solver { - return &Solver{resolve: resolve, jobs: newJobList()} +func New(resolve ResolveOpFunc, cache InstructionCache) *Solver { + return &Solver{resolve: resolve, jobs: newJobList(), cache: cache} } func (s *Solver) Solve(ctx context.Context, id string, v Vertex) error { @@ -78,7 +82,7 @@ func (s *Solver) Solve(ctx context.Context, id string, v Vertex) error { refs, err := s.getRefs(ctx, j, j.g) closeProgressWriter() - s.active.cancel(j) + s.activeState.cancel(j) if err != nil { return err } @@ -99,9 +103,77 @@ func (s *Solver) Status(ctx context.Context, id string, statusChan chan *client. return j.pipe(ctx, statusChan) } +func (s *Solver) getCacheKey(ctx context.Context, j *job, g *vertex) (cacheKey string, retErr error) { + state, err := s.activeState.vertexState(j, g.digest, func() (Op, error) { + return s.resolve(g) + }) + if err != nil { + return "", err + } + + inputs := make([]string, len(g.inputs)) + if len(g.inputs) > 0 { + eg, ctx := errgroup.WithContext(ctx) + for i, in := range g.inputs { + func(i int, in *vertex) { + eg.Go(func() error { + k, err := s.getCacheKey(ctx, j, in) + if err != nil { + return err + } + inputs[i] = k + return nil + }) + }(i, in.vertex) + } + if err := eg.Wait(); err != nil { + return "", err + } + } + + pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", g.Digest())) + defer pw.Close() + + if len(g.inputs) == 0 { + g.notifyStarted(ctx) + defer func() { + g.notifyCompleted(ctx, false, retErr) + }() + } + + return state.GetCacheKey(ctx, func(ctx context.Context, op Op) (string, error) { + return op.CacheKey(ctx, inputs) + }) +} + func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Reference, retErr error) { + state, err := s.activeState.vertexState(j, g.digest, func() (Op, error) { + return s.resolve(g) + }) + if err != nil { + return nil, err + } - s.active.probe(j, g.digest) // this registers the key with the job + var cacheKey string + if s.cache != nil { + var err error + cacheKey, err = s.getCacheKey(ctx, j, g) + if err != nil { + return nil, err + } + cacheRefsAny, err := s.cache.Lookup(ctx, cacheKey) + if err != nil { + return nil, err + } + if len(cacheRefsAny) > 0 { + cacheRefs, err := toReferenceArray(cacheRefsAny) + if err != nil { + return nil, err + } + g.recursiveMarkCached(ctx) + return cacheRefs, nil + } + } // refs contains all outputs for all input vertexes refs := make([][]*sharedRef, len(g.inputs)) @@ -156,29 +228,39 @@ func (s *Solver) getRefs(ctx context.Context, j *job, g *vertex) (retRef []Refer g.notifyStarted(ctx) defer func() { - g.notifyCompleted(ctx, retErr) + g.notifyCompleted(ctx, false, retErr) }() - _, err := s.active.Do(ctx, g.digest.String(), func(doctx context.Context) (interface{}, error) { - if hit := s.active.probe(j, g.digest); hit { - if err := s.active.writeProgressSnapshot(ctx, g.digest); err != nil { - return nil, err - } - return nil, nil - } - op, err := s.resolve(g) + return state.GetRefs(ctx, func(ctx context.Context, op Op) ([]Reference, error) { + refs, err := op.Run(ctx, inputRefs) if err != nil { return nil, err } - refs, err := op.Run(doctx, inputRefs) - if err != nil { - return nil, err + if s.cache != nil { + if err := s.cache.Set(cacheKey, toAny(refs)); err != nil { + logrus.Errorf("failed to save cache for %s: %v", cacheKey, err) + } } - s.active.set(doctx, g.digest, refs) - return nil, nil + return refs, nil }) - if err != nil { - return nil, err +} + +func toReferenceArray(in []interface{}) ([]Reference, error) { + out := make([]Reference, 0, len(in)) + for _, i := range in { + r, ok := i.(Reference) + if !ok { + return nil, errors.Errorf("invalid reference") + } + out = append(out, r) + } + return out, nil +} + +func toAny(in []Reference) []interface{} { + out := make([]interface{}, 0, len(in)) + for _, i := range in { + out = append(out, i) } - return s.active.get(g.digest) + return out } diff --git a/solver/source.go b/solver/source.go index 647deeb4606f..ffd506b79bc8 100644 --- a/solver/source.go +++ b/solver/source.go @@ -1,14 +1,18 @@ package solver import ( + "sync" + "github.com/moby/buildkit/solver/pb" "github.com/moby/buildkit/source" "golang.org/x/net/context" ) type sourceOp struct { - op *pb.Op_Source - sm *source.Manager + mu sync.Mutex + op *pb.Op_Source + sm *source.Manager + src source.SourceInstance } func newSourceOp(op *pb.Op_Source, sm *source.Manager) (Op, error) { @@ -18,12 +22,38 @@ func newSourceOp(op *pb.Op_Source, sm *source.Manager) (Op, error) { }, nil } -func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) { +func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error) { + s.mu.Lock() + if s.src != nil { + return s.src, nil + } id, err := source.FromString(s.op.Source.Identifier) if err != nil { return nil, err } - ref, err := s.sm.Pull(ctx, id) + src, err := s.sm.Resolve(ctx, id) + if err != nil { + return nil, err + } + s.src = src + s.mu.Unlock() + return s.src, nil +} + +func (s *sourceOp) CacheKey(ctx context.Context, _ []string) (string, error) { + src, err := s.instance(ctx) + if err != nil { + return "", err + } + return src.CacheKey(ctx) +} + +func (s *sourceOp) Run(ctx context.Context, _ []Reference) ([]Reference, error) { + src, err := s.instance(ctx) + if err != nil { + return nil, err + } + ref, err := src.Snapshot(ctx) if err != nil { return nil, err } diff --git a/solver/state.go b/solver/state.go new file mode 100644 index 000000000000..5273569dc0ce --- /dev/null +++ b/solver/state.go @@ -0,0 +1,183 @@ +package solver + +import ( + "sync" + + "github.com/moby/buildkit/util/flightcontrol" + "github.com/moby/buildkit/util/progress" + digest "github.com/opencontainers/go-digest" + "golang.org/x/net/context" +) + +// activeState holds the references to snapshots what are currently activve +// and allows sharing them between jobs + +type activeState struct { + mu sync.Mutex + states map[digest.Digest]*state + flightcontrol.Group +} + +type state struct { + *activeState + key digest.Digest + jobs map[*job]struct{} + refs []*sharedRef + cacheKey string + op Op + progressCtx context.Context + cacheCtx context.Context +} + +func (s *activeState) vertexState(j *job, key digest.Digest, cb func() (Op, error)) (*state, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.states == nil { + s.states = map[digest.Digest]*state{} + } + + st, ok := s.states[key] + if !ok { + op, err := cb() + if err != nil { + return nil, err + } + st = &state{key: key, jobs: map[*job]struct{}{}, op: op, activeState: s} + s.states[key] = st + } + st.jobs[j] = struct{}{} + return st, nil +} + +func (s *activeState) cancel(j *job) { + s.mu.Lock() + defer s.mu.Unlock() + for k, st := range s.states { + if _, ok := st.jobs[j]; ok { + delete(st.jobs, j) + } + if len(st.jobs) == 0 { + for _, r := range st.refs { + go r.Release(context.TODO()) + } + delete(s.states, k) + } + } +} + +func (s *state) GetRefs(ctx context.Context, cb func(context.Context, Op) ([]Reference, error)) ([]Reference, error) { + _, err := s.Do(ctx, s.key.String(), func(doctx context.Context) (interface{}, error) { + if s.refs != nil { + if err := writeProgressSnapshot(s.progressCtx, ctx); err != nil { + return nil, err + } + return nil, nil + } + refs, err := cb(doctx, s.op) + if err != nil { + return nil, err + } + sharedRefs := make([]*sharedRef, 0, len(refs)) + for _, r := range refs { + sharedRefs = append(sharedRefs, newSharedRef(r)) + } + s.refs = sharedRefs + s.progressCtx = doctx + return nil, nil + }) + if err != nil { + return nil, err + } + refs := make([]Reference, 0, len(s.refs)) + for _, r := range s.refs { + refs = append(refs, r.Clone()) + } + return refs, nil +} + +func (s *state) GetCacheKey(ctx context.Context, cb func(context.Context, Op) (string, error)) (string, error) { + _, err := s.Do(ctx, "cache:"+s.key.String(), func(doctx context.Context) (interface{}, error) { + if s.cacheKey != "" { + if err := writeProgressSnapshot(s.cacheCtx, ctx); err != nil { + return nil, err + } + return nil, nil + } + cacheKey, err := cb(doctx, s.op) + if err != nil { + return nil, err + } + s.cacheKey = cacheKey + s.cacheCtx = doctx + return nil, nil + }) + if err != nil { + return "", err + } + return s.cacheKey, nil +} + +func writeProgressSnapshot(srcCtx, destCtx context.Context) error { + pw, ok, _ := progress.FromContext(destCtx) + if ok { + if srcCtx != nil { + return flightcontrol.WriteProgress(srcCtx, pw) + } + } + return nil +} + +// sharedRef is a wrapper around releasable that allows you to make new +// releasable child objects +type sharedRef struct { + mu sync.Mutex + refs map[*sharedRefInstance]struct{} + main Reference + Reference +} + +func newSharedRef(main Reference) *sharedRef { + mr := &sharedRef{ + refs: make(map[*sharedRefInstance]struct{}), + Reference: main, + } + mr.main = mr.Clone() + return mr +} + +func (mr *sharedRef) Clone() Reference { + mr.mu.Lock() + r := &sharedRefInstance{sharedRef: mr} + mr.refs[r] = struct{}{} + mr.mu.Unlock() + return r +} + +func (mr *sharedRef) Release(ctx context.Context) error { + return mr.main.Release(ctx) +} + +func (mr *sharedRef) Sys() Reference { + sys := mr.Reference + if s, ok := sys.(interface { + Sys() Reference + }); ok { + return s.Sys() + } + return sys +} + +type sharedRefInstance struct { + *sharedRef +} + +func (r *sharedRefInstance) Release(ctx context.Context) error { + r.sharedRef.mu.Lock() + defer r.sharedRef.mu.Unlock() + delete(r.sharedRef.refs, r) + if len(r.sharedRef.refs) == 0 { + return r.sharedRef.Reference.Release(ctx) + } + return nil +} diff --git a/solver/vertex.go b/solver/vertex.go index 1f0eed0c4068..b30b24368db2 100644 --- a/solver/vertex.go +++ b/solver/vertex.go @@ -86,13 +86,26 @@ func (v *vertex) notifyStarted(ctx context.Context) { pw.Write(v.Digest().String(), v.clientVertex) } -func (v *vertex) notifyCompleted(ctx context.Context, err error) { +func (v *vertex) notifyCompleted(ctx context.Context, cached bool, err error) { pw, _, _ := progress.FromContext(ctx) defer pw.Close() now := time.Now() + if v.clientVertex.Started == nil { + v.clientVertex.Started = &now + } v.clientVertex.Completed = &now + v.clientVertex.Cached = cached if err != nil { v.clientVertex.Error = err.Error() } pw.Write(v.Digest().String(), v.clientVertex) } + +func (v *vertex) recursiveMarkCached(ctx context.Context) { + for _, inp := range v.inputs { + inp.vertex.recursiveMarkCached(ctx) + } + if v.clientVertex.Started == nil { + v.notifyCompleted(ctx, true, nil) + } +} diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index e6a1b4939e0c..9baa4c53dc6d 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -62,29 +62,63 @@ func (is *imageSource) ID() string { return source.DockerImageScheme } -func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.ImmutableRef, error) { - // TODO: update this to always centralize layer downloads/unpacks - // TODO: progress status +type puller struct { + is *imageSource + resolveOnce sync.Once + src *source.ImageIdentifier + desc ocispec.Descriptor + ref string + resolveErr error +} +func (is *imageSource) Resolve(ctx context.Context, id source.Identifier) (source.SourceInstance, error) { imageIdentifier, ok := id.(*source.ImageIdentifier) if !ok { return nil, errors.New("invalid identifier") } - resolveProgressDone := oneOffProgress(ctx, "resolve "+imageIdentifier.Reference.String()) - ref, desc, err := is.resolver.Resolve(ctx, imageIdentifier.Reference.String()) - if err != nil { - return nil, resolveProgressDone(err) + p := &puller{ + src: imageIdentifier, + is: is, + } + return p, nil +} + +func (p *puller) resolve(ctx context.Context) error { + p.resolveOnce.Do(func() { + resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String()) + ref, desc, err := p.is.resolver.Resolve(ctx, p.src.Reference.String()) + if err != nil { + p.resolveErr = err + resolveProgressDone(err) + return + } + p.desc = desc + p.ref = ref + resolveProgressDone(nil) + }) + return p.resolveErr +} + +func (p *puller) CacheKey(ctx context.Context) (string, error) { + if err := p.resolve(ctx); err != nil { + return "", err + } + return p.desc.Digest.String(), nil +} + +func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) { + if err := p.resolve(ctx); err != nil { + return nil, err } - resolveProgressDone(nil) - ongoing := newJobs(ref) + ongoing := newJobs(p.ref) pctx, stopProgress := context.WithCancel(ctx) - go showProgress(pctx, ongoing, is.ContentStore) + go showProgress(pctx, ongoing, p.is.ContentStore) - fetcher, err := is.resolver.Fetcher(ctx, ref) + fetcher, err := p.is.resolver.Fetcher(ctx, p.ref) if err != nil { stopProgress() return nil, err @@ -98,23 +132,23 @@ func (is *imageSource) Pull(ctx context.Context, id source.Identifier) (cache.Im ongoing.add(desc) return nil, nil }), - remotes.FetchHandler(is.ContentStore, fetcher), - images.ChildrenHandler(is.ContentStore), + remotes.FetchHandler(p.is.ContentStore, fetcher), + images.ChildrenHandler(p.is.ContentStore), } - if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil { + if err := images.Dispatch(ctx, images.Handlers(handlers...), p.desc); err != nil { stopProgress() return nil, err } stopProgress() - unpackProgressDone := oneOffProgress(ctx, "unpacking "+imageIdentifier.Reference.String()) - chainid, err := is.unpack(ctx, desc) + unpackProgressDone := oneOffProgress(ctx, "unpacking "+p.src.Reference.String()) + chainid, err := p.is.unpack(ctx, p.desc) if err != nil { return nil, unpackProgressDone(err) } unpackProgressDone(nil) - return is.CacheAccessor.Get(ctx, chainid) + return p.is.CacheAccessor.Get(ctx, chainid) } func (is *imageSource) unpack(ctx context.Context, desc ocispec.Descriptor) (string, error) { diff --git a/source/manager.go b/source/manager.go index b7c9f67f17db..90290577b704 100644 --- a/source/manager.go +++ b/source/manager.go @@ -10,18 +10,13 @@ import ( type Source interface { ID() string - Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) + Resolve(ctx context.Context, id Identifier) (SourceInstance, error) } -// type Source interface { -// ID() string -// Resolve(ctx context.Context, id Identifier) (SourceInstance, error) -// } -// -// type SourceInstance interface { -// GetCacheKey(ctx context.Context) ([]string, error) -// GetSnapshot(ctx context.Context) (cache.ImmutableRef, error) -// } +type SourceInstance interface { + CacheKey(ctx context.Context) (string, error) + Snapshot(ctx context.Context) (cache.ImmutableRef, error) +} type Manager struct { mu sync.Mutex @@ -40,7 +35,7 @@ func (sm *Manager) Register(src Source) { sm.mu.Unlock() } -func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, error) { +func (sm *Manager) Resolve(ctx context.Context, id Identifier) (SourceInstance, error) { sm.mu.Lock() src, ok := sm.sources[id.ID()] sm.mu.Unlock() @@ -49,5 +44,5 @@ func (sm *Manager) Pull(ctx context.Context, id Identifier) (cache.ImmutableRef, return nil, errors.Errorf("no handler fro %s", id.ID()) } - return src.Pull(ctx, id) + return src.Resolve(ctx, id) } diff --git a/util/progress/progressui/display.go b/util/progress/progressui/display.go index 4910fd7fe242..19dad48b25ca 100644 --- a/util/progress/progressui/display.go +++ b/util/progress/progressui/display.go @@ -176,6 +176,9 @@ func (t *trace) displayInfo() (d displayInfo) { j.name = "ERROR " + j.name } } + if v.Cached { + j.name = "CACHED " + j.name + } d.jobs = append(d.jobs, j) for _, s := range v.statuses { j := job{