Skip to content

Commit

Permalink
Merge pull request #7342 from planetscale/rn-fix-maxvrlag
Browse files Browse the repository at this point in the history
Workflow Show: use timeUpdated to calculate vreplication lag
  • Loading branch information
deepthi authored Jan 23, 2021
2 parents a346fb7 + 4f68b92 commit 4410733
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 40 deletions.
13 changes: 3 additions & 10 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,6 @@ type ReplicationStatus struct {
StopPos string
// State represents the state column from the _vt.vreplication table.
State string
// MaxReplicationLag represents the max_replication_lag column from the _vt.vreplication table.
MaxReplicationLag int64
// DbName represents the db_name column from the _vt.vreplication table.
DBName string
// TransactionTimestamp represents the transaction_timestamp column from the _vt.vreplication table.
Expand All @@ -395,7 +393,7 @@ type ReplicationStatus struct {

func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) {
var err error
var id, maxReplicationLag, timeUpdated, transactionTimestamp int64
var id, timeUpdated, transactionTimestamp int64
var state, dbName, pos, stopPos, message string
var bls binlogdatapb.BinlogSource
id, err = evalengine.ToInt64(row[0])
Expand All @@ -407,10 +405,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
}
pos = row[2].ToString()
stopPos = row[3].ToString()
maxReplicationLag, err = evalengine.ToInt64(row[4])
if err != nil {
return nil, "", err
}
state = row[5].ToString()
dbName = row[6].ToString()
timeUpdated, err = evalengine.ToInt64(row[7])
Expand All @@ -431,7 +425,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
StopPos: stopPos,
State: state,
DBName: dbName,
MaxReplicationLag: maxReplicationLag,
TransactionTimestamp: transactionTimestamp,
TimeUpdated: timeUpdated,
Message: message,
Expand Down Expand Up @@ -477,8 +470,8 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
sourceShards.Insert(status.Bls.Shard)
rsrStatus = append(rsrStatus, status)

transactionTimestamp := time.Unix(status.TransactionTimestamp, 0)
replicationLag := time.Since(transactionTimestamp)
timeUpdated := time.Unix(status.TimeUpdated, 0)
replicationLag := time.Since(timeUpdated)
if replicationLag.Seconds() > float64(rsr.MaxVReplicationLag) {
rsr.MaxVReplicationLag = int64(replicationLag.Seconds())
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/vexec_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func (p vreplicationPlanner) dryRun(ctx context.Context) error {
p.vx.plannedQuery, p.vx.keyspace, p.vx.workflow)
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"})
table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID"})
for _, master := range p.vx.masters {
key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString())
for _, stream := range rsr.ShardStatuses[key].MasterReplicationStatuses {
table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)})
table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos})
}
}
table.SetAutoMergeCellsByColumnIndex([]int{0})
Expand Down
50 changes: 26 additions & 24 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestVExec(t *testing.T) {
workflow := "wrWorkflow"
keyspace := "target"
query := "update _vt.vreplication set state = 'Running'"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix())
defer env.close()
var logger = logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand Down Expand Up @@ -74,6 +74,10 @@ func TestVExec(t *testing.T) {
vx.plannedQuery = plan.parsedQuery.Query
vx.exec()

res, err := wr.getStreams(ctx, workflow, keyspace)
require.NoError(t, err)
require.Less(t, res.MaxVReplicationLag, int64(3 /*seconds*/)) // lag should be very small

type TestCase struct {
name string
query string
Expand Down Expand Up @@ -149,15 +153,15 @@ func TestVExec(t *testing.T) {
dryRunResults := []string{
"Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'",
"will be run on the following streams in keyspace target for workflow wrWorkflow:\n\n",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |",
"| | | filter:<rules:<match:\"t1\" > > | | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |",
"| | | filter:<rules:<match:\"t1\" > > | | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
"| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
"| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |",
"| | | filter:<rules:<match:\"t1\" > > | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
"| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |",
"| | | filter:<rules:<match:\"t1\" > > | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
}
require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String())
}
Expand All @@ -174,7 +178,7 @@ func TestWorkflowListStreams(t *testing.T) {
ctx := context.Background()
workflow := "wrWorkflow"
keyspace := "target"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 1234)
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand Down Expand Up @@ -227,7 +231,6 @@ func TestWorkflowListStreams(t *testing.T) {
"Pos": "pos",
"StopPos": "",
"State": "Copying",
"MaxReplicationLag": 0,
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
Expand Down Expand Up @@ -263,7 +266,6 @@ func TestWorkflowListStreams(t *testing.T) {
"Pos": "pos",
"StopPos": "",
"State": "Copying",
"MaxReplicationLag": 0,
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
Expand Down Expand Up @@ -310,15 +312,15 @@ func TestWorkflowListStreams(t *testing.T) {
will be run on the following streams in keyspace target for workflow wrWorkflow:
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 |
| | | filter:<rules:<match:"t1" > > | | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 |
| | | filter:<rules:<match:"t1" > > | | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
+----------------------+----+--------------------------------+---------+-----------+--------------+
| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID |
+----------------------+----+--------------------------------+---------+-----------+--------------+
| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos |
| | | filter:<rules:<match:"t1" > > | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+
| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos |
| | | filter:<rules:<match:"t1" > > | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+
Expand All @@ -331,7 +333,7 @@ func TestWorkflowListAll(t *testing.T) {
ctx := context.Background()
keyspace := "target"
workflow := "wrWorkflow"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0)
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand All @@ -350,7 +352,7 @@ func TestVExecValidations(t *testing.T) {
workflow := "wf"
keyspace := "ks"
query := ""
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0)
defer env.close()

wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)
Expand Down
7 changes: 3 additions & 4 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package wrangler

import (
"context"
"flag"
"fmt"
"sync"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -76,7 +75,7 @@ func init() {
//----------------------------------------------
// testWranglerEnv

func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testWranglerEnv {
func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv {
flag.Set("tablet_protocol", "WranglerTest")
env := &testWranglerEnv{
workflow: "wrWorkflow",
Expand Down Expand Up @@ -147,7 +146,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
result := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message",
"int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"),
fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|0|", bls),
fmt.Sprintf("1|%v|pos||0|Running|vt_target|%d|0|", bls, timeUpdated),
)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result)
env.tmc.setVRResults(
Expand Down

0 comments on commit 4410733

Please sign in to comment.