Skip to content

Commit

Permalink
tp: support per-table checkpoint (pingcap#5709)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jun 23, 2022
1 parent 3f5e584 commit d84f7fe
Show file tree
Hide file tree
Showing 11 changed files with 530 additions and 253 deletions.
6 changes: 3 additions & 3 deletions cdc/scheduler/internal/tp/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (a *agent) handleMessageDispatchTableRequest(
}
task = &dispatchTableTask{
TableID: req.AddTable.GetTableID(),
StartTs: req.AddTable.GetCheckpoint().GetCheckpointTs(),
StartTs: req.AddTable.GetCheckpoint().CheckpointTs,
IsRemove: false,
IsPrepare: req.AddTable.GetIsSecondary(),
Epoch: epoch,
Expand Down Expand Up @@ -387,7 +387,7 @@ func (a *agent) newRemoveTableResponseMessage(
Response: &schedulepb.DispatchTableResponse_RemoveTable{
RemoveTable: &schedulepb.RemoveTableResponse{
Status: &status,
Checkpoint: &status.Checkpoint,
Checkpoint: status.Checkpoint,
},
},
},
Expand Down Expand Up @@ -453,7 +453,7 @@ func (a *agent) newAddTableResponseMessage(
Response: &schedulepb.DispatchTableResponse_AddTable{
AddTable: &schedulepb.AddTableResponse{
Status: &status,
Checkpoint: &status.Checkpoint,
Checkpoint: status.Checkpoint,
Reject: reject,
},
},
Expand Down
7 changes: 3 additions & 4 deletions cdc/scheduler/internal/tp/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func TestAgentCollectTableStatus(t *testing.T) {
a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true}
status := a.newTableStatus(model.TableID(0))
require.Equal(t, schedulepb.TableStateStopping, status.State)

}

func TestAgentHandleDispatchTableTask(t *testing.T) {
Expand Down Expand Up @@ -221,13 +220,13 @@ func TestAgentHandleMessageStopping(t *testing.T) {
AddTable: &schedulepb.AddTableRequest{
TableID: 1,
IsSecondary: true,
Checkpoint: &schedulepb.Checkpoint{},
},
},
},
}
// add table request should not be handled, so the running task count is 0.
response = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.Len(t, response, 0)
require.Len(t, a.runningTasks, 0)

// mock agent has a running task before stopping, but it is not processed yet.
Expand Down Expand Up @@ -288,7 +287,7 @@ func TestAgentHandleMessage(t *testing.T) {
AddTable: &schedulepb.AddTableRequest{
TableID: 1,
IsSecondary: true,
Checkpoint: &schedulepb.Checkpoint{},
Checkpoint: schedulepb.Checkpoint{},
},
},
},
Expand Down Expand Up @@ -392,7 +391,7 @@ func TestAgentTick(t *testing.T) {
AddTable: &schedulepb.AddTableRequest{
TableID: 1,
IsSecondary: true,
Checkpoint: &schedulepb.Checkpoint{},
Checkpoint: schedulepb.Checkpoint{},
},
},
},
Expand Down
4 changes: 0 additions & 4 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,6 @@ func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *capture
}
}

// CaptureTableSets returns the manager's Captures map, keyed by capture ID.
// The map is returned directly rather than copied, so the caller shares it
// with the captureManager.
func (c *captureManager) CaptureTableSets() map[model.CaptureID]*CaptureStatus {
return c.Captures
}

func (c *captureManager) CheckAllCaptureInitialized() bool {
if !c.checkAllCaptureInitialized() {
return false
Expand Down
31 changes: 15 additions & 16 deletions cdc/scheduler/internal/tp/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"go.uber.org/zap"
)

const checkpointCannotProceed = internal.CheckpointCannotProceed

type scheduler interface {
Name() string
Schedule(
Expand Down Expand Up @@ -82,11 +84,7 @@ func (c *coordinator) Tick(
// All captures that are alive according to the latest Etcd states.
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
err = c.poll(ctx, checkpointTs, currentTables, aliveCaptures)
if err != nil {
return internal.CheckpointCannotProceed, internal.CheckpointCannotProceed, errors.Trace(err)
}
return internal.CheckpointCannotProceed, internal.CheckpointCannotProceed, nil
return c.poll(ctx, checkpointTs, currentTables, aliveCaptures)
}

// MoveTable takes a table ID and a target capture ID. Its body is empty in
// this revision, so calling it has no effect.
func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {}
Expand All @@ -100,10 +98,10 @@ func (c *coordinator) Close(ctx context.Context) {}
func (c *coordinator) poll(
ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) error {
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
recvMsgs, err := c.recvMsgs(ctx)
if err != nil {
return errors.Trace(err)
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}

var msgBuf []*schedulepb.Message
Expand All @@ -112,25 +110,25 @@ func (c *coordinator) poll(
msgBuf = append(msgBuf, msgs...)
msgs = c.captureM.HandleAliveCaptureUpdate(aliveCaptures)
msgBuf = append(msgBuf, msgs...)
if c.captureM.CheckAllCaptureInitialized() {
if !c.captureM.CheckAllCaptureInitialized() {
// Skip handling messages and tasks for replication manager,
// as not all captures are initialized.
return c.sendMsgs(ctx, msgBuf)
return checkpointCannotProceed, checkpointCannotProceed, c.sendMsgs(ctx, msgBuf)
}

// Handle capture membership changes.
if changes := c.captureM.TakeChanges(); changes != nil {
msgs, err = c.replicationM.HandleCaptureChanges(changes)
msgs, err = c.replicationM.HandleCaptureChanges(changes, checkpointTs)
if err != nil {
return errors.Trace(err)
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}
msgBuf = append(msgBuf, msgs...)
}

// Handle received messages to advance replication set.
msgs, err = c.replicationM.HandleMessage(recvMsgs)
if err != nil {
return errors.Trace(err)
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}
msgBuf = append(msgBuf, msgs...)

Expand All @@ -150,18 +148,19 @@ func (c *coordinator) poll(
// Handle generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
if err != nil {
return errors.Trace(err)
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}
msgBuf = append(msgBuf, msgs...)

// Send new messages.
err = c.sendMsgs(ctx, msgBuf)
if err != nil {
return errors.Trace(err)
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}

// checkpoint calculation
return nil
// Checkpoint calculation
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables)
return newCheckpointTs, newResolvedTs, nil
}

func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) {
Expand Down
Loading

0 comments on commit d84f7fe

Please sign in to comment.