diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go
index 46e777b6f941d..72e174fb0a77b 100644
--- a/pkg/domain/domain.go
+++ b/pkg/domain/domain.go
@@ -2268,7 +2268,7 @@ func quitStatsOwner(do *Domain, mgr owner.Manager) {
 func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
 	statsHandle := do.StatsHandle()
 	for i, ctx := range ctxList {
-		statsHandle.StatsLoad.SubCtxs[i] = ctx
+		statsHandle.SetSubCtxs(i, ctx)
 		do.wg.Add(1)
 		go statsHandle.SubLoadWorker(ctx, do.exit, do.wg)
 	}
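
The hunk above replaces a direct write to the exported StatsLoad.SubCtxs slice with the new SetSubCtxs accessor, so the domain no longer depends on how the sync-load worker stores its session contexts. A minimal standalone sketch of that pattern, with hypothetical names rather than TiDB's real types:

    package main

    import "fmt"

    type sessionCtx struct{ id int }

    // loader keeps its per-worker contexts unexported; callers must go
    // through SetSubCtxs, mirroring statsHandle.SetSubCtxs(i, ctx) above.
    type loader struct {
        subCtxs []*sessionCtx
    }

    func (l *loader) SetSubCtxs(idx int, sctx *sessionCtx) {
        l.subCtxs[idx] = sctx
    }

    func main() {
        l := &loader{subCtxs: make([]*sessionCtx, 2)}
        for i := range l.subCtxs {
            l.SetSubCtxs(i, &sessionCtx{id: i})
        }
        fmt.Println(len(l.subCtxs)) // 2
    }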
diff --git a/pkg/planner/core/casetest/planstats/BUILD.bazel b/pkg/planner/core/casetest/planstats/BUILD.bazel
index 094d33c59f3ac..c789c86065f71 100644
--- a/pkg/planner/core/casetest/planstats/BUILD.bazel
+++ b/pkg/planner/core/casetest/planstats/BUILD.bazel
@@ -21,7 +21,7 @@ go_test(
         "//pkg/sessionctx",
         "//pkg/sessionctx/stmtctx",
         "//pkg/statistics",
-        "//pkg/statistics/handle",
+        "//pkg/statistics/handle/util",
         "//pkg/table",
         "//pkg/testkit",
         "//pkg/testkit/testdata",
diff --git a/pkg/planner/core/casetest/planstats/plan_stats_test.go b/pkg/planner/core/casetest/planstats/plan_stats_test.go
index f29b42d2bfbf4..7be6b9eeb0c6a 100644
--- a/pkg/planner/core/casetest/planstats/plan_stats_test.go
+++ b/pkg/planner/core/casetest/planstats/plan_stats_test.go
@@ -32,7 +32,7 @@ import (
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/pkg/statistics"
-	"github.com/pingcap/tidb/pkg/statistics/handle"
+	utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/testkit/testdata"
@@ -268,7 +268,7 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
 	neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
 	resultCh := make(chan stmtctx.StatsLoadResult, 1)
 	timeout := time.Duration(1<<63 - 1)
-	task := &handle.NeededItemTask{
+	task := &utilstats.NeededItemTask{
 		TableItemID: neededColumn,
 		ResultCh:    resultCh,
 		ToTimeout:   time.Now().Local().Add(timeout),
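
The surrounding test keeps the pre-existing time.Duration(1<<63 - 1) idiom: time.Duration is an int64 nanosecond count, so this is math.MaxInt64, an effectively infinite timeout of roughly 292 years. A quick standalone check of that equivalence:

    package main

    import (
        "fmt"
        "math"
        "time"
    )

    func main() {
        timeout := time.Duration(1<<63 - 1)
        fmt.Println(timeout == time.Duration(math.MaxInt64)) // true
        fmt.Println(timeout)                    // 2562047h47m16.854775807s
        fmt.Println(timeout.Hours() / 24 / 365) // ~292 years
    }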
diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel
index 1a872a1b55589..229888fe212f1 100644
--- a/pkg/statistics/handle/BUILD.bazel
+++ b/pkg/statistics/handle/BUILD.bazel
@@ -6,7 +6,6 @@ go_library(
         "bootstrap.go",
         "ddl.go",
         "handle.go",
-        "handle_hist.go",
     ],
     importpath = "github.com/pingcap/tidb/pkg/statistics/handle",
     visibility = ["//visibility:public"],
@@ -15,7 +14,6 @@ go_library(
         "//pkg/ddl/util",
         "//pkg/infoschema",
         "//pkg/kv",
-        "//pkg/metrics",
         "//pkg/parser/model",
         "//pkg/parser/mysql",
         "//pkg/parser/terror",
@@ -29,14 +27,13 @@ go_library(
         "//pkg/statistics/handle/history",
         "//pkg/statistics/handle/lockstats",
         "//pkg/statistics/handle/storage",
+        "//pkg/statistics/handle/syncload",
         "//pkg/statistics/handle/usage",
         "//pkg/statistics/handle/util",
         "//pkg/types",
-        "//pkg/util",
         "//pkg/util/chunk",
         "//pkg/util/logutil",
         "@com_github_pingcap_errors//:errors",
-        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_tiancaiamao_gp//:gp",
         "@org_uber_go_atomic//:atomic",
         "@org_uber_go_zap//:zap",
@@ -48,25 +45,19 @@ go_test(
     timeout = "short",
     srcs = [
         "ddl_test.go",
-        "handle_hist_test.go",
         "main_test.go",
     ],
     embed = [":handle"],
     flaky = True,
     race = "on",
-    shard_count = 8,
+    shard_count = 4,
     deps = [
-        "//pkg/config",
         "//pkg/parser/model",
         "//pkg/planner/cardinality",
-        "//pkg/sessionctx",
-        "//pkg/sessionctx/stmtctx",
         "//pkg/testkit",
         "//pkg/testkit/testsetup",
         "//pkg/types",
-        "//pkg/util/mathutil",
         "//pkg/util/mock",
-        "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_stretchr_testify//require",
         "@org_uber_go_goleak//:goleak",
     ],
diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go
index 4c01d10e10f48..7eb89c99b431f 100644
--- a/pkg/statistics/handle/handle.go
+++ b/pkg/statistics/handle/handle.go
@@ -18,11 +18,9 @@ import (
 	"math"
 	"time"
 
-	"github.com/pingcap/tidb/pkg/config"
 	ddlUtil "github.com/pingcap/tidb/pkg/ddl/util"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/sessionctx"
-	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/pkg/statistics"
 	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze"
 	"github.com/pingcap/tidb/pkg/statistics/handle/cache"
@@ -30,6 +28,7 @@ import (
 	"github.com/pingcap/tidb/pkg/statistics/handle/history"
 	"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
 	"github.com/pingcap/tidb/pkg/statistics/handle/storage"
+	"github.com/pingcap/tidb/pkg/statistics/handle/syncload"
 	"github.com/pingcap/tidb/pkg/statistics/handle/usage"
 	"github.com/pingcap/tidb/pkg/statistics/handle/util"
 	"github.com/pingcap/tidb/pkg/util/logutil"
@@ -70,6 +69,9 @@ type Handle struct {
 	// StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
 	util.StatsAnalyze
 
+	// StatsSyncLoad is used to load stats synchronously.
+	util.StatsSyncLoad
+
 	// StatsReadWriter is used to read/write stats from/to storage.
 	util.StatsReadWriter
 
@@ -94,9 +96,6 @@ type Handle struct {
 	// StatsCache ...
 	util.StatsCache
 
-	// StatsLoad is used to load stats concurrently
-	StatsLoad StatsLoad
-
 	lease atomic2.Duration
 }
 
@@ -111,7 +110,6 @@ func (h *Handle) Clear() {
 
 // NewHandle creates a Handle for update stats.
 func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool util.SessionPool, tracker sessionctx.SysProcTracker, autoAnalyzeProcIDGetter func() uint64) (*Handle, error) {
-	cfg := config.GetGlobalConfig()
 	handle := &Handle{
 		gpool:                   gp.New(math.MaxInt16, time.Minute),
 		ddlEventCh:              make(chan *ddlUtil.Event, 1000),
@@ -135,11 +133,8 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
 	handle.StatsHistory = history.NewStatsHistory(handle)
 	handle.StatsUsage = usage.NewStatsUsageImpl(handle)
 	handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(handle)
+	handle.StatsSyncLoad = syncload.NewStatsSyncLoad(handle)
 	handle.StatsGlobal = globalstats.NewStatsGlobal(handle)
-	handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
-	handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-	handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-	handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
 	return handle, nil
 }
 
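Handle now embeds the util.StatsSyncLoad interface, and NewHandle injects the concrete implementation from the new syncload package, so existing call sites such as h.SendLoadRequests(...) keep compiling through Go's method promotion. A minimal sketch of that composition, with hypothetical names:

    package main

    import "fmt"

    type SyncLoader interface {
        SendLoadRequests(items []int64) error
    }

    type syncLoadImpl struct{}

    func (syncLoadImpl) SendLoadRequests(items []int64) error {
        fmt.Println("loading", items)
        return nil
    }

    // Handle embeds the interface; the implementation is injected at
    // construction time, as NewHandle does with syncload.NewStatsSyncLoad.
    type Handle struct {
        SyncLoader
    }

    func main() {
        h := &Handle{SyncLoader: syncLoadImpl{}}
        _ = h.SendLoadRequests([]int64{1, 2}) // promoted onto Handle
    }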
diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
new file mode 100644
index 0000000000000..910011c8f6f00
--- /dev/null
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -0,0 +1,44 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "syncload",
+    srcs = ["stats_syncload.go"],
+    importpath = "github.com/pingcap/tidb/pkg/statistics/handle/syncload",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/config",
+        "//pkg/metrics",
+        "//pkg/parser/model",
+        "//pkg/parser/mysql",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/statistics",
+        "//pkg/statistics/handle/storage",
+        "//pkg/statistics/handle/util",
+        "//pkg/types",
+        "//pkg/util",
+        "//pkg/util/logutil",
+        "@com_github_pingcap_errors//:errors",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@org_uber_go_zap//:zap",
+    ],
+)
+
+go_test(
+    name = "syncload_test",
+    timeout = "short",
+    srcs = ["stats_syncload_test.go"],
+    flaky = True,
+    race = "on",
+    shard_count = 4,
+    deps = [
+        "//pkg/config",
+        "//pkg/parser/model",
+        "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
+        "//pkg/testkit",
+        "//pkg/util/mathutil",
+        "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/pkg/statistics/handle/handle_hist.go b/pkg/statistics/handle/syncload/stats_syncload.go
similarity index 72%
rename from pkg/statistics/handle/handle_hist.go
rename to pkg/statistics/handle/syncload/stats_syncload.go
index 924fda56698b0..b071cdac5523b 100644
--- a/pkg/statistics/handle/handle_hist.go
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -1,4 +1,4 @@
-// Copyright 2021 PingCAP, Inc.
+// Copyright 2023 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -12,11 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package handle
+package syncload
 
 import (
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -36,30 +35,36 @@ import (
 	"go.uber.org/zap"
 )
 
+type statsSyncLoad struct {
+	statsHandle utilstats.StatsHandle
+	StatsLoad   utilstats.StatsLoad
+}
+
+// NewStatsSyncLoad creates a new StatsSyncLoad.
+func NewStatsSyncLoad(statsHandle utilstats.StatsHandle) utilstats.StatsSyncLoad {
+	s := &statsSyncLoad{statsHandle: statsHandle}
+	cfg := config.GetGlobalConfig()
+	s.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
+	s.StatsLoad.NeededItemsCh = make(chan *utilstats.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.TimeoutItemsCh = make(chan *utilstats.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
+	s.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
+	return s
+}
+
 type statsWrapper struct {
 	col *statistics.Column
 	idx *statistics.Index
 }
 
-// StatsLoad is used to load stats concurrently
-type StatsLoad struct {
-	NeededItemsCh  chan *NeededItemTask
-	TimeoutItemsCh chan *NeededItemTask
-	WorkingColMap  map[model.TableItemID][]chan stmtctx.StatsLoadResult
-	SubCtxs        []sessionctx.Context
-	sync.Mutex
-}
-
-// NeededItemTask represents one needed column/indices with expire time.
-type NeededItemTask struct {
-	ToTimeout   time.Time
-	ResultCh    chan stmtctx.StatsLoadResult
-	TableItemID model.TableItemID
+// SetSubCtxs sets the sessionctx that is used to run queries in the background.
+// TODO: use SessionPool instead.
+func (s *statsSyncLoad) SetSubCtxs(idx int, sctx sessionctx.Context) {
+	s.StatsLoad.SubCtxs[idx] = sctx
 }
 
 // SendLoadRequests send neededColumns requests
-func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error {
-	remainedItems := h.removeHistLoadedColumns(neededHistItems)
+func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error {
+	remainedItems := s.removeHistLoadedColumns(neededHistItems)
 
 	failpoint.Inject("assertSyncLoadItems", func(val failpoint.Value) {
 		if sc.OptimizeTracer != nil {
@@ -76,9 +81,9 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 	sc.StatsLoad.Timeout = timeout
 	sc.StatsLoad.NeededItems = remainedItems
 	sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
-	tasks := make([]*NeededItemTask, 0)
+	tasks := make([]*utilstats.NeededItemTask, 0)
 	for _, item := range remainedItems {
-		task := &NeededItemTask{
+		task := &utilstats.NeededItemTask{
 			TableItemID: item,
 			ToTimeout:   time.Now().Local().Add(timeout),
 			ResultCh:    sc.StatsLoad.ResultCh,
@@ -89,7 +94,7 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 	defer timer.Stop()
 	for _, task := range tasks {
 		select {
-		case h.StatsLoad.NeededItemsCh <- task:
+		case s.StatsLoad.NeededItemsCh <- task:
 			continue
 		case <-timer.C:
 			return errors.New("sync load stats channel is full and timeout sending task to channel")
@@ -100,7 +105,7 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 }
 
 // SyncWaitStatsLoad sync waits loading of neededColumns and return false if timeout
-func (*Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
+func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
 	if len(sc.StatsLoad.NeededItems) <= 0 {
 		return nil
 	}
@@ -141,10 +146,10 @@ func (*Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
 }
 
 // removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache.
-func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []model.TableItemID {
+func (s *statsSyncLoad) removeHistLoadedColumns(neededItems []model.TableItemID) []model.TableItemID {
 	remainedItems := make([]model.TableItemID, 0, len(neededItems))
 	for _, item := range neededItems {
-		tbl, ok := h.Get(item.TableID)
+		tbl, ok := s.statsHandle.Get(item.TableID)
 		if !ok {
 			continue
 		}
@@ -161,11 +166,11 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
 }
 
 // AppendNeededItem appends needed columns/indices to ch, it is only used for test
-func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
+func (s *statsSyncLoad) AppendNeededItem(task *utilstats.NeededItemTask, timeout time.Duration) error {
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 	select {
-	case h.StatsLoad.NeededItemsCh <- task:
+	case s.StatsLoad.NeededItemsCh <- task:
 	case <-timer.C:
 		return errors.New("Channel is full and timeout writing to channel")
 	}
@@ -175,22 +180,22 @@ func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) e
 var errExit = errors.New("Stop loading since domain is closed")
 
 // SubLoadWorker loads hist data for each column
-func (h *Handle) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
+func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
 	defer func() {
 		exitWg.Done()
 		logutil.BgLogger().Info("SubLoadWorker exited.")
 	}()
 	// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
-	var lastTask *NeededItemTask
+	var lastTask *utilstats.NeededItemTask
 	for {
-		task, err := h.HandleOneTask(sctx, lastTask, exit)
+		task, err := s.HandleOneTask(sctx, lastTask, exit)
 		lastTask = task
 		if err != nil {
 			switch err {
 			case errExit:
 				return
 			default:
-				time.Sleep(h.Lease() / 10)
+				time.Sleep(s.statsHandle.Lease() / 10)
 				continue
 			}
 		}
@@ -198,7 +203,7 @@ func (h *Handle) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exit
 }
 
 // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
-func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error) {
+func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *utilstats.NeededItemTask, exit chan struct{}) (task *utilstats.NeededItemTask, err error) {
 	defer func() {
 		// recover for each task, worker keeps working
 		if r := recover(); r != nil {
@@ -207,7 +212,7 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
 		}
 	}()
 	if lastTask == nil {
-		task, err = h.drainColTask(exit)
+		task, err = s.drainColTask(exit)
 		if err != nil {
 			if err != errExit {
 				logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
@@ -217,15 +222,15 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
 	} else {
 		task = lastTask
 	}
-	return h.handleOneItemTask(sctx, task)
+	return s.handleOneItemTask(sctx, task)
 }
 
-func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask) (*NeededItemTask, error) {
+func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *utilstats.NeededItemTask) (*utilstats.NeededItemTask, error) {
 	result := stmtctx.StatsLoadResult{Item: task.TableItemID}
 	item := result.Item
-	tbl, ok := h.Get(item.TableID)
+	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
-		h.writeToResultChan(task.ResultCh, result)
+		s.writeToResultChan(task.ResultCh, result)
 		return nil, nil
 	}
 	var err error
@@ -233,27 +238,27 @@ func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask
 	if item.IsIndex {
 		index, ok := tbl.Indices[item.ID]
 		if !ok || index.IsFullLoad() {
-			h.writeToResultChan(task.ResultCh, result)
+			s.writeToResultChan(task.ResultCh, result)
 			return nil, nil
 		}
 		wrapper.idx = index
 	} else {
 		col, ok := tbl.Columns[item.ID]
 		if !ok || col.IsFullLoad() {
-			h.writeToResultChan(task.ResultCh, result)
+			s.writeToResultChan(task.ResultCh, result)
 			return nil, nil
 		}
 		wrapper.col = col
 	}
 	// to avoid duplicated handling in concurrent scenario
-	working := h.setWorking(result.Item, task.ResultCh)
+	working := s.setWorking(result.Item, task.ResultCh)
 	if !working {
-		h.writeToResultChan(task.ResultCh, result)
+		s.writeToResultChan(task.ResultCh, result)
 		return nil, nil
 	}
 	t := time.Now()
 	needUpdate := false
-	wrapper, err = h.readStatsForOneItem(sctx, item, wrapper)
+	wrapper, err = s.readStatsForOneItem(sctx, item, wrapper)
 	if err != nil {
 		result.Error = err
 		return task, err
@@ -268,15 +273,15 @@ func (h *Handle) handleOneItemTask(sctx sessionctx.Context, task *NeededItemTask
 		}
 	}
 	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
-	if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
-		h.writeToResultChan(task.ResultCh, result)
+	if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx) {
+		s.writeToResultChan(task.ResultCh, result)
 	}
-	h.finishWorking(result)
+	s.finishWorking(result)
 	return nil, nil
 }
 
 // readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
-func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) {
+func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.TableItemID, w *statsWrapper) (*statsWrapper, error) {
 	failpoint.Inject("mockReadStatsForOnePanic", nil)
 	failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) {
 		if val.(bool) {
@@ -362,33 +367,33 @@ func (*Handle) readStatsForOneItem(sctx sessionctx.Context, item model.TableItem
 }
 
 // drainColTask will hang until a column task can return, and either task or error will be returned.
-func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
+func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*utilstats.NeededItemTask, error) {
 	// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
 	for {
 		select {
 		case <-exit:
 			return nil, errExit
-		case task, ok := <-h.StatsLoad.NeededItemsCh:
+		case task, ok := <-s.StatsLoad.NeededItemsCh:
 			if !ok {
 				return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed")
 			}
 			// if the task has already timeout, no sql is sync-waiting for it,
 			// so do not handle it just now, put it to another channel with lower priority
 			if time.Now().After(task.ToTimeout) {
-				h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
 				continue
 			}
 			return task, nil
-		case task, ok := <-h.StatsLoad.TimeoutItemsCh:
+		case task, ok := <-s.StatsLoad.TimeoutItemsCh:
 			select {
 			case <-exit:
 				return nil, errExit
-			case task0, ok0 := <-h.StatsLoad.NeededItemsCh:
+			case task0, ok0 := <-s.StatsLoad.NeededItemsCh:
 				if !ok0 {
 					return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed")
 				}
 				// send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh
-				h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
+				s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
 				return task0, nil
 			default:
 				if !ok {
@@ -402,7 +407,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
 }
 
 // writeToTimeoutChan writes in a nonblocking way, and if the channel queue is full, it's ok to drop the task.
-func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemTask) {
+func (*statsSyncLoad) writeToTimeoutChan(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask) {
 	select {
 	case taskCh <- task:
 	default:
@@ -410,7 +415,7 @@ func (*Handle) writeToTimeoutChan(taskCh chan *NeededItemTask, task *NeededItemT
 }
 
 // writeToChanWithTimeout writes a task to a channel and blocks until timeout.
-func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededItemTask, timeout time.Duration) error {
+func (*statsSyncLoad) writeToChanWithTimeout(taskCh chan *utilstats.NeededItemTask, task *utilstats.NeededItemTask, timeout time.Duration) error {
 	timer := time.NewTimer(timeout)
 	defer timer.Stop()
 	select {
@@ -422,7 +427,7 @@ func (*Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *NeededI
 }
 
 // writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
-func (*Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
+func (*statsSyncLoad) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
 	defer func() {
 		if r := recover(); r != nil {
 			logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
@@ -435,12 +440,12 @@ func (*Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtc
 }
 
 // updateCachedItem updates the column/index hist to global statsCache.
-func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index) (updated bool) {
-	h.StatsLoad.Lock()
-	defer h.StatsLoad.Unlock()
+func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statistics.Column, idxHist *statistics.Index) (updated bool) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
 	// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
 	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
-	tbl, ok := h.Get(item.TableID)
+	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
 		return true
 	}
@@ -459,35 +464,35 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
 		tbl = tbl.Copy()
 		tbl.Indices[item.ID] = idxHist
 	}
-	h.UpdateStatsCache([]*statistics.Table{tbl}, nil)
+	s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
 	return true
 }
 
-func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
-	h.StatsLoad.Lock()
-	defer h.StatsLoad.Unlock()
-	chList, ok := h.StatsLoad.WorkingColMap[item]
+func (s *statsSyncLoad) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	chList, ok := s.StatsLoad.WorkingColMap[item]
 	if ok {
 		if chList[0] == resultCh {
 			return true // just return for duplicate setWorking
 		}
-		h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
+		s.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
 		return false
 	}
 	chList = []chan stmtctx.StatsLoadResult{}
 	chList = append(chList, resultCh)
-	h.StatsLoad.WorkingColMap[item] = chList
+	s.StatsLoad.WorkingColMap[item] = chList
 	return true
 }
 
-func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
-	h.StatsLoad.Lock()
-	defer h.StatsLoad.Unlock()
-	if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
+func (s *statsSyncLoad) finishWorking(result stmtctx.StatsLoadResult) {
+	s.StatsLoad.Lock()
+	defer s.StatsLoad.Unlock()
+	if chList, ok := s.StatsLoad.WorkingColMap[result.Item]; ok {
 		list := chList[1:]
 		for _, ch := range list {
-			h.writeToResultChan(ch, result)
+			s.writeToResultChan(ch, result)
 		}
 	}
-	delete(h.StatsLoad.WorkingColMap, result.Item)
+	delete(s.StatsLoad.WorkingColMap, result.Item)
 }
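
The rename leaves drainColTask's two-queue priority logic intact: tasks from NeededItemsCh are preferred, a task drained from TimeoutItemsCh triggers one more check of the high-priority channel, and demotions use a non-blocking send so a full timeout queue simply drops the task. A simplified standalone sketch of that select structure (int stands in for *utilstats.NeededItemTask):

    package main

    import "fmt"

    func drain(needed, timedOut chan int, exit chan struct{}) (int, bool) {
        for {
            select {
            case <-exit:
                return 0, false
            case v := <-needed:
                return v, true
            case v := <-timedOut:
                // Re-check the high-priority channel before committing to
                // a timed-out task, as drainColTask does.
                select {
                case v0 := <-needed:
                    // Demote v again; drop it if the queue is full,
                    // mirroring writeToTimeoutChan's non-blocking send.
                    select {
                    case timedOut <- v:
                    default:
                    }
                    return v0, true
                default:
                    return v, true
                }
            }
        }
    }

    func main() {
        needed := make(chan int, 1)
        timedOut := make(chan int, 1)
        timedOut <- 100
        needed <- 1
        v, _ := drain(needed, timedOut, make(chan struct{}))
        fmt.Println(v) // 1: the fresh task wins even though 100 arrived first
    }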
diff --git a/pkg/statistics/handle/handle_hist_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go
similarity index 92%
rename from pkg/statistics/handle/handle_hist_test.go
rename to pkg/statistics/handle/syncload/stats_syncload_test.go
index 28d95020510ce..356dda7e6dd2d 100644
--- a/pkg/statistics/handle/handle_hist_test.go
+++ b/pkg/statistics/handle/syncload/stats_syncload_test.go
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package handle_test
+package syncload_test
 
 import (
 	"testing"
@@ -40,14 +40,14 @@ func TestSyncLoadSkipUnAnalyzedItems(t *testing.T) {
 	h.SetLease(1)
 
 	// no item would be loaded
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems", `return(0)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems", `return(0)`))
 	tk.MustQuery("trace plan select * from t where a > 10")
-	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems")
+	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems")
 	tk.MustExec("analyze table t1")
 	// one column would be loaded
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems", `return(1)`))
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems", `return(1)`))
 	tk.MustQuery("trace plan select * from t1 where a > 10")
-	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/assertSyncLoadItems")
+	failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/assertSyncLoadItems")
 }
 
 func TestConcurrentLoadHist(t *testing.T) {
@@ -175,11 +175,11 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 		inTerms  string
 	}{
 		{
-			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOnePanic",
+			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOnePanic",
 			inTerms:  "panic",
 		},
 		{
-			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/mockReadStatsForOneFail",
+			failPath: "github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail",
 			inTerms:  "return(true)",
 		},
 	}
@@ -206,18 +206,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 		task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 		require.Error(t, err1)
 		require.NotNil(t, task1)
-		list, ok := h.StatsLoad.WorkingColMap[neededColumns[0]]
-		require.True(t, ok)
-		require.Len(t, list, 1)
-		require.Equal(t, stmtCtx1.StatsLoad.ResultCh, list[0])
 
 		task2, err2 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 		require.Nil(t, err2)
 		require.Nil(t, task2)
-		list, ok = h.StatsLoad.WorkingColMap[neededColumns[0]]
-		require.True(t, ok)
-		require.Len(t, list, 2)
-		require.Equal(t, stmtCtx2.StatsLoad.ResultCh, list[1])
 
 		require.NoError(t, failpoint.Disable(fp.failPath))
 		task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
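
The dropped assertions inspected h.StatsLoad.WorkingColMap directly; after the move that map is an implementation detail behind the StatsSyncLoad interface. For context, a simplified sketch (hypothetical names, string keys instead of model.TableItemID) of the dedup it provides: the first requester of an item becomes the worker, later requesters only register their result channels, and finishWorking fans the result out to them:

    package main

    import (
        "fmt"
        "sync"
    )

    type dedup struct {
        mu      sync.Mutex
        working map[string][]chan string
    }

    func (d *dedup) setWorking(item string, ch chan string) bool {
        d.mu.Lock()
        defer d.mu.Unlock()
        if chs, ok := d.working[item]; ok {
            // The real code also short-circuits a duplicate registration
            // of the same channel (chList[0] == resultCh).
            d.working[item] = append(chs, ch)
            return false // piggyback on the in-flight load
        }
        d.working[item] = []chan string{ch}
        return true // caller becomes the worker for this item
    }

    func (d *dedup) finishWorking(item, result string) {
        d.mu.Lock()
        defer d.mu.Unlock()
        for _, ch := range d.working[item][1:] { // skip the worker itself
            ch <- result
        }
        delete(d.working, item)
    }

    func main() {
        d := &dedup{working: map[string][]chan string{}}
        a, b := make(chan string, 1), make(chan string, 1)
        fmt.Println(d.setWorking("col#1", a)) // true: a is the worker
        fmt.Println(d.setWorking("col#1", b)) // false: b waits for a
        d.finishWorking("col#1", "histogram")
        fmt.Println(<-b) // histogram
    }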
diff --git a/pkg/statistics/handle/util/BUILD.bazel b/pkg/statistics/handle/util/BUILD.bazel
index 2b3e27d834cdb..292161a37817c 100644
--- a/pkg/statistics/handle/util/BUILD.bazel
+++ b/pkg/statistics/handle/util/BUILD.bazel
@@ -16,10 +16,12 @@ go_library(
         "//pkg/parser/model",
         "//pkg/parser/terror",
         "//pkg/sessionctx",
+        "//pkg/sessionctx/stmtctx",
         "//pkg/sessionctx/variable",
         "//pkg/statistics",
         "//pkg/table",
         "//pkg/types",
+        "//pkg/util",
         "//pkg/util/chunk",
         "//pkg/util/intest",
         "//pkg/util/sqlexec",
diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go
index eb894d22118e4..6d2d1d9db81c0 100644
--- a/pkg/statistics/handle/util/interfaces.go
+++ b/pkg/statistics/handle/util/interfaces.go
@@ -16,14 +16,17 @@ package util
 
 import (
 	"context"
+	"sync"
 	"time"
 
 	"github.com/pingcap/tidb/pkg/infoschema"
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/pkg/statistics"
 	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/sqlexec"
 	"github.com/tiancaiamao/gp"
 )
@@ -294,6 +297,44 @@ type StatsReadWriter interface {
 	SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error)
 }
 
+// NeededItemTask represents one needed column/index with its expiration time.
+type NeededItemTask struct {
+	ToTimeout   time.Time
+	ResultCh    chan stmtctx.StatsLoadResult
+	TableItemID model.TableItemID
+}
+
+// StatsLoad is used to load stats concurrently
+type StatsLoad struct {
+	NeededItemsCh  chan *NeededItemTask
+	TimeoutItemsCh chan *NeededItemTask
+	WorkingColMap  map[model.TableItemID][]chan stmtctx.StatsLoadResult
+	SubCtxs        []sessionctx.Context
+	sync.Mutex
+}
+
+// StatsSyncLoad implements the sync-load feature.
+type StatsSyncLoad interface {
+	// SendLoadRequests sends load requests to the channel.
+	SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.TableItemID, timeout time.Duration) error
+
+	// SyncWaitStatsLoad will wait for the load requests to finish.
+	SyncWaitStatsLoad(sc *stmtctx.StatementContext) error
+
+	// AppendNeededItem appends a needed item to the channel.
+	AppendNeededItem(task *NeededItemTask, timeout time.Duration) error
+
+	// SubLoadWorker is the worker loop that handles load requests; the caller runs it in its own goroutine.
+	SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper)
+
+	// HandleOneTask will handle one task.
+	HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, exit chan struct{}) (task *NeededItemTask, err error)
+
+	// SetSubCtxs sets the sessionctx that is used to run queries in the background.
+	// TODO: use SessionPool instead.
+	SetSubCtxs(idx int, sctx sessionctx.Context)
+}
+
 // StatsGlobal is used to manage partition table global stats.
 type StatsGlobal interface {
 	// MergePartitionStats2GlobalStatsByTableID merges partition stats to global stats by table ID.