Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow Show: use timeUpdated to calculate vreplication lag #7342

Merged
merged 2 commits into from
Jan 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,6 @@ type ReplicationStatus struct {
StopPos string
// State represents the state column from the _vt.vreplication table.
State string
// MaxReplicationLag represents the max_replication_lag column from the _vt.vreplication table.
MaxReplicationLag int64
// DbName represents the db_name column from the _vt.vreplication table.
DBName string
// TransactionTimestamp represents the transaction_timestamp column from the _vt.vreplication table.
Expand All @@ -395,7 +393,7 @@ type ReplicationStatus struct {

func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) {
var err error
var id, maxReplicationLag, timeUpdated, transactionTimestamp int64
var id, timeUpdated, transactionTimestamp int64
var state, dbName, pos, stopPos, message string
var bls binlogdatapb.BinlogSource
id, err = evalengine.ToInt64(row[0])
Expand All @@ -407,10 +405,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
}
pos = row[2].ToString()
stopPos = row[3].ToString()
maxReplicationLag, err = evalengine.ToInt64(row[4])
if err != nil {
return nil, "", err
}
state = row[5].ToString()
dbName = row[6].ToString()
timeUpdated, err = evalengine.ToInt64(row[7])
Expand All @@ -431,7 +425,6 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
StopPos: stopPos,
State: state,
DBName: dbName,
MaxReplicationLag: maxReplicationLag,
TransactionTimestamp: transactionTimestamp,
TimeUpdated: timeUpdated,
Message: message,
Expand Down Expand Up @@ -477,8 +470,8 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
sourceShards.Insert(status.Bls.Shard)
rsrStatus = append(rsrStatus, status)

transactionTimestamp := time.Unix(status.TransactionTimestamp, 0)
replicationLag := time.Since(transactionTimestamp)
timeUpdated := time.Unix(status.TimeUpdated, 0)
replicationLag := time.Since(timeUpdated)
if replicationLag.Seconds() > float64(rsr.MaxVReplicationLag) {
rsr.MaxVReplicationLag = int64(replicationLag.Seconds())
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/vexec_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func (p vreplicationPlanner) dryRun(ctx context.Context) error {
p.vx.plannedQuery, p.vx.keyspace, p.vx.workflow)
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID", "MaxReplicationLag"})
table.SetHeader([]string{"Tablet", "ID", "BinLogSource", "State", "DBName", "Current GTID"})
for _, master := range p.vx.masters {
key := fmt.Sprintf("%s/%s", master.Shard, master.AliasString())
for _, stream := range rsr.ShardStatuses[key].MasterReplicationStatuses {
table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos, fmt.Sprintf("%d", stream.MaxReplicationLag)})
table.Append([]string{key, fmt.Sprintf("%d", stream.ID), stream.Bls.String(), stream.State, stream.DBName, stream.Pos})
}
}
table.SetAutoMergeCellsByColumnIndex([]int{0})
Expand Down
50 changes: 26 additions & 24 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestVExec(t *testing.T) {
workflow := "wrWorkflow"
keyspace := "target"
query := "update _vt.vreplication set state = 'Running'"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, time.Now().Unix())
defer env.close()
var logger = logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand Down Expand Up @@ -74,6 +74,10 @@ func TestVExec(t *testing.T) {
vx.plannedQuery = plan.parsedQuery.Query
vx.exec()

res, err := wr.getStreams(ctx, workflow, keyspace)
require.NoError(t, err)
require.Less(t, res.MaxVReplicationLag, int64(3 /*seconds*/)) // lag should be very small

type TestCase struct {
name string
query string
Expand Down Expand Up @@ -149,15 +153,15 @@ func TestVExec(t *testing.T) {
dryRunResults := []string{
"Query: delete from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'",
"will be run on the following streams in keyspace target for workflow wrWorkflow:\n\n",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |",
"| | | filter:<rules:<match:\"t1\" > > | | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos | 0 |",
"| | | filter:<rules:<match:\"t1\" > > | | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
"| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
"| -80/zone1-0000000200 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |",
"| | | filter:<rules:<match:\"t1\" > > | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
"| 80-/zone1-0000000210 | 1 | keyspace:\"source\" shard:\"0\" | Copying | vt_target | pos |",
"| | | filter:<rules:<match:\"t1\" > > | | | |",
"+----------------------+----+--------------------------------+---------+-----------+--------------+",
}
require.Equal(t, strings.Join(dryRunResults, "\n")+"\n\n\n\n\n", logger.String())
}
Expand All @@ -174,7 +178,7 @@ func TestWorkflowListStreams(t *testing.T) {
ctx := context.Background()
workflow := "wrWorkflow"
keyspace := "target"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 1234)
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand Down Expand Up @@ -227,7 +231,6 @@ func TestWorkflowListStreams(t *testing.T) {
"Pos": "pos",
"StopPos": "",
"State": "Copying",
"MaxReplicationLag": 0,
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
Expand Down Expand Up @@ -263,7 +266,6 @@ func TestWorkflowListStreams(t *testing.T) {
"Pos": "pos",
"StopPos": "",
"State": "Copying",
"MaxReplicationLag": 0,
"DBName": "vt_target",
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
Expand Down Expand Up @@ -310,15 +312,15 @@ func TestWorkflowListStreams(t *testing.T) {
will be run on the following streams in keyspace target for workflow wrWorkflow:


+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID | MAXREPLICATIONLAG |
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 |
| | | filter:<rules:<match:"t1" > > | | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos | 0 |
| | | filter:<rules:<match:"t1" > > | | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+-------------------+
+----------------------+----+--------------------------------+---------+-----------+--------------+
| TABLET | ID | BINLOGSOURCE | STATE | DBNAME | CURRENT GTID |
+----------------------+----+--------------------------------+---------+-----------+--------------+
| -80/zone1-0000000200 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos |
| | | filter:<rules:<match:"t1" > > | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+
| 80-/zone1-0000000210 | 1 | keyspace:"source" shard:"0" | Copying | vt_target | pos |
| | | filter:<rules:<match:"t1" > > | | | |
+----------------------+----+--------------------------------+---------+-----------+--------------+



Expand All @@ -331,7 +333,7 @@ func TestWorkflowListAll(t *testing.T) {
ctx := context.Background()
keyspace := "target"
workflow := "wrWorkflow"
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0)
defer env.close()
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)
Expand All @@ -350,7 +352,7 @@ func TestVExecValidations(t *testing.T) {
workflow := "wf"
keyspace := "ks"
query := ""
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil)
env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil, 0)
defer env.close()

wr := New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)
Expand Down
7 changes: 3 additions & 4 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ limitations under the License.
package wrangler

import (
"context"
"flag"
"fmt"
"sync"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -76,7 +75,7 @@ func init() {
//----------------------------------------------
// testWranglerEnv

func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string) *testWranglerEnv {
func newWranglerTestEnv(sourceShards, targetShards []string, query string, positions map[string]string, timeUpdated int64) *testWranglerEnv {
flag.Set("tablet_protocol", "WranglerTest")
env := &testWranglerEnv{
workflow: "wrWorkflow",
Expand Down Expand Up @@ -147,7 +146,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
result := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message",
"int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"),
fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|0|", bls),
fmt.Sprintf("1|%v|pos||0|Running|vt_target|%d|0|", bls, timeUpdated),
)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result)
env.tmc.setVRResults(
Expand Down