From 2bbeb08866385dae49c221de5870a16fb417d926 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 16 Jan 2022 21:53:41 +0000 Subject: [PATCH] backup: use correct Context in restore workers Previously some of the workers, which are called by ctxgroup goroutines, were using RestoreDataProcessor.Ctx, instead of the child context that the group created, which, critically, is cancelled if any group task fails. This could mean one worker in the group fails and stops draining a channel and returns an error to the group, which cancels its context, but another worker trying to write to that channel hangs if it is not checking the passed, now cancelled context. Release note (bug fix): fix a case where a RESTORE job could hang if it encountered an error when ingesting restored data. --- pkg/ccl/backupccl/restore_data_processor.go | 18 ++++++++---------- .../backupccl/restore_data_processor_test.go | 4 ++-- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 66badc289171..e38289b18b15 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -172,7 +172,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.sstCh) for entry := range entries { - if err := rd.openSSTs(entry, rd.sstCh); err != nil { + if err := rd.openSSTs(ctx, entry, rd.sstCh); err != nil { return err } } @@ -182,7 +182,7 @@ func (rd *restoreDataProcessor) Start(ctx context.Context) { rd.phaseGroup.GoCtx(func(ctx context.Context) error { defer close(rd.progCh) - return rd.runRestoreWorkers(rd.sstCh) + return rd.runRestoreWorkers(ctx, rd.sstCh) }) } @@ -256,9 +256,8 @@ type mergedSST struct { } func (rd *restoreDataProcessor) openSSTs( - entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, + ctx context.Context, entry execinfrapb.RestoreSpanEntry, sstCh chan mergedSST, ) error { - ctx := rd.Ctx ctxDone := ctx.Done() // The sstables only contain MVCC data and no intents, so using an MVCC @@ -315,7 +314,7 @@ func (rd *restoreDataProcessor) openSSTs( return nil } - log.VEventf(rd.Ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) + log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey) for _, file := range entry.Files { log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key) @@ -338,8 +337,8 @@ func (rd *restoreDataProcessor) openSSTs( return sendIters(iters, dirs) } -func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { - return ctxgroup.GroupWorkers(rd.Ctx, rd.numWorkers, func(ctx context.Context, _ int) error { +func (rd *restoreDataProcessor) runRestoreWorkers(ctx context.Context, ssts chan mergedSST) error { + return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, _ int) error { for { done, err := func() (done bool, _ error) { sstIter, ok := <-ssts @@ -348,7 +347,7 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { return done, nil } - summary, err := rd.processRestoreSpanEntry(sstIter) + summary, err := rd.processRestoreSpanEntry(ctx, sstIter) if err != nil { return done, err } @@ -374,10 +373,9 @@ func (rd *restoreDataProcessor) runRestoreWorkers(ssts chan mergedSST) error { } func (rd *restoreDataProcessor) processRestoreSpanEntry( - sst mergedSST, + ctx context.Context, sst mergedSST, ) (roachpb.BulkOpSummary, error) { db := rd.flowCtx.Cfg.DB - ctx := rd.Ctx evalCtx := rd.EvalCtx var summary roachpb.BulkOpSummary diff --git a/pkg/ccl/backupccl/restore_data_processor_test.go b/pkg/ccl/backupccl/restore_data_processor_test.go index fbbef31b3a39..ba902825a066 100644 --- a/pkg/ccl/backupccl/restore_data_processor_test.go +++ b/pkg/ccl/backupccl/restore_data_processor_test.go @@ -389,10 +389,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) { mockRestoreDataSpec) require.NoError(t, err) ssts := make(chan mergedSST, 1) - require.NoError(t, mockRestoreDataProcessor.openSSTs(restoreSpanEntry, ssts)) + require.NoError(t, mockRestoreDataProcessor.openSSTs(ctx, restoreSpanEntry, ssts)) close(ssts) sst := <-ssts - _, err = mockRestoreDataProcessor.processRestoreSpanEntry(sst) + _, err = mockRestoreDataProcessor.processRestoreSpanEntry(ctx, sst) require.NoError(t, err) clientKVs, err := kvDB.Scan(ctx, reqStartKey, reqEndKey, 0)