Skip to content

Commit

Permalink
cdc: skip owner resign when there is only one capture
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jun 30, 2022
1 parent c21b7fa commit fa39335
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 1 deletion.
20 changes: 19 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,23 @@ func (c *captureImpl) drainImpl(ctx context.Context) bool {
// Step 1, resign ownership.
o, _ := c.GetOwner()
if o != nil {
doneCh := make(chan error, 1)
query := &owner.Query{Tp: owner.QueryCaptures, Data: []*model.CaptureInfo{}}
o.Query(query, doneCh)
select {
case <-ctx.Done():
case err := <-doneCh:
if err != nil {
log.Warn("query capture count failed, retry", zap.Error(err))
return false
}
}
if len(query.Data.([]*model.CaptureInfo)) <= 1 {
// There is only one capture, the owner itself. It's impossible to
// resign owner nor move out tables, give up drain.
log.Warn("there is only one capture, skip drain")
return true
}
o.AsyncStop()
// Make sure it's not the owner before step 2.
return false
Expand All @@ -615,7 +632,8 @@ func (c *captureImpl) drainImpl(ctx context.Context) bool {
case <-ctx.Done():
case err := <-queryDone:
if err != nil {
log.Warn("query table count failed", zap.Error(err))
log.Warn("query table count failed, retry", zap.Error(err))
return false
}
}
select {
Expand Down
102 changes: 102 additions & 0 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ package capture

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
mock_processor "github.com/pingcap/tiflow/cdc/processor/mock"
"github.com/pingcap/tiflow/pkg/etcd"
Expand Down Expand Up @@ -156,6 +158,13 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

ownerStopCh := make(chan struct{}, 1)
mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Two captures to allow owner resign.
query.Data = []*model.CaptureInfo{{}, {}}
close(done)
}).AnyTimes()
mo.EXPECT().AsyncStop().Do(func() {
select {
case ownerStopCh <- struct{}{}:
Expand Down Expand Up @@ -190,3 +199,96 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
}
}

func TestDrainOneCapture(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm, owner: mo}
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Only one capture, skip drain.
query.Data = []*model.CaptureInfo{{}}
close(done)
}).AnyTimes()

done := cp.Drain(ctx)

select {
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
}
}

func TestDrainErrors(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm, owner: mo}
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

errQueryCall := mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
done <- fmt.Errorf("test")
close(done)
})
ownerStopCh := make(chan struct{}, 1)
okQueryCall := mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Two captures to allow owner resign.
query.Data = []*model.CaptureInfo{{}, {}}
close(done)
}).AnyTimes().After(errQueryCall)
mo.EXPECT().AsyncStop().Do(func() {
select {
case ownerStopCh <- struct{}{}:
default:
}
}).AnyTimes().After(okQueryCall)

errTableCall := mm.EXPECT().
QueryTableCount(gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, tableCh chan int, done chan<- error) {
done <- fmt.Errorf("test")
close(done)
}).After(okQueryCall)
mm.EXPECT().
QueryTableCount(gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, tableCh chan int, done chan<- error) {
tableCh <- 0
close(done)
}).After(errTableCall)

done := cp.Drain(ctx)

// Must wait owner resign by wait for async close.
select {
case <-ownerStopCh:
// Simulate owner has resigned.
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
cp.setOwner(nil)
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
require.Fail(t, "unexpected")
}

select {
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
}
}
2 changes: 2 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ func (o *ownerImpl) Query(query *Query, done chan<- error) {
func (o *ownerImpl) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
o.cleanStaleMetrics()

// FIXME: cleanup ownerJobQueue.
}

func (o *ownerImpl) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
Expand Down

0 comments on commit fa39335

Please sign in to comment.