Skip to content

Commit

Permalink
cdc: skip owner resign when there is only one capture
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jun 30, 2022
1 parent ea9444e commit 6252b8c
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 6 deletions.
30 changes: 27 additions & 3 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type captureImpl struct {
captureMu sync.Mutex
info *model.CaptureInfo
processorManager processor.Manager

liveness model.Liveness
liveness model.Liveness
config *config.ServerConfig

pdEndpoints []string
UpstreamManager *upstream.Manager
Expand Down Expand Up @@ -111,6 +111,7 @@ type captureImpl struct {
// NewCapture returns a new Capture instance
func NewCapture(pdEnpoints []string, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) Capture {
return &captureImpl{
config: config.GetGlobalServerConfig(),
liveness: model.LivenessCaptureAlive,
EtcdClient: etcdClient,
grpcService: grpcService,
Expand Down Expand Up @@ -598,9 +599,31 @@ func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
}

func (c *captureImpl) drainImpl(ctx context.Context) bool {
if !c.config.Debug.EnableTwoPhaseScheduler {
// Skip drain as two phase scheduler is disabled.
return true
}

// Step 1, resign ownership.
o, _ := c.GetOwner()
if o != nil {
doneCh := make(chan error, 1)
query := &owner.Query{Tp: owner.QueryCaptures, Data: []*model.CaptureInfo{}}
o.Query(query, doneCh)
select {
case <-ctx.Done():
case err := <-doneCh:
if err != nil {
log.Warn("query capture count failed, retry", zap.Error(err))
return false
}
}
if len(query.Data.([]*model.CaptureInfo)) <= 1 {
// There is only one capture, the owner itself. It's impossible to
// resign owner nor move out tables, give up drain.
log.Warn("there is only one capture, skip drain")
return true
}
o.AsyncStop()
// Make sure it's not the owner before step 2.
return false
Expand All @@ -615,7 +638,8 @@ func (c *captureImpl) drainImpl(ctx context.Context) bool {
case <-ctx.Done():
case err := <-queryDone:
if err != nil {
log.Warn("query table count failed", zap.Error(err))
log.Warn("query table count failed, retry", zap.Error(err))
return false
}
}
select {
Expand Down
114 changes: 111 additions & 3 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ package capture

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
mock_processor "github.com/pingcap/tiflow/cdc/processor/mock"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/logutil"
Expand Down Expand Up @@ -84,7 +87,8 @@ func TestDrainImmediately(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm}
cp := &captureImpl{processorManager: mm, config: config.GetDefaultServerConfig()}
cp.config.Debug.EnableTwoPhaseScheduler = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

// Drain completes immediately.
Expand All @@ -109,7 +113,8 @@ func TestDrainWaitsTables(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm}
cp := &captureImpl{processorManager: mm, config: config.GetDefaultServerConfig()}
cp.config.Debug.EnableTwoPhaseScheduler = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

// Drain waits for moving out all tables.
Expand Down Expand Up @@ -152,10 +157,18 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm, owner: mo}
cp := &captureImpl{processorManager: mm, owner: mo, config: config.GetDefaultServerConfig()}
cp.config.Debug.EnableTwoPhaseScheduler = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

ownerStopCh := make(chan struct{}, 1)
mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Two captures to allow owner resign.
query.Data = []*model.CaptureInfo{{}, {}}
close(done)
}).AnyTimes()
mo.EXPECT().AsyncStop().Do(func() {
select {
case ownerStopCh <- struct{}{}:
Expand Down Expand Up @@ -190,3 +203,98 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
}
}

func TestDrainOneCapture(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm, owner: mo, config: config.GetDefaultServerConfig()}
cp.config.Debug.EnableTwoPhaseScheduler = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Only one capture, skip drain.
query.Data = []*model.CaptureInfo{{}}
close(done)
}).AnyTimes()

done := cp.Drain(ctx)

select {
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
}
}

func TestDrainErrors(t *testing.T) {
t.Parallel()

ctx := context.Background()
ctrl := gomock.NewController(t)
mo := mock_owner.NewMockOwner(ctrl)
mm := mock_processor.NewMockManager(ctrl)
cp := &captureImpl{processorManager: mm, owner: mo, config: config.GetDefaultServerConfig()}
cp.config.Debug.EnableTwoPhaseScheduler = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

errQueryCall := mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
done <- fmt.Errorf("test")
close(done)
})
ownerStopCh := make(chan struct{}, 1)
okQueryCall := mo.EXPECT().Query(gomock.Any(), gomock.Any()).Do(func(
query *owner.Query, done chan<- error,
) {
// Two captures to allow owner resign.
query.Data = []*model.CaptureInfo{{}, {}}
close(done)
}).AnyTimes().After(errQueryCall)
mo.EXPECT().AsyncStop().Do(func() {
select {
case ownerStopCh <- struct{}{}:
default:
}
}).AnyTimes().After(okQueryCall)

errTableCall := mm.EXPECT().
QueryTableCount(gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, tableCh chan int, done chan<- error) {
done <- fmt.Errorf("test")
close(done)
}).After(okQueryCall)
mm.EXPECT().
QueryTableCount(gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, tableCh chan int, done chan<- error) {
tableCh <- 0
close(done)
}).After(errTableCall)

done := cp.Drain(ctx)

// Must wait owner resign by wait for async close.
select {
case <-ownerStopCh:
// Simulate owner has resigned.
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
cp.setOwner(nil)
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
require.Fail(t, "unexpected")
}

select {
case <-time.After(3 * time.Second):
require.Fail(t, "timeout")
case <-done:
require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
}
}
2 changes: 2 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ func (o *ownerImpl) Query(query *Query, done chan<- error) {
func (o *ownerImpl) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
o.cleanStaleMetrics()

// FIXME: cleanup ownerJobQueue.
}

func (o *ownerImpl) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
Expand Down

0 comments on commit 6252b8c

Please sign in to comment.