Skip to content

Commit

Permalink
add retry for observeRelayConfig and observeSourceBound
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Feb 11, 2022
1 parent e269a8e commit 5d6be06
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
25 changes: 17 additions & 8 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,15 @@ func (s *Server) Start() error {
s.wg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
// TODO: handle fatal error from observeRelayConfig
//nolint:errcheck
s.observeRelayConfig(ctx, revRelay)
for {
select {
case <-ctx.Done():
return
default:
// relay don't affects sync, so no need to restart keepalive
log.L().Error("observeRelayConfig meet error will retry now", zap.Error(s.observeRelayConfig(ctx, revRelay)))
}
}
}(s.ctx)

bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
Expand All @@ -162,11 +168,14 @@ func (s *Server) Start() error {
go func(ctx context.Context) {
defer s.wg.Done()
for {
err1 := s.observeSourceBound(ctx, revBound)
if err1 == nil {
select {
case <-ctx.Done():
return
default:
log.L().Error("observeSourceBound meet error will retry and restart keepalive",
zap.Error(s.observeSourceBound(ctx, revBound)))
s.restartKeepAlive()
}
s.restartKeepAlive()
}
}(s.ctx)

Expand Down Expand Up @@ -239,8 +248,8 @@ func (s *Server) Start() error {
// worker keepalive with master
// If worker loses connect from master, it would stop all task and try to connect master again.
func (s *Server) startKeepAlive() {
s.kaWg.Add(1)
s.kaCtx, s.kaCancel = context.WithCancel(s.ctx)
s.kaWg.Add(1)
go s.doStartKeepAlive()
}

Expand Down Expand Up @@ -459,8 +468,8 @@ func (s *Server) doClose() {

// Close close the RPC server, this function can be called multiple times.
func (s *Server) Close() {
s.stopKeepAlive()
s.doClose()
s.stopKeepAlive()
}

// if needLock is false, we should make sure Server has been locked in caller.
Expand Down
9 changes: 5 additions & 4 deletions dm/tests/ha_cases3_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ function test_isolate_master_and_worker() {
}

function test_watch_source_bound_exit() {
# TODO FIXME this is a negative test case, dm should not let this happen

echo "[$(date)] <<<<<< start test_watch_source_bound_exit >>>>>>"
cleanup
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/pkg/ha/WatchSourceBoundChanClosed=return()"
Expand Down Expand Up @@ -165,13 +163,16 @@ function test_watch_source_bound_exit() {
"query-status" \
"no mysql source is being handled in the worker" 1

# dm-worker will retry watch source bound again and agin
check_log_contain_with_retry 'observeSourceBound meet error will retry and restart keepalive' $WORK_DIR/worker1/log/dm-worker.log

echo "[$(date)] <<<<<< finish test_watch_source_bound_exit >>>>>>"
}

function run() {
test_multi_task_running
test_isolate_master_and_worker # TICASE-934, 935, 936, 987, 992, 998, 999
# test_multi_task_running
test_watch_source_bound_exit
# test_isolate_master_and_worker # TICASE-934, 935, 936, 987, 992, 998, 999
}

cleanup_data $ha_test
Expand Down

0 comments on commit 5d6be06

Please sign in to comment.