Skip to content

Commit

Permalink
Merge pull request #5577 from tonistiigi/llb-constraints-race-fix
Browse files Browse the repository at this point in the history
llb: avoid concurrent map write on parallel marshal
  • Loading branch information
AkihiroSuda authored Dec 11, 2024
2 parents cd99592 + 30413b5 commit da59f1a
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 26 deletions.
1 change: 0 additions & 1 deletion client/llb/definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
// For example, after marshalling a LLB state and sending over the wire, the
// LLB state can be reconstructed from the definition.
type DefinitionOp struct {
MarshalCache
mu sync.Mutex
ops map[digest.Digest]*pb.Op
defs map[digest.Digest][]byte
Expand Down
9 changes: 6 additions & 3 deletions client/llb/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type DiffOp struct {
MarshalCache
cache MarshalCache
lower Output
upper Output
output Output
Expand All @@ -31,7 +31,10 @@ func (m *DiffOp) Validate(ctx context.Context, constraints *Constraints) error {
}

func (m *DiffOp) Marshal(ctx context.Context, constraints *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := m.Load(constraints); err == nil {
cache := m.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(constraints); err == nil {
return dgst, dt, md, srcs, nil
}
if err := m.Validate(ctx, constraints); err != nil {
Expand Down Expand Up @@ -72,7 +75,7 @@ func (m *DiffOp) Marshal(ctx context.Context, constraints *Constraints) (digest.
return "", nil, nil, nil, err
}

return m.Store(dt, md, m.constraints.SourceLocations, constraints)
return cache.Store(dt, md, m.constraints.SourceLocations, constraints)
}

func (m *DiffOp) Output() Output {
Expand Down
14 changes: 10 additions & 4 deletions client/llb/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type mount struct {
}

type ExecOp struct {
MarshalCache
cache MarshalCache
proxyEnv *ProxyEnv
root Output
mounts []*mount
Expand All @@ -63,6 +63,9 @@ type ExecOp struct {
}

func (e *ExecOp) AddMount(target string, source Output, opt ...MountOption) Output {
cache := e.cache.Acquire()
defer cache.Release()

m := &mount{
target: target,
source: source,
Expand All @@ -84,7 +87,7 @@ func (e *ExecOp) AddMount(target string, source Output, opt ...MountOption) Outp
}
m.output = o
}
e.Store(nil, nil, nil, nil)
cache.Store(nil, nil, nil, nil)
e.isValidated = false
return m.output
}
Expand Down Expand Up @@ -128,7 +131,10 @@ func (e *ExecOp) Validate(ctx context.Context, c *Constraints) error {
}

func (e *ExecOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := e.Load(c); err == nil {
cache := e.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(c); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -446,7 +452,7 @@ func (e *ExecOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []
if err != nil {
return "", nil, nil, nil, err
}
return e.Store(dt, md, e.constraints.SourceLocations, c)
return cache.Store(dt, md, e.constraints.SourceLocations, c)
}

func (e *ExecOp) Output() Output {
Expand Down
7 changes: 5 additions & 2 deletions client/llb/fileop.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,10 @@ func (ms *marshalState) add(fa *FileAction, c *Constraints) (*fileActionState, e
}

func (f *FileOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := f.Load(c); err == nil {
cache := f.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(c); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -816,7 +819,7 @@ func (f *FileOp) Marshal(ctx context.Context, c *Constraints) (digest.Digest, []
if err != nil {
return "", nil, nil, nil, err
}
return f.Store(dt, md, f.constraints.SourceLocations, c)
return cache.Store(dt, md, f.constraints.SourceLocations, c)
}

func normalizePath(parent, p string, keepSlash bool) string {
Expand Down
13 changes: 13 additions & 0 deletions client/llb/fileop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestFileMkdir(t *testing.T) {
Expand Down Expand Up @@ -737,3 +738,15 @@ func TestFileOpMarshalConsistency(t *testing.T) {
prevDef = def.Def
}
}

func TestParallelMarshal(t *testing.T) {
st := Scratch().File(Mkfile("/tmp", 0644, []byte("tmp 1")))
eg, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 100; i++ {
eg.Go(func() error {
_, err := st.Marshal(ctx)
return err
})
}
require.NoError(t, eg.Wait())
}
9 changes: 6 additions & 3 deletions client/llb/llbbuild/llbbuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewBuildOp(source llb.Output, opt ...BuildOption) llb.Vertex {
}

type build struct {
llb.MarshalCache
cache llb.MarshalCache
source llb.Output
info *BuildInfo
constraints llb.Constraints
Expand All @@ -47,7 +47,10 @@ func (b *build) Validate(context.Context, *llb.Constraints) error {
}

func (b *build) Marshal(ctx context.Context, c *llb.Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*llb.SourceLocation, error) {
if dgst, dt, md, srcs, err := b.Load(c); err == nil {
cache := b.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(c); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -85,7 +88,7 @@ func (b *build) Marshal(ctx context.Context, c *llb.Constraints) (digest.Digest,
if err != nil {
return "", nil, nil, nil, err
}
return b.Store(dt, md, b.constraints.SourceLocations, c)
return cache.Store(dt, md, b.constraints.SourceLocations, c)
}

func (b *build) Output() llb.Output {
Expand Down
29 changes: 22 additions & 7 deletions client/llb/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,30 +117,45 @@ func MarshalConstraints(base, override *Constraints) (*pb.Op, *pb.OpMetadata) {
}

type MarshalCache struct {
cache sync.Map
mu sync.Mutex
cache map[*Constraints]*marshalCacheResult
}

func (mc *MarshalCache) Load(c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
v, ok := mc.cache.Load(c)
type MarshalCacheInstance struct {
*MarshalCache
}

func (mc *MarshalCache) Acquire() *MarshalCacheInstance {
mc.mu.Lock()
return &MarshalCacheInstance{mc}
}

func (mc *MarshalCacheInstance) Load(c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
res, ok := mc.cache[c]
if !ok {
return "", nil, nil, nil, cerrdefs.ErrNotFound
}

res := v.(*marshalCacheResult)
return res.digest, res.dt, res.md, res.srcs, nil
}

func (mc *MarshalCache) Store(dt []byte, md *pb.OpMetadata, srcs []*SourceLocation, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
func (mc *MarshalCacheInstance) Store(dt []byte, md *pb.OpMetadata, srcs []*SourceLocation, c *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
res := &marshalCacheResult{
digest: digest.FromBytes(dt),
dt: dt,
md: md,
srcs: srcs,
}
mc.cache.Store(c, res)
if mc.cache == nil {
mc.cache = make(map[*Constraints]*marshalCacheResult)
}
mc.cache[c] = res
return res.digest, res.dt, res.md, res.srcs, nil
}

func (mc *MarshalCacheInstance) Release() {
mc.mu.Unlock()
}

type marshalCacheResult struct {
digest digest.Digest
dt []byte
Expand Down
9 changes: 6 additions & 3 deletions client/llb/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type MergeOp struct {
MarshalCache
cache MarshalCache
inputs []Output
output Output
constraints Constraints
Expand All @@ -32,7 +32,10 @@ func (m *MergeOp) Validate(ctx context.Context, constraints *Constraints) error
}

func (m *MergeOp) Marshal(ctx context.Context, constraints *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := m.Load(constraints); err == nil {
cache := m.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(constraints); err == nil {
return dgst, dt, md, srcs, nil
}

Expand All @@ -59,7 +62,7 @@ func (m *MergeOp) Marshal(ctx context.Context, constraints *Constraints) (digest
return "", nil, nil, nil, err
}

return m.Store(dt, md, m.constraints.SourceLocations, constraints)
return cache.Store(dt, md, m.constraints.SourceLocations, constraints)
}

func (m *MergeOp) Output() Output {
Expand Down
9 changes: 6 additions & 3 deletions client/llb/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

type SourceOp struct {
MarshalCache
cache MarshalCache
id string
attrs map[string]string
output Output
Expand Down Expand Up @@ -49,7 +49,10 @@ func (s *SourceOp) Validate(ctx context.Context, c *Constraints) error {
}

func (s *SourceOp) Marshal(ctx context.Context, constraints *Constraints) (digest.Digest, []byte, *pb.OpMetadata, []*SourceLocation, error) {
if dgst, dt, md, srcs, err := s.Load(constraints); err == nil {
cache := s.cache.Acquire()
defer cache.Release()

if dgst, dt, md, srcs, err := cache.Load(constraints); err == nil {
return dgst, dt, md, srcs, nil
}

Expand Down Expand Up @@ -82,7 +85,7 @@ func (s *SourceOp) Marshal(ctx context.Context, constraints *Constraints) (diges
return "", nil, nil, nil, err
}

return s.Store(dt, md, s.constraints.SourceLocations, constraints)
return cache.Store(dt, md, s.constraints.SourceLocations, constraints)
}

func (s *SourceOp) Output() Output {
Expand Down

0 comments on commit da59f1a

Please sign in to comment.