Skip to content

Commit

Permalink
executor: add auto id allocator execution runtime stats (#28013)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Oct 8, 2021
1 parent b63bc2b commit 267a3b8
Show file tree
Hide file tree
Showing 21 changed files with 515 additions and 88 deletions.
7 changes: 4 additions & 3 deletions br/pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"bytes"
"context"
"fmt"
"math"
"sort"
Expand Down Expand Up @@ -350,11 +351,11 @@ func (kvcodec *tableKVEncoder) AddRecord(
incrementalBits--
}
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
_ = alloc.Rebase(value.GetInt64()&((1<<incrementalBits)-1), false)
_ = alloc.Rebase(context.Background(), value.GetInt64()&((1<<incrementalBits)-1), false)
}
if isAutoIncCol {
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType)
_ = alloc.Rebase(getAutoRecordID(value, &col.FieldType), false)
_ = alloc.Rebase(context.Background(), getAutoRecordID(value, &col.FieldType), false)
}
}

Expand All @@ -370,7 +371,7 @@ func (kvcodec *tableKVEncoder) AddRecord(
}
record = append(record, value)
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType)
_ = alloc.Rebase(value.GetInt64(), false)
_ = alloc.Rebase(context.Background(), value.GetInt64(), false)
}
_, err = kvcodec.tbl.AddRecord(kvcodec.se, record)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kv

import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/meta/autoid"
Expand All @@ -40,7 +41,7 @@ func NewPanickingAllocators(base int64) autoid.Allocators {
}

// Rebase implements the autoid.Allocator interface
func (alloc *panickingAllocator) Rebase(newBase int64, allocIDs bool) error {
func (alloc *panickingAllocator) Rebase(ctx context.Context, newBase int64, allocIDs bool) error {
// CAS
for {
oldBase := atomic.LoadInt64(alloc.base)
Expand Down
7 changes: 4 additions & 3 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kv

import (
"context"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -376,13 +377,13 @@ func (kvcodec *tableKVEncoder) Encode(
if isAutoRandom && isPk {
incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits))
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
if err := alloc.Rebase(value.GetInt64()&((1<<incrementalBits)-1), false); err != nil {
if err := alloc.Rebase(context.Background(), value.GetInt64()&((1<<incrementalBits)-1), false); err != nil {
return nil, errors.Trace(err)
}
}
if isAutoIncCol {
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoIncrementType)
if err := alloc.Rebase(getAutoRecordID(value, &col.FieldType), false); err != nil {
if err := alloc.Rebase(context.Background(), getAutoRecordID(value, &col.FieldType), false); err != nil {
return nil, errors.Trace(err)
}
}
Expand All @@ -403,7 +404,7 @@ func (kvcodec *tableKVEncoder) Encode(
}
record = append(record, value)
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.RowIDAllocType)
if err := alloc.Rebase(rowValue, false); err != nil {
if err := alloc.Rebase(context.Background(), rowValue, false); err != nil {
return nil, errors.Trace(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,12 +1500,12 @@ func (tr *TableRestore) restoreTable(
// rebase the allocator so it exceeds the number of rows.
if tr.tableInfo.Core.PKIsHandle && tr.tableInfo.Core.ContainsAutoRandomBits() {
cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, tr.tableInfo.Core.AutoRandID)
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(cp.AllocBase, false); err != nil {
if err := tr.alloc.Get(autoid.AutoRandomType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
return false, err
}
} else {
cp.AllocBase = mathutil.MaxInt64(cp.AllocBase, tr.tableInfo.Core.AutoIncID)
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(cp.AllocBase, false); err != nil {
if err := tr.alloc.Get(autoid.RowIDAllocType).Rebase(context.Background(), cp.AllocBase, false); err != nil {
return false, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,7 +1654,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
if err != nil {
return errors.Trace(err)
}
err = autoRandAlloc.Rebase(nextAutoIncID, false)
err = autoRandAlloc.Rebase(context.Background(), nextAutoIncID, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2283,7 +2283,7 @@ func checkCharsetAndCollation(cs string, co string) error {
func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64, newEnd int64, tp autoid.AllocatorType) error {
allocs := autoid.NewAllocatorsFromTblInfo(d.store, schemaID, tbInfo)
if alloc := allocs.Get(tp); alloc != nil {
err := alloc.Rebase(newEnd, false)
err := alloc.Rebase(context.Background(), newEnd, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.Al
if force {
err = alloc.ForceRebase(newEnd)
} else {
err = alloc.Rebase(newEnd, false)
err = alloc.Rebase(context.Background(), newEnd, false)
}
if err != nil {
job.State = model.JobStateCancelled
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ func (s *testSuite8) TestShardRowIDBits(c *C) {
c.Assert(err, IsNil)
maxID := 1<<(64-15-1) - 1
alloc := tbl.Allocators(tk.Se).Get(autoid.RowIDAllocType)
err = alloc.Rebase(int64(maxID)-1, false)
err = alloc.Rebase(context.Background(), int64(maxID)-1, false)
c.Assert(err, IsNil)
tk.MustExec("insert into t1 values(1)")

Expand Down
5 changes: 5 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -302,6 +303,10 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// Next implements the Executor Next interface.
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.collectRuntimeStatsEnabled() {
ctx = context.WithValue(ctx, autoid.AllocatorRuntimeStatsCtxKey, e.stats.AllocatorRuntimeStats)
}

if len(e.children) > 0 && e.children[0] != nil {
return insertRowsFromSelect(ctx, e)
}
Expand Down
65 changes: 50 additions & 15 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(recordID, true)
err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(ctx, recordID, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -801,7 +801,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(recordID, true)
err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(ctx, recordID, true)
if err != nil {
return types.Datum{}, err
}
Expand Down Expand Up @@ -877,7 +877,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum,
if !e.ctx.GetSessionVars().AllowAutoRandExplicitInsert {
return types.Datum{}, ddl.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomExplicitInsertDisabledErrMsg)
}
err = e.rebaseAutoRandomID(recordID, &c.FieldType)
err = e.rebaseAutoRandomID(ctx, recordID, &c.FieldType)
if err != nil {
return types.Datum{}, err
}
Expand Down Expand Up @@ -936,7 +936,7 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F
return autoRandomID, nil
}

func (e *InsertValues) rebaseAutoRandomID(recordID int64, fieldType *types.FieldType) error {
func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, fieldType *types.FieldType) error {
if recordID < 0 {
return nil
}
Expand All @@ -946,7 +946,7 @@ func (e *InsertValues) rebaseAutoRandomID(recordID int64, fieldType *types.Field
layout := autoid.NewShardIDLayout(fieldType, tableInfo.AutoRandomBits)
autoRandomID := layout.IncrementalMask() & recordID

return alloc.Rebase(autoRandomID, true)
return alloc.Rebase(ctx, autoRandomID, true)
}

func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
Expand All @@ -963,7 +963,7 @@ func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, h
if !e.ctx.GetSessionVars().AllowWriteRowID {
return types.Datum{}, errors.Errorf("insert, update and replace statements for _tidb_rowid are not supported.")
}
err = e.rebaseImplicitRowID(recordID)
err = e.rebaseImplicitRowID(ctx, recordID)
if err != nil {
return types.Datum{}, err
}
Expand All @@ -990,7 +990,7 @@ func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, h
return d, nil
}

func (e *InsertValues) rebaseImplicitRowID(recordID int64) error {
func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64) error {
if recordID < 0 {
return nil
}
Expand All @@ -1000,7 +1000,7 @@ func (e *InsertValues) rebaseImplicitRowID(recordID int64) error {
layout := autoid.NewShardIDLayout(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits)
newTiDBRowIDBase := layout.IncrementalMask() & recordID

return alloc.Rebase(newTiDBRowIDBase, true)
return alloc.Rebase(ctx, newTiDBRowIDBase, true)
}

func (e *InsertValues) handleWarning(err error) {
Expand All @@ -1013,10 +1013,9 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.stats == nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &InsertRuntimeStat{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
Prefetch: 0,
CheckInsertTime: 0,
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
Expand Down Expand Up @@ -1140,20 +1139,46 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.
type InsertRuntimeStat struct {
*execdetails.BasicRuntimeStats
*txnsnapshot.SnapshotRuntimeStats
*autoid.AllocatorRuntimeStats
CheckInsertTime time.Duration
Prefetch time.Duration
}

func (e *InsertRuntimeStat) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 32))
var allocatorStatsStr string
if e.AllocatorRuntimeStats != nil {
allocatorStatsStr = e.AllocatorRuntimeStats.String()
}
if e.CheckInsertTime == 0 {
// For replace statement.
if allocatorStatsStr != "" {
buf.WriteString(allocatorStatsStr)
}
if e.Prefetch > 0 && e.SnapshotRuntimeStats != nil {
return fmt.Sprintf("prefetch: %v, rpc:{%v}", execdetails.FormatDuration(e.Prefetch), e.SnapshotRuntimeStats.String())
if buf.Len() > 0 {
buf.WriteString(", ")
}
buf.WriteString("prefetch: ")
buf.WriteString(execdetails.FormatDuration(e.Prefetch))
buf.WriteString(", rpc: {")
buf.WriteString(e.SnapshotRuntimeStats.String())
buf.WriteString("}")
return buf.String()
}
return ""
}
buf := bytes.NewBuffer(make([]byte, 0, 32))
buf.WriteString(fmt.Sprintf("prepare:%v, ", execdetails.FormatDuration(time.Duration(e.BasicRuntimeStats.GetTime())-e.CheckInsertTime)))
if allocatorStatsStr != "" {
buf.WriteString("prepare: {total: ")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.BasicRuntimeStats.GetTime()) - e.CheckInsertTime))
buf.WriteString(", ")
buf.WriteString(allocatorStatsStr)
buf.WriteString("}, ")
} else {
buf.WriteString("prepare: ")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.BasicRuntimeStats.GetTime()) - e.CheckInsertTime))
buf.WriteString(", ")
}
if e.Prefetch > 0 {
buf.WriteString(fmt.Sprintf("check_insert: {total_time: %v, mem_insert_time: %v, prefetch: %v",
execdetails.FormatDuration(e.CheckInsertTime),
Expand Down Expand Up @@ -1185,6 +1210,9 @@ func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats {
basicStats := e.BasicRuntimeStats.Clone()
newRs.BasicRuntimeStats = basicStats.(*execdetails.BasicRuntimeStats)
}
if e.AllocatorRuntimeStats != nil {
newRs.AllocatorRuntimeStats = e.AllocatorRuntimeStats.Clone()
}
return newRs
}

Expand All @@ -1210,6 +1238,13 @@ func (e *InsertRuntimeStat) Merge(other execdetails.RuntimeStats) {
e.BasicRuntimeStats.Merge(tmp.BasicRuntimeStats)
}
}
if tmp.AllocatorRuntimeStats != nil {
if e.AllocatorRuntimeStats == nil {
e.AllocatorRuntimeStats = tmp.AllocatorRuntimeStats.Clone()
} else {
e.AllocatorRuntimeStats.Merge(tmp.AllocatorRuntimeStats)
}
}
e.Prefetch += tmp.Prefetch
e.CheckInsertTime += tmp.CheckInsertTime
}
Expand Down
4 changes: 2 additions & 2 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,10 +1460,10 @@ func (s *testSuite10) TestInsertRuntimeStat(c *C) {
Prefetch: 1 * time.Second,
}
stats.BasicRuntimeStats.Record(5*time.Second, 1)
c.Assert(stats.String(), Equals, "prepare:3s, check_insert: {total_time: 2s, mem_insert_time: 1s, prefetch: 1s}")
c.Assert(stats.String(), Equals, "prepare: 3s, check_insert: {total_time: 2s, mem_insert_time: 1s, prefetch: 1s}")
c.Assert(stats.String(), Equals, stats.Clone().String())
stats.Merge(stats.Clone())
c.Assert(stats.String(), Equals, "prepare:6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s}")
c.Assert(stats.String(), Equals, "prepare: 6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s}")
}

func (s *testSerialSuite) TestDuplicateEntryMessage(c *C) {
Expand Down
5 changes: 5 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -250,6 +251,10 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
// Next implements the Executor Next interface.
func (e *ReplaceExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.collectRuntimeStatsEnabled() {
ctx = context.WithValue(ctx, autoid.AllocatorRuntimeStatsCtxKey, e.stats.AllocatorRuntimeStats)
}

if len(e.children) > 0 && e.children[0] != nil {
return insertRowsFromSelect(ctx, e)
}
Expand Down
Loading

0 comments on commit 267a3b8

Please sign in to comment.