From 7d5704b544b74af40b03b43540dccfd7e2162530 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 21 Jan 2021 21:16:03 +0100 Subject: [PATCH 1/2] Use timeUpdated to determine lag instead of transactionTimestamp. Remove unused column MaxReplicationLag Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 13 +++--------- go/vt/wrangler/vexec_plan.go | 4 ++-- go/vt/wrangler/vexec_test.go | 38 +++++++++++++++++------------------- 3 files changed, 23 insertions(+), 32 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 5c7c9eb1d21..5b7259800a5 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -378,8 +378,6 @@ type ReplicationStatus struct { StopPos string // State represents the state column from the _vt.vreplication table. State string - // MaxReplicationLag represents the max_replication_lag column from the _vt.vreplication table. - MaxReplicationLag int64 // DbName represents the db_name column from the _vt.vreplication table. DBName string // TransactionTimestamp represents the transaction_timestamp column from the _vt.vreplication table. @@ -395,7 +393,7 @@ type ReplicationStatus struct { func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) { var err error - var id, maxReplicationLag, timeUpdated, transactionTimestamp int64 + var id, timeUpdated, transactionTimestamp int64 var state, dbName, pos, stopPos, message string var bls binlogdatapb.BinlogSource id, err = evalengine.ToInt64(row[0]) @@ -407,10 +405,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty } pos = row[2].ToString() stopPos = row[3].ToString() - maxReplicationLag, err = evalengine.ToInt64(row[4]) - if err != nil { - return nil, "", err - } state = row[5].ToString() dbName = row[6].ToString() timeUpdated, err = evalengine.ToInt64(row[7]) @@ -431,7 +425,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty StopPos: stopPos, State: state, DBName: dbName, - MaxReplicationLag: maxReplicationLag, TransactionTimestamp: transactionTimestamp, TimeUpdated: timeUpdated, Message: message, @@ -477,8 +470,8 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( sourceShards.Insert(status.Bls.Shard) rsrStatus = append(rsrStatus, status) - transactionTimestamp := time.Unix(status.TransactionTimestamp, 0) - replicationLag := time.Since(transactionTimestamp) + timeUpdated := time.Unix(status.TimeUpdated, 0) + replicationLag := time.Since(timeUpdated) if replicationLag.Seconds() > float64(rsr.MaxVReplicationLag) { rsr.MaxVReplicationLag = int64(replicationLag.Seconds()) } diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index d5958ac2216..4beb5be4fd9 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -92,11 +92,11 @@ func (p vreplicationPlanner) dryRun(ctx context.Context) error { p.vx.plannedQuery, p.vx.keyspace, p.vx.workflow) tableString := &strings.Builder{} table := tablewriter.NewWriter(tableString) - table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"}) + table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID"}) for _, master := range p.vx.masters { key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString()) for _, stream := range rsr.ShardStatuses[key].MasterReplicationStatuses { - table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)}) + table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos}) } } table.SetAutoMergeCellsByColumnIndex([]int{0}) diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index cb8db52661b..c4d64b4a536 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -149,15 +149,15 @@ func TestVExec(t *testing.T) { dryRunResults := []string{ "Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", "will be run on the following streams in keyspace target for workflow wrWorkflow:\n\n", - "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", - "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |", - "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", - "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |", - "| | | filter: > | | | | |", - "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", - "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |", - "| | | filter: > | | | | |", - "+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+", + "+----------------------+----+--------------------------------+---------+-----------+--------------+", + "| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+", + "| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |", + "| | | filter: > | | | |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+", + "| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |", + "| | | filter: > | | | |", + "+----------------------+----+--------------------------------+---------+-----------+--------------+", } require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String()) } @@ -227,7 +227,6 @@ func TestWorkflowListStreams(t *testing.T) { "Pos": "pos", "StopPos": "", "State": "Copying", - "MaxReplicationLag": 0, "DBName": "vt_target", "TransactionTimestamp": 0, "TimeUpdated": 1234, @@ -263,7 +262,6 @@ func TestWorkflowListStreams(t *testing.T) { "Pos": "pos", "StopPos": "", "State": "Copying", - "MaxReplicationLag": 0, "DBName": "vt_target", "TransactionTimestamp": 0, "TimeUpdated": 1234, @@ -310,15 +308,15 @@ func TestWorkflowListStreams(t *testing.T) { will be run on the following streams in keyspace target for workflow wrWorkflow: -+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ -| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG | -+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ -| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 | -| | | filter: > | | | | | -+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ -| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 | -| | | filter: > | | | | | -+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+ ++----------------------+----+--------------------------------+---------+-----------+--------------+ +| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | ++----------------------+----+--------------------------------+---------+-----------+--------------+ +| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | +| | | filter: > | | | | ++----------------------+----+--------------------------------+---------+-----------+--------------+ +| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | +| | | filter: > | | | | ++----------------------+----+--------------------------------+---------+-----------+--------------+ From 4f68b92a6a07abf3e628b641eed5f0a98819be2d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 21 Jan 2021 22:24:17 +0100 Subject: [PATCH 2/2] update tests for max lag Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec_test.go | 12 ++++++++---- go/vt/wrangler/wrangler_env_test.go | 7 +++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index c4d64b4a536..1ed3ccdc1d1 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -37,7 +37,7 @@ func TestVExec(t *testing.T) { workflow := "wrWorkflow" keyspace := "target" query := "update _vt.vreplication set state = 'Running'" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix()) defer env.close() var logger = logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) @@ -74,6 +74,10 @@ func TestVExec(t *testing.T) { vx.plannedQuery = plan.parsedQuery.Query vx.exec() + res, err := wr.getStreams(ctx, workflow, keyspace) + require.NoError(t, err) + require.Less(t, res.MaxVReplicationLag, int64(3 /*seconds*/)) // lag should be very small + type TestCase struct { name string query string @@ -174,7 +178,7 @@ func TestWorkflowListStreams(t *testing.T) { ctx := context.Background() workflow := "wrWorkflow" keyspace := "target" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 1234) defer env.close() logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) @@ -329,7 +333,7 @@ func TestWorkflowListAll(t *testing.T) { ctx := context.Background() keyspace := "target" workflow := "wrWorkflow" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0) defer env.close() logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) @@ -348,7 +352,7 @@ func TestVExecValidations(t *testing.T) { workflow := "wf" keyspace := "ks" query := "" - env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0) defer env.close() wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index fd5fb24f776..14cbfaf7b8c 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -17,12 +17,11 @@ limitations under the License. package wrangler import ( + "context" "flag" "fmt" "sync" - "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" @@ -76,7 +75,7 @@ func init() { //---------------------------------------------- // testWranglerEnv -func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testWranglerEnv { +func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv { flag.Set("tablet_protocol", "WranglerTest") env := &testWranglerEnv{ workflow: "wrWorkflow", @@ -147,7 +146,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message", "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"), - fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|0|", bls), + fmt.Sprintf("1|%v|pos||0|Running|vt_target|%d|0|", bls, timeUpdated), ) env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) env.tmc.setVRResults(