Skip to content

Commit

Permalink
Merge pull request #5130 from planetscale/ss-vrepl-rowstreamer-fix
Browse files Browse the repository at this point in the history
vreplication: improved rowstreamer
  • Loading branch information
sougou authored Sep 4, 2019
2 parents 45fb488 + 40af468 commit ca686af
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 19 deletions.
28 changes: 12 additions & 16 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,20 @@ func (rs *rowStreamer) buildSelect() (string, error) {
if len(rs.lastpk) != len(rs.pkColumns) {
return "", fmt.Errorf("primary key values don't match length: %v vs %v", rs.lastpk, rs.pkColumns)
}
buf.WriteString(" where (")
buf.WriteString(" where ")
prefix := ""
for _, pk := range rs.pkColumns {
buf.Myprintf("%s%v", prefix, rs.plan.Table.Columns[pk].Name)
prefix = ","
}
buf.WriteString(") > (")
prefix = ""
for _, val := range rs.lastpk {
buf.WriteString(prefix)
prefix = ","
val.EncodeSQL(buf)
for lastcol := len(rs.pkColumns) - 1; lastcol >= 0; lastcol-- {
buf.Myprintf("%s(", prefix)
prefix = " or "
for i, pk := range rs.pkColumns[:lastcol] {
buf.Myprintf("%v = ", rs.plan.Table.Columns[pk].Name)
rs.lastpk[i].EncodeSQL(buf)
buf.Myprintf(" and ")
}
buf.Myprintf("%v > ", rs.plan.Table.Columns[rs.pkColumns[lastcol]].Name)
rs.lastpk[lastcol].EncodeSQL(buf)
buf.Myprintf(")")
}
buf.WriteString(")")
}
buf.Myprintf(" order by ", sqlparser.NewTableIdent(rs.plan.Table.Name))
prefix = ""
Expand Down Expand Up @@ -271,10 +271,6 @@ func (rs *rowStreamer) startStreaming(conn *mysql.Conn) (string, error) {
}()

log.Infof("Locking table %s for copying", rs.plan.Table.Name)
// mysql recommends this before locking tables.
if _, err := lockConn.ExecuteFetch("set autocommit=0", 0, false); err != nil {
return "", err
}
if _, err := lockConn.ExecuteFetch(fmt.Sprintf("lock tables %s read", sqlparser.String(sqlparser.NewTableIdent(rs.plan.Table.Name))), 0, false); err != nil {
return "", err
}
Expand Down
26 changes: 23 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ func TestStreamRowsScan(t *testing.T) {
// No PK
"create table t3(id int, val varbinary(128))",
"insert into t3 values (1, 'aaa'), (2, 'bbb')",
// Three-column PK
"create table t4(id1 int, id2 int, id3 int, val varbinary(128), primary key(id1, id2, id3))",
"insert into t4 values (1, 2, 3, 'aaa'), (2, 3, 4, 'bbb')",
})
defer execStatements(t, []string{
"drop table t1",
"drop table t2",
"drop table t3",
"drop table t4",
})
engine.se.Reload(context.Background())

Expand All @@ -63,7 +67,7 @@ func TestStreamRowsScan(t *testing.T) {
`fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > pkfields:<name:"id" type:INT32 > `,
`rows:<lengths:1 lengths:3 values:"2bbb" > lastpk:<lengths:1 values:"2" > `,
}
wantQuery = "select id, val from t1 where (id) > (1) order by id"
wantQuery = "select id, val from t1 where (id > 1) order by id"
checkStream(t, "select * from t1", []sqltypes.Value{sqltypes.NewInt64(1)}, wantQuery, wantStream)

// t1: different column ordering
Expand All @@ -87,7 +91,7 @@ func TestStreamRowsScan(t *testing.T) {
`fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"val" type:VARBINARY > pkfields:<name:"id1" type:INT32 > pkfields:<name:"id2" type:INT32 > `,
`rows:<lengths:1 lengths:1 lengths:3 values:"13bbb" > lastpk:<lengths:1 lengths:1 values:"13" > `,
}
wantQuery = "select id1, id2, val from t2 where (id1,id2) > (1,2) order by id1, id2"
wantQuery = "select id1, id2, val from t2 where (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2"
checkStream(t, "select * from t2", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, wantQuery, wantStream)

// t3: all rows
Expand All @@ -103,8 +107,24 @@ func TestStreamRowsScan(t *testing.T) {
`fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > pkfields:<name:"id" type:INT32 > pkfields:<name:"val" type:VARBINARY > `,
`rows:<lengths:1 lengths:3 values:"2bbb" > lastpk:<lengths:1 lengths:3 values:"2bbb" > `,
}
wantQuery = "select id, val from t3 where (id,val) > (1,'aaa') order by id, val"
wantQuery = "select id, val from t3 where (id = 1 and val > 'aaa') or (id > 1) order by id, val"
checkStream(t, "select * from t3", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("aaa")}, wantQuery, wantStream)

// t4: all rows
wantStream = []string{
`fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > fields:<name:"val" type:VARBINARY > pkfields:<name:"id1" type:INT32 > pkfields:<name:"id2" type:INT32 > pkfields:<name:"id3" type:INT32 > `,
`rows:<lengths:1 lengths:1 lengths:1 lengths:3 values:"123aaa" > rows:<lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb" > lastpk:<lengths:1 lengths:1 lengths:1 values:"234" > `,
}
wantQuery = "select id1, id2, id3, val from t4 order by id1, id2, id3"
checkStream(t, "select * from t4", nil, wantQuery, wantStream)

// t4: lastpk: 1,2,3
wantStream = []string{
`fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > fields:<name:"val" type:VARBINARY > pkfields:<name:"id1" type:INT32 > pkfields:<name:"id2" type:INT32 > pkfields:<name:"id3" type:INT32 > `,
`rows:<lengths:1 lengths:1 lengths:1 lengths:3 values:"234bbb" > lastpk:<lengths:1 lengths:1 lengths:1 values:"234" > `,
}
wantQuery = "select id1, id2, id3, val from t4 where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3"
checkStream(t, "select * from t4", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2), sqltypes.NewInt64(3)}, wantQuery, wantStream)
}

func TestStreamRowsUnicode(t *testing.T) {
Expand Down

0 comments on commit ca686af

Please sign in to comment.