diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 21e92c0a0e694..1f6b386d20cf2 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -1167,6 +1167,7 @@ func (rc *Client) RestoreSSTFiles( var rangeFiles []*backuppb.File var leftFiles []*backuppb.File +<<<<<<< HEAD for rangeFiles, leftFiles = drainFilesByRange(files, rc.fileImporter.supportMultiIngest); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles, rc.fileImporter.supportMultiIngest) { filesReplica := rangeFiles rc.workerPool.ApplyOnErrorGroup(eg, @@ -1179,6 +1180,62 @@ func (rc *Client) RestoreSSTFiles( }() return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher, rc.backupMeta.ApiVersion) }) +======= +LOOPFORTABLE: + for _, tableIDWithFile := range tableIDWithFiles { + tableID := tableIDWithFile.TableID + files := tableIDWithFile.Files + fileCount += len(files) + for rangeFiles, leftFiles = drainFilesByRange(files); len(rangeFiles) != 0; rangeFiles, leftFiles = drainFilesByRange(leftFiles) { + filesReplica := rangeFiles + if ectx.Err() != nil { + log.Warn("Restoring encountered error and already stopped, give up remained files.", + zap.Int("remained", len(leftFiles)), + logutil.ShortError(ectx.Err())) + // We will fetch the error from the errgroup then (If there were). + // Also note if the parent context has been canceled or something, + // breaking here directly is also a reasonable behavior. + break LOOPFORTABLE + } + restoreFn := func() error { + filesGroups := getGroupFiles(filesReplica, rc.fileImporter.supportMultiIngest) + for _, filesGroup := range filesGroups { + if importErr := func(fs []*backuppb.File) (err error) { + fileStart := time.Now() + defer func() { + if err == nil { + log.Info("import files done", logutil.Files(filesGroup), + zap.Duration("take", time.Since(fileStart))) + updateCh.Inc() + } + }() + return rc.fileImporter.ImportSSTFiles(ectx, fs, rewriteRules, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()) + }(filesGroup); importErr != nil { + return errors.Trace(importErr) + } + } + + // the data of this range has been import done + if runner != nil && len(filesReplica) > 0 { + rangeKey := getFileRangeKey(filesReplica[0].Name) + // The checkpoint range shows this ranges of kvs has been restored into + // the table corresponding to the table-id. + if err := checkpoint.AppendRangesForRestore(ectx, runner, tableID, rangeKey); err != nil { + return errors.Trace(err) + } + } + return nil + } + if rc.granularity == string(CoarseGrained) { + eg.Go(restoreFn) + } else { + // if we are not use coarse granularity which means + // we still pipeline split & scatter regions and import sst files + // just keep the consistency as before. + rc.workerPool.ApplyOnErrorGroup(eg, restoreFn) + } + } +>>>>>>> d604b069399 (br: stop log when full restore failed (#51578)) } if err := eg.Wait(); err != nil { diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index 425f170334cd3..b14f523e2bd4d 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -759,6 +759,10 @@ func (importer *FileImporter) downloadRawKVSST( func (importer *FileImporter) ingest( ctx context.Context, +<<<<<<< HEAD +======= + files []*backuppb.File, +>>>>>>> d604b069399 (br: stop log when full restore failed (#51578)) info *split.RegionInfo, downloadMetas []*import_sstpb.SSTMeta, ) error { diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index d0a21d92b45e8..040df3486fb14 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -26,9 +26,17 @@ const ( downloadSSTWaitInterval = 1 * time.Second downloadSSTMaxWaitInterval = 4 * time.Second +<<<<<<< HEAD resetTSRetryTime = 16 +======= + backupSSTRetryTimes = 5 + backupSSTWaitInterval = 2 * time.Second + backupSSTMaxWaitInterval = 3 * time.Second + + resetTSRetryTime = 32 +>>>>>>> d604b069399 (br: stop log when full restore failed (#51578)) resetTSWaitInterval = 50 * time.Millisecond - resetTSMaxWaitInterval = 500 * time.Millisecond + resetTSMaxWaitInterval = 2 * time.Second resetTSRetryTimeExt = 600 resetTSWaitIntervalExt = 500 * time.Millisecond @@ -135,7 +143,6 @@ func NewDownloadSSTBackoffer() Backoffer { } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { - log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) // we don't care storeID here. res := bo.errContext.HandleErrorMsg(err.Error(), 0) if res.Strategy == RetryStrategy { @@ -209,8 +216,12 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- default: + // If the connection timeout, pd client would cancel the context, and return grpc context cancel error. + // So make the codes.Canceled retryable too. + // It's OK to retry the grpc context cancel error, because the parent context cancel returns context.Canceled. + // For example, cancel the `ectx` and then pdClient.GetTS(ectx) returns context.Canceled instead of grpc context canceled. switch status.Code(e) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: + case codes.DeadlineExceeded, codes.Canceled, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss, codes.Unknown: bo.delayTime = 2 * bo.delayTime bo.attempt-- default: diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index dc09826fd7806..aa5d048e90890 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -123,9 +123,12 @@ func TestPdBackoffWithRetryableError(t *testing.T) { if counter == 2 { return io.EOF } + if counter == 6 { + return context.Canceled + } return gRPCError }, backoffer) - require.Equal(t, 16, counter) + require.Equal(t, 7, counter) require.Equal(t, []error{ gRPCError, gRPCError, @@ -133,16 +136,7 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError, gRPCError, gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, - gRPCError, + context.Canceled, }, multierr.Errors(err)) }