worker(dm): add retry for watch when network is weak #4553

Closed · wants to merge 9 commits
6 changes: 3 additions & 3 deletions dm/dm/worker/server.go
@@ -239,8 +239,8 @@ func (s *Server) Start() error {
// worker keepalive with master
// If worker loses connect from master, it would stop all task and try to connect master again.
func (s *Server) startKeepAlive() {
- s.kaWg.Add(1)
s.kaCtx, s.kaCancel = context.WithCancel(s.ctx)
+ s.kaWg.Add(1)
go s.doStartKeepAlive()
}
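
The only change in this hunk is ordering: s.kaWg.Add(1) now runs after the keepalive context is created, so the counter is raised immediately before the go statement and never while kaCtx/kaCancel are still unset. A minimal self-contained sketch of that pattern (hypothetical server type and run callback, not the tiflow code):

package main

import (
	"context"
	"sync"
)

type server struct {
	ctx      context.Context
	kaCtx    context.Context
	kaCancel context.CancelFunc
	kaWg     sync.WaitGroup
}

// startKeepAlive mirrors the ordering after this change: initialize the
// state the goroutine will read (kaCtx/kaCancel) first, then Add(1)
// immediately before the go statement, so a concurrent kaWg.Wait() can
// never count a goroutine whose context is still unset.
func (s *server) startKeepAlive(run func(context.Context)) {
	s.kaCtx, s.kaCancel = context.WithCancel(s.ctx)
	s.kaWg.Add(1)
	go func() {
		defer s.kaWg.Done()
		run(s.kaCtx)
	}()
}

func main() {
	s := &server{ctx: context.Background()}
	s.startKeepAlive(func(ctx context.Context) { <-ctx.Done() })
	s.kaCancel() // stop the keepalive goroutine
	s.kaWg.Wait()
}

With this ordering, a shutdown path that calls kaWg.Wait() only ever observes a counted goroutine whose state fully exists.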

@@ -431,7 +431,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
} else {
if err != nil {
log.L().Error("observeSourceBound is failed and will quit now", zap.Error(err))
log.L().Error("observeSourceBound is failed and will quit now outer logic will trigger a retry", zap.Error(err))
} else {
log.L().Info("observeSourceBound will quit now")
}
@@ -564,7 +564,7 @@ OUTER:
}
// TODO: Deal with err
log.L().Error("WatchSourceBound received an error", zap.Error(err))
- if etcdutil.IsRetryableError(err) {
+ if etcdutil.IsRetryableError(err) || errors.Cause(err) == etcdutil.ErrEtcdWatchChannelClose {
return err
}
}
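
For context, the error returned here is what drives the retry promised by the new log message above. A minimal sketch of such an outer retry loop (hypothetical names: errWatchChanClosed stands in for etcdutil.ErrEtcdWatchChannelClose and isRetryable for etcdutil.IsRetryableError; the real code compares errors.Cause(err) from pingcap/errors rather than using errors.Is, and this is not the exact tiflow code):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errWatchChanClosed stands in for etcdutil.ErrEtcdWatchChannelClose.
var errWatchChanClosed = errors.New("etcd watch channel closed")

// isRetryable stands in for etcdutil.IsRetryableError extended by this PR:
// a closed watch channel is now treated as retryable by the caller.
func isRetryable(err error) bool {
	return errors.Is(err, errWatchChanClosed)
}

// runWithRetry keeps restarting observe (standing in for observeSourceBound)
// as long as it fails with a retryable error.
func runWithRetry(ctx context.Context, observe func(context.Context) error) error {
	for {
		err := observe(ctx)
		if err == nil || !isRetryable(err) {
			return err
		}
		fmt.Println("watch failed, retrying:", err)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(100 * time.Millisecond): // simple fixed backoff
		}
	}
}

func main() {
	failures := 2
	err := runWithRetry(context.Background(), func(ctx context.Context) error {
		if failures > 0 {
			failures--
			return errWatchChanClosed
		}
		return nil // watch loop ended cleanly
	})
	fmt.Println("final error:", err)
}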
2 changes: 1 addition & 1 deletion dm/dm/worker/source_worker.go
@@ -521,7 +521,7 @@ func (w *SourceWorker) DisableHandleSubtasks() {

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
w.l.Info("handling subtask enabled")
w.l.Info("handling subtask disabled")
}

// fetchSubTasksAndAdjust gets source's subtask stages and configs, adjust some values by worker's config and status
2 changes: 2 additions & 0 deletions dm/pkg/etcdutil/etcdutil.go
@@ -55,6 +55,8 @@ var etcdDefaultTxnRetryParam = retry.Params{

var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{}

+ var ErrEtcdWatchChannelClose = errors.New("etcd watch channel closed")

// CreateClient creates an etcd client with some default config items.
func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
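
The caller in server.go compares errors.Cause(err) against this sentinel rather than err itself, so the check still passes if the error is wrapped on its way up (and Cause of the bare sentinel is the sentinel itself). A minimal sketch of why that comparison works, assuming the github.com/pingcap/errors package used by DM:

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

var errWatchChanClosed = errors.New("etcd watch channel closed")

func main() {
	// Annotate wraps the sentinel with extra context; Cause unwraps the
	// chain back to the original error, so identity comparison still works.
	wrapped := errors.Annotate(errWatchChanClosed, "watch source bound")
	fmt.Println(errors.Cause(wrapped) == errWatchChanClosed) // true
	fmt.Println(wrapped == errWatchChanClosed)               // false
}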
13 changes: 13 additions & 0 deletions dm/pkg/ha/bound.go
@@ -269,7 +269,20 @@ func WatchSourceBound(ctx context.Context, cli *clientv3.Client, worker string,
case <-ctx.Done():
return
case resp, ok := <-ch:
Review discussion on this hunk:

Contributor: What's the reason for this WatchChan being closed?

Contributor Author (@Ehco1996), Feb 10, 2022: When the network is poor, all gRPC streams are dead, and the etcd watch will return a closed channel. See here, and this issue.

Contributor (@lance6716), Mar 7, 2022: The link you gave is not the version and path of the etcd Watch we use:

tiflow/go.mod, line 83 in b301406:
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9

https://github.com/etcd-io/etcd/blob/d19fbe541bf9c81e2d69d71d1068bd40c04de200/clientv3/watch.go#L288

Contributor: w.streams will only be nil after w.Close(); I don't think that's the root cause.

Contributor Author: If you have time, can you help locate the problem? I don't have any ideas here at the moment; I'll send you the log in private chat~

+ failpoint.Inject("WatchSourceBoundChanClosed", func() {
+ ok = false
+ })
+ if !ok {
+ log.L().Info("WatchSourceBound chan closed! observeSourceBound will exit!")
+ select {
+ case errCh <- etcdutil.ErrEtcdWatchChannelClose:
+ // etcd client will return a closed channel if underlying grpc connection is closed
+ // see https://github.com/etcd-io/etcd/blob/20c89df5e5e2d738efb9c276d954d754eb86918b/client/v3/watch.go#L325
+ // and https://github.com/etcd-io/etcd/issues/8980#issuecomment-350070282
+ // we return an error to let outer logic do the whole retry for `observeSourceBound`
+ case <-ctx.Done():
+ return
+ }
+ return
+ }
if resp.Canceled {
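
The failpoint.Inject marker added above is what lets the new test simulate a closed watch channel without real network trouble: the code is built with failpoint-ctl and activated through GO_FAILPOINTS, exactly as the test script below does. A standalone sketch of the same pingcap/failpoint pattern, with a hypothetical failpoint name and package path:

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

// readFromWatch pretends to read one event from a watch channel and
// reports whether the channel was still open.
func readFromWatch() (ok bool) {
	ok = true
	// In a failpoint-enabled build with
	//   GO_FAILPOINTS="example.com/demo/SimulateChanClosed=return()"
	// this body runs and fakes a closed channel; in a normal build the
	// call is an inert marker.
	failpoint.Inject("SimulateChanClosed", func() {
		ok = false
	})
	return ok
}

func main() {
	fmt.Println("watch channel ok:", readFromWatch())
}

Without the failpoint build step the marker does nothing, so this sketch prints true in a normal build.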
2 changes: 2 additions & 0 deletions dm/tests/_utils/ha_cases_lib.sh
@@ -155,6 +155,8 @@ function cleanup() {
$(mysql -h127.0.0.1 -p123456 -P${i} -uroot -e "drop database if exists ha_test2;")
sleep 1
done
+ export GO_FAILPOINTS=""
+ cleanup_process $*
}

function isolate_master() {
46 changes: 44 additions & 2 deletions dm/tests/ha_cases3_1/run.sh
@@ -48,8 +48,9 @@ function test_multi_task_running() {

function test_isolate_master_and_worker() {
echo "[$(date)] <<<<<< start test_isolate_master_and_worker >>>>>>"

- test_multi_task_running
+ cleanup
+ prepare_sql_multi_task
+ start_multi_tasks_cluster

# join master4 and master5
run_dm_master $WORK_DIR/master-join4 $MASTER_PORT4 $cur/conf/dm-master-join4.toml
@@ -129,7 +130,48 @@ function test_isolate_master_and_worker() {
echo "[$(date)] <<<<<< finish test_isolate_master_and_worker >>>>>>"
}

+ function test_watch_source_bound_exit() {
+ echo "[$(date)] <<<<<< start test_watch_source_bound_exit >>>>>>"
+ cleanup
+ export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/pkg/ha/WatchSourceBoundChanClosed=return()"
+
+ echo "start DM worker and master cluster"
+ run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $cur/conf/dm-master1.toml
+ run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $cur/conf/dm-master2.toml
+ run_dm_master $WORK_DIR/master3 $MASTER_PORT3 $cur/conf/dm-master3.toml
+ check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1
+ check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT2
+ check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT3
+
+ echo "start worker and operate mysql config to worker"
+ run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+ check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+ cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
+ sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
+ run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
+ "operate-source create $WORK_DIR/source1.yaml" \
+ "\"result\": true" 1 \
+ "fail to get expected result" 1
+
+ # the master thinks worker1 is online and has bound source1
+ run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
+ "list-member --name worker1" \
+ "\"source\": \"mysql-replica-01\"" 1
+
+ # but worker1 is not handling this source, because the worker's watch-source-bound thread has exited
+ run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT1" \
+ "query-status" \
+ "no mysql source is being handled in the worker" 1
+
+ # dm-worker will retry watching the source bound again and again
+ check_log_contain_with_retry 'observeSourceBound is failed and will quit now, outer logic will trigger a retry' $WORK_DIR/worker1/log/dm-worker.log
+
+ echo "[$(date)] <<<<<< finish test_watch_source_bound_exit >>>>>>"
+ }

function run() {
+ test_multi_task_running
+ test_watch_source_bound_exit
test_isolate_master_and_worker # TICASE-934, 935, 936, 987, 992, 998, 999
}
