diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index 241a6100331..143fd0c298d 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -85,16 +85,16 @@ type captureManager struct { func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *captureManager { return &captureManager{ OwnerRev: rev, - Captures: make(map[string]*CaptureStatus), + Captures: make(map[model.CaptureID]*CaptureStatus), heartbeatTick: heartbeatTick, } } -func (c *captureManager) captureTableSets() map[model.CaptureID]*CaptureStatus { +func (c *captureManager) CaptureTableSets() map[model.CaptureID]*CaptureStatus { return c.Captures } -func (c *captureManager) checkAllCaptureInitialized() bool { +func (c *captureManager) CheckAllCaptureInitialized() bool { for _, captrueStatus := range c.Captures { if captrueStatus.State == CaptureStateUninitialize { return false @@ -103,7 +103,7 @@ func (c *captureManager) checkAllCaptureInitialized() bool { return true } -func (c *captureManager) tick() []*schedulepb.Message { +func (c *captureManager) Tick() []*schedulepb.Message { c.tickCounter++ if c.tickCounter < c.heartbeatTick { return nil @@ -120,10 +120,10 @@ func (c *captureManager) tick() []*schedulepb.Message { return msgs } -func (c *captureManager) poll( +func (c *captureManager) Poll( aliveCaptures map[model.CaptureID]*model.CaptureInfo, msgs []*schedulepb.Message, -) ([]*schedulepb.Message, bool) { +) []*schedulepb.Message { outMsgs := c.onAliveCaptureUpdate(aliveCaptures) for _, msg := range msgs { if msg.MsgType == schedulepb.MsgHeartbeatResponse { @@ -135,7 +135,7 @@ func (c *captureManager) poll( msg.GetHeartbeatResponse(), msg.Header.ProcessorEpoch) } } - return outMsgs, c.checkAllCaptureInitialized() + return outMsgs } func (c *captureManager) onAliveCaptureUpdate( diff --git a/cdc/scheduler/internal/tp/capture_manager_test.go b/cdc/scheduler/internal/tp/capture_manager_test.go index b151c463140..b752b8d8141 100644 --- a/cdc/scheduler/internal/tp/capture_manager_test.go +++ b/cdc/scheduler/internal/tp/capture_manager_test.go @@ -57,34 +57,45 @@ func TestCaptureManagerPoll(t *testing.T) { cm := newCaptureManager(rev, 2) // Initial poll for alive captures. - msgs, hasInit := cm.poll(ms, nil) - require.False(t, hasInit) + msgs := cm.Poll(ms, nil) require.ElementsMatch(t, []*schedulepb.Message{ {To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}}, {To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}}, }, msgs) + require.False(t, cm.CheckAllCaptureInitialized()) // Poll one response - msgs, hasInit = cm.poll(ms, []*schedulepb.Message{ + msgs = cm.Poll(ms, []*schedulepb.Message{ { Header: &schedulepb.Message_Header{}, From: "1", MsgType: schedulepb.MsgHeartbeatResponse, HeartbeatResponse: &schedulepb.HeartbeatResponse{}, }, }) - require.False(t, hasInit) require.Empty(t, msgs) + require.False(t, cm.CheckAllCaptureInitialized()) // Poll another response - msgs, hasInit = cm.poll(ms, []*schedulepb.Message{ + msgs = cm.Poll(ms, []*schedulepb.Message{ { Header: &schedulepb.Message_Header{}, From: "2", MsgType: schedulepb.MsgHeartbeatResponse, HeartbeatResponse: &schedulepb.HeartbeatResponse{}, }, }) - require.True(t, hasInit, "%v %v", cm.Captures["1"], cm.Captures["2"]) require.Empty(t, msgs) + require.True(t, cm.CheckAllCaptureInitialized(), "%v %v", cm.Captures["1"], cm.Captures["2"]) + + // Poll unknown capture response + msgs = cm.Poll(ms, []*schedulepb.Message{ + { + Header: &schedulepb.Message_Header{}, From: "unknown", + MsgType: schedulepb.MsgHeartbeatResponse, + HeartbeatResponse: &schedulepb.HeartbeatResponse{}, + }, + }) + require.Empty(t, msgs) + require.True(t, cm.CheckAllCaptureInitialized()) } func TestCaptureManagerTick(t *testing.T) { @@ -94,22 +105,22 @@ func TestCaptureManagerTick(t *testing.T) { cm := newCaptureManager(rev, 2) // No heartbeat if there is no capture. - msgs := cm.tick() + msgs := cm.Tick() require.Empty(t, msgs) - msgs = cm.tick() + msgs = cm.Tick() require.Empty(t, msgs) ms := map[model.CaptureID]*model.CaptureInfo{ "1": {}, "2": {}, } - _, hasInit := cm.poll(ms, nil) - require.False(t, hasInit) + cm.Poll(ms, nil) + require.False(t, cm.CheckAllCaptureInitialized()) // Heartbeat even if capture is uninitialize. - msgs = cm.tick() + msgs = cm.Tick() require.Empty(t, msgs) - msgs = cm.tick() + msgs = cm.Tick() require.ElementsMatch(t, []*schedulepb.Message{ {To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}}, {To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}}, @@ -119,10 +130,10 @@ func TestCaptureManagerTick(t *testing.T) { for _, s := range []CaptureState{CaptureStateInitialized, CaptureStateStopping} { cm.Captures["1"].State = s cm.Captures["2"].State = s - require.True(t, cm.checkAllCaptureInitialized()) - msgs = cm.tick() + require.True(t, cm.CheckAllCaptureInitialized()) + msgs = cm.Tick() require.Empty(t, msgs) - msgs = cm.tick() + msgs = cm.Tick() require.ElementsMatch(t, []*schedulepb.Message{ {To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}}, {To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}}, diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go index 4c62b049a49..6e592a53f72 100644 --- a/cdc/scheduler/internal/tp/coordinator.go +++ b/cdc/scheduler/internal/tp/coordinator.go @@ -15,10 +15,16 @@ package tp import ( "context" + "log" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/scheduler/internal" + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/p2p" + "github.com/pingcap/tiflow/pkg/version" + "go.uber.org/zap" ) type scheduler interface { @@ -34,12 +40,38 @@ type scheduler interface { var _ internal.Scheduler = (*coordinator)(nil) type coordinator struct { + version string + revision schedulepb.OwnerRevision trans transport scheduler []scheduler replicationM *replicationManager captureM *captureManager } +// NewCoordinator returns a two phase scheduler. +func NewCoordinator( + ctx context.Context, + changeFeedID model.ChangeFeedID, + checkpointTs model.Ts, + messageServer *p2p.MessageServer, + messageRouter p2p.MessageRouter, + ownerRevision int64, + cfg *config.SchedulerConfig, +) (internal.Scheduler, error) { + trans, err := newTranport(ctx, changeFeedID, messageServer, messageRouter) + if err != nil { + return nil, errors.Trace(err) + } + revision := schedulepb.OwnerRevision{Revision: ownerRevision} + return &coordinator{ + version: version.ReleaseSemver(), + revision: revision, + trans: trans, + replicationM: newReplicationManager(cfg.MaxTaskConcurrency), + captureM: newCaptureManager(revision, cfg.HeartbeatTick), + }, nil +} + func (c *coordinator) Tick( ctx context.Context, // Latest global checkpoint of the changefeed @@ -68,29 +100,44 @@ func (c *coordinator) poll( ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID, aliveCaptures map[model.CaptureID]*model.CaptureInfo, ) error { - recvMsgs, err := c.trans.Recv(ctx) + recvMsgs, err := c.recvMsgs(ctx) if err != nil { return errors.Trace(err) } - sentMsgs, hasInit := c.captureM.poll(aliveCaptures, recvMsgs) - if !hasInit { + + sentMsgs := c.captureM.Tick() + msgs := c.captureM.Poll(aliveCaptures, recvMsgs) + sentMsgs = append(sentMsgs, msgs...) + if c.captureM.CheckAllCaptureInitialized() { + // Skip polling replication manager as not all capture are initialized. err := c.trans.Send(ctx, sentMsgs) return errors.Trace(err) } - captureTables := c.captureM.captureTableSets() + // Handling received messages to advance replication set. + msgs, err = c.replicationM.HandleMessage(recvMsgs) + if err != nil { + return errors.Trace(err) + } + sentMsgs = append(sentMsgs, msgs...) + + // Generate schedule tasks based on the current status. + captureTables := c.captureM.CaptureTableSets() allTasks := make([]*scheduleTask, 0) for _, sched := range c.scheduler { tasks := sched.Schedule(checkpointTs, currentTables, aliveCaptures, captureTables) allTasks = append(allTasks, tasks...) } - msgs, err := c.replicationM.poll( - ctx, checkpointTs, currentTables, aliveCaptures, recvMsgs, allTasks) + + // Handling generated schedule tasks. + msgs, err = c.replicationM.HandleTasks(allTasks) if err != nil { return errors.Trace(err) } sentMsgs = append(sentMsgs, msgs...) - err = c.trans.Send(ctx, sentMsgs) + + // Send new messages. + err = c.sendMsgs(ctx, sentMsgs) if err != nil { return errors.Trace(err) } @@ -98,3 +145,36 @@ func (c *coordinator) poll( // checkpoint calcuation return nil } + +func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { + recvMsgs, err := c.trans.Recv(ctx) + if err != nil { + return nil, errors.Trace(err) + } + + n := 0 + for _, val := range recvMsgs { + // Filter stale messages. + if val.Header.OwnerRevision == c.revision { + recvMsgs[n] = val + n++ + } + } + return recvMsgs[:n], nil +} + +func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error { + for i := range msgs { + m := msgs[i] + m.Header = &schedulepb.Message_Header{ + Version: c.version, + OwnerRevision: c.revision, + } + // Correctness check. + if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown { + log.Panic("invalid message no destination or unknown message type", + zap.Any("message", m)) + } + } + return c.trans.Send(ctx, msgs) +} diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go index c7ab071a5e7..921727ce3a8 100644 --- a/cdc/scheduler/internal/tp/coordinator_test.go +++ b/cdc/scheduler/internal/tp/coordinator_test.go @@ -12,3 +12,79 @@ // limitations under the License. package tp + +import ( + "context" + "testing" + + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/stretchr/testify/require" +) + +type mockTrans struct { + send func(ctx context.Context, msgs []*schedulepb.Message) error + recv func(ctx context.Context) ([]*schedulepb.Message, error) +} + +func (m *mockTrans) Send(ctx context.Context, msgs []*schedulepb.Message) error { + return m.send(ctx, msgs) +} +func (m *mockTrans) Recv(ctx context.Context) ([]*schedulepb.Message, error) { + return m.recv(ctx) +} + +func TestCoordinatorSendMsgs(t *testing.T) { + t.Parallel() + ctx := context.Background() + trans := &mockTrans{} + cood := coordinator{ + version: "6.2.0", + revision: schedulepb.OwnerRevision{Revision: 3}, + trans: trans, + } + trans.send = func(ctx context.Context, msgs []*schedulepb.Message) error { + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: cood.version, + OwnerRevision: cood.revision, + }, + To: "1", MsgType: schedulepb.MsgDispatchTableRequest, + }}, msgs) + return nil + } + cood.sendMsgs( + ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}}) +} + +func TestCoordinatorRecvMsgs(t *testing.T) { + t.Parallel() + + ctx := context.Background() + trans := &mockTrans{} + cood := coordinator{ + version: "6.2.0", + revision: schedulepb.OwnerRevision{Revision: 3}, + trans: trans, + } + trans.recv = func(ctx context.Context) ([]*schedulepb.Message, error) { + return []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + OwnerRevision: cood.revision, + }, + From: "1", MsgType: schedulepb.MsgDispatchTableResponse, + }, { + Header: &schedulepb.Message_Header{ + OwnerRevision: schedulepb.OwnerRevision{Revision: 4}, + }, + From: "2", MsgType: schedulepb.MsgDispatchTableResponse, + }}, nil + } + msgs, err := cood.recvMsgs(ctx) + require.Nil(t, err) + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + OwnerRevision: cood.revision, + }, + From: "1", MsgType: schedulepb.MsgDispatchTableResponse, + }}, msgs) +} diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go index 6f2f480c05a..b03287bd068 100644 --- a/cdc/scheduler/internal/tp/replication_manager.go +++ b/cdc/scheduler/internal/tp/replication_manager.go @@ -14,128 +14,301 @@ package tp import ( - "context" - "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "go.uber.org/zap" ) -type callback func(model.TableID) +type callback func() // burstBalance for changefeed set up or unplaned TiCDC node failure. // TiCDC needs to balance intrrupted tables as soon as possible. -//nolint:deadcode type burstBalance struct { - tables map[model.TableID]model.CaptureID - - done callback + // Add tables to captures + Tables map[model.TableID]model.CaptureID } type moveTable struct { - tableID model.TableID - sourceCapture model.CaptureID - destCapture model.CaptureID - - done callback + TableID model.TableID + DestCapture model.CaptureID } type addTable struct { - tableID model.TableID - captureID model.CaptureID - - done callback + TableID model.TableID + CaptureID model.CaptureID } -type deleteTable struct { - tableID model.TableID - captureID model.CaptureID - - done callback +type removeTable struct { + TableID model.TableID + CaptureID model.CaptureID } type scheduleTask struct { - moveTable *moveTable - addTable *addTable - deleteTable *deleteTable + moveTable *moveTable + addTable *addTable + removeTable *removeTable + burstBalance *burstBalance + + accept callback } type replicationManager struct { - version string - tables map[model.TableID]*ReplicationSet - runningTasks map[model.TableID]*scheduleTask -} - -func (r *replicationManager) poll( - ctx context.Context, - // Latest global checkpoint of the changefeed - checkpointTs model.Ts, - // All tables that SHOULD be replicated (or started) at the current checkpoint. - currentTables []model.TableID, - // All captures that are alive according to the latest Etcd states. - aliveCaptures map[model.CaptureID]*model.CaptureInfo, + tables map[model.TableID]*ReplicationSet + + runningTasks map[model.TableID]*scheduleTask + maxTaskConcurrency int +} + +func newReplicationManager(maxTaskConcurrency int) *replicationManager { + return &replicationManager{ + tables: make(map[int64]*ReplicationSet), + runningTasks: make(map[int64]*scheduleTask), + maxTaskConcurrency: maxTaskConcurrency, + } +} + +func (r *replicationManager) HandleMessage( msgs []*schedulepb.Message, - tasks []*scheduleTask, ) ([]*schedulepb.Message, error) { - msgBuf := make([]*schedulepb.Message, 0) + sentMegs := make([]*schedulepb.Message, 0) + for i := range msgs { + msg := msgs[i] + switch msg.MsgType { + case schedulepb.MsgCheckpoint: + msgs, err := r.handleMessageCheckpoint(msg.Checkpoints) + if err != nil { + return nil, errors.Trace(err) + } + sentMegs = append(sentMegs, msgs...) + case schedulepb.MsgDispatchTableResponse: + msgs, err := r.handleMessageDispatchTableResponse(msg.From, msg.DispatchTableResponse) + if err != nil { + return nil, errors.Trace(err) + } + sentMegs = append(sentMegs, msgs...) + case schedulepb.MsgHeartbeatResponse: + msgs, err := r.handleMessageHeartbeatResponse(msg.From, msg.HeartbeatResponse) + if err != nil { + return nil, errors.Trace(err) + } + sentMegs = append(sentMegs, msgs...) + } + } + return sentMegs, nil +} - sendMsgs, err := r.handleMessage(msgs) - if err != nil { - return nil, errors.Trace(err) +func (r *replicationManager) handleMessageHeartbeatResponse( + from model.CaptureID, msg *schedulepb.HeartbeatResponse, +) ([]*schedulepb.Message, error) { + sentMsgs := make([]*schedulepb.Message, 0) + for i := range msg.Tables { + status := msg.Tables[i] + table, ok := r.tables[status.TableID] + if !ok { + log.Info("tpscheduler: ignore table status no table found", + zap.Any("message", status)) + continue + } + msgs, err := table.handleTableStatus(from, &status) + if err != nil { + return nil, errors.Trace(err) + } + if table.hasRemoved() { + log.Info("tpscheduler: table has removed", zap.Int64("tableID", status.TableID)) + delete(r.tables, status.TableID) + } + sentMsgs = append(sentMsgs, msgs...) + } + return sentMsgs, nil +} + +func (r *replicationManager) handleMessageDispatchTableResponse( + from model.CaptureID, msg *schedulepb.DispatchTableResponse, +) ([]*schedulepb.Message, error) { + var status *schedulepb.TableStatus + switch resp := msg.Response.(type) { + case *schedulepb.DispatchTableResponse_AddTable: + status = resp.AddTable.Status + case *schedulepb.DispatchTableResponse_RemoveTable: + status = resp.RemoveTable.Status + default: + log.Warn("tpscheduler: ignore unknown dispatch table response", + zap.Any("message", msg)) + return nil, nil } - msgBuf = append(msgBuf, sendMsgs...) - sendMsgs, err = r.handleMessage(msgs) + table, ok := r.tables[status.TableID] + if !ok { + log.Info("tpscheduler: ignore table status no table found", + zap.Any("message", status)) + return nil, nil + } + msgs, err := table.handleTableStatus(from, status) if err != nil { return nil, errors.Trace(err) } - msgBuf = append(msgBuf, sendMsgs...) - - return msgBuf, nil + if table.hasRemoved() { + log.Info("tpscheduler: table has removed", zap.Int64("tableID", status.TableID)) + delete(r.tables, status.TableID) + } + return msgs, nil } -func (r *replicationManager) handleMessage( - msg []*schedulepb.Message, +func (r *replicationManager) handleMessageCheckpoint( + checkpoints map[model.TableID]schedulepb.Checkpoint, ) ([]*schedulepb.Message, error) { - // s.handleMessageHeartbeat() - // s.handleMessageCheckpoint() - // s.handleMessageDispatchTableResponse() return nil, nil } -func (r *replicationManager) handleMessageHeartbeat(msg *schedulepb.Heartbeat) { - // TODO: build s.tables from Heartbeat message. -} +func (r *replicationManager) HandleTasks( + tasks []*scheduleTask, +) ([]*schedulepb.Message, error) { + // Check if a running task is finished. + for tableID := range r.runningTasks { + if table, ok := r.tables[tableID]; ok { + // If table is back to Replicating or Removed, + // the running task is finished. + if table.State == ReplicationSetStateReplicating || table.hasRemoved() { + delete(r.runningTasks, tableID) + } + } else { + // No table found, remove the task + delete(r.runningTasks, tableID) + } + } -func (r *replicationManager) handleMessageDispatchTableResponse( - msg *schedulepb.DispatchTableResponse, -) { - // TODO: update s.tables from DispatchTableResponse message. -} + sentMsgs := make([]*schedulepb.Message, 0) + for _, task := range tasks { + // Burst balance does not affect by maxTaskConcurrency. + if task.burstBalance != nil { + msgs, err := r.handleBurstBalanceTasks(task.burstBalance) + if err != nil { + return nil, errors.Trace(err) + } + sentMsgs = append(sentMsgs, msgs...) + if task.accept != nil { + task.accept() + } + continue + } -func (r *replicationManager) handleMessageCheckpoint(msg *schedulepb.Checkpoint) { - // TODO: update s.tables from Checkpoint message. -} + // Check if accpeting one more task exceeds maxTaskConcurrency. + if len(r.runningTasks)+1 > r.maxTaskConcurrency { + log.Debug("tpcheduler: too many running task") + // Does not use break, in case there is burst balance task + // in the remaining tasks. + continue + } -// ======== + var tableID model.TableID + if task.addTable != nil { + tableID = task.addTable.TableID + } else if task.removeTable != nil { + tableID = task.removeTable.TableID + } else if task.moveTable != nil { + tableID = task.moveTable.TableID + } -func (r *replicationManager) handleTasks(tasks []*scheduleTask) { - // s.handleTaskAddTable(nil) - // s.handleTaskMoveTable(nil) - // s.handleTaskDeleteTable(nil) + // Skip task if the table is already running a task, + // or the table has removed. + if _, ok := r.runningTasks[tableID]; ok { + log.Info("tpscheduler: ignore task, already exists", + zap.Any("task", task)) + continue + } + if _, ok := r.tables[tableID]; !ok && task.addTable == nil { + log.Info("tpscheduler: ignore task, table not found", + zap.Any("task", task)) + continue + } + + var msgs []*schedulepb.Message + var err error + if task.addTable != nil { + msgs, err = r.handleAddTableTask(task.addTable) + } else if task.removeTable != nil { + msgs, err = r.handleRemoveTableTask(task.removeTable) + } else if task.moveTable != nil { + msgs, err = r.handleMoveTableTask(task.moveTable) + } + if err != nil { + return nil, errors.Trace(err) + } + sentMsgs = append(sentMsgs, msgs...) + r.runningTasks[tableID] = task + if task.accept != nil { + task.accept() + } + } + return sentMsgs, nil } -func (r *replicationManager) handleTaskMoveTable(task *moveTable) error { - // TODO: update s.runingTasks and s.tables. - return nil +func (r *replicationManager) handleAddTableTask( + task *addTable, +) ([]*schedulepb.Message, error) { + var err error + table := r.tables[task.TableID] + if table == nil { + table, err = newReplicationSet(task.TableID, nil) + if err != nil { + return nil, errors.Trace(err) + } + r.tables[task.TableID] = table + } + return table.handleAddTable(task.CaptureID) +} + +func (r *replicationManager) handleRemoveTableTask( + task *removeTable, +) ([]*schedulepb.Message, error) { + table := r.tables[task.TableID] + if table.hasRemoved() { + log.Info("tpscheduler: table has removed", zap.Int64("tableID", task.TableID)) + delete(r.tables, task.TableID) + return nil, nil + } + return table.handleRemoveTable() } -func (r *replicationManager) handleTaskAddTable(task *addTable) error { - // TODO: update s.runingTasks and s.tables. - return nil +func (r *replicationManager) handleMoveTableTask( + task *moveTable, +) ([]*schedulepb.Message, error) { + table := r.tables[task.TableID] + return table.handleMoveTable(task.DestCapture) } -func (r *replicationManager) handleTaskDeleteTable(task *deleteTable) error { - // TODO: update s.runingTasks and s.tables. - return nil +func (r *replicationManager) handleBurstBalanceTasks( + task *burstBalance, +) ([]*schedulepb.Message, error) { + perCapture := make(map[model.CaptureID]int) + for _, captureID := range task.Tables { + perCapture[captureID]++ + } + fields := make([]zap.Field, 0, len(perCapture)) + for captureID, count := range perCapture { + fields = append(fields, zap.Int(captureID, count)) + } + fields = append(fields, zap.Int("total", len(task.Tables))) + log.Info("tpscheduler: handle burst balance task", fields...) + + sentMsgs := make([]*schedulepb.Message, 0, len(task.Tables)) + for tableID := range task.Tables { + if r.runningTasks[tableID] != nil { + // Skip add table if the table is already running a task. + continue + } + captureID := task.Tables[tableID] + msgs, err := r.handleAddTableTask(&addTable{ + TableID: tableID, CaptureID: captureID, + }) + if err != nil { + return nil, errors.Trace(err) + } + sentMsgs = append(sentMsgs, msgs...) + // Just for place holding. + r.runningTasks[tableID] = &scheduleTask{} + } + return sentMsgs, nil } diff --git a/cdc/scheduler/internal/tp/replication_manager_test.go b/cdc/scheduler/internal/tp/replication_manager_test.go new file mode 100644 index 00000000000..f1dc7b961f1 --- /dev/null +++ b/cdc/scheduler/internal/tp/replication_manager_test.go @@ -0,0 +1,460 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tp + +import ( + "testing" + + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/stretchr/testify/require" +) + +func TestReplicationManagerHandleAddTableTask(t *testing.T) { + t.Parallel() + + r := newReplicationManager(10) + addTableCh := make(chan int, 1) + // Absent -> Prepare + msgs, err := r.HandleTasks([]*scheduleTask{{ + addTable: &addTable{TableID: 1, CaptureID: "1"}, + accept: func() { + addTableCh <- 1 + close(addTableCh) + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: "1", + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs[0]) + require.NotNil(t, r.runningTasks[1]) + require.Equal(t, 1, <-addTableCh) + + // Ignore if add the table again. + msgs, err = r.HandleTasks([]*scheduleTask{{ + addTable: &addTable{TableID: 1, CaptureID: "1"}, + accept: func() { t.Fatalf("must not accept") }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + + // Prepare -> Commit. + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: "1", + MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &schedulepb.TableStatus{ + TableID: 1, + State: schedulepb.TableStatePrepared, + }, + }, + }, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: "1", + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs[0]) + require.Equal(t, ReplicationSetStateCommit, r.tables[1].State) + require.Equal(t, "1", r.tables[1].Primary) + require.Equal(t, "", r.tables[1].Secondary) + + // Commit -> Replicating through heartbeat response. + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: "1", + MsgType: schedulepb.MsgHeartbeatResponse, + HeartbeatResponse: &schedulepb.HeartbeatResponse{ + Tables: []schedulepb.TableStatus{{ + TableID: 1, + State: schedulepb.TableStateReplicating, + }}, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Equal(t, ReplicationSetStateReplicating, r.tables[1].State) + require.Equal(t, "1", r.tables[1].Primary) + require.Equal(t, "", r.tables[1].Secondary) + + // Handle task again to clear runningTasks + msgs, err = r.HandleTasks(nil) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Nil(t, r.runningTasks[1]) +} + +func TestReplicationManagerRemoveTable(t *testing.T) { + t.Parallel() + + r := newReplicationManager(10) + removeTableCh := make(chan int, 1) + + // Ignore remove table if there is no such table. + msgs, err := r.HandleTasks([]*scheduleTask{{ + removeTable: &removeTable{TableID: 1, CaptureID: "1"}, + accept: func() { t.Fatal("must not accpet") }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + + // Add the table. + tbl, err := newReplicationSet(1, map[string]*schedulepb.TableStatus{ + "1": {TableID: 1, State: schedulepb.TableStateReplicating}, + }) + require.Equal(t, ReplicationSetStateReplicating, tbl.State) + r.tables[1] = tbl + + // Remove the table. + msgs, err = r.HandleTasks([]*scheduleTask{{ + removeTable: &removeTable{TableID: 1, CaptureID: "1"}, + accept: func() { + removeTableCh <- 1 + close(removeTableCh) + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: "1", + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: 1}, + }, + }, + }, msgs[0]) + require.NotNil(t, r.runningTasks[1]) + require.Equal(t, 1, <-removeTableCh) + + // Ignore if remove table again. + msgs, err = r.HandleTasks([]*scheduleTask{{ + removeTable: &removeTable{TableID: 1, CaptureID: "1"}, + accept: func() { t.Fatalf("must not accept") }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + + // Removing is in-progress through remove table response. + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: "1", + MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableResponse{ + Status: &schedulepb.TableStatus{ + TableID: 1, + State: schedulepb.TableStateStopping, + }, + }, + }, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + + // Removed through heartbeat response. + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: "1", + MsgType: schedulepb.MsgHeartbeatResponse, + HeartbeatResponse: &schedulepb.HeartbeatResponse{ + Tables: []schedulepb.TableStatus{{ + TableID: 1, + State: schedulepb.TableStateStopped, + }}, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Nil(t, r.tables[1]) + + // Handle task again to clear runningTasks + msgs, err = r.HandleTasks(nil) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Nil(t, r.runningTasks[1]) +} + +func TestReplicationManagerMoveTable(t *testing.T) { + t.Parallel() + + r := newReplicationManager(10) + moveTableCh := make(chan int, 1) + + source := "1" + dest := "2" + + // Ignore move table if it's not exist. + msgs, err := r.HandleTasks([]*scheduleTask{{ + moveTable: &moveTable{TableID: 1, DestCapture: dest}, + accept: func() { t.Fatal("must not accpet") }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + + // Add the table. + tbl, err := newReplicationSet(1, map[string]*schedulepb.TableStatus{ + source: {TableID: 1, State: schedulepb.TableStateReplicating}, + }) + require.Equal(t, ReplicationSetStateReplicating, tbl.State) + r.tables[1] = tbl + + // Replicating -> Prepare + msgs, err = r.HandleTasks([]*scheduleTask{{ + moveTable: &moveTable{TableID: 1, DestCapture: dest}, + accept: func() { + moveTableCh <- 1 + close(moveTableCh) + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: dest, + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs[0]) + require.NotNil(t, r.runningTasks[1]) + require.Equal(t, 1, <-moveTableCh) + + // Ignore if move table again. + msgs, err = r.HandleTasks([]*scheduleTask{{ + moveTable: &moveTable{TableID: 1, DestCapture: dest}, + accept: func() { + moveTableCh <- 1 + close(moveTableCh) + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + + // Prepare -> Commit. + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: dest, + MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &schedulepb.TableStatus{ + TableID: 1, + State: schedulepb.TableStatePrepared, + }, + }, + }, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: source, + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: 1}, + }, + }, + }, msgs[0]) + + // Source is removed, + // updates it's table status through heartbeat response. + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: source, + MsgType: schedulepb.MsgHeartbeatResponse, + HeartbeatResponse: &schedulepb.HeartbeatResponse{ + Tables: []schedulepb.TableStatus{{ + TableID: 1, + State: schedulepb.TableStateStopped, + }}, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: dest, + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs[0]) + + // Commit -> Replicating + msgs, err = r.HandleMessage([]*schedulepb.Message{{ + From: dest, + MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &schedulepb.TableStatus{ + TableID: 1, + State: schedulepb.TableStateReplicating, + }, + }, + }, + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Equal(t, ReplicationSetStateReplicating, r.tables[1].State) + require.Equal(t, dest, r.tables[1].Primary) + + // Handle task again to clear runningTasks + msgs, err = r.HandleTasks(nil) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Nil(t, r.runningTasks[1]) +} + +func TestReplicationManagerBurstBalance(t *testing.T) { + t.Parallel() + + r := newReplicationManager(1) + balanceTableCh := make(chan int, 1) + + // Burst balance is not limited by maxTaskConcurrency. + msgs, err := r.HandleTasks([]*scheduleTask{{ + addTable: &addTable{TableID: 1, CaptureID: "0"}, + }, { + burstBalance: &burstBalance{Tables: map[int64]string{ + 1: "1", 2: "2", 3: "3", + }}, + accept: func() { + balanceTableCh <- 1 + }, + }}) + require.Nil(t, err) + require.Equal(t, 1, <-balanceTableCh) + require.Len(t, msgs, 3) + for tableID, captureID := range map[model.TableID]model.CaptureID{ + 1: "0", 2: "2", 3: "3", + } { + require.Contains(t, msgs, &schedulepb.Message{ + To: captureID, + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: tableID, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs) + require.Contains(t, r.tables, tableID) + require.Contains(t, r.runningTasks, tableID) + } + + // More burst balance is still allowed. + msgs, err = r.HandleTasks([]*scheduleTask{{ + burstBalance: &burstBalance{Tables: map[int64]string{4: "4", 1: "0"}}, + accept: func() { + balanceTableCh <- 1 + }, + }}) + require.Nil(t, err) + require.Equal(t, 1, <-balanceTableCh) + require.Len(t, msgs, 1) + require.Contains(t, msgs, &schedulepb.Message{ + To: "4", + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 4, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs) +} + +func TestReplicationManagerMaxTaskConcurrency(t *testing.T) { + t.Parallel() + + r := newReplicationManager(1) + addTableCh := make(chan int, 1) + + msgs, err := r.HandleTasks([]*scheduleTask{{ + addTable: &addTable{TableID: 1, CaptureID: "1"}, + accept: func() { + addTableCh <- 1 + close(addTableCh) + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 1) + require.EqualValues(t, &schedulepb.Message{ + To: "1", + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, + }, + }, + }, + }, msgs[0]) + require.NotNil(t, r.runningTasks[1]) + require.Equal(t, 1, <-addTableCh) + + // No more tasks allowed. + msgs, err = r.HandleTasks([]*scheduleTask{{ + addTable: &addTable{TableID: 2, CaptureID: "1"}, + accept: func() { + t.Fatal("must not accept") + }, + }}) + require.Nil(t, err) + require.Len(t, msgs, 0) +} diff --git a/cdc/scheduler/internal/tp/replication_set.go b/cdc/scheduler/internal/tp/replication_set.go index aae2fb52bcb..e75e730eb59 100644 --- a/cdc/scheduler/internal/tp/replication_set.go +++ b/cdc/scheduler/internal/tp/replication_set.go @@ -224,7 +224,9 @@ func (r *ReplicationSet) poll( To: captureID, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + }, }, }}, nil } @@ -304,10 +306,12 @@ func (r *ReplicationSet) pollOnPrepare( To: captureID, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, false, nil @@ -348,7 +352,9 @@ func (r *ReplicationSet) pollOnCommit( To: r.Primary, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + }, }, }, false, nil } @@ -364,10 +370,12 @@ func (r *ReplicationSet) pollOnCommit( To: captureID, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, false, nil @@ -386,10 +394,12 @@ func (r *ReplicationSet) pollOnCommit( To: r.Primary, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, false, nil @@ -402,8 +412,8 @@ func (r *ReplicationSet) pollOnCommit( To: captureID, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: r.TableID, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, }, }, }, false, nil @@ -467,7 +477,9 @@ func (r *ReplicationSet) pollOnRemoving( To: captureID, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + }, }, }, false, nil case schedulepb.TableStateStopped: diff --git a/cdc/scheduler/internal/tp/replication_set_test.go b/cdc/scheduler/internal/tp/replication_set_test.go index 043a14751b8..581e437683c 100644 --- a/cdc/scheduler/internal/tp/replication_set_test.go +++ b/cdc/scheduler/internal/tp/replication_set_test.go @@ -241,10 +241,7 @@ func TestReplicationSetPoll(t *testing.T) { Checkpoint: schedulepb.Checkpoint{}, } } - r, err := newReplicationSet(1, status) - if err != nil { - t.Errorf("fail to new replication set %+v", err) - } + r, _ := newReplicationSet(1, status) var tableStates []int for state := range schedulepb.TableState_name { tableStates = append(tableStates, int(state)) @@ -287,10 +284,12 @@ func TestReplicationSetAddTable(t *testing.T) { To: from, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) @@ -313,10 +312,12 @@ func TestReplicationSetAddTable(t *testing.T) { To: from, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) @@ -344,10 +345,12 @@ func TestReplicationSetAddTable(t *testing.T) { To: from, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) @@ -392,8 +395,8 @@ func TestReplicationSetRemoveTable(t *testing.T) { To: from, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: r.TableID, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, }, }, }, msgs[0]) @@ -433,19 +436,19 @@ func TestReplicationSetMoveTable(t *testing.T) { r, err := newReplicationSet(tableID, nil) require.Nil(t, err) - original := "1" + source := "1" dest := "2" // Ignore removing table if it's not in replicating. r.State = ReplicationSetStatePrepare - r.Secondary = original - r.Captures[original] = struct{}{} + r.Secondary = source + r.Captures[source] = struct{}{} msgs, err := r.handleMoveTable(dest) require.Nil(t, err) require.Len(t, msgs, 0) require.NotContains(t, r.Captures, dest) r.State = ReplicationSetStateReplicating - r.Primary = original + r.Primary = source r.Secondary = "" // Replicating -> Prepare @@ -456,16 +459,18 @@ func TestReplicationSetMoveTable(t *testing.T) { To: dest, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) require.Equal(t, ReplicationSetStatePrepare, r.State) require.Equal(t, dest, r.Secondary) - require.Equal(t, original, r.Primary) + require.Equal(t, source, r.Primary) // No-op if add table again. msgs, err = r.handleAddTable(dest) @@ -483,10 +488,12 @@ func TestReplicationSetMoveTable(t *testing.T) { To: dest, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) @@ -501,52 +508,52 @@ func TestReplicationSetMoveTable(t *testing.T) { require.Nil(t, err) require.Len(t, msgs, 1) require.EqualValues(t, &schedulepb.Message{ - To: original, + To: source, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: r.TableID, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, }, }, }, msgs[0]) require.Equal(t, ReplicationSetStateCommit, r.State) - require.Equal(t, original, r.Primary) + require.Equal(t, source, r.Primary) require.Equal(t, dest, r.Secondary) // Source updates it's table status - msgs, err = r.handleTableStatus(original, &schedulepb.TableStatus{ + msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateReplicating, }) require.Nil(t, err) require.Len(t, msgs, 1, "%v", r) require.EqualValues(t, &schedulepb.Message{ - To: original, + To: source, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: r.TableID, + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, }, }, }, msgs[0]) require.Equal(t, ReplicationSetStateCommit, r.State) - require.Equal(t, original, r.Primary) + require.Equal(t, source, r.Primary) require.Equal(t, dest, r.Secondary) // Removing source is in-progress. - msgs, err = r.handleTableStatus(original, &schedulepb.TableStatus{ + msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateStopping, }) require.Nil(t, err) require.Len(t, msgs, 0) require.Equal(t, ReplicationSetStateCommit, r.State) - require.Equal(t, original, r.Primary) + require.Equal(t, source, r.Primary) require.Equal(t, dest, r.Secondary) // Source is removed. rSnapshot := *r - msgs, err = r.handleTableStatus(original, &schedulepb.TableStatus{ + msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateStopped, }) @@ -556,10 +563,12 @@ func TestReplicationSetMoveTable(t *testing.T) { To: dest, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) @@ -568,7 +577,7 @@ func TestReplicationSetMoveTable(t *testing.T) { require.Equal(t, "", r.Secondary) // Source stopped message is lost somehow. - msgs, err = rSnapshot.handleTableStatus(original, &schedulepb.TableStatus{ + msgs, err = rSnapshot.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateAbsent, }) @@ -578,10 +587,12 @@ func TestReplicationSetMoveTable(t *testing.T) { To: dest, MsgType: schedulepb.MsgDispatchTableRequest, DispatchTableRequest: &schedulepb.DispatchTableRequest{ - AddTable: &schedulepb.AddTableRequest{ - TableID: r.TableID, - IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: r.TableID, + IsSecondary: false, + Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + }, }, }, }, msgs[0]) diff --git a/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go b/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go index 363e494c466..c498d4e5648 100644 --- a/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go +++ b/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go @@ -267,8 +267,10 @@ func (m *RemoveTableRequest) GetTableID() github_com_pingcap_tiflow_cdc_model.Ta } type DispatchTableRequest struct { - AddTable *AddTableRequest `protobuf:"bytes,1,opt,name=add_table,json=addTable,proto3" json:"add_table,omitempty"` - RemoveTable *RemoveTableRequest `protobuf:"bytes,2,opt,name=remove_table,json=removeTable,proto3" json:"remove_table,omitempty"` + // Types that are valid to be assigned to Request: + // *DispatchTableRequest_AddTable + // *DispatchTableRequest_RemoveTable + Request isDispatchTableRequest_Request `protobuf_oneof:"request"` } func (m *DispatchTableRequest) Reset() { *m = DispatchTableRequest{} } @@ -304,25 +306,55 @@ func (m *DispatchTableRequest) XXX_DiscardUnknown() { var xxx_messageInfo_DispatchTableRequest proto.InternalMessageInfo -func (m *DispatchTableRequest) GetAddTable() *AddTableRequest { +type isDispatchTableRequest_Request interface { + isDispatchTableRequest_Request() + MarshalTo([]byte) (int, error) + Size() int +} + +type DispatchTableRequest_AddTable struct { + AddTable *AddTableRequest `protobuf:"bytes,1,opt,name=add_table,json=addTable,proto3,oneof" json:"add_table,omitempty"` +} +type DispatchTableRequest_RemoveTable struct { + RemoveTable *RemoveTableRequest `protobuf:"bytes,2,opt,name=remove_table,json=removeTable,proto3,oneof" json:"remove_table,omitempty"` +} + +func (*DispatchTableRequest_AddTable) isDispatchTableRequest_Request() {} +func (*DispatchTableRequest_RemoveTable) isDispatchTableRequest_Request() {} + +func (m *DispatchTableRequest) GetRequest() isDispatchTableRequest_Request { if m != nil { - return m.AddTable + return m.Request + } + return nil +} + +func (m *DispatchTableRequest) GetAddTable() *AddTableRequest { + if x, ok := m.GetRequest().(*DispatchTableRequest_AddTable); ok { + return x.AddTable } return nil } func (m *DispatchTableRequest) GetRemoveTable() *RemoveTableRequest { - if m != nil { - return m.RemoveTable + if x, ok := m.GetRequest().(*DispatchTableRequest_RemoveTable); ok { + return x.RemoveTable } return nil } +// XXX_OneofWrappers is for the internal use of the proto package. +func (*DispatchTableRequest) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*DispatchTableRequest_AddTable)(nil), + (*DispatchTableRequest_RemoveTable)(nil), + } +} + type AddTableResponse struct { - TableID github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_id,omitempty"` - Status *TableStatus `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` - Checkpoint *Checkpoint `protobuf:"bytes,3,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` - Reject bool `protobuf:"varint,4,opt,name=reject,proto3" json:"reject,omitempty"` + Status *TableStatus `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Checkpoint *Checkpoint `protobuf:"bytes,2,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` + Reject bool `protobuf:"varint,3,opt,name=reject,proto3" json:"reject,omitempty"` } func (m *AddTableResponse) Reset() { *m = AddTableResponse{} } @@ -358,13 +390,6 @@ func (m *AddTableResponse) XXX_DiscardUnknown() { var xxx_messageInfo_AddTableResponse proto.InternalMessageInfo -func (m *AddTableResponse) GetTableID() github_com_pingcap_tiflow_cdc_model.TableID { - if m != nil { - return m.TableID - } - return 0 -} - func (m *AddTableResponse) GetStatus() *TableStatus { if m != nil { return m.Status @@ -387,9 +412,8 @@ func (m *AddTableResponse) GetReject() bool { } type RemoveTableResponse struct { - TableID github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_id,omitempty"` - Status *TableStatus `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` - Checkpoint *Checkpoint `protobuf:"bytes,3,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` + Status *TableStatus `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Checkpoint *Checkpoint `protobuf:"bytes,2,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` } func (m *RemoveTableResponse) Reset() { *m = RemoveTableResponse{} } @@ -425,13 +449,6 @@ func (m *RemoveTableResponse) XXX_DiscardUnknown() { var xxx_messageInfo_RemoveTableResponse proto.InternalMessageInfo -func (m *RemoveTableResponse) GetTableID() github_com_pingcap_tiflow_cdc_model.TableID { - if m != nil { - return m.TableID - } - return 0 -} - func (m *RemoveTableResponse) GetStatus() *TableStatus { if m != nil { return m.Status @@ -447,8 +464,10 @@ func (m *RemoveTableResponse) GetCheckpoint() *Checkpoint { } type DispatchTableResponse struct { - AddTable *AddTableResponse `protobuf:"bytes,1,opt,name=add_table,json=addTable,proto3" json:"add_table,omitempty"` - RemoveTable *RemoveTableResponse `protobuf:"bytes,2,opt,name=remove_table,json=removeTable,proto3" json:"remove_table,omitempty"` + // Types that are valid to be assigned to Response: + // *DispatchTableResponse_AddTable + // *DispatchTableResponse_RemoveTable + Response isDispatchTableResponse_Response `protobuf_oneof:"response"` } func (m *DispatchTableResponse) Reset() { *m = DispatchTableResponse{} } @@ -484,20 +503,51 @@ func (m *DispatchTableResponse) XXX_DiscardUnknown() { var xxx_messageInfo_DispatchTableResponse proto.InternalMessageInfo -func (m *DispatchTableResponse) GetAddTable() *AddTableResponse { +type isDispatchTableResponse_Response interface { + isDispatchTableResponse_Response() + MarshalTo([]byte) (int, error) + Size() int +} + +type DispatchTableResponse_AddTable struct { + AddTable *AddTableResponse `protobuf:"bytes,1,opt,name=add_table,json=addTable,proto3,oneof" json:"add_table,omitempty"` +} +type DispatchTableResponse_RemoveTable struct { + RemoveTable *RemoveTableResponse `protobuf:"bytes,2,opt,name=remove_table,json=removeTable,proto3,oneof" json:"remove_table,omitempty"` +} + +func (*DispatchTableResponse_AddTable) isDispatchTableResponse_Response() {} +func (*DispatchTableResponse_RemoveTable) isDispatchTableResponse_Response() {} + +func (m *DispatchTableResponse) GetResponse() isDispatchTableResponse_Response { if m != nil { - return m.AddTable + return m.Response + } + return nil +} + +func (m *DispatchTableResponse) GetAddTable() *AddTableResponse { + if x, ok := m.GetResponse().(*DispatchTableResponse_AddTable); ok { + return x.AddTable } return nil } func (m *DispatchTableResponse) GetRemoveTable() *RemoveTableResponse { - if m != nil { - return m.RemoveTable + if x, ok := m.GetResponse().(*DispatchTableResponse_RemoveTable); ok { + return x.RemoveTable } return nil } +// XXX_OneofWrappers is for the internal use of the proto package. +func (*DispatchTableResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*DispatchTableResponse_AddTable)(nil), + (*DispatchTableResponse_RemoveTable)(nil), + } +} + type Heartbeat struct { } @@ -735,15 +785,15 @@ func (m *ProcessorEpoch) GetEpoch() string { } type Message struct { - Header *Message_Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` - MsgType MessageType `protobuf:"varint,2,opt,name=msg_type,json=msgType,proto3,enum=pingcap.tiflow.cdc.schedulepb.MessageType" json:"msg_type,omitempty"` - From github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,3,opt,name=from,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"from,omitempty"` - To github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,4,opt,name=to,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"to,omitempty"` - DispatchTableRequest *DispatchTableRequest `protobuf:"bytes,5,opt,name=dispatch_table_request,json=dispatchTableRequest,proto3" json:"dispatch_table_request,omitempty"` - DispatchTableResponse *DispatchTableResponse `protobuf:"bytes,6,opt,name=dispatch_table_response,json=dispatchTableResponse,proto3" json:"dispatch_table_response,omitempty"` - Heartbeat *Heartbeat `protobuf:"bytes,7,opt,name=heartbeat,proto3" json:"heartbeat,omitempty"` - HeartbeatResponse *HeartbeatResponse `protobuf:"bytes,8,opt,name=heartbeat_response,json=heartbeatResponse,proto3" json:"heartbeat_response,omitempty"` - Checkpoint *Checkpoint `protobuf:"bytes,9,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` + Header *Message_Header `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` + MsgType MessageType `protobuf:"varint,2,opt,name=msg_type,json=msgType,proto3,enum=pingcap.tiflow.cdc.schedulepb.MessageType" json:"msg_type,omitempty"` + From github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,3,opt,name=from,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"from,omitempty"` + To github_com_pingcap_tiflow_cdc_model.CaptureID `protobuf:"bytes,4,opt,name=to,proto3,casttype=github.com/pingcap/tiflow/cdc/model.CaptureID" json:"to,omitempty"` + DispatchTableRequest *DispatchTableRequest `protobuf:"bytes,5,opt,name=dispatch_table_request,json=dispatchTableRequest,proto3" json:"dispatch_table_request,omitempty"` + DispatchTableResponse *DispatchTableResponse `protobuf:"bytes,6,opt,name=dispatch_table_response,json=dispatchTableResponse,proto3" json:"dispatch_table_response,omitempty"` + Heartbeat *Heartbeat `protobuf:"bytes,7,opt,name=heartbeat,proto3" json:"heartbeat,omitempty"` + HeartbeatResponse *HeartbeatResponse `protobuf:"bytes,8,opt,name=heartbeat_response,json=heartbeatResponse,proto3" json:"heartbeat_response,omitempty"` + Checkpoints map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint `protobuf:"bytes,9,rep,name=checkpoints,proto3,castkey=github.com/pingcap/tiflow/cdc/model.TableID" json:"checkpoints" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (m *Message) Reset() { *m = Message{} } @@ -835,9 +885,9 @@ func (m *Message) GetHeartbeatResponse() *HeartbeatResponse { return nil } -func (m *Message) GetCheckpoint() *Checkpoint { +func (m *Message) GetCheckpoints() map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint { if m != nil { - return m.Checkpoint + return m.Checkpoints } return nil } @@ -919,83 +969,89 @@ func init() { proto.RegisterType((*OwnerRevision)(nil), "pingcap.tiflow.cdc.schedulepb.OwnerRevision") proto.RegisterType((*ProcessorEpoch)(nil), "pingcap.tiflow.cdc.schedulepb.ProcessorEpoch") proto.RegisterType((*Message)(nil), "pingcap.tiflow.cdc.schedulepb.Message") + proto.RegisterMapType((map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint)(nil), "pingcap.tiflow.cdc.schedulepb.Message.CheckpointsEntry") proto.RegisterType((*Message_Header)(nil), "pingcap.tiflow.cdc.schedulepb.Message.Header") } func init() { proto.RegisterFile("table_schedule.proto", fileDescriptor_ab4bb9c6b16cfa4d) } var fileDescriptor_ab4bb9c6b16cfa4d = []byte{ - // 1114 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x57, 0x4f, 0x6f, 0xe3, 0x44, - 0x14, 0x8f, 0x93, 0x34, 0x7f, 0x5e, 0xda, 0xae, 0x3b, 0x9b, 0x6e, 0x83, 0x81, 0xc4, 0x58, 0xab, - 0xaa, 0x64, 0xd9, 0x84, 0xed, 0x72, 0x40, 0x7b, 0x41, 0xcd, 0x6e, 0x51, 0xab, 0xa5, 0xea, 0xca, - 0x4d, 0x0f, 0x20, 0xa4, 0xc8, 0xb1, 0x67, 0x1d, 0xd3, 0x24, 0x63, 0x3c, 0x6e, 0xab, 0x5e, 0xf8, - 0x00, 0x3e, 0xed, 0x89, 0x9b, 0x0f, 0x5c, 0xf9, 0x06, 0x48, 0x7c, 0x80, 0x3d, 0x70, 0xe8, 0x09, - 0x21, 0x21, 0x55, 0xd0, 0x7e, 0x07, 0x0e, 0xe5, 0x82, 0x3c, 0x9e, 0xd8, 0x71, 0x9b, 0x25, 0x29, - 0xcb, 0x72, 0xe1, 0x36, 0xf3, 0xfe, 0xfc, 0xde, 0xbc, 0xf7, 0x7e, 0xef, 0x59, 0x86, 0xb2, 0xab, - 0x75, 0xfb, 0xb8, 0x43, 0xf5, 0x1e, 0x36, 0x0e, 0xfb, 0xb8, 0x61, 0x3b, 0xc4, 0x25, 0xe8, 0x5d, - 0xdb, 0x1a, 0x9a, 0xba, 0x66, 0x37, 0x5c, 0xeb, 0x79, 0x9f, 0x1c, 0x37, 0x74, 0x43, 0x6f, 0x8c, - 0x4c, 0xec, 0xae, 0x54, 0x36, 0x89, 0x49, 0x98, 0x65, 0x33, 0x38, 0x85, 0x4e, 0xca, 0xf7, 0x02, - 0xc0, 0xe3, 0x1e, 0xd6, 0x0f, 0x6c, 0x62, 0x0d, 0x5d, 0xb4, 0x0b, 0x0b, 0x7a, 0x74, 0xeb, 0xb8, - 0xb4, 0x22, 0xc8, 0xc2, 0x5a, 0xb6, 0x55, 0xbf, 0x3c, 0xab, 0xad, 0x9a, 0x96, 0xdb, 0x3b, 0xec, - 0x36, 0x74, 0x32, 0x68, 0xf2, 0x48, 0xcd, 0x30, 0x52, 0x53, 0x37, 0xf4, 0xe6, 0x80, 0x18, 0xb8, - 0xdf, 0x68, 0x53, 0x75, 0x3e, 0x06, 0x68, 0x53, 0xf4, 0x14, 0x4a, 0x0e, 0xa6, 0xa4, 0x7f, 0x84, - 0x8d, 0x00, 0x2e, 0x7d, 0x63, 0x38, 0x18, 0xb9, 0xb7, 0xa9, 0xf2, 0xb3, 0x00, 0xb7, 0x36, 0x0c, - 0xa3, 0x1d, 0x64, 0xaf, 0xe2, 0xaf, 0x0f, 0x31, 0x75, 0xd1, 0x3e, 0x14, 0xc2, 0x6a, 0x58, 0x06, - 0x7b, 0x6c, 0xa6, 0xf5, 0xe8, 0xfc, 0xac, 0x96, 0x67, 0x36, 0xdb, 0x4f, 0x2e, 0xcf, 0x6a, 0xf7, - 0x66, 0x0a, 0x14, 0x9a, 0xab, 0x79, 0x86, 0xb5, 0x6d, 0xa0, 0xf7, 0x60, 0xde, 0xa2, 0x1d, 0x8a, - 0x75, 0x32, 0x34, 0x34, 0xe7, 0x84, 0x3d, 0xbc, 0xa0, 0x96, 0x2c, 0xba, 0x37, 0x12, 0xa1, 0x6d, - 0x80, 0x38, 0xd5, 0x4a, 0x46, 0x16, 0xd6, 0x4a, 0xeb, 0xef, 0x37, 0xfe, 0xb6, 0x09, 0x8d, 0xb8, - 0xd4, 0xea, 0x98, 0xb3, 0x72, 0x00, 0x48, 0xc5, 0x03, 0x72, 0x84, 0xff, 0x83, 0xd4, 0x94, 0x1f, - 0x04, 0x28, 0x3f, 0xb1, 0xa8, 0xad, 0xb9, 0x7a, 0x2f, 0x11, 0xef, 0x29, 0x14, 0x35, 0xc3, 0xe8, - 0x30, 0x3b, 0x16, 0xb0, 0xb4, 0xde, 0x98, 0x92, 0xcf, 0x95, 0x6e, 0xa8, 0x05, 0x8d, 0x0b, 0x50, - 0x1b, 0xe6, 0x1d, 0x96, 0x12, 0xc7, 0x4b, 0x33, 0xbc, 0x07, 0x53, 0xf0, 0xae, 0x57, 0x41, 0x2d, - 0x39, 0xb1, 0x4c, 0x79, 0x91, 0x06, 0x31, 0x8e, 0x49, 0x6d, 0x32, 0xa4, 0xf8, 0x4d, 0x51, 0xa0, - 0x05, 0x39, 0xea, 0x6a, 0xee, 0x21, 0xe5, 0x6f, 0xaf, 0x4f, 0x79, 0x3b, 0x03, 0xd8, 0x63, 0x1e, - 0x2a, 0xf7, 0xfc, 0x17, 0x39, 0x82, 0xee, 0x40, 0xce, 0xc1, 0x5f, 0x61, 0xdd, 0xad, 0x64, 0x19, - 0x17, 0xf9, 0x4d, 0xf9, 0x53, 0x80, 0xdb, 0x89, 0xb2, 0xfd, 0x9f, 0xaa, 0xa2, 0xfc, 0x28, 0xc0, - 0xf2, 0x15, 0x32, 0xf3, 0xfc, 0x3f, 0xbb, 0xce, 0xe6, 0xe6, 0xcc, 0x6c, 0x0e, 0x31, 0xc6, 0xe8, - 0xbc, 0x3f, 0x91, 0xce, 0xeb, 0x37, 0xa1, 0x33, 0xc7, 0x4c, 0xf0, 0xb9, 0x04, 0xc5, 0x2d, 0xac, - 0x39, 0x6e, 0x17, 0x6b, 0xae, 0xf2, 0x87, 0x00, 0xa5, 0xb1, 0x72, 0xbd, 0xa9, 0x0e, 0x7e, 0x02, - 0x73, 0x41, 0x1f, 0xc2, 0x1c, 0x16, 0xa7, 0x16, 0x3e, 0x7a, 0x11, 0x56, 0x43, 0x3f, 0xb4, 0xfb, - 0x5a, 0xed, 0x6b, 0x65, 0x5f, 0x9e, 0xd5, 0x52, 0x89, 0x26, 0x7e, 0x03, 0x4b, 0x51, 0x15, 0xa2, - 0xfe, 0x6d, 0x41, 0x8e, 0xbd, 0x38, 0xf8, 0x06, 0x65, 0x6e, 0x46, 0x34, 0x1e, 0x82, 0xfb, 0xa3, - 0x1a, 0x94, 0x82, 0x5d, 0xee, 0x12, 0x3b, 0x40, 0xe0, 0xab, 0x1c, 0x2c, 0xba, 0xc7, 0x25, 0xca, - 0x3d, 0x58, 0xd8, 0x3d, 0x1e, 0x62, 0x47, 0xc5, 0x47, 0x16, 0xb5, 0xc8, 0x10, 0x49, 0x50, 0x70, - 0xf8, 0x39, 0xac, 0xbc, 0x1a, 0xdd, 0x95, 0x55, 0x58, 0x7c, 0xe6, 0x10, 0x1d, 0x53, 0x4a, 0x9c, - 0x4d, 0x9b, 0xe8, 0x3d, 0x54, 0x86, 0x39, 0x1c, 0x1c, 0x98, 0x69, 0x51, 0x0d, 0x2f, 0xca, 0x4f, - 0x79, 0xc8, 0xef, 0x60, 0x4a, 0x35, 0x13, 0xa3, 0x4d, 0xc8, 0xf5, 0xb0, 0x66, 0x60, 0x87, 0x13, - 0xf1, 0xfe, 0x94, 0x5c, 0xb8, 0x5f, 0x63, 0x8b, 0x39, 0xa9, 0xdc, 0x19, 0x6d, 0x42, 0x61, 0x40, - 0xcd, 0x8e, 0x7b, 0x62, 0x8f, 0x9a, 0x57, 0x9f, 0x0d, 0xa8, 0x7d, 0x62, 0x63, 0x35, 0x3f, 0xa0, - 0x66, 0x70, 0x40, 0x9b, 0x90, 0x7d, 0xee, 0x90, 0x01, 0xeb, 0x5c, 0xb1, 0xf5, 0xe0, 0xf2, 0xac, - 0x76, 0x7f, 0x16, 0x22, 0x3d, 0xd6, 0x6c, 0xf7, 0xd0, 0x09, 0xa8, 0xc4, 0xdc, 0xd1, 0x06, 0xa4, - 0x5d, 0xc2, 0x96, 0xd1, 0x3f, 0x02, 0x49, 0xbb, 0x04, 0x59, 0x70, 0xc7, 0xe0, 0xc3, 0x1b, 0xce, - 0x55, 0xc7, 0x09, 0xb7, 0x7e, 0x65, 0x8e, 0xd5, 0xe9, 0xe1, 0x94, 0xf4, 0x26, 0x7d, 0xc6, 0xd4, - 0xb2, 0x31, 0xe9, 0xe3, 0xd6, 0x87, 0x95, 0x6b, 0xa1, 0x42, 0xa6, 0x55, 0x72, 0x2c, 0xd6, 0x47, - 0x37, 0x8b, 0xc5, 0xa7, 0x79, 0xd9, 0x98, 0xb8, 0x7c, 0x3e, 0x85, 0x62, 0x6f, 0xc4, 0xe8, 0x4a, - 0x9e, 0xe1, 0xaf, 0x4d, 0xc1, 0x8f, 0x27, 0x20, 0x76, 0x45, 0x1d, 0x40, 0xd1, 0x25, 0x7e, 0x70, - 0x81, 0x01, 0x7e, 0x38, 0x33, 0xe0, 0xe8, 0xb1, 0x4b, 0xbd, 0x6b, 0x53, 0x96, 0x5c, 0xc5, 0xc5, - 0xd7, 0x58, 0xc5, 0xd2, 0xaf, 0x02, 0xe4, 0x42, 0xc2, 0xa2, 0x0a, 0xe4, 0x8f, 0xb0, 0x13, 0x8d, - 0x4f, 0x51, 0x1d, 0x5d, 0xd1, 0xe7, 0xb0, 0x48, 0x82, 0x51, 0xeb, 0x44, 0xf3, 0x15, 0x6e, 0xd2, - 0x0f, 0xa6, 0xc4, 0x4c, 0xcc, 0x27, 0x9f, 0xef, 0x05, 0x92, 0x18, 0xda, 0x2f, 0xe1, 0x96, 0x3d, - 0x1a, 0xcc, 0x4e, 0x38, 0x90, 0x99, 0x99, 0xa6, 0x2d, 0x39, 0xce, 0x1c, 0x7c, 0xd1, 0x4e, 0x48, - 0xeb, 0xdf, 0xa6, 0x01, 0xe2, 0x55, 0x88, 0x14, 0xc8, 0xef, 0x0f, 0x0f, 0x86, 0xe4, 0x78, 0x28, - 0xa6, 0xa4, 0x65, 0xcf, 0x97, 0x97, 0x62, 0x25, 0x57, 0x20, 0x19, 0x72, 0x1b, 0x5d, 0x8a, 0x87, - 0xae, 0x28, 0x48, 0x65, 0xcf, 0x97, 0xc5, 0xd8, 0x24, 0x94, 0xa3, 0x55, 0x28, 0x3e, 0x73, 0xb0, - 0xad, 0x39, 0xd6, 0xd0, 0x14, 0xd3, 0xd2, 0x8a, 0xe7, 0xcb, 0xb7, 0x63, 0xa3, 0x48, 0x85, 0xee, - 0x42, 0x21, 0xbc, 0x60, 0x43, 0xcc, 0x48, 0x77, 0x3c, 0x5f, 0x46, 0x57, 0xcd, 0xb0, 0x81, 0xea, - 0x50, 0x52, 0xb1, 0xdd, 0xb7, 0x74, 0xcd, 0x0d, 0xf0, 0xb2, 0xd2, 0x5b, 0x9e, 0x2f, 0x2f, 0x8f, - 0xed, 0xef, 0x58, 0x19, 0x20, 0x8e, 0xd6, 0x9f, 0x38, 0x77, 0x15, 0x71, 0xa4, 0x09, 0xb2, 0x64, - 0x67, 0x6c, 0x88, 0xb9, 0xab, 0x59, 0x72, 0x45, 0xfd, 0xbb, 0x34, 0x94, 0xc6, 0xd6, 0x0c, 0xaa, - 0x02, 0xec, 0x50, 0x33, 0x2e, 0xce, 0xa2, 0xe7, 0xcb, 0x63, 0x12, 0x74, 0x17, 0x16, 0x76, 0xa8, - 0x19, 0x73, 0x48, 0x14, 0xa4, 0x25, 0xcf, 0x97, 0x93, 0x42, 0xf4, 0x31, 0xac, 0xec, 0x50, 0x73, - 0xd2, 0x7c, 0x8b, 0x69, 0xe9, 0x6d, 0xcf, 0x97, 0x5f, 0xa5, 0x46, 0x8f, 0xa0, 0x72, 0x5d, 0x15, - 0xb2, 0x5d, 0xcc, 0x48, 0xef, 0x78, 0xbe, 0xfc, 0x4a, 0x3d, 0x52, 0x60, 0x7e, 0x87, 0x9a, 0xd1, - 0xe0, 0x88, 0x59, 0x49, 0xf4, 0x7c, 0x39, 0x21, 0x43, 0xeb, 0x50, 0x1e, 0xbf, 0x47, 0xd8, 0x73, - 0x52, 0xc5, 0xf3, 0xe5, 0x89, 0xba, 0xd6, 0xda, 0xe9, 0xef, 0xd5, 0xd4, 0xcb, 0xf3, 0xaa, 0x70, - 0x7a, 0x5e, 0x15, 0x7e, 0x3b, 0xaf, 0x0a, 0x2f, 0x2e, 0xaa, 0xa9, 0xd3, 0x8b, 0x6a, 0xea, 0x97, - 0x8b, 0x6a, 0xea, 0x0b, 0x88, 0xb9, 0xd8, 0xcd, 0xb1, 0xdf, 0xb2, 0x87, 0x7f, 0x05, 0x00, 0x00, - 0xff, 0xff, 0x65, 0xd5, 0x46, 0xda, 0xe3, 0x0d, 0x00, 0x00, + // 1186 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcf, 0x6f, 0xe3, 0xc4, + 0x17, 0x8f, 0x93, 0x34, 0x3f, 0x5e, 0xda, 0xae, 0x3b, 0x9b, 0xee, 0xe6, 0xeb, 0x2f, 0x24, 0xc6, + 0x5a, 0x55, 0x25, 0xcb, 0x26, 0x6c, 0x8b, 0xc4, 0xaa, 0x17, 0xd4, 0xec, 0x16, 0xb5, 0x42, 0xa1, + 0x2b, 0xb7, 0x0b, 0x02, 0x21, 0x45, 0x8e, 0x3d, 0xeb, 0x98, 0x26, 0x1e, 0xe3, 0x71, 0x5a, 0xf5, + 0xc2, 0x81, 0x63, 0x0e, 0x88, 0x13, 0xb7, 0x1c, 0x90, 0x38, 0xf1, 0x1f, 0xf0, 0x1f, 0xf4, 0xc0, + 0xa1, 0x27, 0x84, 0x84, 0x54, 0xa0, 0xfd, 0x1f, 0x38, 0x54, 0x42, 0x42, 0x1e, 0x4f, 0xec, 0x24, + 0xcd, 0x92, 0x84, 0x15, 0x88, 0xdb, 0xcc, 0xbc, 0xf7, 0x3e, 0xef, 0xe7, 0xe7, 0x59, 0x86, 0xbc, + 0xa7, 0x35, 0xdb, 0xb8, 0x41, 0xf5, 0x16, 0x36, 0xba, 0x6d, 0x5c, 0x71, 0x5c, 0xe2, 0x11, 0xf4, + 0xaa, 0x63, 0xd9, 0xa6, 0xae, 0x39, 0x15, 0xcf, 0x7a, 0xde, 0x26, 0x27, 0x15, 0xdd, 0xd0, 0x2b, + 0x03, 0x15, 0xa7, 0x29, 0xe5, 0x4d, 0x62, 0x12, 0xa6, 0x59, 0xf5, 0x4f, 0x81, 0x91, 0xf2, 0x9d, + 0x00, 0xf0, 0xb8, 0x85, 0xf5, 0x23, 0x87, 0x58, 0xb6, 0x87, 0xf6, 0x61, 0x49, 0x0f, 0x6f, 0x0d, + 0x8f, 0x16, 0x04, 0x59, 0x58, 0x4f, 0xd6, 0xca, 0xd7, 0x17, 0xa5, 0x35, 0xd3, 0xf2, 0x5a, 0xdd, + 0x66, 0x45, 0x27, 0x9d, 0x2a, 0xf7, 0x54, 0x0d, 0x3c, 0x55, 0x75, 0x43, 0xaf, 0x76, 0x88, 0x81, + 0xdb, 0x95, 0x43, 0xaa, 0x2e, 0x46, 0x00, 0x87, 0x14, 0xbd, 0x07, 0x39, 0x17, 0x53, 0xd2, 0x3e, + 0xc6, 0x86, 0x0f, 0x17, 0x9f, 0x1b, 0x0e, 0x06, 0xe6, 0x87, 0x54, 0xf9, 0x51, 0x80, 0x5b, 0xdb, + 0x86, 0x71, 0xe8, 0x67, 0xaf, 0xe2, 0xcf, 0xba, 0x98, 0x7a, 0xe8, 0x19, 0x64, 0x82, 0x6a, 0x58, + 0x06, 0x0b, 0x36, 0x51, 0xdb, 0xba, 0xbc, 0x28, 0xa5, 0x99, 0xce, 0xde, 0x93, 0xeb, 0x8b, 0xd2, + 0xfd, 0x99, 0x1c, 0x05, 0xea, 0x6a, 0x9a, 0x61, 0xed, 0x19, 0xe8, 0x35, 0x58, 0xb4, 0x68, 0x83, + 0x62, 0x9d, 0xd8, 0x86, 0xe6, 0x9e, 0xb2, 0xc0, 0x33, 0x6a, 0xce, 0xa2, 0x07, 0x83, 0x27, 0xb4, + 0x07, 0x10, 0xa5, 0x5a, 0x48, 0xc8, 0xc2, 0x7a, 0x6e, 0xe3, 0xf5, 0xca, 0x5f, 0x36, 0xa1, 0x12, + 0x95, 0x5a, 0x1d, 0x32, 0x56, 0x8e, 0x00, 0xa9, 0xb8, 0x43, 0x8e, 0xf1, 0xbf, 0x90, 0x9a, 0x72, + 0x26, 0x40, 0xfe, 0x89, 0x45, 0x1d, 0xcd, 0xd3, 0x5b, 0x23, 0xfe, 0xea, 0x90, 0xd5, 0x0c, 0xa3, + 0xc1, 0xf4, 0x98, 0xc3, 0xdc, 0x46, 0x65, 0x4a, 0x3e, 0x63, 0xdd, 0xd8, 0x8d, 0xa9, 0x19, 0x8d, + 0x3f, 0xa1, 0x0f, 0x60, 0xd1, 0x65, 0x49, 0x71, 0xc4, 0x38, 0x43, 0x7c, 0x38, 0x05, 0xf1, 0x66, + 0x1d, 0x76, 0x63, 0x6a, 0xce, 0x8d, 0x5e, 0x6b, 0x59, 0x48, 0xbb, 0x81, 0x44, 0xf9, 0x5e, 0x00, + 0x31, 0x0a, 0x81, 0x3a, 0xc4, 0xa6, 0x18, 0xd5, 0x20, 0x45, 0x3d, 0xcd, 0xeb, 0x52, 0x9e, 0x43, + 0x79, 0x8a, 0x47, 0x66, 0x7d, 0xc0, 0x2c, 0x54, 0x6e, 0x39, 0xd6, 0xdb, 0xf8, 0x4b, 0xf4, 0x16, + 0xdd, 0x81, 0x94, 0x8b, 0x3f, 0xc5, 0x7a, 0x30, 0x22, 0x19, 0x95, 0xdf, 0x94, 0x6f, 0x05, 0xb8, + 0x3d, 0x92, 0xec, 0x7f, 0x32, 0x7c, 0xe5, 0x07, 0x01, 0x56, 0xc7, 0xa6, 0x85, 0x07, 0xfa, 0xfe, + 0xcd, 0x71, 0xa9, 0xce, 0x3c, 0x2e, 0x01, 0xc6, 0xc8, 0xbc, 0x7c, 0x38, 0x71, 0x5e, 0x36, 0xe6, + 0x99, 0x97, 0x10, 0x75, 0x64, 0x60, 0x00, 0x32, 0x2e, 0x17, 0x29, 0x39, 0xc8, 0xee, 0x62, 0xcd, + 0xf5, 0x9a, 0x58, 0xf3, 0x94, 0xdf, 0x05, 0xc8, 0x0d, 0x95, 0xef, 0x9f, 0xda, 0x25, 0xef, 0xc0, + 0x82, 0xdf, 0x97, 0x20, 0xa3, 0xe5, 0xa9, 0x8d, 0x08, 0x23, 0xc2, 0x6a, 0x60, 0x87, 0xf6, 0x5f, + 0x6a, 0xd3, 0xd4, 0x92, 0x67, 0x17, 0xa5, 0xd8, 0x48, 0x53, 0x3f, 0x87, 0x95, 0xb0, 0x0a, 0x61, + 0x3f, 0x77, 0x21, 0xc5, 0x22, 0xf6, 0x07, 0x2f, 0x31, 0xdf, 0xe0, 0x71, 0x17, 0xdc, 0x1e, 0x95, + 0x20, 0xe7, 0x2f, 0x4f, 0x8f, 0x38, 0x3e, 0x02, 0xdf, 0x9d, 0x60, 0xd1, 0x03, 0xfe, 0xa2, 0xdc, + 0x87, 0xa5, 0xfd, 0x13, 0x1b, 0xbb, 0x2a, 0x3e, 0xb6, 0xa8, 0x45, 0x6c, 0x24, 0xf9, 0x2d, 0x0a, + 0xce, 0x41, 0xe5, 0xd5, 0xf0, 0xae, 0xac, 0xc1, 0xf2, 0x53, 0x97, 0xe8, 0x98, 0x52, 0xe2, 0xee, + 0x38, 0x44, 0x6f, 0xa1, 0x3c, 0x2c, 0x60, 0xff, 0xc0, 0x54, 0xb3, 0x6a, 0x70, 0x51, 0xfe, 0xc8, + 0x40, 0xba, 0x8e, 0x29, 0xd5, 0x4c, 0x8c, 0x76, 0x20, 0xd5, 0xc2, 0x9a, 0x81, 0x5d, 0x3e, 0x98, + 0x0f, 0xa6, 0xe4, 0xc2, 0xed, 0x2a, 0xbb, 0xcc, 0x48, 0xe5, 0xc6, 0x68, 0x07, 0x32, 0x1d, 0x6a, + 0x36, 0xbc, 0x53, 0x67, 0xd0, 0xbc, 0xf2, 0x6c, 0x40, 0x87, 0xa7, 0x0e, 0x56, 0xd3, 0x1d, 0x6a, + 0xfa, 0x07, 0xb4, 0x03, 0xc9, 0xe7, 0x2e, 0xe9, 0xb0, 0xce, 0x65, 0x6b, 0x0f, 0xaf, 0x2f, 0x4a, + 0x0f, 0x66, 0x19, 0xa4, 0xc7, 0x9a, 0xe3, 0x75, 0x5d, 0x7f, 0x94, 0x98, 0x39, 0xda, 0x86, 0xb8, + 0x47, 0x0a, 0xc9, 0xbf, 0x0b, 0x12, 0xf7, 0x08, 0xb2, 0xe0, 0x8e, 0xc1, 0xc9, 0x1c, 0xb0, 0xac, + 0xc1, 0x57, 0x69, 0x61, 0x81, 0xd5, 0x69, 0x73, 0x4a, 0x7a, 0x93, 0xbe, 0x1b, 0x6a, 0xde, 0x98, + 0xf4, 0x35, 0x69, 0xc3, 0xdd, 0x1b, 0xae, 0x82, 0x49, 0x2b, 0xa4, 0x98, 0xaf, 0xb7, 0xe6, 0xf3, + 0x15, 0xd8, 0xaa, 0xab, 0xc6, 0xc4, 0x65, 0xf4, 0x2e, 0x64, 0x5b, 0x83, 0x89, 0x2e, 0xa4, 0x19, + 0xfe, 0xfa, 0x14, 0xfc, 0x88, 0x01, 0x91, 0x29, 0x6a, 0x00, 0x0a, 0x2f, 0x51, 0xc0, 0x19, 0x06, + 0xf8, 0xe6, 0xcc, 0x80, 0x83, 0x60, 0x57, 0x5a, 0x37, 0x58, 0xf6, 0xa5, 0x00, 0xb9, 0x88, 0x89, + 0xb4, 0x90, 0x65, 0x5c, 0x7b, 0x7b, 0xc6, 0xf9, 0x8c, 0x58, 0x4d, 0x77, 0x6c, 0xcf, 0x3d, 0xad, + 0x6d, 0xfa, 0xc4, 0xfb, 0xe2, 0x97, 0xf9, 0x36, 0xd3, 0x70, 0x00, 0xd2, 0xcf, 0x02, 0xa4, 0x82, + 0xb1, 0x47, 0x05, 0x48, 0x1f, 0x63, 0x37, 0x24, 0x61, 0x56, 0x1d, 0x5c, 0xd1, 0x47, 0xb0, 0x4c, + 0x7c, 0xc2, 0x36, 0x42, 0x96, 0x06, 0xdb, 0xf9, 0x8d, 0x29, 0x71, 0x8f, 0xb0, 0x9c, 0x6f, 0x89, + 0x25, 0x32, 0x42, 0xfd, 0x4f, 0xe0, 0x96, 0x33, 0xa0, 0x77, 0x23, 0xa0, 0x75, 0x62, 0x26, 0xce, + 0x8e, 0x2e, 0x05, 0x0e, 0xbe, 0xec, 0x8c, 0xbc, 0x4a, 0x16, 0x88, 0xe3, 0x35, 0x43, 0x22, 0x24, + 0x8e, 0xf0, 0x29, 0xdf, 0x33, 0xfe, 0xd1, 0xdf, 0xd0, 0xc7, 0x5a, 0xbb, 0x8b, 0xe7, 0xff, 0x54, + 0x06, 0x76, 0x5b, 0xf1, 0x47, 0x42, 0xf9, 0xeb, 0x38, 0x40, 0xb4, 0xbb, 0x91, 0x02, 0xe9, 0x67, + 0xf6, 0x91, 0x4d, 0x4e, 0x6c, 0x31, 0x26, 0xad, 0xf6, 0xfa, 0xf2, 0x4a, 0x24, 0xe4, 0x02, 0x24, + 0x43, 0x6a, 0xbb, 0x49, 0xb1, 0xed, 0x89, 0x82, 0x94, 0xef, 0xf5, 0x65, 0x31, 0x52, 0x09, 0xde, + 0xd1, 0x1a, 0x64, 0x9f, 0xba, 0xd8, 0xd1, 0x5c, 0xcb, 0x36, 0xc5, 0xb8, 0x74, 0xb7, 0xd7, 0x97, + 0x6f, 0x47, 0x4a, 0xa1, 0x08, 0xdd, 0x83, 0x4c, 0x70, 0xc1, 0x86, 0x98, 0x90, 0xee, 0xf4, 0xfa, + 0x32, 0x1a, 0x57, 0xc3, 0x06, 0x2a, 0x43, 0x4e, 0xc5, 0x4e, 0xdb, 0xd2, 0x35, 0xcf, 0xc7, 0x4b, + 0x4a, 0xff, 0xeb, 0xf5, 0xe5, 0xd5, 0xa1, 0x0f, 0x4e, 0x24, 0xf4, 0x11, 0x07, 0xfb, 0x5a, 0x5c, + 0x18, 0x47, 0x1c, 0x48, 0xfc, 0x2c, 0xd9, 0x19, 0x1b, 0x62, 0x6a, 0x3c, 0x4b, 0x2e, 0x28, 0x7f, + 0x13, 0x87, 0xdc, 0xd0, 0x5e, 0x44, 0x45, 0x80, 0x3a, 0x35, 0xa3, 0xe2, 0x2c, 0xf7, 0xfa, 0xf2, + 0xd0, 0x0b, 0xba, 0x07, 0x4b, 0x75, 0x6a, 0x46, 0x45, 0x16, 0x05, 0x69, 0xa5, 0xd7, 0x97, 0x47, + 0x1f, 0xd1, 0x23, 0xb8, 0x5b, 0xa7, 0xe6, 0xa4, 0x85, 0x24, 0xc6, 0xa5, 0xff, 0xf7, 0xfa, 0xf2, + 0x8b, 0xc4, 0x68, 0x0b, 0x0a, 0x37, 0x45, 0x01, 0x3d, 0xc5, 0x84, 0xf4, 0x4a, 0xaf, 0x2f, 0xbf, + 0x50, 0x8e, 0x14, 0x58, 0xac, 0x53, 0x33, 0x64, 0xba, 0x98, 0x94, 0xc4, 0x5e, 0x5f, 0x1e, 0x79, + 0x43, 0x1b, 0x90, 0x1f, 0xbe, 0x87, 0xd8, 0x0b, 0x52, 0xa1, 0xd7, 0x97, 0x27, 0xca, 0x6a, 0xeb, + 0xe7, 0xbf, 0x15, 0x63, 0x67, 0x97, 0x45, 0xe1, 0xfc, 0xb2, 0x28, 0xfc, 0x7a, 0x59, 0x14, 0xbe, + 0xba, 0x2a, 0xc6, 0xce, 0xaf, 0x8a, 0xb1, 0x9f, 0xae, 0x8a, 0xb1, 0x8f, 0x21, 0x1a, 0xbe, 0x66, + 0x8a, 0xfd, 0xb8, 0x6d, 0xfe, 0x19, 0x00, 0x00, 0xff, 0xff, 0xcb, 0x2c, 0x20, 0x3a, 0x05, 0x0e, + 0x00, 0x00, } func (m *Checkpoint) Marshal() (dAtA []byte, err error) { @@ -1129,18 +1185,25 @@ func (m *DispatchTableRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.RemoveTable != nil { + if m.Request != nil { { - size, err := m.RemoveTable.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { + size := m.Request.Size() + i -= size + if _, err := m.Request.MarshalTo(dAtA[i:]); err != nil { return 0, err } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } - i-- - dAtA[i] = 0x12 } + return len(dAtA) - i, nil +} + +func (m *DispatchTableRequest_AddTable) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DispatchTableRequest_AddTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) if m.AddTable != nil { { size, err := m.AddTable.MarshalToSizedBuffer(dAtA[:i]) @@ -1155,7 +1218,27 @@ func (m *DispatchTableRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *DispatchTableRequest_RemoveTable) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} +func (m *DispatchTableRequest_RemoveTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.RemoveTable != nil { + { + size, err := m.RemoveTable.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTableSchedule(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} func (m *AddTableResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1184,7 +1267,7 @@ func (m *AddTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { dAtA[i] = 0 } i-- - dAtA[i] = 0x20 + dAtA[i] = 0x18 } if m.Checkpoint != nil { { @@ -1196,7 +1279,7 @@ func (m *AddTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x1a + dAtA[i] = 0x12 } if m.Status != nil { { @@ -1208,12 +1291,7 @@ func (m *AddTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x12 - } - if m.TableID != 0 { - i = encodeVarintTableSchedule(dAtA, i, uint64(m.TableID)) - i-- - dAtA[i] = 0x8 + dAtA[i] = 0xa } return len(dAtA) - i, nil } @@ -1248,7 +1326,7 @@ func (m *RemoveTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x1a + dAtA[i] = 0x12 } if m.Status != nil { { @@ -1260,12 +1338,7 @@ func (m *RemoveTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x12 - } - if m.TableID != 0 { - i = encodeVarintTableSchedule(dAtA, i, uint64(m.TableID)) - i-- - dAtA[i] = 0x8 + dAtA[i] = 0xa } return len(dAtA) - i, nil } @@ -1290,18 +1363,25 @@ func (m *DispatchTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.RemoveTable != nil { + if m.Response != nil { { - size, err := m.RemoveTable.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { + size := m.Response.Size() + i -= size + if _, err := m.Response.MarshalTo(dAtA[i:]); err != nil { return 0, err } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } - i-- - dAtA[i] = 0x12 } + return len(dAtA) - i, nil +} + +func (m *DispatchTableResponse_AddTable) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DispatchTableResponse_AddTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) if m.AddTable != nil { { size, err := m.AddTable.MarshalToSizedBuffer(dAtA[:i]) @@ -1316,7 +1396,27 @@ func (m *DispatchTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *DispatchTableResponse_RemoveTable) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} +func (m *DispatchTableResponse_RemoveTable) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.RemoveTable != nil { + { + size, err := m.RemoveTable.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTableSchedule(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} func (m *Heartbeat) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1508,17 +1608,27 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.Checkpoint != nil { - { - size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err + if len(m.Checkpoints) > 0 { + for k := range m.Checkpoints { + v := m.Checkpoints[k] + baseI := i + { + size, err := (&v).MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x12 + i = encodeVarintTableSchedule(dAtA, i, uint64(k)) + i-- + dAtA[i] = 0x8 + i = encodeVarintTableSchedule(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x4a } - i-- - dAtA[i] = 0x4a } if m.HeartbeatResponse != nil { { @@ -1710,6 +1820,18 @@ func (m *RemoveTableRequest) Size() (n int) { } func (m *DispatchTableRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Request != nil { + n += m.Request.Size() + } + return n +} + +func (m *DispatchTableRequest_AddTable) Size() (n int) { if m == nil { return 0 } @@ -1719,22 +1841,26 @@ func (m *DispatchTableRequest) Size() (n int) { l = m.AddTable.Size() n += 1 + l + sovTableSchedule(uint64(l)) } + return n +} +func (m *DispatchTableRequest_RemoveTable) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l if m.RemoveTable != nil { l = m.RemoveTable.Size() n += 1 + l + sovTableSchedule(uint64(l)) } return n } - func (m *AddTableResponse) Size() (n int) { if m == nil { return 0 } var l int _ = l - if m.TableID != 0 { - n += 1 + sovTableSchedule(uint64(m.TableID)) - } if m.Status != nil { l = m.Status.Size() n += 1 + l + sovTableSchedule(uint64(l)) @@ -1755,9 +1881,6 @@ func (m *RemoveTableResponse) Size() (n int) { } var l int _ = l - if m.TableID != 0 { - n += 1 + sovTableSchedule(uint64(m.TableID)) - } if m.Status != nil { l = m.Status.Size() n += 1 + l + sovTableSchedule(uint64(l)) @@ -1770,6 +1893,18 @@ func (m *RemoveTableResponse) Size() (n int) { } func (m *DispatchTableResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Response != nil { + n += m.Response.Size() + } + return n +} + +func (m *DispatchTableResponse_AddTable) Size() (n int) { if m == nil { return 0 } @@ -1779,13 +1914,20 @@ func (m *DispatchTableResponse) Size() (n int) { l = m.AddTable.Size() n += 1 + l + sovTableSchedule(uint64(l)) } + return n +} +func (m *DispatchTableResponse_RemoveTable) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l if m.RemoveTable != nil { l = m.RemoveTable.Size() n += 1 + l + sovTableSchedule(uint64(l)) } return n } - func (m *Heartbeat) Size() (n int) { if m == nil { return 0 @@ -1892,9 +2034,14 @@ func (m *Message) Size() (n int) { l = m.HeartbeatResponse.Size() n += 1 + l + sovTableSchedule(uint64(l)) } - if m.Checkpoint != nil { - l = m.Checkpoint.Size() - n += 1 + l + sovTableSchedule(uint64(l)) + if len(m.Checkpoints) > 0 { + for k, v := range m.Checkpoints { + _ = k + _ = v + l = v.Size() + mapEntrySize := 1 + sovTableSchedule(uint64(k)) + 1 + l + sovTableSchedule(uint64(l)) + n += mapEntrySize + 1 + sovTableSchedule(uint64(mapEntrySize)) + } } return n } @@ -2262,12 +2409,11 @@ func (m *DispatchTableRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.AddTable == nil { - m.AddTable = &AddTableRequest{} - } - if err := m.AddTable.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + v := &AddTableRequest{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } + m.Request = &DispatchTableRequest_AddTable{v} iNdEx = postIndex case 2: if wireType != 2 { @@ -2298,12 +2444,11 @@ func (m *DispatchTableRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.RemoveTable == nil { - m.RemoveTable = &RemoveTableRequest{} - } - if err := m.RemoveTable.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + v := &RemoveTableRequest{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } + m.Request = &DispatchTableRequest_RemoveTable{v} iNdEx = postIndex default: iNdEx = preIndex @@ -2356,25 +2501,6 @@ func (m *AddTableResponse) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field TableID", wireType) - } - m.TableID = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTableSchedule - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.TableID |= github_com_pingcap_tiflow_cdc_model.TableID(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) } @@ -2410,7 +2536,7 @@ func (m *AddTableResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 3: + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Checkpoint", wireType) } @@ -2446,7 +2572,7 @@ func (m *AddTableResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 4: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Reject", wireType) } @@ -2517,25 +2643,6 @@ func (m *RemoveTableResponse) Unmarshal(dAtA []byte) error { } switch fieldNum { case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field TableID", wireType) - } - m.TableID = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowTableSchedule - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.TableID |= github_com_pingcap_tiflow_cdc_model.TableID(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) } @@ -2571,7 +2678,7 @@ func (m *RemoveTableResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 3: + case 2: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Checkpoint", wireType) } @@ -2686,12 +2793,11 @@ func (m *DispatchTableResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.AddTable == nil { - m.AddTable = &AddTableResponse{} - } - if err := m.AddTable.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + v := &AddTableResponse{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } + m.Response = &DispatchTableResponse_AddTable{v} iNdEx = postIndex case 2: if wireType != 2 { @@ -2722,12 +2828,11 @@ func (m *DispatchTableResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.RemoveTable == nil { - m.RemoveTable = &RemoveTableResponse{} - } - if err := m.RemoveTable.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + v := &RemoveTableResponse{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } + m.Response = &DispatchTableResponse_RemoveTable{v} iNdEx = postIndex default: iNdEx = preIndex @@ -3470,7 +3575,7 @@ func (m *Message) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 9: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Checkpoint", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Checkpoints", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -3497,12 +3602,91 @@ func (m *Message) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Checkpoint == nil { - m.Checkpoint = &Checkpoint{} - } - if err := m.Checkpoint.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } + if m.Checkpoints == nil { + m.Checkpoints = make(map[github_com_pingcap_tiflow_cdc_model.TableID]Checkpoint) + } + var mapkey int64 + mapvalue := &Checkpoint{} + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTableSchedule + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTableSchedule + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapkey |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTableSchedule + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthTableSchedule + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthTableSchedule + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &Checkpoint{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipTableSchedule(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTableSchedule + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Checkpoints[github_com_pingcap_tiflow_cdc_model.TableID(mapkey)] = *mapvalue iNdEx = postIndex default: iNdEx = preIndex diff --git a/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go b/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go new file mode 100644 index 00000000000..2195526aacb --- /dev/null +++ b/cdc/scheduler/internal/tp/schedulepb/table_schedule_manual_test.go @@ -0,0 +1,91 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package schedulepb + +import ( + fmt "fmt" + testing "testing" + + "github.com/gogo/protobuf/proto" +) + +func benchmarkMessageCheckpoints(b *testing.B, bench func(b *testing.B, m *Message)) { + size := 16384 + for total := 1; total <= size; total *= 2 { + msg := Message{ + Checkpoints: map[int64]Checkpoint{}, + } + for i := 0; i < total; i++ { + msg.Checkpoints[int64(10000+i)] = Checkpoint{ + CheckpointTs: 433331421532337260, + ResolvedTs: 433331421532337261, + } + } + b.ResetTimer() + bench(b, &msg) + b.StopTimer() + } + +} + +func BenchmarkMessageCheckpointsProtoMarshal(b *testing.B) { + benchmarkMessageCheckpoints(b, func(b *testing.B, msg *Message) { + total := len(msg.Checkpoints) + b.Run(fmt.Sprintf("%d checkpoint(s) marshal", total), func(b *testing.B) { + totalLen := 0 + for i := 0; i < b.N; i++ { + dAtA, err := proto.Marshal(msg) + if err != nil { + panic(err) + } + totalLen += len(dAtA) + } + b.SetBytes(int64(totalLen / b.N)) + }) + }) +} + +func BenchmarkMessageCheckpointsProtoUnmarshal(b *testing.B) { + benchmarkMessageCheckpoints(b, func(b *testing.B, msg *Message) { + total := len(msg.Checkpoints) + b.Run(fmt.Sprintf("%d checkpoint(s) unmarshal", total), func(b *testing.B) { + dAtA, err := proto.Marshal(msg) + if err != nil { + panic(err) + } + dst := Message{} + totalLen := 0 + + for i := 0; i < b.N; i++ { + err := proto.Unmarshal(dAtA, &dst) + if err != nil { + panic(err) + } + totalLen += len(dAtA) + } + b.SetBytes(int64(totalLen / b.N)) + }) + }) +} + +func BenchmarkMessageCheckpointsProtoSize(b *testing.B) { + benchmarkMessageCheckpoints(b, func(b *testing.B, msg *Message) { + total := len(msg.Checkpoints) + b.Run(fmt.Sprintf("%d checkpoint(s) size", total), func(b *testing.B) { + for i := 0; i < b.N; i++ { + msg.Size() + } + }) + }) +} diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 19df3699ab3..a235c7a9c44 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -31,6 +31,9 @@ type DebugConfig struct { // The default value is true. EnableNewScheduler bool `toml:"enable-new-scheduler" json:"enable-new-scheduler"` Messages *MessagesConfig `toml:"messages" json:"messages"` + + EnableTwoPhaseScheduler bool + Scheduler *SchedulerConfig } // ValidateAndAdjust validates and adjusts the debug configuration @@ -41,5 +44,8 @@ func (c *DebugConfig) ValidateAndAdjust() error { if err := c.DB.ValidateAndAdjust(); err != nil { return errors.Trace(err) } + if err := c.Scheduler.ValidateAndAdjust(); err != nil { + return errors.Trace(err) + } return nil } diff --git a/pkg/config/scheduler_config.go b/pkg/config/scheduler_config.go new file mode 100644 index 00000000000..104876586bb --- /dev/null +++ b/pkg/config/scheduler_config.go @@ -0,0 +1,35 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + cerrors "github.com/pingcap/tiflow/pkg/errors" +) + +// SchedulerConfig configs TiCDC scheduler. +type SchedulerConfig struct { + HeartbeatTick int `toml:"heartbeat-tick" json:"heartbeat-tick"` + MaxTaskConcurrency int `toml:"max-task-concurrency" json:"max-task-concurrency"` +} + +// ValidateAndAdjust verifies that each parameter is valid. +func (c *SchedulerConfig) ValidateAndAdjust() error { + if c.HeartbeatTick <= 0 { + return cerrors.ErrInvalidServerOption.GenWithStackByArgs("heartbeat-tick must be larger than 0") + } + if c.MaxTaskConcurrency <= 0 { + return cerrors.ErrInvalidServerOption.GenWithStackByArgs("max-task-concurrency must be larger than 0") + } + return nil +} diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 864b9ad361b..0e53cb5140e 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -128,6 +128,12 @@ var defaultServerConfig = &ServerConfig{ IteratorSlowReadDuration: 256, }, Messages: defaultMessageConfig.Clone(), + + EnableTwoPhaseScheduler: false, + Scheduler: &SchedulerConfig{ + HeartbeatTick: 2, + MaxTaskConcurrency: 10, + }, }, } diff --git a/pkg/config/server_config_test.go b/pkg/config/server_config_test.go index d6a45f29a63..aadcdca1330 100644 --- a/pkg/config/server_config_test.go +++ b/pkg/config/server_config_test.go @@ -103,3 +103,20 @@ func TestKVClientConfigValidateAndAdjust(t *testing.T) { conf.RegionRetryDuration = -TomlDuration(time.Second) require.Error(t, conf.ValidateAndAdjust()) } + +func TestSchedulerConfigValidateAndAdjust(t *testing.T) { + t.Parallel() + conf := GetDefaultServerConfig().Clone().Debug.Scheduler + require.Nil(t, conf.ValidateAndAdjust()) + + conf.HeartbeatTick = -1 + require.Error(t, conf.ValidateAndAdjust()) + conf.HeartbeatTick = 0 + require.Error(t, conf.ValidateAndAdjust()) + conf.HeartbeatTick = 1 + + conf.MaxTaskConcurrency = -1 + require.Error(t, conf.ValidateAndAdjust()) + conf.MaxTaskConcurrency = 0 + require.Error(t, conf.ValidateAndAdjust()) +} diff --git a/proto/table_schedule.proto b/proto/table_schedule.proto index 08f768107c9..e52c96bb69d 100644 --- a/proto/table_schedule.proto +++ b/proto/table_schedule.proto @@ -41,32 +41,28 @@ message RemoveTableRequest { } message DispatchTableRequest { - AddTableRequest add_table = 1; - RemoveTableRequest remove_table = 2; + oneof request { + AddTableRequest add_table = 1; + RemoveTableRequest remove_table = 2; + } } message AddTableResponse { - int64 table_id = 1 [ - (gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.TableID", - (gogoproto.customname) = "TableID" - ]; - TableStatus status = 2; - Checkpoint checkpoint = 3; - bool reject = 4; + TableStatus status = 1; + Checkpoint checkpoint = 2; + bool reject = 3; } message RemoveTableResponse { - int64 table_id = 1 [ - (gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.TableID", - (gogoproto.customname) = "TableID" - ]; - TableStatus status = 2; - Checkpoint checkpoint = 3; + TableStatus status = 1; + Checkpoint checkpoint = 2; } message DispatchTableResponse { - AddTableResponse add_table = 1; - RemoveTableResponse remove_table = 2; + oneof response { + AddTableResponse add_table = 1; + RemoveTableResponse remove_table = 2; + } } message Heartbeat {} @@ -132,5 +128,8 @@ message Message { DispatchTableResponse dispatch_table_response = 6; Heartbeat heartbeat = 7; HeartbeatResponse heartbeat_response = 8; - Checkpoint checkpoint = 9; + map checkpoints = 9 [ + (gogoproto.castkey) = "github.com/pingcap/tiflow/cdc/model.TableID", + (gogoproto.nullable) = false + ]; }