Skip to content

Commit

Permalink
tp: implement replicationManager (#5606)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored May 26, 2022
1 parent cca06c1 commit c7c6354
Show file tree
Hide file tree
Showing 15 changed files with 1,582 additions and 421 deletions.
14 changes: 7 additions & 7 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ type captureManager struct {
func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *captureManager {
return &captureManager{
OwnerRev: rev,
Captures: make(map[string]*CaptureStatus),
Captures: make(map[model.CaptureID]*CaptureStatus),
heartbeatTick: heartbeatTick,
}
}

func (c *captureManager) captureTableSets() map[model.CaptureID]*CaptureStatus {
func (c *captureManager) CaptureTableSets() map[model.CaptureID]*CaptureStatus {
return c.Captures
}

func (c *captureManager) checkAllCaptureInitialized() bool {
func (c *captureManager) CheckAllCaptureInitialized() bool {
for _, captrueStatus := range c.Captures {
if captrueStatus.State == CaptureStateUninitialize {
return false
Expand All @@ -103,7 +103,7 @@ func (c *captureManager) checkAllCaptureInitialized() bool {
return true
}

func (c *captureManager) tick() []*schedulepb.Message {
func (c *captureManager) Tick() []*schedulepb.Message {
c.tickCounter++
if c.tickCounter < c.heartbeatTick {
return nil
Expand All @@ -120,10 +120,10 @@ func (c *captureManager) tick() []*schedulepb.Message {
return msgs
}

func (c *captureManager) poll(
func (c *captureManager) Poll(
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
msgs []*schedulepb.Message,
) ([]*schedulepb.Message, bool) {
) []*schedulepb.Message {
outMsgs := c.onAliveCaptureUpdate(aliveCaptures)
for _, msg := range msgs {
if msg.MsgType == schedulepb.MsgHeartbeatResponse {
Expand All @@ -135,7 +135,7 @@ func (c *captureManager) poll(
msg.GetHeartbeatResponse(), msg.Header.ProcessorEpoch)
}
}
return outMsgs, c.checkAllCaptureInitialized()
return outMsgs
}

func (c *captureManager) onAliveCaptureUpdate(
Expand Down
41 changes: 26 additions & 15 deletions cdc/scheduler/internal/tp/capture_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,45 @@ func TestCaptureManagerPoll(t *testing.T) {
cm := newCaptureManager(rev, 2)

// Initial poll for alive captures.
msgs, hasInit := cm.poll(ms, nil)
require.False(t, hasInit)
msgs := cm.Poll(ms, nil)
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
}, msgs)
require.False(t, cm.CheckAllCaptureInitialized())

// Poll one response
msgs, hasInit = cm.poll(ms, []*schedulepb.Message{
msgs = cm.Poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "1",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.False(t, hasInit)
require.Empty(t, msgs)
require.False(t, cm.CheckAllCaptureInitialized())

// Poll another response
msgs, hasInit = cm.poll(ms, []*schedulepb.Message{
msgs = cm.Poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "2",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.True(t, hasInit, "%v %v", cm.Captures["1"], cm.Captures["2"])
require.Empty(t, msgs)
require.True(t, cm.CheckAllCaptureInitialized(), "%v %v", cm.Captures["1"], cm.Captures["2"])

// Poll unknown capture response
msgs = cm.Poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "unknown",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.Empty(t, msgs)
require.True(t, cm.CheckAllCaptureInitialized())
}

func TestCaptureManagerTick(t *testing.T) {
Expand All @@ -94,22 +105,22 @@ func TestCaptureManagerTick(t *testing.T) {
cm := newCaptureManager(rev, 2)

// No heartbeat if there is no capture.
msgs := cm.tick()
msgs := cm.Tick()
require.Empty(t, msgs)
msgs = cm.tick()
msgs = cm.Tick()
require.Empty(t, msgs)

ms := map[model.CaptureID]*model.CaptureInfo{
"1": {},
"2": {},
}
_, hasInit := cm.poll(ms, nil)
require.False(t, hasInit)
cm.Poll(ms, nil)
require.False(t, cm.CheckAllCaptureInitialized())

// Heartbeat even if capture is uninitialize.
msgs = cm.tick()
msgs = cm.Tick()
require.Empty(t, msgs)
msgs = cm.tick()
msgs = cm.Tick()
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
Expand All @@ -119,10 +130,10 @@ func TestCaptureManagerTick(t *testing.T) {
for _, s := range []CaptureState{CaptureStateInitialized, CaptureStateStopping} {
cm.Captures["1"].State = s
cm.Captures["2"].State = s
require.True(t, cm.checkAllCaptureInitialized())
msgs = cm.tick()
require.True(t, cm.CheckAllCaptureInitialized())
msgs = cm.Tick()
require.Empty(t, msgs)
msgs = cm.tick()
msgs = cm.Tick()
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
Expand Down
94 changes: 87 additions & 7 deletions cdc/scheduler/internal/tp/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@ package tp

import (
"context"
"log"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)

type scheduler interface {
Expand All @@ -34,12 +40,38 @@ type scheduler interface {
var _ internal.Scheduler = (*coordinator)(nil)

type coordinator struct {
version string
revision schedulepb.OwnerRevision
trans transport
scheduler []scheduler
replicationM *replicationManager
captureM *captureManager
}

// NewCoordinator returns a two phase scheduler.
func NewCoordinator(
ctx context.Context,
changeFeedID model.ChangeFeedID,
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := newTranport(ctx, changeFeedID, messageServer, messageRouter)
if err != nil {
return nil, errors.Trace(err)
}
revision := schedulepb.OwnerRevision{Revision: ownerRevision}
return &coordinator{
version: version.ReleaseSemver(),
revision: revision,
trans: trans,
replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
captureM: newCaptureManager(revision, cfg.HeartbeatTick),
}, nil
}

func (c *coordinator) Tick(
ctx context.Context,
// Latest global checkpoint of the changefeed
Expand Down Expand Up @@ -68,33 +100,81 @@ func (c *coordinator) poll(
ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) error {
recvMsgs, err := c.trans.Recv(ctx)
recvMsgs, err := c.recvMsgs(ctx)
if err != nil {
return errors.Trace(err)
}
sentMsgs, hasInit := c.captureM.poll(aliveCaptures, recvMsgs)
if !hasInit {

sentMsgs := c.captureM.Tick()
msgs := c.captureM.Poll(aliveCaptures, recvMsgs)
sentMsgs = append(sentMsgs, msgs...)
if c.captureM.CheckAllCaptureInitialized() {
// Skip polling replication manager as not all capture are initialized.
err := c.trans.Send(ctx, sentMsgs)
return errors.Trace(err)
}

captureTables := c.captureM.captureTableSets()
// Handling received messages to advance replication set.
msgs, err = c.replicationM.HandleMessage(recvMsgs)
if err != nil {
return errors.Trace(err)
}
sentMsgs = append(sentMsgs, msgs...)

// Generate schedule tasks based on the current status.
captureTables := c.captureM.CaptureTableSets()
allTasks := make([]*scheduleTask, 0)
for _, sched := range c.scheduler {
tasks := sched.Schedule(checkpointTs, currentTables, aliveCaptures, captureTables)
allTasks = append(allTasks, tasks...)
}
msgs, err := c.replicationM.poll(
ctx, checkpointTs, currentTables, aliveCaptures, recvMsgs, allTasks)

// Handling generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
if err != nil {
return errors.Trace(err)
}
sentMsgs = append(sentMsgs, msgs...)
err = c.trans.Send(ctx, sentMsgs)

// Send new messages.
err = c.sendMsgs(ctx, sentMsgs)
if err != nil {
return errors.Trace(err)
}

// checkpoint calcuation
return nil
}

func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) {
recvMsgs, err := c.trans.Recv(ctx)
if err != nil {
return nil, errors.Trace(err)
}

n := 0
for _, val := range recvMsgs {
// Filter stale messages.
if val.Header.OwnerRevision == c.revision {
recvMsgs[n] = val
n++
}
}
return recvMsgs[:n], nil
}

func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error {
for i := range msgs {
m := msgs[i]
m.Header = &schedulepb.Message_Header{
Version: c.version,
OwnerRevision: c.revision,
}
// Correctness check.
if len(m.To) == 0 || m.MsgType == schedulepb.MsgUnknown {
log.Panic("invalid message no destination or unknown message type",
zap.Any("message", m))
}
}
return c.trans.Send(ctx, msgs)
}
76 changes: 76 additions & 0 deletions cdc/scheduler/internal/tp/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,79 @@
// limitations under the License.

package tp

import (
"context"
"testing"

"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/stretchr/testify/require"
)

type mockTrans struct {
send func(ctx context.Context, msgs []*schedulepb.Message) error
recv func(ctx context.Context) ([]*schedulepb.Message, error)
}

func (m *mockTrans) Send(ctx context.Context, msgs []*schedulepb.Message) error {
return m.send(ctx, msgs)
}
func (m *mockTrans) Recv(ctx context.Context) ([]*schedulepb.Message, error) {
return m.recv(ctx)
}

func TestCoordinatorSendMsgs(t *testing.T) {
t.Parallel()
ctx := context.Background()
trans := &mockTrans{}
cood := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
trans: trans,
}
trans.send = func(ctx context.Context, msgs []*schedulepb.Message) error {
require.EqualValues(t, []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
Version: cood.version,
OwnerRevision: cood.revision,
},
To: "1", MsgType: schedulepb.MsgDispatchTableRequest,
}}, msgs)
return nil
}
cood.sendMsgs(
ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})
}

func TestCoordinatorRecvMsgs(t *testing.T) {
t.Parallel()

ctx := context.Background()
trans := &mockTrans{}
cood := coordinator{
version: "6.2.0",
revision: schedulepb.OwnerRevision{Revision: 3},
trans: trans,
}
trans.recv = func(ctx context.Context) ([]*schedulepb.Message, error) {
return []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
OwnerRevision: cood.revision,
},
From: "1", MsgType: schedulepb.MsgDispatchTableResponse,
}, {
Header: &schedulepb.Message_Header{
OwnerRevision: schedulepb.OwnerRevision{Revision: 4},
},
From: "2", MsgType: schedulepb.MsgDispatchTableResponse,
}}, nil
}
msgs, err := cood.recvMsgs(ctx)
require.Nil(t, err)
require.EqualValues(t, []*schedulepb.Message{{
Header: &schedulepb.Message_Header{
OwnerRevision: cood.revision,
},
From: "1", MsgType: schedulepb.MsgDispatchTableResponse,
}}, msgs)
}
Loading

0 comments on commit c7c6354

Please sign in to comment.