diff --git a/br/pkg/kv/kv.go b/br/pkg/kv/kv.go index 229255b0d5867..cef7ff486c49f 100644 --- a/br/pkg/kv/kv.go +++ b/br/pkg/kv/kv.go @@ -16,6 +16,7 @@ package kv import ( "bytes" + "context" "fmt" "math" "sort" @@ -350,11 +351,11 @@ func (kvcodec *tableKVEncoder) AddRecord( incrementalBits-- } alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType) - _ = alloc.Rebase(value.GetInt64()&((1< 0 && e.children[0] != nil { return insertRowsFromSelect(ctx, e) } diff --git a/executor/insert_common.go b/executor/insert_common.go index 72ed1f51b584f..fb5ebc6c910e3 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -711,7 +711,7 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] } // Use the value if it's not null and not 0. if recordID != 0 { - err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(recordID, true) + err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(ctx, recordID, true) if err != nil { return nil, err } @@ -801,7 +801,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Dat } // Use the value if it's not null and not 0. if recordID != 0 { - err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(recordID, true) + err = e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType).Rebase(ctx, recordID, true) if err != nil { return types.Datum{}, err } @@ -877,7 +877,7 @@ func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, if !e.ctx.GetSessionVars().AllowAutoRandExplicitInsert { return types.Datum{}, ddl.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomExplicitInsertDisabledErrMsg) } - err = e.rebaseAutoRandomID(recordID, &c.FieldType) + err = e.rebaseAutoRandomID(ctx, recordID, &c.FieldType) if err != nil { return types.Datum{}, err } @@ -936,7 +936,7 @@ func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.F return autoRandomID, nil } -func (e *InsertValues) rebaseAutoRandomID(recordID int64, fieldType *types.FieldType) error { +func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, fieldType *types.FieldType) error { if recordID < 0 { return nil } @@ -946,7 +946,7 @@ func (e *InsertValues) rebaseAutoRandomID(recordID int64, fieldType *types.Field layout := autoid.NewShardIDLayout(fieldType, tableInfo.AutoRandomBits) autoRandomID := layout.IncrementalMask() & recordID - return alloc.Rebase(autoRandomID, true) + return alloc.Rebase(ctx, autoRandomID, true) } func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { @@ -963,7 +963,7 @@ func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, h if !e.ctx.GetSessionVars().AllowWriteRowID { return types.Datum{}, errors.Errorf("insert, update and replace statements for _tidb_rowid are not supported.") } - err = e.rebaseImplicitRowID(recordID) + err = e.rebaseImplicitRowID(ctx, recordID) if err != nil { return types.Datum{}, err } @@ -990,7 +990,7 @@ func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, h return d, nil } -func (e *InsertValues) rebaseImplicitRowID(recordID int64) error { +func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64) error { if recordID < 0 { return nil } @@ -1000,7 +1000,7 @@ func (e *InsertValues) rebaseImplicitRowID(recordID int64) error { layout := autoid.NewShardIDLayout(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits) newTiDBRowIDBase := layout.IncrementalMask() & recordID - return alloc.Rebase(newTiDBRowIDBase, true) + return alloc.Rebase(ctx, newTiDBRowIDBase, true) } func (e *InsertValues) handleWarning(err error) { @@ -1013,10 +1013,9 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool { if e.stats == nil { snapshotStats := &txnsnapshot.SnapshotRuntimeStats{} e.stats = &InsertRuntimeStat{ - BasicRuntimeStats: e.runtimeStats, - SnapshotRuntimeStats: snapshotStats, - Prefetch: 0, - CheckInsertTime: 0, + BasicRuntimeStats: e.runtimeStats, + SnapshotRuntimeStats: snapshotStats, + AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(), } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -1140,20 +1139,46 @@ func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types. type InsertRuntimeStat struct { *execdetails.BasicRuntimeStats *txnsnapshot.SnapshotRuntimeStats + *autoid.AllocatorRuntimeStats CheckInsertTime time.Duration Prefetch time.Duration } func (e *InsertRuntimeStat) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 32)) + var allocatorStatsStr string + if e.AllocatorRuntimeStats != nil { + allocatorStatsStr = e.AllocatorRuntimeStats.String() + } if e.CheckInsertTime == 0 { // For replace statement. + if allocatorStatsStr != "" { + buf.WriteString(allocatorStatsStr) + } if e.Prefetch > 0 && e.SnapshotRuntimeStats != nil { - return fmt.Sprintf("prefetch: %v, rpc:{%v}", execdetails.FormatDuration(e.Prefetch), e.SnapshotRuntimeStats.String()) + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString("prefetch: ") + buf.WriteString(execdetails.FormatDuration(e.Prefetch)) + buf.WriteString(", rpc: {") + buf.WriteString(e.SnapshotRuntimeStats.String()) + buf.WriteString("}") + return buf.String() } return "" } - buf := bytes.NewBuffer(make([]byte, 0, 32)) - buf.WriteString(fmt.Sprintf("prepare:%v, ", execdetails.FormatDuration(time.Duration(e.BasicRuntimeStats.GetTime())-e.CheckInsertTime))) + if allocatorStatsStr != "" { + buf.WriteString("prepare: {total: ") + buf.WriteString(execdetails.FormatDuration(time.Duration(e.BasicRuntimeStats.GetTime()) - e.CheckInsertTime)) + buf.WriteString(", ") + buf.WriteString(allocatorStatsStr) + buf.WriteString("}, ") + } else { + buf.WriteString("prepare: ") + buf.WriteString(execdetails.FormatDuration(time.Duration(e.BasicRuntimeStats.GetTime()) - e.CheckInsertTime)) + buf.WriteString(", ") + } if e.Prefetch > 0 { buf.WriteString(fmt.Sprintf("check_insert: {total_time: %v, mem_insert_time: %v, prefetch: %v", execdetails.FormatDuration(e.CheckInsertTime), @@ -1185,6 +1210,9 @@ func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats { basicStats := e.BasicRuntimeStats.Clone() newRs.BasicRuntimeStats = basicStats.(*execdetails.BasicRuntimeStats) } + if e.AllocatorRuntimeStats != nil { + newRs.AllocatorRuntimeStats = e.AllocatorRuntimeStats.Clone() + } return newRs } @@ -1210,6 +1238,13 @@ func (e *InsertRuntimeStat) Merge(other execdetails.RuntimeStats) { e.BasicRuntimeStats.Merge(tmp.BasicRuntimeStats) } } + if tmp.AllocatorRuntimeStats != nil { + if e.AllocatorRuntimeStats == nil { + e.AllocatorRuntimeStats = tmp.AllocatorRuntimeStats.Clone() + } else { + e.AllocatorRuntimeStats.Merge(tmp.AllocatorRuntimeStats) + } + } e.Prefetch += tmp.Prefetch e.CheckInsertTime += tmp.CheckInsertTime } diff --git a/executor/insert_test.go b/executor/insert_test.go index 23fd0ba99cba8..7de4702e4e755 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -1460,10 +1460,10 @@ func (s *testSuite10) TestInsertRuntimeStat(c *C) { Prefetch: 1 * time.Second, } stats.BasicRuntimeStats.Record(5*time.Second, 1) - c.Assert(stats.String(), Equals, "prepare:3s, check_insert: {total_time: 2s, mem_insert_time: 1s, prefetch: 1s}") + c.Assert(stats.String(), Equals, "prepare: 3s, check_insert: {total_time: 2s, mem_insert_time: 1s, prefetch: 1s}") c.Assert(stats.String(), Equals, stats.Clone().String()) stats.Merge(stats.Clone()) - c.Assert(stats.String(), Equals, "prepare:6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s}") + c.Assert(stats.String(), Equals, "prepare: 6s, check_insert: {total_time: 4s, mem_insert_time: 2s, prefetch: 2s}") } func (s *testSerialSuite) TestDuplicateEntryMessage(c *C) { diff --git a/executor/replace.go b/executor/replace.go index cf96ec99320bd..5531b55599579 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -250,6 +251,10 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { // Next implements the Executor Next interface. func (e *ReplaceExec) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() + if e.collectRuntimeStatsEnabled() { + ctx = context.WithValue(ctx, autoid.AllocatorRuntimeStatsCtxKey, e.stats.AllocatorRuntimeStats) + } + if len(e.children) > 0 && e.children[0] != nil { return insertRowsFromSelect(ctx, e) } diff --git a/executor/update.go b/executor/update.go index 5b946958302ab..06253479996c6 100644 --- a/executor/update.go +++ b/executor/update.go @@ -15,6 +15,7 @@ package executor import ( + "bytes" "context" "fmt" "runtime/trace" @@ -23,11 +24,13 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/memory" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" ) @@ -58,7 +61,7 @@ type UpdateExec struct { drained bool memTracker *memory.Tracker - stats *runtimeStatsWithSnapshot + stats *updateRuntimeStats handles []kv.Handle tableUpdatable []bool @@ -217,6 +220,9 @@ func (e *UpdateExec) unmatchedOuterRow(tblPos plannercore.TblColPosInfo, waitUpd func (e *UpdateExec) Next(ctx context.Context, req *chunk.Chunk) error { req.Reset() if !e.drained { + if e.collectRuntimeStatsEnabled() { + ctx = context.WithValue(ctx, autoid.AllocatorRuntimeStatsCtxKey, e.stats.AllocatorRuntimeStats) + } numRows, err := e.updateRows(ctx) if err != nil { return err @@ -414,7 +420,7 @@ func (e *UpdateExec) Close() error { e.setMessage() if e.runtimeStats != nil && e.stats != nil { txn, err := e.ctx.Txn(false) - if err == nil && txn.GetSnapshot() != nil { + if err == nil && txn.Valid() && txn.GetSnapshot() != nil { txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, nil) } } @@ -442,9 +448,9 @@ func (e *UpdateExec) setMessage() { func (e *UpdateExec) collectRuntimeStatsEnabled() bool { if e.runtimeStats != nil { if e.stats == nil { - snapshotStats := &txnsnapshot.SnapshotRuntimeStats{} - e.stats = &runtimeStatsWithSnapshot{ - SnapshotRuntimeStats: snapshotStats, + e.stats = &updateRuntimeStats{ + SnapshotRuntimeStats: &txnsnapshot.SnapshotRuntimeStats{}, + AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(), } e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } @@ -452,3 +458,71 @@ func (e *UpdateExec) collectRuntimeStatsEnabled() bool { } return false } + +// updateRuntimeStats is the execution stats about update statements. +type updateRuntimeStats struct { + *txnsnapshot.SnapshotRuntimeStats + *autoid.AllocatorRuntimeStats +} + +func (e *updateRuntimeStats) String() string { + if e.SnapshotRuntimeStats == nil && e.AllocatorRuntimeStats == nil { + return "" + } + buf := bytes.NewBuffer(make([]byte, 0, 16)) + if e.SnapshotRuntimeStats != nil { + stats := e.SnapshotRuntimeStats.String() + if stats != "" { + buf.WriteString(stats) + } + } + if e.AllocatorRuntimeStats != nil { + stats := e.AllocatorRuntimeStats.String() + if stats != "" { + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString(stats) + } + } + return buf.String() +} + +// Clone implements the RuntimeStats interface. +func (e *updateRuntimeStats) Clone() execdetails.RuntimeStats { + newRs := &updateRuntimeStats{} + if e.SnapshotRuntimeStats != nil { + snapshotStats := e.SnapshotRuntimeStats.Clone() + newRs.SnapshotRuntimeStats = snapshotStats + } + if e.AllocatorRuntimeStats != nil { + newRs.AllocatorRuntimeStats = e.AllocatorRuntimeStats.Clone() + } + return newRs +} + +// Merge implements the RuntimeStats interface. +func (e *updateRuntimeStats) Merge(other execdetails.RuntimeStats) { + tmp, ok := other.(*updateRuntimeStats) + if !ok { + return + } + if tmp.SnapshotRuntimeStats != nil { + if e.SnapshotRuntimeStats == nil { + snapshotStats := tmp.SnapshotRuntimeStats.Clone() + e.SnapshotRuntimeStats = snapshotStats + } else { + e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats) + } + } + if tmp.AllocatorRuntimeStats != nil { + if e.AllocatorRuntimeStats == nil { + e.AllocatorRuntimeStats = tmp.AllocatorRuntimeStats.Clone() + } + } +} + +// Tp implements the RuntimeStats interface. +func (e *updateRuntimeStats) Tp() int { + return execdetails.TpUpdateRuntimeStats +} diff --git a/executor/write.go b/executor/write.go index 28925957bd764..835780c947214 100644 --- a/executor/write.go +++ b/executor/write.go @@ -111,14 +111,14 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if err != nil { return false, err } - if err = t.Allocators(sctx).Get(autoid.RowIDAllocType).Rebase(recordID, true); err != nil { + if err = t.Allocators(sctx).Get(autoid.RowIDAllocType).Rebase(ctx, recordID, true); err != nil { return false, err } } if col.IsPKHandleColumn(t.Meta()) { handleChanged = true // Rebase auto random id if the field is changed. - if err := rebaseAutoRandomValue(sctx, t, &newData[i], col); err != nil { + if err := rebaseAutoRandomValue(ctx, sctx, t, &newData[i], col); err != nil { return false, err } } @@ -222,7 +222,7 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old return true, nil } -func rebaseAutoRandomValue(sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error { +func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error { tableInfo := t.Meta() if !tableInfo.ContainsAutoRandomBits() { return nil @@ -237,7 +237,7 @@ func rebaseAutoRandomValue(sctx sessionctx.Context, t table.Table, newData *type layout := autoid.NewShardIDLayout(&col.FieldType, tableInfo.AutoRandomBits) // Set bits except incremental_bits to zero. recordID = recordID & (1< alloc.base { - if err := alloc.rebase4Signed(offset-1, true); err != nil { + if err := alloc.rebase4Signed(ctx, offset-1, true); err != nil { return 0, 0, err } } @@ -716,12 +747,26 @@ func (alloc *allocator) alloc4Signed(ctx context.Context, n uint64, increment, o consumeDur := startTime.Sub(alloc.lastAllocTime) nextStep = NextStep(alloc.step, consumeDur) } + + ctx, allocatorStats, commitDetail := getAllocatorStatsFromCtx(ctx) + if allocatorStats != nil { + allocatorStats.allocCount++ + defer func() { + if commitDetail != nil { + allocatorStats.mergeCommitDetail(*commitDetail) + } + }() + } + err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("alloc.alloc4Signed", opentracing.ChildOf(span.Context())) defer span1.Finish() opentracing.ContextWithSpan(ctx, span1) } + if allocatorStats != nil { + txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats) + } idAcc := alloc.getIDAccessor(txn) var err1 error @@ -770,7 +815,7 @@ func (alloc *allocator) alloc4Signed(ctx context.Context, n uint64, increment, o func (alloc *allocator) alloc4Unsigned(ctx context.Context, n uint64, increment, offset int64) (int64, int64, error) { // Check offset rebase if necessary. if uint64(offset-1) > uint64(alloc.base) { - if err := alloc.rebase4Unsigned(uint64(offset-1), true); err != nil { + if err := alloc.rebase4Unsigned(ctx, uint64(offset-1), true); err != nil { return 0, 0, err } } @@ -791,12 +836,27 @@ func (alloc *allocator) alloc4Unsigned(ctx context.Context, n uint64, increment, consumeDur := startTime.Sub(alloc.lastAllocTime) nextStep = NextStep(alloc.step, consumeDur) } + + ctx, allocatorStats, commitDetail := getAllocatorStatsFromCtx(ctx) + if allocatorStats != nil { + allocatorStats.allocCount++ + defer func() { + if commitDetail != nil { + allocatorStats.mergeCommitDetail(*commitDetail) + } + }() + } + err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("alloc.alloc4Unsigned", opentracing.ChildOf(span.Context())) defer span1.Finish() opentracing.ContextWithSpan(ctx, span1) } + if allocatorStats != nil { + txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats) + } + idAcc := alloc.getIDAccessor(txn) var err1 error newBase, err1 = idAcc.Get() @@ -842,6 +902,17 @@ func (alloc *allocator) alloc4Unsigned(ctx context.Context, n uint64, increment, return min, alloc.base, nil } +func getAllocatorStatsFromCtx(ctx context.Context) (context.Context, *AllocatorRuntimeStats, **tikvutil.CommitDetails) { + var allocatorStats *AllocatorRuntimeStats + var commitDetail *tikvutil.CommitDetails + ctxValue := ctx.Value(AllocatorRuntimeStatsCtxKey) + if ctxValue != nil { + allocatorStats = ctxValue.(*AllocatorRuntimeStats) + ctx = context.WithValue(ctx, tikvutil.CommitDetailCtxKey, &commitDetail) + } + return ctx, allocatorStats, &commitDetail +} + // alloc4Sequence is used to alloc value for sequence, there are several aspects different from autoid logic. // 1: sequence allocation don't need check rebase. // 2: sequence allocation don't need auto step. @@ -1024,3 +1095,111 @@ func (l *ShardIDLayout) IncrementalBitsCapacity() uint64 { func (l *ShardIDLayout) IncrementalMask() int64 { return (1 << l.IncrementalBits) - 1 } + +type allocatorRuntimeStatsCtxKeyType struct{} + +// AllocatorRuntimeStatsCtxKey is the context key of allocator runtime stats. +var AllocatorRuntimeStatsCtxKey = allocatorRuntimeStatsCtxKeyType{} + +// AllocatorRuntimeStats is the execution stats of auto id allocator. +type AllocatorRuntimeStats struct { + *txnsnapshot.SnapshotRuntimeStats + *execdetails.RuntimeStatsWithCommit + allocCount int + rebaseCount int +} + +// NewAllocatorRuntimeStats return a new AllocatorRuntimeStats. +func NewAllocatorRuntimeStats() *AllocatorRuntimeStats { + return &AllocatorRuntimeStats{ + SnapshotRuntimeStats: &txnsnapshot.SnapshotRuntimeStats{}, + } +} + +func (e *AllocatorRuntimeStats) mergeCommitDetail(detail *tikvutil.CommitDetails) { + if detail == nil { + return + } + if e.RuntimeStatsWithCommit == nil { + e.RuntimeStatsWithCommit = &execdetails.RuntimeStatsWithCommit{} + } + e.RuntimeStatsWithCommit.MergeCommitDetails(detail) +} + +// String implements the RuntimeStats interface. +func (e *AllocatorRuntimeStats) String() string { + if e.allocCount == 0 && e.rebaseCount == 0 { + return "" + } + var buf bytes.Buffer + buf.WriteString("auto_id_allocator: {") + initialSize := buf.Len() + if e.allocCount > 0 { + buf.WriteString("alloc_cnt: ") + buf.WriteString(strconv.FormatInt(int64(e.allocCount), 10)) + } + if e.rebaseCount > 0 { + if buf.Len() > initialSize { + buf.WriteString(", ") + } + buf.WriteString("rebase_cnt: ") + buf.WriteString(strconv.FormatInt(int64(e.rebaseCount), 10)) + } + if e.SnapshotRuntimeStats != nil { + stats := e.SnapshotRuntimeStats.String() + if stats != "" { + if buf.Len() > initialSize { + buf.WriteString(", ") + } + buf.WriteString(e.SnapshotRuntimeStats.String()) + } + } + if e.RuntimeStatsWithCommit != nil { + stats := e.RuntimeStatsWithCommit.String() + if stats != "" { + if buf.Len() > initialSize { + buf.WriteString(", ") + } + buf.WriteString(stats) + } + } + buf.WriteString("}") + return buf.String() +} + +// Clone implements the RuntimeStats interface. +func (e *AllocatorRuntimeStats) Clone() *AllocatorRuntimeStats { + newRs := &AllocatorRuntimeStats{ + allocCount: e.allocCount, + rebaseCount: e.rebaseCount, + } + if e.SnapshotRuntimeStats != nil { + snapshotStats := e.SnapshotRuntimeStats.Clone() + newRs.SnapshotRuntimeStats = snapshotStats + } + if e.RuntimeStatsWithCommit != nil { + newRs.RuntimeStatsWithCommit = e.RuntimeStatsWithCommit.Clone().(*execdetails.RuntimeStatsWithCommit) + } + return newRs +} + +// Merge implements the RuntimeStats interface. +func (e *AllocatorRuntimeStats) Merge(other *AllocatorRuntimeStats) { + if other == nil { + return + } + if other.SnapshotRuntimeStats != nil { + if e.SnapshotRuntimeStats == nil { + e.SnapshotRuntimeStats = other.SnapshotRuntimeStats.Clone() + } else { + e.SnapshotRuntimeStats.Merge(other.SnapshotRuntimeStats) + } + } + if other.RuntimeStatsWithCommit != nil { + if e.RuntimeStatsWithCommit == nil { + e.RuntimeStatsWithCommit = other.RuntimeStatsWithCommit.Clone().(*execdetails.RuntimeStatsWithCommit) + } else { + e.RuntimeStatsWithCommit.Merge(other.RuntimeStatsWithCommit) + } + } +} diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index cc1e8459ddfff..1fd0a1c07d1b3 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -83,22 +83,22 @@ func TestSignedAutoid(t *testing.T) { require.Equal(t, autoid.GetStep()+1, globalAutoID) // rebase - err = alloc.Rebase(int64(1), true) + err = alloc.Rebase(context.Background(), int64(1), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(3), id) - err = alloc.Rebase(int64(3), true) + err = alloc.Rebase(context.Background(), int64(3), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(4), id) - err = alloc.Rebase(int64(10), true) + err = alloc.Rebase(context.Background(), int64(10), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(11), id) - err = alloc.Rebase(int64(3010), true) + err = alloc.Rebase(context.Background(), int64(3010), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) @@ -112,7 +112,7 @@ func TestSignedAutoid(t *testing.T) { alloc = autoid.NewAllocator(store, 1, 2, false, autoid.RowIDAllocType) require.NotNil(t, alloc) - err = alloc.Rebase(int64(1), false) + err = alloc.Rebase(context.Background(), int64(1), false) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) @@ -120,27 +120,27 @@ func TestSignedAutoid(t *testing.T) { alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType) require.NotNil(t, alloc) - err = alloc.Rebase(int64(3210), false) + err = alloc.Rebase(context.Background(), int64(3210), false) require.NoError(t, err) alloc = autoid.NewAllocator(store, 1, 3, false, autoid.RowIDAllocType) require.NotNil(t, alloc) - err = alloc.Rebase(int64(3000), false) + err = alloc.Rebase(context.Background(), int64(3000), false) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(3211), id) - err = alloc.Rebase(int64(6543), false) + err = alloc.Rebase(context.Background(), int64(6543), false) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(6544), id) // Test the MaxInt64 is the upper bound of `alloc` function but not `rebase`. - err = alloc.Rebase(int64(math.MaxInt64-1), true) + err = alloc.Rebase(context.Background(), int64(math.MaxInt64-1), true) require.NoError(t, err) _, _, err = alloc.Alloc(ctx, 1, 1, 1) require.Error(t, err) - err = alloc.Rebase(int64(math.MaxInt64), true) + err = alloc.Rebase(context.Background(), int64(math.MaxInt64), true) require.NoError(t, err) // alloc N for signed @@ -169,7 +169,7 @@ func TestSignedAutoid(t *testing.T) { expected++ } - err = alloc.Rebase(int64(1000), false) + err = alloc.Rebase(context.Background(), int64(1000), false) require.NoError(t, err) min, max, err = alloc.Alloc(ctx, 3, 1, 1) require.NoError(t, err) @@ -179,7 +179,7 @@ func TestSignedAutoid(t *testing.T) { require.Equal(t, int64(1003), max) lastRemainOne := alloc.End() - err = alloc.Rebase(alloc.End()-2, false) + err = alloc.Rebase(context.Background(), alloc.End()-2, false) require.NoError(t, err) min, max, err = alloc.Alloc(ctx, 5, 1, 1) require.NoError(t, err) @@ -287,22 +287,22 @@ func TestUnsignedAutoid(t *testing.T) { require.Equal(t, autoid.GetStep()+1, globalAutoID) // rebase - err = alloc.Rebase(int64(1), true) + err = alloc.Rebase(context.Background(), int64(1), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(3), id) - err = alloc.Rebase(int64(3), true) + err = alloc.Rebase(context.Background(), int64(3), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(4), id) - err = alloc.Rebase(int64(10), true) + err = alloc.Rebase(context.Background(), int64(10), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(11), id) - err = alloc.Rebase(int64(3010), true) + err = alloc.Rebase(context.Background(), int64(3010), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) @@ -316,7 +316,7 @@ func TestUnsignedAutoid(t *testing.T) { alloc = autoid.NewAllocator(store, 1, 2, true, autoid.RowIDAllocType) require.NotNil(t, alloc) - err = alloc.Rebase(int64(1), false) + err = alloc.Rebase(context.Background(), int64(1), false) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) @@ -324,16 +324,16 @@ func TestUnsignedAutoid(t *testing.T) { alloc = autoid.NewAllocator(store, 1, 3, true, autoid.RowIDAllocType) require.NotNil(t, alloc) - err = alloc.Rebase(int64(3210), false) + err = alloc.Rebase(context.Background(), int64(3210), false) require.NoError(t, err) alloc = autoid.NewAllocator(store, 1, 3, true, autoid.RowIDAllocType) require.NotNil(t, alloc) - err = alloc.Rebase(int64(3000), false) + err = alloc.Rebase(context.Background(), int64(3000), false) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(3211), id) - err = alloc.Rebase(int64(6543), false) + err = alloc.Rebase(context.Background(), int64(6543), false) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) @@ -342,12 +342,12 @@ func TestUnsignedAutoid(t *testing.T) { // Test the MaxUint64 is the upper bound of `alloc` func but not `rebase`. var n uint64 = math.MaxUint64 - 1 un := int64(n) - err = alloc.Rebase(un, true) + err = alloc.Rebase(context.Background(), un, true) require.NoError(t, err) _, _, err = alloc.Alloc(ctx, 1, 1, 1) require.Error(t, err) un = int64(n + 1) - err = alloc.Rebase(un, true) + err = alloc.Rebase(context.Background(), un, true) require.NoError(t, err) // alloc N for unsigned @@ -363,7 +363,7 @@ func TestUnsignedAutoid(t *testing.T) { require.Equal(t, int64(1), min+1) require.Equal(t, int64(2), max) - err = alloc.Rebase(int64(500), true) + err = alloc.Rebase(context.Background(), int64(500), true) require.NoError(t, err) min, max, err = alloc.Alloc(ctx, 2, 1, 1) require.NoError(t, err) @@ -372,7 +372,7 @@ func TestUnsignedAutoid(t *testing.T) { require.Equal(t, int64(502), max) lastRemainOne := alloc.End() - err = alloc.Rebase(alloc.End()-2, false) + err = alloc.Rebase(context.Background(), alloc.End()-2, false) require.NoError(t, err) min, max, err = alloc.Alloc(ctx, 5, 1, 1) require.NoError(t, err) @@ -521,7 +521,7 @@ func TestRollbackAlloc(t *testing.T) { require.Equal(t, int64(0), alloc.Base()) require.Equal(t, int64(0), alloc.End()) - err = alloc.Rebase(100, true) + err = alloc.Rebase(context.Background(), 100, true) require.Error(t, err) require.Equal(t, int64(0), alloc.Base()) require.Equal(t, int64(0), alloc.End()) @@ -573,10 +573,10 @@ func TestAllocComputationIssue(t *testing.T) { require.NotNil(t, signedAlloc2) // the next valid two value must be 13 & 16, batch size = 6. - err = unsignedAlloc1.Rebase(10, false) + err = unsignedAlloc1.Rebase(context.Background(), 10, false) require.NoError(t, err) // the next valid two value must be 10 & 13, batch size = 6. - err = signedAlloc2.Rebase(7, false) + err = signedAlloc2.Rebase(context.Background(), 7, false) require.NoError(t, err) // Simulate the rest cache is not enough for next batch, assuming 10 & 13, batch size = 4. autoid.TestModifyBaseAndEndInjection(unsignedAlloc1, 9, 9) diff --git a/meta/autoid/memid.go b/meta/autoid/memid.go index 1a9af524959b2..21848c9d455d6 100644 --- a/meta/autoid/memid.go +++ b/meta/autoid/memid.go @@ -86,7 +86,7 @@ func (alloc *inMemoryAllocator) Alloc(ctx context.Context, n uint64, increment, // Rebase implements autoid.Allocator Rebase interface. // The requiredBase is the minimum base value after Rebase. // The real base may be greater than the required base. -func (alloc *inMemoryAllocator) Rebase(requiredBase int64, allocIDs bool) error { +func (alloc *inMemoryAllocator) Rebase(ctx context.Context, requiredBase int64, allocIDs bool) error { if alloc.isUnsigned { if uint64(requiredBase) > uint64(alloc.base) { alloc.base = requiredBase diff --git a/meta/autoid/memid_test.go b/meta/autoid/memid_test.go index 15dd436f98ed1..46f67170b0673 100644 --- a/meta/autoid/memid_test.go +++ b/meta/autoid/memid_test.go @@ -72,19 +72,19 @@ func TestInMemoryAlloc(t *testing.T) { require.Equal(t, int64(30), id) // rebase - err = alloc.Rebase(int64(40), true) + err = alloc.Rebase(context.Background(), int64(40), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(41), id) - err = alloc.Rebase(int64(10), true) + err = alloc.Rebase(context.Background(), int64(10), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) require.Equal(t, int64(42), id) // maxInt64 - err = alloc.Rebase(int64(math.MaxInt64-2), true) + err = alloc.Rebase(context.Background(), int64(math.MaxInt64-2), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) @@ -98,7 +98,7 @@ func TestInMemoryAlloc(t *testing.T) { require.NotNil(t, alloc) var n uint64 = math.MaxUint64 - 2 - err = alloc.Rebase(int64(n), true) + err = alloc.Rebase(context.Background(), int64(n), true) require.NoError(t, err) _, id, err = alloc.Alloc(ctx, 1, 1, 1) require.NoError(t, err) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 8e9e01d078c2b..5b55baa515bf4 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -2489,6 +2489,106 @@ func (s *testIntegrationSerialSuite) TestExplainAnalyzeDML(c *C) { checkExplain("BatchGet") } +func (s *testIntegrationSerialSuite) TestExplainAnalyzeDML2(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + + cases := []struct { + prepare string + sql string + planRegexp string + }{ + // Test for alloc auto ID. + { + sql: "insert into t () values ()", + planRegexp: ".*prepare.*total.*, auto_id_allocator.*alloc_cnt: 1, Get.*num_rpc.*total_time.*commit_txn.*prewrite.*get_commit_ts.*commit.*write_keys.*, insert.*", + }, + // Test for rebase ID. + { + sql: "insert into t (a) values (99000000000)", + planRegexp: ".*prepare.*total.*, auto_id_allocator.*rebase_cnt: 1, Get.*num_rpc.*total_time.*commit_txn.*prewrite.*get_commit_ts.*commit.*write_keys.*, insert.*", + }, + // Test for alloc auto ID and rebase ID. + { + sql: "insert into t (a) values (null), (99000000000)", + planRegexp: ".*prepare.*total.*, auto_id_allocator.*alloc_cnt: 1, rebase_cnt: 1, Get.*num_rpc.*total_time.*commit_txn.*prewrite.*get_commit_ts.*commit.*write_keys.*, insert.*", + }, + // Test for insert ignore. + { + sql: "insert ignore into t values (null,1), (2, 2), (99000000000, 3), (100000000000, 4)", + planRegexp: ".*prepare.*total.*, auto_id_allocator.*alloc_cnt: 1, rebase_cnt: 2, Get.*num_rpc.*total_time.*commit_txn.*count: 3, prewrite.*get_commit_ts.*commit.*write_keys.*, check_insert.*", + }, + // Test for insert on duplicate. + { + sql: "insert into t values (null,null), (1,1),(2,2) on duplicate key update a = a + 100000000000", + planRegexp: ".*prepare.*total.*, auto_id_allocator.*alloc_cnt: 1, rebase_cnt: 1, Get.*num_rpc.*total_time.*commit_txn.*count: 2, prewrite.*get_commit_ts.*commit.*write_keys.*, check_insert.*", + }, + // Test for replace with alloc ID. + { + sql: "replace into t () values ()", + planRegexp: ".*auto_id_allocator.*alloc_cnt: 1, Get.*num_rpc.*total_time.*commit_txn.*prewrite.*get_commit_ts.*commit.*write_keys.*", + }, + // Test for replace with alloc ID and rebase ID. + { + sql: "replace into t (a) values (null), (99000000000)", + planRegexp: ".*auto_id_allocator.*alloc_cnt: 1, rebase_cnt: 1, Get.*num_rpc.*total_time.*commit_txn.*prewrite.*get_commit_ts.*commit.*write_keys.*", + }, + // Test for update with rebase ID. + { + prepare: "insert into t values (1,1),(2,2)", + sql: "update t set a=a*100000000000", + planRegexp: ".*auto_id_allocator.*rebase_cnt: 2, Get.*num_rpc.*total_time.*commit_txn.*prewrite.*get_commit_ts.*commit.*write_keys.*", + }, + } + + for _, ca := range cases { + for i := 0; i < 3; i++ { + tk.MustExec("drop table if exists t") + switch i { + case 0: + tk.MustExec("create table t (a bigint auto_increment, b int, primary key (a));") + case 1: + tk.MustExec("create table t (a bigint unsigned auto_increment, b int, primary key (a));") + case 2: + if strings.Contains(ca.sql, "on duplicate key") { + continue + } + tk.MustExec("create table t (a bigint primary key auto_random(5), b int);") + tk.MustExec("set @@allow_auto_random_explicit_insert=1;") + default: + panic("should never happen") + } + if ca.prepare != "" { + tk.MustExec(ca.prepare) + } + res := tk.MustQuery("explain analyze " + ca.sql) + resBuff := bytes.NewBufferString("") + for _, row := range res.Rows() { + fmt.Fprintf(resBuff, "%s\t", row) + } + explain := resBuff.String() + c.Assert(explain, Matches, ca.planRegexp, Commentf("idx: %v,sql: %v", i, ca.sql)) + } + } + + // Test for table without auto id. + for _, ca := range cases { + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a bigint, b int);") + tk.MustExec("insert into t () values ()") + if ca.prepare != "" { + tk.MustExec(ca.prepare) + } + res := tk.MustQuery("explain analyze " + ca.sql) + resBuff := bytes.NewBufferString("") + for _, row := range res.Rows() { + fmt.Fprintf(resBuff, "%s\t", row) + } + explain := resBuff.String() + c.Assert(strings.Contains(explain, "auto_id_allocator"), IsFalse, Commentf("sql: %v, explain: %v", ca.sql, explain)) + } +} + func (s *testIntegrationSuite) TestPartitionExplain(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index 9d63e72e309e5..0dc813767907b 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -165,7 +165,7 @@ func TestBasic(t *testing.T) { alc := tb.Allocators(nil).Get(autoid.RowIDAllocType) require.NotNil(t, alc) - err = alc.Rebase(0, false) + err = alc.Rebase(context.Background(), 0, false) require.NoError(t, err) } @@ -419,7 +419,7 @@ func TestTableFromMeta(t *testing.T) { require.NoError(t, err) maxID := 1<<(64-15-1) - 1 - err = tb.Allocators(tk.Session()).Get(autoid.RowIDAllocType).Rebase(int64(maxID), false) + err = tb.Allocators(tk.Session()).Get(autoid.RowIDAllocType).Rebase(context.Background(), int64(maxID), false) require.NoError(t, err) _, err = tables.AllocHandle(context.Background(), tk.Session(), tb) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 4c3b1d9ddfd51..4265145c2d66a 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -415,16 +415,18 @@ const ( TpSelectResultRuntimeStats // TpInsertRuntimeStat is the tp for InsertRuntimeStat TpInsertRuntimeStat - // TpIndexLookUpRunTimeStats is the tp for TpIndexLookUpRunTimeStats + // TpIndexLookUpRunTimeStats is the tp for IndexLookUpRunTimeStats TpIndexLookUpRunTimeStats - // TpSlowQueryRuntimeStat is the tp for TpSlowQueryRuntimeStat + // TpSlowQueryRuntimeStat is the tp for SlowQueryRuntimeStat TpSlowQueryRuntimeStat // TpHashAggRuntimeStat is the tp for HashAggRuntimeStat TpHashAggRuntimeStat - // TpIndexMergeRunTimeStats is the tp for TpIndexMergeRunTimeStats + // TpIndexMergeRunTimeStats is the tp for IndexMergeRunTimeStats TpIndexMergeRunTimeStats - // TpBasicCopRunTimeStats is the tp for TpBasicCopRunTimeStats + // TpBasicCopRunTimeStats is the tp for BasicCopRunTimeStats TpBasicCopRunTimeStats + // TpUpdateRuntimeStats is the tp for UpdateRuntimeStats + TpUpdateRuntimeStats ) // RuntimeStats is used to express the executor runtime information. @@ -761,6 +763,7 @@ func (e *RuntimeStatsWithConcurrencyInfo) Merge(_ RuntimeStats) { // RuntimeStatsWithCommit is the RuntimeStats with commit detail. type RuntimeStatsWithCommit struct { Commit *util.CommitDetails + TxnCnt int LockKeys *util.LockKeysDetails } @@ -769,12 +772,27 @@ func (e *RuntimeStatsWithCommit) Tp() int { return TpRuntimeStatsWithCommit } +// MergeCommitDetails merges the commit details. +func (e *RuntimeStatsWithCommit) MergeCommitDetails(detail *util.CommitDetails) { + if detail == nil { + return + } + if e.Commit == nil { + e.Commit = detail + e.TxnCnt = 1 + return + } + e.Commit.Merge(detail) + e.TxnCnt++ +} + // Merge implements the RuntimeStats interface. func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) { tmp, ok := rs.(*RuntimeStatsWithCommit) if !ok { return } + e.TxnCnt += tmp.TxnCnt if tmp.Commit != nil { if e.Commit == nil { e.Commit = &util.CommitDetails{} @@ -792,7 +810,9 @@ func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) { // Clone implements the RuntimeStats interface. func (e *RuntimeStatsWithCommit) Clone() RuntimeStats { - newRs := RuntimeStatsWithCommit{} + newRs := RuntimeStatsWithCommit{ + TxnCnt: e.TxnCnt, + } if e.Commit != nil { newRs.Commit = e.Commit.Clone() } @@ -807,6 +827,12 @@ func (e *RuntimeStatsWithCommit) String() string { buf := bytes.NewBuffer(make([]byte, 0, 32)) if e.Commit != nil { buf.WriteString("commit_txn: {") + // Only print out when there are more than 1 transaction. + if e.TxnCnt > 1 { + buf.WriteString("count: ") + buf.WriteString(strconv.Itoa(e.TxnCnt)) + buf.WriteString(", ") + } if e.Commit.PrewriteTime > 0 { buf.WriteString("prewrite:") buf.WriteString(FormatDuration(e.Commit.PrewriteTime))