diff --git a/util/chunk/BUILD.bazel b/util/chunk/BUILD.bazel
index 1ff89ec1d07f2..94132870f766c 100644
--- a/util/chunk/BUILD.bazel
+++ b/util/chunk/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
         "pool.go",
         "row.go",
         "row_container.go",
+        "row_container_reader.go",
     ],
     importpath = "github.com/pingcap/tidb/util/chunk",
     visibility = ["//visibility:public"],
diff --git a/util/chunk/disk.go b/util/chunk/disk.go
index 2bca7a62e17c5..3c40e3861665b 100644
--- a/util/chunk/disk.go
+++ b/util/chunk/disk.go
@@ -15,6 +15,7 @@
 package chunk
 
 import (
+	"bufio"
 	"io"
 	"os"
 	"strconv"
@@ -172,13 +173,37 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) {
 func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {
 	chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx))
 	chkSize := l.numRowsOfEachChunk[chkIdx]
-	for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
-		_, _, err := l.GetRowAndAppendToChunk(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}, chk)
-		if err != nil {
-			return chk, err
+
+	firstRowOffset, err := l.getOffset(uint32(chkIdx), 0)
+	if err != nil {
+		return nil, err
+	}
+
+	// This channel is buffered with enough capacity for the whole chunk, so sending to it never blocks.
+	formatCh := make(chan rowInDisk, chkSize)
+	var formatChErr error
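+	// A background goroutine deserializes the rows from disk sequentially, while the current goroutine decodes
+	// them into the chunk below.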
+	go func() {
+		defer close(formatCh)
+
+		// If the rows are small, a bufio.Reader significantly improves read performance. As the benchmarks show,
+		// it still performs well for longer rows.
+		r := bufio.NewReader(l.dataFile.getSectionReader(firstRowOffset))
+		format := rowInDisk{numCol: len(l.fieldTypes)}
+		for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
+			_, err := format.ReadFrom(r)
+			if err != nil {
+				formatChErr = err
+				break
+			}
+
+			formatCh <- format
 		}
+	}()
+
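+	// Drain the channel and decode every deserialized row into the result chunk.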
+	for format := range formatCh {
+		_, chk = format.toRow(l.fieldTypes, chk)
 	}
-	return chk, nil
+	return chk, formatChErr
 }
 
 // GetRow gets a Row from the ListInDisk by RowPtr.
diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go
index 1217e86f31944..2b80fe644cd58 100644
--- a/util/chunk/disk_test.go
+++ b/util/chunk/disk_test.go
@@ -259,6 +259,7 @@ func testListInDisk(t *testing.T, concurrency int) {
 }
 
 func BenchmarkListInDisk_GetChunk(b *testing.B) {
+	b.StopTimer()
 	numChk, numRow := 10, 1000
 	chks, fields := initChunks(numChk, numRow)
 	l := NewListInDisk(fields)
@@ -267,6 +268,7 @@ func BenchmarkListInDisk_GetChunk(b *testing.B) {
 		_ = l.Add(chk)
 	}
 
+	b.StartTimer()
 	for i := 0; i < b.N; i++ {
 		v := i % numChk
 		_, _ = l.GetChunk(v)
diff --git a/util/chunk/row_container_reader.go b/util/chunk/row_container_reader.go
new file mode 100644
index 0000000000000..b96b20c6921c1
--- /dev/null
+++ b/util/chunk/row_container_reader.go
@@ -0,0 +1,163 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package chunk
+
+import (
+	"context"
+	"runtime"
+	"sync"
+
+	"github.com/pingcap/tidb/util/logutil"
+)
+
+// RowContainerReader is a forward-only iterator for the row container. It provides an interface similar to other
+// iterators, but it doesn't provide a `ReachEnd` function and requires manual closing to release the internal
+// goroutine.
+//
+// It's recommended to use it with the following pattern:
+//
+//	for iter := NewRowContainerReader(rc); iter.Current() != iter.End(); iter.Next() {
+//		...
+//	}
+//	iter.Close()
+//	if err := iter.Error(); err != nil {
+//		// handle the error
+//	}
+type RowContainerReader interface {
+	// Next returns the next Row.
+	Next() Row
+
+	// Current returns the current Row.
+	Current() Row
+
+	// End returns the invalid end Row.
+	End() Row
+
+	// Error returns a non-nil error if anything goes wrong during the iteration.
+	Error() error
+
+	// Close closes the reader and releases the internal goroutine.
+	Close()
+}
+
+var _ RowContainerReader = &rowContainerReader{}
+
+// rowContainerReader is a forward-only iterator for the row container.
+// It spawns two goroutines: one reads chunks from the disk, and the other converts each chunk to rows. The rows are
+// only sent to `rowCh` after the full chunk has been read, to avoid concurrent reads and writes on the chunk.
+//
+// TODO: record the memory allocated for the channel and chunks.
+type rowContainerReader struct {
+	// context, cancel and waitgroup are used to stop and wait until all goroutines stop.
+	ctx    context.Context
+	cancel func()
+	wg     sync.WaitGroup
+
+	rc *RowContainer
+
+	currentRow Row
+	rowCh      chan Row
+
+	// this error will only be set by the worker goroutine
+	err error
+}
+
+// Next implements RowContainerReader
+func (reader *rowContainerReader) Next() Row {
+	for row := range reader.rowCh {
+		reader.currentRow = row
+		return row
+	}
+	reader.currentRow = reader.End()
+	return reader.End()
+}
+
+// Current implements RowContainerReader
+func (reader *rowContainerReader) Current() Row {
+	return reader.currentRow
+}
+
+// End implements RowContainerReader
+func (*rowContainerReader) End() Row {
+	return Row{}
+}
+
+// Error implements RowContainerReader
+func (reader *rowContainerReader) Error() error {
+	return reader.err
+}
+
+func (reader *rowContainerReader) initializeChannel() {
+	if reader.rc.NumChunks() == 0 {
+		reader.rowCh = make(chan Row, 1024)
+	} else {
+		assumeChunkSize := reader.rc.NumRowsOfChunk(0)
+		// To avoid blocking on sends to `rowCh` and stalling the read of the next chunk, it's better to give the
+		// channel a buffer at least as large as a single chunk. Here it's allocated twice the chunk size to leave
+		// some margin.
+		reader.rowCh = make(chan Row, 2*assumeChunkSize)
+	}
+}
+
+// Close implements RowContainerReader
+func (reader *rowContainerReader) Close() {
+	reader.cancel()
+	reader.wg.Wait()
+}
+
+func (reader *rowContainerReader) startWorker() {
+	reader.wg.Add(1)
+	go func() {
+		defer close(reader.rowCh)
+		defer reader.wg.Done()
+
+		for chkIdx := 0; chkIdx < reader.rc.NumChunks(); chkIdx++ {
+			chk, err := reader.rc.GetChunk(chkIdx)
+			if err != nil {
+				reader.err = err
+				return
+			}
+
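+			// Send the rows one by one, and return on cancellation so that Close doesn't leak this goroutine.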
+			for i := 0; i < chk.NumRows(); i++ {
+				select {
+				case reader.rowCh <- chk.GetRow(i):
+				case <-reader.ctx.Done():
+					return
+				}
+			}
+		}
+	}()
+}
+
+// NewRowContainerReader creates a forward-only iterator for the row container
+func NewRowContainerReader(rc *RowContainer) *rowContainerReader {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	reader := &rowContainerReader{
+		ctx:    ctx,
+		cancel: cancel,
+		wg:     sync.WaitGroup{},
+
+		rc: rc,
+	}
+	reader.initializeChannel()
+	reader.startWorker()
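+	// Fetch the first row eagerly so that Current() returns a valid row right after construction.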
+	reader.Next()
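+	// The finalizer is only a safety net: if the caller forgets to call Close, it logs a warning and releases the
+	// goroutine.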
+	runtime.SetFinalizer(reader, func(reader *rowContainerReader) {
+		if reader.ctx.Err() == nil {
+			logutil.BgLogger().Warn("rowContainerReader is closed by finalizer")
+			reader.Close()
+		}
+	})
+
+	return reader
+}
diff --git a/util/chunk/row_container_test.go b/util/chunk/row_container_test.go
index b0972c179388a..2160496813cfd 100644
--- a/util/chunk/row_container_test.go
+++ b/util/chunk/row_container_test.go
@@ -15,10 +15,14 @@
 package chunk
 
 import (
+	"crypto/rand"
+	rand2 "math/rand"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/memory"
@@ -303,3 +307,187 @@ func TestActionBlocked(t *testing.T) {
 	ac.Action(tracker)
 	require.GreaterOrEqual(t, time.Since(starttime), 200*time.Millisecond)
 }
+
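+// insertBytesRowsIntoRowContainer builds a RowContainer with `chkCount` chunks of `rowPerChk` random []byte rows
+// each, and returns both the container and all inserted rows for later verification.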
+func insertBytesRowsIntoRowContainer(t *testing.T, chkCount int, rowPerChk int) (*RowContainer, [][]byte) {
+	longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(4096).Build()
+	fields := []*types.FieldType{&longVarCharTyp}
+
+	rc := NewRowContainer(fields, chkCount)
+
+	allRows := [][]byte{}
+	// insert chunks
+	for i := 0; i < chkCount; i++ {
+		chk := NewChunkWithCapacity(fields, rowPerChk)
+		// insert rows for each chunk
+		for j := 0; j < rowPerChk; j++ {
+			length := rand2.Uint32()
+			randomBytes := make([]byte, length%4096)
+			_, err := rand.Read(randomBytes)
+			require.NoError(t, err)
+
+			chk.AppendBytes(0, randomBytes)
+			allRows = append(allRows, randomBytes)
+		}
+		require.NoError(t, rc.Add(chk))
+	}
+
+	return rc, allRows
+}
+
+func TestRowContainerReaderInDisk(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16)
+	rc.SpillToDisk()
+
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	for i := 0; i < 16; i++ {
+		for j := 0; j < 16; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*16+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+}
+
+func TestCloseRowContainerReader(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 16)
+	rc.SpillToDisk()
+
+	// read 8.5 of these chunks
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	for i := 0; i < 8; i++ {
+		for j := 0; j < 16; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*16+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+	for j := 0; j < 8; j++ {
+		row := reader.Current()
+		require.Equal(t, allRows[8*16+j], row.GetBytes(0))
+		reader.Next()
+	}
+}
+
+func TestConcurrentSpillWithRowContainerReader(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024)
+
+	var wg sync.WaitGroup
+	// concurrently read and spill to disk
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		reader := NewRowContainerReader(rc)
+		defer reader.Close()
+
+		for i := 0; i < 16; i++ {
+			for j := 0; j < 1024; j++ {
+				row := reader.Current()
+				require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
+				reader.Next()
+			}
+		}
+	}()
+	rc.SpillToDisk()
+	wg.Wait()
+}
+
+func TestReadAfterSpillWithRowContainerReader(t *testing.T) {
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = t.TempDir()
+	})
+
+	rc, allRows := insertBytesRowsIntoRowContainer(t, 16, 1024)
+
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	for i := 0; i < 8; i++ {
+		for j := 0; j < 1024; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+	rc.SpillToDisk()
+	for i := 8; i < 16; i++ {
+		for j := 0; j < 1024; j++ {
+			row := reader.Current()
+			require.Equal(t, allRows[i*1024+j], row.GetBytes(0))
+			reader.Next()
+		}
+	}
+}
+
+func BenchmarkRowContainerReaderInDiskWithRowSize512(b *testing.B) {
+	benchmarkRowContainerReaderInDiskWithRowLength(b, 512)
+}
+
+func BenchmarkRowContainerReaderInDiskWithRowSize1024(b *testing.B) {
+	benchmarkRowContainerReaderInDiskWithRowLength(b, 1024)
+}
+
+func BenchmarkRowContainerReaderInDiskWithRowSize4096(b *testing.B) {
+	benchmarkRowContainerReaderInDiskWithRowLength(b, 4096)
+}
+
+func benchmarkRowContainerReaderInDiskWithRowLength(b *testing.B, rowLength int) {
+	b.StopTimer()
+
+	restore := config.RestoreFunc()
+	defer restore()
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.TempStoragePath = b.TempDir()
+	})
+
+	longVarCharTyp := types.NewFieldTypeBuilder().SetType(mysql.TypeVarchar).SetFlen(rowLength).Build()
+	fields := []*types.FieldType{&longVarCharTyp}
+
+	randomBytes := make([]byte, rowLength)
+	_, err := rand.Read(randomBytes)
+	require.NoError(b, err)
+
+	// create a row container which stores the data in disk
+	rc := NewRowContainer(fields, 1<<10)
+	rc.SpillToDisk()
+
+	// insert `b.N * 1<<10` rows (`b.N` chunks) into the rc
+	for i := 0; i < b.N; i++ {
+		chk := NewChunkWithCapacity(fields, 1<<10)
+		for j := 0; j < 1<<10; j++ {
+			chk.AppendBytes(0, randomBytes)
+		}
+
+		require.NoError(b, rc.Add(chk))
+	}
+
+	reader := NewRowContainerReader(rc)
+	defer reader.Close()
+	b.StartTimer()
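+	// Each outer iteration consumes exactly one chunk's worth of rows (1<<10 calls to Next).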
+	for n := 0; n < b.N; n++ {
+		for i := 0; i < 1<<10; i++ {
+			reader.Next()
+		}
+	}
+	require.NoError(b, reader.Error())
+}