From cc17691950e46b5d1ccfcfea0e484612fb1d1b8e Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Tue, 28 Jun 2022 22:34:45 +0800
Subject: [PATCH] cdc: remove verbose log and set liveness stop after resign
 owner

Signed-off-by: Neil Shen
---
 cdc/capture/capture.go                          |  7 +++++--
 cdc/capture/capture_test.go                     |  7 ++++---
 .../internal/tp/scheduler_drain_capture.go      |  2 +-
 .../tp/scheduler_drain_capture_test.go          | 19 +++++++++++++++++++
 pkg/cmd/server/server.go                        |  1 -
 5 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 2829ba59dae..3a973d106f6 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -570,8 +570,6 @@ func (c *captureImpl) AsyncClose() {
 
 // Drain removes tables in the current TiCDC instance.
 func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
-	// Set liveness stopping, owners will move out all tables in the capture.
-	c.liveness.Store(model.LivenessCaptureStopping)
 	log.Info("draining capture, removing all tables in the capture")
 
 	const drainInterval = 100 * time.Millisecond
@@ -588,6 +586,9 @@ func (c *captureImpl) Drain(ctx context.Context) <-chan struct{} {
 			ticker.Reset(drainInterval)
 			select {
 			case <-ctx.Done():
+				// Give up when the context is canceled. In the current
+				// implementation this happens when TiCDC receives a second
+				// signal and begins a forced shutdown.
 				return
 			case <-ticker.C:
 			}
@@ -605,6 +606,8 @@ func (c *captureImpl) drainImpl(ctx context.Context) bool {
 		return false
 	}
 	// Step 2, wait for moving out all tables.
+	// Set liveness stopping, owners will move out all tables in the capture.
+	c.liveness.Store(model.LivenessCaptureStopping)
 	queryDone := make(chan error, 1)
 	tableCh := make(chan int, 1)
 	c.processorManager.QueryTableCount(ctx, tableCh, queryDone)
diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go
index 6d4af2ba013..368f7096a07 100644
--- a/cdc/capture/capture_test.go
+++ b/cdc/capture/capture_test.go
@@ -95,9 +95,9 @@ func TestDrainImmediately(t *testing.T) {
 		close(done)
 	})
 	done := cp.Drain(ctx)
-	require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 	select {
 	case <-done:
+		require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 	case <-time.After(time.Second):
 		require.Fail(t, "timeout")
 	}
@@ -136,9 +136,9 @@ func TestDrainWaitsTables(t *testing.T) {
 		close(done)
 	}).After(t1)
 	done := cp.Drain(ctx)
-	require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 	select {
 	case <-done:
+		require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 		require.EqualValues(t, 3, calls)
 	case <-time.After(3 * time.Second):
 		require.Fail(t, "timeout")
@@ -170,12 +170,12 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
 	})
 
 	done := cp.Drain(ctx)
-	require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 
 	// Must wait owner resign by wait for async close.
 	select {
 	case <-ownerStopCh:
 		// Simulate owner has resigned.
+		require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())
 		cp.setOwner(nil)
 	case <-time.After(3 * time.Second):
 		require.Fail(t, "timeout")
@@ -187,5 +187,6 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
 	case <-time.After(3 * time.Second):
 		require.Fail(t, "timeout")
 	case <-done:
+		require.Equal(t, model.LivenessCaptureStopping, cp.Liveness())
 	}
 }
diff --git a/cdc/scheduler/internal/tp/scheduler_drain_capture.go b/cdc/scheduler/internal/tp/scheduler_drain_capture.go
index fe2afb01888..fc6bd3e6472 100644
--- a/cdc/scheduler/internal/tp/scheduler_drain_capture.go
+++ b/cdc/scheduler/internal/tp/scheduler_drain_capture.go
@@ -145,7 +145,7 @@ func (d *drainCaptureScheduler) Schedule(
 	// 3. the target capture cannot be found in the latest captures
 	if len(victims) == 0 {
 		log.Info("tpscheduler: drain capture scheduler finished, since no table",
-			zap.String("target", d.target), zap.Any("captures", captures))
+			zap.String("target", d.target))
 		d.target = captureIDNotDraining
 		return nil
 	}
diff --git a/cdc/scheduler/internal/tp/scheduler_drain_capture_test.go b/cdc/scheduler/internal/tp/scheduler_drain_capture_test.go
index 714969bd293..5a0c7d6059a 100644
--- a/cdc/scheduler/internal/tp/scheduler_drain_capture_test.go
+++ b/cdc/scheduler/internal/tp/scheduler_drain_capture_test.go
@@ -114,3 +114,22 @@ func TestDrainStoppingCapture(t *testing.T) {
 	require.EqualValues(t, "a", tasks[0].moveTable.DestCapture)
 	require.EqualValues(t, "b", scheduler.getTarget())
 }
+
+func TestDrainSkipOwner(t *testing.T) {
+	t.Parallel()
+
+	var checkpointTs model.Ts
+	currentTables := make([]model.TableID, 0)
+	captures := map[model.CaptureID]*CaptureStatus{
+		"a": {},
+		"b": {IsOwner: true, State: CaptureStateStopping},
+	}
+	replications := map[model.TableID]*ReplicationSet{
+		1: {State: ReplicationSetStateReplicating, Primary: "a"},
+		2: {State: ReplicationSetStateReplicating, Primary: "b"},
+	}
+	scheduler := newDrainCaptureScheduler(10)
+	tasks := scheduler.Schedule(checkpointTs, currentTables, captures, replications)
+	require.Len(t, tasks, 0)
+	require.EqualValues(t, captureIDNotDraining, scheduler.getTarget())
+}
diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index 8c2869c1c48..2a381bbc976 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -148,7 +148,6 @@ func (o *options) run(cmd *cobra.Command) error {
 		return errors.Annotate(err, "new server")
 	}
 	// Drain the server before shutdown.
-
 	shutdownNotify := func() <-chan struct{} { return server.Drain(ctx) }
 	util.InitSignalHandling(shutdownNotify, cancel)
 
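For context, below is a minimal standalone sketch (not part of the patch) of the drain-before-shutdown pattern that the pkg/cmd/server/server.go hunk wires up through util.InitSignalHandling: a first signal starts a graceful drain, and a second signal forces shutdown, matching the comment added in Drain. The names drain and the timing values are hypothetical stand-ins for server.Drain and the real drain loop; only the Go standard library is used.

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// drain is a hypothetical stand-in for server.Drain: it returns a channel
// that is closed once all tables have been moved off this instance, or
// abandons the wait when the context is canceled.
func drain(ctx context.Context) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-time.After(2 * time.Second): // pretend tables were moved out
		case <-ctx.Done(): // give up on cancellation
		}
	}()
	return done
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-sigCh // first signal: start a graceful drain
		select {
		case <-drain(ctx): // drain finished, proceed with shutdown
		case <-sigCh: // second signal: force shutdown immediately
		}
		cancel()
	}()

	<-ctx.Done() // the real server would run its main loop here
}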