Skip to content

Commit

Permalink
tp: implement capture manager (pingcap#5562)
Browse files Browse the repository at this point in the history
* tp: refine table schedule proto definition
* tp: implement capture status
* tp: implement capture manager
* tp: adjust Transport.Send
* rename TableSchedule to table_schedule

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jun 24, 2022
1 parent 912c707 commit 87c4e59
Show file tree
Hide file tree
Showing 18 changed files with 1,235 additions and 1,046 deletions.
6 changes: 3 additions & 3 deletions cdc/scheduler/internal/tp/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func (a *agent) Close() error {
}

// handleMessage receives a batch of schedule messages. It is currently a
// stub: the body contains only commented-out calls naming the per-type
// handlers and performs no work.
// NOTE(review): the commented-out calls use receiver `s` while this method's
// receiver is `a` — confirm the intended receiver when wiring them up.
func (a *agent) handleMessage(msg []*schedulepb.Message) {
	// s.handleMessageAnnounce()
	// s.handleMessageHeartbeat()
	// s.handleMessageDispatchTableRequest()
}

func (a *agent) handleMessageAnnounce(msg *schedulepb.Sync) {
// TODO: build s.tables from Sync message.
func (a *agent) handleMessageHeartbeat(msg *schedulepb.Heartbeat) {
// TODO: build s.tables from Heartbeat message.
}

func (a *agent) handleMessageDispatchTableRequest(msg *schedulepb.DispatchTableResponse) {
Expand Down
3 changes: 2 additions & 1 deletion cdc/scheduler/internal/tp/balance_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var _ scheduler = &balancer{}

// balancer is a scheduler implementation; it currently carries no state.
type balancer struct{}

// newBalancer is a placeholder constructor: it currently returns a nil
// *balancer rather than an initialized value.
//
//nolint:deadcode
func newBalancer() *balancer {
	return nil
}
Expand All @@ -31,7 +32,7 @@ func (b *balancer) Schedule(
checkpointTs model.Ts,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
captureTables map[model.CaptureID]captureStatus,
captureTables map[model.CaptureID]*CaptureStatus,
) []*scheduleTask {
return nil
}
158 changes: 158 additions & 0 deletions cdc/scheduler/internal/tp/capture_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tp

import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"go.uber.org/zap"
)

// CaptureState is the state of a capture.
//
//	┌──────────────┐ Heartbeat Resp ┌─────────────┐
//	│ Uninitialize ├───────────────>│ Initialized │
//	└──────┬───────┘                └──────┬──────┘
//	       │                               │
//	       │ IsStopping                    │ IsStopping
//	       │          ┌──────────┐         │
//	       └────────> │ Stopping │ <───────┘
//	                  └──────────┘
type CaptureState int

// The capture states are numbered from 1, so the zero value of CaptureState
// does not correspond to any of them.
const (
	// CaptureStateUninitialize means the capture status is unknown,
	// no heartbeat response has been received yet.
	CaptureStateUninitialize CaptureState = iota + 1
	// CaptureStateInitialized means the owner has received a heartbeat
	// response from the capture.
	CaptureStateInitialized
	// CaptureStateStopping means the capture is being removed,
	// e.g., it is shutting down.
	CaptureStateStopping
)

// CaptureStatus represents a capture's status.
type CaptureStatus struct {
	// OwnerRev is the owner revision this status was created with.
	OwnerRev schedulepb.OwnerRevision
	// Epoch is the processor epoch recorded from the heartbeat response that
	// moved the capture out of CaptureStateUninitialize; later responses
	// carrying a different epoch are ignored.
	Epoch schedulepb.ProcessorEpoch
	// State is the capture's current position in the CaptureState machine.
	State CaptureState
}

// newCaptureStatus returns a status for a capture from which no heartbeat
// response has been received yet: it starts in CaptureStateUninitialize and
// records the given owner revision.
func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus {
	status := new(CaptureStatus)
	status.OwnerRev = rev
	status.State = CaptureStateUninitialize
	return status
}

// handleHeartbeatResponse advances the capture state machine using a
// heartbeat response and the processor epoch it was sent with.
//
//   - While the capture is uninitialized, any response is accepted: its epoch
//     is recorded and the state becomes CaptureStateInitialized.
//   - Once initialized, a response whose epoch differs from the recorded one
//     is logged and ignored.
//   - An accepted response with IsStopping set moves the capture to
//     CaptureStateStopping.
//
// resp may be nil (a message whose heartbeat-response payload is unset); it is
// then treated like an empty response, i.e. IsStopping is false.
func (c *CaptureStatus) handleHeartbeatResponse(
	resp *schedulepb.HeartbeatResponse, epoch schedulepb.ProcessorEpoch,
) {
	// Check epoch for initialized captures.
	if c.State != CaptureStateUninitialize && c.Epoch.Epoch != epoch.Epoch {
		log.Warn("tpscheduler: ignore heartbeat response",
			zap.String("epoch", c.Epoch.Epoch),
			zap.String("respEpoch", epoch.Epoch),
			zap.Int64("ownerRev", c.OwnerRev.Revision))
		return
	}

	if c.State == CaptureStateUninitialize {
		c.Epoch = epoch
		c.State = CaptureStateInitialized
	}
	// Guard against a nil payload instead of panicking on the field access.
	if resp != nil && resp.IsStopping {
		c.State = CaptureStateStopping
	}
}

// captureManager tracks the status of every known capture and produces the
// heartbeat messages sent to them.
type captureManager struct {
	// OwnerRev is the owner revision handed to every CaptureStatus this
	// manager creates.
	OwnerRev schedulepb.OwnerRevision
	// Captures maps a capture ID to its tracked status.
	Captures map[model.CaptureID]*CaptureStatus

	// A logical clock counter, for heartbeat. It is incremented on every
	// tick and reset to zero each time heartbeats are emitted.
	tickCounter int
	// heartbeatTick is the number of ticks between two heartbeat rounds.
	heartbeatTick int
}

// newCaptureManager creates a manager with no known captures. rev is the
// owner revision given to every capture status created later, and
// heartbeatTick is the number of ticks between heartbeat rounds.
func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *captureManager {
	manager := new(captureManager)
	manager.OwnerRev = rev
	manager.Captures = make(map[string]*CaptureStatus)
	manager.heartbeatTick = heartbeatTick
	return manager
}

// captureTableSets returns the manager's capture status map. The map itself
// is returned, not a copy, so changes made by the caller are visible to the
// manager.
func (c *captureManager) captureTableSets() map[model.CaptureID]*CaptureStatus {
	statuses := c.Captures
	return statuses
}

// checkAllCaptureInitialized reports whether no tracked capture is still in
// CaptureStateUninitialize. It returns true when there are no captures at all.
func (c *captureManager) checkAllCaptureInitialized() bool {
	allInitialized := true
	for _, status := range c.Captures {
		if status.State == CaptureStateUninitialize {
			allInitialized = false
			break
		}
	}
	return allInitialized
}

// tick advances the logical clock by one. It returns nil until the clock
// reaches heartbeatTick; at that point the clock is reset and one heartbeat
// message per tracked capture is returned, whatever the capture's state.
func (c *captureManager) tick() []*schedulepb.Message {
	c.tickCounter++
	if c.tickCounter < c.heartbeatTick {
		return nil
	}
	c.tickCounter = 0

	heartbeats := make([]*schedulepb.Message, len(c.Captures))
	next := 0
	for captureID := range c.Captures {
		heartbeats[next] = &schedulepb.Message{
			To:        captureID,
			MsgType:   schedulepb.MsgHeartbeat,
			Heartbeat: &schedulepb.Heartbeat{},
		}
		next++
	}
	return heartbeats
}

// poll reconciles the manager with the current set of alive captures and then
// applies the heartbeat responses found in msgs.
//
// It returns the heartbeat messages to send to newly discovered captures and
// a flag reporting whether every tracked capture has left
// CaptureStateUninitialize.
//
// Messages that are not heartbeat responses, that come from an untracked
// capture, or that carry no header (and therefore no processor epoch to
// validate) are skipped.
func (c *captureManager) poll(
	aliveCaptures map[model.CaptureID]*model.CaptureInfo,
	msgs []*schedulepb.Message,
) ([]*schedulepb.Message, bool) {
	outMsgs := c.onAliveCaptureUpdate(aliveCaptures)
	for _, msg := range msgs {
		if msg.MsgType != schedulepb.MsgHeartbeatResponse {
			continue
		}
		captureStatus, ok := c.Captures[msg.From]
		if !ok {
			continue
		}
		// Header is a pointer and may be unset; without it there is no epoch
		// to check, so drop the message rather than dereference nil.
		if msg.Header == nil {
			continue
		}
		captureStatus.handleHeartbeatResponse(
			msg.GetHeartbeatResponse(), msg.Header.ProcessorEpoch)
	}
	return outMsgs, c.checkAllCaptureInitialized()
}

// onAliveCaptureUpdate starts tracking every capture in aliveCaptures that
// the manager does not know yet and returns one heartbeat message for each
// of them. The returned slice is empty, not nil, when nothing is new.
//
// NOTE(review): captures that are tracked but absent from aliveCaptures are
// left untouched here — confirm whether removal is handled elsewhere.
func (c *captureManager) onAliveCaptureUpdate(
	aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) []*schedulepb.Message {
	greetings := make([]*schedulepb.Message, 0)
	for captureID := range aliveCaptures {
		if _, known := c.Captures[captureID]; known {
			continue
		}
		// A new capture.
		c.Captures[captureID] = newCaptureStatus(c.OwnerRev)
		log.Info("tpscheduler: find a new capture", zap.String("newCapture", captureID))
		heartbeat := &schedulepb.Message{
			To:        captureID,
			MsgType:   schedulepb.MsgHeartbeat,
			Heartbeat: &schedulepb.Heartbeat{},
		}
		greetings = append(greetings, heartbeat)
	}
	return greetings
}
131 changes: 131 additions & 0 deletions cdc/scheduler/internal/tp/capture_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tp

import (
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/stretchr/testify/require"
)

// TestCaptureStatusHandleHeartbeatResponse walks a single capture status
// through Uninitialize -> Initialized -> Stopping and checks that a response
// carrying a different processor epoch is ignored.
func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) {
	t.Parallel()

	ownerRev := schedulepb.OwnerRevision{Revision: 1}
	knownEpoch := schedulepb.ProcessorEpoch{Epoch: "test"}
	otherEpoch := schedulepb.ProcessorEpoch{Epoch: "unknown"}

	status := newCaptureStatus(ownerRev)
	require.Equal(t, CaptureStateUninitialize, status.State)

	// Uninitialize -> Initialized
	status.handleHeartbeatResponse(&schedulepb.HeartbeatResponse{}, knownEpoch)
	require.Equal(t, CaptureStateInitialized, status.State)
	require.Equal(t, knownEpoch, status.Epoch)

	// Processor epoch mismatch: the stopping flag must be ignored.
	status.handleHeartbeatResponse(
		&schedulepb.HeartbeatResponse{IsStopping: true}, otherEpoch)
	require.Equal(t, CaptureStateInitialized, status.State)

	// Initialized -> Stopping
	status.handleHeartbeatResponse(
		&schedulepb.HeartbeatResponse{IsStopping: true}, knownEpoch)
	require.Equal(t, CaptureStateStopping, status.State)
	require.Equal(t, knownEpoch, status.Epoch)
}

func TestCaptureManagerPoll(t *testing.T) {
t.Parallel()

rev := schedulepb.OwnerRevision{}
ms := map[model.CaptureID]*model.CaptureInfo{
"1": {},
"2": {},
}
cm := newCaptureManager(rev, 2)

// Initial poll for alive captures.
msgs, hasInit := cm.poll(ms, nil)
require.False(t, hasInit)
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
}, msgs)

// Poll one response
msgs, hasInit = cm.poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "1",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.False(t, hasInit)
require.Empty(t, msgs)

// Poll another response
msgs, hasInit = cm.poll(ms, []*schedulepb.Message{
{
Header: &schedulepb.Message_Header{}, From: "2",
MsgType: schedulepb.MsgHeartbeatResponse,
HeartbeatResponse: &schedulepb.HeartbeatResponse{},
},
})
require.True(t, hasInit, "%v %v", cm.Captures["1"], cm.Captures["2"])
require.Empty(t, msgs)
}

func TestCaptureManagerTick(t *testing.T) {
t.Parallel()

rev := schedulepb.OwnerRevision{}
cm := newCaptureManager(rev, 2)

// No heartbeat if there is no capture.
msgs := cm.tick()
require.Empty(t, msgs)
msgs = cm.tick()
require.Empty(t, msgs)

ms := map[model.CaptureID]*model.CaptureInfo{
"1": {},
"2": {},
}
_, hasInit := cm.poll(ms, nil)
require.False(t, hasInit)

// Heartbeat even if capture is uninitialize.
msgs = cm.tick()
require.Empty(t, msgs)
msgs = cm.tick()
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
}, msgs)

// Heartbeat even if capture is initialized or stopping.
for _, s := range []CaptureState{CaptureStateInitialized, CaptureStateStopping} {
cm.Captures["1"].State = s
cm.Captures["2"].State = s
require.True(t, cm.checkAllCaptureInitialized())
msgs = cm.tick()
require.Empty(t, msgs)
msgs = cm.tick()
require.ElementsMatch(t, []*schedulepb.Message{
{To: "1", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
{To: "2", MsgType: schedulepb.MsgHeartbeat, Heartbeat: &schedulepb.Heartbeat{}},
}, msgs)
}
}
45 changes: 0 additions & 45 deletions cdc/scheduler/internal/tp/capture_status.go

This file was deleted.

Loading

0 comments on commit 87c4e59

Please sign in to comment.