From 1b3daaa396cc49ce5433f892bd3e2d8b4a0009e6 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 26 Apr 2024 11:18:35 -0700 Subject: [PATCH 1/3] solver: prevent edge merge to inactive states Before this, it was possible for an edge merge to happen to a target edge/state that is no longer in the actives map, e.g. 1. Job A solves some edges 2. Job B solves some edges that get merged with job A's edges 3. Job A is discarded 4. Job C solves some edges that get merged with job B's edges, which recursively end up merging to job A's, which no longer exist in the actives map. While this doesn't always result in an error, given the right state and order of operations this can result in `getState` or `getEdge` being called on the "stale" edges that are inactive and an error. E.g. if a stale dep transitions from desired state `cache-fast` to `cache-slow`, the slow cache logic will call `getState` on it and return a `compute cache` error. I also *suspect* this is the same root cause behind `inconsistent graph state` errors still seen occasionally, but have not repro'd that error locally so can't be 100% sure yet. The fix here updates `state.setEdge` to register all the jobs in the source edge's state with the target edge's state. This works out because: 1. edges that are the target of a merge will not have their state removed from the actives map if only the original job creating them is discarded 1. those edges are still removed eventually, but only when all jobs referencing them (now including jobs referencing them via edge merges) are discarded Signed-off-by: Erik Sipsma --- solver/jobs.go | 6 +++ solver/scheduler_test.go | 82 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/solver/jobs.go b/solver/jobs.go index ddde959d34c1..8b5487a5e58b 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -176,6 +176,12 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) { targetEdge.takeOwnership(e) if targetState != nil { + targetState.mu.Lock() + for j := range s.jobs { + targetState.jobs[j] = struct{}{} + } + targetState.mu.Unlock() + if _, ok := targetState.allPw[s.mpw]; !ok { targetState.mpw.Add(s.mpw) targetState.allPw[s.mpw] = struct{}{} diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 81acbc986ed3..06f99207f830 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3447,6 +3447,88 @@ func TestUnknownBuildID(t *testing.T) { require.Contains(t, err.Error(), "no such job") } +func TestStaleEdgeMerge(t *testing.T) { + // should not be possible to merge to an edge no longer in the actives map + t.Parallel() + ctx := context.TODO() + + s := NewSolver(SolverOpt{ + ResolveOpFunc: testOpResolver, + }) + defer s.Close() + + // These should all end up edge merged + v0 := vtxAdd(2, vtxOpt{name: "v0", inputs: []Edge{ + {Vertex: vtxConst(3, vtxOpt{})}, + {Vertex: vtxConst(4, vtxOpt{})}, + }}) + v1 := vtxAdd(2, vtxOpt{name: "v1", inputs: []Edge{ + {Vertex: vtxConst(3, vtxOpt{})}, + {Vertex: vtxConst(4, vtxOpt{})}, + }}) + v2 := vtxAdd(2, vtxOpt{name: "v2", inputs: []Edge{ + {Vertex: vtxConst(3, vtxOpt{})}, + {Vertex: vtxConst(4, vtxOpt{})}, + }}) + + j0, err := s.NewJob("job0") + require.NoError(t, err) + g0 := Edge{Vertex: v0} + res, err := j0.Build(ctx, g0) + require.NoError(t, err) + require.NotNil(t, res) + + // this edge should be merged with the one from j0 + j1, err := s.NewJob("job1") + require.NoError(t, err) + g1 := Edge{Vertex: v1} + res, err = j1.Build(ctx, g1) + require.NoError(t, err) + 
require.NotNil(t, res) + + // discard j0, verify that v0 is still active and it's state contains j1 since j1's + // edge was merged to v0's state + require.NoError(t, j0.Discard()) + require.Contains(t, s.actives, v0.Digest()) + require.Contains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives[v0.Digest()].jobs, j0) + require.Contains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives[v1.Digest()].jobs, j1) + + // verify another job can still merge + j2, err := s.NewJob("job2") + require.NoError(t, err) + g2 := Edge{Vertex: v2} + res, err = j2.Build(ctx, g2) + require.NoError(t, err) + require.NotNil(t, res) + + require.Contains(t, s.actives, v0.Digest()) + require.Contains(t, s.actives, v1.Digest()) + require.Contains(t, s.actives, v2.Digest()) + require.NotContains(t, s.actives[v0.Digest()].jobs, j0) + require.Contains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives[v0.Digest()].jobs, j2) + require.Contains(t, s.actives[v1.Digest()].jobs, j1) + require.Contains(t, s.actives[v2.Digest()].jobs, j2) + + // discard j1, verify only referenced edges still exist + require.NoError(t, j1.Discard()) + require.Contains(t, s.actives, v0.Digest()) + require.NotContains(t, s.actives, v1.Digest()) + require.Contains(t, s.actives, v2.Digest()) + require.NotContains(t, s.actives[v0.Digest()].jobs, j0) + require.NotContains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives[v0.Digest()].jobs, j2) + require.Contains(t, s.actives[v2.Digest()].jobs, j2) + + // discard the last job and verify everything was removed now + require.NoError(t, j2.Discard()) + require.NotContains(t, s.actives, v0.Digest()) + require.NotContains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives, v2.Digest()) +} + func generateSubGraph(nodes int) (Edge, int) { if nodes == 1 { value := rand.Int() % 500 //nolint:gosec From eab04dc35d263f8fe34f028e1a83241d23f4f327 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 26 Apr 2024 11:30:36 -0700 Subject: [PATCH 2/3] solver: use logrus fields for more scheduler debug logs While debugging solver bugs with scheduler debug logs I ended up with so many logs that I needed to write a program that parsed them and extracted relevant information so it could be summarized more comprehensibly. That was greatly simplified by updating some of the old scheduler debug logs to use logrus fields rather than one-off `fmt.Sprintf`s. All the same information as before is present, just formatted more consistently for easier parsability. I also ended up removing one of the logs I recently added that printed each job for a vertex in `loadUnlocked`. I realized that information was parsable from the rest of the logs and thus was mostly just extra noise. Signed-off-by: Erik Sipsma --- solver/jobs.go | 24 +++++++-------- solver/scheduler.go | 72 +++++++++++++++++++++++++++++++++++++-------- 2 files changed, 71 insertions(+), 25 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index 8b5487a5e58b..2ee5b09b8398 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -476,16 +476,25 @@ func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, ca if debugScheduler { lg := bklog.G(ctx). WithField("vertex_name", v.Name()). - WithField("vertex_digest", v.Digest()) + WithField("vertex_digest", v.Digest()). + WithField("actives_digest_key", dgst) if j != nil { lg = lg.WithField("job", j.id) } lg.Debug("adding active vertex") + for i, inp := range v.Inputs() { + lg.WithField("input_index", i). 
+ WithField("input_vertex_name", inp.Vertex.Name()). + WithField("input_vertex_digest", inp.Vertex.Digest()). + WithField("input_edge_index", inp.Index). + Debug("new active vertex input") + } } } else if debugScheduler { lg := bklog.G(ctx). WithField("vertex_name", v.Name()). - WithField("vertex_digest", v.Digest()) + WithField("vertex_digest", v.Digest()). + WithField("actives_digest_key", dgst) if j != nil { lg = lg.WithField("job", j.id) } @@ -505,17 +514,6 @@ func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, ca if _, ok := st.jobs[j]; !ok { st.jobs[j] = struct{}{} } - if debugScheduler { - jobIDs := make([]string, 0, len(st.jobs)) - for j := range st.jobs { - jobIDs = append(jobIDs, j.id) - } - bklog.G(ctx). - WithField("vertex_name", v.Name()). - WithField("vertex_digest", v.Digest()). - WithField("jobs", jobIDs). - Debug("current jobs for vertex") - } } st.mu.Unlock() diff --git a/solver/scheduler.go b/solver/scheduler.go index fe39f1dae39f..3cfef8f934b4 100644 --- a/solver/scheduler.go +++ b/solver/scheduler.go @@ -423,28 +423,58 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro } func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) { - log := bklog.G(context.TODO()) - - log.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s %s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state, e.edge.Vertex.Digest()) + log := bklog.G(context.TODO()). + WithField("edge_vertex_name", e.edge.Vertex.Name()). + WithField("edge_vertex_digest", e.edge.Vertex.Digest()). + WithField("edge_index", e.edge.Index) + + log. + WithField("edge_state", e.state). + WithField("req", len(inc)). + WithField("upt", len(updates)). + WithField("out", len(allPipes)). + Debug(">> unpark") for i, dep := range e.deps { des := edgeStatusInitial if dep.req != nil { des = dep.req.Request().(*edgeRequest).desiredState } - log.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v preprocessfunc=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil, e.preprocessFunc(dep) != nil) + log. + WithField("dep_index", i). + WithField("dep_vertex_name", e.edge.Vertex.Inputs()[i].Vertex.Name()). + WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[i].Vertex.Digest()). + WithField("dep_state", dep.state). + WithField("dep_desired_state", des). + WithField("dep_keys", len(dep.keys)). + WithField("dep_has_slow_cache", e.slowCacheFunc(dep) != nil). + WithField("dep_preprocess_func", e.preprocessFunc(dep) != nil). + Debug(":: dep") } for i, in := range inc { req := in.Request() - log.Debugf("> incoming-%d: %p dstate=%s canceled=%v", i, in, req.Payload.(*edgeRequest).desiredState, req.Canceled) + log. + WithField("incoming_index", i). + WithField("incoming_pointer", in). + WithField("incoming_desired_state", req.Payload.(*edgeRequest).desiredState). + WithField("incoming_canceled", req.Canceled). + Debug("> incoming") } for i, up := range updates { if up == e.cacheMapReq { - log.Debugf("> update-%d: %p cacheMapReq complete=%v", i, up, up.Status().Completed) + log. + WithField("update_index", i). + WithField("update_pointer", up). + WithField("update_complete", up.Status().Completed). + Debug("> update cacheMapReq") } else if up == e.execReq { - log.Debugf("> update-%d: %p execReq complete=%v", i, up, up.Status().Completed) + log. + WithField("update_index", i). + WithField("update_pointer", up). + WithField("update_complete", up.Status().Completed). 
+ Debug("> update execReq") } else { st, ok := up.Status().Value.(*edgeState) if ok { @@ -452,9 +482,18 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip if dep, ok := e.depRequests[up]; ok { index = int(dep.index) } - log.Debugf("> update-%d: %p input-%d keys=%d state=%s", i, up, index, len(st.keys), st.state) + log. + WithField("update_index", i). + WithField("update_pointer", up). + WithField("update_complete", up.Status().Completed). + WithField("update_input_index", index). + WithField("update_keys", len(st.keys)). + WithField("update_state", st.state). + Debugf("> update edgeState") } else { - log.Debugf("> update-%d: unknown", i) + log. + WithField("update_index", i). + Debug("> update unknown") } } } @@ -463,7 +502,16 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) { log := bklog.G(context.TODO()) for i, in := range inc { - log.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed) - } - log.Debugf("<< unpark %s\n", e.edge.Vertex.Name()) + log. + WithField("incoming_index", i). + WithField("incoming_pointer", in). + WithField("incoming_complete", in.Status().Completed). + Debug("< incoming") + } + log. + WithField("edge_vertex_name", e.edge.Vertex.Name()). + WithField("edge_vertex_digest", e.edge.Vertex.Digest()). + WithField("edge_index", e.edge.Index). + WithField("edge_state", e.state). + Debug("<< unpark") } From ffdbd863b34ad0c6505949ebbda5e61722c939ad Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 26 Apr 2024 18:55:25 -0700 Subject: [PATCH 3/3] solver: recursively add merge source jobs to target and ancestors Signed-off-by: Erik Sipsma --- solver/jobs.go | 51 +++++++++++++++++++++++--- solver/scheduler_test.go | 78 +++++++++++++++++++++++++++++++++------- 2 files changed, 111 insertions(+), 18 deletions(-) diff --git a/solver/jobs.go b/solver/jobs.go index 2ee5b09b8398..ea8a9a1b3b3e 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -176,11 +176,7 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) { targetEdge.takeOwnership(e) if targetState != nil { - targetState.mu.Lock() - for j := range s.jobs { - targetState.jobs[j] = struct{}{} - } - targetState.mu.Unlock() + targetState.addJobs(s, map[*state]struct{}{}) if _, ok := targetState.allPw[s.mpw]; !ok { targetState.mpw.Add(s.mpw) @@ -189,6 +185,51 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) { } } +// addJobs recursively adds jobs to state and all its ancestors. currently +// only used during edge merges to add jobs from the source of the merge to the +// target and its ancestors. +// requires that Solver.mu is read-locked and srcState.mu is locked +func (s *state) addJobs(srcState *state, memo map[*state]struct{}) { + if _, ok := memo[s]; ok { + return + } + memo[s] = struct{}{} + + s.mu.Lock() + defer s.mu.Unlock() + + for j := range srcState.jobs { + s.jobs[j] = struct{}{} + } + + for _, inputEdge := range s.vtx.Inputs() { + inputState, ok := s.solver.actives[inputEdge.Vertex.Digest()] + if !ok { + bklog.G(context.TODO()). + WithField("vertex_digest", inputEdge.Vertex.Digest()). 
+ Error("input vertex not found during addJobs") + continue + } + inputState.addJobs(srcState, memo) + + // tricky case: if the inputState's edge was *already* merged we should + // also add jobs to the merged edge's state + mergedInputEdge := inputState.getEdge(inputEdge.Index) + if mergedInputEdge == nil || mergedInputEdge.edge.Vertex.Digest() == inputEdge.Vertex.Digest() { + // not merged + continue + } + mergedInputState, ok := s.solver.actives[mergedInputEdge.edge.Vertex.Digest()] + if !ok { + bklog.G(context.TODO()). + WithField("vertex_digest", mergedInputEdge.edge.Vertex.Digest()). + Error("merged input vertex not found during addJobs") + continue + } + mergedInputState.addJobs(srcState, memo) + } +} + func (s *state) combinedCacheManager() CacheManager { s.mu.Lock() cms := make([]CacheManager, 0, len(s.cache)+1) diff --git a/solver/scheduler_test.go b/solver/scheduler_test.go index 06f99207f830..46ef12de7635 100644 --- a/solver/scheduler_test.go +++ b/solver/scheduler_test.go @@ -3457,18 +3457,19 @@ func TestStaleEdgeMerge(t *testing.T) { }) defer s.Close() + depV0 := vtxConst(1, vtxOpt{name: "depV0"}) + depV1 := vtxConst(1, vtxOpt{name: "depV1"}) + depV2 := vtxConst(1, vtxOpt{name: "depV2"}) + // These should all end up edge merged v0 := vtxAdd(2, vtxOpt{name: "v0", inputs: []Edge{ - {Vertex: vtxConst(3, vtxOpt{})}, - {Vertex: vtxConst(4, vtxOpt{})}, + {Vertex: depV0}, }}) v1 := vtxAdd(2, vtxOpt{name: "v1", inputs: []Edge{ - {Vertex: vtxConst(3, vtxOpt{})}, - {Vertex: vtxConst(4, vtxOpt{})}, + {Vertex: depV1}, }}) v2 := vtxAdd(2, vtxOpt{name: "v2", inputs: []Edge{ - {Vertex: vtxConst(3, vtxOpt{})}, - {Vertex: vtxConst(4, vtxOpt{})}, + {Vertex: depV2}, }}) j0, err := s.NewJob("job0") @@ -3478,6 +3479,11 @@ func TestStaleEdgeMerge(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) + require.Contains(t, s.actives, v0.Digest()) + require.Contains(t, s.actives[v0.Digest()].jobs, j0) + require.Contains(t, s.actives, depV0.Digest()) + require.Contains(t, s.actives[depV0.Digest()].jobs, j0) + // this edge should be merged with the one from j0 j1, err := s.NewJob("job1") require.NoError(t, err) @@ -3486,14 +3492,37 @@ func TestStaleEdgeMerge(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) + require.Contains(t, s.actives, v0.Digest()) + require.Contains(t, s.actives[v0.Digest()].jobs, j0) + require.Contains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives, depV0.Digest()) + require.Contains(t, s.actives[depV0.Digest()].jobs, j0) + require.Contains(t, s.actives[depV0.Digest()].jobs, j1) + + require.Contains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives[v1.Digest()].jobs, j0) + require.Contains(t, s.actives[v1.Digest()].jobs, j1) + require.Contains(t, s.actives, depV1.Digest()) + require.NotContains(t, s.actives[depV1.Digest()].jobs, j0) + require.Contains(t, s.actives[depV1.Digest()].jobs, j1) + // discard j0, verify that v0 is still active and it's state contains j1 since j1's // edge was merged to v0's state require.NoError(t, j0.Discard()) + require.Contains(t, s.actives, v0.Digest()) - require.Contains(t, s.actives, v1.Digest()) require.NotContains(t, s.actives[v0.Digest()].jobs, j0) require.Contains(t, s.actives[v0.Digest()].jobs, j1) + require.Contains(t, s.actives, depV0.Digest()) + require.NotContains(t, s.actives[depV0.Digest()].jobs, j0) + require.Contains(t, s.actives[depV0.Digest()].jobs, j1) + + require.Contains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives[v1.Digest()].jobs, j0) require.Contains(t, 
s.actives[v1.Digest()].jobs, j1) + require.Contains(t, s.actives, depV1.Digest()) + require.NotContains(t, s.actives[depV1.Digest()].jobs, j0) + require.Contains(t, s.actives[depV1.Digest()].jobs, j1) // verify another job can still merge j2, err := s.NewJob("job2") @@ -3504,29 +3533,52 @@ func TestStaleEdgeMerge(t *testing.T) { require.NotNil(t, res) require.Contains(t, s.actives, v0.Digest()) - require.Contains(t, s.actives, v1.Digest()) - require.Contains(t, s.actives, v2.Digest()) - require.NotContains(t, s.actives[v0.Digest()].jobs, j0) require.Contains(t, s.actives[v0.Digest()].jobs, j1) require.Contains(t, s.actives[v0.Digest()].jobs, j2) + require.Contains(t, s.actives, depV0.Digest()) + require.Contains(t, s.actives[depV0.Digest()].jobs, j1) + require.Contains(t, s.actives[depV0.Digest()].jobs, j2) + + require.Contains(t, s.actives, v1.Digest()) require.Contains(t, s.actives[v1.Digest()].jobs, j1) + require.NotContains(t, s.actives[v1.Digest()].jobs, j2) + require.Contains(t, s.actives, depV1.Digest()) + require.Contains(t, s.actives[depV1.Digest()].jobs, j1) + require.NotContains(t, s.actives[depV1.Digest()].jobs, j2) + + require.Contains(t, s.actives, v2.Digest()) + require.NotContains(t, s.actives[v2.Digest()].jobs, j1) require.Contains(t, s.actives[v2.Digest()].jobs, j2) + require.Contains(t, s.actives, depV2.Digest()) + require.NotContains(t, s.actives[depV2.Digest()].jobs, j1) + require.Contains(t, s.actives[depV2.Digest()].jobs, j2) // discard j1, verify only referenced edges still exist require.NoError(t, j1.Discard()) + require.Contains(t, s.actives, v0.Digest()) - require.NotContains(t, s.actives, v1.Digest()) - require.Contains(t, s.actives, v2.Digest()) - require.NotContains(t, s.actives[v0.Digest()].jobs, j0) require.NotContains(t, s.actives[v0.Digest()].jobs, j1) require.Contains(t, s.actives[v0.Digest()].jobs, j2) + require.Contains(t, s.actives, depV0.Digest()) + require.NotContains(t, s.actives[depV0.Digest()].jobs, j1) + require.Contains(t, s.actives[depV0.Digest()].jobs, j2) + + require.NotContains(t, s.actives, v1.Digest()) + require.NotContains(t, s.actives, depV1.Digest()) + + require.Contains(t, s.actives, v2.Digest()) require.Contains(t, s.actives[v2.Digest()].jobs, j2) + require.Contains(t, s.actives, depV2.Digest()) + require.Contains(t, s.actives[depV2.Digest()].jobs, j2) // discard the last job and verify everything was removed now require.NoError(t, j2.Discard()) require.NotContains(t, s.actives, v0.Digest()) require.NotContains(t, s.actives, v1.Digest()) require.NotContains(t, s.actives, v2.Digest()) + require.NotContains(t, s.actives, depV0.Digest()) + require.NotContains(t, s.actives, depV1.Digest()) + require.NotContains(t, s.actives, depV2.Digest()) } func generateSubGraph(nodes int) (Edge, int) {
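
---

Note (not part of the patches above): the propagation that the third commit implements can be summarized outside the diff as a small standalone sketch. This is illustrative only — toyJob, toyState, and this simplified addJobs are invented stand-ins, not BuildKit's real types; the actual state.addJobs in solver/jobs.go additionally follows inputs through already-merged edges (via getEdge) and logs when an input vertex is missing from the actives map.

package main

import "fmt"

// toyJob and toyState are illustrative stand-ins for the solver's Job and
// state types; they are not BuildKit's real structures.
type toyJob struct{ id string }

type toyState struct {
	name   string
	jobs   map[*toyJob]struct{}
	inputs []*toyState // states of this vertex's input vertices (its ancestors)
}

// addJobs mirrors the idea of the patch: when an edge owned by src is merged
// into target, every job referencing src is also registered on target and,
// recursively, on all of target's ancestors, so those states stay active as
// long as any referencing job is alive. The memo set keeps the walk from
// revisiting shared ancestors.
func addJobs(target, src *toyState, memo map[*toyState]struct{}) {
	if _, ok := memo[target]; ok {
		return
	}
	memo[target] = struct{}{}

	for j := range src.jobs {
		target.jobs[j] = struct{}{}
	}
	for _, input := range target.inputs {
		addJobs(input, src, memo)
	}
}

func main() {
	dep := &toyState{name: "depV0", jobs: map[*toyJob]struct{}{}}
	v0 := &toyState{name: "v0", jobs: map[*toyJob]struct{}{}, inputs: []*toyState{dep}}
	v1 := &toyState{name: "v1", jobs: map[*toyJob]struct{}{}}

	j0, j1 := &toyJob{id: "job0"}, &toyJob{id: "job1"}
	v0.jobs[j0] = struct{}{}
	dep.jobs[j0] = struct{}{}
	v1.jobs[j1] = struct{}{}

	// v1's edge is merged into v0: job1 must now keep v0 and depV0 alive
	// even after job0 is discarded.
	addJobs(v0, v1, map[*toyState]struct{}{})

	fmt.Println(len(v0.jobs), len(dep.jobs)) // 2 2: both states now reference job0 and job1
}

The memo map is what makes the recursion safe on diamond-shaped graphs: shared ancestors are visited once per merge rather than once per path, which is the same role the memo argument plays in the real addJobs.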