Skip to content

Commit

Permalink
tp: add info provider (#5844)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus authored and 3AceShowHand committed Jun 14, 2022
1 parent f6b9874 commit 404269b
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 0 deletions.
9 changes: 9 additions & 0 deletions cdc/scheduler/internal/info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@ type InfoProvider interface {
GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error)

// GetTaskPositions returns the task positions.
//
// TODO: Remove the API, as capture no longer has local checkpoint ts and
// local resolved ts in tpscheduler.
GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error)

// GetTotalTableCounts returns the number of tables associated
// with each capture.
//
// TODO: Remove the API, as the only usage is metrics which
// has already exported in the scheduler package.
GetTotalTableCounts() map[model.CaptureID]int

// GetPendingTableCounts returns the number of tables in a non-ready
// status (Adding & Removing) associated with each capture.
//
// TODO: Remove the API, as the only usage is metrics which
// has already exported in the scheduler package.
GetPendingTableCounts() map[model.CaptureID]int
}
17 changes: 17 additions & 0 deletions cdc/scheduler/internal/tp/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tp

import (
"context"
"sync"
"sync/atomic"
"time"

Expand All @@ -37,6 +38,10 @@ const (
var _ internal.Scheduler = (*coordinator)(nil)

type coordinator struct {
// A mutex for concurrent access of coordinator in
// internal.Scheduler and internal.InfoProvider API.
mu sync.Mutex

version string
revision schedulepb.OwnerRevision
captureID model.CaptureID
Expand Down Expand Up @@ -105,10 +110,16 @@ func (c *coordinator) Tick(
// All captures that are alive according to the latest Etcd states.
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.mu.Lock()
defer c.mu.Unlock()

return c.poll(ctx, checkpointTs, currentTables, aliveCaptures)
}

func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
c.mu.Lock()
defer c.mu.Unlock()

if !c.captureM.CheckAllCaptureInitialized() {
log.Info("tpscheduler: manual move table task ignored, "+
"since not all captures initialized",
Expand All @@ -133,6 +144,9 @@ func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
}

func (c *coordinator) Rebalance() {
c.mu.Lock()
defer c.mu.Unlock()

if !c.captureM.CheckAllCaptureInitialized() {
log.Info("tpscheduler: manual rebalance task ignored, " +
"since not all captures initialized")
Expand All @@ -150,6 +164,9 @@ func (c *coordinator) Rebalance() {
}

func (c *coordinator) Close(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()

_ = c.trans.Close()
}

Expand Down
73 changes: 73 additions & 0 deletions cdc/scheduler/internal/tp/info_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tp

import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
)

var _ internal.InfoProvider = (*coordinator)(nil)

// GetTaskStatuses returns the task statuses.
func (c *coordinator) GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error) {
c.mu.Lock()
defer c.mu.Unlock()

tasks := make(map[model.CaptureID]*model.TaskStatus, len(c.captureM.Captures))
for captureID, status := range c.captureM.Captures {
taskStatus := &model.TaskStatus{
Tables: make(map[model.TableID]*model.TableReplicaInfo),
}
for _, s := range status.Tables {
taskStatus.Tables[s.TableID] = &model.TableReplicaInfo{
StartTs: s.Checkpoint.CheckpointTs,
}
}
tasks[captureID] = taskStatus
}
return tasks, nil
}

// GetTaskPositions returns the task positions.
func (c *coordinator) GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error) {
c.mu.Lock()
defer c.mu.Unlock()

p := &model.TaskPosition{}
pos := make(map[model.CaptureID]*model.TaskPosition, len(c.captureM.Captures))
for captureID := range c.captureM.Captures {
pos[captureID] = p
}
return pos, nil
}

// GetTotalTableCounts returns the number of tables associated
// with each capture.
func (c *coordinator) GetTotalTableCounts() map[model.CaptureID]int {
c.mu.Lock()
defer c.mu.Unlock()

tables := make(map[model.CaptureID]int, len(c.captureM.Captures))
for captureID, status := range c.captureM.Captures {
tables[captureID] = len(status.Tables)
}
return tables
}

// GetPendingTableCounts returns the number of tables in a non-ready
// status (Adding & Removing) associated with each capture.
func (c *coordinator) GetPendingTableCounts() map[model.CaptureID]int {
return make(map[model.CaptureID]int)
}
61 changes: 61 additions & 0 deletions cdc/scheduler/internal/tp/info_provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tp

import (
"math"
"testing"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/stretchr/testify/require"
)

func TestInfoProvider(t *testing.T) {
t.Parallel()

coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
coord.captureM.Captures = map[model.CaptureID]*CaptureStatus{
"a": {Tables: []schedulepb.TableStatus{{
TableID: 1,
Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1},
}, {
TableID: 2,
Checkpoint: schedulepb.Checkpoint{CheckpointTs: 1},
}}},
"b": {},
}

// Smoke testing
var ip internal.InfoProvider = coord
tasks, err := ip.GetTaskStatuses()
require.Nil(t, err)
require.EqualValues(t, map[model.CaptureID]*model.TaskStatus{
"a": {Tables: map[model.TableID]*model.TableReplicaInfo{
1: {StartTs: 1},
2: {StartTs: 1},
}},
"b": {Tables: map[model.TableID]*model.TableReplicaInfo{}},
}, tasks)
pos, err := ip.GetTaskPositions()
require.Nil(t, err)
require.Len(t, pos, 2)
require.Len(t, ip.GetTotalTableCounts(), 2)
require.Empty(t, ip.GetPendingTableCounts())
}

0 comments on commit 404269b

Please sign in to comment.