diff --git a/ttl/client/BUILD.bazel b/ttl/client/BUILD.bazel
index 6f2c7acaae481..e842ad03a887b 100644
--- a/ttl/client/BUILD.bazel
+++ b/ttl/client/BUILD.bazel
@@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "client",
-    srcs = ["command.go"],
+    srcs = [
+        "command.go",
+        "notification.go",
+    ],
     importpath = "github.com/pingcap/tidb/ttl/client",
     visibility = ["//visibility:public"],
     deps = [
+        "//ddl/util",
         "//util/logutil",
         "@com_github_google_uuid//:uuid",
         "@com_github_pingcap_errors//:errors",
diff --git a/ttl/client/command.go b/ttl/client/command.go
index bad2d756353cd..a285d9b186e3c 100644
--- a/ttl/client/command.go
+++ b/ttl/client/command.go
@@ -112,12 +112,13 @@ func TriggerNewTTLJob(ctx context.Context, cli CommandClient, dbName, tableName
 	return &resp, nil
 }
 
+// etcdClient is the client of etcd which implements the commandCli and notificationCli interface
 type etcdClient struct {
 	etcdCli *clientv3.Client
 }
 
-// NewEtcdCommandClient creates a client with etcd
-func NewEtcdCommandClient(etcdCli *clientv3.Client) CommandClient {
+// NewCommandClient creates a command client with etcd
+func NewCommandClient(etcdCli *clientv3.Client) CommandClient {
 	return &etcdClient{
 		etcdCli: etcdCli,
 	}
@@ -196,6 +197,7 @@ loop:
 	return json.Unmarshal(cmdResp.Data, obj)
 }
 
+// Command implements the CommandClient
 func (c *etcdClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
 	requestID, err := c.sendCmd(ctx, cmdType, request)
 	if err != nil {
@@ -204,6 +206,7 @@ func (c *etcdClient) Command(ctx context.Context, cmdType string, request interf
 	return requestID, c.waitCmdResponse(ctx, requestID, &response)
 }
 
+// TakeCommand implements the CommandClient
 func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error) {
 	resp, err := c.etcdCli.Delete(ctx, ttlCmdKeyRequestPrefix+reqID)
 	if err != nil {
@@ -212,6 +215,7 @@ func (c *etcdClient) TakeCommand(ctx context.Context, reqID string) (bool, error
 	return resp.Deleted > 0, nil
 }
 
+// ResponseCommand implements the CommandClient
 func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj interface{}) error {
 	resp := &cmdResponse{
 		RequestID: reqID,
@@ -241,6 +245,7 @@ func (c *etcdClient) ResponseCommand(ctx context.Context, reqID string, obj inte
 	return err
 }
 
+// WatchCommand implements the CommandClient
 func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
 	ch := make(chan *CmdRequest)
 	go func() {
@@ -279,20 +284,24 @@ func (c *etcdClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
 	return ch
 }
 
+// mockClient is a mock implementation for CommandCli and NotificationCli
 type mockClient struct {
 	sync.Mutex
-	store    map[string]interface{}
-	watchers []chan *CmdRequest
+	store                map[string]interface{}
+	commandWatchers      []chan *CmdRequest
+	notificationWatchers map[string][]chan clientv3.WatchResponse
 }
 
-// NewMockCommandClient creates a mock client
+// NewMockCommandClient creates a mock command client
 func NewMockCommandClient() CommandClient {
 	return &mockClient{
-		store:    make(map[string]interface{}),
-		watchers: make([]chan *CmdRequest, 0, 1),
+		store:                make(map[string]interface{}),
+		commandWatchers:      make([]chan *CmdRequest, 0, 1),
+		notificationWatchers: make(map[string][]chan clientv3.WatchResponse),
 	}
 }
 
+// Command implements the CommandClient
 func (c *mockClient) Command(ctx context.Context, cmdType string, request interface{}, response interface{}) (string, error) {
 	ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds))
 	defer cancel()
@@ -346,7 +355,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf
 	defer c.Unlock()
 	key := ttlCmdKeyRequestPrefix + reqID
 	c.store[key] = req
-	for _, ch := range c.watchers {
+	for _, ch := range c.commandWatchers {
 		select {
 		case <-ctx.Done():
 			return reqID, ctx.Err()
@@ -358,6 +367,7 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request interf
 	return reqID, nil
 }
 
+// TakeCommand implements the CommandClient
 func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error) {
 	c.Lock()
 	defer c.Unlock()
@@ -369,6 +379,7 @@ func (c *mockClient) TakeCommand(_ context.Context, reqID string) (bool, error)
 	return false, nil
 }
 
+// ResponseCommand implements the CommandClient
 func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interface{}) error {
 	c.Lock()
 	defer c.Unlock()
@@ -391,11 +402,12 @@ func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj interf
 	return nil
 }
 
+// WatchCommand implements the CommandClient
 func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
 	c.Lock()
 	defer c.Unlock()
 	ch := make(chan *CmdRequest, 16+len(c.store))
-	c.watchers = append(c.watchers, ch)
+	c.commandWatchers = append(c.commandWatchers, ch)
 	for key, val := range c.store {
 		if strings.HasPrefix(key, ttlCmdKeyRequestPrefix) {
 			if req, ok := val.(*CmdRequest); ok {
@@ -407,9 +419,9 @@ func (c *mockClient) WatchCommand(ctx context.Context) <-chan *CmdRequest {
 		<-ctx.Done()
 		c.Lock()
 		defer c.Unlock()
-		for i, chItem := range c.watchers {
+		for i, chItem := range c.commandWatchers {
 			if chItem == ch {
-				c.watchers = append(c.watchers[:i], c.watchers[i+1:]...)
+				c.commandWatchers = append(c.commandWatchers[:i], c.commandWatchers[i+1:]...)
 				break
 			}
 		}
diff --git a/ttl/client/command_test.go b/ttl/client/command_test.go
index 830137f32904e..69cde75309ad6 100644
--- a/ttl/client/command_test.go
+++ b/ttl/client/command_test.go
@@ -42,7 +42,7 @@ func TestCommandClient(t *testing.T) {
 	defer cluster.Terminate(t)
 	etcd := cluster.RandClient()
 
-	etcdCli := NewEtcdCommandClient(etcd)
+	etcdCli := NewCommandClient(etcd)
 	mockCli := NewMockCommandClient()
 
 	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
diff --git a/ttl/client/notification.go b/ttl/client/notification.go
new file mode 100644
index 0000000000000..6c44cd0dd7aa9
--- /dev/null
+++ b/ttl/client/notification.go
@@ -0,0 +1,79 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"context"
+
+	"github.com/pingcap/tidb/ddl/util"
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+const ttlNotificationPrefix string = "/tidb/ttl/notification/"
+
+// NotificationClient is a client to notify other TTL workers
+type NotificationClient interface {
+	// Notify sends a notification
+	Notify(ctx context.Context, typ string, data string) error
+	// WatchNotification opens a channel, in which we could receive all notifications
+	WatchNotification(ctx context.Context, typ string) clientv3.WatchChan
+}
+
+// NewNotificationClient creates a notification client with etcd
+func NewNotificationClient(etcdCli *clientv3.Client) NotificationClient {
+	return &etcdClient{
+		etcdCli: etcdCli,
+	}
+}
+
+// Notify stores the corresponding K-V in the etcd
+func (c *etcdClient) Notify(ctx context.Context, typ string, data string) error {
+	return util.PutKVToEtcd(ctx, c.etcdCli, 1, ttlNotificationPrefix+typ, data)
+}
+
+// WatchNotification returns a go channel to get notification
+func (c *etcdClient) WatchNotification(ctx context.Context, typ string) clientv3.WatchChan {
+	return c.etcdCli.Watch(ctx, ttlNotificationPrefix+typ)
+}
+
+// NewMockNotificationClient creates a mock notification client
+func NewMockNotificationClient() NotificationClient {
+	return &mockClient{
+		store:                make(map[string]interface{}),
+		commandWatchers:      make([]chan *CmdRequest, 0, 1),
+		notificationWatchers: make(map[string][]chan clientv3.WatchResponse),
+	}
+}
+
+// Notify implements the NotificationClient
+func (c *mockClient) Notify(_ context.Context, typ string, data string) error {
+	c.Lock()
+	defer c.Unlock()
+
+	for _, ch := range c.notificationWatchers[typ] {
+		ch <- clientv3.WatchResponse{}
+	}
+	return nil
+}
+
+// WatchNotification implements the NotificationClient
+func (c *mockClient) WatchNotification(_ context.Context, typ string) clientv3.WatchChan {
+	c.Lock()
+	defer c.Unlock()
+
+	ch := make(chan clientv3.WatchResponse, 1)
+	c.notificationWatchers[typ] = append(c.notificationWatchers[typ], ch)
+	return ch
+}
diff --git a/ttl/ttlworker/config.go b/ttl/ttlworker/config.go
index c1774bc667348..468150c3949a7 100644
--- a/ttl/ttlworker/config.go
+++ b/ttl/ttlworker/config.go
@@ -32,6 +32,7 @@ const ttlJobTimeout = 6 * time.Hour
 
 const taskManagerLoopTickerInterval = time.Minute
 const ttlTaskHeartBeatTickerInterval = time.Minute
+const ttlTaskGCInterval = time.Hour
 
 func getUpdateInfoSchemaCacheInterval() time.Duration {
 	failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index 223128b52f26c..77a66d7e3f761 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -38,6 +38,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const scanTaskNotificationType string = "scan"
+
 const insertNewTableIntoStatusTemplate = "INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (%?, %?)"
 const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status
 	SET current_job_id = %?,
@@ -82,8 +84,9 @@ type JobManager struct {
 	// id is the ddl id of this instance
 	id string
 
-	store  kv.Storage
-	cmdCli client.CommandClient
+	store           kv.Storage
+	cmdCli          client.CommandClient
+	notificationCli client.NotificationClient
 
 	// infoSchemaCache and tableStatusCache are a cache stores the information from info schema and the tidb_ttl_table_status
 	// table. They don't need to be protected by mutex, because they are only used in job loop goroutine.
@@ -113,9 +116,11 @@ func NewJobManager(id string, sessPool sessionPool, store kv.Storage, etcdCli *c
 	manager.tableStatusCache = cache.NewTableStatusCache(getUpdateTTLTableStatusCacheInterval())
 
 	if etcdCli != nil {
-		manager.cmdCli = client.NewEtcdCommandClient(etcdCli)
+		manager.cmdCli = client.NewCommandClient(etcdCli)
+		manager.notificationCli = client.NewNotificationClient(etcdCli)
 	} else {
 		manager.cmdCli = client.NewMockCommandClient()
+		manager.notificationCli = client.NewMockNotificationClient()
 	}
 
 	manager.taskManager = newTaskManager(manager.ctx, sessPool, manager.infoSchemaCache, id)
@@ -150,6 +155,7 @@ func (m *JobManager) jobLoop() error {
 	checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())
 
 	cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
+	scanTaskNotificationWatcher := m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType)
 	m.taskManager.resizeWorkersWithSysVar()
 	for {
 		m.reportMetrics()
@@ -208,6 +214,17 @@ func (m *JobManager) jobLoop() error {
 		// Task Manager Loop
 		case <-scheduleTaskTicker:
 			m.taskManager.rescheduleTasks(se, now)
+		case _, ok := <-scanTaskNotificationWatcher:
+			if !ok {
+				if m.ctx.Err() != nil {
+					return nil
+				}
+
+				logutil.BgLogger().Warn("The TTL scan task notification watcher is closed unexpectedly, re-watch it again")
+				scanTaskNotificationWatcher = m.notificationCli.WatchNotification(m.ctx, scanTaskNotificationType)
+				continue
+			}
+			m.taskManager.rescheduleTasks(se, now)
 		case <-taskCheckTicker:
 			m.taskManager.checkInvalidTask(se)
 			m.taskManager.checkFinishedTask(se, now)
@@ -611,10 +628,18 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
 	if err != nil {
 		return nil, err
 	}
-	return m.createNewJob(now, table)
+
+	job := m.createNewJob(now, table)
+
+	// job is created, notify every scan managers to fetch new tasks
+	err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id)
+	if err != nil {
+		logutil.Logger(m.ctx).Warn("fail to trigger scan tasks", zap.Error(err))
+	}
+	return job, nil
 }
 
-func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
+func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) *ttlJob {
 	id := m.tableStatusCache.Tables[table.ID].CurrentJobID
 
 	return &ttlJob{
@@ -627,7 +652,7 @@ func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*t
 		tbl: table,
 
 		status: cache.JobStatusWaiting,
-	}, nil
+	}
 }
 
 // updateHeartBeat updates the heartbeat for all task with current instance as owner
@@ -687,6 +712,11 @@ func (m *JobManager) GetCommandCli() client.CommandClient {
 	return m.cmdCli
 }
 
+// GetNotificationCli returns the notification client
+func (m *JobManager) GetNotificationCli() client.NotificationClient {
+	return m.notificationCli
+}
+
 type ttlSummary struct {
 	TotalRows   uint64 `json:"total_rows"`
 	SuccessRows uint64 `json:"success_rows"`
diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go
index c763e1363aecd..313b9257249a6 100644
--- a/ttl/ttlworker/job_manager_integration_test.go
+++ b/ttl/ttlworker/job_manager_integration_test.go
@@ -407,6 +407,44 @@ func TestJobTimeout(t *testing.T) {
 	tk.MustQuery("select count(*) from mysql.tidb_ttl_task").Check(testkit.Rows("0"))
 }
 
+func TestTriggerScanTask(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	sessionFactory := sessionFactory(t, store)
+
+	now := time.Now()
+
+	se := sessionFactory()
+	m := dom.TTLJobManager()
+	m.TaskManager().ResizeWorkersWithSysVar()
+	nCli := m.GetNotificationCli()
+
+	tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'")
+	require.NoError(t, m.InfoSchemaCache().Update(se))
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		<-nCli.WatchNotification(context.Background(), "scan")
+		wg.Done()
+	}()
+	m.RescheduleJobs(se, now)
+
+	// notification is sent
+	wg.Wait()
+
+	for time.Now().Before(now.Add(time.Second * 5)) {
+		time.Sleep(time.Second)
+		rows := tk.MustQuery("SELECT status FROM mysql.tidb_ttl_task").Rows()
+		if len(rows) == 0 {
+			break
+		}
+		if rows[0][0] == cache.TaskStatusFinished {
+			break
+		}
+	}
+}
+
 func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) {
 	maxWaitTime := 30
 	for {