Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm: turn on/off relay by source config #3190

Merged
merged 14 commits into from
Nov 15, 2021
22 changes: 20 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,24 @@ dm_integration_test_build: check_failpoint_ctl
$(FAILPOINT_DISABLE)
./dm/tests/prepare_tools.sh

# dm_integration_test_build_worker compiles only the dm-worker test binary
# (bin/dm-worker.test) with atomic-mode coverage over the dm/... packages.
# $(FAILPOINT_ENABLE) runs before the build and $(FAILPOINT_DISABLE) runs after
# it; if the build fails, $(FAILPOINT_DISABLE) still runs and the recipe exits
# with status 1. On success, ./dm/tests/prepare_tools.sh is executed last.
dm_integration_test_build_worker: check_failpoint_ctl
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/dm/... \
-o bin/dm-worker.test github.com/pingcap/ticdc/dm/cmd/dm-worker \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)
./dm/tests/prepare_tools.sh

# dm_integration_test_build_master compiles only the dm-master test binary
# (bin/dm-master.test) with atomic-mode coverage over the dm/... packages.
# $(FAILPOINT_ENABLE) runs before the build and $(FAILPOINT_DISABLE) runs after
# it; if the build fails, $(FAILPOINT_DISABLE) still runs and the recipe exits
# with status 1. On success, ./dm/tests/prepare_tools.sh is executed last.
dm_integration_test_build_master: check_failpoint_ctl
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/dm/... \
-o bin/dm-master.test github.com/pingcap/ticdc/dm/cmd/dm-master \
|| { $(FAILPOINT_DISABLE); exit 1; }
$(FAILPOINT_DISABLE)
./dm/tests/prepare_tools.sh

install_test_python_dep:
@echo "install python requirments for test"
pip install --user -q -r ./dm/tests/requirements.txt
Expand All @@ -299,15 +317,15 @@ dm_integration_test: check_third_party_binary_for_dm install_test_python_dep
@which bin/dm-master.test
@which bin/dm-worker.test
@which bin/dm-syncer.test
ln -srf bin dm/
cd dm && ln -sf ../bin
cd dm && ./tests/run.sh $(CASE)

dm_compatibility_test: check_third_party_binary_for_dm
@which bin/dm-master.test.current
@which bin/dm-worker.test.current
@which bin/dm-master.test.previous
@which bin/dm-worker.test.previous
ln -srf bin dm/
cd dm && ln -sf ../bin
cd dm && ./tests/compatibility_run.sh ${CASE}

dm_coverage: tools/bin/gocovmerge tools/bin/goveralls
Expand Down
5 changes: 1 addition & 4 deletions dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,6 @@ func (c *SourceConfig) YamlForDowngrade() (string, error) {
// omit default values, so we can ignore them for later marshal
s.omitDefaultVals()

// not write this field when exporting
s.EnableRelay = false
return s.Yaml()
}

Expand All @@ -408,6 +406,7 @@ type SourceConfigForDowngrade struct {
MetaDir string `yaml:"meta-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
RelayBinLogName string `yaml:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid"`
UUIDSuffix int `yaml:"-"`
Expand All @@ -420,8 +419,6 @@ type SourceConfigForDowngrade struct {
// any new config item, we mark it omitempty
CaseSensitive bool `yaml:"case-sensitive,omitempty"`
Filters []*bf.BinlogEventRule `yaml:"filters,omitempty"`
// deprecated, DM will not write this field when exporting
EnableRelay bool `yaml:"enable-relay,omitempty"`
}

// NewSourceConfigForDowngrade creates a new base config for downgrade.
Expand Down
8 changes: 0 additions & 8 deletions dm/dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,6 @@ func (t *testConfig) TestSourceConfigForDowngrade(c *C) {
cfgForClone := &SourceConfigForDowngrade{}
Clone(cfgForClone, cfg)
c.Assert(cfgForDowngrade, DeepEquals, cfgForClone)

cfg.EnableRelay = true
content, err := cfg.Yaml()
c.Assert(err, IsNil)
c.Assert(strings.Contains(content, "enable-relay"), IsTrue)
downgrade, err := cfg.YamlForDowngrade()
c.Assert(err, IsNil)
c.Assert(strings.Contains(downgrade, "enable-relay"), IsFalse)
}

func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedError string) {
Expand Down
11 changes: 9 additions & 2 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ type Scheduler struct {
// a mirror of bounds whose element is not deleted when worker unbound. worker -> SourceBound
lastBound map[string]ha.SourceBound

// TODO: seems this memory status is useless.
// expectant relay stages for sources, source ID -> stage.
// add:
// - bound the source to a worker (at first time). // TODO: change this to add a relay-enabled source
// - bound the source to a worker (at first time).
// - recover from etcd (calling `recoverSources`).
// update:
// - update stage by user request (calling `UpdateExpectRelayStage`).
Expand Down Expand Up @@ -2051,7 +2052,13 @@ func (s *Scheduler) boundSourceToWorker(source string, w *Worker) error {
// 1. put the bound relationship into etcd.
var err error
bound := ha.NewSourceBound(source, w.BaseInfo().Name)
_, err = ha.PutSourceBound(s.etcdCli, bound)
sourceCfg, ok := s.sourceCfgs[source]
if ok && sourceCfg.EnableRelay {
stage := ha.NewRelayStage(pb.Stage_Running, source)
_, err = ha.PutRelayStageSourceBound(s.etcdCli, stage, bound)
} else {
_, err = ha.PutSourceBound(s.etcdCli, bound)
}
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg1.EnableRelay = true
sourceCfg2 := *sourceCfg1
sourceCfg2.SourceID = sourceID2

Expand Down
5 changes: 0 additions & 5 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,11 +1227,6 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
err error
)
for _, cfg := range cfgs {
// tell user he should use `start-relay` to manually specify relay workers
if cfg.EnableRelay {
resp.Msg = "Please use `start-relay` to specify which workers should pull relay log of relay-enabled sources."
}

err = s.scheduler.AddSourceCfg(cfg)
// return first error and try to revert, so user could copy-paste same start command after error
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
rev = rev1
if relaySource == nil {
if w := s.getWorker(false); w != nil && w.startedRelayBySourceCfg {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
break
}
log.L().Info("didn't found relay config after etcd retryable error")
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
err = s.disableRelay("")
if err != nil {
log.L().Error("fail to disableRelay after etcd retryable error", zap.Error(err))
Expand Down Expand Up @@ -387,6 +391,9 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
}

// observeSourceBound will
// 1. keep bound relation updated from DM-master
// 2. keep enable-relay in source config updated. (TODO) This relies on DM-master re-putting the SourceBound after changing it.
func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
var wg sync.WaitGroup
for {
Expand Down Expand Up @@ -669,6 +676,16 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b
if err != nil {
return err
}

if sourceCfg.EnableRelay {
w.startedRelayBySourceCfg = true
if err2 := w.EnableRelay(); err2 != nil {
log.L().Error("found a `enable-relay: true` source, but failed to enable relay for DM worker",
zap.Error(err2))
return err2
}
}

if err2 := w.EnableHandleSubtasks(); err2 != nil {
s.setSourceStatus(sourceCfg.SourceID, err2, false)
return err2
Expand All @@ -684,7 +701,15 @@ func (s *Server) disableHandleSubtasks(source string) error {
log.L().Warn("worker has already stopped before DisableHandleSubtasks", zap.String("source", source))
return nil
}

w.DisableHandleSubtasks()

// now the worker is unbound, stop relay if it's started by source config
if w.cfg.EnableRelay && w.startedRelayBySourceCfg {
log.L().Info("stop relay because the source is unbound")
w.DisableRelay()
}

var err error
if !w.relayEnabled.Load() {
log.L().Info("relay is not enabled after disabling subtask, so stop worker")
Expand Down
16 changes: 10 additions & 6 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type SourceWorker struct {
relayHolder RelayHolder
relayPurger purger.Purger

startedRelayBySourceCfg bool

taskStatusChecker TaskStatusChecker

etcdClient *clientv3.Client
Expand Down Expand Up @@ -279,13 +281,15 @@ func (w *SourceWorker) EnableRelay() (err error) {
failpoint.Goto("bypass")
})

// we need update worker source config from etcd first
// because the configuration of the relay part of the data source may be changed via scheduler.UpdateSourceCfg
sourceCfg, _, err = ha.GetRelayConfig(w.etcdClient, w.name)
if err != nil {
return err
if !w.startedRelayBySourceCfg {
// we need update worker source config from etcd first
// because the configuration of the relay part of the data source may be changed via scheduler.UpdateSourceCfg
sourceCfg, _, err = ha.GetRelayConfig(w.etcdClient, w.name)
if err != nil {
return err
}
w.cfg = sourceCfg
}
w.cfg = sourceCfg

failpoint.Label("bypass")

Expand Down
19 changes: 19 additions & 0 deletions dm/pkg/ha/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ import (
"github.com/pingcap/ticdc/dm/pkg/etcdutil"
)

// PutRelayStageSourceBound writes the following data to etcd in a single txn.
// - the relay stage.
// - the source bound relationship.
//
// It returns the revision reported by the transaction. If building either
// group of operations fails, it returns 0 and that error without touching etcd.
func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBound) (int64, error) {
	stageOps, err := putRelayStageOp(stage)
	if err != nil {
		return 0, err
	}

	boundOps, err := putSourceBoundOp(bound)
	if err != nil {
		return 0, err
	}

	// Relay-stage operations go first, followed by the source-bound ones.
	txnOps := append(make([]clientv3.Op, 0, len(stageOps)+len(boundOps)), stageOps...)
	txnOps = append(txnOps, boundOps...)

	_, revision, err := etcdutil.DoOpsInOneTxnWithRetry(cli, txnOps...)
	return revision, err
}

// PutRelayStageRelayConfigSourceBound puts the following data in one txn.
// - relay stage.
// - relay config for a worker
Expand Down
1 change: 1 addition & 0 deletions dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ function test_query_timeout() {
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
sed -i "s/enable-relay: true/enable-relay: false/g" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
Expand Down
3 changes: 1 addition & 2 deletions dm/tests/dmctl_basic/check_list/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ function config_to_file() {
"\"msg\": \"\"" 2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source create $WORK_DIR/get_source1.yaml" \
"\"result\": true" 2 \
"Please use \`start-relay\`" 1
"\"result\": true" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source stop mysql-replica-02" \
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/dmctl_basic/conf/get_source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ relay-dir: /tmp/dm_test/dmctl_basic/worker1/relay_log
meta-dir: ""
flavor: mysql
charset: ""
enable-relay: true
enable-relay: false
relay-binlog-name: ""
relay-binlog-gtid: ""
source-id: mysql-replica-01
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/dmctl_basic/conf/get_source2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ relay-dir: /tmp/dm_test/dmctl_basic/worker2/relay_log
meta-dir: ""
flavor: mysql
charset: ""
enable-relay: true
enable-relay: false
relay-binlog-name: ""
relay-binlog-gtid: ""
source-id: mysql-replica-02
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/dmctl_basic/conf/source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ server-id: 123456
enable-gtid: false
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/dmctl_basic/conf/source2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ server-id: 654321
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
7 changes: 3 additions & 4 deletions dm/tests/drop_column_with_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ function run() {
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1" \
"\"result\": true" 1

# start DM task only
dmctl_start_task_standalone "$cur/conf/dm-task.yaml" "--remove-meta"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"relayCatchUpMaster\": true" 1

# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-02
flavor: ''
enable-gtid: true
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases2/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases2/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-02
flavor: ''
enable-gtid: true
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases3/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases3/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-02
flavor: ''
enable-gtid: true
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
4 changes: 2 additions & 2 deletions dm/tests/ha_cases3_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ function test_multi_task_running() {
# make sure task to step in "Sync" stage
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \
"query-status test" \
"\"stage\": \"Running\"" 2 \
"\"stage\": \"Running\"" 4 \
"\"unit\": \"Sync\"" 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT3" \
"query-status test2" \
"\"stage\": \"Running\"" 2 \
"\"stage\": \"Running\"" 4 \
"\"unit\": \"Sync\"" 2

echo "use sync_diff_inspector to check full dump loader"
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases_1/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-01
flavor: ''
enable-gtid: false
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/ha_cases_1/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source-id: mysql-replica-02
flavor: ''
enable-gtid: true
enable-relay: true
enable-relay: false
from:
host: 127.0.0.1
user: root
Expand Down
Loading