diff --git a/ddl/util/util.go b/ddl/util/util.go
index 83f6aa600ab54..f2b7c63a4ff14 100644
--- a/ddl/util/util.go
+++ b/ddl/util/util.go
@@ -27,10 +27,13 @@ import (
 )
 
 const (
-	loadDeleteRangeSQL        = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.gc_delete_range WHERE ts < %v`
+	deleteRangesTable         = `gc_delete_range`
+	doneDeleteRangesTable     = `gc_delete_range_done`
+	loadDeleteRangeSQL        = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%s WHERE ts < %v`
 	recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
 	completeDeleteRangeSQL    = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
 	updateDeleteRangeSQL      = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"`
+	deleteDoneRecordSQL       = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %d AND element_id = %d`
 )
 
 // DelRangeTask is for run delete-range command in gc_worker.
@@ -46,7 +49,16 @@ func (t DelRangeTask) Range() ([]byte, []byte) {
 
 // LoadDeleteRanges loads delete range tasks from gc_delete_range table.
 func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRangeTask, _ error) {
-	sql := fmt.Sprintf(loadDeleteRangeSQL, safePoint)
+	return loadDeleteRangesFromTable(ctx, deleteRangesTable, safePoint)
+}
+
+// LoadDoneDeleteRanges loads deleted ranges from gc_delete_range_done table.
+func LoadDoneDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRangeTask, _ error) {
+	return loadDeleteRangesFromTable(ctx, doneDeleteRangesTable, safePoint)
+}
+
+func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint uint64) (ranges []DelRangeTask, _ error) {
+	sql := fmt.Sprintf(loadDeleteRangeSQL, table, safePoint)
 	rss, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
 	if len(rss) > 0 {
 		defer terror.Call(rss[0].Close)
@@ -101,6 +113,13 @@ func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error {
 	return errors.Trace(err)
 }
 
+// DeleteDoneRecord removes a record from gc_delete_range_done table.
+func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error {
+	sql := fmt.Sprintf(deleteDoneRecordSQL, dr.JobID, dr.ElementID)
+	_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	return errors.Trace(err)
+}
+
 // UpdateDeleteRange is only for emulator.
 func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, oldStartKey kv.Key) error {
 	newStartKeyHex := hex.EncodeToString(newStartKey)
diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go
index da6c638866fe8..93a24331d9d9d 100644
--- a/store/mockstore/mocktikv/rpc.go
+++ b/store/mockstore/mocktikv/rpc.go
@@ -707,6 +707,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request
 			return resp, nil
 		}
 		resp.RawScan = handler.handleKvRawScan(r)
+	case tikvrpc.CmdUnsafeDestroyRange:
+		panic("unimplemented")
 	case tikvrpc.CmdCop:
 		r := req.Cop
 		if err := handler.checkRequestContext(reqCtx); err != nil {
diff --git a/store/tikv/client.go b/store/tikv/client.go
index aa1a4fb5fece9..b1e95abc3b1e6 100644
--- a/store/tikv/client.go
+++ b/store/tikv/client.go
@@ -61,11 +61,12 @@ var MaxCallMsgSize = 1<<31 - 1
 
 // Timeout durations.
 const (
-	dialTimeout       = 5 * time.Second
-	readTimeoutShort  = 20 * time.Second  // For requests that read/write several key-values.
-	ReadTimeoutMedium = 60 * time.Second  // For requests that may need scan region.
-	ReadTimeoutLong   = 150 * time.Second // For requests that may need scan region multiple times.
-	GCTimeout         = 5 * time.Minute
+	dialTimeout               = 5 * time.Second
+	readTimeoutShort          = 20 * time.Second  // For requests that read/write several key-values.
+	ReadTimeoutMedium         = 60 * time.Second  // For requests that may need scan region.
+	ReadTimeoutLong           = 150 * time.Second // For requests that may need scan region multiple times.
+	GCTimeout                 = 5 * time.Minute
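+	// UnsafeDestroyRangeTimeout is the timeout of a single UnsafeDestroyRange request sent to one TiKV store.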
+	UnsafeDestroyRangeTimeout = 5 * time.Minute
 
 	grpcInitialWindowSize     = 1 << 30
 	grpcInitialConnWindowSize = 1 << 30
diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go
index 2739c00ed1c88..fc18742e04ead 100644
--- a/store/tikv/gcworker/gc_worker.go
+++ b/store/tikv/gcworker/gc_worker.go
@@ -24,6 +24,8 @@ import (
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/pd/pd-client"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta"
@@ -45,6 +47,7 @@ type GCWorker struct {
 	uuid        string
 	desc        string
 	store       tikv.Storage
+	pdClient    pd.Client
 	gcIsRunning bool
 	lastFinish  time.Time
 	cancel      context.CancelFunc
@@ -54,7 +57,7 @@ type GCWorker struct {
 }
 
 // NewGCWorker creates a GCWorker instance.
-func NewGCWorker(store tikv.Storage) (tikv.GCHandler, error) {
+func NewGCWorker(store tikv.Storage, pdClient pd.Client) (tikv.GCHandler, error) {
 	ver, err := store.CurrentVersion()
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -67,6 +70,7 @@ func NewGCWorker(store tikv.Storage) (tikv.GCHandler, error) {
 		uuid:        strconv.FormatUint(ver.Ver, 16),
 		desc:        fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()),
 		store:       store,
+		pdClient:    pdClient,
 		gcIsRunning: false,
 		lastFinish:  time.Now(),
 		done:        make(chan error),
@@ -98,10 +102,11 @@ const (
 	gcLeaderDescKey  = "tikv_gc_leader_desc"
 	gcLeaderLeaseKey = "tikv_gc_leader_lease"
 
-	gcLastRunTimeKey     = "tikv_gc_last_run_time"
-	gcRunIntervalKey     = "tikv_gc_run_interval"
-	gcDefaultRunInterval = time.Minute * 10
-	gcWaitTime           = time.Minute * 1
+	gcLastRunTimeKey       = "tikv_gc_last_run_time"
+	gcRunIntervalKey       = "tikv_gc_run_interval"
+	gcDefaultRunInterval   = time.Minute * 10
+	gcWaitTime             = time.Minute * 1
+	gcRedoDeleteRangeDelay = 24 * time.Hour
 
 	gcLifeTimeKey     = "tikv_gc_life_time"
 	gcDefaultLifeTime = time.Minute * 10
@@ -333,6 +338,13 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64) {
 		w.done <- errors.Trace(err)
 		return
 	}
+	err = w.redoDeleteRanges(ctx, safePoint)
+	if err != nil {
+		log.Errorf("[gc worker] %s redo-delete range returns an error %v", w.uuid, errors.ErrorStack(err))
+		metrics.GCJobFailureCounter.WithLabelValues("redo_delete_range").Inc()
+		w.done <- errors.Trace(err)
+		return
+	}
 	err = w.doGC(ctx, safePoint)
 	if err != nil {
 		log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, errors.ErrorStack(err))
@@ -344,6 +356,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64) {
 	w.done <- nil
 }
 
+// `deleteRanges` processes all delete range records whose ts < safePoint in table `gc_delete_range`
 func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error {
 	metrics.GCWorkerCounter.WithLabelValues("delete_range").Inc()
 
@@ -356,21 +369,14 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error {
 	log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges))
 	startTime := time.Now()
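+	// Destroy the data of each range first, and only mark its record complete after the request succeeds, so a failed range is retried in the next GC round.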
delete %v ranges", w.uuid, len(ranges)) startTime := time.Now() - regions := 0 for _, r := range ranges { - startKey, rangeEndKey := r.Range() - - deleteRangeTask := tikv.NewDeleteRangeTask(ctx, w.store, startKey, rangeEndKey) - err := deleteRangeTask.Execute() + startKey, endKey := r.Range() + err = w.sendUnsafeDestroyRangeRequest(ctx, startKey, endKey) if err != nil { return errors.Trace(err) } - if deleteRangeTask.IsCanceled() { - return errors.New("[gc worker] gc job canceled") - } - regions += deleteRangeTask.CompletedRegions() se := createSession(w.store) err = util.CompleteDeleteRange(se, r) se.Close() @@ -378,11 +384,88 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error { return errors.Trace(err) } } - log.Infof("[gc worker] %s finish delete %v ranges, regions: %v, cost time: %s", w.uuid, len(ranges), regions, time.Since(startTime)) + log.Infof("[gc worker] %s finish delete %v ranges, cost time: %s", w.uuid, len(ranges), time.Since(startTime)) metrics.GCHistogram.WithLabelValues("delete_ranges").Observe(time.Since(startTime).Seconds()) return nil } +// `redoDeleteRanges` checks all deleted ranges whose ts is at least `lifetime + 24h` ago. See TiKV RFC #2. +func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64) error { + metrics.GCWorkerCounter.WithLabelValues("redo_delete_range").Inc() + + // We check delete range records that are deleted about 24 hours ago. + redoDeleteRangesTs := safePoint - oracle.ComposeTS(int64(gcRedoDeleteRangeDelay.Seconds())*1000, 0) + + se := createSession(w.store) + ranges, err := util.LoadDoneDeleteRanges(se, redoDeleteRangesTs) + se.Close() + if err != nil { + return errors.Trace(err) + } + + log.Infof("[gc worker] %s start redo-delete %v ranges", w.uuid, len(ranges)) + startTime := time.Now() + for _, r := range ranges { + startKey, endKey := r.Range() + + err = w.sendUnsafeDestroyRangeRequest(ctx, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + + se := createSession(w.store) + err := util.DeleteDoneRecord(se, r) + se.Close() + if err != nil { + return errors.Trace(err) + } + } + log.Infof("[gc worker] %s finish redo-delete %v ranges, cost time: %s", w.uuid, len(ranges), time.Since(startTime)) + metrics.GCHistogram.WithLabelValues("redo_delete_ranges").Observe(time.Since(startTime).Seconds()) + return nil +} + +func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte) error { + // Get all stores every time deleting a region. So the store list is less probably to be stale. 
+	stores, err := w.pdClient.GetAllStores(ctx)
+	if err != nil {
+		log.Errorf("[gc worker] %s delete ranges: got an error while trying to get store list from pd: %v", w.uuid, errors.ErrorStack(err))
+		return errors.Trace(err)
+	}
+
+	req := &tikvrpc.Request{
+		Type: tikvrpc.CmdUnsafeDestroyRange,
+		UnsafeDestroyRange: &kvrpcpb.UnsafeDestroyRangeRequest{
+			StartKey: startKey,
+			EndKey:   endKey,
+		},
+	}
+
+	var wg sync.WaitGroup
+
+	for _, store := range stores {
+		if store.State != metapb.StoreState_Up {
+			continue
+		}
+
+		address := store.Address
+		storeID := store.Id
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			_, err1 := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.UnsafeDestroyRangeTimeout)
+			if err1 != nil {
+				log.Errorf("[gc worker] %s destroy range on store %v failed with error: %v", w.uuid, storeID, errors.ErrorStack(err1))
+				err = err1
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	return errors.Trace(err)
+}
+
 func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) {
 	str, err := w.loadValueFromSysTable(gcConcurrencyKey)
 	if err != nil {
diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go
index f902cbc61de6d..490c148291a57 100644
--- a/store/tikv/gcworker/gc_worker_test.go
+++ b/store/tikv/gcworker/gc_worker_test.go
@@ -52,7 +52,7 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) {
 	s.store.SetOracle(s.oracle)
 	s.dom, err = session.BootstrapSession(s.store)
 	c.Assert(err, IsNil)
-	gcWorker, err := NewGCWorker(s.store)
+	gcWorker, err := NewGCWorker(s.store, nil)
 	c.Assert(err, IsNil)
 	gcWorker.Start()
 	gcWorker.Close()
diff --git a/store/tikv/interface.go b/store/tikv/interface.go
index 70005e14dbc7a..88ed848c73b2b 100644
--- a/store/tikv/interface.go
+++ b/store/tikv/interface.go
@@ -16,6 +16,7 @@ package tikv
 import (
 	"time"
 
+	"github.com/pingcap/pd/pd-client"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -67,4 +68,4 @@ type GCHandler interface {
 
 // NewGCHandlerFunc creates a new GCHandler.
 // To enable real GC, we should assign the function to `gcworker.NewGCWorker`.
-var NewGCHandlerFunc func(storage Storage) (GCHandler, error)
+var NewGCHandlerFunc func(storage Storage, pdClient pd.Client) (GCHandler, error)
diff --git a/store/tikv/kv.go b/store/tikv/kv.go
index 7f270cf62730a..84d6747f3a344 100644
--- a/store/tikv/kv.go
+++ b/store/tikv/kv.go
@@ -221,7 +221,7 @@ func (s *tikvStore) StartGCWorker() error {
 		return nil
 	}
 
-	gcWorker, err := NewGCHandlerFunc(s)
+	gcWorker, err := NewGCHandlerFunc(s, s.pdClient)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index d6f562ba2f671..00fe8bcfcbf24 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -53,6 +53,8 @@ const (
 	CmdRawDeleteRange
 	CmdRawScan
 
+	CmdUnsafeDestroyRange
+
 	CmdCop CmdType = 512 + iota
 	CmdCopStream
@@ -101,6 +103,8 @@ func (t CmdType) String() string {
 		return "RawDeleteRange"
 	case CmdRawScan:
 		return "RawScan"
+	case CmdUnsafeDestroyRange:
+		return "UnsafeDestroyRange"
 	case CmdCop:
 		return "Cop"
 	case CmdCopStream:
@@ -118,59 +122,61 @@ func (t CmdType) String() string {
 
 // Request wraps all kv/coprocessor requests.
 type Request struct {
 	kvrpcpb.Context
-	Type             CmdType
-	Get              *kvrpcpb.GetRequest
-	Scan             *kvrpcpb.ScanRequest
-	Prewrite         *kvrpcpb.PrewriteRequest
-	Commit           *kvrpcpb.CommitRequest
-	Cleanup          *kvrpcpb.CleanupRequest
-	BatchGet         *kvrpcpb.BatchGetRequest
-	BatchRollback    *kvrpcpb.BatchRollbackRequest
-	ScanLock         *kvrpcpb.ScanLockRequest
-	ResolveLock      *kvrpcpb.ResolveLockRequest
-	GC               *kvrpcpb.GCRequest
-	DeleteRange      *kvrpcpb.DeleteRangeRequest
-	RawGet           *kvrpcpb.RawGetRequest
-	RawBatchGet      *kvrpcpb.RawBatchGetRequest
-	RawPut           *kvrpcpb.RawPutRequest
-	RawBatchPut      *kvrpcpb.RawBatchPutRequest
-	RawDelete        *kvrpcpb.RawDeleteRequest
-	RawBatchDelete   *kvrpcpb.RawBatchDeleteRequest
-	RawDeleteRange   *kvrpcpb.RawDeleteRangeRequest
-	RawScan          *kvrpcpb.RawScanRequest
-	Cop              *coprocessor.Request
-	MvccGetByKey     *kvrpcpb.MvccGetByKeyRequest
-	MvccGetByStartTs *kvrpcpb.MvccGetByStartTsRequest
-	SplitRegion      *kvrpcpb.SplitRegionRequest
+	Type               CmdType
+	Get                *kvrpcpb.GetRequest
+	Scan               *kvrpcpb.ScanRequest
+	Prewrite           *kvrpcpb.PrewriteRequest
+	Commit             *kvrpcpb.CommitRequest
+	Cleanup            *kvrpcpb.CleanupRequest
+	BatchGet           *kvrpcpb.BatchGetRequest
+	BatchRollback      *kvrpcpb.BatchRollbackRequest
+	ScanLock           *kvrpcpb.ScanLockRequest
+	ResolveLock        *kvrpcpb.ResolveLockRequest
+	GC                 *kvrpcpb.GCRequest
+	DeleteRange        *kvrpcpb.DeleteRangeRequest
+	RawGet             *kvrpcpb.RawGetRequest
+	RawBatchGet        *kvrpcpb.RawBatchGetRequest
+	RawPut             *kvrpcpb.RawPutRequest
+	RawBatchPut        *kvrpcpb.RawBatchPutRequest
+	RawDelete          *kvrpcpb.RawDeleteRequest
+	RawBatchDelete     *kvrpcpb.RawBatchDeleteRequest
+	RawDeleteRange     *kvrpcpb.RawDeleteRangeRequest
+	RawScan            *kvrpcpb.RawScanRequest
+	UnsafeDestroyRange *kvrpcpb.UnsafeDestroyRangeRequest
+	Cop                *coprocessor.Request
+	MvccGetByKey       *kvrpcpb.MvccGetByKeyRequest
+	MvccGetByStartTs   *kvrpcpb.MvccGetByStartTsRequest
+	SplitRegion        *kvrpcpb.SplitRegionRequest
 }
 
 // Response wraps all kv/coprocessor responses.
 type Response struct {
-	Type             CmdType
-	Get              *kvrpcpb.GetResponse
-	Scan             *kvrpcpb.ScanResponse
-	Prewrite         *kvrpcpb.PrewriteResponse
-	Commit           *kvrpcpb.CommitResponse
-	Cleanup          *kvrpcpb.CleanupResponse
-	BatchGet         *kvrpcpb.BatchGetResponse
-	BatchRollback    *kvrpcpb.BatchRollbackResponse
-	ScanLock         *kvrpcpb.ScanLockResponse
-	ResolveLock      *kvrpcpb.ResolveLockResponse
-	GC               *kvrpcpb.GCResponse
-	DeleteRange      *kvrpcpb.DeleteRangeResponse
-	RawGet           *kvrpcpb.RawGetResponse
-	RawBatchGet      *kvrpcpb.RawBatchGetResponse
-	RawPut           *kvrpcpb.RawPutResponse
-	RawBatchPut      *kvrpcpb.RawBatchPutResponse
-	RawDelete        *kvrpcpb.RawDeleteResponse
-	RawBatchDelete   *kvrpcpb.RawBatchDeleteResponse
-	RawDeleteRange   *kvrpcpb.RawDeleteRangeResponse
-	RawScan          *kvrpcpb.RawScanResponse
-	Cop              *coprocessor.Response
-	CopStream        *CopStreamResponse
-	MvccGetByKey     *kvrpcpb.MvccGetByKeyResponse
-	MvccGetByStartTS *kvrpcpb.MvccGetByStartTsResponse
-	SplitRegion      *kvrpcpb.SplitRegionResponse
+	Type               CmdType
+	Get                *kvrpcpb.GetResponse
+	Scan               *kvrpcpb.ScanResponse
+	Prewrite           *kvrpcpb.PrewriteResponse
+	Commit             *kvrpcpb.CommitResponse
+	Cleanup            *kvrpcpb.CleanupResponse
+	BatchGet           *kvrpcpb.BatchGetResponse
+	BatchRollback      *kvrpcpb.BatchRollbackResponse
+	ScanLock           *kvrpcpb.ScanLockResponse
+	ResolveLock        *kvrpcpb.ResolveLockResponse
+	GC                 *kvrpcpb.GCResponse
+	DeleteRange        *kvrpcpb.DeleteRangeResponse
+	RawGet             *kvrpcpb.RawGetResponse
+	RawBatchGet        *kvrpcpb.RawBatchGetResponse
+	RawPut             *kvrpcpb.RawPutResponse
+	RawBatchPut        *kvrpcpb.RawBatchPutResponse
+	RawDelete          *kvrpcpb.RawDeleteResponse
+	RawBatchDelete     *kvrpcpb.RawBatchDeleteResponse
+	RawDeleteRange     *kvrpcpb.RawDeleteRangeResponse
+	RawScan            *kvrpcpb.RawScanResponse
+	UnsafeDestroyRange *kvrpcpb.UnsafeDestroyRangeResponse
+	Cop                *coprocessor.Response
+	CopStream          *CopStreamResponse
+	MvccGetByKey       *kvrpcpb.MvccGetByKeyResponse
+	MvccGetByStartTS   *kvrpcpb.MvccGetByStartTsResponse
+	SplitRegion        *kvrpcpb.SplitRegionResponse
 }
 
 // CopStreamResponse combinates tikvpb.Tikv_CoprocessorStreamClient and the first Recv() result together.
@@ -229,6 +235,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
 		req.RawDeleteRange.Context = ctx
 	case CmdRawScan:
 		req.RawScan.Context = ctx
+	case CmdUnsafeDestroyRange:
+		req.UnsafeDestroyRange.Context = ctx
 	case CmdCop:
 		req.Cop.Context = ctx
 	case CmdCopStream:
@@ -327,6 +335,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
 		resp.RawScan = &kvrpcpb.RawScanResponse{
 			RegionError: e,
 		}
+	case CmdUnsafeDestroyRange:
+		resp.UnsafeDestroyRange = &kvrpcpb.UnsafeDestroyRangeResponse{
+			RegionError: e,
+		}
 	case CmdCop:
 		resp.Cop = &coprocessor.Response{
 			RegionError: e,
@@ -397,6 +409,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
 		e = resp.RawDeleteRange.GetRegionError()
 	case CmdRawScan:
 		e = resp.RawScan.GetRegionError()
+	case CmdUnsafeDestroyRange:
+		e = resp.UnsafeDestroyRange.GetRegionError()
 	case CmdCop:
 		e = resp.Cop.GetRegionError()
 	case CmdCopStream:
@@ -459,6 +473,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Response, error) {
 		resp.RawDeleteRange, err = client.RawDeleteRange(ctx, req.RawDeleteRange)
 	case CmdRawScan:
 		resp.RawScan, err = client.RawScan(ctx, req.RawScan)
+	case CmdUnsafeDestroyRange:
+		resp.UnsafeDestroyRange, err = client.UnsafeDestroyRange(ctx, req.UnsafeDestroyRange)
 	case CmdCop:
 		resp.Cop, err = client.Coprocessor(ctx, req.Cop)
 	case CmdCopStream: