From 9c099afae5d980063c75b411f03d8c7d3370c1a1 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 7 Mar 2024 13:36:43 +0800 Subject: [PATCH 1/5] remove timeout for ingest Signed-off-by: Leavrth --- br/pkg/checkpoint/checkpoint.go | 2 +- br/pkg/conn/conn.go | 4 ++-- br/pkg/restore/client.go | 16 +++++++++------- br/pkg/restore/import.go | 4 +--- br/pkg/utils/backoff.go | 18 +++++------------- br/pkg/utils/backoff_test.go | 2 +- 6 files changed, 19 insertions(+), 27 deletions(-) diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index a7707afda2e04..079fb5a84c4df 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -596,7 +596,7 @@ func (r *CheckpointRunner[K, V]) getTS(ctx context.Context) (int64, int64, error } return nil - }, utils.NewPDReqBackoffer()) + }, utils.NewPDReqBackofferExt()) return p, l, errors.Trace(errRetry) } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index d1dea6585edaf..339dbeefdb590 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -103,7 +103,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, return errors.Trace(err) }, - utils.NewPDReqBackoffer(), + utils.NewPDReqBackofferExt(), ) return stores, errors.Trace(errRetry) @@ -383,7 +383,7 @@ func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func } _ = resp.Body.Close() return nil - }, utils.NewPDReqBackoffer()) + }, utils.NewPDReqBackofferExt()) if err != nil { // if one store failed, break and return error return err diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 957700dba643c..38db99b5eece4 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -733,7 +733,7 @@ func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error) { log.Warn("failed to get TS, retry it", zap.Uint("retry time", retry), logutil.ShortError(getTSErr)) } return getTSErr - }, utils.NewPDReqBackoffer()) + }, utils.NewPDReqBackofferExt()) if err != nil { log.Error("failed to get TS", zap.Error(err)) @@ -747,7 +747,7 @@ func (rc *Client) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController) erro log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS)) return utils.WithRetry(ctx, func() error { return pdCtrl.ResetTS(ctx, restoreTS) - }, utils.NewPDReqBackoffer()) + }, utils.NewPDReqBackofferExt()) } // GetPlacementRules return the current placement rules. @@ -760,7 +760,7 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pd i++ placementRules, err = pdutil.GetPlacementRules(ctx, pdAddrs[idx], rc.tlsConf) return errors.Trace(err) - }, utils.NewPDReqBackoffer()) + }, utils.NewPDReqBackofferExt()) return placementRules, errors.Trace(errRetry) } @@ -1520,12 +1520,14 @@ LOOPFORTABLE: restoreFn := func() error { filesGroups := getGroupFiles(filesReplica, rc.fileImporter.supportMultiIngest) for _, filesGroup := range filesGroups { - if importErr := func(fs []*backuppb.File) error { + if importErr := func(fs []*backuppb.File) (err error) { fileStart := time.Now() defer func() { - log.Info("import files done", logutil.Files(filesGroup), - zap.Duration("take", time.Since(fileStart))) - updateCh.Inc() + if err == nil { + log.Info("import files done", logutil.Files(filesGroup), + zap.Duration("take", time.Since(fileStart))) + updateCh.Inc() + } }() return rc.fileImporter.ImportSSTFiles(ectx, fs, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()) }(filesGroup); importErr != nil { diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index c1b46b353134e..eb3de1bb3f1a1 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -1186,13 +1186,11 @@ func (importer *FileImporter) downloadRawKVSSTV2( } func (importer *FileImporter) ingest( - c context.Context, + ctx context.Context, files []*backuppb.File, info *split.RegionInfo, downloadMetas []*import_sstpb.SSTMeta, ) error { - ctx, cancel := context.WithTimeout(c, gRPCTimeOut) - defer cancel() for { ingestResp, errIngest := importer.ingestSSTs(ctx, downloadMetas, info) if errIngest != nil { diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 6b7aa7a127863..96fe837e2cca1 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -31,10 +31,6 @@ const ( backupSSTWaitInterval = 2 * time.Second backupSSTMaxWaitInterval = 3 * time.Second - resetTSRetryTime = 16 - resetTSWaitInterval = 50 * time.Millisecond - resetTSMaxWaitInterval = 500 * time.Millisecond - resetTSRetryTimeExt = 600 resetTSWaitIntervalExt = 500 * time.Millisecond resetTSMaxWaitIntervalExt = 300 * time.Second @@ -167,7 +163,11 @@ func NewBackupSSTBackoffer() Backoffer { } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { - log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) + defer func() { + if bo.attempt != 0 { + log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) + } + }() // we don't care storeID here. res := bo.errContext.HandleErrorMsg(err.Error(), 0) if res.Strategy == RetryStrategy { @@ -220,14 +220,6 @@ type pdReqBackoffer struct { maxDelayTime time.Duration } -func NewPDReqBackoffer() Backoffer { - return &pdReqBackoffer{ - attempt: resetTSRetryTime, - delayTime: resetTSWaitInterval, - maxDelayTime: resetTSMaxWaitInterval, - } -} - func NewPDReqBackofferExt() Backoffer { return &pdReqBackoffer{ attempt: resetTSRetryTimeExt, diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 316896bde3f0d..8719220c9c816 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -116,7 +116,7 @@ func TestBackoffWithRetryableError(t *testing.T) { func TestPdBackoffWithRetryableError(t *testing.T) { var counter int - backoffer := utils.NewPDReqBackoffer() + backoffer := utils.NewPDReqBackofferExt() gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() From 59b1ba0831549d1066cdf4364590eeca947930f8 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Thu, 7 Mar 2024 13:41:16 +0800 Subject: [PATCH 2/5] log if error is not retryable Signed-off-by: Leavrth --- br/pkg/utils/backoff.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 96fe837e2cca1..b7c1d6b1d5cee 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -164,8 +164,8 @@ func NewBackupSSTBackoffer() Backoffer { func (bo *importerBackoffer) NextBackoff(err error) time.Duration { defer func() { - if bo.attempt != 0 { - log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) + if bo.attempt == 0 { + log.Warn("failed to import ssts by unretryable error or retry attempt exhausted", zap.Int("attempt", bo.attempt), zap.Error(err)) } }() // we don't care storeID here. From 44743acc3c093acddc02038c1c80841a734d7369 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 8 Mar 2024 15:47:14 +0800 Subject: [PATCH 3/5] retry the grcp context cancel error in pd backoff Signed-off-by: Leavrth --- br/pkg/utils/backoff.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index b7c1d6b1d5cee..7f1ff66c8be71 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -241,8 +241,12 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- default: + // If the connection timeout, pd client would cancel the context, and return grpc context cancel error. + // So make the codes.Canceled retryable too. + // It's OK to retry the grpc context cancel error, because the parent context cancel returns context.Canceled. + // For example, cancel the `ectx` and then pdClient.GetTS(ectx) returns context.Canceled instead of grpc context canceled. switch status.Code(e) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: + case codes.DeadlineExceeded, codes.Canceled, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: bo.delayTime = 2 * bo.delayTime bo.attempt-- default: From 5085dc924eb059f270c1fbc6e66aeb73a970ffa8 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Mon, 11 Mar 2024 13:47:41 +0800 Subject: [PATCH 4/5] commit some suggestions Signed-off-by: Leavrth --- br/pkg/checkpoint/checkpoint.go | 2 +- br/pkg/conn/conn.go | 4 ++-- br/pkg/restore/client.go | 6 +++--- br/pkg/utils/backoff.go | 17 ++++++++++++----- br/pkg/utils/backoff_test.go | 2 +- 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index 079fb5a84c4df..a7707afda2e04 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -596,7 +596,7 @@ func (r *CheckpointRunner[K, V]) getTS(ctx context.Context) (int64, int64, error } return nil - }, utils.NewPDReqBackofferExt()) + }, utils.NewPDReqBackoffer()) return p, l, errors.Trace(errRetry) } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 339dbeefdb590..d1dea6585edaf 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -103,7 +103,7 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, return errors.Trace(err) }, - utils.NewPDReqBackofferExt(), + utils.NewPDReqBackoffer(), ) return stores, errors.Trace(errRetry) @@ -383,7 +383,7 @@ func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func } _ = resp.Body.Close() return nil - }, utils.NewPDReqBackofferExt()) + }, utils.NewPDReqBackoffer()) if err != nil { // if one store failed, break and return error return err diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 38db99b5eece4..1fe97552743f5 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -733,7 +733,7 @@ func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error) { log.Warn("failed to get TS, retry it", zap.Uint("retry time", retry), logutil.ShortError(getTSErr)) } return getTSErr - }, utils.NewPDReqBackofferExt()) + }, utils.NewPDReqBackoffer()) if err != nil { log.Error("failed to get TS", zap.Error(err)) @@ -747,7 +747,7 @@ func (rc *Client) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController) erro log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS)) return utils.WithRetry(ctx, func() error { return pdCtrl.ResetTS(ctx, restoreTS) - }, utils.NewPDReqBackofferExt()) + }, utils.NewPDReqBackoffer()) } // GetPlacementRules return the current placement rules. @@ -760,7 +760,7 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pd i++ placementRules, err = pdutil.GetPlacementRules(ctx, pdAddrs[idx], rc.tlsConf) return errors.Trace(err) - }, utils.NewPDReqBackofferExt()) + }, utils.NewPDReqBackoffer()) return placementRules, errors.Trace(errRetry) } diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 7f1ff66c8be71..658a4d965d886 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -31,6 +31,10 @@ const ( backupSSTWaitInterval = 2 * time.Second backupSSTMaxWaitInterval = 3 * time.Second + resetTSRetryTime = 32 + resetTSWaitInterval = 50 * time.Millisecond + resetTSMaxWaitInterval = 2 * time.Second + resetTSRetryTimeExt = 600 resetTSWaitIntervalExt = 500 * time.Millisecond resetTSMaxWaitIntervalExt = 300 * time.Second @@ -163,11 +167,6 @@ func NewBackupSSTBackoffer() Backoffer { } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { - defer func() { - if bo.attempt == 0 { - log.Warn("failed to import ssts by unretryable error or retry attempt exhausted", zap.Int("attempt", bo.attempt), zap.Error(err)) - } - }() // we don't care storeID here. res := bo.errContext.HandleErrorMsg(err.Error(), 0) if res.Strategy == RetryStrategy { @@ -220,6 +219,14 @@ type pdReqBackoffer struct { maxDelayTime time.Duration } +func NewPDReqBackoffer() Backoffer { + return &pdReqBackoffer{ + attempt: resetTSRetryTime, + delayTime: resetTSWaitInterval, + maxDelayTime: resetTSMaxWaitInterval, + } +} + func NewPDReqBackofferExt() Backoffer { return &pdReqBackoffer{ attempt: resetTSRetryTimeExt, diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 8719220c9c816..316896bde3f0d 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -116,7 +116,7 @@ func TestBackoffWithRetryableError(t *testing.T) { func TestPdBackoffWithRetryableError(t *testing.T) { var counter int - backoffer := utils.NewPDReqBackofferExt() + backoffer := utils.NewPDReqBackoffer() gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() From 42ebaff4c189a68d05d8fe80cbc97d17f009476e Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 12 Mar 2024 10:32:28 +0800 Subject: [PATCH 5/5] fix unit test Signed-off-by: Leavrth --- br/pkg/utils/backoff_test.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 316896bde3f0d..c2a9247f4269d 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -123,9 +123,12 @@ func TestPdBackoffWithRetryableError(t *testing.T) { if counter == 2 { return io.EOF } + if counter == 6 { + return context.Canceled + } return gRPCError }, backoffer) - require.Equal(t, 16, counter) + require.Equal(t, 7, counter) require.Equal(t, []error{ gRPCError, gRPCError, @@ -133,16 +136,7 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError, gRPCError, gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, + context.Canceled, }, multierr.Errors(err)) }