From 52d0ea3673eb42b8502ebac92d87de2c466a6f2e Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 11:05:46 +0200 Subject: [PATCH 01/10] Addition of tracker's updateController Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/tracker.go | 49 +++--- go/vt/vtgate/schema/tracker_test.go | 186 +++++++++-------------- go/vt/vtgate/schema/uptate_controller.go | 77 ++++++++++ 3 files changed, 176 insertions(+), 136 deletions(-) create mode 100644 go/vt/vtgate/schema/uptate_controller.go diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index ea20c6fff78..1e86fcee0c2 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -19,7 +19,6 @@ package schema import ( "context" "sync" - "time" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -47,9 +46,8 @@ type ( ctx context.Context signal func() // a function that we'll call whenever we have new schema data - // map of keyspace currently tracked by the Tracker, the value of type time.Time - // defines when was the last time we tracked the keyspace. - tracked map[keyspace]time.Time + // map of keyspace currently tracked + tracked map[keyspace]*updateController } // Table contains the table name and also, whether the information can be trusted about this table. @@ -64,7 +62,7 @@ func NewTracker(ch chan *discovery.TabletHealth) *Tracker { return &Tracker{ ch: ch, tables: &tableMap{m: map[keyspace]map[tableName][]vindexes.Column{}}, - tracked: map[keyspace]time.Time{}, + tracked: map[keyspace]*updateController{}, ctx: context.Background(), } } @@ -75,6 +73,8 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T if err != nil { return err } + t.mu.Lock() + defer t.mu.Unlock() t.updateTables(target.Keyspace, res) log.Infof("finished loading schema for keyspace %s. Found %d tables", target.Keyspace, len(res.Rows)) return nil @@ -89,22 +89,8 @@ func (t *Tracker) Start() { for { select { case th := <-t.ch: - signal := false - // try to load the keyspace if it was not tracked before - if _, ok := t.tracked[th.Target.Keyspace]; !ok { - err := t.LoadKeyspace(th.Conn, th.Target) - if err != nil { - log.Warningf("Unable to add keyspace to tracker: %v", err) - continue - } - signal = true - } else if len(th.TablesUpdated) > 0 { - t.updateSchema(th) - signal = true - } - if t.signal != nil && signal { - t.signal() - } + ksUpdater := t.getKeyspaceUpdateController(th) + ksUpdater.add(th) case <-ctx.Done(): close(t.ch) return @@ -113,6 +99,26 @@ func (t *Tracker) Start() { }(ctx, t) } +// getKeyspaceUpdateController returns the updateController for the given keyspace +// the updateController will be created if there was none. +func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updateController { + t.mu.Lock() + defer t.mu.Unlock() + + ksUpdater, ok := t.tracked[th.Target.Keyspace] + if !ok { + init := func(th *discovery.TabletHealth) { + err := t.LoadKeyspace(th.Conn, th.Target) + if err != nil { + log.Warningf("Unable to add keyspace to tracker: %v", err) + } + } + ksUpdater = &updateController{update: t.updateSchema, init: init, signal: t.signal} + t.tracked[th.Target.Keyspace] = ksUpdater + } + return ksUpdater +} + // Stop stops the schema tracking func (t *Tracker) Stop() { log.Info("Stopping schema tracking") @@ -166,7 +172,6 @@ func (t *Tracker) updateSchema(th *discovery.TabletHealth) { } func (t *Tracker) updateTables(keyspace string, res *sqltypes.Result) { - t.tracked[keyspace] = time.Now() for _, row := range res.Rows { tbl := row[0].ToString() colName := row[1].ToString() diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go index ad289b19d1d..85e6cbc31ff 100644 --- a/go/vt/vtgate/schema/tracker_test.go +++ b/go/vt/vtgate/schema/tracker_test.go @@ -17,6 +17,7 @@ limitations under the License. package schema import ( + "fmt" "sync" "testing" "time" @@ -45,34 +46,57 @@ func TestTracking(t *testing.T) { Shard: target.Shard, Type: target.TabletType, } - sbc := sandboxconn.NewSandboxConn(tablet) - ch := make(chan *discovery.TabletHealth) - tracker := NewTracker(ch) fields := sqltypes.MakeTestFields("table_name|col_name|col_type", "varchar|varchar|varchar") - sbc.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult( - fields, - "prior|id|int", - )}) - err := tracker.LoadKeyspace(sbc, target) - require.NoError(t, err) + type delta struct { + result *sqltypes.Result + updTbl []string + } + var ( + d0 = delta{ + result: sqltypes.MakeTestResult( + fields, + "prior|id|int", + ), + updTbl: []string{"prior"}, + } + + d1 = delta{ + result: sqltypes.MakeTestResult( + fields, + "t1|id|int", + "t1|name|varchar", + "t2|id|varchar", + ), + updTbl: []string{"t1", "t2"}, + } + + d2 = delta{ + result: sqltypes.MakeTestResult( + fields, + "t2|id|varchar", + "t2|name|varchar", + "t3|id|datetime", + ), + updTbl: []string{"prior", "t1", "t2", "t3"}, + } + + d3 = delta{ + result: sqltypes.MakeTestResult( + fields, + "t4|name|varchar", + ), + updTbl: []string{"t4"}, + } + ) - tracker.Start() - defer tracker.Stop() testcases := []struct { tName string - result *sqltypes.Result - updTbl []string + deltas []delta exp map[string][]vindexes.Column }{{ - tName: "new tables", - result: sqltypes.MakeTestResult( - fields, - "t1|id|int", - "t1|name|varchar", - "t2|id|varchar", - ), - updTbl: []string{"t1", "t2"}, + tName: "new tables", + deltas: []delta{d0, d1}, exp: map[string][]vindexes.Column{ "t1": { {Name: sqlparser.NewColIdent("id"), Type: querypb.Type_INT32}, @@ -83,14 +107,8 @@ func TestTracking(t *testing.T) { {Name: sqlparser.NewColIdent("id"), Type: querypb.Type_INT32}}, }, }, { - tName: "delete t1 and prior, updated t2 and new t3", - result: sqltypes.MakeTestResult( - fields, - "t2|id|varchar", - "t2|name|varchar", - "t3|id|datetime", - ), - updTbl: []string{"prior", "t1", "t2", "t3"}, + tName: "delete t1 and prior, updated t2 and new t3", + deltas: []delta{d0, d1, d2}, exp: map[string][]vindexes.Column{ "t2": { {Name: sqlparser.NewColIdent("id"), Type: querypb.Type_VARCHAR}, @@ -99,12 +117,8 @@ func TestTracking(t *testing.T) { {Name: sqlparser.NewColIdent("id"), Type: querypb.Type_DATETIME}}, }, }, { - tName: "new t4", - result: sqltypes.MakeTestResult( - fields, - "t4|name|varchar", - ), - updTbl: []string{"t4"}, + tName: "new t4", + deltas: []delta{d0, d1, d2, d3}, exp: map[string][]vindexes.Column{ "t2": { {Name: sqlparser.NewColIdent("id"), Type: querypb.Type_VARCHAR}, @@ -116,98 +130,42 @@ func TestTracking(t *testing.T) { }, }, } - for _, tcase := range testcases { - t.Run(tcase.tName, func(t *testing.T) { - sbc.SetResults([]*sqltypes.Result{tcase.result}) - sbc.Queries = nil - - wg := sync.WaitGroup{} - wg.Add(1) - tracker.RegisterSignalReceiver(func() { - wg.Done() - }) - - ch <- &discovery.TabletHealth{ - Conn: sbc, - Tablet: tablet, - Target: target, - Serving: true, - TablesUpdated: tcase.updTbl, + for i, tcase := range testcases { + t.Run(fmt.Sprintf("%d - %s", i, tcase.tName), func(t *testing.T) { + sbc := sandboxconn.NewSandboxConn(tablet) + ch := make(chan *discovery.TabletHealth) + tracker := NewTracker(ch) + + tracker.Start() + defer tracker.Stop() + + var results []*sqltypes.Result + for _, d := range tcase.deltas { + results = append(results, d.result) } - require.False(t, waitTimeout(&wg, time.Second), "schema was updated but received no signal") - - require.Equal(t, 1, len(sbc.StringQueries())) - - _, keyspacePresent := tracker.tracked[target.Keyspace] - require.Equal(t, true, keyspacePresent) - - for k, v := range tcase.exp { - utils.MustMatch(t, v, tracker.GetColumns("ks", k), "mismatch for table: ", k) - } - }) - } -} - -func TestTrackingWithUntrackedKeyspace(t *testing.T) { - target := &querypb.Target{ - Keyspace: "ks", - Shard: "0", - TabletType: topodatapb.TabletType_MASTER, - Cell: "aa", - } - tablet := &topodatapb.Tablet{ - Keyspace: target.Keyspace, - Shard: target.Shard, - Type: target.TabletType, - } - sbc := sandboxconn.NewSandboxConn(tablet) - ch := make(chan *discovery.TabletHealth) - tracker := NewTracker(ch) - fields := sqltypes.MakeTestFields("table_name|col_name|col_type", "varchar|varchar|varchar") - - tracker.Start() - defer tracker.Stop() - testcases := []struct { - tName string - result *sqltypes.Result - updTbl []string - exp map[string][]vindexes.Column - }{{ - tName: "existing tables", - result: sqltypes.MakeTestResult( - fields, - "prior|id|int", - ), - updTbl: []string{"prior"}, - exp: map[string][]vindexes.Column{ - "prior": { - {Name: sqlparser.NewColIdent("id"), Type: querypb.Type_INT32}}, - }, - }} - - for _, tcase := range testcases { - t.Run(tcase.tName, func(t *testing.T) { - sbc.SetResults([]*sqltypes.Result{tcase.result}) + sbc.SetResults(results) sbc.Queries = nil wg := sync.WaitGroup{} - wg.Add(1) + wg.Add(len(tcase.deltas)) tracker.RegisterSignalReceiver(func() { wg.Done() }) - ch <- &discovery.TabletHealth{ - Conn: sbc, - Tablet: tablet, - Target: target, - Serving: true, - TablesUpdated: tcase.updTbl, + for _, d := range tcase.deltas { + ch <- &discovery.TabletHealth{ + Conn: sbc, + Tablet: tablet, + Target: target, + Serving: true, + TablesUpdated: d.updTbl, + } } require.False(t, waitTimeout(&wg, time.Second), "schema was updated but received no signal") - require.Equal(t, 1, len(sbc.StringQueries())) + require.Equal(t, len(tcase.deltas), len(sbc.StringQueries())) _, keyspacePresent := tracker.tracked[target.Keyspace] require.Equal(t, true, keyspacePresent) diff --git a/go/vt/vtgate/schema/uptate_controller.go b/go/vt/vtgate/schema/uptate_controller.go new file mode 100644 index 00000000000..8433ae4a65b --- /dev/null +++ b/go/vt/vtgate/schema/uptate_controller.go @@ -0,0 +1,77 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schema + +import ( + "sync" + + "vitess.io/vitess/go/vt/discovery" +) + +type ( + queue struct { + items []*discovery.TabletHealth + } + + updateController struct { + mu sync.Mutex + queue *queue + update func(th *discovery.TabletHealth) + init func(th *discovery.TabletHealth) + signal func() + } +) + +func (u *updateController) consume() { + for { + u.mu.Lock() + var item *discovery.TabletHealth + + if len(u.queue.items) == 0 { + u.queue = nil + u.mu.Unlock() + return + } + // todo: scan queue for multiple update from the same shard, be clever + item = u.queue.items[0] + u.queue.items = u.queue.items[1:] + u.mu.Unlock() + + if u.init != nil { + u.init(item) + u.init = nil + } else { + if len(item.TablesUpdated) == 0 { + continue + } + u.update(item) + } + if u.signal != nil { + u.signal() + } + } +} + +func (u *updateController) add(th *discovery.TabletHealth) { + u.mu.Lock() + defer u.mu.Unlock() + if u.queue == nil { + u.queue = &queue{} + go u.consume() + } + u.queue.items = append(u.queue.items, th) +} From bee2b14b22c21504dad9b4abe8dd9c8c282ebf51 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 15:22:12 +0200 Subject: [PATCH 02/10] Improved update controller with merged-items and unit tests Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/uptate_controller.go | 51 +++++- go/vt/vtgate/schema/uptate_controller_test.go | 153 ++++++++++++++++++ 2 files changed, 197 insertions(+), 7 deletions(-) create mode 100644 go/vt/vtgate/schema/uptate_controller_test.go diff --git a/go/vt/vtgate/schema/uptate_controller.go b/go/vt/vtgate/schema/uptate_controller.go index 8433ae4a65b..a602fac11dc 100644 --- a/go/vt/vtgate/schema/uptate_controller.go +++ b/go/vt/vtgate/schema/uptate_controller.go @@ -18,10 +18,17 @@ package schema import ( "sync" + "time" + + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/discovery" ) +var ( + consumeDelay = 1 * time.Second +) + type ( queue struct { items []*discovery.TabletHealth @@ -33,31 +40,30 @@ type ( update func(th *discovery.TabletHealth) init func(th *discovery.TabletHealth) signal func() + + shardVersionCount map[*querypb.Target]int } ) func (u *updateController) consume() { for { - u.mu.Lock() - var item *discovery.TabletHealth + time.Sleep(consumeDelay) + u.mu.Lock() if len(u.queue.items) == 0 { u.queue = nil u.mu.Unlock() return } + // todo: scan queue for multiple update from the same shard, be clever - item = u.queue.items[0] - u.queue.items = u.queue.items[1:] + item := u.getItemFromQueueLocked() u.mu.Unlock() if u.init != nil { u.init(item) u.init = nil } else { - if len(item.TablesUpdated) == 0 { - continue - } u.update(item) } if u.signal != nil { @@ -66,9 +72,40 @@ func (u *updateController) consume() { } } +func (u *updateController) itemMatch(i, j int) bool { + left := u.queue.items[i] + right := u.queue.items[j] + return left.Target.Keyspace != right.Target.Keyspace +} + +func (u *updateController) getItemFromQueueLocked() *discovery.TabletHealth { + item := u.queue.items[0] + i := 0 + for ; i < len(u.queue.items) && u.itemMatch(0, i); i++ { + for _, table := range u.queue.items[i].TablesUpdated { + found := false + for _, itemTable := range item.TablesUpdated { + if itemTable == table { + found = true + break + } + } + if !found { + item.TablesUpdated = append(item.TablesUpdated, table) + } + } + } + u.queue.items = u.queue.items[i:] + return item +} + func (u *updateController) add(th *discovery.TabletHealth) { u.mu.Lock() defer u.mu.Unlock() + + if len(th.TablesUpdated) == 0 && u.init == nil { + return + } if u.queue == nil { u.queue = &queue{} go u.consume() diff --git a/go/vt/vtgate/schema/uptate_controller_test.go b/go/vt/vtgate/schema/uptate_controller_test.go new file mode 100644 index 00000000000..42d2a7eca84 --- /dev/null +++ b/go/vt/vtgate/schema/uptate_controller_test.go @@ -0,0 +1,153 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schema + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/vt/discovery" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +func TestMultipleUpdatesFromDifferentShards(t *testing.T) { + type input struct { + shard string + tablesUpdates []string + } + type testCase struct { + updateTables []string + signalExpected, initExpected int + inputs []input + delay time.Duration + init bool + } + tests := []testCase{{ + inputs: []input{{ + shard: "-80", + tablesUpdates: []string{"a"}, + }, { + shard: "80-", + tablesUpdates: []string{"a"}, + }}, + updateTables: []string{"a"}, + signalExpected: 1, + }, { + inputs: []input{{ + shard: "0", + tablesUpdates: []string{"a"}, + }, { + shard: "0", + tablesUpdates: []string{"b"}, + }}, + updateTables: []string{"a", "b"}, + signalExpected: 1, + }, { + inputs: []input{{ + shard: "0", + tablesUpdates: []string{"a"}, + }, { + shard: "0", + tablesUpdates: []string{"b"}, + }}, + updateTables: []string{"b"}, + signalExpected: 2, + delay: 10 * time.Millisecond, + }, { + inputs: []input{{ + shard: "0", + }, { + shard: "0", + }}, + }, { + inputs: []input{{ + shard: "-80", + tablesUpdates: []string{"a"}, + }, { + shard: "80-", + tablesUpdates: []string{"a"}, + }}, + signalExpected: 1, + initExpected: 1, + init: true, + }, + } + consumeDelay = 5 * time.Millisecond + for i, test := range tests { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + var signalNb, initNb int + var updatedTables []string + update := func(th *discovery.TabletHealth) { + updatedTables = th.TablesUpdated + } + signal := func() { + signalNb++ + } + kUpdate := updateController{ + update: update, + signal: signal, + shardVersionCount: map[*querypb.Target]int{}, + } + + if test.init { + kUpdate.init = func(th *discovery.TabletHealth) { + initNb++ + } + } + + for _, in := range test.inputs { + target := &querypb.Target{ + Keyspace: "ks", + Shard: in.shard, + } + tablet := &topodatapb.Tablet{ + Keyspace: target.Keyspace, + Shard: target.Shard, + Type: target.TabletType, + } + d := &discovery.TabletHealth{ + Tablet: tablet, + Target: target, + Serving: true, + TablesUpdated: in.tablesUpdates, + } + if test.delay > 0 { + time.Sleep(test.delay) + } + kUpdate.add(d) + } + + for { + kUpdate.mu.Lock() + done := kUpdate.queue == nil + kUpdate.mu.Unlock() + if done { + break + } + } + + assert.Equal(t, test.signalExpected, signalNb, "signal required") + assert.Equal(t, test.initExpected, initNb, "init required") + assert.Equal(t, test.updateTables, updatedTables, "tables to update") + + }) + } +} From 3772e56ad4bc7a66e297b238828fb28b61ad31f2 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 15:48:07 +0200 Subject: [PATCH 03/10] Unit test for tracker's getKeyspaceUpdateController Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/tracker_test.go | 34 +++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go index 85e6cbc31ff..cbb0ac9fa11 100644 --- a/go/vt/vtgate/schema/tracker_test.go +++ b/go/vt/vtgate/schema/tracker_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" @@ -190,3 +192,35 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { return true // timed out } } + +func TestTrackerGetKeyspaceUpdateController(t *testing.T) { + ks3 := &updateController{} + tracker := Tracker{ + tracked: map[keyspace]*updateController{ + "ks3": ks3, + }, + } + + th1 := &discovery.TabletHealth{ + Target: &querypb.Target{Keyspace: "ks1"}, + } + ks1 := tracker.getKeyspaceUpdateController(th1) + + th2 := &discovery.TabletHealth{ + Target: &querypb.Target{Keyspace: "ks2"}, + } + ks2 := tracker.getKeyspaceUpdateController(th2) + + th3 := &discovery.TabletHealth{ + Target: &querypb.Target{Keyspace: "ks3"}, + } + + assert.NotEqual(t, ks1, ks2, "ks1 and ks2 should not be equal, belongs to different keyspace") + assert.Equal(t, ks1, tracker.getKeyspaceUpdateController(th1), "received different updateController") + assert.Equal(t, ks2, tracker.getKeyspaceUpdateController(th2), "received different updateController") + assert.Equal(t, ks3, tracker.getKeyspaceUpdateController(th3), "received different updateController") + + assert.NotNil(t, ks1.init, "ks1 needs to be initialized") + assert.NotNil(t, ks2.init, "ks2 needs to be initialized") + assert.Nil(t, ks3.init, "ks3 already initialized") +} From 1068f9bf84bb31bbb4bef5af0bf8e1a99cef2d95 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 15:58:01 +0200 Subject: [PATCH 04/10] Removed shardVersionCount and added comment for queue emptying Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/uptate_controller.go | 13 ++----------- go/vt/vtgate/schema/uptate_controller_test.go | 5 ++--- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/go/vt/vtgate/schema/uptate_controller.go b/go/vt/vtgate/schema/uptate_controller.go index a602fac11dc..5cf0d1d05ab 100644 --- a/go/vt/vtgate/schema/uptate_controller.go +++ b/go/vt/vtgate/schema/uptate_controller.go @@ -20,8 +20,6 @@ import ( "sync" "time" - querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/discovery" ) @@ -40,8 +38,6 @@ type ( update func(th *discovery.TabletHealth) init func(th *discovery.TabletHealth) signal func() - - shardVersionCount map[*querypb.Target]int } ) @@ -72,16 +68,10 @@ func (u *updateController) consume() { } } -func (u *updateController) itemMatch(i, j int) bool { - left := u.queue.items[i] - right := u.queue.items[j] - return left.Target.Keyspace != right.Target.Keyspace -} - func (u *updateController) getItemFromQueueLocked() *discovery.TabletHealth { item := u.queue.items[0] i := 0 - for ; i < len(u.queue.items) && u.itemMatch(0, i); i++ { + for ; i < len(u.queue.items); i++ { for _, table := range u.queue.items[i].TablesUpdated { found := false for _, itemTable := range item.TablesUpdated { @@ -95,6 +85,7 @@ func (u *updateController) getItemFromQueueLocked() *discovery.TabletHealth { } } } + // emptying queue's items as all items from 0 to i (length of the queue) are merged u.queue.items = u.queue.items[i:] return item } diff --git a/go/vt/vtgate/schema/uptate_controller_test.go b/go/vt/vtgate/schema/uptate_controller_test.go index 42d2a7eca84..92c2dcd211e 100644 --- a/go/vt/vtgate/schema/uptate_controller_test.go +++ b/go/vt/vtgate/schema/uptate_controller_test.go @@ -102,9 +102,8 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { signalNb++ } kUpdate := updateController{ - update: update, - signal: signal, - shardVersionCount: map[*querypb.Target]int{}, + update: update, + signal: signal, } if test.init { From 91187f1afcbfe37163dec183568f4342fdc2ccae Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 16:25:22 +0200 Subject: [PATCH 05/10] Checking if updateController init and update are successful Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/tracker.go | 11 ++++--- go/vt/vtgate/schema/uptate_controller.go | 15 +++++---- go/vt/vtgate/schema/uptate_controller_test.go | 33 ++++++++++++++++--- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index 1e86fcee0c2..1d2bca86e6d 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -107,11 +107,13 @@ func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updat ksUpdater, ok := t.tracked[th.Target.Keyspace] if !ok { - init := func(th *discovery.TabletHealth) { + init := func(th *discovery.TabletHealth) bool { err := t.LoadKeyspace(th.Conn, th.Target) if err != nil { log.Warningf("Unable to add keyspace to tracker: %v", err) + return false } + return true } ksUpdater = &updateController{update: t.updateSchema, init: init, signal: t.signal} t.tracked[th.Target.Keyspace] = ksUpdater @@ -146,18 +148,18 @@ func (t *Tracker) Tables(ks string) map[string][]vindexes.Column { return m } -func (t *Tracker) updateSchema(th *discovery.TabletHealth) { +func (t *Tracker) updateSchema(th *discovery.TabletHealth) bool { tables, err := sqltypes.BuildBindVariable(th.TablesUpdated) if err != nil { log.Errorf("failed to read updated tables from TabletHealth: %v", err) - return + return false } bv := map[string]*querypb.BindVariable{"tableNames": tables} res, err := th.Conn.Execute(t.ctx, th.Target, mysql.FetchUpdatedTables, bv, 0, 0, nil) if err != nil { // TODO: these tables should now become non-authoritative log.Warningf("error fetching new schema for %v, making them non-authoritative: %v", th.TablesUpdated, err) - return + return false } t.mu.Lock() @@ -169,6 +171,7 @@ func (t *Tracker) updateSchema(th *discovery.TabletHealth) { t.tables.delete(th.Target.Keyspace, tbl) } t.updateTables(th.Target.Keyspace, res) + return true } func (t *Tracker) updateTables(keyspace string, res *sqltypes.Result) { diff --git a/go/vt/vtgate/schema/uptate_controller.go b/go/vt/vtgate/schema/uptate_controller.go index 5cf0d1d05ab..d76bf8be176 100644 --- a/go/vt/vtgate/schema/uptate_controller.go +++ b/go/vt/vtgate/schema/uptate_controller.go @@ -35,8 +35,8 @@ type ( updateController struct { mu sync.Mutex queue *queue - update func(th *discovery.TabletHealth) - init func(th *discovery.TabletHealth) + update func(th *discovery.TabletHealth) bool + init func(th *discovery.TabletHealth) bool signal func() } ) @@ -56,13 +56,16 @@ func (u *updateController) consume() { item := u.getItemFromQueueLocked() u.mu.Unlock() + var success bool if u.init != nil { - u.init(item) - u.init = nil + success = u.init(item) + if success { + u.init = nil + } } else { - u.update(item) + success = u.update(item) } - if u.signal != nil { + if success && u.signal != nil { u.signal() } } diff --git a/go/vt/vtgate/schema/uptate_controller_test.go b/go/vt/vtgate/schema/uptate_controller_test.go index 92c2dcd211e..c29467a92b9 100644 --- a/go/vt/vtgate/schema/uptate_controller_test.go +++ b/go/vt/vtgate/schema/uptate_controller_test.go @@ -38,7 +38,7 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { signalExpected, initExpected int inputs []input delay time.Duration - init bool + init, initFail, updateFail bool } tests := []testCase{{ inputs: []input{{ @@ -88,15 +88,39 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { signalExpected: 1, initExpected: 1, init: true, + }, { + inputs: []input{{ + shard: "-80", + tablesUpdates: []string{"a"}, + }, { + shard: "80-", + tablesUpdates: []string{"a"}, + }}, + signalExpected: 0, + initExpected: 1, + init: true, + initFail: true, + }, { + inputs: []input{{ + shard: "-80", + tablesUpdates: []string{"a"}, + }, { + shard: "80-", + tablesUpdates: []string{"b"}, + }}, + updateTables: []string{"a", "b"}, + signalExpected: 0, + updateFail: true, }, } consumeDelay = 5 * time.Millisecond for i, test := range tests { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Run(fmt.Sprintf("%d", i+1), func(t *testing.T) { var signalNb, initNb int var updatedTables []string - update := func(th *discovery.TabletHealth) { + update := func(th *discovery.TabletHealth) bool { updatedTables = th.TablesUpdated + return !test.updateFail } signal := func() { signalNb++ @@ -107,8 +131,9 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { } if test.init { - kUpdate.init = func(th *discovery.TabletHealth) { + kUpdate.init = func(th *discovery.TabletHealth) bool { initNb++ + return !test.initFail } } From 217b8b0b1a3b4167f85197e1030a7eee71958468 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 17:38:02 +0200 Subject: [PATCH 06/10] Moved E2E TestVSchemaTrackerInit test to misc Signed-off-by: Florent Poinsard --- go/test/endtoend/vtgate/misc_test.go | 12 ++ .../vtgate/schematracker/schema_test.go | 122 ------------------ 2 files changed, 12 insertions(+), 122 deletions(-) delete mode 100644 go/test/endtoend/vtgate/schematracker/schema_test.go diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index 73656458b46..77de7f40766 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -662,6 +662,18 @@ func TestSchemaTracker(t *testing.T) { require.NoError(t, err) } +func TestVSchemaTrackerInit(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + qr := exec(t, conn, "SHOW VSCHEMA TABLES") + got := fmt.Sprintf("%v", qr.Rows) + want := `[[VARCHAR("aggr_test")] [VARCHAR("dual")] [VARCHAR("t1")] [VARCHAR("t1_id2_idx")] [VARCHAR("t2")] [VARCHAR("t2_id4_idx")] [VARCHAR("t3")] [VARCHAR("t3_id7_idx")] [VARCHAR("t4")] [VARCHAR("t4_id2_idx")] [VARCHAR("t5_null_vindex")] [VARCHAR("t6")] [VARCHAR("t6_id2_idx")] [VARCHAR("t7_fk")] [VARCHAR("t7_xxhash")] [VARCHAR("t7_xxhash_idx")] [VARCHAR("t8")] [VARCHAR("vstream_test")]]` + assert.Equal(t, want, got) +} + func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) { t.Helper() qr := exec(t, conn, query) diff --git a/go/test/endtoend/vtgate/schematracker/schema_test.go b/go/test/endtoend/vtgate/schematracker/schema_test.go deleted file mode 100644 index 5861225cafc..00000000000 --- a/go/test/endtoend/vtgate/schematracker/schema_test.go +++ /dev/null @@ -1,122 +0,0 @@ -/* -Copyright 2021 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package schematracker - -import ( - "context" - "flag" - "fmt" - "os" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/stretchr/testify/assert" - - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/test/endtoend/cluster" -) - -var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - hostname = "localhost" - keyspaceName = "ks" - cell = "zone1" - sqlSchema = ` - create table vt_user ( - id bigint, - name varchar(64), - primary key (id) - ) Engine=InnoDB; - - create table main ( - id bigint, - val varchar(128), - primary key(id) - ) Engine=InnoDB; - - create table test_table ( - id bigint, - val varchar(128), - primary key(id) - ) Engine=InnoDB; -` -) - -func TestMain(m *testing.M) { - defer cluster.PanicHandler(nil) - flag.Parse() - - exitcode := func() int { - clusterInstance = cluster.NewCluster(cell, hostname) - defer clusterInstance.Teardown() - - // Start topo server - if err := clusterInstance.StartTopo(); err != nil { - return 1 - } - - // List of users authorized to execute vschema ddl operations - clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-schema-change-signal"} - clusterInstance.VtGateExtraArgs = []string{"-schema_change_signal"} - - // Start keyspace - keyspace := &cluster.Keyspace{ - Name: keyspaceName, - SchemaSQL: sqlSchema, - } - if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { - return 1 - } - - // Start vtgate - if err := clusterInstance.StartVtgate(); err != nil { - clusterInstance.VtgateProcess = cluster.VtgateProcess{} - return 1 - } - vtParams = mysql.ConnParams{ - Host: clusterInstance.Hostname, - Port: clusterInstance.VtgateMySQLPort, - } - return m.Run() - }() - os.Exit(exitcode) -} - -func TestVSchemaTrackerInit(t *testing.T) { - defer cluster.PanicHandler(t) - ctx := context.Background() - conn, err := mysql.Connect(ctx, &vtParams) - require.NoError(t, err) - defer conn.Close() - - qr := exec(t, conn, "SHOW VSCHEMA TABLES") - got := fmt.Sprintf("%v", qr.Rows) - want := `[[VARCHAR("dual")] [VARCHAR("main")] [VARCHAR("test_table")] [VARCHAR("vt_user")]]` - assert.Equal(t, want, got) -} - -func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { - t.Helper() - qr, err := conn.ExecuteFetch(query, 1000, true) - if err != nil { - t.Fatal(err) - } - return qr -} From 0cbccf2702294fd7aa0adda9eba6d14d70a555f9 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 18:16:53 +0200 Subject: [PATCH 07/10] Improved schema tracker E2E tests Signed-off-by: Florent Poinsard --- .../schematracker/loadkeyspace/schema_load_keyspace_test.go | 1 - .../schematracker/restarttablet/schema_restart_test.go | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vtgate/schematracker/loadkeyspace/schema_load_keyspace_test.go b/go/test/endtoend/vtgate/schematracker/loadkeyspace/schema_load_keyspace_test.go index e6bde675943..56c91a0ead1 100644 --- a/go/test/endtoend/vtgate/schematracker/loadkeyspace/schema_load_keyspace_test.go +++ b/go/test/endtoend/vtgate/schematracker/loadkeyspace/schema_load_keyspace_test.go @@ -146,7 +146,6 @@ func TestNoInitialKeyspace(t *testing.T) { // teardown vtgate to flush logs err = clusterInstance.VtgateProcess.TearDown() require.NoError(t, err) - clusterInstance.VtgateProcess = cluster.VtgateProcess{} // check info logs all, err := ioutil.ReadFile(path.Join(logDir, "vtgate.INFO")) diff --git a/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go b/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go index d8d066fc878..bb780d2c997 100644 --- a/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go +++ b/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go @@ -73,7 +73,6 @@ func TestMain(m *testing.M) { } // List of users authorized to execute vschema ddl operations - clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-schema-change-signal"} clusterInstance.VtGateExtraArgs = []string{"-schema_change_signal"} // Start keyspace @@ -86,7 +85,9 @@ func TestMain(m *testing.M) { } // restart the tablet so that the schema.Engine gets a chance to start with existing schema - if err := clusterInstance.Keyspaces[0].Shards[0].MasterTablet().RestartOnlyTablet(); err != nil { + tablet := clusterInstance.Keyspaces[0].Shards[0].MasterTablet() + tablet.VttabletProcess.ExtraArgs = []string{"-queryserver-config-schema-change-signal"} + if err := tablet.RestartOnlyTablet(); err != nil { return 1 } From 62f27ba838b5c33f88e2118005dd9745e1b34223 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 18:23:12 +0200 Subject: [PATCH 08/10] Removed unused type in tracker Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/tracker.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index 1d2bca86e6d..ee82f288bf3 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -49,12 +49,6 @@ type ( // map of keyspace currently tracked tracked map[keyspace]*updateController } - - // Table contains the table name and also, whether the information can be trusted about this table. - Table struct { - Name string - UnknownState bool - } ) // NewTracker creates the tracker object. From ced2805c8405e04e2a712cf33b8b9dc8fac63e70 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Tue, 1 Jun 2021 20:14:36 +0200 Subject: [PATCH 09/10] Fixing tracker unit test with newer queuing system Signed-off-by: Florent Poinsard --- go/vt/vtgate/schema/tracker_test.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go index cbb0ac9fa11..2b05efc4221 100644 --- a/go/vt/vtgate/schema/tracker_test.go +++ b/go/vt/vtgate/schema/tracker_test.go @@ -50,6 +50,8 @@ func TestTracking(t *testing.T) { } fields := sqltypes.MakeTestFields("table_name|col_name|col_type", "varchar|varchar|varchar") + consumeDelay = 200 * time.Millisecond + type delta struct { result *sqltypes.Result updTbl []string @@ -141,16 +143,27 @@ func TestTracking(t *testing.T) { tracker.Start() defer tracker.Stop() - var results []*sqltypes.Result + results := []*sqltypes.Result{{}} for _, d := range tcase.deltas { - results = append(results, d.result) + for _, deltaRow := range d.result.Rows { + same := false + for _, row := range results[0].Rows { + if row[0].String() == deltaRow[0].String() && row[1].String() == deltaRow[1].String() { + same = true + break + } + } + if same == false { + results[0].Rows = append(results[0].Rows, deltaRow) + } + } } sbc.SetResults(results) sbc.Queries = nil wg := sync.WaitGroup{} - wg.Add(len(tcase.deltas)) + wg.Add(1) tracker.RegisterSignalReceiver(func() { wg.Done() }) @@ -167,7 +180,7 @@ func TestTracking(t *testing.T) { require.False(t, waitTimeout(&wg, time.Second), "schema was updated but received no signal") - require.Equal(t, len(tcase.deltas), len(sbc.StringQueries())) + require.Equal(t, 1, len(sbc.StringQueries())) _, keyspacePresent := tracker.tracked[target.Keyspace] require.Equal(t, true, keyspacePresent) From 49f20dc300d5a416f96345081cfdf149a018b18a Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 2 Jun 2021 10:11:37 +0530 Subject: [PATCH 10/10] fix data race for consumeDelay Signed-off-by: Harshit Gangal --- go/vt/vtgate/schema/tracker.go | 18 ++++++++++++------ go/vt/vtgate/schema/tracker_test.go | 4 +--- go/vt/vtgate/schema/uptate_controller.go | 17 +++++++---------- go/vt/vtgate/schema/uptate_controller_test.go | 6 +++--- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index ee82f288bf3..824ada235bc 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -19,6 +19,7 @@ package schema import ( "context" "sync" + "time" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -47,17 +48,22 @@ type ( signal func() // a function that we'll call whenever we have new schema data // map of keyspace currently tracked - tracked map[keyspace]*updateController + tracked map[keyspace]*updateController + consumeDelay time.Duration } ) +// defaultConsumeDelay is the default time, the updateController will wait before checking the schema fetch request queue. +const defaultConsumeDelay = 1 * time.Second + // NewTracker creates the tracker object. func NewTracker(ch chan *discovery.TabletHealth) *Tracker { return &Tracker{ - ch: ch, - tables: &tableMap{m: map[keyspace]map[tableName][]vindexes.Column{}}, - tracked: map[keyspace]*updateController{}, - ctx: context.Background(), + ctx: context.Background(), + ch: ch, + tables: &tableMap{m: map[keyspace]map[tableName][]vindexes.Column{}}, + tracked: map[keyspace]*updateController{}, + consumeDelay: defaultConsumeDelay, } } @@ -109,7 +115,7 @@ func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updat } return true } - ksUpdater = &updateController{update: t.updateSchema, init: init, signal: t.signal} + ksUpdater = &updateController{update: t.updateSchema, init: init, signal: t.signal, consumeDelay: t.consumeDelay} t.tracked[th.Target.Keyspace] = ksUpdater } return ksUpdater diff --git a/go/vt/vtgate/schema/tracker_test.go b/go/vt/vtgate/schema/tracker_test.go index 2b05efc4221..b1fd591f75b 100644 --- a/go/vt/vtgate/schema/tracker_test.go +++ b/go/vt/vtgate/schema/tracker_test.go @@ -50,8 +50,6 @@ func TestTracking(t *testing.T) { } fields := sqltypes.MakeTestFields("table_name|col_name|col_type", "varchar|varchar|varchar") - consumeDelay = 200 * time.Millisecond - type delta struct { result *sqltypes.Result updTbl []string @@ -139,7 +137,7 @@ func TestTracking(t *testing.T) { sbc := sandboxconn.NewSandboxConn(tablet) ch := make(chan *discovery.TabletHealth) tracker := NewTracker(ch) - + tracker.consumeDelay = 1 * time.Millisecond tracker.Start() defer tracker.Stop() diff --git a/go/vt/vtgate/schema/uptate_controller.go b/go/vt/vtgate/schema/uptate_controller.go index d76bf8be176..4a099bf89b2 100644 --- a/go/vt/vtgate/schema/uptate_controller.go +++ b/go/vt/vtgate/schema/uptate_controller.go @@ -23,27 +23,24 @@ import ( "vitess.io/vitess/go/vt/discovery" ) -var ( - consumeDelay = 1 * time.Second -) - type ( queue struct { items []*discovery.TabletHealth } updateController struct { - mu sync.Mutex - queue *queue - update func(th *discovery.TabletHealth) bool - init func(th *discovery.TabletHealth) bool - signal func() + mu sync.Mutex + queue *queue + consumeDelay time.Duration + update func(th *discovery.TabletHealth) bool + init func(th *discovery.TabletHealth) bool + signal func() } ) func (u *updateController) consume() { for { - time.Sleep(consumeDelay) + time.Sleep(u.consumeDelay) u.mu.Lock() if len(u.queue.items) == 0 { diff --git a/go/vt/vtgate/schema/uptate_controller_test.go b/go/vt/vtgate/schema/uptate_controller_test.go index c29467a92b9..8d0c8ca501b 100644 --- a/go/vt/vtgate/schema/uptate_controller_test.go +++ b/go/vt/vtgate/schema/uptate_controller_test.go @@ -113,7 +113,6 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { updateFail: true, }, } - consumeDelay = 5 * time.Millisecond for i, test := range tests { t.Run(fmt.Sprintf("%d", i+1), func(t *testing.T) { var signalNb, initNb int @@ -126,8 +125,9 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { signalNb++ } kUpdate := updateController{ - update: update, - signal: signal, + update: update, + signal: signal, + consumeDelay: 5 * time.Millisecond, } if test.init {