diff --git a/go/vt/vtgate/executorcontext/vcursor_impl.go b/go/vt/vtgate/executorcontext/vcursor_impl.go index 40317f5103a..df989fd7a67 100644 --- a/go/vt/vtgate/executorcontext/vcursor_impl.go +++ b/go/vt/vtgate/executorcontext/vcursor_impl.go @@ -22,6 +22,7 @@ import ( "io" "sort" "strings" + "sync" "sync/atomic" "time" @@ -154,6 +155,8 @@ type ( observer ResultsObserver + // this protects the interOpStats and shardsStats fields from concurrent writes + mu sync.Mutex // this is a map of the number of rows that every primitive has returned // if this field is nil, it means that we are not logging operator traffic interOpStats map[engine.Primitive]engine.RowsReceived @@ -642,21 +645,29 @@ func (vc *VCursorImpl) ExecutePrimitive(ctx context.Context, primitive engine.Pr } func (vc *VCursorImpl) logOpTraffic(primitive engine.Primitive, res *sqltypes.Result) { - if vc.interOpStats != nil { - rows := vc.interOpStats[primitive] - if res == nil { - rows = append(rows, 0) - } else { - rows = append(rows, len(res.Rows)) - } - vc.interOpStats[primitive] = rows + if vc.interOpStats == nil { + return + } + + vc.mu.Lock() + defer vc.mu.Unlock() + + rows := vc.interOpStats[primitive] + if res == nil { + rows = append(rows, 0) + } else { + rows = append(rows, len(res.Rows)) } + vc.interOpStats[primitive] = rows } func (vc *VCursorImpl) logShardsQueried(primitive engine.Primitive, shardsNb int) { - if vc.shardsStats != nil { - vc.shardsStats[primitive] += engine.ShardsQueried(shardsNb) + if vc.shardsStats == nil { + return } + vc.mu.Lock() + defer vc.mu.Unlock() + vc.shardsStats[primitive] += engine.ShardsQueried(shardsNb) } func (vc *VCursorImpl) ExecutePrimitiveStandalone(ctx context.Context, primitive engine.Primitive, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {