diff --git a/go/vt/vtgate/engine/primitive.go b/go/vt/vtgate/engine/primitive.go index 467255943f9..3eff3f9c7ea 100644 --- a/go/vt/vtgate/engine/primitive.go +++ b/go/vt/vtgate/engine/primitive.go @@ -18,7 +18,7 @@ package engine import ( "encoding/json" - "sync" + "sync/atomic" "time" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -154,13 +154,12 @@ type ( Instructions Primitive // Instructions contains the instructions needed to fulfil the query. BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting - mu sync.Mutex // Mutex to protect the fields below - ExecCount uint64 // Count of times this plan was executed - ExecTime time.Duration // Total execution time - ShardQueries uint64 // Total number of shard queries - RowsReturned uint64 // Total number of rows - RowsAffected uint64 // Total number of rows - Errors uint64 // Total number of errors + ExecCount uint64 // Count of times this plan was executed + ExecTime uint64 // Total execution time + ShardQueries uint64 // Total number of shard queries + RowsReturned uint64 // Total number of rows + RowsAffected uint64 // Total number of rows + Errors uint64 // Total number of errors } // Match is used to check if a Primitive matches @@ -199,26 +198,22 @@ type ( // AddStats updates the plan execution statistics func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) { - p.mu.Lock() - p.ExecCount += execCount - p.ExecTime += execTime - p.ShardQueries += shardQueries - p.RowsAffected += rowsAffected - p.RowsReturned += rowsReturned - p.Errors += errors - p.mu.Unlock() + atomic.AddUint64(&p.ExecCount, execCount) + atomic.AddUint64(&p.ExecTime, uint64(execTime)) + atomic.AddUint64(&p.ShardQueries, shardQueries) + atomic.AddUint64(&p.RowsAffected, rowsAffected) + atomic.AddUint64(&p.RowsReturned, rowsReturned) + atomic.AddUint64(&p.Errors, errors) } // Stats returns a copy of the plan execution statistics func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) { - p.mu.Lock() - execCount = p.ExecCount - execTime = p.ExecTime - shardQueries = p.ShardQueries - rowsAffected = p.RowsAffected - rowsReturned = p.RowsReturned - errors = p.Errors - p.mu.Unlock() + execCount = atomic.LoadUint64(&p.ExecCount) + execTime = time.Duration(atomic.LoadUint64(&p.ExecTime)) + shardQueries = atomic.LoadUint64(&p.ShardQueries) + rowsAffected = atomic.LoadUint64(&p.RowsAffected) + rowsReturned = atomic.LoadUint64(&p.RowsReturned) + errors = atomic.LoadUint64(&p.Errors) return } @@ -263,12 +258,12 @@ func (p *Plan) MarshalJSON() ([]byte, error) { QueryType: p.Type.String(), Original: p.Original, Instructions: instructions, - ExecCount: p.ExecCount, - ExecTime: p.ExecTime, - ShardQueries: p.ShardQueries, - RowsAffected: p.RowsAffected, - RowsReturned: p.RowsReturned, - Errors: p.Errors, + ExecCount: atomic.LoadUint64(&p.ExecCount), + ExecTime: time.Duration(atomic.LoadUint64(&p.ExecTime)), + ShardQueries: atomic.LoadUint64(&p.ShardQueries), + RowsAffected: atomic.LoadUint64(&p.RowsAffected), + RowsReturned: atomic.LoadUint64(&p.RowsReturned), + Errors: atomic.LoadUint64(&p.Errors), } return json.Marshal(marshalPlan) } diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index f50287b9a46..241540c5eee 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -372,7 +372,7 @@ func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, lo func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + logStats.ShardQueries = uint64(len(safeSession.ShardSessions)) e.updateQueryCounts("Commit", "", "", int64(logStats.ShardQueries)) err := e.txConn.Commit(ctx, safeSession) @@ -388,7 +388,7 @@ func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error { func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + logStats.ShardQueries = uint64(len(safeSession.ShardSessions)) e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries)) err := e.txConn.Rollback(ctx, safeSession) logStats.CommitTime = time.Since(execStart) @@ -398,7 +398,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession, sql string, planType string, logStats *LogStats, nonTxResponse func(query string) (*sqltypes.Result, error), ignoreMaxMemoryRows bool) (*sqltypes.Result, error) { execStart := time.Now() logStats.PlanTime = execStart.Sub(logStats.StartTime) - logStats.ShardQueries = uint32(len(safeSession.ShardSessions)) + logStats.ShardQueries = uint64(len(safeSession.ShardSessions)) e.updateQueryCounts(planType, "", "", int64(logStats.ShardQueries)) defer func() { logStats.ExecuteTime = time.Since(execStart) @@ -1622,7 +1622,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, } logStats.RowsAffected = qr.RowsAffected - plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), qr.RowsAffected, uint64(len(qr.Rows)), errCount) + plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, qr.RowsAffected, uint64(len(qr.Rows)), errCount) return qr.Fields, err } diff --git a/go/vt/vtgate/executor_scatter_stats.go b/go/vt/vtgate/executor_scatter_stats.go index 760c18e8cc4..74641f11de6 100644 --- a/go/vt/vtgate/executor_scatter_stats.go +++ b/go/vt/vtgate/executor_scatter_stats.go @@ -20,6 +20,7 @@ import ( "fmt" "html/template" "net/http" + "sync/atomic" "time" "vitess.io/vitess/go/vt/logz" @@ -74,16 +75,16 @@ func (e *Executor) gatherScatterStats() (statsResults, error) { } plans = append(plans, plan) routes = append(routes, route) - scatterExecTime += plan.ExecTime - scatterCount += plan.ExecCount + scatterExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime)) + scatterCount += atomic.LoadUint64(&plan.ExecCount) } if readOnly { - readOnlyTime += plan.ExecTime - readOnlyCount += plan.ExecCount + readOnlyTime += time.Duration(atomic.LoadUint64(&plan.ExecTime)) + readOnlyCount += atomic.LoadUint64(&plan.ExecCount) } - totalExecTime += plan.ExecTime - totalCount += plan.ExecCount + totalExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime)) + totalCount += atomic.LoadUint64(&plan.ExecCount) return true }) if err != nil { @@ -94,19 +95,22 @@ func (e *Executor) gatherScatterStats() (statsResults, error) { resultItems := make([]*statsResultItem, len(plans)) for i, plan := range plans { route := routes[i] + execCount := atomic.LoadUint64(&plan.ExecCount) + execTime := time.Duration(atomic.LoadUint64(&plan.ExecTime)) + var avgTimePerQuery int64 - if plan.ExecCount != 0 { - avgTimePerQuery = plan.ExecTime.Nanoseconds() / int64(plan.ExecCount) + if execCount != 0 { + avgTimePerQuery = execTime.Nanoseconds() / int64(execCount) } resultItems[i] = &statsResultItem{ Query: plan.Original, AvgTimePerQuery: time.Duration(avgTimePerQuery), - PercentTimeOfReads: 100 * float64(plan.ExecTime) / float64(readOnlyTime), - PercentTimeOfScatters: 100 * float64(plan.ExecTime) / float64(scatterExecTime), - PercentCountOfReads: 100 * float64(plan.ExecCount) / float64(readOnlyCount), - PercentCountOfScatters: 100 * float64(plan.ExecCount) / float64(scatterCount), + PercentTimeOfReads: 100 * float64(execTime) / float64(readOnlyTime), + PercentTimeOfScatters: 100 * float64(execTime) / float64(scatterExecTime), + PercentCountOfReads: 100 * float64(execCount) / float64(readOnlyCount), + PercentCountOfScatters: 100 * float64(execCount) / float64(scatterCount), From: route.Keyspace.Name + "." + route.TableName, - Count: plan.ExecCount, + Count: execCount, } } result := statsResults{ diff --git a/go/vt/vtgate/logstats.go b/go/vt/vtgate/logstats.go index 7b2c6e20bc2..e70f550632e 100644 --- a/go/vt/vtgate/logstats.go +++ b/go/vt/vtgate/logstats.go @@ -47,7 +47,7 @@ type LogStats struct { BindVariables map[string]*querypb.BindVariable StartTime time.Time EndTime time.Time - ShardQueries uint32 + ShardQueries uint64 RowsAffected uint64 RowsReturned uint64 PlanTime time.Duration diff --git a/go/vt/vtgate/queryz_test.go b/go/vt/vtgate/queryz_test.go index d158567a11f..67147c4e678 100644 --- a/go/vt/vtgate/queryz_test.go +++ b/go/vt/vtgate/queryz_test.go @@ -49,7 +49,7 @@ func TestQueryzHandler(t *testing.T) { t.Fatalf("couldn't get plan from cache") } plan1 := result.(*engine.Plan) - plan1.ExecTime = 1 * time.Millisecond + plan1.ExecTime = uint64(1 * time.Millisecond) // scatter sql = "select id from user" @@ -61,7 +61,7 @@ func TestQueryzHandler(t *testing.T) { t.Fatalf("couldn't get plan from cache") } plan2 := result.(*engine.Plan) - plan2.ExecTime = 1 * time.Second + plan2.ExecTime = uint64(1 * time.Second) sql = "insert into user (id, name) values (:id, :name)" _, err = executorExec(executor, sql, map[string]*querypb.BindVariable{ @@ -91,8 +91,8 @@ func TestQueryzHandler(t *testing.T) { require.NoError(t, err) - plan3.ExecTime = 100 * time.Millisecond - plan4.ExecTime = 200 * time.Millisecond + plan3.ExecTime = uint64(100 * time.Millisecond) + plan4.ExecTime = uint64(200 * time.Millisecond) queryzHandler(executor, resp, req) body, _ := ioutil.ReadAll(resp.Body) diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go index 25d466f1c82..a06a1949eab 100644 --- a/go/vt/vtgate/resolver.go +++ b/go/vt/vtgate/resolver.go @@ -74,7 +74,7 @@ func (res *Resolver) Execute( return nil, err } if logStats != nil { - logStats.ShardQueries = uint32(len(rss)) + logStats.ShardQueries = uint64(len(rss)) } autocommit := len(rss) == 1 && canAutocommit && session.AutocommitApproval() diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index de023c26fe1..a2560932ed7 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -402,7 +402,7 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string] // ExecuteMultiShard is part of the engine.VCursor interface. func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) { - atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries))) + atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(queries))) qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, autocommit, vc.ignoreMaxMemoryRows) if errs == nil && rollbackOnError { @@ -457,13 +457,13 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer // StreamExeculteMulti is the streaming version of ExecuteMultiShard. func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { - atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(rss))) + atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(rss))) return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback) } // ExecuteKeyspaceID is part of the engine.VCursor interface. func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) { - atomic.AddUint32(&vc.logStats.ShardQueries, 1) + atomic.AddUint64(&vc.logStats.ShardQueries, 1) rss, _, err := vc.ResolveDestinations(keyspace, nil, []key.Destination{key.DestinationKeyspaceID(ksid)}) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletserver/cached_size.go b/go/vt/vttablet/tabletserver/cached_size.go index eadc6dee317..7c803f4fe1e 100644 --- a/go/vt/vttablet/tabletserver/cached_size.go +++ b/go/vt/vttablet/tabletserver/cached_size.go @@ -23,10 +23,12 @@ func (cached *TabletPlan) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(112) + size += int64(128) } // field Plan *vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder.Plan size += cached.Plan.CachedSize(true) + // field Original string + size += int64(len(cached.Original)) // field Fields []*vitess.io/vitess/go/vt/proto/query.Field { size += int64(cap(cached.Fields)) * int64(8) diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 67b6d4c68da..94f37090d47 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "sync" + "sync/atomic" "time" "context" @@ -60,37 +61,32 @@ type TabletPlan struct { Rules *rules.Rules Authorized []*tableacl.ACLResult - mu sync.Mutex - QueryCount int64 - Time time.Duration - MysqlTime time.Duration - RowsAffected int64 - RowsReturned int64 - ErrorCount int64 + QueryCount uint64 + Time uint64 + MysqlTime uint64 + RowsAffected uint64 + RowsReturned uint64 + ErrorCount uint64 } // AddStats updates the stats for the current TabletPlan. -func (ep *TabletPlan) AddStats(queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) { - ep.mu.Lock() - ep.QueryCount += queryCount - ep.Time += duration - ep.MysqlTime += mysqlTime - ep.RowsAffected += rowsAffected - ep.RowsReturned += rowsReturned - ep.ErrorCount += errorCount - ep.mu.Unlock() +func (ep *TabletPlan) AddStats(queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) { + atomic.AddUint64(&ep.QueryCount, queryCount) + atomic.AddUint64(&ep.Time, uint64(duration)) + atomic.AddUint64(&ep.MysqlTime, uint64(mysqlTime)) + atomic.AddUint64(&ep.RowsAffected, rowsAffected) + atomic.AddUint64(&ep.RowsReturned, rowsReturned) + atomic.AddUint64(&ep.ErrorCount, errorCount) } // Stats returns the current stats of TabletPlan. -func (ep *TabletPlan) Stats() (queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) { - ep.mu.Lock() - queryCount = ep.QueryCount - duration = ep.Time - mysqlTime = ep.MysqlTime - rowsAffected = ep.RowsAffected - rowsReturned = ep.RowsReturned - errorCount = ep.ErrorCount - ep.mu.Unlock() +func (ep *TabletPlan) Stats() (queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) { + queryCount = atomic.LoadUint64(&ep.QueryCount) + duration = time.Duration(atomic.LoadUint64(&ep.Time)) + mysqlTime = time.Duration(atomic.LoadUint64(&ep.MysqlTime)) + rowsAffected = atomic.LoadUint64(&ep.RowsAffected) + rowsReturned = atomic.LoadUint64(&ep.RowsReturned) + errorCount = atomic.LoadUint64(&ep.ErrorCount) return } @@ -431,12 +427,12 @@ type perQueryStats struct { Query string Table string Plan planbuilder.PlanType - QueryCount int64 + QueryCount uint64 Time time.Duration MysqlTime time.Duration - RowsAffected int64 - RowsReturned int64 - ErrorCount int64 + RowsAffected uint64 + RowsReturned uint64 + ErrorCount uint64 } func (qe *QueryEngine) handleHTTPQueryPlans(response http.ResponseWriter, request *http.Request) { diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index a168311591a..b289c18545e 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -87,7 +87,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { return } qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0) - qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0) + qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0) qre.logStats.RowsAffected = int(reply.RowsAffected) qre.logStats.Rows = reply.Rows qre.tsv.Stats().ResultHistogram.Add(int64(len(reply.Rows))) diff --git a/go/vt/vttablet/tabletserver/queryz.go b/go/vt/vttablet/tabletserver/queryz.go index f44816f8f14..a564cc3a420 100644 --- a/go/vt/vttablet/tabletserver/queryz.go +++ b/go/vt/vttablet/tabletserver/queryz.go @@ -76,12 +76,12 @@ type queryzRow struct { Query string Table string Plan planbuilder.PlanType - Count int64 + Count uint64 tm time.Duration mysqlTime time.Duration - RowsAffected int64 - RowsReturned int64 - Errors int64 + RowsAffected uint64 + RowsReturned uint64 + Errors uint64 Color string } @@ -164,7 +164,7 @@ func queryzHandler(qe *QueryEngine, w http.ResponseWriter, r *http.Request) { Value.Count, Value.tm, Value.mysqlTime, Value.RowsAffected, Value.RowsReturned, Value.Errors = plan.Stats() var timepq time.Duration if Value.Count != 0 { - timepq = time.Duration(int64(Value.tm) / Value.Count) + timepq = Value.tm / time.Duration(Value.Count) } if timepq < 10*time.Millisecond { Value.Color = "low"