From 1b9c8e95c6bf5f85001ba5535c406693243d3f3f Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 3 Jun 2021 16:08:42 +0200 Subject: [PATCH] Only reload/update tracked schema in the current schema and not for online ddl artifacts Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/schema/tracker.go | 37 +++++++++++++++++-- .../tabletserver/schema/tracker_test.go | 29 +++++++++++++-- .../tabletserver/vstreamer/vstreamer.go | 4 +- 3 files changed, 63 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 8193fa3dbdd..c76dd5ca6fc 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -25,6 +25,8 @@ import ( "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/sqlparser" @@ -136,7 +138,7 @@ func (tr *Tracker) process(ctx context.Context) { defer tr.env.LogError() defer tr.wg.Done() if err := tr.possiblyInsertInitialSchema(ctx); err != nil { - log.Errorf("possiblyInsertInitialSchema eror: %v", err) + log.Errorf("error inserting initial schema: %v", err) return } @@ -153,10 +155,13 @@ func (tr *Tracker) process(ctx context.Context) { if event.Type == binlogdatapb.VEventType_GTID { gtid = event.Gtid } - if event.Type == binlogdatapb.VEventType_DDL { + if event.Type == binlogdatapb.VEventType_DDL && + MustReloadSchemaOnDDL(event.Statement, tr.engine.cp.DBName()) { + if err := tr.schemaUpdated(gtid, event.Statement, event.Timestamp); err != nil { tr.env.Stats().ErrorCounters.Add(vtrpcpb.Code_INTERNAL.String(), 1) - log.Errorf("Error updating schema: %s", sqlparser.TruncateForLog(err.Error())) + log.Errorf("Error updating schema: %s for ddl %s, gtid %s", + sqlparser.TruncateForLog(err.Error()), event.Statement, gtid) } } } @@ -278,3 +283,29 @@ func encodeString(in string) string { sqltypes.NewVarChar(in).EncodeSQL(buf) return buf.String() } + +// MustReloadSchemaOnDDL returns true if the ddl is for the db which is part of the workflow and is not an online ddl artifact +func MustReloadSchemaOnDDL(sql string, dbname string) bool { + ast, err := sqlparser.Parse(sql) + if err != nil { + return false + } + switch stmt := ast.(type) { + case sqlparser.DBDDLStatement: + return false + case sqlparser.DDLStatement: + table := stmt.GetTable() + if table.IsEmpty() { + return false + } + if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname { + return false + } + tableName := table.Name.String() + if schema.IsOnlineDDLTableName(tableName) { + return false + } + return true + } + return false +} diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 8828ece4eb2..39d97dc46a6 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -61,7 +61,7 @@ func TestTracker(t *testing.T) { }, { Type: binlogdatapb.VEventType_GTID, - Statement: "", + Statement: "", // This event should cause an error updating schema since gtid is bad }, { Type: binlogdatapb.VEventType_DDL, Statement: ddl1, @@ -84,9 +84,8 @@ func TestTracker(t *testing.T) { <-vs.done cancel() tracker.Close() - // Two of those events should have caused an error. final := env.Stats().ErrorCounters.Counts()["INTERNAL"] - require.Equal(t, initial+2, final) + require.Equal(t, initial+1, final) require.True(t, initialSchemaInserted) } @@ -149,3 +148,27 @@ func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs [] <-ctx.Done() return nil } + +func TestMustReloadSchemaOnDDL(t *testing.T) { + type testcase struct { + query string + dbname string + want bool + } + db1, db2 := "db1", "db2" + testcases := []*testcase{ + {"create table x(i int);", db1, true}, + {"bad", db2, false}, + {"create table db2.x(i int);", db2, true}, + {"create table db1.x(i int);", db2, false}, + {"create table _vt.x(i int);", db1, false}, + {"DROP VIEW IF EXISTS `pseudo_gtid`.`_pseudo_gtid_hint__asc:55B364E3:0000000000056EE2:6DD57B85`", db2, false}, + {"create database db1;", db1, false}, + {"create table db1._4e5dcf80_354b_11eb_82cd_f875a4d24e90_20201203114014_gho(i int);", db1, false}, + } + for _, tc := range testcases { + t.Run("", func(t *testing.T) { + require.Equal(t, tc.want, MustReloadSchemaOnDDL(tc.query, tc.dbname)) + }) + } +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 65347a2cca6..a2496711b5c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -463,7 +463,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Type: binlogdatapb.VEventType_OTHER, }) } - vs.se.ReloadAt(context.Background(), vs.pos) + if schema.MustReloadSchemaOnDDL(q.SQL, vs.cp.DBName()) { + vs.se.ReloadAt(context.Background(), vs.pos) + } case sqlparser.StmtSavepoint: mustSend := mustSendStmt(q, vs.cp.DBName()) if mustSend {