Skip to content

Commit

Permalink
Merge pull request #7468 from vmg/vmg/plan-no-mutex
Browse files Browse the repository at this point in the history
plan: remove mutexes
  • Loading branch information
harshit-gangal authored Feb 9, 2021
2 parents c3e3e5b + 2ec7e14 commit 2002d79
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 92 deletions.
55 changes: 25 additions & 30 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package engine

import (
"encoding/json"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -154,13 +154,12 @@ type (
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting

mu sync.Mutex // Mutex to protect the fields below
ExecCount uint64 // Count of times this plan was executed
ExecTime time.Duration // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
}

// Match is used to check if a Primitive matches
Expand Down Expand Up @@ -199,26 +198,22 @@ type (

// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
p.mu.Lock()
p.ExecCount += execCount
p.ExecTime += execTime
p.ShardQueries += shardQueries
p.RowsAffected += rowsAffected
p.RowsReturned += rowsReturned
p.Errors += errors
p.mu.Unlock()
atomic.AddUint64(&p.ExecCount, execCount)
atomic.AddUint64(&p.ExecTime, uint64(execTime))
atomic.AddUint64(&p.ShardQueries, shardQueries)
atomic.AddUint64(&p.RowsAffected, rowsAffected)
atomic.AddUint64(&p.RowsReturned, rowsReturned)
atomic.AddUint64(&p.Errors, errors)
}

// Stats returns a copy of the plan execution statistics
func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
p.mu.Lock()
execCount = p.ExecCount
execTime = p.ExecTime
shardQueries = p.ShardQueries
rowsAffected = p.RowsAffected
rowsReturned = p.RowsReturned
errors = p.Errors
p.mu.Unlock()
execCount = atomic.LoadUint64(&p.ExecCount)
execTime = time.Duration(atomic.LoadUint64(&p.ExecTime))
shardQueries = atomic.LoadUint64(&p.ShardQueries)
rowsAffected = atomic.LoadUint64(&p.RowsAffected)
rowsReturned = atomic.LoadUint64(&p.RowsReturned)
errors = atomic.LoadUint64(&p.Errors)
return
}

Expand Down Expand Up @@ -263,12 +258,12 @@ func (p *Plan) MarshalJSON() ([]byte, error) {
QueryType: p.Type.String(),
Original: p.Original,
Instructions: instructions,
ExecCount: p.ExecCount,
ExecTime: p.ExecTime,
ShardQueries: p.ShardQueries,
RowsAffected: p.RowsAffected,
RowsReturned: p.RowsReturned,
Errors: p.Errors,
ExecCount: atomic.LoadUint64(&p.ExecCount),
ExecTime: time.Duration(atomic.LoadUint64(&p.ExecTime)),
ShardQueries: atomic.LoadUint64(&p.ShardQueries),
RowsAffected: atomic.LoadUint64(&p.RowsAffected),
RowsReturned: atomic.LoadUint64(&p.RowsReturned),
Errors: atomic.LoadUint64(&p.Errors),
}
return json.Marshal(marshalPlan)
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, lo
func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts("Commit", "", "", int64(logStats.ShardQueries))

err := e.txConn.Commit(ctx, safeSession)
Expand All @@ -388,7 +388,7 @@ func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error {
func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries))
err := e.txConn.Rollback(ctx, safeSession)
logStats.CommitTime = time.Since(execStart)
Expand All @@ -398,7 +398,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession,
func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession, sql string, planType string, logStats *LogStats, nonTxResponse func(query string) (*sqltypes.Result, error), ignoreMaxMemoryRows bool) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts(planType, "", "", int64(logStats.ShardQueries))
defer func() {
logStats.ExecuteTime = time.Since(execStart)
Expand Down Expand Up @@ -1622,7 +1622,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession,
}
logStats.RowsAffected = qr.RowsAffected

plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), qr.RowsAffected, uint64(len(qr.Rows)), errCount)
plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, qr.RowsAffected, uint64(len(qr.Rows)), errCount)

return qr.Fields, err
}
Expand Down
30 changes: 17 additions & 13 deletions go/vt/vtgate/executor_scatter_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"html/template"
"net/http"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/logz"
Expand Down Expand Up @@ -74,16 +75,16 @@ func (e *Executor) gatherScatterStats() (statsResults, error) {
}
plans = append(plans, plan)
routes = append(routes, route)
scatterExecTime += plan.ExecTime
scatterCount += plan.ExecCount
scatterExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
scatterCount += atomic.LoadUint64(&plan.ExecCount)
}
if readOnly {
readOnlyTime += plan.ExecTime
readOnlyCount += plan.ExecCount
readOnlyTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
readOnlyCount += atomic.LoadUint64(&plan.ExecCount)
}

totalExecTime += plan.ExecTime
totalCount += plan.ExecCount
totalExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
totalCount += atomic.LoadUint64(&plan.ExecCount)
return true
})
if err != nil {
Expand All @@ -94,19 +95,22 @@ func (e *Executor) gatherScatterStats() (statsResults, error) {
resultItems := make([]*statsResultItem, len(plans))
for i, plan := range plans {
route := routes[i]
execCount := atomic.LoadUint64(&plan.ExecCount)
execTime := time.Duration(atomic.LoadUint64(&plan.ExecTime))

var avgTimePerQuery int64
if plan.ExecCount != 0 {
avgTimePerQuery = plan.ExecTime.Nanoseconds() / int64(plan.ExecCount)
if execCount != 0 {
avgTimePerQuery = execTime.Nanoseconds() / int64(execCount)
}
resultItems[i] = &statsResultItem{
Query: plan.Original,
AvgTimePerQuery: time.Duration(avgTimePerQuery),
PercentTimeOfReads: 100 * float64(plan.ExecTime) / float64(readOnlyTime),
PercentTimeOfScatters: 100 * float64(plan.ExecTime) / float64(scatterExecTime),
PercentCountOfReads: 100 * float64(plan.ExecCount) / float64(readOnlyCount),
PercentCountOfScatters: 100 * float64(plan.ExecCount) / float64(scatterCount),
PercentTimeOfReads: 100 * float64(execTime) / float64(readOnlyTime),
PercentTimeOfScatters: 100 * float64(execTime) / float64(scatterExecTime),
PercentCountOfReads: 100 * float64(execCount) / float64(readOnlyCount),
PercentCountOfScatters: 100 * float64(execCount) / float64(scatterCount),
From: route.Keyspace.Name + "." + route.TableName,
Count: plan.ExecCount,
Count: execCount,
}
}
result := statsResults{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/logstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type LogStats struct {
BindVariables map[string]*querypb.BindVariable
StartTime time.Time
EndTime time.Time
ShardQueries uint32
ShardQueries uint64
RowsAffected uint64
RowsReturned uint64
PlanTime time.Duration
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/queryz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestQueryzHandler(t *testing.T) {
t.Fatalf("couldn't get plan from cache")
}
plan1 := result.(*engine.Plan)
plan1.ExecTime = 1 * time.Millisecond
plan1.ExecTime = uint64(1 * time.Millisecond)

// scatter
sql = "select id from user"
Expand All @@ -61,7 +61,7 @@ func TestQueryzHandler(t *testing.T) {
t.Fatalf("couldn't get plan from cache")
}
plan2 := result.(*engine.Plan)
plan2.ExecTime = 1 * time.Second
plan2.ExecTime = uint64(1 * time.Second)

sql = "insert into user (id, name) values (:id, :name)"
_, err = executorExec(executor, sql, map[string]*querypb.BindVariable{
Expand Down Expand Up @@ -91,8 +91,8 @@ func TestQueryzHandler(t *testing.T) {

require.NoError(t, err)

plan3.ExecTime = 100 * time.Millisecond
plan4.ExecTime = 200 * time.Millisecond
plan3.ExecTime = uint64(100 * time.Millisecond)
plan4.ExecTime = uint64(200 * time.Millisecond)

queryzHandler(executor, resp, req)
body, _ := ioutil.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (res *Resolver) Execute(
return nil, err
}
if logStats != nil {
logStats.ShardQueries = uint32(len(rss))
logStats.ShardQueries = uint64(len(rss))
}

autocommit := len(rss) == 1 && canAutocommit && session.AutocommitApproval()
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string]

// ExecuteMultiShard is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) {
atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries)))
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(queries)))
qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, autocommit, vc.ignoreMaxMemoryRows)

if errs == nil && rollbackOnError {
Expand Down Expand Up @@ -457,13 +457,13 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer

// StreamExeculteMulti is the streaming version of ExecuteMultiShard.
func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error {
atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(rss)))
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(rss)))
return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback)
}

// ExecuteKeyspaceID is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
atomic.AddUint32(&vc.logStats.ShardQueries, 1)
atomic.AddUint64(&vc.logStats.ShardQueries, 1)
rss, _, err := vc.ResolveDestinations(keyspace, nil, []key.Destination{key.DestinationKeyspaceID(ksid)})
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 25 additions & 29 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"

"context"
Expand Down Expand Up @@ -60,37 +61,32 @@ type TabletPlan struct {
Rules *rules.Rules
Authorized []*tableacl.ACLResult

mu sync.Mutex
QueryCount int64
Time time.Duration
MysqlTime time.Duration
RowsAffected int64
RowsReturned int64
ErrorCount int64
QueryCount uint64
Time uint64
MysqlTime uint64
RowsAffected uint64
RowsReturned uint64
ErrorCount uint64
}

// AddStats updates the stats for the current TabletPlan.
func (ep *TabletPlan) AddStats(queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
ep.mu.Lock()
ep.QueryCount += queryCount
ep.Time += duration
ep.MysqlTime += mysqlTime
ep.RowsAffected += rowsAffected
ep.RowsReturned += rowsReturned
ep.ErrorCount += errorCount
ep.mu.Unlock()
func (ep *TabletPlan) AddStats(queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) {
atomic.AddUint64(&ep.QueryCount, queryCount)
atomic.AddUint64(&ep.Time, uint64(duration))
atomic.AddUint64(&ep.MysqlTime, uint64(mysqlTime))
atomic.AddUint64(&ep.RowsAffected, rowsAffected)
atomic.AddUint64(&ep.RowsReturned, rowsReturned)
atomic.AddUint64(&ep.ErrorCount, errorCount)
}

// Stats returns the current stats of TabletPlan.
func (ep *TabletPlan) Stats() (queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
ep.mu.Lock()
queryCount = ep.QueryCount
duration = ep.Time
mysqlTime = ep.MysqlTime
rowsAffected = ep.RowsAffected
rowsReturned = ep.RowsReturned
errorCount = ep.ErrorCount
ep.mu.Unlock()
func (ep *TabletPlan) Stats() (queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) {
queryCount = atomic.LoadUint64(&ep.QueryCount)
duration = time.Duration(atomic.LoadUint64(&ep.Time))
mysqlTime = time.Duration(atomic.LoadUint64(&ep.MysqlTime))
rowsAffected = atomic.LoadUint64(&ep.RowsAffected)
rowsReturned = atomic.LoadUint64(&ep.RowsReturned)
errorCount = atomic.LoadUint64(&ep.ErrorCount)
return
}

Expand Down Expand Up @@ -431,12 +427,12 @@ type perQueryStats struct {
Query string
Table string
Plan planbuilder.PlanType
QueryCount int64
QueryCount uint64
Time time.Duration
MysqlTime time.Duration
RowsAffected int64
RowsReturned int64
ErrorCount int64
RowsAffected uint64
RowsReturned uint64
ErrorCount uint64
}

func (qe *QueryEngine) handleHTTPQueryPlans(response http.ResponseWriter, request *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return
}
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0)
qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0)
qre.logStats.RowsAffected = int(reply.RowsAffected)
qre.logStats.Rows = reply.Rows
qre.tsv.Stats().ResultHistogram.Add(int64(len(reply.Rows)))
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/queryz.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ type queryzRow struct {
Query string
Table string
Plan planbuilder.PlanType
Count int64
Count uint64
tm time.Duration
mysqlTime time.Duration
RowsAffected int64
RowsReturned int64
Errors int64
RowsAffected uint64
RowsReturned uint64
Errors uint64
Color string
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func queryzHandler(qe *QueryEngine, w http.ResponseWriter, r *http.Request) {
Value.Count, Value.tm, Value.mysqlTime, Value.RowsAffected, Value.RowsReturned, Value.Errors = plan.Stats()
var timepq time.Duration
if Value.Count != 0 {
timepq = time.Duration(int64(Value.tm) / Value.Count)
timepq = Value.tm / time.Duration(Value.Count)
}
if timepq < 10*time.Millisecond {
Value.Color = "low"
Expand Down

0 comments on commit 2002d79

Please sign in to comment.