Skip to content

Commit

Permalink
kv: refactor MemBuffer and reduce the memory usage (#18372)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zejun Li authored Jul 15, 2020
1 parent 85679e9 commit 761a961
Show file tree
Hide file tree
Showing 38 changed files with 1,024 additions and 934 deletions.
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// and retry later if the job is not cancelled.
schemaVer, runJobErr = w.runDDLJob(d, t, job)
if job.IsCancelled() {
txn.Discard()
txn.Reset()
err = w.finishDDLJob(t, job)
return errors.Trace(err)
}
Expand All @@ -480,7 +480,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
// then shouldn't discard the KV modification.
// And the job state is rollback done, it means the job was already finished, also shouldn't discard too.
// Otherwise, we should discard the KV modification when running job.
txn.Discard()
txn.Reset()
}
err = w.updateDDLJob(t, job, runJobErr != nil)
if err = w.handleUpdateJobError(t, job, err); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx
}

// Create the index.
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle)
handle, err := w.index.Create(w.sessCtx, txn.GetUnionStore(), idxRecord.vals, idxRecord.handle)
if err != nil {
if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) {
// Index already exists, skip it.
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func checkIndexExists(ctx sessionctx.Context, tbl table.Table, indexValue interf
if err != nil {
return errors.Trace(err)
}
doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(indexValue), kv.IntHandle(handle))
doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, txn.GetUnionStore(), types.MakeDatums(indexValue), kv.IntHandle(handle))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
return result, err
}

_, err = e.index.Create(e.ctx, txn, row.idxVals, row.handle)
_, err = e.index.Create(e.ctx, txn.GetUnionStore(), row.idxVals, row.handle)
if err != nil {
return result, err
}
Expand Down
48 changes: 24 additions & 24 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,21 +340,21 @@ func (s *testSuite5) TestAdminCleanupIndex(c *C) {

txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(1), kv.IntHandle(-100))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(1), kv.IntHandle(-100))
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(6), kv.IntHandle(100))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(6), kv.IntHandle(100))
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(8), kv.IntHandle(100))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(8), kv.IntHandle(100))
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(nil), kv.IntHandle(101))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(nil), kv.IntHandle(101))
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(nil), kv.IntHandle(102))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(nil), kv.IntHandle(102))
c.Assert(err, IsNil)
_, err = indexOpr3.Create(s.ctx, txn, types.MakeDatums(6), kv.IntHandle(200))
_, err = indexOpr3.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(6), kv.IntHandle(200))
c.Assert(err, IsNil)
_, err = indexOpr3.Create(s.ctx, txn, types.MakeDatums(6), kv.IntHandle(-200))
_, err = indexOpr3.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(6), kv.IntHandle(-200))
c.Assert(err, IsNil)
_, err = indexOpr3.Create(s.ctx, txn, types.MakeDatums(8), kv.IntHandle(-200))
_, err = indexOpr3.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(8), kv.IntHandle(-200))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -409,9 +409,9 @@ func (s *testSuite5) TestAdminCleanupIndexForPartitionTable(c *C) {

txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(idxValue), kv.IntHandle(handle))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(idxValue), kv.IntHandle(handle))
c.Assert(err, IsNil)
_, err = indexOpr3.Create(s.ctx, txn, types.MakeDatums(idxValue), kv.IntHandle(handle))
_, err = indexOpr3.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(idxValue), kv.IntHandle(handle))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -490,11 +490,11 @@ func (s *testSuite5) TestAdminCleanupIndexPKNotHandle(c *C) {

txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(7, 10), kv.IntHandle(-100))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(7, 10), kv.IntHandle(-100))
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(4, 6), kv.IntHandle(100))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(4, 6), kv.IntHandle(100))
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(-7, 4), kv.IntHandle(101))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(-7, 4), kv.IntHandle(101))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -543,9 +543,9 @@ func (s *testSuite5) TestAdminCleanupIndexMore(c *C) {
for i := 0; i < 2000; i++ {
c1 := int64(2*i + 7)
c2 := int64(2*i + 8)
_, err = indexOpr1.Create(s.ctx, txn, types.MakeDatums(c1, c2), kv.IntHandle(c1))
_, err = indexOpr1.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(c1, c2), kv.IntHandle(c1))
c.Assert(err, IsNil, Commentf(errors.ErrorStack(err)))
_, err = indexOpr2.Create(s.ctx, txn, types.MakeDatums(c2), kv.IntHandle(c1))
_, err = indexOpr2.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(c2), kv.IntHandle(c1))
c.Assert(err, IsNil)
}
err = txn.Commit(context.Background())
Expand Down Expand Up @@ -619,7 +619,7 @@ func (s *testSuite3) TestAdminCheckPartitionTableFailed(c *C) {
// Manual recover index.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i), kv.IntHandle(i))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(i), kv.IntHandle(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -633,7 +633,7 @@ func (s *testSuite3) TestAdminCheckPartitionTableFailed(c *C) {
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), kv.IntHandle(i+8))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(i+8), kv.IntHandle(i+8))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -656,7 +656,7 @@ func (s *testSuite3) TestAdminCheckPartitionTableFailed(c *C) {
indexOpr := tables.NewIndex(tblInfo.GetPartitionInfo().Definitions[partitionIdx].ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(i+8), kv.IntHandle(i))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(i+8), kv.IntHandle(i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -719,7 +719,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
// Index c2 has one more values than table data: 0, and the handle 0 hasn't correlative record.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(0), kv.IntHandle(0))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(0), kv.IntHandle(0))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -735,9 +735,9 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
err = indexOpr.Delete(sc, txn, types.MakeDatums(0), kv.IntHandle(0))
c.Assert(err, IsNil)
// Make sure the index value "19" is smaller "21". Then we scan to "19" before "21".
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(19), kv.IntHandle(10))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(19), kv.IntHandle(10))
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(13), kv.IntHandle(2))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(13), kv.IntHandle(2))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand All @@ -763,7 +763,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
// Index c2 has one line of data is 19, the corresponding table data is 20.
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(12), kv.IntHandle(2))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(12), kv.IntHandle(2))
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(20), kv.IntHandle(10))
c.Assert(err, IsNil)
Expand All @@ -778,7 +778,7 @@ func (s *testSuite5) TestAdminCheckTableFailed(c *C) {
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(19), kv.IntHandle(10))
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(20), kv.IntHandle(10))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(20), kv.IntHandle(10))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -935,7 +935,7 @@ func (s *testSuite5) TestAdminCheckWithSnapshot(c *C) {
idxOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = idxOpr.Create(s.ctx, txn, types.MakeDatums(2), kv.IntHandle(100))
_, err = idxOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(2), kv.IntHandle(100))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *testSuite3) TestInconsistentIndex(c *C) {
for i := 0; i < 10; i++ {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), kv.IntHandle(100+i))
_, err = idxOp.Create(ctx, txn.GetUnionStore(), types.MakeDatums(i+10), kv.IntHandle(100+i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
6 changes: 3 additions & 3 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (s *testSuite3) TestAdmin(c *C) {
tb, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("admin_test"))
c.Assert(err, IsNil)
c.Assert(tb.Indices(), HasLen, 1)
_, err = tb.Indices()[0].Create(mock.NewContext(), txn, types.MakeDatums(int64(10)), kv.IntHandle(1))
_, err = tb.Indices()[0].Create(mock.NewContext(), txn.GetUnionStore(), types.MakeDatums(int64(10)), kv.IntHandle(1))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down Expand Up @@ -3345,7 +3345,7 @@ func (s *testSuite) TestCheckIndex(c *C) {
// table data (handle, data): (1, 10), (2, 20), (4, 40)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(30)), kv.IntHandle(3))
_, err = idx.Create(mockCtx, txn.GetUnionStore(), types.MakeDatums(int64(30)), kv.IntHandle(3))
c.Assert(err, IsNil)
key := tablecodec.EncodeRowKey(tb.Meta().ID, kv.IntHandle(4).Encoded())
setColValue(c, txn, key, types.NewDatum(int64(40)))
Expand All @@ -3360,7 +3360,7 @@ func (s *testSuite) TestCheckIndex(c *C) {
// table data (handle, data): (1, 10), (2, 20), (4, 40)
txn, err = s.store.Begin()
c.Assert(err, IsNil)
_, err = idx.Create(mockCtx, txn, types.MakeDatums(int64(40)), kv.IntHandle(4))
_, err = idx.Create(mockCtx, txn.GetUnionStore(), types.MakeDatums(int64(40)), kv.IntHandle(4))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
15 changes: 6 additions & 9 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,25 +992,22 @@ func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}

func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.Datum, reserveAutoIDCount int) error {
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
if !e.ctx.GetSessionVars().ConstraintCheckInPlace {
txn.SetOption(kv.PresumeKeyNotExists, nil)
func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.Datum, reserveAutoIDCount int) (err error) {
vars := e.ctx.GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
}
if reserveAutoIDCount > 0 {
_, err = e.Table.AddRecord(e.ctx, row, table.WithCtx(ctx), table.WithReserveAutoIDHint(reserveAutoIDCount))
} else {
_, err = e.Table.AddRecord(e.ctx, row, table.WithCtx(ctx))
}
txn.DelOption(kv.PresumeKeyNotExists)
vars.PresumeKeyNotExists = false
if err != nil {
return err
}
if e.lastInsertID != 0 {
e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
vars.SetLastInsertID(e.lastInsertID)
}
return nil
}
2 changes: 1 addition & 1 deletion executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2573,7 +2573,7 @@ func (s *testSuite7) TestReplaceLog(c *C) {

txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = indexOpr.Create(s.ctx, txn, types.MakeDatums(1), kv.IntHandle(1))
_, err = indexOpr.Create(s.ctx, txn.GetUnionStore(), types.MakeDatums(1), kv.IntHandle(1))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
100 changes: 0 additions & 100 deletions kv/buffer_store.go

This file was deleted.

Loading

0 comments on commit 761a961

Please sign in to comment.