
Commit

Merge branch 'master' into fix-orphan-load-task
lance6716 authored Dec 23, 2021
2 parents 140b7fa + c2b9e85 commit 973f6bc
Showing 116 changed files with 3,296 additions and 952 deletions.
10 changes: 10 additions & 0 deletions cdc/model/sink.go
@@ -272,6 +272,16 @@ func (r *RowChangedEvent) IsDelete() bool {
return len(r.PreColumns) != 0 && len(r.Columns) == 0
}

// IsInsert returns true if the row is an insert event
func (r *RowChangedEvent) IsInsert() bool {
return len(r.PreColumns) == 0 && len(r.Columns) != 0
}

// IsUpdate returns true if the row is an update event
func (r *RowChangedEvent) IsUpdate() bool {
return len(r.PreColumns) != 0 && len(r.Columns) != 0
}

// PrimaryKeyColumns returns the column(s) corresponding to the handle key(s)
func (r *RowChangedEvent) PrimaryKeyColumns() []*Column {
pkeyCols := make([]*Column, 0)
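For reference, a minimal sketch (not part of this commit) of how the three predicates on RowChangedEvent fit together; the helper name describeEvent is illustrative only:

// describeEvent is an illustrative helper, not from this commit. The three
// predicates are mutually exclusive: a delete carries only PreColumns, an
// insert only Columns, and an update carries both.
func describeEvent(r *model.RowChangedEvent) string {
	switch {
	case r.IsDelete():
		return "delete"
	case r.IsInsert():
		return "insert"
	case r.IsUpdate():
		return "update"
	default:
		return "unknown" // neither PreColumns nor Columns populated
	}
}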
2 changes: 1 addition & 1 deletion cdc/owner/feed_state_manager_test.go
@@ -233,7 +233,7 @@ func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, map[string]string{
"/tidb/cdc/capture/d563bfc0-f406-4f34-bc7d-6dc2e35a44e5": `{"id":"d563bfc0-f406-4f34-bc7d-6dc2e35a44e5","address":"172.16.6.147:8300","version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole:///","opts":{},"create-time":"2021-06-05T00:44:15.065939487+08:00","start-ts":425381670108266496,"target-ts":0,"admin-job-type":1,"sort-engine":"unified","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"failed","history":[],"error":{"addr":"172.16.6.147:8300","code":"CDC:ErrSnapshotLostByGC","message":"[CDC:ErrSnapshotLostByGC]fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts 425381670108266496 is earlier than GC safepoint at 0"},"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v5.0.0-master-dirty"}`,
"/tidb/cdc/owner/156579d017f84a68": "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5",
})
manager.Tick(state)
62 changes: 41 additions & 21 deletions cdc/owner/scheduler.go
@@ -127,26 +127,20 @@ func (s *schedulerV2) DispatchTable(
captureID model.CaptureID,
isDelete bool,
) (done bool, err error) {
client, ok := s.GetClient(ctx, captureID)
if !ok {
return false, nil
}

topic := model.DispatchTableTopic(changeFeedID)
message := &model.DispatchTableMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
ID: tableID,
IsDelete: isDelete,
}

_, err = client.TrySendMessage(ctx, topic, message)
ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
log.Warn("scheduler: send message failed, retry later", zap.Error(err))
return false, nil
}
return false, errors.Trace(err)
}
if !ok {
return false, nil
}

s.stats.RecordDispatch()
log.Debug("send message successfully",
@@ -161,25 +155,19 @@ func (s *schedulerV2) Announce(
changeFeedID model.ChangeFeedID,
captureID model.CaptureID,
) (bool, error) {
client, ok := s.GetClient(ctx, captureID)
if !ok {
return false, nil
}

topic := model.AnnounceTopic(changeFeedID)
message := &model.AnnounceMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
OwnerVersion: version.ReleaseSemver(),
}

_, err := client.TrySendMessage(ctx, topic, message)
ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
log.Warn("scheduler: send message failed, retry later", zap.Error(err))
return false, nil
}
return false, errors.Trace(err)
}
if !ok {
return false, nil
}

s.stats.RecordAnnounce()
log.Debug("send message successfully",
@@ -189,7 +177,7 @@ func (s *schedulerV2) Announce(
return true, nil
}

func (s *schedulerV2) GetClient(ctx context.Context, target model.CaptureID) (*p2p.MessageClient, bool) {
func (s *schedulerV2) getClient(target model.CaptureID) (*p2p.MessageClient, bool) {
client := s.messageRouter.GetClient(target)
if client == nil {
log.Warn("scheduler: no message client found, retry later",
@@ -199,6 +187,38 @@ func (s *schedulerV2) GetClient(ctx context.Context, target model.CaptureID) (*p
return client, true
}

func (s *schedulerV2) trySendMessage(
ctx context.Context,
target model.CaptureID,
topic p2p.Topic,
value interface{},
) (bool, error) {
// TODO (zixiong): abstract this function out together with the similar method in cdc/processor/agent.go
// We probably need more advanced logic to handle and mitigate complex failure situations.

client, ok := s.getClient(target)
if !ok {
return false, nil
}

_, err := client.TrySendMessage(ctx, topic, value)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
return false, nil
}
if cerror.ErrPeerMessageClientClosed.Equal(err) {
log.Warn("peer messaging client is closed while trying to send a message through it. "+
"Report a bug if this warning repeats",
zap.String("changefeed-id", s.changeFeedID),
zap.String("target", target))
return false, nil
}
return false, errors.Trace(err)
}

return true, nil
}

func (s *schedulerV2) Close(ctx context.Context) {
log.Debug("scheduler closed", zap.String("changefeed-id", s.changeFeedID))
s.deregisterPeerMessageHandlers(ctx)
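For context, a hedged sketch of the calling convention the new trySendMessage helper gives DispatchTable and Announce: (false, nil) means "not sent yet, retry on a later tick", while any other error is fatal. The wrapper below is illustrative and not part of the commit; only trySendMessage itself appears in the diff.

// sendOrRetry is an illustrative wrapper, not from this commit. Recoverable
// conditions (missing client, send buffer full, client closed) surface as
// (false, nil) so the owner simply retries on the next tick; everything else
// is wrapped and propagated.
func (s *schedulerV2) sendOrRetry(
	ctx context.Context, target model.CaptureID, topic p2p.Topic, value interface{},
) (bool, error) {
	done, err := s.trySendMessage(ctx, target, topic, value)
	if err != nil {
		return false, errors.Trace(err) // unrecoverable: surface to the owner loop
	}
	if !done {
		return false, nil // recoverable: caller retries on a later tick
	}
	return true, nil
}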
5 changes: 5 additions & 0 deletions cdc/owner/scheduler_test.go
@@ -40,6 +40,11 @@ func TestSchedulerBasics(t *testing.T) {
_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectSendMessageTryAgain")
}()

_ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "5*return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
}()

stdCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

5 changes: 5 additions & 0 deletions cdc/owner/scheduler_v1.go
@@ -309,6 +309,11 @@ func (s *oldScheduler) handleJobs(jobs []*schedulerJob) {
func (s *oldScheduler) cleanUpFinishedOperations() {
for captureID := range s.state.TaskStatuses {
s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
if status == nil {
log.Warn("task status of the capture is not found, may be the key in etcd was deleted", zap.String("captureID", captureID), zap.String("changeFeedID", s.state.ID))
return status, false, nil
}

changed := false
for tableID, operation := range status.Operation {
if operation.Status == model.OperFinished {
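A hedged sketch of the patch-callback contract the new guard relies on: when the backing etcd key has been deleted, PatchTaskStatus hands the callback a nil status, and the callback should return it unchanged with changed == false rather than dereferencing it. Everything outside the nil check below is illustrative:

// Illustrative only: the nil-tolerant shape of a task-status patch.
s.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
	if status == nil {
		// Key removed in etcd (e.g. the capture is gone): skip the patch.
		return status, false, nil
	}
	changed := false
	// ... mutate status here, setting changed = true when something is modified ...
	return status, changed, nil
})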
19 changes: 18 additions & 1 deletion cdc/owner/scheduler_v1_test.go
@@ -19,6 +19,7 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/util/testleak"
)
@@ -83,8 +84,24 @@ func (s *schedulerSuite) finishTableOperation(captureID model.CaptureID, tableID

func (s *schedulerSuite) TestScheduleOneCapture(c *check.C) {
defer testleak.AfterTest(c)()

s.reset(c)
captureID := "test-capture-0"
s.addCapture(captureID)

_, _ = s.scheduler.Tick(s.state, []model.TableID{}, s.captures)

// Manually simulate the scenario where the corresponding key was deleted in the etcd
key := &etcd.CDCKey{
Tp: etcd.CDCKeyTypeTaskStatus,
CaptureID: captureID,
ChangefeedID: s.state.ID,
}
s.tester.MustUpdate(key.String(), nil)
s.tester.MustApplyPatches()

s.reset(c)
captureID := "test-capture-1"
captureID = "test-capture-1"
s.addCapture(captureID)

// add three tables
10 changes: 10 additions & 0 deletions cdc/processor/agent.go
@@ -288,6 +288,9 @@ func (a *agentImpl) trySendMessage(
topic p2p.Topic,
value interface{},
) (bool, error) {
// TODO (zixiong): abstract this function out together with the similar method in cdc/owner/scheduler.go
// We probably need more advanced logic to handle and mitigate complex failure situations.

client := a.messageRouter.GetClient(target)
if client == nil {
a.printNoClientWarning(target)
@@ -299,6 +302,13 @@ func (a *agentImpl) trySendMessage(
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
return false, nil
}
if cerror.ErrPeerMessageClientClosed.Equal(err) {
log.Warn("peer messaging client is closed while trying to send a message through it. "+
"Report a bug if this warning repeats",
zap.String("changefeed-id", a.changeFeed),
zap.String("target", target))
return false, nil
}
return false, errors.Trace(err)
}

43 changes: 43 additions & 0 deletions cdc/processor/agent_test.go
@@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/model"
pscheduler "github.com/pingcap/tiflow/cdc/scheduler"
cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -334,3 +335,45 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
err = agent.Close()
require.NoError(t, err)
}

func TestAgentTolerateClientClosed(t *testing.T) {
suite := newAgentTestSuite(t)
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything, etcd.CaptureOwnerKey, mock.Anything).Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
},
}, nil).Once()

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
require.NoError(t, err)

_ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "5*return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
}()

// Test Point 2: We should tolerate the error ErrPeerMessageClientClosed
for i := 0; i < 6; i++ {
err = agent.Tick(suite.cdcCtx)
require.NoError(t, err)
}

select {
case <-suite.ctx.Done():
require.Fail(t, "context should not be canceled")
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Running: nil,
Adding: nil,
Removing: nil,
}, syncMsg)
}
}
3 changes: 2 additions & 1 deletion cdc/processor/pipeline/sorter.go
@@ -79,6 +79,7 @@ func newSorterNode(
flowController: flowController,
mounter: mounter,
resolvedTs: startTs,
barrierTs: startTs,
replConfig: replConfig,
}
}
@@ -101,7 +102,7 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
startTs := ctx.ChangefeedVars().Info.StartTs
actorID := ctx.GlobalVars().SorterSystem.ActorID(uint64(n.tableID))
router := ctx.GlobalVars().SorterSystem.Router()
levelSorter := leveldb.NewLevelDBSorter(ctx, n.tableID, startTs, router, actorID)
levelSorter := leveldb.NewSorter(ctx, n.tableID, startTs, router, actorID)
n.cleanID = actorID
n.cleanTask = levelSorter.CleanupTask()
n.cleanRouter = ctx.GlobalVars().SorterSystem.CleanerRouter()
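The added barrierTs: startTs initialization matters because the sorter node never advances its resolved ts past the barrier ts; seeding the barrier with startTs keeps that bound meaningful before the first barrier message arrives. A minimal sketch of the invariant, assuming the helper below, which is illustrative and not part of the diff:

// clampResolvedTs shows the bound the initialization preserves: the ts the
// sorter node emits never exceeds the current barrier ts.
func clampResolvedTs(resolvedTs, barrierTs uint64) uint64 {
	if resolvedTs > barrierTs {
		return barrierTs
	}
	return resolvedTs
}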
15 changes: 13 additions & 2 deletions cdc/processor/pipeline/sorter_test.go
@@ -104,20 +104,31 @@ func (c *checkSorter) Output() <-chan *model.PolymorphicEvent {

func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
t.Parallel()
s := &checkSorter{ch: make(chan *model.PolymorphicEvent, 1)}
sch := make(chan *model.PolymorphicEvent, 1)
s := &checkSorter{ch: sch}
sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{
Consistent: &config.ConsistentConfig{},
})
sn.sorter = s

ch := make(chan pipeline.Message, 1)
require.EqualValues(t, 1, sn.ResolvedTs())

// Resolved ts must not regress even if there is no barrier ts message.
resolvedTs1 := pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, 1))
nctx := pipeline.NewNodeContext(
cdcContext.NewContext(context.Background(), nil), resolvedTs1, ch)
err := sn.Receive(nctx)
require.Nil(t, err)
require.EqualValues(t, model.NewResolvedPolymorphicEvent(0, 1), <-sch)

// Advance barrier ts.
nctx = pipeline.NewNodeContext(
cdcContext.NewContext(context.Background(), nil),
pipeline.BarrierMessage(2),
ch,
)
err := sn.Receive(nctx)
err = sn.Receive(nctx)
require.Nil(t, err)
require.EqualValues(t, 2, sn.barrierTs)
// Barrier message must be passed to the next node.
2 changes: 1 addition & 1 deletion cdc/processor/processor.go
@@ -919,7 +919,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
}
markTableID = tableInfo.ID
return nil
}, retry.WithBackoffMaxDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
}, retry.WithBackoffBaseDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
if err != nil {
return nil, errors.Trace(err)
}
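The one-word fix above replaces a duplicated WithBackoffMaxDelay with WithBackoffBaseDelay, so the retry loop actually starts from a short delay and only caps out at 60 seconds. A hedged sketch of the intended call shape (the callback body is illustrative; the delay values follow the retry package's millisecond convention as used in the diff):

err := retry.Do(ctx, func() error {
	// ... attempt the lookup, returning a retryable error on failure ...
	return nil
},
	retry.WithBackoffBaseDelay(50),     // start backing off from ~50ms
	retry.WithBackoffMaxDelay(60*1000), // cap the backoff at 60s
	retry.WithMaxTries(20),             // give up after 20 attempts
)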
2 changes: 1 addition & 1 deletion cdc/processor/processor_test.go
@@ -76,7 +76,7 @@ func initProcessor4Test(ctx cdcContext.Context, c *check.C) (*processor, *orches
p.changefeed = orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
return p, orchestrator.NewReactorStateTester(c, p.changefeed, map[string]string{
"/tidb/cdc/capture/" + ctx.GlobalVars().CaptureInfo.ID: `{"id":"` + ctx.GlobalVars().CaptureInfo.ID + `","address":"127.0.0.1:8300"}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
"/tidb/cdc/changefeed/info/" + ctx.ChangefeedVars().ID: `{"sink-uri":"blackhole://","opts":{},"create-time":"2020-02-02T00:00:00.000000+00:00","start-ts":0,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":false,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"open-protocol"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}`,
"/tidb/cdc/job/" + ctx.ChangefeedVars().ID: `{"resolved-ts":0,"checkpoint-ts":0,"admin-job-type":0}`,
"/tidb/cdc/task/status/" + ctx.GlobalVars().CaptureInfo.ID + "/" + ctx.ChangefeedVars().ID: `{"tables":{},"operation":null,"admin-job-type":0}`,
})