Skip to content

Commit

Permalink
fb/latency(cdc): agent to be table state awared, to handle different …
Browse files Browse the repository at this point in the history
…p2p messages. (pingcap#5820)

* add table struct to agent.

* agent add table state machine.

* simplify coordinator.

* agent to be state awared.

* call IsRemoveTableFinished to clean table resource from processor.

* fix message header.

* fix agent.

* prepare new agent ready.

* fix some test.

* add basic ut.

* fix agent handle message ut.

* add all test.

* fix agent handle stopping.

* adjust pipeline table state.

* refine the agent.

* introduce tableManager.

* fix all tests.

* add some new test.

* fix log.

* fix by make check

* fix by review comment.

* fix by review comment.

* fix ut.

* fix check.

* agent fix heartbeat does not refresh each tick.

* remove scheduler log.

* fix ut.

* fix some test.

* fix ut.

* rename

* fix by check.
  • Loading branch information
3AceShowHand authored and overvenus committed Jun 21, 2022
1 parent f587147 commit 29a7888
Show file tree
Hide file tree
Showing 12 changed files with 861 additions and 499 deletions.
18 changes: 11 additions & 7 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,31 @@ type TableState int32

// TableState for table pipeline
const (
TableStateUnknown TableState = iota
// TableStateAbsent means the table not found
TableStateAbsent
// TableStatePreparing indicate that the table is preparing connecting to regions
TableStatePreparing TableState = iota
TableStatePreparing
// TableStatePrepared means the first `Resolved Ts` is received.
TableStatePrepared
// TableStateReplicating means that sink is consuming data from the sorter, and replicating it to downstream
// TableStateReplicating means that sink is consuming data from the sorter,
// and replicating it to downstream
TableStateReplicating
// TableStateStopping means the table is stopping, but not guaranteed yet.
// at the moment, this state is not used, only keep aligned with `schedulepb.TableStateStopping`
TableStateStopping
// TableStateStopped means sink stop all works.
// TableStateStopped means sink stop all works, but the table resource not released yet.
TableStateStopped
// TableStateAbsent means the table not found
TableStateAbsent
)

var tableStatusStringMap = map[TableState]string{
TableStateUnknown: "Unknown",
TableStateAbsent: "Absent",
TableStatePreparing: "Preparing",
TableStatePrepared: "Prepared",
TableStateReplicating: "Replicating",
TableStateStopping: "Stopping",
TableStateStopped: "Stopped",
TableStateAbsent: "Absent",
}

func (s TableState) String() string {
Expand Down Expand Up @@ -92,7 +96,7 @@ type TablePipeline interface {
AsyncStop(targetTs model.Ts) bool

// Start the sink consume data from the given `ts`
Start(ts model.Ts) bool
Start(ts model.Ts)

// Workload returns the workload of this table
Workload() model.WorkloadInfo
Expand Down
4 changes: 1 addition & 3 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,11 @@ func (t *tableActor) MemoryConsumption() uint64 {
return t.sortNode.flowController.GetConsumption()
}

func (t *tableActor) Start(ts model.Ts) bool {
func (t *tableActor) Start(ts model.Ts) {
if atomic.CompareAndSwapInt32(&t.sortNode.started, 0, 1) {
t.sortNode.startTsCh <- ts
close(t.sortNode.startTsCh)
return true
}
return false
}

// for ut
Expand Down
9 changes: 4 additions & 5 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,15 @@ func (m *mockTablePipeline) Wait() {
// do nothing
}

func (m *mockTablePipeline) Start(ts model.Ts) {
m.sinkStartTs = ts
}

// MemoryConsumption return the memory consumption in bytes
func (m *mockTablePipeline) MemoryConsumption() uint64 {
return 0
}

func (m *mockTablePipeline) Start(ts model.Ts) bool {
m.sinkStartTs = ts
return true
}

type mockSchemaStorage struct {
// dummy to provide default versions of unimplemented interface methods,
// as we only need ResolvedTs() and DoGC() in unit tests.
Expand Down
Loading

0 comments on commit 29a7888

Please sign in to comment.