From 5b7be7479c41b0d13efd0776e8f28708d05badec Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 2 Mar 2023 23:33:13 +0800 Subject: [PATCH 1/4] This is an automated cherry-pick of #41879 Signed-off-by: ti-chi-bot --- ddl/index_merge_tmp_test.go | 82 +++++++++++++++++++++++++++++++++++++ table/tables/index.go | 57 ++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 42cf75bb4a2cd..2a262f0b72891 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -870,3 +870,85 @@ func TestAddIndexMultipleDelete(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) } +<<<<<<< HEAD:ddl/index_merge_tmp_test.go +======= + +func TestAddIndexDuplicateAndWriteConflict(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + tk.MustExec("insert into t values (1, 1);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + var runCancel bool + callback.OnJobRunAfterExported = func(job *model.Job) { + if t.Failed() || runCancel { + return + } + switch job.SchemaState { + case model.StateWriteOnly: + _, err := tk1.Exec("insert into t values (2, 1);") + assert.NoError(t, err) + } + if job.State == model.JobStateRollingback { + _, err := tk1.Exec("admin cancel ddl jobs " + strconv.FormatInt(job.ID, 10)) + assert.NoError(t, err) + runCancel = true + } + } + d.SetHook(callback) + + tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrCancelledDDLJob) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 1", "2 1")) +} + +func TestAddIndexUpdateUntouchedValues(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int, k int);") + tk.MustExec("insert into t values (1, 1, 1);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + var runDML bool + callback.OnJobRunAfterExported = func(job *model.Job) { + if t.Failed() || runDML { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("begin;") + assert.NoError(t, err) + _, err = tk1.Exec("update t set k=k+1 where id = 1;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (2, 1, 2);") + // Should not report "invalid temp index value". + assert.NoError(t, err) + _, err = tk1.Exec("commit;") + assert.NoError(t, err) + runDML = true + } + } + d.SetHook(callback) + + tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrDupEntry) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 2", "2 1 2")) +} +>>>>>>> 1c1c388d6e (ddl: never write untouched index values to temp index (#41879)):ddl/indexmergetest/merge_test.go diff --git a/table/tables/index.go b/table/tables/index.go index 4b10605535c8f..488ecfe6e534b 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -185,12 +185,69 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil { return nil, err } +<<<<<<< HEAD if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} val = tempVal.Encode(nil) } err = txn.GetMemBuffer().Set(tempKey, val) +======= + + opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic + + if !distinct || skipCheck || opt.Untouched { + val := idxVal + if opt.Untouched && (keyIsTempIdxKey || len(tempKey) > 0) { + // Untouched key-values never occur in the storage and the temp index is not public. + // It is unnecessary to write the untouched temp index key-values. + continue + } + if keyIsTempIdxKey { + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) + } + err = txn.GetMemBuffer().Set(key, val) + if err != nil { + return nil, err + } + if len(tempKey) > 0 { + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) + err = txn.GetMemBuffer().Set(tempKey, val) + if err != nil { + return nil, err + } + } + if !opt.IgnoreAssertion && (!opt.Untouched) { + if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { + err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertNotExist) + } + } + if err != nil { + return nil, err + } + continue + } + + var value []byte + if c.tblInfo.TempTableType != model.TempTableNone { + // Always check key for temporary table because it does not write to TiKV + value, err = txn.Get(ctx, key) + } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { + value, err = txn.GetMemBuffer().Get(ctx, key) + } else { + value, err = txn.Get(ctx, key) + } + if err != nil && !kv.IsErrNotFound(err) { + return nil, err + } + var tempIdxVal tablecodec.TempIndexValue + if len(value) > 0 && keyIsTempIdxKey { + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value) +>>>>>>> 1c1c388d6e (ddl: never write untouched index values to temp index (#41879)) if err != nil { return nil, err } From 75e978e0d4846c162faeb02bd56677e755710f67 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 3 Mar 2023 11:10:43 +0800 Subject: [PATCH 2/4] resolve conflicts --- ddl/index_merge_tmp_test.go | 41 ---------------------- table/tables/index.go | 70 +++++-------------------------------- 2 files changed, 8 insertions(+), 103 deletions(-) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 2a262f0b72891..8e2433f261b2d 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -870,46 +870,6 @@ func TestAddIndexMultipleDelete(t *testing.T) { tk.MustQuery("select * from t;").Check(testkit.Rows()) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) } -<<<<<<< HEAD:ddl/index_merge_tmp_test.go -======= - -func TestAddIndexDuplicateAndWriteConflict(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t(id int primary key, b int);") - tk.MustExec("insert into t values (1, 1);") - - tk1 := testkit.NewTestKit(t, store) - tk1.MustExec("use test") - - d := dom.DDL() - originalCallback := d.GetHook() - defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} - var runCancel bool - callback.OnJobRunAfterExported = func(job *model.Job) { - if t.Failed() || runCancel { - return - } - switch job.SchemaState { - case model.StateWriteOnly: - _, err := tk1.Exec("insert into t values (2, 1);") - assert.NoError(t, err) - } - if job.State == model.JobStateRollingback { - _, err := tk1.Exec("admin cancel ddl jobs " + strconv.FormatInt(job.ID, 10)) - assert.NoError(t, err) - runCancel = true - } - } - d.SetHook(callback) - - tk.MustGetErrCode("alter table t add unique index idx(b);", errno.ErrCancelledDDLJob) - tk.MustExec("admin check table t;") - tk.MustQuery("select * from t;").Check(testkit.Rows("1 1", "2 1")) -} func TestAddIndexUpdateUntouchedValues(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) @@ -951,4 +911,3 @@ func TestAddIndexUpdateUntouchedValues(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 2", "2 1 2")) } ->>>>>>> 1c1c388d6e (ddl: never write untouched index values to temp index (#41879)):ddl/indexmergetest/merge_test.go diff --git a/table/tables/index.go b/table/tables/index.go index 488ecfe6e534b..b8a164a9b2ece 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -177,7 +177,12 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if !distinct || skipCheck || opt.Untouched { val := idxVal - if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. + if opt.Untouched && (keyIsTempIdxKey || len(tempKey) > 0) { + // Untouched key-values never occur in the storage and the temp index is not public. + // It is unnecessary to write the untouched temp index key-values. + return nil, nil + } + if keyIsTempIdxKey { tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} val = tempVal.Encode(nil) } @@ -185,69 +190,10 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil { return nil, err } -<<<<<<< HEAD if len(tempKey) > 0 { - if !opt.Untouched { // Untouched key-values never occur in the storage. - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} - val = tempVal.Encode(nil) - } + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) err = txn.GetMemBuffer().Set(tempKey, val) -======= - - opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic - - if !distinct || skipCheck || opt.Untouched { - val := idxVal - if opt.Untouched && (keyIsTempIdxKey || len(tempKey) > 0) { - // Untouched key-values never occur in the storage and the temp index is not public. - // It is unnecessary to write the untouched temp index key-values. - continue - } - if keyIsTempIdxKey { - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} - val = tempVal.Encode(nil) - } - err = txn.GetMemBuffer().Set(key, val) - if err != nil { - return nil, err - } - if len(tempKey) > 0 { - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} - val = tempVal.Encode(nil) - err = txn.GetMemBuffer().Set(tempKey, val) - if err != nil { - return nil, err - } - } - if !opt.IgnoreAssertion && (!opt.Untouched) { - if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { - err = txn.SetAssertion(key, kv.SetAssertUnknown) - } else { - err = txn.SetAssertion(key, kv.SetAssertNotExist) - } - } - if err != nil { - return nil, err - } - continue - } - - var value []byte - if c.tblInfo.TempTableType != model.TempTableNone { - // Always check key for temporary table because it does not write to TiKV - value, err = txn.Get(ctx, key) - } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { - value, err = txn.GetMemBuffer().Get(ctx, key) - } else { - value, err = txn.Get(ctx, key) - } - if err != nil && !kv.IsErrNotFound(err) { - return nil, err - } - var tempIdxVal tablecodec.TempIndexValue - if len(value) > 0 && keyIsTempIdxKey { - tempIdxVal, err = tablecodec.DecodeTempIndexValue(value) ->>>>>>> 1c1c388d6e (ddl: never write untouched index values to temp index (#41879)) if err != nil { return nil, err } From 86b5e60b04b1f585a1e404365e65cd7ea8a4e9e5 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 3 Mar 2023 11:34:42 +0800 Subject: [PATCH 3/4] resolve conflicts --- ddl/index_merge_tmp_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 8e2433f261b2d..8dd5283d6c3de 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -885,9 +885,9 @@ func TestAddIndexUpdateUntouchedValues(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} var runDML bool - callback.OnJobRunAfterExported = func(job *model.Job) { + callback.OnJobRunBeforeExported = func(job *model.Job) { if t.Failed() || runDML { return } From 394c2b7284fdc14aad9a3b280b23d3f39996694a Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 3 Mar 2023 12:28:54 +0800 Subject: [PATCH 4/4] fix closeWriters --- ddl/ingest/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/ingest/engine.go b/ddl/ingest/engine.go index ac0287b26637a..fce350df09e40 100644 --- a/ddl/ingest/engine.go +++ b/ddl/ingest/engine.go @@ -204,7 +204,7 @@ func (ei *engineInfo) newWriterContext(workerID int, unique bool) (*WriterContex func (ei *engineInfo) closeWriters() error { var firstErr error - for wid := range ei.writerCache.Keys() { + for _, wid := range ei.writerCache.Keys() { if w, ok := ei.writerCache.Load(wid); ok { _, err := w.Close(ei.ctx) if err != nil {