From 2ec7e14049546f85f98e725c6a840751f448abd2 Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Mon, 8 Feb 2021 12:03:48 +0100 Subject: [PATCH] plan: remove mutexes The mutexes that were guarding the 'stats' section of plans in vtgate and vttablet were not being properly obeyed. A lot of the code was accessing the stats without acquiring the mutex first, mostly because there is no global consistency requirement between all the fields of the stats. Because of this, remove the mutexes from both plans and switch to using atomic operations to handle the stats fields. This reduces the size of the plans in the cache and improves the accuracy of our memory usage metrics, since we do not have accurate size calculations for mutex objects, as they're foreign to the project. Signed-off-by: Vicent Marti --- go/vt/vtgate/engine/primitive.go | 55 +++++++++---------- go/vt/vtgate/executor.go | 8 +-- go/vt/vtgate/executor_scatter_stats.go | 30 +++++----- go/vt/vtgate/logstats.go | 2 +- go/vt/vtgate/queryz_test.go | 8 +-- go/vt/vtgate/resolver.go | 2 +- go/vt/vtgate/vcursor_impl.go | 6 +- go/vt/vttablet/tabletserver/cached_size.go | 4 +- go/vt/vttablet/tabletserver/query_engine.go | 54 +++++++++--------- go/vt/vttablet/tabletserver/query_executor.go | 2 +- go/vt/vttablet/tabletserver/queryz.go | 10 ++-- 11 files changed, 89 insertions(+), 92 deletions(-) diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 467255943f9..3eff3f9c7ea 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -18,7 +18,7 @@ package engine import ( "encoding/json" - "sync" + "sync/atomic" "time" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -154,13 +154,12 @@ type ( Instructions Primitive // Instructions contains the instructions needed to fulfil the query. 
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting - mu sync.Mutex // Mutex to protect the fields below - ExecCount uint64 // Count of times this plan was executed - ExecTime time.Duration // Total execution time - ShardQueries uint64 // Total number of shard queries - RowsReturned uint64 // Total number of rows - RowsAffected uint64 // Total number of rows - Errors uint64 // Total number of errors + ExecCount uint64 // Count of times this plan was executed + ExecTime uint64 // Total execution time + ShardQueries uint64 // Total number of shard queries + RowsReturned uint64 // Total number of rows + RowsAffected uint64 // Total number of rows + Errors uint64 // Total number of errors } // Match is used to check if a Primitive matches @@ -199,26 +198,22 @@ type ( // AddStats updates the plan execution statistics func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) { - p.mu.Lock() - p.ExecCount += execCount - p.ExecTime += execTime - p.ShardQueries += shardQueries - p.RowsAffected += rowsAffected - p.RowsReturned += rowsReturned - p.Errors += errors - p.mu.Unlock() + atomic.AddUint64(&p.ExecCount, execCount) + atomic.AddUint64(&p.ExecTime, uint64(execTime)) + atomic.AddUint64(&p.ShardQueries, shardQueries) + atomic.AddUint64(&p.RowsAffected, rowsAffected) + atomic.AddUint64(&p.RowsReturned, rowsReturned) + atomic.AddUint64(&p.Errors, errors) } // Stats returns a copy of the plan execution statistics func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) { - p.mu.Lock() - execCount = p.ExecCount - execTime = p.ExecTime - shardQueries = p.ShardQueries - rowsAffected = p.RowsAffected - rowsReturned = p.RowsReturned - errors = p.Errors - p.mu.Unlock() + execCount = atomic.LoadUint64(&p.ExecCount) + execTime = time.Duration(atomic.LoadUint64(&p.ExecTime)) + shardQueries = 
atomic.LoadUint64(&p.ShardQueries) + rowsAffected = atomic.LoadUint64(&p.RowsAffected) + rowsReturned = atomic.LoadUint64(&p.RowsReturned) + errors = atomic.LoadUint64(&p.Errors) return } @@ -263,12 +258,12 @@ func (p *Plan) MarshalJSON() ([]byte, error) { QueryType: p.Type.String(), Original: p.Original, Instructions: instructions, - ExecCount: p.ExecCount, - ExecTime: p.ExecTime, - ShardQueries: p.ShardQueries, - RowsAffected: p.RowsAffected, - RowsReturned: p.RowsReturned, - Errors: p.Errors, + ExecCount: atomic.LoadUint64(&p.ExecCount), + ExecTime: time.Duration(atomic.LoadUint64(&p.ExecTime)), + ShardQueries: atomic.LoadUint64(&p.ShardQueries), + RowsAffected: atomic.LoadUint64(&p.RowsAffected), + RowsReturned: atomic.LoadUint64(&p.RowsReturned), + Errors: atomic.LoadUint64(&p.Errors), } return json.Marshal(marshalPlan) } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index f50287b9a46..241540c5eee 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -372,7 +372,7 @@ func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, lo func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + logStats.ShardQueries = uint64(len(safeSession.ShardSessions)) e.updateQueryCounts("Commit", "", "", int64(logStats.ShardQueries)) err := e.txConn.Commit(ctx, safeSession) @@ -388,7 +388,7 @@ func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error { func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + logStats.ShardQueries = uint64(len(safeSession.ShardSessions)) 
e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries)) err := e.txConn.Rollback(ctx, safeSession) logStats.CommitTime = time.Since(execStart) @@ -398,7 +398,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession, sql string, planType string, logStats *LogStats, nonTxResponse func(query string) (*sqltypes.Result, error), ignoreMaxMemoryRows bool) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + logStats.ShardQueries = uint64(len(safeSession.ShardSessions)) e.updateQueryCounts(planType, "", "", int64(logStats.ShardQueries)) defer func() { logStats.ExecuteTime = time.Since(execStart) @@ -1622,7 +1622,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, } logStats.RowsAffected = qr.RowsAffected - plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), qr.RowsAffected, uint64(len(qr.Rows)), errCount) + plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, qr.RowsAffected, uint64(len(qr.Rows)), errCount) return qr.Fields, err } diff --git a/go/vt/vtgate/executor_scatter_stats.go b/go/vt/vtgate/executor_scatter_stats.go index 760c18e8cc4..74641f11de6 100644 --- a/go/vt/vtgate/executor_scatter_stats.go +++ b/go/vt/vtgate/executor_scatter_stats.go @@ -20,6 +20,7 @@ import ( "fmt" "html/template" "net/http" + "sync/atomic" "time" "vitess.io/vitess/go/vt/logz" @@ -74,16 +75,16 @@ func (e *Executor) gatherScatterStats() (statsResults, error) { } plans = append(plans, plan) routes = append(routes, route) - scatterExecTime += plan.ExecTime - scatterCount += plan.ExecCount + scatterExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime)) + scatterCount += atomic.LoadUint64(&plan.ExecCount) } if readOnly { - readOnlyTime += plan.ExecTime - readOnlyCount += 
plan.ExecCount + readOnlyTime += time.Duration(atomic.LoadUint64(&plan.ExecTime)) + readOnlyCount += atomic.LoadUint64(&plan.ExecCount) } - totalExecTime += plan.ExecTime - totalCount += plan.ExecCount + totalExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime)) + totalCount += atomic.LoadUint64(&plan.ExecCount) return true }) if err != nil { @@ -94,19 +95,22 @@ func (e *Executor) gatherScatterStats() (statsResults, error) { resultItems := make([]*statsResultItem, len(plans)) for i, plan := range plans { route := routes[i] + execCount := atomic.LoadUint64(&plan.ExecCount) + execTime := time.Duration(atomic.LoadUint64(&plan.ExecTime)) + var avgTimePerQuery int64 - if plan.ExecCount != 0 { - avgTimePerQuery = plan.ExecTime.Nanoseconds() / int64(plan.ExecCount) + if execCount != 0 { + avgTimePerQuery = execTime.Nanoseconds() / int64(execCount) } resultItems[i] = &statsResultItem{ Query: plan.Original, AvgTimePerQuery: time.Duration(avgTimePerQuery), - PercentTimeOfReads: 100 * float64(plan.ExecTime) / float64(readOnlyTime), - PercentTimeOfScatters: 100 * float64(plan.ExecTime) / float64(scatterExecTime), - PercentCountOfReads: 100 * float64(plan.ExecCount) / float64(readOnlyCount), - PercentCountOfScatters: 100 * float64(plan.ExecCount) / float64(scatterCount), + PercentTimeOfReads: 100 * float64(execTime) / float64(readOnlyTime), + PercentTimeOfScatters: 100 * float64(execTime) / float64(scatterExecTime), + PercentCountOfReads: 100 * float64(execCount) / float64(readOnlyCount), + PercentCountOfScatters: 100 * float64(execCount) / float64(scatterCount), From: route.Keyspace.Name + "." 
+ route.TableName, - Count: plan.ExecCount, + Count: execCount, } } result := statsResults{ diff --git a/go/vt/vtgate/logstats.go b/go/vt/vtgate/logstats.go index 7b2c6e20bc2..e70f550632e 100644 --- a/go/vt/vtgate/logstats.go +++ b/go/vt/vtgate/logstats.go @@ -47,7 +47,7 @@ type LogStats struct { BindVariables map[string]*querypb.BindVariable StartTime time.Time EndTime time.Time - ShardQueries uint32 + ShardQueries uint64 RowsAffected uint64 RowsReturned uint64 PlanTime time.Duration diff --git a/go/vt/vtgate/queryz_test.go b/go/vt/vtgate/queryz_test.go index d158567a11f..67147c4e678 100644 --- a/go/vt/vtgate/queryz_test.go +++ b/go/vt/vtgate/queryz_test.go @@ -49,7 +49,7 @@ func TestQueryzHandler(t *testing.T) { t.Fatalf("couldn't get plan from cache") } plan1 := result.(*engine.Plan) - plan1.ExecTime = 1 * time.Millisecond + plan1.ExecTime = uint64(1 * time.Millisecond) // scatter sql = "select id from user" @@ -61,7 +61,7 @@ func TestQueryzHandler(t *testing.T) { t.Fatalf("couldn't get plan from cache") } plan2 := result.(*engine.Plan) - plan2.ExecTime = 1 * time.Second + plan2.ExecTime = uint64(1 * time.Second) sql = "insert into user (id, name) values (:id, :name)" _, err = executorExec(executor, sql, map[string]*querypb.BindVariable{ @@ -91,8 +91,8 @@ func TestQueryzHandler(t *testing.T) { require.NoError(t, err) - plan3.ExecTime = 100 * time.Millisecond - plan4.ExecTime = 200 * time.Millisecond + plan3.ExecTime = uint64(100 * time.Millisecond) + plan4.ExecTime = uint64(200 * time.Millisecond) queryzHandler(executor, resp, req) body, _ := ioutil.ReadAll(resp.Body) diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go index 25d466f1c82..a06a1949eab 100644 --- a/go/vt/vtgate/resolver.go +++ b/go/vt/vtgate/resolver.go @@ -74,7 +74,7 @@ func (res *Resolver) Execute( return nil, err } if logStats != nil { - logStats.ShardQueries = uint32(len(rss)) + logStats.ShardQueries = uint64(len(rss)) } autocommit := len(rss) == 1 && canAutocommit && 
session.AutocommitApproval() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index de023c26fe1..a2560932ed7 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -402,7 +402,7 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string] // ExecuteMultiShard is part of the engine.VCursor interface. func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) { - atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries))) + atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(queries))) qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, autocommit, vc.ignoreMaxMemoryRows) if errs == nil && rollbackOnError { @@ -457,13 +457,13 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer // StreamExeculteMulti is the streaming version of ExecuteMultiShard. func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { - atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(rss))) + atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(rss))) return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback) } // ExecuteKeyspaceID is part of the engine.VCursor interface. 
func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { - atomic.AddUint32(&vc.logStats.ShardQueries, 1) + atomic.AddUint64(&vc.logStats.ShardQueries, 1) rss, _, err := vc.ResolveDestinations(keyspace, nil, []key.Destination{key.DestinationKeyspaceID(ksid)}) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletserver/cached_size.go b/go/vt/vttablet/tabletserver/cached_size.go index eadc6dee317..7c803f4fe1e 100644 --- a/go/vt/vttablet/tabletserver/cached_size.go +++ b/go/vt/vttablet/tabletserver/cached_size.go @@ -23,10 +23,12 @@ func (cached *TabletPlan) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(112) + size += int64(128) } // field Plan *vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder.Plan size += cached.Plan.CachedSize(true) + // field Original string + size += int64(len(cached.Original)) // field Fields []*vitess.io/vitess/go/vt/proto/query.Field { size += int64(cap(cached.Fields)) * int64(8) diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 67b6d4c68da..94f37090d47 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "sync" + "sync/atomic" "time" "context" @@ -60,37 +61,32 @@ type TabletPlan struct { Rules *rules.Rules Authorized []*tableacl.ACLResult - mu sync.Mutex - QueryCount int64 - Time time.Duration - MysqlTime time.Duration - RowsAffected int64 - RowsReturned int64 - ErrorCount int64 + QueryCount uint64 + Time uint64 + MysqlTime uint64 + RowsAffected uint64 + RowsReturned uint64 + ErrorCount uint64 } // AddStats updates the stats for the current TabletPlan. 
-func (ep *TabletPlan) AddStats(queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) { - ep.mu.Lock() - ep.QueryCount += queryCount - ep.Time += duration - ep.MysqlTime += mysqlTime - ep.RowsAffected += rowsAffected - ep.RowsReturned += rowsReturned - ep.ErrorCount += errorCount - ep.mu.Unlock() +func (ep *TabletPlan) AddStats(queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) { + atomic.AddUint64(&ep.QueryCount, queryCount) + atomic.AddUint64(&ep.Time, uint64(duration)) + atomic.AddUint64(&ep.MysqlTime, uint64(mysqlTime)) + atomic.AddUint64(&ep.RowsAffected, rowsAffected) + atomic.AddUint64(&ep.RowsReturned, rowsReturned) + atomic.AddUint64(&ep.ErrorCount, errorCount) } // Stats returns the current stats of TabletPlan. -func (ep *TabletPlan) Stats() (queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) { - ep.mu.Lock() - queryCount = ep.QueryCount - duration = ep.Time - mysqlTime = ep.MysqlTime - rowsAffected = ep.RowsAffected - rowsReturned = ep.RowsReturned - errorCount = ep.ErrorCount - ep.mu.Unlock() +func (ep *TabletPlan) Stats() (queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) { + queryCount = atomic.LoadUint64(&ep.QueryCount) + duration = time.Duration(atomic.LoadUint64(&ep.Time)) + mysqlTime = time.Duration(atomic.LoadUint64(&ep.MysqlTime)) + rowsAffected = atomic.LoadUint64(&ep.RowsAffected) + rowsReturned = atomic.LoadUint64(&ep.RowsReturned) + errorCount = atomic.LoadUint64(&ep.ErrorCount) return } @@ -431,12 +427,12 @@ type perQueryStats struct { Query string Table string Plan planbuilder.PlanType - QueryCount int64 + QueryCount uint64 Time time.Duration MysqlTime time.Duration - RowsAffected int64 - RowsReturned int64 - ErrorCount int64 + RowsAffected uint64 + RowsReturned uint64 + ErrorCount uint64 } func (qe *QueryEngine) handleHTTPQueryPlans(response 
http.ResponseWriter, request *http.Request) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index a168311591a..b289c18545e 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -87,7 +87,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { return } qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0) - qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0) + qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0) qre.logStats.RowsAffected = int(reply.RowsAffected) qre.logStats.Rows = reply.Rows qre.tsv.Stats().ResultHistogram.Add(int64(len(reply.Rows))) diff --git a/go/vt/vttablet/tabletserver/queryz.go b/go/vt/vttablet/tabletserver/queryz.go index f44816f8f14..a564cc3a420 100644 --- a/go/vt/vttablet/tabletserver/queryz.go +++ b/go/vt/vttablet/tabletserver/queryz.go @@ -76,12 +76,12 @@ type queryzRow struct { Query string Table string Plan planbuilder.PlanType - Count int64 + Count uint64 tm time.Duration mysqlTime time.Duration - RowsAffected int64 - RowsReturned int64 - Errors int64 + RowsAffected uint64 + RowsReturned uint64 + Errors uint64 Color string } @@ -164,7 +164,7 @@ func queryzHandler(qe *QueryEngine, w http.ResponseWriter, r *http.Request) { Value.Count, Value.tm, Value.mysqlTime, Value.RowsAffected, Value.RowsReturned, Value.Errors = plan.Stats() var timepq time.Duration if Value.Count != 0 { - timepq = time.Duration(int64(Value.tm) / Value.Count) + timepq = Value.tm / time.Duration(Value.Count) } if timepq < 10*time.Millisecond { Value.Color = "low"