diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 83f3cf4df2fb9..a75e855851718 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -828,7 +828,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e req.Reset() for { - resultTask, err := e.getResultTask() + resultTask, err := e.getResultTask(ctx) if err != nil { return errors.Trace(err) } @@ -846,7 +846,7 @@ func (e *IndexMergeReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) e } } -func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) { +func (e *IndexMergeReaderExecutor) getResultTask(ctx context.Context) (*indexMergeTableTask, error) { failpoint.Inject("testIndexMergeMainReturnEarly", func(_ failpoint.Value) { // To make sure processWorker make resultCh to be full. // When main goroutine close finished, processWorker may be stuck when writing resultCh. @@ -860,8 +860,14 @@ func (e *IndexMergeReaderExecutor) getResultTask() (*indexMergeTableTask, error) if !ok { return nil, nil } - if err := <-task.doneCh; err != nil { - return nil, errors.Trace(err) + + select { + case <-ctx.Done(): + return nil, errors.Trace(ctx.Err()) + case err := <-task.doneCh: + if err != nil { + return nil, errors.Trace(err) + } } // Release the memory usage of last task before we handle a new task.