From d84f7fe6cdb6d33464e5cdcee36a8418a535d681 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 1 Jun 2022 21:42:20 +0800 Subject: [PATCH] tp: support per-table checkpoint (#5709) Signed-off-by: Neil Shen --- cdc/scheduler/internal/tp/agent.go | 6 +- cdc/scheduler/internal/tp/agent_test.go | 7 +- cdc/scheduler/internal/tp/capture_manager.go | 4 - cdc/scheduler/internal/tp/coordinator.go | 31 ++- cdc/scheduler/internal/tp/coordinator_test.go | 190 +++++++++++++- .../internal/tp/replication_manager.go | 44 +++- .../internal/tp/replication_manager_test.go | 21 +- cdc/scheduler/internal/tp/replication_set.go | 100 +++++--- .../internal/tp/replication_set_test.go | 132 ++++++++-- .../tp/schedulepb/table_schedule.pb.go | 242 ++++++++---------- proto/table_schedule.proto | 6 +- 11 files changed, 530 insertions(+), 253 deletions(-) diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 86736edc871..838fbdcc54f 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -311,7 +311,7 @@ func (a *agent) handleMessageDispatchTableRequest( } task = &dispatchTableTask{ TableID: req.AddTable.GetTableID(), - StartTs: req.AddTable.GetCheckpoint().GetCheckpointTs(), + StartTs: req.AddTable.GetCheckpoint().CheckpointTs, IsRemove: false, IsPrepare: req.AddTable.GetIsSecondary(), Epoch: epoch, @@ -387,7 +387,7 @@ func (a *agent) newRemoveTableResponseMessage( Response: &schedulepb.DispatchTableResponse_RemoveTable{ RemoveTable: &schedulepb.RemoveTableResponse{ Status: &status, - Checkpoint: &status.Checkpoint, + Checkpoint: status.Checkpoint, }, }, }, @@ -453,7 +453,7 @@ func (a *agent) newAddTableResponseMessage( Response: &schedulepb.DispatchTableResponse_AddTable{ AddTable: &schedulepb.AddTableResponse{ Status: &status, - Checkpoint: &status.Checkpoint, + Checkpoint: status.Checkpoint, Reject: reject, }, }, diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go index af6bd150ce3..8b84b79cd02 100644 --- a/cdc/scheduler/internal/tp/agent_test.go +++ b/cdc/scheduler/internal/tp/agent_test.go @@ -77,7 +77,6 @@ func TestAgentCollectTableStatus(t *testing.T) { a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true} status := a.newTableStatus(model.TableID(0)) require.Equal(t, schedulepb.TableStateStopping, status.State) - } func TestAgentHandleDispatchTableTask(t *testing.T) { @@ -221,13 +220,13 @@ func TestAgentHandleMessageStopping(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{}, }, }, }, } // add table request should not be handled, so the running task count is 0. response = a.handleMessage([]*schedulepb.Message{addTableRequest}) + require.Len(t, response, 0) require.Len(t, a.runningTasks, 0) // mock agent have running task before stopping but processed yet. @@ -288,7 +287,7 @@ func TestAgentHandleMessage(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{}, + Checkpoint: schedulepb.Checkpoint{}, }, }, }, @@ -392,7 +391,7 @@ func TestAgentTick(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{}, + Checkpoint: schedulepb.Checkpoint{}, }, }, }, diff --git a/cdc/scheduler/internal/tp/capture_manager.go b/cdc/scheduler/internal/tp/capture_manager.go index e0f78f31a64..4bb4d5a250e 100644 --- a/cdc/scheduler/internal/tp/capture_manager.go +++ b/cdc/scheduler/internal/tp/capture_manager.go @@ -100,10 +100,6 @@ func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *capture } } -func (c *captureManager) CaptureTableSets() map[model.CaptureID]*CaptureStatus { - return c.Captures -} - func (c *captureManager) CheckAllCaptureInitialized() bool { if !c.checkAllCaptureInitialized() { return false diff --git a/cdc/scheduler/internal/tp/coordinator.go b/cdc/scheduler/internal/tp/coordinator.go index 9fa3f0aa347..f9787cc1e6d 100644 --- a/cdc/scheduler/internal/tp/coordinator.go +++ b/cdc/scheduler/internal/tp/coordinator.go @@ -27,6 +27,8 @@ import ( "go.uber.org/zap" ) +const checkpointCannotProceed = internal.CheckpointCannotProceed + type scheduler interface { Name() string Schedule( @@ -82,11 +84,7 @@ func (c *coordinator) Tick( // All captures that are alive according to the latest Etcd states. aliveCaptures map[model.CaptureID]*model.CaptureInfo, ) (newCheckpointTs, newResolvedTs model.Ts, err error) { - err = c.poll(ctx, checkpointTs, currentTables, aliveCaptures) - if err != nil { - return internal.CheckpointCannotProceed, internal.CheckpointCannotProceed, errors.Trace(err) - } - return internal.CheckpointCannotProceed, internal.CheckpointCannotProceed, nil + return c.poll(ctx, checkpointTs, currentTables, aliveCaptures) } func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {} @@ -100,10 +98,10 @@ func (c *coordinator) Close(ctx context.Context) {} func (c *coordinator) poll( ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID, aliveCaptures map[model.CaptureID]*model.CaptureInfo, -) error { +) (newCheckpointTs, newResolvedTs model.Ts, err error) { recvMsgs, err := c.recvMsgs(ctx) if err != nil { - return errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } var msgBuf []*schedulepb.Message @@ -112,17 +110,17 @@ func (c *coordinator) poll( msgBuf = append(msgBuf, msgs...) msgs = c.captureM.HandleAliveCaptureUpdate(aliveCaptures) msgBuf = append(msgBuf, msgs...) - if c.captureM.CheckAllCaptureInitialized() { + if !c.captureM.CheckAllCaptureInitialized() { // Skip handling messages and tasks for replication manager, // as not all capture are initialized. - return c.sendMsgs(ctx, msgBuf) + return checkpointCannotProceed, checkpointCannotProceed, c.sendMsgs(ctx, msgBuf) } // Handle capture membership changes. if changes := c.captureM.TakeChanges(); changes != nil { - msgs, err = c.replicationM.HandleCaptureChanges(changes) + msgs, err = c.replicationM.HandleCaptureChanges(changes, checkpointTs) if err != nil { - return errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) } @@ -130,7 +128,7 @@ func (c *coordinator) poll( // Handle received messages to advance replication set. msgs, err = c.replicationM.HandleMessage(recvMsgs) if err != nil { - return errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) @@ -150,18 +148,19 @@ func (c *coordinator) poll( // Handle generated schedule tasks. msgs, err = c.replicationM.HandleTasks(allTasks) if err != nil { - return errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } msgBuf = append(msgBuf, msgs...) // Send new messages. err = c.sendMsgs(ctx, msgBuf) if err != nil { - return errors.Trace(err) + return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err) } - // checkpoint calculation - return nil + // Checkpoint calculation + newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables) + return newCheckpointTs, newResolvedTs, nil } func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go index 0db032be2c7..4a7dabecbb7 100644 --- a/cdc/scheduler/internal/tp/coordinator_test.go +++ b/cdc/scheduler/internal/tp/coordinator_test.go @@ -15,15 +15,27 @@ package tp import ( "context" + "fmt" + "math" "testing" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" + "github.com/pingcap/tiflow/pkg/leakutil" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" ) +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} + type mockTrans struct { sendBuffer []*schedulepb.Message recvBuffer []*schedulepb.Message + + keepRecvBuffer bool } func newMockTrans() *mockTrans { @@ -43,6 +55,9 @@ func (m *mockTrans) Send(ctx context.Context, msgs []*schedulepb.Message) error } func (m *mockTrans) Recv(ctx context.Context) ([]*schedulepb.Message, error) { + if m.keepRecvBuffer { + return m.recvBuffer, nil + } messages := m.recvBuffer[:len(m.recvBuffer)] m.recvBuffer = make([]*schedulepb.Message, 0) return messages, nil @@ -74,7 +89,7 @@ func TestCoordinatorRecvMsgs(t *testing.T) { ctx := context.Background() trans := &mockTrans{} - cood := coordinator{ + coord := coordinator{ version: "6.2.0", revision: schedulepb.OwnerRevision{Revision: 3}, trans: trans, @@ -83,7 +98,7 @@ func TestCoordinatorRecvMsgs(t *testing.T) { trans.recvBuffer = append(trans.recvBuffer, &schedulepb.Message{ Header: &schedulepb.Message_Header{ - OwnerRevision: cood.revision, + OwnerRevision: coord.revision, }, From: "1", MsgType: schedulepb.MsgDispatchTableResponse, }) @@ -95,13 +110,178 @@ func TestCoordinatorRecvMsgs(t *testing.T) { From: "2", MsgType: schedulepb.MsgDispatchTableResponse, }) - msgs, err := cood.recvMsgs(ctx) + msgs, err := coord.recvMsgs(ctx) require.NoError(t, err) - require.EqualValues(t, []*schedulepb.Message{{ Header: &schedulepb.Message_Header{ - OwnerRevision: cood.revision, + OwnerRevision: coord.revision, }, From: "1", MsgType: schedulepb.MsgDispatchTableResponse, }}, msgs) } + +func TestCoordinatorInit(t *testing.T) { +} + +func TestCoordinatorHeartbeat(t *testing.T) { +} + +func TestCoordinatorHeartbeatResponse(t *testing.T) { +} + +func TestCoordinatorAddCapture(t *testing.T) { +} + +func TestCoordinatorRemoveCapture(t *testing.T) { +} + +func benchmarkCoordinator( + b *testing.B, + factory func(total int) ( + name string, + coord *coordinator, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + ), +) { + log.SetLevel(zapcore.DPanicLevel) + ctx := context.Background() + size := 16384 + for total := 1; total <= size; total *= 2 { + name, coord, currentTables, captures := factory(total) + b.ResetTimer() + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + coord.poll(ctx, 0, currentTables, captures) + } + }) + b.StopTimer() + } +} + +func BenchmarkCoordinatorInit(b *testing.B) { + benchmarkCoordinator(b, func(total int) ( + name string, + coord *coordinator, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + ) { + const captureCount = 8 + captures = map[model.CaptureID]*model.CaptureInfo{} + for i := 0; i < captureCount; i++ { + captures[fmt.Sprint(i)] = &model.CaptureInfo{} + } + currentTables = make([]model.TableID, 0, total) + for i := 0; i < total; i++ { + currentTables = append(currentTables, int64(10000+i)) + } + coord = &coordinator{ + trans: &mockTrans{}, + scheduler: []scheduler{newBalancer()}, + replicationM: newReplicationManager(10), + // Disable heartbeat. + captureM: newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt), + } + name = fmt.Sprintf("InitTable %d", total) + return name, coord, currentTables, captures + }) +} + +func BenchmarkCoordinatorHeartbeat(b *testing.B) { + benchmarkCoordinator(b, func(total int) ( + name string, + coord *coordinator, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + ) { + const captureCount = 8 + captures = map[model.CaptureID]*model.CaptureInfo{} + // Always heartbeat. + captureM := newCaptureManager(schedulepb.OwnerRevision{}, 0) + captureM.initialized = true + for i := 0; i < captureCount; i++ { + captures[fmt.Sprint(i)] = &model.CaptureInfo{} + captureM.Captures[fmt.Sprint(i)] = &CaptureStatus{State: CaptureStateInitialized} + } + currentTables = make([]model.TableID, 0, total) + for i := 0; i < total; i++ { + currentTables = append(currentTables, int64(10000+i)) + } + coord = &coordinator{ + trans: &mockTrans{}, + scheduler: []scheduler{}, + replicationM: newReplicationManager(10), + captureM: captureM, + } + name = fmt.Sprintf("Heartbeat %d", total) + return name, coord, currentTables, captures + }) +} + +func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) { + benchmarkCoordinator(b, func(total int) ( + name string, + coord *coordinator, + currentTables []model.TableID, + captures map[model.CaptureID]*model.CaptureInfo, + ) { + const captureCount = 8 + captures = map[model.CaptureID]*model.CaptureInfo{} + // Disable heartbeat. + captureM := newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt) + captureM.initialized = true + for i := 0; i < captureCount; i++ { + captures[fmt.Sprint(i)] = &model.CaptureInfo{} + captureM.Captures[fmt.Sprint(i)] = &CaptureStatus{State: CaptureStateInitialized} + } + replicationM := newReplicationManager(10) + currentTables = make([]model.TableID, 0, total) + heartbeatResp := make(map[model.CaptureID]*schedulepb.Message) + for i := 0; i < total; i++ { + tableID := int64(10000 + i) + currentTables = append(currentTables, tableID) + captureID := fmt.Sprint(i % captureCount) + rep, err := newReplicationSet(tableID, 0, map[string]*schedulepb.TableStatus{ + captureID: { + TableID: tableID, + State: schedulepb.TableStateReplicating, + }, + }) + if err != nil { + b.Fatal(err) + } + replicationM.tables[tableID] = rep + _, ok := heartbeatResp[captureID] + if !ok { + heartbeatResp[captureID] = &schedulepb.Message{ + Header: &schedulepb.Message_Header{}, + From: captureID, + MsgType: schedulepb.MsgHeartbeatResponse, + HeartbeatResponse: &schedulepb.HeartbeatResponse{}, + } + } + heartbeatResp[captureID].HeartbeatResponse.Tables = append( + heartbeatResp[captureID].HeartbeatResponse.Tables, + schedulepb.TableStatus{ + TableID: tableID, + State: schedulepb.TableStateReplicating, + }) + } + recvMsgs := make([]*schedulepb.Message, 0, len(heartbeatResp)) + for _, resp := range heartbeatResp { + recvMsgs = append(recvMsgs, resp) + } + trans := &mockTrans{ + recvBuffer: recvMsgs, + keepRecvBuffer: true, + } + coord = &coordinator{ + trans: trans, + scheduler: []scheduler{}, + replicationM: replicationM, + captureM: captureM, + } + name = fmt.Sprintf("HeartbeatResponse %d", total) + return name, coord, currentTables, captures + }) +} diff --git a/cdc/scheduler/internal/tp/replication_manager.go b/cdc/scheduler/internal/tp/replication_manager.go index 8aa99c21f92..555b6b77aff 100644 --- a/cdc/scheduler/internal/tp/replication_manager.go +++ b/cdc/scheduler/internal/tp/replication_manager.go @@ -14,6 +14,8 @@ package tp import ( + "math" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -28,6 +30,7 @@ type callback func() type burstBalance struct { // Add tables to captures AddTables, RemoveTables map[model.TableID]model.CaptureID + checkpointTs model.Ts } type moveTable struct { @@ -36,8 +39,9 @@ type moveTable struct { } type addTable struct { - TableID model.TableID - CaptureID model.CaptureID + TableID model.TableID + CaptureID model.CaptureID + CheckpointTs model.Ts } type removeTable struct { @@ -70,7 +74,7 @@ func newReplicationManager(maxTaskConcurrency int) *replicationManager { } func (r *replicationManager) HandleCaptureChanges( - changes *captureChanges, + changes *captureChanges, checkpointTs model.Ts, ) ([]*schedulepb.Message, error) { if changes.Init != nil { if len(r.tables) != 0 { @@ -88,7 +92,7 @@ func (r *replicationManager) HandleCaptureChanges( } } for tableID, status := range tableStatus { - table, err := newReplicationSet(tableID, status) + table, err := newReplicationSet(tableID, checkpointTs, status) if err != nil { return nil, errors.Trace(err) } @@ -141,8 +145,7 @@ func (r *replicationManager) handleMessageHeartbeatResponse( from model.CaptureID, msg *schedulepb.HeartbeatResponse, ) ([]*schedulepb.Message, error) { sentMsgs := make([]*schedulepb.Message, 0) - for i := range msg.Tables { - status := msg.Tables[i] + for _, status := range msg.Tables { table, ok := r.tables[status.TableID] if !ok { log.Info("tpscheduler: ignore table status no table found", @@ -283,7 +286,7 @@ func (r *replicationManager) handleAddTableTask( var err error table := r.tables[task.TableID] if table == nil { - table, err = newReplicationSet(task.TableID, nil) + table, err = newReplicationSet(task.TableID, task.CheckpointTs, nil) if err != nil { return nil, errors.Trace(err) } @@ -366,3 +369,30 @@ func (r *replicationManager) handleBurstBalanceTasks( func (r *replicationManager) ReplicationSets() map[model.TableID]*ReplicationSet { return r.tables } + +func (r *replicationManager) AdvanceCheckpoint( + currentTables []model.TableID, +) (newCheckpointTs, newResolvedTs model.Ts) { + newCheckpointTs, newResolvedTs = math.MaxUint64, math.MaxUint64 + checkpointChanged := false + for _, tableID := range currentTables { + table, ok := r.tables[tableID] + if !ok { + // Can not advance checkpoint there is a table missing. + return checkpointCannotProceed, checkpointCannotProceed + } + // Find the minimum checkpoint ts and resolved ts. + if newCheckpointTs > table.Checkpoint.CheckpointTs { + newCheckpointTs = table.Checkpoint.CheckpointTs + checkpointChanged = true + } + if newResolvedTs > table.Checkpoint.ResolvedTs { + newResolvedTs = table.Checkpoint.ResolvedTs + checkpointChanged = true + } + } + if !checkpointChanged { + return checkpointCannotProceed, checkpointCannotProceed + } + return newCheckpointTs, newResolvedTs +} diff --git a/cdc/scheduler/internal/tp/replication_manager_test.go b/cdc/scheduler/internal/tp/replication_manager_test.go index aa0032e5898..04e7aae0988 100644 --- a/cdc/scheduler/internal/tp/replication_manager_test.go +++ b/cdc/scheduler/internal/tp/replication_manager_test.go @@ -44,7 +44,6 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -85,7 +84,6 @@ func TestReplicationManagerHandleAddTableTask(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -127,13 +125,13 @@ func TestReplicationManagerRemoveTable(t *testing.T) { // Ignore remove table if there is no such table. msgs, err := r.HandleTasks([]*scheduleTask{{ removeTable: &removeTable{TableID: 1, CaptureID: "1"}, - accept: func() { t.Fatal("must not accpet") }, + accept: func() { t.Fatal("must not accept") }, }}) require.Nil(t, err) require.Len(t, msgs, 0) // Add the table. - tbl, err := newReplicationSet(1, map[string]*schedulepb.TableStatus{ + tbl, err := newReplicationSet(1, 0, map[string]*schedulepb.TableStatus{ "1": {TableID: 1, State: schedulepb.TableStateReplicating}, }) require.Nil(t, err) @@ -222,13 +220,13 @@ func TestReplicationManagerMoveTable(t *testing.T) { // Ignore move table if it's not exist. msgs, err := r.HandleTasks([]*scheduleTask{{ moveTable: &moveTable{TableID: 1, DestCapture: dest}, - accept: func() { t.Fatal("must not accpet") }, + accept: func() { t.Fatal("must not accept") }, }}) require.Nil(t, err) require.Len(t, msgs, 0) // Add the table. - tbl, err := newReplicationSet(1, map[string]*schedulepb.TableStatus{ + tbl, err := newReplicationSet(1, 0, map[string]*schedulepb.TableStatus{ source: {TableID: 1, State: schedulepb.TableStateReplicating}, }) require.Nil(t, err) @@ -253,7 +251,6 @@ func TestReplicationManagerMoveTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -321,7 +318,6 @@ func TestReplicationManagerMoveTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -385,7 +381,6 @@ func TestReplicationManagerBurstBalance(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: tableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -395,7 +390,7 @@ func TestReplicationManagerBurstBalance(t *testing.T) { } // Add a new table. - r.tables[5], err = newReplicationSet(5, map[string]*schedulepb.TableStatus{ + r.tables[5], err = newReplicationSet(5, 0, map[string]*schedulepb.TableStatus{ "5": {TableID: 5, State: schedulepb.TableStateReplicating}, }) require.Nil(t, err) @@ -421,7 +416,6 @@ func TestReplicationManagerBurstBalance(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 4, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -462,7 +456,6 @@ func TestReplicationManagerMaxTaskConcurrency(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: 1, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: 0}, }, }, }, @@ -494,7 +487,7 @@ func TestReplicationManagerHandleCaptureChanges(t *testing.T) { }, "4": {{TableID: 4, State: schedulepb.TableStateStopping}}, }} - msgs, err := r.HandleCaptureChanges(&changes) + msgs, err := r.HandleCaptureChanges(&changes, 0) require.Nil(t, err) require.Len(t, msgs, 0) require.Len(t, r.tables, 4) @@ -506,7 +499,7 @@ func TestReplicationManagerHandleCaptureChanges(t *testing.T) { changes = captureChanges{Removed: map[string][]schedulepb.TableStatus{ "1": {{TableID: 1, State: schedulepb.TableStateReplicating}}, }} - msgs, err = r.HandleCaptureChanges(&changes) + msgs, err = r.HandleCaptureChanges(&changes, 0) require.Nil(t, err) require.Len(t, msgs, 0) require.Len(t, r.tables, 4) diff --git a/cdc/scheduler/internal/tp/replication_set.go b/cdc/scheduler/internal/tp/replication_set.go index 6eeeb6e3c7e..4d924c30adb 100644 --- a/cdc/scheduler/internal/tp/replication_set.go +++ b/cdc/scheduler/internal/tp/replication_set.go @@ -36,7 +36,8 @@ import ( // │ Commit ├──>│ Replicating │────────────>│ Removing │ // └────────┘ └─────────────┘ └──────────┘ // -// When a capture shutdown unexpectly, we transit the state to Absent immediately. +// When a capture shutdown unexpectedly, we may need to transit the state to +// Absent or Replicating immediately. type ReplicationSetState int const ( @@ -76,23 +77,27 @@ func (r ReplicationSetState) String() string { // ReplicationSet is a state machine that manages replication states. type ReplicationSet struct { - TableID model.TableID - State ReplicationSetState - Primary model.CaptureID - Secondary model.CaptureID - Captures map[model.CaptureID]struct{} - CheckpointTs model.Ts + TableID model.TableID + State ReplicationSetState + Primary model.CaptureID + Secondary model.CaptureID + Captures map[model.CaptureID]struct{} + Checkpoint schedulepb.Checkpoint } func newReplicationSet( - tableID model.TableID, tableStatus map[model.CaptureID]*schedulepb.TableStatus, + tableID model.TableID, + checkpoint model.Ts, + tableStatus map[model.CaptureID]*schedulepb.TableStatus, ) (*ReplicationSet, error) { - r := &ReplicationSet{TableID: tableID, Captures: make(map[string]struct{})} + r := &ReplicationSet{ + TableID: tableID, + Captures: make(map[string]struct{}), + Checkpoint: schedulepb.Checkpoint{CheckpointTs: checkpoint}, + } committed := false for captureID, table := range tableStatus { - if r.CheckpointTs <= table.Checkpoint.CheckpointTs { - r.CheckpointTs = table.Checkpoint.CheckpointTs - } + r.updateCheckpoint(table.Checkpoint) if r.TableID != table.TableID { return nil, r.inconsistentError(table, captureID, "tpscheduler: table id inconsistent") @@ -290,7 +295,7 @@ func (r *ReplicationSet) pollOnPrepare( AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -309,6 +314,7 @@ func (r *ReplicationSet) pollOnPrepare( } case schedulepb.TableStateReplicating: if r.Primary == captureID { + r.updateCheckpoint(input.Checkpoint) return nil, false, nil } case schedulepb.TableStateStopping, schedulepb.TableStateStopped: @@ -383,7 +389,7 @@ func (r *ReplicationSet) pollOnCommit( AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -391,6 +397,7 @@ func (r *ReplicationSet) pollOnCommit( } case schedulepb.TableStateStopped, schedulepb.TableStateAbsent: if r.Primary == captureID { + r.updateCheckpoint(input.Checkpoint) original := r.Primary delete(r.Captures, r.Primary) r.Primary = "" @@ -418,7 +425,7 @@ func (r *ReplicationSet) pollOnCommit( AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -437,20 +444,23 @@ func (r *ReplicationSet) pollOnCommit( } case schedulepb.TableStateReplicating: - if r.Secondary != "" && r.Primary == captureID { - // Original primary is not stopped, ask for stopping. - return &schedulepb.Message{ - To: captureID, - MsgType: schedulepb.MsgDispatchTableRequest, - DispatchTableRequest: &schedulepb.DispatchTableRequest{ - Request: &schedulepb.DispatchTableRequest_RemoveTable{ - RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + if r.Primary == captureID { + r.updateCheckpoint(input.Checkpoint) + if r.Secondary != "" { + // Original primary is not stopped, ask for stopping. + return &schedulepb.Message{ + To: captureID, + MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{TableID: r.TableID}, + }, }, - }, - }, false, nil - } - if r.Secondary == "" && r.Primary == captureID { - // There are three cases, + }, false, nil + } + + // There are three cases for empty secondary. + // // 1. Secondary has promoted to primary, and the new primary is // replicating, transit to Replicating. // 2. Secondary has shutdown during Commit, the original primary @@ -468,6 +478,7 @@ func (r *ReplicationSet) pollOnCommit( case schedulepb.TableStateStopping: if r.Primary == captureID && r.Secondary != "" { + r.updateCheckpoint(input.Checkpoint) return nil, false, nil } case schedulepb.TableStatePreparing: @@ -485,6 +496,7 @@ func (r *ReplicationSet) pollOnReplicating( switch input.State { case schedulepb.TableStateReplicating: if r.Primary == captureID { + r.updateCheckpoint(input.Checkpoint) return nil, false, nil } return nil, false, r.multiplePrimaryError( @@ -496,6 +508,8 @@ func (r *ReplicationSet) pollOnReplicating( case schedulepb.TableStateStopping: case schedulepb.TableStateStopped: if r.Primary == captureID { + r.updateCheckpoint(input.Checkpoint) + // Primary is stopped, but we still has secondary. // Clear primary and promote secondary when it's prepared. log.Info("tpscheduler: primary is stopped during Replicating", @@ -572,12 +586,12 @@ func (r *ReplicationSet) handleAddTable( log.Info("tpscheduler: replication state transition, add table", zap.Stringer("old", oldState), zap.Stringer("new", r.State)) r.Captures[captureID] = struct{}{} - status := &schedulepb.TableStatus{ + status := schedulepb.TableStatus{ TableID: r.TableID, State: schedulepb.TableStateAbsent, Checkpoint: schedulepb.Checkpoint{}, } - return r.poll(status, captureID) + return r.poll(&status, captureID) } func (r *ReplicationSet) handleMoveTable( @@ -601,12 +615,12 @@ func (r *ReplicationSet) handleMoveTable( zap.Stringer("old", oldState), zap.Stringer("new", r.State)) r.Secondary = dest r.Captures[dest] = struct{}{} - status := &schedulepb.TableStatus{ + status := schedulepb.TableStatus{ TableID: r.TableID, State: schedulepb.TableStateAbsent, Checkpoint: schedulepb.Checkpoint{}, } - return r.poll(status, r.Secondary) + return r.poll(&status, r.Secondary) } func (r *ReplicationSet) handleRemoveTable() ([]*schedulepb.Message, error) { @@ -626,12 +640,15 @@ func (r *ReplicationSet) handleRemoveTable() ([]*schedulepb.Message, error) { r.State = ReplicationSetStateRemoving log.Info("tpscheduler: replication state transition, remove table", zap.Stringer("old", oldState), zap.Stringer("new", r.State)) - status := &schedulepb.TableStatus{ - TableID: r.TableID, - State: schedulepb.TableStateReplicating, - Checkpoint: schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + status := schedulepb.TableStatus{ + TableID: r.TableID, + State: schedulepb.TableStateReplicating, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: r.Checkpoint.CheckpointTs, + ResolvedTs: r.Checkpoint.ResolvedTs, + }, } - return r.poll(status, r.Primary) + return r.poll(&status, r.Primary) } func (r *ReplicationSet) hasRemoved() bool { @@ -654,3 +671,12 @@ func (r *ReplicationSet) handleCaptureShutdown( } return r.poll(&status, captureID) } + +func (r *ReplicationSet) updateCheckpoint(checkpoint schedulepb.Checkpoint) { + if r.Checkpoint.CheckpointTs < checkpoint.CheckpointTs { + r.Checkpoint.CheckpointTs = checkpoint.CheckpointTs + } + if r.Checkpoint.ResolvedTs < checkpoint.ResolvedTs { + r.Checkpoint.ResolvedTs = checkpoint.ResolvedTs + } +} diff --git a/cdc/scheduler/internal/tp/replication_set_test.go b/cdc/scheduler/internal/tp/replication_set_test.go index b8f933e506f..2d0d48da53f 100644 --- a/cdc/scheduler/internal/tp/replication_set_test.go +++ b/cdc/scheduler/internal/tp/replication_set_test.go @@ -51,6 +51,7 @@ func iterPermutation(sequence []int, fn func(sequence []int)) { func TestNewReplicationSet(t *testing.T) { testcases := []struct { set *ReplicationSet + checkpoint model.Ts tableStatus map[model.CaptureID]*schedulepb.TableStatus }{ { @@ -65,11 +66,17 @@ func TestNewReplicationSet(t *testing.T) { Primary: "1", State: ReplicationSetStateReplicating, Captures: map[string]struct{}{"1": {}}, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 2, ResolvedTs: 2, + }, }, + checkpoint: 2, tableStatus: map[model.CaptureID]*schedulepb.TableStatus{ "1": { - State: schedulepb.TableStateReplicating, - Checkpoint: schedulepb.Checkpoint{}, + State: schedulepb.TableStateReplicating, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 1, ResolvedTs: 2, + }, }, }, }, @@ -90,19 +97,20 @@ func TestNewReplicationSet(t *testing.T) { { // Rebuild move table state, Prepare. set: &ReplicationSet{ - State: ReplicationSetStatePrepare, - Primary: "2", - Secondary: "1", - Captures: map[string]struct{}{"1": {}, "2": {}}, + State: ReplicationSetStatePrepare, + Primary: "2", + Secondary: "1", + Captures: map[string]struct{}{"1": {}, "2": {}}, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 2}, }, tableStatus: map[model.CaptureID]*schedulepb.TableStatus{ "1": { State: schedulepb.TableStatePreparing, - Checkpoint: schedulepb.Checkpoint{}, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1}, }, "2": { State: schedulepb.TableStateReplicating, - Checkpoint: schedulepb.Checkpoint{}, + Checkpoint: schedulepb.Checkpoint{CheckpointTs: 2}, }, }, }, @@ -196,8 +204,9 @@ func TestNewReplicationSet(t *testing.T) { for id, tc := range testcases { set := tc.set status := tc.tableStatus + checkpoint := tc.checkpoint - output, err := newReplicationSet(0, status) + output, err := newReplicationSet(0, checkpoint, status) if set == nil { require.Error(t, err) } else { @@ -241,7 +250,7 @@ func TestReplicationSetPoll(t *testing.T) { Checkpoint: schedulepb.Checkpoint{}, } } - r, _ := newReplicationSet(1, status) + r, _ := newReplicationSet(1, 0, status) var tableStates []int for state := range schedulepb.TableState_name { tableStates = append(tableStates, int(state)) @@ -272,7 +281,7 @@ func TestReplicationSetPollUnknownCapture(t *testing.T) { t.Parallel() tableID := model.TableID(1) - r, err := newReplicationSet(tableID, map[model.CaptureID]*schedulepb.TableStatus{ + r, err := newReplicationSet(tableID, 0, map[model.CaptureID]*schedulepb.TableStatus{ "1": { TableID: tableID, State: schedulepb.TableStateReplicating, @@ -294,7 +303,7 @@ func TestReplicationSetAddTable(t *testing.T) { from := "1" tableID := model.TableID(1) - r, err := newReplicationSet(tableID, nil) + r, err := newReplicationSet(tableID, 0, nil) require.Nil(t, err) // Absent -> Prepare @@ -309,7 +318,7 @@ func TestReplicationSetAddTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -337,7 +346,7 @@ func TestReplicationSetAddTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -370,7 +379,7 @@ func TestReplicationSetAddTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -389,6 +398,25 @@ func TestReplicationSetAddTable(t *testing.T) { require.Equal(t, ReplicationSetStateReplicating, r.State) require.Equal(t, from, r.Primary) require.Equal(t, "", r.Secondary) + + // Replicating -> Replicating + msgs, err = r.handleTableStatus(from, &schedulepb.TableStatus{ + TableID: tableID, + State: schedulepb.TableStateReplicating, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 4, + }, + }) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Equal(t, ReplicationSetStateReplicating, r.State) + require.Equal(t, from, r.Primary) + require.Equal(t, "", r.Secondary) + require.Equal(t, schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 4, + }, r.Checkpoint) } func TestReplicationSetRemoveTable(t *testing.T) { @@ -396,7 +424,7 @@ func TestReplicationSetRemoveTable(t *testing.T) { from := "1" tableID := model.TableID(1) - r, err := newReplicationSet(tableID, nil) + r, err := newReplicationSet(tableID, 0, nil) require.Nil(t, err) // Ignore removing table if it's not in replicating. @@ -474,7 +502,7 @@ func TestReplicationSetMoveTable(t *testing.T) { t.Parallel() tableID := model.TableID(1) - r, err := newReplicationSet(tableID, nil) + r, err := newReplicationSet(tableID, 0, nil) require.Nil(t, err) source := "1" @@ -504,7 +532,7 @@ func TestReplicationSetMoveTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -518,6 +546,22 @@ func TestReplicationSetMoveTable(t *testing.T) { require.Nil(t, err) require.Len(t, msgs, 0) + // Source primary sends heartbeat response + msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ + TableID: tableID, + State: schedulepb.TableStateReplicating, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 1, + ResolvedTs: 1, + }, + }) + require.Nil(t, err) + require.Len(t, msgs, 0) + require.Equal(t, schedulepb.Checkpoint{ + CheckpointTs: 1, + ResolvedTs: 1, + }, r.Checkpoint) + // AddTableRequest is lost somehow, send AddTableRequest again. msgs, err = r.handleTableStatus(dest, &schedulepb.TableStatus{ TableID: tableID, @@ -533,7 +577,7 @@ func TestReplicationSetMoveTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -565,6 +609,10 @@ func TestReplicationSetMoveTable(t *testing.T) { msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateReplicating, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 2, + ResolvedTs: 3, + }, }) require.Nil(t, err) require.Len(t, msgs, 1, "%v", r) @@ -580,23 +628,39 @@ func TestReplicationSetMoveTable(t *testing.T) { require.Equal(t, ReplicationSetStateCommit, r.State) require.Equal(t, source, r.Primary) require.Equal(t, dest, r.Secondary) + require.Equal(t, schedulepb.Checkpoint{ + CheckpointTs: 2, + ResolvedTs: 3, + }, r.Checkpoint) // Removing source is in-progress. msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateStopping, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 3, + }, }) require.Nil(t, err) require.Len(t, msgs, 0) require.Equal(t, ReplicationSetStateCommit, r.State) require.Equal(t, source, r.Primary) require.Equal(t, dest, r.Secondary) + require.Equal(t, schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 3, + }, r.Checkpoint) // Source is removed. rClone := clone(r) msgs, err = r.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateStopped, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 4, + }, }) require.Nil(t, err) require.Len(t, msgs, 1) @@ -608,7 +672,7 @@ func TestReplicationSetMoveTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -616,8 +680,13 @@ func TestReplicationSetMoveTable(t *testing.T) { require.Equal(t, ReplicationSetStateCommit, r.State) require.Equal(t, dest, r.Primary) require.Equal(t, "", r.Secondary) + require.Equal(t, schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 4, + }, r.Checkpoint) // Source stopped message is lost somehow. + // rClone has checkpoint ts 3, resolved ts 3 msgs, err = rClone.handleTableStatus(source, &schedulepb.TableStatus{ TableID: tableID, State: schedulepb.TableStateAbsent, @@ -632,14 +701,21 @@ func TestReplicationSetMoveTable(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 3, + }, }, }, }, }, msgs[0]) - require.Equal(t, ReplicationSetStateCommit, r.State) - require.Equal(t, dest, r.Primary) - require.Equal(t, "", r.Secondary) + require.Equal(t, ReplicationSetStateCommit, rClone.State) + require.Equal(t, dest, rClone.Primary) + require.Equal(t, "", rClone.Secondary) + require.Equal(t, schedulepb.Checkpoint{ + CheckpointTs: 3, + ResolvedTs: 3, + }, rClone.Checkpoint) // Commit -> Replicating msgs, err = r.handleTableStatus(dest, &schedulepb.TableStatus{ @@ -658,7 +734,7 @@ func TestReplicationSetCaptureShutdown(t *testing.T) { from := "1" tableID := model.TableID(1) - r, err := newReplicationSet(tableID, nil) + r, err := newReplicationSet(tableID, 0, nil) require.Nil(t, err) // Add table, Absent -> Prepare @@ -673,7 +749,7 @@ func TestReplicationSetCaptureShutdown(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -803,7 +879,7 @@ func TestReplicationSetCaptureShutdown(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: false, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, @@ -881,7 +957,7 @@ func TestReplicationSetCaptureShutdown(t *testing.T) { AddTable: &schedulepb.AddTableRequest{ TableID: r.TableID, IsSecondary: true, - Checkpoint: &schedulepb.Checkpoint{CheckpointTs: r.CheckpointTs}, + Checkpoint: r.Checkpoint, }, }, }, diff --git a/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go b/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go index 1a5cfaef53f..5a9bbdef1fe 100644 --- a/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go +++ b/cdc/scheduler/internal/tp/schedulepb/table_schedule.pb.go @@ -162,7 +162,7 @@ func (m *Checkpoint) GetResolvedTs() github_com_pingcap_tiflow_cdc_model.Ts { type AddTableRequest struct { TableID github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_id,omitempty"` IsSecondary bool `protobuf:"varint,2,opt,name=is_secondary,json=isSecondary,proto3" json:"is_secondary,omitempty"` - Checkpoint *Checkpoint `protobuf:"bytes,3,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` + Checkpoint Checkpoint `protobuf:"bytes,3,opt,name=checkpoint,proto3" json:"checkpoint"` } func (m *AddTableRequest) Reset() { *m = AddTableRequest{} } @@ -212,11 +212,11 @@ func (m *AddTableRequest) GetIsSecondary() bool { return false } -func (m *AddTableRequest) GetCheckpoint() *Checkpoint { +func (m *AddTableRequest) GetCheckpoint() Checkpoint { if m != nil { return m.Checkpoint } - return nil + return Checkpoint{} } type RemoveTableRequest struct { @@ -350,7 +350,7 @@ func (*DispatchTableRequest) XXX_OneofWrappers() []interface{} { type AddTableResponse struct { Status *TableStatus `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` - Checkpoint *Checkpoint `protobuf:"bytes,2,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` + Checkpoint Checkpoint `protobuf:"bytes,2,opt,name=checkpoint,proto3" json:"checkpoint"` Reject bool `protobuf:"varint,3,opt,name=reject,proto3" json:"reject,omitempty"` } @@ -394,11 +394,11 @@ func (m *AddTableResponse) GetStatus() *TableStatus { return nil } -func (m *AddTableResponse) GetCheckpoint() *Checkpoint { +func (m *AddTableResponse) GetCheckpoint() Checkpoint { if m != nil { return m.Checkpoint } - return nil + return Checkpoint{} } func (m *AddTableResponse) GetReject() bool { @@ -410,7 +410,7 @@ func (m *AddTableResponse) GetReject() bool { type RemoveTableResponse struct { Status *TableStatus `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` - Checkpoint *Checkpoint `protobuf:"bytes,2,opt,name=checkpoint,proto3" json:"checkpoint,omitempty"` + Checkpoint Checkpoint `protobuf:"bytes,2,opt,name=checkpoint,proto3" json:"checkpoint"` } func (m *RemoveTableResponse) Reset() { *m = RemoveTableResponse{} } @@ -453,11 +453,11 @@ func (m *RemoveTableResponse) GetStatus() *TableStatus { return nil } -func (m *RemoveTableResponse) GetCheckpoint() *Checkpoint { +func (m *RemoveTableResponse) GetCheckpoint() Checkpoint { if m != nil { return m.Checkpoint } - return nil + return Checkpoint{} } type DispatchTableResponse struct { @@ -972,78 +972,77 @@ func init() { func init() { proto.RegisterFile("table_schedule.proto", fileDescriptor_ab4bb9c6b16cfa4d) } var fileDescriptor_ab4bb9c6b16cfa4d = []byte{ - // 1127 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x4b, 0x6f, 0xe3, 0xd4, - 0x17, 0x8f, 0x93, 0x34, 0x8f, 0x93, 0x4e, 0xc7, 0x73, 0x27, 0x6d, 0xf3, 0xf7, 0x1f, 0x12, 0x63, - 0xa1, 0xaa, 0x64, 0x98, 0x84, 0xe9, 0xb0, 0x40, 0xc3, 0x02, 0x35, 0x33, 0x45, 0xad, 0x50, 0xe9, - 0xc8, 0xed, 0xf0, 0x12, 0x52, 0xe4, 0xd8, 0x77, 0x12, 0xd3, 0xc4, 0xd7, 0xf8, 0xba, 0xad, 0xba, - 0x41, 0x62, 0x9b, 0x15, 0x2b, 0x76, 0xd9, 0xb1, 0xe2, 0x1b, 0xf0, 0x0d, 0xba, 0x60, 0xd1, 0x15, - 0x42, 0x42, 0x8a, 0xa0, 0xfd, 0x0e, 0x2c, 0xca, 0x06, 0xf9, 0xde, 0x6b, 0x3b, 0x49, 0x33, 0x24, - 0x65, 0x04, 0x62, 0xe7, 0x7b, 0x1e, 0xbf, 0xf3, 0xfa, 0x9d, 0x13, 0x05, 0x8a, 0xbe, 0xd1, 0xea, - 0xe2, 0x26, 0x35, 0x3b, 0xd8, 0x3a, 0xea, 0xe2, 0x9a, 0xeb, 0x11, 0x9f, 0xa0, 0x57, 0x5d, 0xdb, - 0x69, 0x9b, 0x86, 0x5b, 0xf3, 0xed, 0xe7, 0x5d, 0x72, 0x52, 0x33, 0x2d, 0xb3, 0x16, 0x9a, 0xb8, - 0x2d, 0xa5, 0xd8, 0x26, 0x6d, 0xc2, 0x2c, 0xeb, 0xc1, 0x17, 0x77, 0xd2, 0xbe, 0x97, 0x00, 0x1e, - 0x77, 0xb0, 0x79, 0xe8, 0x12, 0xdb, 0xf1, 0xd1, 0x1e, 0xdc, 0x32, 0xa3, 0x57, 0xd3, 0xa7, 0x25, - 0x49, 0x95, 0xd6, 0xd3, 0x8d, 0xea, 0xd5, 0xb0, 0xb2, 0xd6, 0xb6, 0xfd, 0xce, 0x51, 0xab, 0x66, - 0x92, 0x5e, 0x5d, 0x44, 0xaa, 0xf3, 0x48, 0x75, 0xd3, 0x32, 0xeb, 0x3d, 0x62, 0xe1, 0x6e, 0xed, - 0x80, 0xea, 0x8b, 0x31, 0xc0, 0x01, 0x45, 0x1f, 0x40, 0xc1, 0xc3, 0x94, 0x74, 0x8f, 0xb1, 0x15, - 0xc0, 0x25, 0x6f, 0x0c, 0x07, 0xa1, 0xfb, 0x01, 0xd5, 0x7e, 0x92, 0xe0, 0xf6, 0xa6, 0x65, 0x1d, - 0x04, 0xd5, 0xeb, 0xf8, 0xcb, 0x23, 0x4c, 0x7d, 0xf4, 0x0c, 0x72, 0xbc, 0x1b, 0xb6, 0xc5, 0x92, - 0x4d, 0x35, 0x1e, 0x5d, 0x0c, 0x2b, 0x59, 0x66, 0xb3, 0xf3, 0xe4, 0x6a, 0x58, 0xb9, 0x37, 0x57, - 0x20, 0x6e, 0xae, 0x67, 0x19, 0xd6, 0x8e, 0x85, 0x5e, 0x83, 0x45, 0x9b, 0x36, 0x29, 0x36, 0x89, - 0x63, 0x19, 0xde, 0x29, 0x4b, 0x3c, 0xa7, 0x17, 0x6c, 0xba, 0x1f, 0x8a, 0xd0, 0x0e, 0x40, 0x5c, - 0x6a, 0x29, 0xa5, 0x4a, 0xeb, 0x85, 0x8d, 0x37, 0x6a, 0x7f, 0x39, 0x84, 0x5a, 0xdc, 0x6a, 0x7d, - 0xc4, 0x59, 0x3b, 0x04, 0xa4, 0xe3, 0x1e, 0x39, 0xc6, 0xff, 0x42, 0x69, 0xda, 0x99, 0x04, 0xc5, - 0x27, 0x36, 0x75, 0x0d, 0xdf, 0xec, 0x8c, 0xc5, 0xdb, 0x85, 0xbc, 0x61, 0x59, 0x4d, 0x66, 0xc7, - 0x02, 0x16, 0x36, 0x6a, 0x33, 0xea, 0x99, 0x98, 0xc6, 0x76, 0x42, 0xcf, 0x19, 0x42, 0x84, 0x3e, - 0x82, 0x45, 0x8f, 0x15, 0x25, 0x10, 0x93, 0x0c, 0xf1, 0xc1, 0x0c, 0xc4, 0xeb, 0x7d, 0xd8, 0x4e, - 0xe8, 0x05, 0x2f, 0x96, 0x36, 0xf2, 0x90, 0xf5, 0xb8, 0x46, 0xfb, 0x41, 0x02, 0x39, 0x4e, 0x81, - 0xba, 0xc4, 0xa1, 0x18, 0x35, 0x20, 0x43, 0x7d, 0xc3, 0x3f, 0xa2, 0xa2, 0x86, 0xea, 0x8c, 0x88, - 0xcc, 0x7b, 0x9f, 0x79, 0xe8, 0xc2, 0x73, 0x62, 0xb6, 0xc9, 0x97, 0x98, 0x2d, 0x5a, 0x81, 0x8c, - 0x87, 0xbf, 0xc0, 0x26, 0xa7, 0x48, 0x4e, 0x17, 0x2f, 0xed, 0x3b, 0x09, 0xee, 0x8e, 0x15, 0xfb, - 0x9f, 0x4c, 0x5f, 0xfb, 0x51, 0x82, 0xe5, 0x09, 0xb6, 0x88, 0x44, 0x3f, 0xbc, 0x4e, 0x97, 0xfa, - 0xdc, 0x74, 0xe1, 0x18, 0x63, 0x7c, 0xf9, 0x78, 0x2a, 0x5f, 0x36, 0x6e, 0xc2, 0x97, 0x08, 0x75, - 0x8c, 0x30, 0x00, 0x39, 0x4f, 0xa8, 0x34, 0x0c, 0xf9, 0x6d, 0x6c, 0x78, 0x7e, 0x0b, 0x1b, 0x3e, - 0xfa, 0x04, 0xf2, 0xe1, 0x82, 0x05, 0xdd, 0x4e, 0xad, 0xa7, 0x1a, 0xef, 0x5e, 0x0c, 0x2b, 0x39, - 0xb1, 0x32, 0xf4, 0xa6, 0x2b, 0x96, 0x13, 0x2b, 0x46, 0xb5, 0xdf, 0x25, 0x28, 0x8c, 0x0c, 0xe6, - 0x9f, 0xba, 0x52, 0xef, 0xc1, 0x42, 0x30, 0x71, 0xde, 0xab, 0xa5, 0x99, 0x23, 0x8e, 0x32, 0xc2, - 0x3a, 0xf7, 0x43, 0x7b, 0x2f, 0x75, 0xc3, 0x1a, 0xe9, 0xb3, 0x61, 0x25, 0x31, 0x46, 0x97, 0xaf, - 0xe0, 0x4e, 0xd4, 0xdf, 0x88, 0x29, 0xdb, 0x90, 0x61, 0x19, 0xf3, 0x26, 0xdf, 0x88, 0xd2, 0x22, - 0x84, 0xf0, 0x47, 0x15, 0x28, 0x04, 0x67, 0xd9, 0x27, 0x6e, 0x80, 0x20, 0xae, 0x32, 0xd8, 0x74, - 0x5f, 0x48, 0xb4, 0x7b, 0x70, 0x6b, 0xef, 0xc4, 0xc1, 0x9e, 0x8e, 0x8f, 0x6d, 0x6a, 0x13, 0x07, - 0x29, 0xc1, 0xf0, 0xf9, 0x37, 0xef, 0xbc, 0x1e, 0xbd, 0xb5, 0x35, 0x58, 0x7a, 0xea, 0x11, 0x13, - 0x53, 0x4a, 0xbc, 0x2d, 0x97, 0x98, 0x1d, 0x54, 0x84, 0x05, 0x1c, 0x7c, 0x30, 0xd3, 0xbc, 0xce, - 0x1f, 0xda, 0xd7, 0x59, 0xc8, 0xee, 0x62, 0x4a, 0x8d, 0x36, 0x46, 0x5b, 0x90, 0xe9, 0x60, 0xc3, - 0xc2, 0x9e, 0xa0, 0xfc, 0xfd, 0x19, 0xb5, 0x08, 0xbf, 0xda, 0x36, 0x73, 0xd2, 0x85, 0x33, 0xda, - 0x82, 0x5c, 0x8f, 0xb6, 0x9b, 0xfe, 0xa9, 0x1b, 0x0e, 0xaf, 0x3a, 0x1f, 0xd0, 0xc1, 0xa9, 0x8b, - 0xf5, 0x6c, 0x8f, 0xb6, 0x83, 0x0f, 0xb4, 0x05, 0xe9, 0xe7, 0x1e, 0xe9, 0xb1, 0xc9, 0xe5, 0x1b, - 0x0f, 0xae, 0x86, 0x95, 0xfb, 0xf3, 0x10, 0xe9, 0xb1, 0xe1, 0xfa, 0x47, 0x5e, 0x40, 0x25, 0xe6, - 0x8e, 0x36, 0x21, 0xe9, 0x93, 0x52, 0xfa, 0xef, 0x82, 0x24, 0x7d, 0x82, 0x6c, 0x58, 0xb1, 0xc4, - 0x99, 0xe0, 0xfb, 0xdb, 0x14, 0x47, 0xba, 0xb4, 0xc0, 0xfa, 0xf4, 0x70, 0x46, 0x79, 0xd3, 0x7e, - 0x91, 0xf4, 0xa2, 0x35, 0xed, 0x77, 0xaa, 0x0b, 0xab, 0xd7, 0x42, 0x71, 0xa6, 0x95, 0x32, 0x2c, - 0xd6, 0xdb, 0x37, 0x8b, 0xc5, 0x7d, 0xf5, 0x65, 0x6b, 0xea, 0x99, 0x7b, 0x1f, 0xf2, 0x9d, 0x90, - 0xd1, 0xa5, 0x2c, 0xc3, 0x5f, 0x9f, 0x81, 0x1f, 0x6f, 0x40, 0xec, 0x8a, 0x9a, 0x80, 0xa2, 0x47, - 0x9c, 0x70, 0x8e, 0x01, 0xbe, 0x35, 0x37, 0x60, 0x98, 0xec, 0x9d, 0xce, 0xa4, 0x48, 0xf9, 0x45, - 0x82, 0x0c, 0x67, 0x19, 0x2a, 0x41, 0xf6, 0x18, 0x7b, 0x11, 0xe7, 0xf3, 0x7a, 0xf8, 0x44, 0x9f, - 0xc2, 0x12, 0x09, 0xf6, 0xa3, 0x19, 0x2d, 0x05, 0x3f, 0xb3, 0x6f, 0xce, 0xc8, 0x60, 0x6c, 0xa9, - 0xc4, 0x52, 0xde, 0x22, 0x63, 0x9b, 0xf6, 0x39, 0xdc, 0x76, 0xc3, 0x6d, 0x6a, 0xf2, 0x2d, 0x4a, - 0xcd, 0xb5, 0x22, 0xe3, 0x3b, 0x28, 0xc0, 0x97, 0xdc, 0x31, 0x69, 0xf5, 0xdb, 0x24, 0x40, 0x7c, - 0xbf, 0x90, 0x06, 0xd9, 0x67, 0xce, 0xa1, 0x43, 0x4e, 0x1c, 0x39, 0xa1, 0x2c, 0xf7, 0x07, 0xea, - 0x9d, 0x58, 0x29, 0x14, 0x48, 0x85, 0xcc, 0x66, 0x8b, 0x62, 0xc7, 0x97, 0x25, 0xa5, 0xd8, 0x1f, - 0xa8, 0x72, 0x6c, 0xc2, 0xe5, 0x68, 0x0d, 0xf2, 0x4f, 0x3d, 0xec, 0x1a, 0x9e, 0xed, 0xb4, 0xe5, - 0xa4, 0xb2, 0xda, 0x1f, 0xa8, 0x77, 0x63, 0xa3, 0x48, 0x85, 0x5e, 0x87, 0x1c, 0x7f, 0x60, 0x4b, - 0x4e, 0x29, 0x2b, 0xfd, 0x81, 0x8a, 0x26, 0xcd, 0xb0, 0x85, 0xaa, 0x50, 0xd0, 0xb1, 0xdb, 0xb5, - 0x4d, 0xc3, 0x0f, 0xf0, 0xd2, 0xca, 0xff, 0xfa, 0x03, 0x75, 0x79, 0xe4, 0xe8, 0xc6, 0xca, 0x00, - 0x31, 0xbc, 0x59, 0xf2, 0xc2, 0x24, 0x62, 0xa8, 0x09, 0xaa, 0x64, 0xdf, 0xd8, 0x92, 0x33, 0x93, - 0x55, 0x0a, 0x45, 0xf5, 0x0f, 0x09, 0x0a, 0x23, 0xb7, 0x01, 0x95, 0x01, 0x76, 0x69, 0x3b, 0x6e, - 0xce, 0x52, 0x7f, 0xa0, 0x8e, 0x48, 0xd0, 0x3b, 0xb0, 0xba, 0x4b, 0xdb, 0xd3, 0xd6, 0x4d, 0x96, - 0x94, 0xff, 0xf7, 0x07, 0xea, 0x8b, 0xd4, 0xe8, 0x11, 0x94, 0xae, 0xab, 0x38, 0xf9, 0xe4, 0xa4, - 0xf2, 0x4a, 0x7f, 0xa0, 0xbe, 0x50, 0x8f, 0x34, 0x58, 0xdc, 0xa5, 0xed, 0x88, 0xc7, 0x72, 0x4a, - 0x91, 0xfb, 0x03, 0x75, 0x4c, 0x86, 0x36, 0xa0, 0x38, 0xfa, 0x8e, 0xb0, 0xd3, 0x4a, 0xa9, 0x3f, - 0x50, 0xa7, 0xea, 0x1a, 0xeb, 0xe7, 0xbf, 0x95, 0x13, 0x67, 0x17, 0x65, 0xe9, 0xfc, 0xa2, 0x2c, - 0xfd, 0x7a, 0x51, 0x96, 0xbe, 0xb9, 0x2c, 0x27, 0xce, 0x2f, 0xcb, 0x89, 0x9f, 0x2f, 0xcb, 0x89, - 0xcf, 0x20, 0x66, 0x59, 0x2b, 0xc3, 0xfe, 0xf0, 0x3c, 0xfc, 0x33, 0x00, 0x00, 0xff, 0xff, 0xb8, - 0xe4, 0x98, 0x15, 0x3d, 0x0d, 0x00, 0x00, + // 1119 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x4b, 0x6f, 0xe3, 0x54, + 0x14, 0x8e, 0x93, 0x34, 0x8f, 0x93, 0x4e, 0xc7, 0xbd, 0x93, 0xb6, 0xc1, 0x40, 0x62, 0x2c, 0x54, + 0x95, 0x0c, 0x93, 0x30, 0x1d, 0x16, 0x68, 0x58, 0xa0, 0x66, 0xa6, 0xa8, 0x23, 0x54, 0x3a, 0x72, + 0x3b, 0xbc, 0x84, 0x14, 0x39, 0xf6, 0x9d, 0xc4, 0x34, 0xf1, 0x35, 0xbe, 0x6e, 0xab, 0x6e, 0x90, + 0xd8, 0x66, 0xc5, 0x8a, 0x5d, 0xfe, 0x00, 0x7f, 0x82, 0x6d, 0x17, 0x2c, 0xba, 0x44, 0x02, 0x45, + 0xd0, 0xfe, 0x07, 0x16, 0x65, 0x83, 0x7c, 0xef, 0xb5, 0x9d, 0xa4, 0x19, 0x92, 0xf2, 0x12, 0x3b, + 0xdf, 0xf3, 0xf8, 0xce, 0xeb, 0x3b, 0x27, 0x0a, 0x14, 0x7d, 0xa3, 0xd5, 0xc5, 0x4d, 0x6a, 0x76, + 0xb0, 0x75, 0xd4, 0xc5, 0x35, 0xd7, 0x23, 0x3e, 0x41, 0xaf, 0xba, 0xb6, 0xd3, 0x36, 0x0d, 0xb7, + 0xe6, 0xdb, 0xcf, 0xbb, 0xe4, 0xa4, 0x66, 0x5a, 0x66, 0x2d, 0x34, 0x71, 0x5b, 0x4a, 0xb1, 0x4d, + 0xda, 0x84, 0x59, 0xd6, 0x83, 0x2f, 0xee, 0xa4, 0x7d, 0x27, 0x01, 0x3c, 0xea, 0x60, 0xf3, 0xd0, + 0x25, 0xb6, 0xe3, 0xa3, 0x3d, 0xb8, 0x65, 0x46, 0xaf, 0xa6, 0x4f, 0x4b, 0x92, 0x2a, 0x6d, 0xa4, + 0x1b, 0xd5, 0xab, 0x61, 0x65, 0xbd, 0x6d, 0xfb, 0x9d, 0xa3, 0x56, 0xcd, 0x24, 0xbd, 0xba, 0x88, + 0x54, 0xe7, 0x91, 0xea, 0xa6, 0x65, 0xd6, 0x7b, 0xc4, 0xc2, 0xdd, 0xda, 0x01, 0xd5, 0x17, 0x63, + 0x80, 0x03, 0x8a, 0x3e, 0x80, 0x82, 0x87, 0x29, 0xe9, 0x1e, 0x63, 0x2b, 0x80, 0x4b, 0xde, 0x18, + 0x0e, 0x42, 0xf7, 0x03, 0xaa, 0xfd, 0x2c, 0xc1, 0xed, 0x2d, 0xcb, 0x3a, 0x08, 0xaa, 0xd7, 0xf1, + 0x97, 0x47, 0x98, 0xfa, 0xe8, 0x19, 0xe4, 0x78, 0x37, 0x6c, 0x8b, 0x25, 0x9b, 0x6a, 0x3c, 0xbc, + 0x18, 0x56, 0xb2, 0xcc, 0xe6, 0xc9, 0xe3, 0xab, 0x61, 0xe5, 0xee, 0x5c, 0x81, 0xb8, 0xb9, 0x9e, + 0x65, 0x58, 0x4f, 0x2c, 0xf4, 0x1a, 0x2c, 0xda, 0xb4, 0x49, 0xb1, 0x49, 0x1c, 0xcb, 0xf0, 0x4e, + 0x59, 0xe2, 0x39, 0xbd, 0x60, 0xd3, 0xfd, 0x50, 0x84, 0xf6, 0x00, 0xe2, 0x52, 0x4b, 0x29, 0x55, + 0xda, 0x28, 0x6c, 0xbe, 0x51, 0xfb, 0xd3, 0x21, 0xd4, 0xe2, 0x56, 0x37, 0xd2, 0x67, 0xc3, 0x4a, + 0x42, 0x1f, 0x81, 0xd0, 0x0e, 0x01, 0xe9, 0xb8, 0x47, 0x8e, 0xf1, 0x7f, 0x50, 0xa0, 0x76, 0x26, + 0x41, 0xf1, 0xb1, 0x4d, 0x5d, 0xc3, 0x37, 0x3b, 0x63, 0xf1, 0x76, 0x21, 0x6f, 0x58, 0x56, 0x93, + 0xd9, 0xb1, 0x80, 0x85, 0xcd, 0xda, 0x8c, 0xaa, 0x26, 0x66, 0xb2, 0x93, 0xd0, 0x73, 0x86, 0x10, + 0xa1, 0x8f, 0x60, 0xd1, 0x63, 0x45, 0x09, 0xc4, 0x24, 0x43, 0xbc, 0x3f, 0x03, 0xf1, 0x7a, 0x1f, + 0x76, 0x12, 0x7a, 0xc1, 0x8b, 0xa5, 0x8d, 0x3c, 0x64, 0x3d, 0xae, 0xd1, 0xbe, 0x97, 0x40, 0x8e, + 0x53, 0xa0, 0x2e, 0x71, 0x28, 0x46, 0x0d, 0xc8, 0x50, 0xdf, 0xf0, 0x8f, 0xa8, 0xa8, 0xa1, 0x3a, + 0x23, 0x22, 0xf3, 0xde, 0x67, 0x1e, 0xba, 0xf0, 0x9c, 0x98, 0x70, 0xf2, 0x6f, 0x4f, 0x18, 0xad, + 0x42, 0xc6, 0xc3, 0x5f, 0x60, 0x93, 0xd3, 0x25, 0xa7, 0x8b, 0x57, 0xb0, 0x85, 0x77, 0xc6, 0x4a, + 0xfe, 0x1f, 0x17, 0xa1, 0xfd, 0x20, 0xc1, 0xca, 0x04, 0x73, 0x44, 0xba, 0x1f, 0x5e, 0xa7, 0x4e, + 0x7d, 0x6e, 0xea, 0x70, 0x8c, 0x31, 0xee, 0x7c, 0x3c, 0x95, 0x3b, 0x9b, 0x37, 0xe1, 0x4e, 0x84, + 0x3a, 0x46, 0x1e, 0x80, 0x9c, 0x27, 0x54, 0x1a, 0x86, 0xfc, 0x0e, 0x36, 0x3c, 0xbf, 0x85, 0x0d, + 0x1f, 0x7d, 0x02, 0xf9, 0x70, 0xd9, 0x82, 0x9e, 0xa7, 0x36, 0x52, 0x8d, 0x77, 0x2f, 0x86, 0x95, + 0x9c, 0x58, 0x1f, 0x7a, 0xd3, 0x75, 0xcb, 0x89, 0x75, 0xa3, 0xda, 0x6f, 0x12, 0x14, 0x46, 0xc6, + 0xf3, 0x6f, 0xdd, 0xad, 0xf7, 0x60, 0x21, 0x98, 0x3b, 0xef, 0xd5, 0xd2, 0xcc, 0x41, 0x47, 0x19, + 0x61, 0x9d, 0xfb, 0xfd, 0xf3, 0x57, 0xed, 0x2b, 0x58, 0x8e, 0xfa, 0x1b, 0x31, 0x65, 0x07, 0x32, + 0x2c, 0x63, 0xde, 0xe4, 0x1b, 0x11, 0x5b, 0x84, 0x10, 0xfe, 0xa8, 0x02, 0x85, 0xe0, 0x50, 0xfb, + 0xc4, 0x0d, 0x10, 0xc4, 0x9d, 0x06, 0x9b, 0xee, 0x0b, 0x89, 0x76, 0x17, 0x6e, 0xed, 0x9d, 0x38, + 0xd8, 0xd3, 0xf1, 0xb1, 0x4d, 0x6d, 0xe2, 0x20, 0x25, 0x18, 0x3e, 0xff, 0xe6, 0x9d, 0xd7, 0xa3, + 0xb7, 0xb6, 0x0e, 0x4b, 0x4f, 0x3d, 0x62, 0x62, 0x4a, 0x89, 0xb7, 0xed, 0x12, 0xb3, 0x83, 0x8a, + 0xb0, 0x80, 0x83, 0x0f, 0x66, 0x9a, 0xd7, 0xf9, 0x43, 0xfb, 0x3a, 0x0b, 0xd9, 0x5d, 0x4c, 0xa9, + 0xd1, 0xc6, 0x68, 0x1b, 0x32, 0x1d, 0x6c, 0x58, 0xd8, 0x13, 0x94, 0xbf, 0x37, 0xa3, 0x16, 0xe1, + 0x57, 0xdb, 0x61, 0x4e, 0xba, 0x70, 0x46, 0xdb, 0x90, 0xeb, 0xd1, 0x76, 0xd3, 0x3f, 0x75, 0xc3, + 0xe1, 0x55, 0xe7, 0x03, 0x3a, 0x38, 0x75, 0xb1, 0x9e, 0xed, 0xd1, 0x76, 0xf0, 0x81, 0xb6, 0x21, + 0xfd, 0xdc, 0x23, 0x3d, 0x36, 0xb9, 0x7c, 0xe3, 0xfe, 0xd5, 0xb0, 0x72, 0x6f, 0x1e, 0x22, 0x3d, + 0x32, 0x5c, 0xff, 0xc8, 0x0b, 0xa8, 0xc4, 0xdc, 0xd1, 0x16, 0x24, 0x7d, 0x52, 0x4a, 0xff, 0x55, + 0x90, 0xa4, 0x4f, 0x90, 0x0d, 0xab, 0x96, 0x38, 0x13, 0x7c, 0x7f, 0x9b, 0xe2, 0x60, 0x97, 0x16, + 0x58, 0x9f, 0x1e, 0xcc, 0x28, 0x6f, 0xda, 0xaf, 0x93, 0x5e, 0xb4, 0xa6, 0xfd, 0x66, 0x75, 0x61, + 0xed, 0x5a, 0x28, 0xce, 0xb4, 0x52, 0x86, 0xc5, 0x7a, 0xfb, 0x66, 0xb1, 0xb8, 0xaf, 0xbe, 0x62, + 0x4d, 0x3d, 0x73, 0xef, 0x43, 0xbe, 0x13, 0x32, 0xba, 0x94, 0x65, 0xf8, 0x1b, 0x33, 0xf0, 0xe3, + 0x0d, 0x88, 0x5d, 0x51, 0x13, 0x50, 0xf4, 0x88, 0x13, 0xce, 0x31, 0xc0, 0xb7, 0xe6, 0x06, 0x0c, + 0x93, 0x5d, 0xee, 0x4c, 0x8a, 0x94, 0x9f, 0x24, 0xc8, 0x70, 0x96, 0xa1, 0x12, 0x64, 0x8f, 0xb1, + 0x17, 0x71, 0x3e, 0xaf, 0x87, 0x4f, 0xf4, 0x29, 0x2c, 0x91, 0x60, 0x3f, 0x9a, 0xd1, 0x52, 0xf0, + 0x33, 0xfb, 0xe6, 0x8c, 0x0c, 0xc6, 0x96, 0x4a, 0x2c, 0xe5, 0x2d, 0x32, 0xb6, 0x69, 0x9f, 0xc3, + 0x6d, 0x37, 0xdc, 0xa6, 0x26, 0xdf, 0xa2, 0xd4, 0x5c, 0x2b, 0x32, 0xbe, 0x83, 0x02, 0x7c, 0xc9, + 0x1d, 0x93, 0x56, 0xbf, 0x4d, 0x02, 0xc4, 0xf7, 0x0b, 0x69, 0x90, 0x7d, 0xe6, 0x1c, 0x3a, 0xe4, + 0xc4, 0x91, 0x13, 0xca, 0x4a, 0x7f, 0xa0, 0x2e, 0xc7, 0x4a, 0xa1, 0x40, 0x2a, 0x64, 0xb6, 0x5a, + 0x14, 0x3b, 0xbe, 0x2c, 0x29, 0xc5, 0xfe, 0x40, 0x95, 0x63, 0x13, 0x2e, 0x47, 0xeb, 0x90, 0x7f, + 0xea, 0x61, 0xd7, 0xf0, 0x6c, 0xa7, 0x2d, 0x27, 0x95, 0xb5, 0xfe, 0x40, 0xbd, 0x13, 0x1b, 0x45, + 0x2a, 0xf4, 0x3a, 0xe4, 0xf8, 0x03, 0x5b, 0x72, 0x4a, 0x59, 0xed, 0x0f, 0x54, 0x34, 0x69, 0x86, + 0x2d, 0x54, 0x85, 0x82, 0x8e, 0xdd, 0xae, 0x6d, 0x1a, 0x7e, 0x80, 0x97, 0x56, 0x5e, 0xea, 0x0f, + 0xd4, 0x95, 0x91, 0xa3, 0x1b, 0x2b, 0x03, 0xc4, 0xf0, 0x66, 0xc9, 0x0b, 0x93, 0x88, 0xa1, 0x26, + 0xa8, 0x92, 0x7d, 0x63, 0x4b, 0xce, 0x4c, 0x56, 0x29, 0x14, 0xd5, 0xdf, 0x25, 0x28, 0x8c, 0xdc, + 0x06, 0x54, 0x06, 0xd8, 0xa5, 0xed, 0xb8, 0x39, 0x4b, 0xfd, 0x81, 0x3a, 0x22, 0x41, 0xef, 0xc0, + 0xda, 0x2e, 0x6d, 0x4f, 0x5b, 0x37, 0x59, 0x52, 0x5e, 0xee, 0x0f, 0xd4, 0x17, 0xa9, 0xd1, 0x43, + 0x28, 0x5d, 0x57, 0x71, 0xf2, 0xc9, 0x49, 0xe5, 0x95, 0xfe, 0x40, 0x7d, 0xa1, 0x1e, 0x69, 0xb0, + 0xb8, 0x4b, 0xdb, 0x11, 0x8f, 0xe5, 0x94, 0x22, 0xf7, 0x07, 0xea, 0x98, 0x0c, 0x6d, 0x42, 0x71, + 0xf4, 0x1d, 0x61, 0xa7, 0x95, 0x52, 0x7f, 0xa0, 0x4e, 0xd5, 0x35, 0x36, 0xce, 0x7f, 0x2d, 0x27, + 0xce, 0x2e, 0xca, 0xd2, 0xf9, 0x45, 0x59, 0xfa, 0xe5, 0xa2, 0x2c, 0x7d, 0x73, 0x59, 0x4e, 0x9c, + 0x5f, 0x96, 0x13, 0x3f, 0x5e, 0x96, 0x13, 0x9f, 0x41, 0xcc, 0xb2, 0x56, 0x86, 0xfd, 0x05, 0x7a, + 0xf0, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0x36, 0x49, 0xc0, 0x38, 0x4f, 0x0d, 0x00, 0x00, } func (m *Checkpoint) Marshal() (dAtA []byte, err error) { @@ -1099,18 +1098,16 @@ func (m *AddTableRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.Checkpoint != nil { - { - size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) + { + size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err } - i-- - dAtA[i] = 0x1a + i -= size + i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } + i-- + dAtA[i] = 0x1a if m.IsSecondary { i-- if m.IsSecondary { @@ -1261,18 +1258,16 @@ func (m *AddTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x18 } - if m.Checkpoint != nil { - { - size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) + { + size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err } - i-- - dAtA[i] = 0x12 + i -= size + i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } + i-- + dAtA[i] = 0x12 if m.Status != nil { { size, err := m.Status.MarshalToSizedBuffer(dAtA[:i]) @@ -1308,18 +1303,16 @@ func (m *RemoveTableResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.Checkpoint != nil { - { - size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintTableSchedule(dAtA, i, uint64(size)) + { + size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err } - i-- - dAtA[i] = 0x12 + i -= size + i = encodeVarintTableSchedule(dAtA, i, uint64(size)) } + i-- + dAtA[i] = 0x12 if m.Status != nil { { size, err := m.Status.MarshalToSizedBuffer(dAtA[:i]) @@ -1789,10 +1782,8 @@ func (m *AddTableRequest) Size() (n int) { if m.IsSecondary { n += 2 } - if m.Checkpoint != nil { - l = m.Checkpoint.Size() - n += 1 + l + sovTableSchedule(uint64(l)) - } + l = m.Checkpoint.Size() + n += 1 + l + sovTableSchedule(uint64(l)) return n } @@ -1854,10 +1845,8 @@ func (m *AddTableResponse) Size() (n int) { l = m.Status.Size() n += 1 + l + sovTableSchedule(uint64(l)) } - if m.Checkpoint != nil { - l = m.Checkpoint.Size() - n += 1 + l + sovTableSchedule(uint64(l)) - } + l = m.Checkpoint.Size() + n += 1 + l + sovTableSchedule(uint64(l)) if m.Reject { n += 2 } @@ -1874,10 +1863,8 @@ func (m *RemoveTableResponse) Size() (n int) { l = m.Status.Size() n += 1 + l + sovTableSchedule(uint64(l)) } - if m.Checkpoint != nil { - l = m.Checkpoint.Size() - n += 1 + l + sovTableSchedule(uint64(l)) - } + l = m.Checkpoint.Size() + n += 1 + l + sovTableSchedule(uint64(l)) return n } @@ -2241,9 +2228,6 @@ func (m *AddTableRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Checkpoint == nil { - m.Checkpoint = &Checkpoint{} - } if err := m.Checkpoint.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -2552,9 +2536,6 @@ func (m *AddTableResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Checkpoint == nil { - m.Checkpoint = &Checkpoint{} - } if err := m.Checkpoint.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } @@ -2694,9 +2675,6 @@ func (m *RemoveTableResponse) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.Checkpoint == nil { - m.Checkpoint = &Checkpoint{} - } if err := m.Checkpoint.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } diff --git a/proto/table_schedule.proto b/proto/table_schedule.proto index a7b2c3667a5..d52152e4cec 100644 --- a/proto/table_schedule.proto +++ b/proto/table_schedule.proto @@ -30,7 +30,7 @@ message AddTableRequest { (gogoproto.customname) = "TableID" ]; bool is_secondary = 2; - Checkpoint checkpoint = 3; + Checkpoint checkpoint = 3 [(gogoproto.nullable) = false]; } message RemoveTableRequest { @@ -49,13 +49,13 @@ message DispatchTableRequest { message AddTableResponse { TableStatus status = 1; - Checkpoint checkpoint = 2; + Checkpoint checkpoint = 2 [(gogoproto.nullable) = false]; bool reject = 3; } message RemoveTableResponse { TableStatus status = 1; - Checkpoint checkpoint = 2; + Checkpoint checkpoint = 2 [(gogoproto.nullable) = false]; } message DispatchTableResponse {