Skip to content

Commit

Permalink
plan: remove mutexes
Browse files Browse the repository at this point in the history
The mutexes that were guarding the 'stats' section of plans in vtgate
and vttablet were not being properly obeyed. A lot of the code was
accessing the stats without acquiring the mutex first, mostly because
there is no global consistency requirement between all the fields of the
stats. Because of this, remove the Mutexes from both plans and switch to
using Atomic operations to handle the stats fields. This reduces the
sizes of the plans on cache and improves the accuracy of our memory
usage metrics, since we do not have accurate size calculations for mutex
objects, as they're foreign to the project.

Signed-off-by: Vicent Marti <[email protected]>
  • Loading branch information
vmg committed Feb 8, 2021
1 parent 01a2c52 commit 2ec7e14
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 92 deletions.
55 changes: 25 additions & 30 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package engine

import (
"encoding/json"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -154,13 +154,12 @@ type (
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting

mu sync.Mutex // Mutex to protect the fields below
ExecCount uint64 // Count of times this plan was executed
ExecTime time.Duration // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
}

// Match is used to check if a Primitive matches
Expand Down Expand Up @@ -199,26 +198,22 @@ type (

// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
p.mu.Lock()
p.ExecCount += execCount
p.ExecTime += execTime
p.ShardQueries += shardQueries
p.RowsAffected += rowsAffected
p.RowsReturned += rowsReturned
p.Errors += errors
p.mu.Unlock()
atomic.AddUint64(&p.ExecCount, execCount)
atomic.AddUint64(&p.ExecTime, uint64(execTime))
atomic.AddUint64(&p.ShardQueries, shardQueries)
atomic.AddUint64(&p.RowsAffected, rowsAffected)
atomic.AddUint64(&p.RowsReturned, rowsReturned)
atomic.AddUint64(&p.Errors, errors)
}

// Stats returns a copy of the plan execution statistics
func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
p.mu.Lock()
execCount = p.ExecCount
execTime = p.ExecTime
shardQueries = p.ShardQueries
rowsAffected = p.RowsAffected
rowsReturned = p.RowsReturned
errors = p.Errors
p.mu.Unlock()
execCount = atomic.LoadUint64(&p.ExecCount)
execTime = time.Duration(atomic.LoadUint64(&p.ExecTime))
shardQueries = atomic.LoadUint64(&p.ShardQueries)
rowsAffected = atomic.LoadUint64(&p.RowsAffected)
rowsReturned = atomic.LoadUint64(&p.RowsReturned)
errors = atomic.LoadUint64(&p.Errors)
return
}

Expand Down Expand Up @@ -263,12 +258,12 @@ func (p *Plan) MarshalJSON() ([]byte, error) {
QueryType: p.Type.String(),
Original: p.Original,
Instructions: instructions,
ExecCount: p.ExecCount,
ExecTime: p.ExecTime,
ShardQueries: p.ShardQueries,
RowsAffected: p.RowsAffected,
RowsReturned: p.RowsReturned,
Errors: p.Errors,
ExecCount: atomic.LoadUint64(&p.ExecCount),
ExecTime: time.Duration(atomic.LoadUint64(&p.ExecTime)),
ShardQueries: atomic.LoadUint64(&p.ShardQueries),
RowsAffected: atomic.LoadUint64(&p.RowsAffected),
RowsReturned: atomic.LoadUint64(&p.RowsReturned),
Errors: atomic.LoadUint64(&p.Errors),
}
return json.Marshal(marshalPlan)
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, lo
func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts("Commit", "", "", int64(logStats.ShardQueries))

err := e.txConn.Commit(ctx, safeSession)
Expand All @@ -388,7 +388,7 @@ func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error {
func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries))
err := e.txConn.Rollback(ctx, safeSession)
logStats.CommitTime = time.Since(execStart)
Expand All @@ -398,7 +398,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession,
func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession, sql string, planType string, logStats *LogStats, nonTxResponse func(query string) (*sqltypes.Result, error), ignoreMaxMemoryRows bool) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts(planType, "", "", int64(logStats.ShardQueries))
defer func() {
logStats.ExecuteTime = time.Since(execStart)
Expand Down Expand Up @@ -1622,7 +1622,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession,
}
logStats.RowsAffected = qr.RowsAffected

plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), qr.RowsAffected, uint64(len(qr.Rows)), errCount)
plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, qr.RowsAffected, uint64(len(qr.Rows)), errCount)

return qr.Fields, err
}
Expand Down
30 changes: 17 additions & 13 deletions go/vt/vtgate/executor_scatter_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"html/template"
"net/http"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/logz"
Expand Down Expand Up @@ -74,16 +75,16 @@ func (e *Executor) gatherScatterStats() (statsResults, error) {
}
plans = append(plans, plan)
routes = append(routes, route)
scatterExecTime += plan.ExecTime
scatterCount += plan.ExecCount
scatterExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
scatterCount += atomic.LoadUint64(&plan.ExecCount)
}
if readOnly {
readOnlyTime += plan.ExecTime
readOnlyCount += plan.ExecCount
readOnlyTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
readOnlyCount += atomic.LoadUint64(&plan.ExecCount)
}

totalExecTime += plan.ExecTime
totalCount += plan.ExecCount
totalExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
totalCount += atomic.LoadUint64(&plan.ExecCount)
return true
})
if err != nil {
Expand All @@ -94,19 +95,22 @@ func (e *Executor) gatherScatterStats() (statsResults, error) {
resultItems := make([]*statsResultItem, len(plans))
for i, plan := range plans {
route := routes[i]
execCount := atomic.LoadUint64(&plan.ExecCount)
execTime := time.Duration(atomic.LoadUint64(&plan.ExecTime))

var avgTimePerQuery int64
if plan.ExecCount != 0 {
avgTimePerQuery = plan.ExecTime.Nanoseconds() / int64(plan.ExecCount)
if execCount != 0 {
avgTimePerQuery = execTime.Nanoseconds() / int64(execCount)
}
resultItems[i] = &statsResultItem{
Query: plan.Original,
AvgTimePerQuery: time.Duration(avgTimePerQuery),
PercentTimeOfReads: 100 * float64(plan.ExecTime) / float64(readOnlyTime),
PercentTimeOfScatters: 100 * float64(plan.ExecTime) / float64(scatterExecTime),
PercentCountOfReads: 100 * float64(plan.ExecCount) / float64(readOnlyCount),
PercentCountOfScatters: 100 * float64(plan.ExecCount) / float64(scatterCount),
PercentTimeOfReads: 100 * float64(execTime) / float64(readOnlyTime),
PercentTimeOfScatters: 100 * float64(execTime) / float64(scatterExecTime),
PercentCountOfReads: 100 * float64(execCount) / float64(readOnlyCount),
PercentCountOfScatters: 100 * float64(execCount) / float64(scatterCount),
From: route.Keyspace.Name + "." + route.TableName,
Count: plan.ExecCount,
Count: execCount,
}
}
result := statsResults{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/logstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type LogStats struct {
BindVariables map[string]*querypb.BindVariable
StartTime time.Time
EndTime time.Time
ShardQueries uint32
ShardQueries uint64
RowsAffected uint64
RowsReturned uint64
PlanTime time.Duration
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/queryz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestQueryzHandler(t *testing.T) {
t.Fatalf("couldn't get plan from cache")
}
plan1 := result.(*engine.Plan)
plan1.ExecTime = 1 * time.Millisecond
plan1.ExecTime = uint64(1 * time.Millisecond)

// scatter
sql = "select id from user"
Expand All @@ -61,7 +61,7 @@ func TestQueryzHandler(t *testing.T) {
t.Fatalf("couldn't get plan from cache")
}
plan2 := result.(*engine.Plan)
plan2.ExecTime = 1 * time.Second
plan2.ExecTime = uint64(1 * time.Second)

sql = "insert into user (id, name) values (:id, :name)"
_, err = executorExec(executor, sql, map[string]*querypb.BindVariable{
Expand Down Expand Up @@ -91,8 +91,8 @@ func TestQueryzHandler(t *testing.T) {

require.NoError(t, err)

plan3.ExecTime = 100 * time.Millisecond
plan4.ExecTime = 200 * time.Millisecond
plan3.ExecTime = uint64(100 * time.Millisecond)
plan4.ExecTime = uint64(200 * time.Millisecond)

queryzHandler(executor, resp, req)
body, _ := ioutil.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (res *Resolver) Execute(
return nil, err
}
if logStats != nil {
logStats.ShardQueries = uint32(len(rss))
logStats.ShardQueries = uint64(len(rss))
}

autocommit := len(rss) == 1 && canAutocommit && session.AutocommitApproval()
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string]

// ExecuteMultiShard is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) {
atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries)))
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(queries)))
qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, autocommit, vc.ignoreMaxMemoryRows)

if errs == nil && rollbackOnError {
Expand Down Expand Up @@ -457,13 +457,13 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer

// StreamExeculteMulti is the streaming version of ExecuteMultiShard.
func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error {
atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(rss)))
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(rss)))
return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback)
}

// ExecuteKeyspaceID is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
atomic.AddUint32(&vc.logStats.ShardQueries, 1)
atomic.AddUint64(&vc.logStats.ShardQueries, 1)
rss, _, err := vc.ResolveDestinations(keyspace, nil, []key.Destination{key.DestinationKeyspaceID(ksid)})
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 25 additions & 29 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"

"context"
Expand Down Expand Up @@ -60,37 +61,32 @@ type TabletPlan struct {
Rules *rules.Rules
Authorized []*tableacl.ACLResult

mu sync.Mutex
QueryCount int64
Time time.Duration
MysqlTime time.Duration
RowsAffected int64
RowsReturned int64
ErrorCount int64
QueryCount uint64
Time uint64
MysqlTime uint64
RowsAffected uint64
RowsReturned uint64
ErrorCount uint64
}

// AddStats updates the stats for the current TabletPlan.
func (ep *TabletPlan) AddStats(queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
ep.mu.Lock()
ep.QueryCount += queryCount
ep.Time += duration
ep.MysqlTime += mysqlTime
ep.RowsAffected += rowsAffected
ep.RowsReturned += rowsReturned
ep.ErrorCount += errorCount
ep.mu.Unlock()
func (ep *TabletPlan) AddStats(queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) {
atomic.AddUint64(&ep.QueryCount, queryCount)
atomic.AddUint64(&ep.Time, uint64(duration))
atomic.AddUint64(&ep.MysqlTime, uint64(mysqlTime))
atomic.AddUint64(&ep.RowsAffected, rowsAffected)
atomic.AddUint64(&ep.RowsReturned, rowsReturned)
atomic.AddUint64(&ep.ErrorCount, errorCount)
}

// Stats returns the current stats of TabletPlan.
func (ep *TabletPlan) Stats() (queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
ep.mu.Lock()
queryCount = ep.QueryCount
duration = ep.Time
mysqlTime = ep.MysqlTime
rowsAffected = ep.RowsAffected
rowsReturned = ep.RowsReturned
errorCount = ep.ErrorCount
ep.mu.Unlock()
func (ep *TabletPlan) Stats() (queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) {
queryCount = atomic.LoadUint64(&ep.QueryCount)
duration = time.Duration(atomic.LoadUint64(&ep.Time))
mysqlTime = time.Duration(atomic.LoadUint64(&ep.MysqlTime))
rowsAffected = atomic.LoadUint64(&ep.RowsAffected)
rowsReturned = atomic.LoadUint64(&ep.RowsReturned)
errorCount = atomic.LoadUint64(&ep.ErrorCount)
return
}

Expand Down Expand Up @@ -431,12 +427,12 @@ type perQueryStats struct {
Query string
Table string
Plan planbuilder.PlanType
QueryCount int64
QueryCount uint64
Time time.Duration
MysqlTime time.Duration
RowsAffected int64
RowsReturned int64
ErrorCount int64
RowsAffected uint64
RowsReturned uint64
ErrorCount uint64
}

func (qe *QueryEngine) handleHTTPQueryPlans(response http.ResponseWriter, request *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return
}
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0)
qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0)
qre.logStats.RowsAffected = int(reply.RowsAffected)
qre.logStats.Rows = reply.Rows
qre.tsv.Stats().ResultHistogram.Add(int64(len(reply.Rows)))
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/queryz.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ type queryzRow struct {
Query string
Table string
Plan planbuilder.PlanType
Count int64
Count uint64
tm time.Duration
mysqlTime time.Duration
RowsAffected int64
RowsReturned int64
Errors int64
RowsAffected uint64
RowsReturned uint64
Errors uint64
Color string
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func queryzHandler(qe *QueryEngine, w http.ResponseWriter, r *http.Request) {
Value.Count, Value.tm, Value.mysqlTime, Value.RowsAffected, Value.RowsReturned, Value.Errors = plan.Stats()
var timepq time.Duration
if Value.Count != 0 {
timepq = time.Duration(int64(Value.tm) / Value.Count)
timepq = Value.tm / time.Duration(Value.Count)
}
if timepq < 10*time.Millisecond {
Value.Color = "low"
Expand Down

0 comments on commit 2ec7e14

Please sign in to comment.