Skip to content

Commit

Permalink
Merge branch 'master' into binding_set
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 authored Dec 21, 2022
2 parents 16b65cf + 2150c6b commit f94977d
Show file tree
Hide file tree
Showing 43 changed files with 697 additions and 208 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4467,8 +4467,8 @@ def go_deps():
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_tools",
Expand Down
7 changes: 1 addition & 6 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,12 +807,7 @@ func (b *backfillScheduler) initCopReqSenderPool() {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver)
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
}

func (b *backfillScheduler) canSkipError(err error) bool {
Expand Down
4 changes: 2 additions & 2 deletions ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func SetBatchInsertDeleteRangeSize(i int) {

var NewCopContext4Test = newCopContext

func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64,
func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, store kv.Storage,
batchSize int) ([]*indexRecord, bool, error) {
variable.SetDDLReorgBatchSize(int32(batchSize))
task := &reorgBackfillTask{
id: 1,
startKey: startKey,
endKey: endKey,
}
pool := newCopReqSenderPool(context.Background(), copCtx, startTS)
pool := newCopReqSenderPool(context.Background(), copCtx, store)
pool.adjustSize(1)
pool.tasksCh <- task
idxRec, _, _, done, err := pool.fetchRowColValsFromCop(*task)
Expand Down
18 changes: 18 additions & 0 deletions ddl/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,24 @@ func TestCreateTableWithForeignKeyPrivilegeCheck(t *testing.T) {
tk2.MustExec("create table t4 (a int, foreign key fk(a) references t1(id), foreign key (a) references t3(id));")
}

func TestAlterTableWithForeignKeyPrivilegeCheck(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create user 'u1'@'%' identified by '';")
tk.MustExec("grant create,alter on *.* to 'u1'@'%';")
tk.MustExec("create table t1 (id int key);")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.Session().Auth(&auth.UserIdentity{Username: "u1", Hostname: "localhost", CurrentUser: true, AuthUsername: "u1", AuthHostname: "%"}, nil, []byte("012345678901234567890"))
tk2.MustExec("create table t2 (a int)")
err := tk2.ExecToErr("alter table t2 add foreign key (a) references t1 (id) on update cascade")
require.Error(t, err)
require.Equal(t, "[planner:1142]REFERENCES command denied to user 'u1'@'%' for table 't1'", err.Error())
tk.MustExec("grant references on test.t1 to 'u1'@'%';")
tk2.MustExec("alter table t2 add foreign key (a) references t1 (id) on update cascade")
}

func TestRenameTableWithForeignKeyMetaInfo(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
17 changes: 11 additions & 6 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ type copReqSenderPool struct {
resultsCh chan idxRecResult
results generic.SyncMap[int, struct{}]

ctx context.Context
copCtx *copContext
startTS uint64
ctx context.Context
copCtx *copContext
store kv.Storage

senders []*copReqSender
wg sync.WaitGroup
Expand Down Expand Up @@ -139,7 +139,12 @@ func (c *copReqSender) run() {
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
ver, err := p.store.CurrentVersion(kv.GlobalTxnScope)
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
Expand Down Expand Up @@ -167,7 +172,7 @@ func (c *copReqSender) run() {
}
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool {
func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage) *copReqSenderPool {
poolSize := copReadChunkPoolSize()
idxBufPool := make(chan []*indexRecord, poolSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
Expand All @@ -181,7 +186,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64
results: generic.NewSyncMap[int, struct{}](10),
ctx: ctx,
copCtx: copCtx,
startTS: startTS,
store: store,
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
idxBufPool: idxBufPool,
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_cop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
endKey := startKey.PrefixNext()
txn, err := store.Begin()
require.NoError(t, err)
idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10)
idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, store, 10)
require.NoError(t, err)
require.False(t, done)
require.NoError(t, txn.Rollback())
Expand Down
36 changes: 13 additions & 23 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,9 +1095,7 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, txn.Commit(tk.ctx))

ctx, hook := withLogHook(tk.ctx, t, "inconsistency")
_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindInt64 1]\"", err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test", "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindInt64 1]\"")
hook.checkLogCount(t, 1)
hook.logs[0].checkMsg(t, "admin check found data inconsistency")
hook.logs[0].checkField(t,
Expand All @@ -1119,9 +1117,7 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, txn.Commit(tk.ctx))

ctx, hook := withLogHook(tk.ctx, t, "inconsistency")
_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindString 10]\"", err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test", "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"\" != record-values:\"handle: 1, values: [KindString 10]\"")
hook.checkLogCount(t, 1)
hook.logs[0].checkMsg(t, "admin check found data inconsistency")
hook.logs[0].checkField(t,
Expand All @@ -1143,9 +1139,8 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, txn.Commit(tk.ctx))

ctx, hook := withLogHook(tk.ctx, t, "inconsistency")
_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"handle: 1, values: [KindString 100 KindInt64 1]\" != record-values:\"\"", err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test",
"[admin:8223]data inconsistency in table: admin_test, index: k2, handle: 1, index-values:\"handle: 1, values: [KindString 100 KindInt64 1]\" != record-values:\"\"")
hook.checkLogCount(t, 1)
logEntry := hook.logs[0]
logEntry.checkMsg(t, "admin check found data inconsistency")
Expand Down Expand Up @@ -1188,9 +1183,8 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, txn.Commit(tk.ctx))

ctx, hook := withLogHook(tk.ctx, t, "inconsistency")
_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, "[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"handle: 1, values: [KindInt64 10 KindInt64 1]\" != record-values:\"\"", err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test",
"[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 1, index-values:\"handle: 1, values: [KindInt64 10 KindInt64 1]\" != record-values:\"\"")
hook.checkLogCount(t, 1)
logEntry := hook.logs[0]
logEntry.checkMsg(t, "admin check found data inconsistency")
Expand Down Expand Up @@ -1233,9 +1227,8 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, err)
require.NoError(t, txn.Commit(tk.ctx))
ctx, hook := withLogHook(tk.ctx, t, "inconsistency")
_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, "[executor:8134]data inconsistency in table: admin_test, index: uk1, col: c2, handle: \"1\", index-values:\"KindInt64 20\" != record-values:\"KindInt64 10\", compare err:<nil>", err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test",
"[executor:8134]data inconsistency in table: admin_test, index: uk1, col: c2, handle: \"1\", index-values:\"KindInt64 20\" != record-values:\"KindInt64 10\", compare err:<nil>")
hook.checkLogCount(t, 1)
logEntry := hook.logs[0]
logEntry.checkMsg(t, "admin check found data inconsistency")
Expand All @@ -1261,9 +1254,8 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, err)
require.NoError(t, txn.Commit(tk.ctx))
ctx, hook := withLogHook(tk.ctx, t, "inconsistency")
_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, "[executor:8134]data inconsistency in table: admin_test, index: k2, col: c3, handle: \"1\", index-values:\"KindString 200\" != record-values:\"KindString 100\", compare err:<nil>", err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test",
"[executor:8134]data inconsistency in table: admin_test, index: k2, col: c3, handle: \"1\", index-values:\"KindString 200\" != record-values:\"KindString 100\", compare err:<nil>")
hook.checkLogCount(t, 1)
logEntry := hook.logs[0]
logEntry.checkMsg(t, "admin check found data inconsistency")
Expand Down Expand Up @@ -1301,12 +1293,10 @@ func TestCheckFailReport(t *testing.T) {

// TODO(tiancaiamao): admin check doesn't support the chunk protocol.
// Remove this after https://github.com/pingcap/tidb/issues/35156
_, err = tk.Exec(ctx, "set @@tidb_enable_chunk_rpc = off")
require.NoError(t, err)
tk.MustExec(ctx, "set @@tidb_enable_chunk_rpc = off")

_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, `[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`, err.Error())
tk.MustGetErrMsg(ctx, "admin check table admin_test",
`[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`)
hook.checkLogCount(t, 1)
logEntry := hook.logs[0]
logEntry.checkMsg(t, "admin check found data inconsistency")
Expand Down
2 changes: 1 addition & 1 deletion executor/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_test(
"main_test.go",
],
flaky = True,
race = "on",
shard_count = 50,
deps = [
"//domain",
Expand All @@ -30,6 +29,7 @@ go_test(
"//tablecodec",
"//testkit",
"//types",
"//util",
"//util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
114 changes: 114 additions & 0 deletions executor/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package analyzetest
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
Expand Down Expand Up @@ -3060,3 +3062,115 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseCount"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/injectBaseModifyCount"))
}

func TestGlobalMemoryControlForAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_limit = 512MB")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk0.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk0.MustExec("use test")
tk0.MustExec("create table t(a int)")
tk0.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk0.MustExec("insert into t select * from t") // 256 Lines
}
sql := "analyze table t with 1.0 samplerate;" // Need about 100MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
}()
_, err := tk0.Exec(sql)
require.True(t, strings.Contains(err.Error(), "Out Of Memory Quota!"))
runtime.GC()
}

func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
originalVal1 := tk.MustQuery("select @@global.tidb_mem_oom_action").Rows()[0][0].(string)
tk.MustExec("set global tidb_mem_oom_action = 'cancel'")
//originalVal2 := tk.MustQuery("select @@global.tidb_server_memory_limit").Rows()[0][0].(string)
tk.MustExec("set global tidb_server_memory_limit = 512MB")
originalVal3 := tk.MustQuery("select @@global.tidb_server_memory_limit_sess_min_size").Rows()[0][0].(string)
tk.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")
defer func() {
tk.MustExec(fmt.Sprintf("set global tidb_mem_oom_action = %v", originalVal1))
//tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit = %v", originalVal2))
tk.MustExec(fmt.Sprintf("set global tidb_server_memory_limit_sess_min_size = %v", originalVal3))
}()

// clean child trackers
oldChildTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
for _, tracker := range oldChildTrackers {
tracker.Detach()
}
defer func() {
for _, tracker := range oldChildTrackers {
tracker.AttachTo(executor.GlobalAnalyzeMemoryTracker)
}
}()
childTrackers := executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
tk.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk.MustExec("insert into t select * from t") // 256 Lines
}
_, err0 := tk.Exec("analyze table t with 1.0 samplerate;")
require.NoError(t, err0)
rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed")
require.Len(t, rs0.Rows(), 0)

h := dom.StatsHandle()
originalVal4 := handle.AutoAnalyzeMinCnt
originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string)
handle.AutoAnalyzeMinCnt = 0
tk.MustExec("set global tidb_auto_analyze_ratio = 0.001")
defer func() {
handle.AutoAnalyzeMinCnt = originalVal4
tk.MustExec(fmt.Sprintf("set global tidb_auto_analyze_ratio = %v", originalVal5))
}()

sm := &testkit.MockSessionManager{
Dom: dom,
PS: []*util.ProcessInfo{tk.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk.MustExec("insert into t values(4),(5),(6)")
require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
err := h.Update(dom.InfoSchema())
require.NoError(t, err)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
}()
tk.MustQuery("select 1")
childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)

h.HandleAutoAnalyze(dom.InfoSchema())
rs := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed")
failReason := rs.Rows()[0][0].(string)
require.True(t, strings.Contains(failReason, "Out Of Memory Quota!"))

childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)
}
Loading

0 comments on commit f94977d

Please sign in to comment.