Skip to content

Commit

Permalink
Merge pull request #8257 from planetscale/rn-vr-on-ddl-ignore-pseudo-…
Browse files Browse the repository at this point in the history
…gtid

Tracker/VStreamer: only reload schema for tables in current database and not for internal table artifacts
  • Loading branch information
rohit-nayak-ps authored Jun 3, 2021
2 parents e018d0f + 1b9c8e9 commit 236ddc4
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 7 deletions.
37 changes: 34 additions & 3 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -136,7 +138,7 @@ func (tr *Tracker) process(ctx context.Context) {
defer tr.env.LogError()
defer tr.wg.Done()
if err := tr.possiblyInsertInitialSchema(ctx); err != nil {
log.Errorf("possiblyInsertInitialSchema eror: %v", err)
log.Errorf("error inserting initial schema: %v", err)
return
}

Expand All @@ -153,10 +155,13 @@ func (tr *Tracker) process(ctx context.Context) {
if event.Type == binlogdatapb.VEventType_GTID {
gtid = event.Gtid
}
if event.Type == binlogdatapb.VEventType_DDL {
if event.Type == binlogdatapb.VEventType_DDL &&
MustReloadSchemaOnDDL(event.Statement, tr.engine.cp.DBName()) {

if err := tr.schemaUpdated(gtid, event.Statement, event.Timestamp); err != nil {
tr.env.Stats().ErrorCounters.Add(vtrpcpb.Code_INTERNAL.String(), 1)
log.Errorf("Error updating schema: %s", sqlparser.TruncateForLog(err.Error()))
log.Errorf("Error updating schema: %s for ddl %s, gtid %s",
sqlparser.TruncateForLog(err.Error()), event.Statement, gtid)
}
}
}
Expand Down Expand Up @@ -278,3 +283,29 @@ func encodeString(in string) string {
sqltypes.NewVarChar(in).EncodeSQL(buf)
return buf.String()
}

// MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact
func MustReloadSchemaOnDDL(sql string, dbname string) bool {
ast, err := sqlparser.Parse(sql)
if err != nil {
return false
}
switch stmt := ast.(type) {
case sqlparser.DBDDLStatement:
return false
case sqlparser.DDLStatement:
table := stmt.GetTable()
if table.IsEmpty() {
return false
}
if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname {
return false
}
tableName := table.Name.String()
if schema.IsOnlineDDLTableName(tableName) {
return false
}
return true
}
return false
}
29 changes: 26 additions & 3 deletions go/vt/vttablet/tabletserver/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestTracker(t *testing.T) {
},
{
Type: binlogdatapb.VEventType_GTID,
Statement: "",
Statement: "", // This event should cause an error updating schema since gtid is bad
}, {
Type: binlogdatapb.VEventType_DDL,
Statement: ddl1,
Expand All @@ -84,9 +84,8 @@ func TestTracker(t *testing.T) {
<-vs.done
cancel()
tracker.Close()
// Two of those events should have caused an error.
final := env.Stats().ErrorCounters.Counts()["INTERNAL"]
require.Equal(t, initial+2, final)
require.Equal(t, initial+1, final)
require.True(t, initialSchemaInserted)
}

Expand Down Expand Up @@ -149,3 +148,27 @@ func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []
<-ctx.Done()
return nil
}

func TestMustReloadSchemaOnDDL(t *testing.T) {
type testcase struct {
query string
dbname string
want bool
}
db1, db2 := "db1", "db2"
testcases := []*testcase{
{"create table x(i int);", db1, true},
{"bad", db2, false},
{"create table db2.x(i int);", db2, true},
{"create table db1.x(i int);", db2, false},
{"create table _vt.x(i int);", db1, false},
{"DROP VIEW IF EXISTS `pseudo_gtid`.`_pseudo_gtid_hint__asc:55B364E3:0000000000056EE2:6DD57B85`", db2, false},
{"create database db1;", db1, false},
{"create table db1._4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014_gho(i int);", db1, false},
}
for _, tc := range testcases {
t.Run("", func(t *testing.T) {
require.Equal(t, tc.want, MustReloadSchemaOnDDL(tc.query, tc.dbname))
})
}
}
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Type: binlogdatapb.VEventType_OTHER,
})
}
vs.se.ReloadAt(context.Background(), vs.pos)
if schema.MustReloadSchemaOnDDL(q.SQL, vs.cp.DBName()) {
vs.se.ReloadAt(context.Background(), vs.pos)
}
case sqlparser.StmtSavepoint:
mustSend := mustSendStmt(q, vs.cp.DBName())
if mustSend {
Expand Down

0 comments on commit 236ddc4

Please sign in to comment.