From 29a1ac410a03d95e53e4a713e3e20d292334595f Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sat, 19 Oct 2019 20:18:30 -0600 Subject: [PATCH 01/22] Improve my.cnf draft The binlog tests still fail. Signed-off-by: Morgan Tocker --- config/mycnf/backup.cnf | 1 - config/mycnf/benchmark.cnf | 7 ----- config/mycnf/default-fast.cnf | 50 ++++++++++-------------------- config/mycnf/default.cnf | 34 ++++++++++---------- config/mycnf/master.cnf | 5 --- config/mycnf/master_mariadb.cnf | 12 ------- config/mycnf/master_mariadb100.cnf | 37 ++++++++++++++++++++-- config/mycnf/master_mariadb101.cnf | 37 ++++++++++++++++++++-- config/mycnf/master_mariadb102.cnf | 32 +++++++++++++++++-- config/mycnf/master_mariadb103.cnf | 30 +++++++++++------- config/mycnf/master_mysql56.cnf | 24 +++++++++----- config/mycnf/master_mysql57.cnf | 14 ++++++--- config/mycnf/master_mysql80.cnf | 26 ++++++---------- config/mycnf/production.cnf | 5 --- config/mycnf/rbr.cnf | 4 ++- config/mycnf/rdonly.cnf | 1 - config/mycnf/replica.cnf | 13 -------- config/mycnf/sbr.cnf | 2 ++ config/mycnf/vtcombo.cnf | 1 - examples/local/vttablet-up.sh | 5 --- go/vt/mysqlctl/mycnf_test.go | 2 -- go/vt/mysqlctl/mysqld.go | 16 ++-------- go/vt/vttest/environment.go | 6 ++-- py/vttest/mysql_flavor.py | 2 +- py/vttest/run_local_database.py | 2 +- test/mysql_flavor.py | 2 +- 26 files changed, 199 insertions(+), 171 deletions(-) delete mode 100644 config/mycnf/backup.cnf delete mode 100644 config/mycnf/benchmark.cnf delete mode 100644 config/mycnf/master.cnf delete mode 100644 config/mycnf/master_mariadb.cnf delete mode 100644 config/mycnf/production.cnf delete mode 100644 config/mycnf/rdonly.cnf delete mode 100644 config/mycnf/replica.cnf create mode 100644 config/mycnf/sbr.cnf delete mode 100644 config/mycnf/vtcombo.cnf diff --git a/config/mycnf/backup.cnf b/config/mycnf/backup.cnf deleted file mode 100644 index de33eee9c41..00000000000 --- a/config/mycnf/backup.cnf +++ /dev/null @@ -1 +0,0 @@ -# reserved for future 
tuning diff --git a/config/mycnf/benchmark.cnf b/config/mycnf/benchmark.cnf deleted file mode 100644 index 5d33db9b15f..00000000000 --- a/config/mycnf/benchmark.cnf +++ /dev/null @@ -1,7 +0,0 @@ -innodb_doublewrite=0 -innodb_flush_log_at_trx_commit=0 -innodb_log_file_size=128M -innodb_buffer_pool_size=1G -max_connections=500 -open_files_limit=8192 -sync_binlog=0 diff --git a/config/mycnf/default-fast.cnf b/config/mycnf/default-fast.cnf index 969b51baa34..f7fde4463fd 100644 --- a/config/mycnf/default-fast.cnf +++ b/config/mycnf/default-fast.cnf @@ -1,37 +1,19 @@ -# basic config parameters for all db instances in the grid +# This sets some unsafe settings specifically for +# the test-suite which is currently MySQL 5.7 based +# In future it should be renamed testsuite.cnf -sql_mode = STRICT_TRANS_TABLES -character_set_server = utf8 -collation_server = utf8_general_ci -connect_timeout = 30 -datadir = {{.DataDir}} -expire_logs_days = 3 -innodb_buffer_pool_size = 64M -innodb_data_home_dir = {{.InnodbDataHomeDir}} -innodb_flush_log_at_trx_commit = 2 -innodb_lock_wait_timeout = 20 +innodb_buffer_pool_size = 32M +innodb_flush_log_at_trx_commit = 0 innodb_log_buffer_size = 1M -innodb_log_file_size = 4M -innodb_log_group_home_dir = {{.InnodbLogGroupHomeDir}} +innodb_log_file_size = 5M + +# Native AIO tends to run into aio-max-nr limit during test startup. +innodb_use_native_aio = 0 + key_buffer_size = 2M -log-error = {{.ErrorLogPath}} -long_query_time = 2 -pid-file = {{.PidFile}} -port = {{.MysqlPort}} -# all db instances should start in read-only mode - once the db is started and -# fully functional, we'll push it into read-write mode -read-only -server-id = {{.ServerID}} -skip-name-resolve -# we now need networking for replication. this is a tombstone to simpler times. 
-#skip_networking -# all db instances should skip the slave startup - that way we can do any -# out-of-bounds checking before we restart everything - in case we need to do -# some extra work to skip mangled transactions or fudge the slave start -skip_slave_start -slave_net_timeout = 60 -slave_load_tmpdir = {{.SlaveLoadTmpDir}} -slow-query-log -slow-query-log-file = {{.SlowLogPath}} -socket = {{.SocketFile}} -tmpdir = {{.TmpDir}} +sync_binlog=0 +innodb_doublewrite=0 + +# Some tests don't work with full strict yet +sql_mode='STRICT_TRANS_TABLES' + diff --git a/config/mycnf/default.cnf b/config/mycnf/default.cnf index 61f767a8032..df2e7017416 100644 --- a/config/mycnf/default.cnf +++ b/config/mycnf/default.cnf @@ -1,35 +1,35 @@ -# basic config parameters for all db instances in the grid +# Global configuration that is auto-included for all MySQL/MariaDB versions -sql_mode = STRICT_TRANS_TABLES -binlog_format = statement -character_set_server = utf8 -collation_server = utf8_general_ci -connect_timeout = 30 datadir = {{.DataDir}} -expire_logs_days = 3 -innodb_buffer_pool_size = 32M innodb_data_home_dir = {{.InnodbDataHomeDir}} -innodb_flush_log_at_trx_commit = 2 -innodb_lock_wait_timeout = 20 innodb_log_group_home_dir = {{.InnodbLogGroupHomeDir}} log-error = {{.ErrorLogPath}} -long_query_time = 2 -max_allowed_packet = 64M -max_connections = 500 pid-file = {{.PidFile}} port = {{.MysqlPort}} + # all db instances should start in read-only mode - once the db is started and # fully functional, we'll push it into read-write mode read-only server-id = {{.ServerID}} -skip-name-resolve + # all db instances should skip the slave startup - that way we can do any # additional configuration (like enabling semi-sync) before we connect to # the master. 
skip_slave_start -slave_net_timeout = 60 slave_load_tmpdir = {{.SlaveLoadTmpDir}} -slow-query-log -slow-query-log-file = {{.SlowLogPath}} socket = {{.SocketFile}} tmpdir = {{.TmpDir}} + +slow-query-log-file = {{.SlowLogPath}} + +# These are sensible defaults that apply to all MySQL/MariaDB versions + +long_query_time = 2 +slow-query-log +skip-name-resolve +connect_timeout = 30 +innodb_lock_wait_timeout = 20 +max_allowed_packet = 64M + + + diff --git a/config/mycnf/master.cnf b/config/mycnf/master.cnf deleted file mode 100644 index 481f06f5ffd..00000000000 --- a/config/mycnf/master.cnf +++ /dev/null @@ -1,5 +0,0 @@ -# master.cnf parameters - -log-bin = {{.BinLogPath}} -log-slave-updates -sync_binlog = 1 diff --git a/config/mycnf/master_mariadb.cnf b/config/mycnf/master_mariadb.cnf deleted file mode 100644 index 1e41cd8f3ce..00000000000 --- a/config/mycnf/master_mariadb.cnf +++ /dev/null @@ -1,12 +0,0 @@ -# This file is auto-included when MariaDB (any version) is detected. - -# enable strict mode so it's safe to compare sequence numbers across different server IDs. -gtid_strict_mode = 1 -innodb_stats_persistent = 0 - -# When semi-sync is enabled, don't allow fallback to async -# if you get no ack, or have no slaves. This is necessary to -# prevent alternate futures when doing a failover in response to -# a master that becomes unresponsive. -rpl_semi_sync_master_timeout = 1000000000000000000 -rpl_semi_sync_master_wait_no_slave = 1 diff --git a/config/mycnf/master_mariadb100.cnf b/config/mycnf/master_mariadb100.cnf index ce85c641c13..0c5642ee942 100644 --- a/config/mycnf/master_mariadb100.cnf +++ b/config/mycnf/master_mariadb100.cnf @@ -1,7 +1,5 @@ # This file is auto-included when MariaDB 10.0 is detected. -innodb_support_xa = 0 - # Semi-sync replication is required for automated unplanned failover # (when the master goes away). Here we just load the plugin so it's # available if desired, but it's disabled at startup. 
@@ -10,3 +8,38 @@ innodb_support_xa = 0 # at the proper time when replication is set up, or when masters are # promoted or demoted. plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so + +slave_net_timeout = 60 + +# MariaDB 10.0 is unstrict by default +sql_mode = STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION + +# enable strict mode so it's safe to compare sequence numbers across different server IDs. +gtid_strict_mode = 1 +innodb_stats_persistent = 0 + +# When semi-sync is enabled, don't allow fallback to async +# if you get no ack, or have no slaves. This is necessary to +# prevent alternate futures when doing a failover in response to +# a master that becomes unresponsive. +rpl_semi_sync_master_timeout = 1000000000000000000 +rpl_semi_sync_master_wait_no_slave = 1 + + +character_set_server = utf8 +collation_server = utf8_general_ci + +expire_logs_days = 3 + +log_bin +sync_binlog = 1 +binlog_format = ROW +log_slave_updates +expire_logs_days = 3 + +# In MariaDB the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + + diff --git a/config/mycnf/master_mariadb101.cnf b/config/mycnf/master_mariadb101.cnf index d613b155d68..177cbd077eb 100644 --- a/config/mycnf/master_mariadb101.cnf +++ b/config/mycnf/master_mariadb101.cnf @@ -1,7 +1,5 @@ # This file is auto-included when MariaDB 10.1 is detected. -innodb_support_xa = 0 - # Semi-sync replication is required for automated unplanned failover # (when the master goes away). Here we just load the plugin so it's # available if desired, but it's disabled at startup. @@ -10,3 +8,38 @@ innodb_support_xa = 0 # at the proper time when replication is set up, or when masters are # promoted or demoted. 
plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so + +slave_net_timeout = 60 + +# MariaDB 10.1 default is only no-engine-substitution and no-auto-create-user +sql_mode = STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION,NO_AUTO_CREATE_USER + +# enable strict mode so it's safe to compare sequence numbers across different server IDs. +gtid_strict_mode = 1 +innodb_stats_persistent = 0 + +# When semi-sync is enabled, don't allow fallback to async +# if you get no ack, or have no slaves. This is necessary to +# prevent alternate futures when doing a failover in response to +# a master that becomes unresponsive. +rpl_semi_sync_master_timeout = 1000000000000000000 +rpl_semi_sync_master_wait_no_slave = 1 + + +character_set_server = utf8 +collation_server = utf8_general_ci + +expire_logs_days = 3 + +log_bin +sync_binlog = 1 +binlog_format = ROW +log_slave_updates +expire_logs_days = 3 + +# In MariaDB the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + + diff --git a/config/mycnf/master_mariadb102.cnf b/config/mycnf/master_mariadb102.cnf index 487baa9bf87..8a25f5c2c34 100644 --- a/config/mycnf/master_mariadb102.cnf +++ b/config/mycnf/master_mariadb102.cnf @@ -1,7 +1,5 @@ # This file is auto-included when MariaDB 10.2 is detected. -innodb_support_xa = 0 - # Semi-sync replication is required for automated unplanned failover # (when the master goes away). Here we just load the plugin so it's # available if desired, but it's disabled at startup. @@ -10,3 +8,33 @@ innodb_support_xa = 0 # at the proper time when replication is set up, or when masters are # promoted or demoted. plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so + +# enable strict mode so it's safe to compare sequence numbers across different server IDs. 
+gtid_strict_mode = 1 +innodb_stats_persistent = 0 + +# When semi-sync is enabled, don't allow fallback to async +# if you get no ack, or have no slaves. This is necessary to +# prevent alternate futures when doing a failover in response to +# a master that becomes unresponsive. +rpl_semi_sync_master_timeout = 1000000000000000000 +rpl_semi_sync_master_wait_no_slave = 1 + + +character_set_server = utf8 +collation_server = utf8_general_ci + +expire_logs_days = 3 + +log_bin +sync_binlog = 1 +binlog_format = ROW +log_slave_updates +expire_logs_days = 3 + +# In MariaDB the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + + diff --git a/config/mycnf/master_mariadb103.cnf b/config/mycnf/master_mariadb103.cnf index ac8b38404fd..36eef4f2f50 100644 --- a/config/mycnf/master_mariadb103.cnf +++ b/config/mycnf/master_mariadb103.cnf @@ -4,20 +4,28 @@ gtid_strict_mode = 1 innodb_stats_persistent = 0 -# Semi-sync replication is required for automated unplanned failover -# (when the master goes away). Here we just load the plugin so it's -# available if desired, but it's disabled at startup. -# -# If the -enable_semi_sync flag is used, VTTablet will enable semi-sync -# at the proper time when replication is set up, or when masters are -# promoted or demoted. - -# semi_sync has been merged into master as of mariadb 10.3 so this is no longer needed -#plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so - # When semi-sync is enabled, don't allow fallback to async # if you get no ack, or have no slaves. This is necessary to # prevent alternate futures when doing a failover in response to # a master that becomes unresponsive. 
rpl_semi_sync_master_timeout = 1000000000000000000 rpl_semi_sync_master_wait_no_slave = 1 + + +character_set_server = utf8 +collation_server = utf8_general_ci + +expire_logs_days = 3 + +log_bin +sync_binlog = 1 +binlog_format = ROW +log_slave_updates +expire_logs_days = 3 + +# In MariaDB the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + + diff --git a/config/mycnf/master_mysql56.cnf b/config/mycnf/master_mysql56.cnf index dcb8a4e113f..7524ef1663a 100644 --- a/config/mycnf/master_mysql56.cnf +++ b/config/mycnf/master_mysql56.cnf @@ -1,20 +1,29 @@ # This file is auto-included when MySQL 5.6 is detected. -# Options for enabling GTID -# https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-howto.html -gtid_mode = ON +# MySQL 5.6 does not enable the binary log by default, and +# the default for sync_binlog is unsafe. The default binlog format is +# STATEMENT, and the info repositories default to FILE rather than TABLE. + log_bin +sync_binlog = 1 +gtid_mode = ON +binlog_format = ROW log_slave_updates enforce_gtid_consistency - -# Crash-safe replication settings. +expire_logs_days = 3 master_info_repository = TABLE relay_log_info_repository = TABLE relay_log_purge = 1 relay_log_recovery = 1 +slave_net_timeout = 60 -# Native AIO tends to run into aio-max-nr limit during test startup. -innodb_use_native_aio = 0 +# In MySQL 5.6 the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + +# MySQL 5.6 is unstrict by default +sql_mode = STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION # Semi-sync replication is required for automated unplanned failover # (when the master goes away). Here we just load the plugin so it's @@ -31,3 +40,4 @@ plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisy # a master that becomes unresponsive.
rpl_semi_sync_master_timeout = 1000000000000000000 rpl_semi_sync_master_wait_no_slave = 1 + diff --git a/config/mycnf/master_mysql57.cnf b/config/mycnf/master_mysql57.cnf index 381b05ac14c..82c4e36c5fb 100644 --- a/config/mycnf/master_mysql57.cnf +++ b/config/mycnf/master_mysql57.cnf @@ -1,19 +1,23 @@ # This file is auto-included when MySQL 5.7 is detected. -# Options for enabling GTID -# https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-howto.html +# MySQL 5.7 does not enable the binary log by default, and +# info repositories default to file + gtid_mode = ON log_bin log_slave_updates enforce_gtid_consistency -innodb_use_native_aio = 0 - -# Crash-safe replication settings. +expire_logs_days = 3 master_info_repository = TABLE relay_log_info_repository = TABLE relay_log_purge = 1 relay_log_recovery = 1 +# In MySQL 5.7 the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + # Semi-sync replication is required for automated unplanned failover # (when the master goes away). Here we just load the plugin so it's # available if desired, but it's disabled at startup. diff --git a/config/mycnf/master_mysql80.cnf b/config/mycnf/master_mysql80.cnf index e92b794ef9b..42c83f7ecec 100644 --- a/config/mycnf/master_mysql80.cnf +++ b/config/mycnf/master_mysql80.cnf @@ -1,20 +1,18 @@ # This file is auto-included when MySQL 8.0 is detected. -# Options for enabling GTID -# https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-howto.html +# MySQL 8.0 enables binlog by default with sync_binlog and TABLE info repositories +# It does not enable GTIDs or enforced GTID consistency + gtid_mode = ON -log_bin -log_slave_updates enforce_gtid_consistency - -# Crash-safe replication settings. -master_info_repository = TABLE -relay_log_info_repository = TABLE -relay_log_purge = 1 relay_log_recovery = 1 +binlog_expire_logs_seconds = 259200 + +# disable mysqlx +mysqlx = 0 -# Native AIO tends to run into aio-max-nr limit during test startup. 
-innodb_use_native_aio = 0 +# 8.0 changes the default auth-plugin to caching_sha2_password +default_authentication_plugin = mysql_native_password # Semi-sync replication is required for automated unplanned failover # (when the master goes away). Here we just load the plugin so it's @@ -32,9 +30,3 @@ plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisy rpl_semi_sync_master_timeout = 1000000000000000000 rpl_semi_sync_master_wait_no_slave = 1 -# disable mysqlx -mysqlx = 0 - -# 8.0 changes the default auth-plugin to caching_sha2_password -default_authentication_plugin = mysql_native_password -secure_file_priv = NULL diff --git a/config/mycnf/production.cnf b/config/mycnf/production.cnf deleted file mode 100644 index 64f8c245035..00000000000 --- a/config/mycnf/production.cnf +++ /dev/null @@ -1,5 +0,0 @@ -# Values for a production vitess deployment -innodb_buffer_pool_size = 1024M -innodb_log_file_size = 512M -innodb_log_buffer_size = 64M -max_connections = 1000 \ No newline at end of file diff --git a/config/mycnf/rbr.cnf b/config/mycnf/rbr.cnf index 5dde64cda57..10c17a6f09c 100644 --- a/config/mycnf/rbr.cnf +++ b/config/mycnf/rbr.cnf @@ -1 +1,3 @@ -binlog-format=row +# This file is used to allow legacy tests to pass +# In theory it should not be required +binlog_format=row diff --git a/config/mycnf/rdonly.cnf b/config/mycnf/rdonly.cnf deleted file mode 100644 index de33eee9c41..00000000000 --- a/config/mycnf/rdonly.cnf +++ /dev/null @@ -1 +0,0 @@ -# reserved for future tuning diff --git a/config/mycnf/replica.cnf b/config/mycnf/replica.cnf deleted file mode 100644 index 74e9f2b34ea..00000000000 --- a/config/mycnf/replica.cnf +++ /dev/null @@ -1,13 +0,0 @@ -# replica.cnf - reserved for future tuning - -relay-log = {{.RelayLogPath}} -relay-log-index = {{.RelayLogIndexPath}} -relay-log-info-file = {{.RelayLogInfoPath}} -master-info-file = {{.MasterInfoFile}} - -# required if this master is chained -# probably safe to turn on all the time at 
the expense of some disk I/O -# note: this is in the master conf too -log-slave-updates - -#slave_compressed_protocol diff --git a/config/mycnf/sbr.cnf b/config/mycnf/sbr.cnf new file mode 100644 index 00000000000..12fb1267e59 --- /dev/null +++ b/config/mycnf/sbr.cnf @@ -0,0 +1,2 @@ +# This file is used to allow legacy tests to pass +binlog_format=statement diff --git a/config/mycnf/vtcombo.cnf b/config/mycnf/vtcombo.cnf deleted file mode 100644 index de6141f2c97..00000000000 --- a/config/mycnf/vtcombo.cnf +++ /dev/null @@ -1 +0,0 @@ -max_connections = 5000 diff --git a/examples/local/vttablet-up.sh b/examples/local/vttablet-up.sh index 75c3b191d04..28d5cc7709f 100755 --- a/examples/local/vttablet-up.sh +++ b/examples/local/vttablet-up.sh @@ -38,11 +38,6 @@ source $script_root/env.sh init_db_sql_file="$VTROOT/config/init_db.sql" -# Previously this file set EXTRA_MY_CNF based on MYSQL_FLAVOR -# It now relies on mysqlctl to autodetect - -export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf:$VTROOT/config/mycnf/rbr.cnf - mkdir -p $VTDATAROOT/backups # Start 3 vttablets by default. 
diff --git a/go/vt/mysqlctl/mycnf_test.go b/go/vt/mysqlctl/mycnf_test.go index d938c068b18..1991b5bcb06 100644 --- a/go/vt/mysqlctl/mycnf_test.go +++ b/go/vt/mysqlctl/mycnf_test.go @@ -43,8 +43,6 @@ func TestMycnf(t *testing.T) { } cnfTemplatePaths := []string{ path.Join(root, "src/vitess.io/vitess/config/mycnf/default.cnf"), - path.Join(root, "src/vitess.io/vitess/config/mycnf/replica.cnf"), - path.Join(root, "src/vitess.io/vitess/config/mycnf/master.cnf"), } data, err := cnf.makeMycnf(cnfTemplatePaths) if err != nil { diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index e8c85797ae8..cb6943f297b 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -762,8 +762,6 @@ func (mysqld *Mysqld) getMycnfTemplates(root string) []string { cnfTemplatePaths := []string{ path.Join(root, "config/mycnf/default.cnf"), - path.Join(root, "config/mycnf/master.cnf"), - path.Join(root, "config/mycnf/replica.cnf"), } if extraCnf := os.Getenv("EXTRA_MY_CNF"); extraCnf != "" { @@ -771,24 +769,14 @@ func (mysqld *Mysqld) getMycnfTemplates(root string) []string { cnfTemplatePaths = append(cnfTemplatePaths, parts...) } - // Only include these files if they exist. 
- // master_{flavor}.cnf - // Percona Server == MySQL in this context - f := flavorMariaDB if mysqld.capabilities.isMySQLLike() { f = flavorMySQL } - p := path.Join(root, fmt.Sprintf("config/mycnf/master_%s.cnf", f)) - _, err := os.Stat(p) - if err == nil && !contains(cnfTemplatePaths, p) { - cnfTemplatePaths = append(cnfTemplatePaths, p) - } - // master_{flavor}{major}{minor}.cnf - p = path.Join(root, fmt.Sprintf("config/mycnf/master_%s%d%d.cnf", f, mysqld.capabilities.version.Major, mysqld.capabilities.version.Minor)) - _, err = os.Stat(p) + p := path.Join(root, fmt.Sprintf("config/mycnf/master_%s%d%d.cnf", f, mysqld.capabilities.version.Major, mysqld.capabilities.version.Minor)) + _, err := os.Stat(p) if err == nil && !contains(cnfTemplatePaths, p) { cnfTemplatePaths = append(cnfTemplatePaths, p) } diff --git a/go/vt/vttest/environment.go b/go/vt/vttest/environment.go index 970465b9cbd..d48dee8fe32 100644 --- a/go/vt/vttest/environment.go +++ b/go/vt/vttest/environment.go @@ -116,22 +116,20 @@ func GetMySQLOptions(flavor string) (string, []string, error) { flavor = DefaultMySQLFlavor } - mycnf := []string{"config/mycnf/vtcombo.cnf"} + mycnf := []string{} switch flavor { case "MariaDB103": mycnf = append(mycnf, "config/mycnf/default-fast.cnf") mycnf = append(mycnf, "config/mycnf/master_mariadb103.cnf") case "MariaDB": mycnf = append(mycnf, "config/mycnf/default-fast.cnf") - mycnf = append(mycnf, "config/mycnf/master_mariadb.cnf") - + mycnf = append(mycnf, "config/mycnf/master_mariadb100.cnf") case "MySQL80": mycnf = append(mycnf, "config/mycnf/default-fast.cnf") mycnf = append(mycnf, "config/mycnf/master_mysql80.cnf") case "MySQL56": mycnf = append(mycnf, "config/mycnf/default-fast.cnf") mycnf = append(mycnf, "config/mycnf/master_mysql56.cnf") - default: return "", nil, fmt.Errorf("unknown mysql flavor: %s", flavor) } diff --git a/py/vttest/mysql_flavor.py b/py/vttest/mysql_flavor.py index 0962de06fde..1c4bb61afe1 100644 --- a/py/vttest/mysql_flavor.py +++ 
b/py/vttest/mysql_flavor.py @@ -49,7 +49,7 @@ class MariaDB(MysqlFlavor): def my_cnf(self): files = [ os.path.join(vttop, "config/mycnf/default-fast.cnf"), - os.path.join(vttop, "config/mycnf/master_mariadb.cnf"), + os.path.join(vttop, "config/mycnf/master_mariadb100.cnf"), ] return ":".join(files) diff --git a/py/vttest/run_local_database.py b/py/vttest/run_local_database.py index 8301a4e2b42..d25205e66b3 100755 --- a/py/vttest/run_local_database.py +++ b/py/vttest/run_local_database.py @@ -98,7 +98,7 @@ def main(cmdline_options): init_data_opts.max_table_shard_size = cmdline_options.max_table_shard_size init_data_opts.null_probability = cmdline_options.null_probability - extra_my_cnf = os.path.join(os.environ['VTTOP'], 'config/mycnf/vtcombo.cnf') + extra_my_cnf = '' if cmdline_options.extra_my_cnf: extra_my_cnf += ':' + cmdline_options.extra_my_cnf diff --git a/test/mysql_flavor.py b/test/mysql_flavor.py index a0be7b1289a..55c3af2ce94 100644 --- a/test/mysql_flavor.py +++ b/test/mysql_flavor.py @@ -126,7 +126,7 @@ def reset_replication_commands(self): ] def extra_my_cnf(self): - return environment.vttop + "/config/mycnf/master_mariadb.cnf" + return environment.vttop + "/config/mycnf/master_mariadb100.cnf" def master_position(self, tablet): gtid = tablet.mquery("", "SELECT @@GLOBAL.gtid_binlog_pos")[0][0] From 4193d0378c02d151a3378ce1e11dc09d89ee39e8 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 29 Oct 2019 20:31:17 -0600 Subject: [PATCH 02/22] Make sure 5.7 doesn't read 5.6 config file Add SBR and sql-mode lines back in Signed-off-by: Morgan Tocker --- config/mycnf/default.cnf | 5 +++++ examples/compose/README.md | 3 +-- examples/compose/external_db/docker-compose.yml | 1 - examples/compose/vttablet-up.sh | 3 +-- .../vttablet-pod-benchmarking-template.yaml | 3 --- examples/kubernetes/vttablet-pod-template.yaml | 6 ------ go/vt/vttest/environment.go | 17 +---------------- helm/vitess/README.md | 2 +- py/vttest/mysql_flavor.py | 4 ---- 
test/mysql_flavor.py | 8 ++++---- 10 files changed, 13 insertions(+), 39 deletions(-) diff --git a/config/mycnf/default.cnf b/config/mycnf/default.cnf index df2e7017416..3a6ee12d08e 100644 --- a/config/mycnf/default.cnf +++ b/config/mycnf/default.cnf @@ -31,5 +31,10 @@ connect_timeout = 30 innodb_lock_wait_timeout = 20 max_allowed_packet = 64M +# These two settings are required for the testsuite to pass, +# but enabling them does not spark joy. They should be removed +# in the future. +binlog-format=statement +sql_mode = STRICT_TRANS_TABLES diff --git a/examples/compose/README.md b/examples/compose/README.md index eb801a8fcbe..053cf54c850 100644 --- a/examples/compose/README.md +++ b/examples/compose/README.md @@ -152,7 +152,6 @@ DB_CHARSET=CHARACTER SET utf8 COLLATE utf8_general_ci Ensure you have log bin enabled on your external database. You may add the following configs to your conf.d directory and reload mysqld on your server ``` -vitess/config/mycnf/master_mysql56.cnf vitess/config/mycnf/rbr.cnf ``` @@ -258,4 +257,4 @@ vitess/examples/compose$ ./lvtctl.sh ApplyVschema -vschema '{"sharded":false, "t ``` This has since been fixed by -https://github.com/vitessio/vitess/pull/4868 & https://github.com/vitessio/vitess/pull/5010 \ No newline at end of file +https://github.com/vitessio/vitess/pull/4868 & https://github.com/vitessio/vitess/pull/5010 diff --git a/examples/compose/external_db/docker-compose.yml b/examples/compose/external_db/docker-compose.yml index 5b3b28f1f9e..b0b1e58f9fd 100644 --- a/examples/compose/external_db/docker-compose.yml +++ b/examples/compose/external_db/docker-compose.yml @@ -17,7 +17,6 @@ services: volumes: - vol-db:/var/lib/mysql - ./mysql/:/docker-entrypoint-initdb.d/ - - ./mysql/master_mysql56.cnf:/etc/mysql/conf.d/master_mysql56.cnf - ./mysql/query.log:/var/log/mysql/query.log - ./mysql/slow.log:/var/log/mysql/slow.log healthcheck: diff --git a/examples/compose/vttablet-up.sh b/examples/compose/vttablet-up.sh index 
00369ecdd22..3df619ce20a 100755 --- a/examples/compose/vttablet-up.sh +++ b/examples/compose/vttablet-up.sh @@ -70,7 +70,6 @@ if [ $tablet_role != "master" ]; then fi # Enforce Row Based Replication export EXTRA_MY_CNF=$VTROOT/config/mycnf/default-fast.cnf:$VTROOT/config/mycnf/rbr.cnf -export EXTRA_MY_CNF=$EXTRA_MY_CNF:$VTROOT/config/mycnf/master_mysql56.cnf mkdir -p $VTDATAROOT/backups @@ -182,4 +181,4 @@ exec $VTROOT/bin/vttablet \ -backup_storage_implementation file \ -file_backup_storage_root $VTDATAROOT/backups \ -queryserver-config-schema-reload-time 60 \ - $external_db_args \ No newline at end of file + $external_db_args diff --git a/examples/kubernetes/vttablet-pod-benchmarking-template.yaml b/examples/kubernetes/vttablet-pod-benchmarking-template.yaml index 8f24da64f54..f1cc59e7101 100644 --- a/examples/kubernetes/vttablet-pod-benchmarking-template.yaml +++ b/examples/kubernetes/vttablet-pod-benchmarking-template.yaml @@ -87,9 +87,6 @@ spec: -tablet_uid {{uid}} -socket_file $VTDATAROOT/mysqlctl.sock -init_db_sql_file $VTROOT/config/init_db.sql" vitess - env: - - name: EXTRA_MY_CNF - value: /vt/config/mycnf/benchmark.cnf:/vt/config/mycnf/master_mysql56.cnf volumes: - name: syslog hostPath: {path: /dev/log} diff --git a/examples/kubernetes/vttablet-pod-template.yaml b/examples/kubernetes/vttablet-pod-template.yaml index 52e19aad80f..6be80409cae 100644 --- a/examples/kubernetes/vttablet-pod-template.yaml +++ b/examples/kubernetes/vttablet-pod-template.yaml @@ -69,9 +69,6 @@ spec: -orc_api_url http://orchestrator/api -orc_discover_interval 5m -restore_from_backup {{backup_flags}}" vitess - env: - - name: EXTRA_MY_CNF - value: /vt/config/mycnf/master_mysql56.cnf - name: mysql image: {{vitess_image}} volumeMounts: @@ -96,9 +93,6 @@ spec: -tablet_uid {{uid}} -socket_file $VTDATAROOT/mysqlctl.sock -init_db_sql_file $VTROOT/config/init_db.sql" vitess - env: - - name: EXTRA_MY_CNF - value: /vt/config/mycnf/master_mysql56.cnf volumes: - name: syslog hostPath: {path: 
/dev/log} diff --git a/go/vt/vttest/environment.go b/go/vt/vttest/environment.go index 35b80ce341f..551cc4e1d9b 100644 --- a/go/vt/vttest/environment.go +++ b/go/vt/vttest/environment.go @@ -116,22 +116,7 @@ func GetMySQLOptions(flavor string) (string, []string, error) { } mycnf := []string{} - switch flavor { - case "MariaDB103": - mycnf = append(mycnf, "config/mycnf/default-fast.cnf") - mycnf = append(mycnf, "config/mycnf/master_mariadb103.cnf") - case "MariaDB": - mycnf = append(mycnf, "config/mycnf/default-fast.cnf") - mycnf = append(mycnf, "config/mycnf/master_mariadb100.cnf") - case "MySQL80": - mycnf = append(mycnf, "config/mycnf/default-fast.cnf") - mycnf = append(mycnf, "config/mycnf/master_mysql80.cnf") - case "MySQL56": - mycnf = append(mycnf, "config/mycnf/default-fast.cnf") - mycnf = append(mycnf, "config/mycnf/master_mysql56.cnf") - default: - return "", nil, fmt.Errorf("unknown mysql flavor: %s", flavor) - } + mycnf = append(mycnf, "config/mycnf/default-fast.cnf") for i, cnf := range mycnf { mycnf[i] = path.Join(os.Getenv("VTTOP"), cnf) diff --git a/helm/vitess/README.md b/helm/vitess/README.md index 8e884b60117..5ff807a552d 100644 --- a/helm/vitess/README.md +++ b/helm/vitess/README.md @@ -392,7 +392,7 @@ metadata: data: extra.cnf: |- early-plugin-load=keyring_vault=keyring_vault.so - # this includes default rpl plugins, see https://github.com/vitessio/vitess/blob/master/config/mycnf/master_mysql56.cnf for details + # this includes default rpl plugins, see https://github.com/vitessio/vitess/blob/master/config/mycnf/master_mysql57.cnf for details plugin-load=rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so;keyring_udf=keyring_udf.so keyring_vault_config=/vt/usersecrets/vttablet-vault/vault.conf # load keyring configuration from secret innodb_encrypt_tables=ON # encrypt all tables by default diff --git a/py/vttest/mysql_flavor.py b/py/vttest/mysql_flavor.py index 26e8aaf3d49..3da1570e05d 100644 --- 
a/py/vttest/mysql_flavor.py +++ b/py/vttest/mysql_flavor.py @@ -49,7 +49,6 @@ class MariaDB(MysqlFlavor): def my_cnf(self): files = [ os.path.join(vttop, "config/mycnf/default-fast.cnf"), - os.path.join(vttop, "config/mycnf/master_mariadb100.cnf"), ] return ":".join(files) @@ -59,7 +58,6 @@ class MariaDB103(MysqlFlavor): def my_cnf(self): files = [ os.path.join(vttop, "config/mycnf/default-fast.cnf"), - os.path.join(vttop, "config/mycnf/master_mariadb103.cnf"), ] return ":".join(files) @@ -69,7 +67,6 @@ class MySQL56(MysqlFlavor): def my_cnf(self): files = [ os.path.join(vttop, "config/mycnf/default-fast.cnf"), - os.path.join(vttop, "config/mycnf/master_mysql56.cnf"), ] return ":".join(files) @@ -79,7 +76,6 @@ class MySQL80(MysqlFlavor): def my_cnf(self): files = [ os.path.join(vttop, "config/mycnf/default-fast.cnf"), - os.path.join(vttop, "config/mycnf/master_mysql80.cnf"), ] return ":".join(files) diff --git a/test/mysql_flavor.py b/test/mysql_flavor.py index 9cf057982ea..8102ad0cfd1 100644 --- a/test/mysql_flavor.py +++ b/test/mysql_flavor.py @@ -126,7 +126,7 @@ def reset_replication_commands(self): ] def extra_my_cnf(self): - return environment.vttop + "/config/mycnf/master_mariadb100.cnf" + return "" def master_position(self, tablet): gtid = tablet.mquery("", "SELECT @@GLOBAL.gtid_binlog_pos")[0][0] @@ -152,7 +152,7 @@ class MariaDB103(MariaDB): """Overrides specific to MariaDB 10.3+.""" def extra_my_cnf(self): - return environment.vttop + "/config/mycnf/master_mariadb103.cnf" + return "" class MySQL56(MysqlFlavor): """Overrides specific to MySQL 5.6/5.7""" @@ -172,7 +172,7 @@ def position_at_least(self, a, b): ]).strip() == "true" def extra_my_cnf(self): - return environment.vttop + "/config/mycnf/master_mysql56.cnf" + return "" def change_master_commands(self, host, port, pos): gtid = pos.split("/")[1] @@ -186,7 +186,7 @@ def change_master_commands(self, host, port, pos): class MySQL80(MySQL56): """Overrides specific to MySQL 8.0.""" def extra_my_cnf(self): 
- return environment.vttop + "/config/mycnf/master_mysql80.cnf" + return "" def change_passwords(self, password_col): """set real passwords for all users""" return ''' From a9578250d610d086a809ec9197e1bbf597e9216b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 29 Oct 2019 21:30:56 -0600 Subject: [PATCH 03/22] Move undesired settings to default-fast.cnf Remove sbr.cnf Signed-off-by: Morgan Tocker --- config/mycnf/default-fast.cnf | 7 +++++-- config/mycnf/default.cnf | 5 ----- config/mycnf/sbr.cnf | 2 -- 3 files changed, 5 insertions(+), 9 deletions(-) delete mode 100644 config/mycnf/sbr.cnf diff --git a/config/mycnf/default-fast.cnf b/config/mycnf/default-fast.cnf index f7fde4463fd..62a7799b6a1 100644 --- a/config/mycnf/default-fast.cnf +++ b/config/mycnf/default-fast.cnf @@ -14,6 +14,9 @@ key_buffer_size = 2M sync_binlog=0 innodb_doublewrite=0 -# Some tests don't work with full strict yet -sql_mode='STRICT_TRANS_TABLES' +# These two settings are required for the testsuite to pass, +# # but enabling them does not spark joy. They should be removed +# # in the future. +binlog-format=statement +sql_mode = STRICT_TRANS_TABLES diff --git a/config/mycnf/default.cnf b/config/mycnf/default.cnf index 3a6ee12d08e..df2e7017416 100644 --- a/config/mycnf/default.cnf +++ b/config/mycnf/default.cnf @@ -31,10 +31,5 @@ connect_timeout = 30 innodb_lock_wait_timeout = 20 max_allowed_packet = 64M -# These two settings are required for the testsuite to pass, -# but enabling them does not spark joy. They should be removed -# in the future. 
-binlog-format=statement -sql_mode = STRICT_TRANS_TABLES diff --git a/config/mycnf/sbr.cnf b/config/mycnf/sbr.cnf deleted file mode 100644 index 12fb1267e59..00000000000 --- a/config/mycnf/sbr.cnf +++ /dev/null @@ -1,2 +0,0 @@ -# This file is used to allow legacy tests to pass -binlog_format=statement From f079b1f1e44b4306d8f6200088f3e4f4e6fc7f34 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 29 Oct 2019 22:07:14 -0600 Subject: [PATCH 04/22] Bump CI Signed-off-by: Morgan Tocker --- config/mycnf/default-fast.cnf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/mycnf/default-fast.cnf b/config/mycnf/default-fast.cnf index 62a7799b6a1..2bd08dc8bd5 100644 --- a/config/mycnf/default-fast.cnf +++ b/config/mycnf/default-fast.cnf @@ -15,8 +15,8 @@ sync_binlog=0 innodb_doublewrite=0 # These two settings are required for the testsuite to pass, -# # but enabling them does not spark joy. They should be removed -# # in the future. +# but enabling them does not spark joy. They should be removed +# in the future. 
binlog-format=statement sql_mode = STRICT_TRANS_TABLES From 98d34fb9d9737a8112830e6c90602658a2893863 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 3 Nov 2019 07:33:31 -0700 Subject: [PATCH 05/22] Add myself to CODEOWNERS Add links to issues for why testsuite requires SBR, sql-mode Signed-off-by: Morgan Tocker --- .github/CODEOWNERS | 2 ++ config/mycnf/default-fast.cnf | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 32f0a000966..b4b82473d97 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,3 +2,5 @@ /docker/ @derekperkins @dkhenry /helm/ @derekperkins @dkhenry +/config/mycnf/ @morgo +/go/vt/mysqlctl/mysqld.go @morgo diff --git a/config/mycnf/default-fast.cnf b/config/mycnf/default-fast.cnf index 2bd08dc8bd5..e3d852fcfc0 100644 --- a/config/mycnf/default-fast.cnf +++ b/config/mycnf/default-fast.cnf @@ -16,7 +16,9 @@ innodb_doublewrite=0 # These two settings are required for the testsuite to pass, # but enabling them does not spark joy. They should be removed -# in the future. +# in the future. 
See: +# https://github.com/vitessio/vitess/issues/5395 +# https://github.com/vitessio/vitess/issues/5396 binlog-format=statement sql_mode = STRICT_TRANS_TABLES From 49cd2b9d2b8e91e463dc35b60658185f0e83322b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 4 Nov 2019 08:16:46 -0700 Subject: [PATCH 06/22] Empty commit Signed-off-by: Morgan Tocker From dfb3fe818889841c8417ea493da767a8fb1d61c7 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 11 Nov 2019 17:20:09 -0700 Subject: [PATCH 07/22] Add back max_connections based on review Signed-off-by: Morgan Tocker --- config/mycnf/default.cnf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/mycnf/default.cnf b/config/mycnf/default.cnf index df2e7017416..b8af15801b0 100644 --- a/config/mycnf/default.cnf +++ b/config/mycnf/default.cnf @@ -30,6 +30,6 @@ skip-name-resolve connect_timeout = 30 innodb_lock_wait_timeout = 20 max_allowed_packet = 64M - +max_connections = 500 From 464511d5c315581eb6e7d6dd7da6ade00315ca6e Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 11 Nov 2019 17:45:11 -0800 Subject: [PATCH 08/22] Empty commit Signed-off-by: Morgan Tocker From 2d3675bb5eb3498fa043233fef7c8f82dafe1db9 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 13 Nov 2019 10:54:11 -0800 Subject: [PATCH 09/22] Fix MySQL 8.0 unknown variable issue Signed-off-by: Morgan Tocker --- config/mycnf/master_mysql80.cnf | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/config/mycnf/master_mysql80.cnf b/config/mycnf/master_mysql80.cnf index 42c83f7ecec..6c3d77d5135 100644 --- a/config/mycnf/master_mysql80.cnf +++ b/config/mycnf/master_mysql80.cnf @@ -23,10 +23,9 @@ default_authentication_plugin = mysql_native_password # promoted or demoted. plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so -# When semi-sync is enabled, don't allow fallback to async -# if you get no ack, or have no slaves. 
This is necessary to -# prevent alternate futures when doing a failover in response to -# a master that becomes unresponsive. -rpl_semi_sync_master_timeout = 1000000000000000000 -rpl_semi_sync_master_wait_no_slave = 1 +# MySQL 8.0 will not load plugins during --initialize +# which makes these options unknown. Prefixing with --loose +# tells the server it's fine if they are not understood. +loose_rpl_semi_sync_master_timeout = 1000000000000000000 +loose_rpl_semi_sync_master_wait_no_slave = 1 From c398a7d35faab280069e2bb3e9468a27524e9cac Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 13 Nov 2019 13:30:44 -0800 Subject: [PATCH 10/22] Move worker test to be earlier Eliminate that it isn't transient state causing it to fail. Signed-off-by: Morgan Tocker --- test/config.json | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/test/config.json b/test/config.json index c303b4a6398..450ef5e6200 100644 --- a/test/config.json +++ b/test/config.json @@ -1,5 +1,16 @@ { "Tests": { + "1worker": { + "File": "worker.py", + "Args": [], + "Command": [], + "Manual": false, + "Shard": 2, + "RetryMax": 0, + "Tags": [ + "worker_test" + ] + }, "backup": { "File": "backup.py", "Args": [], @@ -573,17 +584,6 @@ "RetryMax": 0, "Tags": [] }, - "worker": { - "File": "worker.py", - "Args": [], - "Command": [], - "Manual": false, - "Shard": 2, - "RetryMax": 0, - "Tags": [ - "worker_test" - ] - }, "xtrabackup": { "File": "xtrabackup.py", "Args": [], From 1edf7f4ba98f3d6442ce9ee8a808d4eb6d16bdc3 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 26 Nov 2019 09:18:05 -0700 Subject: [PATCH 11/22] Move worker to shard 5 Merge in MariaDB 10.4 and use new config layout. 
Signed-off-by: Morgan Tocker --- config/mycnf/master_mariadb104.cnf | 30 +++++++++++++++++++----------- test/config.json | 22 +++++++++++----------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/config/mycnf/master_mariadb104.cnf b/config/mycnf/master_mariadb104.cnf index a144f352561..9444e8d67fa 100644 --- a/config/mycnf/master_mariadb104.cnf +++ b/config/mycnf/master_mariadb104.cnf @@ -4,20 +4,28 @@ gtid_strict_mode = 1 innodb_stats_persistent = 0 -# Semi-sync replication is required for automated unplanned failover -# (when the master goes away). Here we just load the plugin so it's -# available if desired, but it's disabled at startup. -# -# If the -enable_semi_sync flag is used, VTTablet will enable semi-sync -# at the proper time when replication is set up, or when masters are -# promoted or demoted. - -# semi_sync has been merged into master as of mariadb 10.3 so this is no longer needed -#plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so - # When semi-sync is enabled, don't allow fallback to async # if you get no ack, or have no slaves. This is necessary to # prevent alternate futures when doing a failover in response to # a master that becomes unresponsive. 
rpl_semi_sync_master_timeout = 1000000000000000000 rpl_semi_sync_master_wait_no_slave = 1 + + +character_set_server = utf8 +collation_server = utf8_general_ci + +expire_logs_days = 3 + +log_bin +sync_binlog = 1 +binlog_format = ROW +log_slave_updates +expire_logs_days = 3 + +# In MariaDB the default charset is latin1 + +character_set_server = utf8 +collation_server = utf8_general_ci + + diff --git a/test/config.json b/test/config.json index 81abf1d7930..9523cbed562 100644 --- a/test/config.json +++ b/test/config.json @@ -1,16 +1,5 @@ { "Tests": { - "1worker": { - "File": "worker.py", - "Args": [], - "Command": [], - "Manual": false, - "Shard": 2, - "RetryMax": 0, - "Tags": [ - "worker_test" - ] - }, "backup": { "File": "backup.py", "Args": [], @@ -596,6 +585,17 @@ "RetryMax": 0, "Tags": [] }, + "worker": { + "File": "worker.py", + "Args": [], + "Command": [], + "Manual": false, + "Shard": 5, + "RetryMax": 0, + "Tags": [ + "worker_test" + ] + }, "xtrabackup": { "File": "xtrabackup.py", "Args": [], From cc339c40758ca34c41f2f41c16ff4dff351066a1 Mon Sep 17 00:00:00 2001 From: Arindam Nayak Date: Wed, 27 Nov 2019 15:22:29 +0530 Subject: [PATCH 12/22] added fix for e2e intermittent test failure Signed-off-by: Arindam Nayak --- go/test/endtoend/cluster/cluster_process.go | 27 +++++++++++++- go/test/endtoend/cluster/vtgate_process.go | 36 +++++++++++++++++-- .../endtoend/clustertest/add_keyspace_test.go | 1 + 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 780917918ce..5f2e1cc4d43 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -253,7 +253,12 @@ func (cluster *LocalProcessCluster) StartVtgate() (err error) { cluster.VtGateExtraArgs) log.Info(fmt.Sprintf("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort)) - return cluster.VtgateProcess.Setup() + err = 
cluster.VtgateProcess.Setup() + if err != nil { + return + } + cluster.WaitForTabletsToHealthyInVtgate() + return nil } // NewCluster instantiates a new cluster @@ -282,6 +287,26 @@ func (cluster *LocalProcessCluster) ReStartVtgate() (err error) { return err } +// WaitForTabletsToHealthyInVtgate waits for all tablets in all shards to be healthy as per vtgate +func (cluster *LocalProcessCluster) WaitForTabletsToHealthyInVtgate() { + var isRdOnlyPresent bool + for _, keyspace := range cluster.Keyspaces { + for _, shard := range keyspace.Shards { + isRdOnlyPresent = false + _ = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspace.Name, shard.Name)) + _ = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace.Name, shard.Name)) + for _, tablet := range shard.Vttablets { + if tablet.Type == "rdonly" { + isRdOnlyPresent = true + } + } + if isRdOnlyPresent { + _ = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspace.Name, shard.Name)) + } + } + } +} + // Teardown brings down the cluster by invoking teardown for individual processes func (cluster *LocalProcessCluster) Teardown() (err error) { if err = cluster.VtgateProcess.TearDown(); err != nil { diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index e2b771735ad..2f98642a305 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -119,6 +119,18 @@ func (vtgate *VtgateProcess) Setup() (err error) { // WaitForStatus function checks if vtgate process is up and running func (vtgate *VtgateProcess) WaitForStatus() bool { + resp, err := http.Get(vtgate.VerifyURL) + if err != nil { + return false + } + if resp.StatusCode == 200 { + return true + } + return false +} + +// GetStatusForTabletOfShard function gets status for a specific tablet of a shard in keyspace +func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name 
string) bool { resp, err := http.Get(vtgate.VerifyURL) if err != nil { return false @@ -134,10 +146,11 @@ func (vtgate *VtgateProcess) WaitForStatus() bool { masterConnectionExist := false if object.Kind() == reflect.Map { for _, key := range object.MapKeys() { - - if strings.Contains(key.String(),"master") { - masterConnectionExist = true + if key.String() == name { + value := fmt.Sprintf("%v", object.MapIndex(key)) + return value == "1" } + } } return masterConnectionExist @@ -145,6 +158,23 @@ func (vtgate *VtgateProcess) WaitForStatus() bool { return false } +// WaitForStatusOfTabletInShard function waits till status of a tablet in shard is 1 +func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string) error { + timeout := time.Now().Add(10 * time.Second) + for time.Now().Before(timeout) { + if vtgate.GetStatusForTabletOfShard(name) { + return nil + } + select { + case err := <-vtgate.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vtgate.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + return fmt.Errorf("wait for %s failed", name) +} + // TearDown shuts down the running vtgate service func (vtgate *VtgateProcess) TearDown() error { if vtgate.proc == nil || vtgate.exit == nil { diff --git a/go/test/endtoend/clustertest/add_keyspace_test.go b/go/test/endtoend/clustertest/add_keyspace_test.go index e305866b752..aface0b1467 100644 --- a/go/test/endtoend/clustertest/add_keyspace_test.go +++ b/go/test/endtoend/clustertest/add_keyspace_test.go @@ -64,6 +64,7 @@ func TestAddKeyspace(t *testing.T) { // Restart vtgate process _ = clusterInstance.VtgateProcess.TearDown() _ = clusterInstance.VtgateProcess.Setup() + clusterInstance.WaitForTabletsToHealthyInVtgate() ctx := context.Background() vtParams := mysql.ConnParams{ From ee638b159de8a8c4d40bc94a45ab23ea7a5a0b9f Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 9 Jul 2019 21:19:01 -0700 Subject: [PATCH 13/22] resharding_journal: vstreamer side 
Signed-off-by: Sugu Sougoumarane --- go/vt/vttablet/tabletserver/schema/engine.go | 11 ++ .../tabletserver/schema/load_table.go | 9 ++ .../tabletserver/vstreamer/vstreamer.go | 139 +++++++++++++----- .../tabletserver/vstreamer/vstreamer_test.go | 39 +++++ 4 files changed, 165 insertions(+), 33 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 202495e231c..0a1acb44dd5 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -306,6 +306,17 @@ func (se *Engine) Reload(ctx context.Context) error { return rec.Error() } +// LoadTableBasic loads a table with minimal info. This is used by vstreamer +// to load _vt.resharding_journal. +func (se *Engine) LoadTableBasic(ctx context.Context, tableName string) (*Table, error) { + conn, err := se.conns.Get(ctx) + if err != nil { + return nil, err + } + defer conn.Recycle() + return LoadTableBasic(conn, tableName) +} + func (se *Engine) mysqlTime(ctx context.Context, conn *connpool.DBConn) (int64, error) { tm, err := conn.Exec(ctx, "select unix_timestamp()", 1, false) if err != nil { diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 273b41a89db..b530dcb09da 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -54,6 +54,15 @@ func LoadTable(conn *connpool.DBConn, tableName string, tableType string, commen return ta, nil } +// LoadTableBasic creates a Table with just the column info loaded.
+func LoadTableBasic(conn *connpool.DBConn, tableName string) (*Table, error) { + ta := NewTable(tableName) + if err := fetchColumns(ta, conn, tableName); err != nil { + return nil, err + } + return ta, nil +} + func fetchColumns(ta *Table, conn *connpool.DBConn, sqlTableName string) error { qr, err := conn.Exec(tabletenv.LocalContext(), fmt.Sprintf("select * from %s where 1 != 1", sqlTableName), 0, true) if err != nil { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 4a6ec20871b..175837b9430 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -23,6 +23,7 @@ import ( "io" "time" + "github.com/golang/protobuf/proto" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog" @@ -53,9 +54,10 @@ type vstreamer struct { send func([]*binlogdatapb.VEvent) error // A kschema is a VSchema for just one keyspace. - kevents chan *vindexes.KeyspaceSchema - kschema *vindexes.KeyspaceSchema - plans map[uint64]*streamerPlan + kevents chan *vindexes.KeyspaceSchema + kschema *vindexes.KeyspaceSchema + plans map[uint64]*streamerPlan + journalTableID uint64 // format and pos are updated by parseEvent. format mysql.BinlogFormat @@ -142,8 +144,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // If a single row exceeds the packet size, it will be in its own packet. bufferAndTransmit := func(vevent *binlogdatapb.VEvent) error { switch vevent.Type { - case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: - // We never have to send GTID, BEGIN or FIELD events on their own. + case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, binlogdatapb.VEventType_JOURNAL: + // We never have to send GTID, BEGIN, FIELD or JOURNAL events on their own.
bufferedEvents = append(bufferedEvents, vevent) case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT: // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. @@ -343,7 +345,31 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e return nil, err } // We have to build a plan only for new ids. - if _, ok := vs.plans[id]; ok { + if _, ok := vs.plans[id]; ok || id == vs.journalTableID { + return nil, nil + } + if tm.Database == "_vt" && tm.Name == "resharding_journal" { + st, err := vs.se.LoadTableBasic(vs.ctx, "_vt.resharding_journal") + if err != nil { + return nil, err + } + // Partially duplicated code from below. + if len(st.Columns) < len(tm.Types) { + return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(st.Columns), ev) + } + table := &Table{ + Name: "_vt.resharding_journal", + Columns: st.Columns[:len(tm.Types)], + } + plan, err := buildREPlan(table, nil, "") + if err != nil { + return nil, err + } + vs.plans[id] = &streamerPlan{ + Plan: plan, + TableMap: tm, + } + vs.journalTableID = id return nil, nil } if tm.Database != "" && tm.Database != vs.cp.DbName { @@ -429,36 +455,83 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e if err != nil { return nil, err } - rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows)) - for _, row := range rows.Rows { - beforeOK, beforeValues, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) - if err != nil { - return nil, err - } - afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) - if err != nil { - return nil, err - } - if !beforeOK && !afterOK { - continue + if id == vs.journalTableID { + nextrow: + for _, row := range rows.Rows { + afterOK, afterValues, err := 
vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) + if err != nil { + return nil, err + } + if !afterOK { + continue + } + for i, fld := range plan.fields() { + switch fld.Name { + case "db_name": + if afterValues[i].ToString() != vs.cp.DbName { + continue nextrow + } + case "val": + journal := &binlogdatapb.Journal{} + if err := proto.UnmarshalText(afterValues[i].ToString(), journal); err != nil { + return nil, err + } + switch journal.MigrationType { + case binlogdatapb.MigrationType_SHARDS: + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_JOURNAL, + Journal: journal, + }) + case binlogdatapb.MigrationType_TABLES: + matched := false + for _, table := range journal.Tables { + tname := sqlparser.TableName{Name: sqlparser.NewTableIdent(table)} + if tableMatches(tname, "", vs.filter) { + matched = true + } + } + if matched { + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_JOURNAL, + Journal: journal, + }) + } + } + } + } } - rowChange := &binlogdatapb.RowChange{} - if beforeOK { - rowChange.Before = sqltypes.RowToProto3(beforeValues) + } else { + rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows)) + for _, row := range rows.Rows { + beforeOK, beforeValues, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) + if err != nil { + return nil, err + } + afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) + if err != nil { + return nil, err + } + if !beforeOK && !afterOK { + continue + } + rowChange := &binlogdatapb.RowChange{} + if beforeOK { + rowChange.Before = sqltypes.RowToProto3(beforeValues) + } + if afterOK { + rowChange.After = sqltypes.RowToProto3(afterValues) + } + rowChanges = append(rowChanges, rowChange) } - if afterOK { - rowChange.After = sqltypes.RowToProto3(afterValues) + if len(rowChanges) != 0 { + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: 
binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: plan.Table.Name, + RowChanges: rowChanges, + }, + }) } - rowChanges = append(rowChanges, rowChange) - } - if len(rowChanges) != 0 { - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_ROW, - RowEvent: &binlogdatapb.RowEvent{ - TableName: plan.Table.Name, - RowChanges: rowChanges, - }, - }) } } for _, vevent := range vevents { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index dac50c5c62f..32126359e2e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -813,6 +813,45 @@ func TestExternalTable(t *testing.T) { runCases(t, nil, testcases, "") } +func TestJournal(t *testing.T) { + if testing.Short() { + t.Skip() + } + + execStatements(t, []string{ + "create table _vt.resharding_journal(id int, db_name varchar(128), val blob, primary key(id))", + }) + defer execStatements(t, []string{ + "drop table _vt.resharding_journal", + }) + engine.se.Reload(context.Background()) + + journal1 := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_SHARDS, + } + journal2 := &binlogdatapb.Journal{ + Id: 2, + MigrationType: binlogdatapb.MigrationType_SHARDS, + } + testcases := []testcase{{ + input: []string{ + "begin", + fmt.Sprintf("insert into _vt.resharding_journal values(1, 'vttest', '%v')", journal1.String()), + fmt.Sprintf("insert into _vt.resharding_journal values(2, 'nosend', '%v')", journal2.String()), + "commit", + }, + // The row with db_name 'nosend' is expected to be filtered out, so only one journal event is sent.
+ output: [][]string{{ + `gtid|begin`, + `gtid|begin`, + `type:JOURNAL journal: `, + `commit`, + }}, + }} + runCases(t, nil, testcases) +} + func TestMinimalMode(t *testing.T) { if testing.Short() { t.Skip() From 961060345032af6befea58172c5586ebad290a58 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 20 Oct 2019 19:26:46 -0700 Subject: [PATCH 14/22] resharding_journal: vstreamer side refactored Signed-off-by: Sugu Sougoumarane --- .../tabletserver/vstreamer/vstreamer.go | 328 +++++++++--------- .../tabletserver/vstreamer/vstreamer_test.go | 2 +- 2 files changed, 171 insertions(+), 159 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 175837b9430..a826a19d312 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -340,106 +340,27 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e case ev.IsTableMap(): // This is very frequent. It precedes every row event. id := ev.TableID(vs.format) + if _, ok := vs.plans[id]; ok { + return nil, nil + } tm, err := ev.TableMap(vs.format) if err != nil { return nil, err } - // We have to build a plan only for new ids. - if _, ok := vs.plans[id]; ok || id == vs.journalTableID { - return nil, nil - } if tm.Database == "_vt" && tm.Name == "resharding_journal" { - st, err := vs.se.LoadTableBasic(vs.ctx, "_vt.resharding_journal") - if err != nil { - return nil, err - } - // Partially duplicated code from below. 
- if len(st.Columns) < len(tm.Types) { - return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(st.Columns), ev) - } - table := &Table{ - Name: "_vt.resharding_journal", - Columns: st.Columns[:len(tm.Types)], - } - plan, err := buildREPlan(table, nil, "") - if err != nil { - return nil, err - } - vs.plans[id] = &streamerPlan{ - Plan: plan, - TableMap: tm, - } - vs.journalTableID = id - return nil, nil + return nil, vs.buildJournalPlan(id, tm) } if tm.Database != "" && tm.Database != vs.cp.DbName { vs.plans[id] = nil return nil, nil } - tableName := tm.Name - var cols []schema.TableColumn - for i, typ := range tm.Types { - t, err := sqltypes.MySQLToType(int64(typ), 0) - if err != nil { - return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i) - } - cols = append(cols, schema.TableColumn{ - Name: sqlparser.NewColIdent(fmt.Sprintf("@%d", i+1)), - Type: t, - }) - } - st := vs.se.GetTable(sqlparser.NewTableIdent(tm.Name)) - if st == nil { - if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { - return nil, fmt.Errorf("unknown table %v in schema", tm.Name) - } - } else { - if len(st.Columns) < len(tm.Types) && vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { - return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(st.Columns), ev) - } - tableName = st.Name.String() - // check if the schema returned by schema.Engine matches with row. - schemaMatch := true - if len(tm.Types) <= len(st.Columns) { - for i := range tm.Types { - t := cols[i].Type - if !sqltypes.AreTypesEquivalent(t, st.Columns[i].Type) { - schemaMatch = false - break - } - } - } else { - schemaMatch = false - } - if schemaMatch { - // Columns should be truncated to match those in tm. 
- cols = st.Columns[:len(tm.Types)] - } - } - - table := &Table{ - Name: tableName, - Columns: cols, - } - plan, err := buildPlan(table, vs.kschema, vs.filter) + vevent, err := vs.buildTablePlan(id, tm) if err != nil { return nil, err } - if plan == nil { - vs.plans[id] = nil - return nil, nil + if vevent != nil { + vevents = append(vevents, vevent) } - vs.plans[id] = &streamerPlan{ - Plan: plan, - TableMap: tm, - } - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_FIELD, - FieldEvent: &binlogdatapb.FieldEvent{ - TableName: plan.Table.Name, - Fields: plan.fields(), - }, - }) case ev.IsWriteRows() || ev.IsDeleteRows() || ev.IsUpdateRows(): // The existence of before and after images can be used to // identify statememt types. It's also possible that the @@ -456,87 +377,178 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e return nil, err } if id == vs.journalTableID { - nextrow: - for _, row := range rows.Rows { - afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) - if err != nil { - return nil, err - } - if !afterOK { - continue - } - for i, fld := range plan.fields() { - switch fld.Name { - case "db_name": - if afterValues[i].ToString() != vs.cp.DbName { - continue nextrow - } - case "val": - journal := &binlogdatapb.Journal{} - if err := proto.UnmarshalText(afterValues[i].ToString(), journal); err != nil { - return nil, err - } - switch journal.MigrationType { - case binlogdatapb.MigrationType_SHARDS: - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_JOURNAL, - Journal: journal, - }) - case binlogdatapb.MigrationType_TABLES: - matched := false - for _, table := range journal.Tables { - tname := sqlparser.TableName{Name: sqlparser.NewTableIdent(table)} - if tableMatches(tname, "", vs.filter) { - matched = true - } - } - if matched { - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: 
binlogdatapb.VEventType_JOURNAL, - Journal: journal, - }) - } - } - } - } - } + vevents, err = vs.processJounalEvent(vevents, plan, rows) } else { - rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows)) - for _, row := range rows.Rows { - beforeOK, beforeValues, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) - if err != nil { - return nil, err + vevents, err = vs.processRowEvent(vevents, plan, rows) + } + if err != nil { + return nil, err + } + } + for _, vevent := range vevents { + vevent.Timestamp = int64(ev.Timestamp()) + vevent.CurrentTime = time.Now().UnixNano() + } + return vevents, nil +} + +func (vs *vstreamer) buildJournalPlan(id uint64, tm *mysql.TableMap) error { + st, err := vs.se.LoadTableBasic(vs.ctx, "_vt.resharding_journal") + if err != nil { + return err + } + if len(st.Columns) < len(tm.Types) { + return fmt.Errorf("cannot determine table columns for %s: event has %v, schema as %v", tm.Name, tm.Types, st.Columns) + } + table := &Table{ + Name: "_vt.resharding_journal", + Columns: st.Columns[:len(tm.Types)], + } + plan, err := buildREPlan(table, nil, "") + if err != nil { + return err + } + vs.plans[id] = &streamerPlan{ + Plan: plan, + TableMap: tm, + } + vs.journalTableID = id + return nil +} + +func (vs *vstreamer) buildTablePlan(id uint64, tm *mysql.TableMap) (*binlogdatapb.VEvent, error) { + cols, err := vs.buildTableColumns(id, tm) + if err != nil { + return nil, err + } + + table := &Table{ + Name: tm.Name, + Columns: cols, + } + plan, err := buildPlan(table, vs.kschema, vs.filter) + if err != nil { + return nil, err + } + if plan == nil { + vs.plans[id] = nil + return nil, nil + } + vs.plans[id] = &streamerPlan{ + Plan: plan, + TableMap: tm, + } + return &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_FIELD, + FieldEvent: &binlogdatapb.FieldEvent{ + TableName: plan.Table.Name, + Fields: plan.fields(), + }, + }, nil +} + +func (vs *vstreamer) buildTableColumns(id uint64, tm 
*mysql.TableMap) ([]schema.TableColumn, error) { + var cols []schema.TableColumn + for i, typ := range tm.Types { + t, err := sqltypes.MySQLToType(int64(typ), 0) + if err != nil { + return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i) + } + cols = append(cols, schema.TableColumn{ + Name: sqlparser.NewColIdent(fmt.Sprintf("@%d", i+1)), + Type: t, + }) + } + + st := vs.se.GetTable(sqlparser.NewTableIdent(tm.Name)) + if st == nil { + if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { + return nil, fmt.Errorf("unknown table %v in schema", tm.Name) + } + return cols, nil + } + + if len(st.Columns) < len(tm.Types) { + if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { + return nil, fmt.Errorf("cannot determine table columns for %s: event has %v, schema as %v", tm.Name, tm.Types, st.Columns) + } + return cols, nil + } + + // check if the schema returned by schema.Engine matches with row. + for i := range tm.Types { + if !sqltypes.AreTypesEquivalent(cols[i].Type, st.Columns[i].Type) { + return cols, nil + } + } + + // Columns should be truncated to match those in tm. 
+ cols = st.Columns[:len(tm.Types)] + return cols, nil +} + +func (vs *vstreamer) processJounalEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { +nextrow: + for _, row := range rows.Rows { + afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) + if err != nil { + return nil, err + } + if !afterOK { + continue + } + for i, fld := range plan.fields() { + switch fld.Name { + case "db_name": + if afterValues[i].ToString() != vs.cp.DbName { + continue nextrow } - afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) - if err != nil { + case "val": + journal := &binlogdatapb.Journal{} + if err := proto.UnmarshalText(afterValues[i].ToString(), journal); err != nil { return nil, err } - if !beforeOK && !afterOK { - continue - } - rowChange := &binlogdatapb.RowChange{} - if beforeOK { - rowChange.Before = sqltypes.RowToProto3(beforeValues) - } - if afterOK { - rowChange.After = sqltypes.RowToProto3(afterValues) - } - rowChanges = append(rowChanges, rowChange) - } - if len(rowChanges) != 0 { vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_ROW, - RowEvent: &binlogdatapb.RowEvent{ - TableName: plan.Table.Name, - RowChanges: rowChanges, - }, + Type: binlogdatapb.VEventType_JOURNAL, + Journal: journal, }) } } } - for _, vevent := range vevents { - vevent.Timestamp = int64(ev.Timestamp()) - vevent.CurrentTime = time.Now().UnixNano() + return vevents, nil +} + +func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) { + rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows)) + for _, row := range rows.Rows { + beforeOK, beforeValues, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) + if err != nil { + return nil, err + } + afterOK, afterValues, err := 
vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) + if err != nil { + return nil, err + } + if !beforeOK && !afterOK { + continue + } + rowChange := &binlogdatapb.RowChange{} + if beforeOK { + rowChange.Before = sqltypes.RowToProto3(beforeValues) + } + if afterOK { + rowChange.After = sqltypes.RowToProto3(afterValues) + } + rowChanges = append(rowChanges, rowChange) + } + if len(rowChanges) != 0 { + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: plan.Table.Name, + RowChanges: rowChanges, + }, + }) } return vevents, nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 32126359e2e..45c83b6a802 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -849,7 +849,7 @@ func TestJournal(t *testing.T) { `commit`, }}, }} - runCases(t, nil, testcases) + runCases(t, nil, testcases, "") } func TestMinimalMode(t *testing.T) { From 831e7e849a2592c01d0b8e03115988d9ba06b039 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 10 Jul 2019 12:40:10 -0700 Subject: [PATCH 15/22] resharding_journal: vplayer side Signed-off-by: Sugu Sougoumarane --- .../tabletmanager/vreplication/controller.go | 10 +- .../vreplication/controller_test.go | 16 +- .../tabletmanager/vreplication/engine.go | 142 +++++++++++++++++- .../tabletmanager/vreplication/vplayer.go | 25 +++ .../tabletmanager/vreplication/vreplicator.go | 4 +- 5 files changed, 182 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 85c957ecb3f..e8533cb7afc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -49,11 +49,13 @@ var ( // There is no mutex within a 
controller becaust its members are // either read-only or self-synchronized. type controller struct { + vre *Engine dbClientFactory func() binlogplayer.DBClient mysqld mysqlctl.MysqlDaemon blpStats *binlogplayer.Stats id uint32 + workflow string source binlogdatapb.BinlogSource stopPos string tabletPicker *discovery.TabletPicker @@ -67,11 +69,12 @@ type controller struct { // newController creates a new controller. Unless a stream is explicitly 'Stopped', // this function launches a goroutine to perform continuous vreplication. -func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats) (*controller, error) { +func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats, vre *Engine) (*controller, error) { if blpStats == nil { blpStats = binlogplayer.NewStats() } ct := &controller{ + vre: vre, dbClientFactory: dbClientFactory, mysqld: mysqld, blpStats: blpStats, @@ -84,6 +87,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor return nil, err } ct.id = uint32(id) + ct.workflow = params["workflow"] // Nothing to do if replication is stopped. 
if params["state"] == binlogplayer.BlpStopped { @@ -102,7 +106,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor if v, ok := params["cell"]; ok { cell = v } - if v, ok := params["tablet_types"]; ok { + if v := params["tablet_types"]; v != "" { tabletTypesStr = v } tp, err := discovery.NewTabletPicker(ctx, ts, cell, ct.source.Keyspace, ct.source.Shard, tabletTypesStr, *healthcheckTopologyRefresh, *healthcheckRetryDelay, *healthcheckTimeout) @@ -205,7 +209,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { if _, err := dbClient.ExecuteFetch("set names binary", 10000); err != nil { return err } - vreplicator := newVReplicator(ct.id, &ct.source, tablet, ct.blpStats, dbClient, ct.mysqld) + vreplicator := newVReplicator(ct.id, &ct.source, tablet, ct.blpStats, dbClient, ct.mysqld, ct.vre) return vreplicator.Replicate(ctx) } return fmt.Errorf("missing source") diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go index b6cf8a3ba11..88d78eb05f7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go @@ -76,7 +76,7 @@ func TestControllerKeyRange(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) if err != nil { t.Fatal(err) } @@ -136,7 +136,7 @@ func TestControllerTables(t *testing.T) { }, } - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, 
env.Cells[0], "replica", nil, nil) if err != nil { t.Fatal(err) } @@ -153,7 +153,7 @@ func TestControllerBadID(t *testing.T) { params := map[string]string{ "id": "bad", } - _, err := newController(context.Background(), params, nil, nil, nil, "", "", nil) + _, err := newController(context.Background(), params, nil, nil, nil, "", "", nil, nil) want := `strconv.Atoi: parsing "bad": invalid syntax` if err == nil || err.Error() != want { t.Errorf("newController err: %v, want %v", err, want) @@ -166,7 +166,7 @@ func TestControllerStopped(t *testing.T) { "state": binlogplayer.BlpStopped, } - ct, err := newController(context.Background(), params, nil, nil, nil, "", "", nil) + ct, err := newController(context.Background(), params, nil, nil, nil, "", "", nil, nil) if err != nil { t.Fatal(err) } @@ -203,7 +203,7 @@ func TestControllerOverrides(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil) if err != nil { t.Fatal(err) } @@ -227,7 +227,7 @@ func TestControllerCanceledContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil) + ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil, nil) if err != nil { t.Fatal(err) } @@ -269,7 +269,7 @@ func TestControllerRetry(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil) + ct, err := newController(context.Background(), 
params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil) if err != nil { t.Fatal(err) } @@ -315,7 +315,7 @@ func TestControllerStopPosition(t *testing.T) { dbClientFactory := func() binlogplayer.DBClient { return dbClient } mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} - ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil) + ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil) if err != nil { t.Fatal(err) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 3cea952a16e..b582f38ebc1 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/topo" ) @@ -82,6 +83,13 @@ type Engine struct { mysqld mysqlctl.MysqlDaemon dbClientFactory func() binlogplayer.DBClient dbName string + + journaler map[string]*journalEvent +} + +type journalEvent struct { + journal *binlogdatapb.Journal + participants map[string]int } // NewEngine creates a new Engine. 
@@ -94,6 +102,7 @@ func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClie mysqld: mysqld, dbClientFactory: dbClientFactory, dbName: dbName, + journaler: make(map[string]*journalEvent), } return vre } @@ -187,7 +196,7 @@ func (vre *Engine) initAll() error { return err } for _, row := range rows { - ct, err := newController(vre.ctx, row, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil) + ct, err := newController(vre.ctx, row, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil, vre) if err != nil { return err } @@ -280,7 +289,7 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { if err != nil { return nil, err } - ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil) + ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil, vre) if err != nil { return nil, err } @@ -318,7 +327,7 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) { } // Create a new controller in place of the old one. // For continuity, the new controller inherits the previous stats. - ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats[id]) + ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats[id], vre) if err != nil { return nil, err } @@ -394,6 +403,133 @@ func (vre *Engine) fetchIDs(dbClient binlogplayer.DBClient, selector string) (id return ids, bv, nil } +func (vre *Engine) journalRegister(journal *binlogdatapb.Journal, id int) error { + vre.mu.Lock() + defer vre.mu.Unlock() + if !vre.isOpen { + // Unreachable. 
+ return nil + } + + key := fmt.Sprintf("%s:%d", vre.controllers[id].workflow, journal.Id) + je, ok := vre.journaler[key] + if !ok { + log.Infof("Journal encountered: %v", journal) + controllerSources := make(map[string]bool) + for _, ct := range vre.controllers { + ks := fmt.Sprintf("%s:%s", ct.source.Keyspace, ct.source.Shard) + controllerSources[ks] = true + } + je = &journalEvent{ + journal: journal, + participants: make(map[string]int), + } + for _, jks := range journal.Participants { + ks := fmt.Sprintf("%s:%s", jks.Keyspace, jks.Shard) + if _, ok := controllerSources[ks]; !ok { + return fmt.Errorf("cannot redirect on journal: not all sources are present in this workflow: missing %v", ks) + } + je.participants[ks] = 0 + } + vre.journaler[key] = je + } + + ks := fmt.Sprintf("%s:%s", vre.controllers[id].source.Keyspace, vre.controllers[id].source.Shard) + log.Infof("Registering id %v against %v", id, ks) + je.participants[ks] = id + for _, pid := range je.participants { + if pid == 0 { + // Still need to wait. + return nil + } + } + go vre.transitionJournal(key) + return nil +} + +func (vre *Engine) transitionJournal(key string) { + vre.mu.Lock() + defer vre.mu.Unlock() + if !vre.isOpen { + return + } + + log.Infof("Transitioning for journal:workload %v", key) + je := vre.journaler[key] + // Wait for participating controllers to stop. + // Also collect one id reference. 
+ refid := 0 + for _, id := range je.participants { + refid = id + vre.controllers[id].Stop() + } + + dbClient := vre.dbClientFactory() + if err := dbClient.Connect(); err != nil { + log.Errorf("transitionJournal: unable to connect to the database: %v", err) + return + } + defer dbClient.Close() + + if err := dbClient.Begin(); err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + + params, err := readRow(dbClient, refid) + if err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + var newids []int + for _, sgtid := range je.journal.ShardGtids { + bls := vre.controllers[refid].source + bls.Keyspace, bls.Shard = sgtid.Keyspace, sgtid.Shard + query := fmt.Sprintf("insert into _vt.vreplication "+ + "(workflow, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, db_name) "+ + "values (%v, %v, %v, %v, %v, %v, %v, 0, '%v', %v)", + encodeString(params["workflow"]), encodeString(bls.String()), encodeString(sgtid.Gtid), params["max_tps"], params["max_replication_lag"], encodeString(params["tablet_types"]), time.Now().Unix(), binlogplayer.BlpRunning, encodeString(vre.dbName)) + qr, err := vre.executeFetchMaybeCreateTable(dbClient, query, 1) + if err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + log.Infof("Created stream: %v for %v", qr.InsertID, sgtid) + newids = append(newids, int(qr.InsertID)) + } + for _, id := range je.participants { + _, err := vre.executeFetchMaybeCreateTable(dbClient, binlogplayer.DeleteVReplication(uint32(id)), 1) + if err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + log.Infof("Deleted stream: %v", id) + } + if err := dbClient.Commit(); err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + + for _, id := range je.participants { + delete(vre.controllers, id) + } + + for _, id := range newids { + params, err := readRow(dbClient, id) + if err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + ct, err 
:= newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil, vre) + if err != nil { + log.Errorf("transitionJournal: %v", err) + return + } + vre.controllers[id] = ct + } + log.Infof("Completed transition for journal:workload %v", key) +} + // WaitForPos waits for the replication to reach the specified position. func (vre *Engine) WaitForPos(ctx context.Context, id int, pos string) error { start := time.Now() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index a7c50b00122..151b49acc2c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -403,6 +403,31 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF } } + case binlogdatapb.VEventType_JOURNAL: + switch event.Journal.MigrationType { + case binlogdatapb.MigrationType_SHARDS: + // no-op + case binlogdatapb.MigrationType_TABLES: + jtables := make(map[string]bool) + for _, table := range event.Journal.Tables { + jtables[table] = true + } + for tableName := range vp.replicatorPlan.TablePlans { + if _, ok := jtables[tableName]; !ok { + if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("unable to continue stream: %v is absent in the journal", tableName)); err != nil { + return err + } + return io.EOF + } + } + } + if err := vp.vr.vre.journalRegister(event.Journal, int(vp.vr.id)); err != nil { + if err := vp.vr.setState(binlogplayer.BlpStopped, err.Error()); err != nil { + return err + } + return io.EOF + } + return io.EOF case binlogdatapb.VEventType_HEARTBEAT: // No-op: heartbeat timings are calculated in outer loop. 
} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 4aa88141de8..8c6c05e94f7 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -46,6 +46,7 @@ var ( ) type vreplicator struct { + vre *Engine id uint32 source *binlogdatapb.BinlogSource sourceTablet *topodatapb.Tablet @@ -57,8 +58,9 @@ type vreplicator struct { tableKeys map[string][]string } -func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceTablet *topodatapb.Tablet, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon) *vreplicator { +func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceTablet *topodatapb.Tablet, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine) *vreplicator { return &vreplicator{ + vre: vre, id: id, source: source, sourceTablet: sourceTablet, From fe5801af737c1e1f52923104df72e96d927c6604 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 20 Oct 2019 21:59:41 -0700 Subject: [PATCH 16/22] resharding_journal: tweak rules of vplayer Since vstreamer sends all journal events, the vplayer has to match them against its stream. Also, changed journalRegister to only look for controllers within the workflow that encountered a journal event. 
Signed-off-by: Sugu Sougoumarane --- .../tabletmanager/vreplication/engine.go | 7 ++++- .../tabletmanager/vreplication/vplayer.go | 27 ++++++++++++++----- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index b582f38ebc1..4e215e4d925 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -411,12 +411,17 @@ func (vre *Engine) journalRegister(journal *binlogdatapb.Journal, id int) error return nil } - key := fmt.Sprintf("%s:%d", vre.controllers[id].workflow, journal.Id) + workflow := vre.controllers[id].workflow + key := fmt.Sprintf("%s:%d", workflow, journal.Id) je, ok := vre.journaler[key] if !ok { log.Infof("Journal encountered: %v", journal) controllerSources := make(map[string]bool) for _, ct := range vre.controllers { + if ct.workflow != workflow { + // Only compare with streams that belong to the current workflow. + continue + } ks := fmt.Sprintf("%s:%s", ct.source.Keyspace, ct.source.Shard) controllerSources[ks] = true } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 151b49acc2c..bc895a342b9 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -144,7 +144,8 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) error { <-streamErr }() // If the apply thread ends with io.EOF, it means either the Engine - // is shutting down and canceled the context, or stop position was reached. + // is shutting down and canceled the context, or stop position was reached, + // or a journal event was encountered. // If so, we return nil which will cause the controller to not retry. 
if err == io.EOF { return nil @@ -412,15 +413,29 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m for _, table := range event.Journal.Tables { jtables[table] = true } + found := false + notFound := false for tableName := range vp.replicatorPlan.TablePlans { - if _, ok := jtables[tableName]; !ok { - if err := vp.vr.setState(binlogplayer.BlpStopped, fmt.Sprintf("unable to continue stream: %v is absent in the journal", tableName)); err != nil { - return err - } - return io.EOF + if _, ok := jtables[tableName]; ok { + found = true + } else { + notFound = true } } + switch { + case found && notFound: + // Some were found and some were not found. We can't handle this. + if err := vp.vr.setState(binlogplayer.BlpStopped, "unable to handle journal event: tables were partially matched"); err != nil { + return err + } + return io.EOF + case notFound: + // None were found. Ignore journal. + return nil + } + // All were found. We must register journal. } + if err := vp.vr.vre.journalRegister(event.Journal, int(vp.vr.id)); err != nil { if err := vp.vr.setState(binlogplayer.BlpStopped, err.Error()); err != nil { return err From 9e7bfe24d6c30f80ef3e0ba44e335929d1977a17 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 23 Oct 2019 20:03:28 -0700 Subject: [PATCH 17/22] resharding_journal: vplayer tests These tests don't cover merges. Those need to be done later through integration tests on a sharded cluster. 
Signed-off-by: Sugu Sougoumarane --- .../tabletmanager/vreplication/engine.go | 1 + .../vreplication/framework_test.go | 25 ++ .../vreplication/journal_test.go | 299 ++++++++++++++++++ .../tabletmanager/vreplication/vreplicator.go | 2 +- 4 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 go/vt/vttablet/tabletmanager/vreplication/journal_test.go diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 4e215e4d925..4d035004e69 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -461,6 +461,7 @@ func (vre *Engine) transitionJournal(key string) { log.Infof("Transitioning for journal:workload %v", key) je := vre.journaler[key] + defer delete(vre.journaler, key) // Wait for participating controllers to stop. // Also collect one id reference. refid := 0 diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 36b3e55c693..65ee4a601f5 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -19,6 +19,7 @@ package vreplication import ( "flag" "fmt" + "io" "os" "reflect" "regexp" @@ -145,6 +146,26 @@ func addTablet(id int) *topodatapb.Tablet { return tablet } +func addOtherTablet(id int, keyspace, shard string) *topodatapb.Tablet { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.Cells[0], + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: topodatapb.TabletType_REPLICA, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil { + panic(err) + } + return tablet +} + func deleteTablet(tablet *topodatapb.Tablet) { env.TopoServ.DeleteTablet(context.Background(), tablet.Alias) // This is not 
automatically removed from shard replication, which results in log spam. @@ -174,6 +195,10 @@ func (ftc *fakeTabletConn) StreamHealth(ctx context.Context, callback func(*quer // VStream directly calls into the pre-initialized engine. func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { + if target.Keyspace != "vttest" { + <-ctx.Done() + return io.EOF + } return streamerEngine.Stream(ctx, startPos, filter, send) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go new file mode 100644 index 00000000000..ba62b12f9a4 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go @@ -0,0 +1,299 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vreplication + +import ( + "fmt" + "testing" + + "golang.org/x/net/context" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestJournalOneToOne(t *testing.T) { + defer deleteTablet(addTablet(100)) + defer deleteTablet(addOtherTablet(101, "other_keyspace", "0")) + + execStatements(t, []string{ + "create table t(id int, val varbinary(128), primary key(id))", + fmt.Sprintf("create table %s.t(id int, val varbinary(128), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t", + fmt.Sprintf("drop table %s.t", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t", + }}, + } + _, firstID := startVReplication(t, filter, binlogdatapb.OnDDLAction_IGNORE, "") + + journal := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_SHARDS, + Participants: []*binlogdatapb.KeyspaceShard{{ + Keyspace: "vttest", + Shard: "0", + }}, + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "other_keyspace", + Shard: "0", + Gtid: "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10", + }}, + } + query := fmt.Sprintf("insert into _vt.resharding_journal(id, db_name, val) values (1, 'vttest', %v)", encodeString(journal.String())) + execStatements(t, []string{createReshardingJournalTable, query}) + defer execStatements(t, []string{"delete from _vt.resharding_journal"}) + + expectDBClientQueries(t, []string{ + "begin", + `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"0\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10'`, + fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), + "commit", + "/update _vt.vreplication set state='Running', message='' where id.*", + }) + + // Delete all vreplication streams. There should be only one, but we don't know its id. 
+ if _, err := playerEngine.Exec("delete from _vt.vreplication"); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) +} + +func TestJournalOneToMany(t *testing.T) { + defer deleteTablet(addTablet(100)) + defer deleteTablet(addOtherTablet(101, "other_keyspace", "-80")) + defer deleteTablet(addOtherTablet(102, "other_keyspace", "80-")) + + execStatements(t, []string{ + "create table t(id int, val varbinary(128), primary key(id))", + fmt.Sprintf("create table %s.t(id int, val varbinary(128), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t", + fmt.Sprintf("drop table %s.t", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t", + }}, + } + _, firstID := startVReplication(t, filter, binlogdatapb.OnDDLAction_IGNORE, "") + + journal := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_SHARDS, + Participants: []*binlogdatapb.KeyspaceShard{{ + Keyspace: "vttest", + Shard: "0", + }}, + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "other_keyspace", + Shard: "-80", + Gtid: "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-5", + }, { + Keyspace: "other_keyspace", + Shard: "80-", + Gtid: "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:5-10", + }}, + } + query := fmt.Sprintf("insert into _vt.resharding_journal(id, db_name, val) values (1, 'vttest', %v)", encodeString(journal.String())) + execStatements(t, []string{createReshardingJournalTable, query}) + defer execStatements(t, []string{"delete from _vt.resharding_journal"}) + + expectDBClientQueries(t, []string{ + "begin", + `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"-80\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-5'`, + `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"80-\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:5-10'`, + 
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), + "commit", + "/update _vt.vreplication set state='Running', message='' where id.*", + "/update _vt.vreplication set state='Running', message='' where id.*", + }) + + // Delete all vreplication streams. There should be only one, but we don't know its id. + if _, err := playerEngine.Exec("delete from _vt.vreplication"); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) +} + +func TestJournalTablePresent(t *testing.T) { + defer deleteTablet(addTablet(100)) + defer deleteTablet(addOtherTablet(101, "other_keyspace", "0")) + + execStatements(t, []string{ + "create table t(id int, val varbinary(128), primary key(id))", + fmt.Sprintf("create table %s.t(id int, val varbinary(128), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t", + fmt.Sprintf("drop table %s.t", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t", + }}, + } + _, firstID := startVReplication(t, filter, binlogdatapb.OnDDLAction_IGNORE, "") + + journal := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_TABLES, + Participants: []*binlogdatapb.KeyspaceShard{{ + Keyspace: "vttest", + Shard: "0", + }}, + Tables: []string{"t"}, + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "other_keyspace", + Shard: "0", + Gtid: "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10", + }}, + } + query := fmt.Sprintf("insert into _vt.resharding_journal(id, db_name, val) values (1, 'vttest', %v)", encodeString(journal.String())) + execStatements(t, []string{createReshardingJournalTable, query}) + defer execStatements(t, []string{"delete from _vt.resharding_journal"}) + + expectDBClientQueries(t, []string{ + "begin", + `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"0\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10'`, + 
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), + "commit", + "/update _vt.vreplication set state='Running', message='' where id.*", + }) + + // Delete all vreplication streams. There should be only one, but we don't know its id. + if _, err := playerEngine.Exec("delete from _vt.vreplication"); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) +} + +func TestJournalTableNotPresent(t *testing.T) { + defer deleteTablet(addTablet(100)) + defer deleteTablet(addOtherTablet(101, "other_keyspace", "0")) + + execStatements(t, []string{ + "create table t(id int, val varbinary(128), primary key(id))", + fmt.Sprintf("create table %s.t(id int, val varbinary(128), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t", + fmt.Sprintf("drop table %s.t", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t", + }}, + } + _, _ = startVReplication(t, filter, binlogdatapb.OnDDLAction_IGNORE, "") + + journal := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_TABLES, + Participants: []*binlogdatapb.KeyspaceShard{{ + Keyspace: "vttest", + Shard: "0", + }}, + Tables: []string{"t1"}, + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "other_keyspace", + Shard: "0", + Gtid: "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10", + }}, + } + query := fmt.Sprintf("insert into _vt.resharding_journal(id, db_name, val) values (1, 'vttest', %v)", encodeString(journal.String())) + execStatements(t, []string{createReshardingJournalTable, query}) + defer execStatements(t, []string{"delete from _vt.resharding_journal"}) + + // Wait for a heartbeat based update to confirm that the existing vreplication was not transitioned. + expectDBClientQueries(t, []string{ + "/update _vt.vreplication set pos=", + }) + + // Delete all vreplication streams. There should be only one, but we don't know its id. 
+ if _, err := playerEngine.Exec("delete from _vt.vreplication"); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) +} + +func TestJournalTableMixed(t *testing.T) { + defer deleteTablet(addTablet(100)) + defer deleteTablet(addOtherTablet(101, "other_keyspace", "0")) + + execStatements(t, []string{ + "create table t(id int, val varbinary(128), primary key(id))", + "create table t1(id int, val varbinary(128), primary key(id))", + fmt.Sprintf("create table %s.t(id int, val varbinary(128), primary key(id))", vrepldb), + fmt.Sprintf("create table %s.t1(id int, val varbinary(128), primary key(id))", vrepldb), + }) + defer execStatements(t, []string{ + "drop table t", + "drop table t1", + fmt.Sprintf("drop table %s.t", vrepldb), + fmt.Sprintf("drop table %s.t1", vrepldb), + }) + env.SchemaEngine.Reload(context.Background()) + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t", + }, { + Match: "t1", + }}, + } + _, _ = startVReplication(t, filter, binlogdatapb.OnDDLAction_IGNORE, "") + + journal := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_TABLES, + Participants: []*binlogdatapb.KeyspaceShard{{ + Keyspace: "vttest", + Shard: "0", + }}, + Tables: []string{"t"}, + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "other_keyspace", + Shard: "0", + Gtid: "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10", + }}, + } + query := fmt.Sprintf("insert into _vt.resharding_journal(id, db_name, val) values (1, 'vttest', %v)", encodeString(journal.String())) + execStatements(t, []string{createReshardingJournalTable, query}) + defer execStatements(t, []string{"delete from _vt.resharding_journal"}) + + expectDBClientQueries(t, []string{ + "/update _vt.vreplication set state='Stopped', message='unable to handle journal event: tables were partially matched' where id", + }) + + // Delete all vreplication streams. There should be only one, but we don't know its id. 
+ if _, err := playerEngine.Exec("delete from _vt.vreplication"); err != nil { + t.Fatal(err) + } + expectDeleteQueries(t) +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 8c6c05e94f7..395031c8658 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -80,7 +80,7 @@ func (vr *vreplicator) Replicate(ctx context.Context) error { for { settings, numTablesToCopy, err := vr.readSettings(ctx) if err != nil { - return fmt.Errorf("error reading VReplication settings: %v", err) + return err } // If any of the operations below changed state to Stopped, we should return. if settings.State == binlogplayer.BlpStopped { From 6c704fe958dad4f84078049c532a9eccdd3dbabf Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sun, 24 Nov 2019 13:11:43 -0800 Subject: [PATCH 18/22] resharding_journal: fix test after rebase Signed-off-by: Sugu Sougoumarane --- go/vt/vttablet/tabletmanager/vreplication/journal_test.go | 4 ++++ go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go index ba62b12f9a4..fc29f6d1ba0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go @@ -63,6 +63,7 @@ func TestJournalOneToOne(t *testing.T) { defer execStatements(t, []string{"delete from _vt.resharding_journal"}) expectDBClientQueries(t, []string{ + "/update _vt.vreplication set pos=", "begin", `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"0\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10'`, fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), @@ -121,6 +122,7 @@ func TestJournalOneToMany(t 
*testing.T) { defer execStatements(t, []string{"delete from _vt.resharding_journal"}) expectDBClientQueries(t, []string{ + "/update _vt.vreplication set pos=", "begin", `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"-80\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-5'`, `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"80-\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:5-10'`, @@ -177,6 +179,7 @@ func TestJournalTablePresent(t *testing.T) { defer execStatements(t, []string{"delete from _vt.resharding_journal"}) expectDBClientQueries(t, []string{ + "/update _vt.vreplication set pos=", "begin", `/insert into _vt.vreplication.*workflow, source, pos.*values.*'test', 'keyspace:\\"other_keyspace\\" shard:\\"0\\.*'MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10'`, fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), @@ -288,6 +291,7 @@ func TestJournalTableMixed(t *testing.T) { defer execStatements(t, []string{"delete from _vt.resharding_journal"}) expectDBClientQueries(t, []string{ + "/update _vt.vreplication set pos=", "/update _vt.vreplication set state='Stopped', message='unable to handle journal event: tables were partially matched' where id", }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 45c83b6a802..63d4a6dc655 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -843,9 +843,9 @@ func TestJournal(t *testing.T) { }, // External table events don't get sent. 
output: [][]string{{ - `gtid|begin`, - `gtid|begin`, + `begin`, `type:JOURNAL journal: `, + `gtid`, `commit`, }}, }} From 462f26cea1bd71aa97e5285608d24868edeb111c Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Wed, 27 Nov 2019 13:37:10 -0800 Subject: [PATCH 19/22] resharding_journal: address review comments Signed-off-by: Sugu Sougoumarane --- go/vt/vttablet/tabletmanager/vreplication/engine.go | 2 +- go/vt/vttablet/tabletmanager/vreplication/vplayer.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 4d035004e69..a9c1678fde3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -403,7 +403,7 @@ func (vre *Engine) fetchIDs(dbClient binlogplayer.DBClient, selector string) (id return ids, bv, nil } -func (vre *Engine) journalRegister(journal *binlogdatapb.Journal, id int) error { +func (vre *Engine) registerJournal(journal *binlogdatapb.Journal, id int) error { vre.mu.Lock() defer vre.mu.Unlock() if !vre.isOpen { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index bc895a342b9..17a5359ca81 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -405,10 +405,12 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } } case binlogdatapb.VEventType_JOURNAL: + // Ensure that we don't have a partial set of table matches in the journal. switch event.Journal.MigrationType { case binlogdatapb.MigrationType_SHARDS: - // no-op + // All tables of the source were migrated. So, no validation needed. case binlogdatapb.MigrationType_TABLES: + // Validate that all or none of the tables are in the journal. 
jtables := make(map[string]bool) for _, table := range event.Journal.Tables { jtables[table] = true @@ -436,7 +438,7 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m // All were found. We must register journal. } - if err := vp.vr.vre.journalRegister(event.Journal, int(vp.vr.id)); err != nil { + if err := vp.vr.vre.registerJournal(event.Journal, int(vp.vr.id)); err != nil { if err := vp.vr.setState(binlogplayer.BlpStopped, err.Error()); err != nil { return err } From cc4d1b505505bdc10ff17b2b72127f1a743540b4 Mon Sep 17 00:00:00 2001 From: Arindam Nayak Date: Thu, 28 Nov 2019 11:24:02 +0530 Subject: [PATCH 20/22] Propagate err to top Signed-off-by: Arindam Nayak --- go/test/endtoend/cluster/cluster_process.go | 25 ++++++++++++++------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go index 5f2e1cc4d43..345a1684602 100644 --- a/go/test/endtoend/cluster/cluster_process.go +++ b/go/test/endtoend/cluster/cluster_process.go @@ -253,11 +253,12 @@ func (cluster *LocalProcessCluster) StartVtgate() (err error) { cluster.VtGateExtraArgs) log.Info(fmt.Sprintf("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort)) - err = cluster.VtgateProcess.Setup() - if err != nil { - return + if err = cluster.VtgateProcess.Setup(); err != nil { + return err + } + if err = cluster.WaitForTabletsToHealthyInVtgate(); err != nil { + return err } - cluster.WaitForTabletsToHealthyInVtgate() return nil } @@ -288,23 +289,31 @@ func (cluster *LocalProcessCluster) ReStartVtgate() (err error) { } // WaitForTabletsToHealthyInVtgate waits for all tablets in all shards to be healthy as per vtgate -func (cluster *LocalProcessCluster) WaitForTabletsToHealthyInVtgate() { +func (cluster *LocalProcessCluster) WaitForTabletsToHealthyInVtgate() (err error) { var isRdOnlyPresent bool for _, keyspace := range cluster.Keyspaces { for _, 
shard := range keyspace.Shards { isRdOnlyPresent = false - _ = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspace.Name, shard.Name)) - _ = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace.Name, shard.Name)) + if err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspace.Name, shard.Name)); err != nil { + return err + } + if err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace.Name, shard.Name)); err != nil { + return err + } for _, tablet := range shard.Vttablets { if tablet.Type == "rdonly" { isRdOnlyPresent = true } } if isRdOnlyPresent { - _ = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspace.Name, shard.Name)) + err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspace.Name, shard.Name)) + } + if err != nil { + return err } } } + return nil } // Teardown brings down the cluster by invoking teardown for individual processes From 5402463421ff9e69a0062a09eb358c950f49d2bb Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Wed, 27 Nov 2019 22:54:54 -0700 Subject: [PATCH 21/22] config: add vt_monitoring user to init_db.sql Signed-off-by: Derek Perkins --- config/init_db.sql | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/config/init_db.sql b/config/init_db.sql index 5b56939c3c3..836c8c997e6 100644 --- a/config/init_db.sql +++ b/config/init_db.sql @@ -84,6 +84,13 @@ GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE, SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER ON *.* TO 'vt_filtered'@'localhost'; +# User for general MySQL monitoring. 
+CREATE USER 'vt_monitoring'@'localhost'; +GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD + ON *.* TO 'vt_monitoring'@'localhost'; +GRANT SELECT, UPDATE, DELETE, DROP + ON performance_schema.* TO 'vt_monitoring'@'localhost'; + # User for Orchestrator (https://github.com/github/orchestrator). CREATE USER 'orc_client_user'@'%' IDENTIFIED BY 'orc_client_user_password'; GRANT SUPER, PROCESS, REPLICATION SLAVE, RELOAD From b30793027dac83b01aebcffb2cf6c3a88e3659e4 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 28 Nov 2019 10:17:46 -0700 Subject: [PATCH 22/22] Add new github actions Signed-off-by: Morgan Tocker --- .github/workflows/check_make_parser.yml | 37 +++++++++++++++++ .github/workflows/cluster_endtoend.yml | 41 +++++++++++++++++++ .github/workflows/e2e_race.yml | 41 +++++++++++++++++++ .../{e2e-test-cluster.yml => endtoend.yml} | 7 ++-- .../{local-example.yml => local_example.yml} | 4 +- .github/workflows/unit.yml | 40 ++++++++++++++++++ .github/workflows/unit_race.yml | 41 +++++++++++++++++++ go/vt/mysqlctl/mycnf_test.go | 7 +--- test/config.json | 12 +++--- 9 files changed, 213 insertions(+), 17 deletions(-) create mode 100644 .github/workflows/check_make_parser.yml create mode 100644 .github/workflows/cluster_endtoend.yml create mode 100644 .github/workflows/e2e_race.yml rename .github/workflows/{e2e-test-cluster.yml => endtoend.yml} (91%) rename .github/workflows/{local-example.yml => local_example.yml} (95%) create mode 100644 .github/workflows/unit.yml create mode 100644 .github/workflows/unit_race.yml diff --git a/.github/workflows/check_make_parser.yml b/.github/workflows/check_make_parser.yml new file mode 100644 index 00000000000..1cf47a6d4a7 --- /dev/null +++ b/.github/workflows/check_make_parser.yml @@ -0,0 +1,37 @@ +name: check_make_parser +on: [push, pull_request] +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.13 + + - name: Check out code 
+ uses: actions/checkout@v1 + + - name: Get dependencies + run: | + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + - name: Run bootstrap.sh + run: | + echo "Copying new bootstrap over location of legacy one." + cp .github/bootstrap.sh . + ./bootstrap.sh + + - name: check_make_parser + run: | + export PATH=$PWD/bin:$PATH + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD tools/check_make_parser.sh + diff --git a/.github/workflows/cluster_endtoend.yml b/.github/workflows/cluster_endtoend.yml new file mode 100644 index 00000000000..4adab961fea --- /dev/null +++ b/.github/workflows/cluster_endtoend.yml @@ -0,0 +1,41 @@ +name: cluster_endtoend +on: [push, pull_request] +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.13 + + - name: Check out code + uses: actions/checkout@v1 + + - name: Get dependencies + run: | + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + - name: Run bootstrap.sh + run: | + echo "Copying new bootstrap over location of legacy one." + cp .github/bootstrap.sh . 
+ ./bootstrap.sh + + - name: Build + run: | + GOBIN=$PWD/bin make build + + - name: cluster_endtoend + run: | + export PATH=$PWD/bin:$PATH + source ./dev.env + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD make e2e_test_cluster diff --git a/.github/workflows/e2e_race.yml b/.github/workflows/e2e_race.yml new file mode 100644 index 00000000000..9acebc922ed --- /dev/null +++ b/.github/workflows/e2e_race.yml @@ -0,0 +1,41 @@ +name: e2e_race +on: [push, pull_request] +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.13 + + - name: Check out code + uses: actions/checkout@v1 + + - name: Get dependencies + run: | + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + - name: Run bootstrap.sh + run: | + echo "Copying new bootstrap over location of legacy one." + cp .github/bootstrap.sh . 
+ ./bootstrap.sh + + - name: Build + run: | + GOBIN=$PWD/bin make build + + - name: e2e_race + run: | + export PATH=$PWD/bin:$PATH + source ./dev.env + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD make e2e_test_race diff --git a/.github/workflows/e2e-test-cluster.yml b/.github/workflows/endtoend.yml similarity index 91% rename from .github/workflows/e2e-test-cluster.yml rename to .github/workflows/endtoend.yml index 969008305ef..8b058df58a8 100644 --- a/.github/workflows/e2e-test-cluster.yml +++ b/.github/workflows/endtoend.yml @@ -1,4 +1,4 @@ -name: e2e Test Cluster +name: endtoend on: [push, pull_request] jobs: @@ -34,8 +34,9 @@ jobs: run: | GOBIN=$PWD/bin make build - - name: Run e2e test cluster + - name: endtoend run: | export PATH=$PWD/bin:$PATH source ./dev.env - VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD tools/e2e_test_cluster.sh + mkdir -p /tmp/vtdataroot + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD tools/e2e_test_runner.sh diff --git a/.github/workflows/local-example.yml b/.github/workflows/local_example.yml similarity index 95% rename from .github/workflows/local-example.yml rename to .github/workflows/local_example.yml index 03a358d8161..ccfde7389bd 100644 --- a/.github/workflows/local-example.yml +++ b/.github/workflows/local_example.yml @@ -1,4 +1,4 @@ -name: Local Example +name: local_example on: [push, pull_request] jobs: @@ -34,7 +34,7 @@ jobs: run: | GOBIN=$PWD/bin make build - - name: Run Local Example + - name: local_example run: | export PATH=$PWD/bin:$PATH VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD test/local_example.sh diff --git a/.github/workflows/unit.yml b/.github/workflows/unit.yml new file mode 100644 index 00000000000..89d705a3f51 --- /dev/null +++ b/.github/workflows/unit.yml @@ -0,0 +1,40 @@ +name: unit +on: [push, pull_request] +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.12 + + - name: Check out code + uses: 
actions/checkout@v1 + + - name: Get dependencies + run: | + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget ant openjdk-8-jdk + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + - name: Run bootstrap.sh + run: | + VTTOP=$PWD VTROOT=$PWD BUILD_PYTHON=0 ./bootstrap.sh + + - name: Build + run: | + GOBIN=$PWD/bin make build + + - name: unit + run: | + export PATH=$PWD/bin:$PATH + source ./dev.env + mkdir -p /tmp/vtdataroot + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD tools/unit_test_runner.sh diff --git a/.github/workflows/unit_race.yml b/.github/workflows/unit_race.yml new file mode 100644 index 00000000000..8c42a865339 --- /dev/null +++ b/.github/workflows/unit_race.yml @@ -0,0 +1,41 @@ +name: unit_race +on: [push, pull_request] +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.13 + + - name: Check out code + uses: actions/checkout@v1 + + - name: Get dependencies + run: | + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + - name: Run bootstrap.sh + run: | + echo "Copying new bootstrap over location of legacy one." + cp .github/bootstrap.sh . 
+ ./bootstrap.sh + + - name: Build + run: | + GOBIN=$PWD/bin make build + + - name: unit_race + run: | + export PATH=$PWD/bin:$PATH + source ./dev.env + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD make unit_test_race diff --git a/go/vt/mysqlctl/mycnf_test.go b/go/vt/mysqlctl/mycnf_test.go index c8381a46af3..0fd3e510833 100644 --- a/go/vt/mysqlctl/mycnf_test.go +++ b/go/vt/mysqlctl/mycnf_test.go @@ -24,7 +24,6 @@ import ( "testing" "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/env" "vitess.io/vitess/go/vt/servenv" ) @@ -37,12 +36,8 @@ func TestMycnf(t *testing.T) { // Assigning ServerID to be different from tablet UID to make sure that there are no // assumptions in the code that those IDs are the same. cnf.ServerID = 22222 - root, err := env.VtRoot() - if err != nil { - t.Errorf("err: %v", err) - } cnfTemplatePaths := []string{ - path.Join(root, "src/vitess.io/vitess/config/mycnf/default.cnf"), + path.Join(os.Getenv("VTTOP"), "/config/mycnf/default.cnf"), } data, err := cnf.makeMycnf(cnfTemplatePaths) if err != nil { diff --git a/test/config.json b/test/config.json index 9523cbed562..273fa24bd1a 100644 --- a/test/config.json +++ b/test/config.json @@ -94,7 +94,7 @@ "tools/check_make_parser.sh" ], "Manual": false, - "Shard": 4, + "Shard": 5, "RetryMax": 1, "Tags": [] }, @@ -212,7 +212,7 @@ "test/local_example.sh" ], "Manual": false, - "Shard": 3, + "Shard": 5, "RetryMax": 0, "Tags": [] }, @@ -418,7 +418,7 @@ "tools/e2e_test_runner.sh" ], "Manual": false, - "Shard": 3, + "Shard": 5, "RetryMax": 0, "Tags": [] }, @@ -442,7 +442,7 @@ "e2e_test_race" ], "Manual": false, - "Shard": 1, + "Shard": 5, "RetryMax": 0, "Tags": [] }, @@ -453,7 +453,7 @@ "tools/unit_test_runner.sh" ], "Manual": false, - "Shard": 0, + "Shard": 5, "RetryMax": 0, "Tags": [] }, @@ -465,7 +465,7 @@ "unit_test_race" ], "Manual": false, - "Shard": 3, + "Shard": 5, "RetryMax": 0, "Tags": [] },