Skip to content

Commit

Permalink
Merge pull request #8224 from planetscale/schema-tracking-vtgate-upda…
Browse files Browse the repository at this point in the history
…te-controller

Schema tracking: One schema load at a time per keyspace
  • Loading branch information
harshit-gangal authored Jun 2, 2021
2 parents b7f9e56 + 49f20dc commit e1c70dd
Show file tree
Hide file tree
Showing 8 changed files with 453 additions and 270 deletions.
12 changes: 12 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,18 @@ func TestSchemaTracker(t *testing.T) {
require.NoError(t, err)
}

func TestVSchemaTrackerInit(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

qr := exec(t, conn, "SHOW VSCHEMA TABLES")
got := fmt.Sprintf("%v", qr.Rows)
want := `[[VARCHAR("aggr_test")] [VARCHAR("dual")] [VARCHAR("t1")] [VARCHAR("t1_id2_idx")] [VARCHAR("t2")] [VARCHAR("t2_id4_idx")] [VARCHAR("t3")] [VARCHAR("t3_id7_idx")] [VARCHAR("t4")] [VARCHAR("t4_id2_idx")] [VARCHAR("t5_null_vindex")] [VARCHAR("t6")] [VARCHAR("t6_id2_idx")] [VARCHAR("t7_fk")] [VARCHAR("t7_xxhash")] [VARCHAR("t7_xxhash_idx")] [VARCHAR("t8")] [VARCHAR("vstream_test")]]`
assert.Equal(t, want, got)
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ func TestNoInitialKeyspace(t *testing.T) {
// teardown vtgate to flush logs
err = clusterInstance.VtgateProcess.TearDown()
require.NoError(t, err)
clusterInstance.VtgateProcess = cluster.VtgateProcess{}

// check info logs
all, err := ioutil.ReadFile(path.Join(logDir, "vtgate.INFO"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func TestMain(m *testing.M) {
}

// List of users authorized to execute vschema ddl operations
clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-schema-change-signal"}
clusterInstance.VtGateExtraArgs = []string{"-schema_change_signal"}

// Start keyspace
Expand All @@ -86,7 +85,9 @@ func TestMain(m *testing.M) {
}

// restart the tablet so that the schema.Engine gets a chance to start with existing schema
if err := clusterInstance.Keyspaces[0].Shards[0].MasterTablet().RestartOnlyTablet(); err != nil {
tablet := clusterInstance.Keyspaces[0].Shards[0].MasterTablet()
tablet.VttabletProcess.ExtraArgs = []string{"-queryserver-config-schema-change-signal"}
if err := tablet.RestartOnlyTablet(); err != nil {
return 1
}

Expand Down
122 changes: 0 additions & 122 deletions go/test/endtoend/vtgate/schematracker/schema_test.go

This file was deleted.

74 changes: 41 additions & 33 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@ type (
ctx context.Context
signal func() // a function that we'll call whenever we have new schema data

// map of keyspace currently tracked by the Tracker, the value of type time.Time
// defines when was the last time we tracked the keyspace.
tracked map[keyspace]time.Time
}

// Table contains the table name and also, whether the information can be trusted about this table.
Table struct {
Name string
UnknownState bool
// map of keyspace currently tracked
tracked map[keyspace]*updateController
consumeDelay time.Duration
}
)

// defaultConsumeDelay is the default time, the updateController will wait before checking the schema fetch request queue.
const defaultConsumeDelay = 1 * time.Second

// NewTracker creates the tracker object.
func NewTracker(ch chan *discovery.TabletHealth) *Tracker {
return &Tracker{
ch: ch,
tables: &tableMap{m: map[keyspace]map[tableName][]vindexes.Column{}},
tracked: map[keyspace]time.Time{},
ctx: context.Background(),
ctx: context.Background(),
ch: ch,
tables: &tableMap{m: map[keyspace]map[tableName][]vindexes.Column{}},
tracked: map[keyspace]*updateController{},
consumeDelay: defaultConsumeDelay,
}
}

Expand All @@ -75,6 +73,8 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
if err != nil {
return err
}
t.mu.Lock()
defer t.mu.Unlock()
t.updateTables(target.Keyspace, res)
log.Infof("finished loading schema for keyspace %s. Found %d tables", target.Keyspace, len(res.Rows))
return nil
Expand All @@ -89,22 +89,8 @@ func (t *Tracker) Start() {
for {
select {
case th := <-t.ch:
signal := false
// try to load the keyspace if it was not tracked before
if _, ok := t.tracked[th.Target.Keyspace]; !ok {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
log.Warningf("Unable to add keyspace to tracker: %v", err)
continue
}
signal = true
} else if len(th.TablesUpdated) > 0 {
t.updateSchema(th)
signal = true
}
if t.signal != nil && signal {
t.signal()
}
ksUpdater := t.getKeyspaceUpdateController(th)
ksUpdater.add(th)
case <-ctx.Done():
close(t.ch)
return
Expand All @@ -113,6 +99,28 @@ func (t *Tracker) Start() {
}(ctx, t)
}

// getKeyspaceUpdateController returns the updateController for the given keyspace
// the updateController will be created if there was none.
func (t *Tracker) getKeyspaceUpdateController(th *discovery.TabletHealth) *updateController {
t.mu.Lock()
defer t.mu.Unlock()

ksUpdater, ok := t.tracked[th.Target.Keyspace]
if !ok {
init := func(th *discovery.TabletHealth) bool {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
log.Warningf("Unable to add keyspace to tracker: %v", err)
return false
}
return true
}
ksUpdater = &updateController{update: t.updateSchema, init: init, signal: t.signal, consumeDelay: t.consumeDelay}
t.tracked[th.Target.Keyspace] = ksUpdater
}
return ksUpdater
}

// Stop stops the schema tracking
func (t *Tracker) Stop() {
log.Info("Stopping schema tracking")
Expand Down Expand Up @@ -140,18 +148,18 @@ func (t *Tracker) Tables(ks string) map[string][]vindexes.Column {
return m
}

func (t *Tracker) updateSchema(th *discovery.TabletHealth) {
func (t *Tracker) updateSchema(th *discovery.TabletHealth) bool {
tables, err := sqltypes.BuildBindVariable(th.TablesUpdated)
if err != nil {
log.Errorf("failed to read updated tables from TabletHealth: %v", err)
return
return false
}
bv := map[string]*querypb.BindVariable{"tableNames": tables}
res, err := th.Conn.Execute(t.ctx, th.Target, mysql.FetchUpdatedTables, bv, 0, 0, nil)
if err != nil {
// TODO: these tables should now become non-authoritative
log.Warningf("error fetching new schema for %v, making them non-authoritative: %v", th.TablesUpdated, err)
return
return false
}

t.mu.Lock()
Expand All @@ -163,10 +171,10 @@ func (t *Tracker) updateSchema(th *discovery.TabletHealth) {
t.tables.delete(th.Target.Keyspace, tbl)
}
t.updateTables(th.Target.Keyspace, res)
return true
}

func (t *Tracker) updateTables(keyspace string, res *sqltypes.Result) {
t.tracked[keyspace] = time.Now()
for _, row := range res.Rows {
tbl := row[0].ToString()
colName := row[1].ToString()
Expand Down
Loading

0 comments on commit e1c70dd

Please sign in to comment.