Skip to content

Commit

Permalink
solver: use logrus fields for more scheduler debug logs
Browse files Browse the repository at this point in the history
While debugging solver bugs with scheduler debug logs I ended up with so
many logs that I needed to write a program that parsed them and
extracted relevant information so it could be summarized more
comprehensibly.

That was greatly simplified by updating some of the old scheduler debug
logs to use logrus fields rather than one-off `fmt.Sprintf`s. All the
same information as before is present, just formatted more consistently
for easier parsability.

I also ended up removing one of the logs I recently added that printed
each job for a vertex in `loadUnlocked`. I realized that information was
parsable from the rest of the logs and thus was mostly just extra noise.

Signed-off-by: Erik Sipsma <[email protected]>
  • Loading branch information
sipsma committed Apr 26, 2024
1 parent 1b3daaa commit eab04dc
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 25 deletions.
24 changes: 11 additions & 13 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,16 +476,25 @@ func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, ca
if debugScheduler {
lg := bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest())
WithField("vertex_digest", v.Digest()).
WithField("actives_digest_key", dgst)
if j != nil {
lg = lg.WithField("job", j.id)
}
lg.Debug("adding active vertex")
for i, inp := range v.Inputs() {
lg.WithField("input_index", i).
WithField("input_vertex_name", inp.Vertex.Name()).
WithField("input_vertex_digest", inp.Vertex.Digest()).
WithField("input_edge_index", inp.Index).
Debug("new active vertex input")
}
}
} else if debugScheduler {
lg := bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest())
WithField("vertex_digest", v.Digest()).
WithField("actives_digest_key", dgst)
if j != nil {
lg = lg.WithField("job", j.id)
}
Expand All @@ -505,17 +514,6 @@ func (jl *Solver) loadUnlocked(ctx context.Context, v, parent Vertex, j *Job, ca
if _, ok := st.jobs[j]; !ok {
st.jobs[j] = struct{}{}
}
if debugScheduler {
jobIDs := make([]string, 0, len(st.jobs))
for j := range st.jobs {
jobIDs = append(jobIDs, j.id)
}
bklog.G(ctx).
WithField("vertex_name", v.Name()).
WithField("vertex_digest", v.Digest()).
WithField("jobs", jobIDs).
Debug("current jobs for vertex")
}
}
st.mu.Unlock()

Expand Down
72 changes: 60 additions & 12 deletions solver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,38 +423,77 @@ func (pf *pipeFactory) NewFuncRequest(f func(context.Context) (interface{}, erro
}

func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pipe.Receiver) {
log := bklog.G(context.TODO())

log.Debugf(">> unpark %s req=%d upt=%d out=%d state=%s %s", e.edge.Vertex.Name(), len(inc), len(updates), len(allPipes), e.state, e.edge.Vertex.Digest())
log := bklog.G(context.TODO()).
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index)

log.
WithField("edge_state", e.state).
WithField("req", len(inc)).
WithField("upt", len(updates)).
WithField("out", len(allPipes)).
Debug(">> unpark")

for i, dep := range e.deps {
des := edgeStatusInitial
if dep.req != nil {
des = dep.req.Request().(*edgeRequest).desiredState
}
log.Debugf(":: dep%d %s state=%s des=%s keys=%d hasslowcache=%v preprocessfunc=%v", i, e.edge.Vertex.Inputs()[i].Vertex.Name(), dep.state, des, len(dep.keys), e.slowCacheFunc(dep) != nil, e.preprocessFunc(dep) != nil)
log.
WithField("dep_index", i).
WithField("dep_vertex_name", e.edge.Vertex.Inputs()[i].Vertex.Name()).
WithField("dep_vertex_digest", e.edge.Vertex.Inputs()[i].Vertex.Digest()).
WithField("dep_state", dep.state).
WithField("dep_desired_state", des).
WithField("dep_keys", len(dep.keys)).
WithField("dep_has_slow_cache", e.slowCacheFunc(dep) != nil).
WithField("dep_preprocess_func", e.preprocessFunc(dep) != nil).
Debug(":: dep")
}

for i, in := range inc {
req := in.Request()
log.Debugf("> incoming-%d: %p dstate=%s canceled=%v", i, in, req.Payload.(*edgeRequest).desiredState, req.Canceled)
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_desired_state", req.Payload.(*edgeRequest).desiredState).
WithField("incoming_canceled", req.Canceled).
Debug("> incoming")
}

for i, up := range updates {
if up == e.cacheMapReq {
log.Debugf("> update-%d: %p cacheMapReq complete=%v", i, up, up.Status().Completed)
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update cacheMapReq")
} else if up == e.execReq {
log.Debugf("> update-%d: %p execReq complete=%v", i, up, up.Status().Completed)
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
Debug("> update execReq")
} else {
st, ok := up.Status().Value.(*edgeState)
if ok {
index := -1
if dep, ok := e.depRequests[up]; ok {
index = int(dep.index)
}
log.Debugf("> update-%d: %p input-%d keys=%d state=%s", i, up, index, len(st.keys), st.state)
log.
WithField("update_index", i).
WithField("update_pointer", up).
WithField("update_complete", up.Status().Completed).
WithField("update_input_index", index).
WithField("update_keys", len(st.keys)).
WithField("update_state", st.state).
Debugf("> update edgeState")
} else {
log.Debugf("> update-%d: unknown", i)
log.
WithField("update_index", i).
Debug("> update unknown")
}
}
}
Expand All @@ -463,7 +502,16 @@ func debugSchedulerPreUnpark(e *edge, inc []pipe.Sender, updates, allPipes []pip
func debugSchedulerPostUnpark(e *edge, inc []pipe.Sender) {
log := bklog.G(context.TODO())
for i, in := range inc {
log.Debugf("< incoming-%d: %p completed=%v", i, in, in.Status().Completed)
}
log.Debugf("<< unpark %s\n", e.edge.Vertex.Name())
log.
WithField("incoming_index", i).
WithField("incoming_pointer", in).
WithField("incoming_complete", in.Status().Completed).
Debug("< incoming")
}
log.
WithField("edge_vertex_name", e.edge.Vertex.Name()).
WithField("edge_vertex_digest", e.edge.Vertex.Digest()).
WithField("edge_index", e.edge.Index).
WithField("edge_state", e.state).
Debug("<< unpark")
}

0 comments on commit eab04dc

Please sign in to comment.