Skip to content

Commit

Permalink
Merge pull request #8989 from planetscale/quiest-schema-tracking
Browse files Browse the repository at this point in the history
schema tracking: minimize logging of errors when loading keyspace with disabled schema tracking
  • Loading branch information
harshit-gangal authored Oct 13, 2021
2 parents b33fa76 + 89045a5 commit 4091720
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestBlockedLoadKeyspace(t *testing.T) {
Name: keyspaceName,
SchemaSQL: sqlSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false)
require.NoError(t, err)

// Start vtgate with the schema_change_signal flag
Expand All @@ -78,13 +78,16 @@ func TestBlockedLoadKeyspace(t *testing.T) {
require.NoError(t, err)

// wait for addKeyspaceToTracker to timeout
time.Sleep(10 * time.Second)
time.Sleep(30 * time.Second)

// check warning logs
logDir := clusterInstance.VtgateProcess.LogDir
all, err := os.ReadFile(path.Join(logDir, "vtgate-stderr.txt"))
require.NoError(t, err)
require.Contains(t, string(all), "Unable to get initial schema reload")

// This error should not be logged as the initial load itself failed.
require.NotContains(t, string(all), "Unable to add keyspace to tracker")
}

func TestLoadKeyspaceWithNoTablet(t *testing.T) {
Expand All @@ -104,7 +107,7 @@ func TestLoadKeyspaceWithNoTablet(t *testing.T) {
SchemaSQL: sqlSchema,
}
clusterInstance.VtTabletExtraArgs = []string{"-queryserver-config-schema-change-signal"}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false)
require.NoError(t, err)

// teardown vttablets
Expand Down
15 changes: 10 additions & 5 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func (t *Tracker) newUpdateController() *updateController {
return &updateController{update: t.updateSchema, reloadKeyspace: t.initKeyspace, signal: t.signal, consumeDelay: t.consumeDelay}
}

func (t *Tracker) initKeyspace(th *discovery.TabletHealth) bool {
func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
log.Warningf("Unable to add keyspace to tracker: %v", err)
return false
return err
}
return true
return nil
}

// Stop stops the schema tracking
Expand Down Expand Up @@ -217,8 +217,13 @@ func (t *Tracker) RegisterSignalReceiver(f func()) {

// AddNewKeyspace adds keyspace to the tracker.
func (t *Tracker) AddNewKeyspace(conn queryservice.QueryService, target *querypb.Target) error {
t.tracked[target.Keyspace] = t.newUpdateController()
return t.LoadKeyspace(conn, target)
updateController := t.newUpdateController()
t.tracked[target.Keyspace] = updateController
err := t.LoadKeyspace(conn, target)
if err != nil {
updateController.setIgnore(checkIfWeShouldIgnoreKeyspace(err))
}
return err
}

type tableMap struct {
Expand Down
47 changes: 44 additions & 3 deletions go/vt/vtgate/schema/uptate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/discovery"
Expand All @@ -35,9 +37,12 @@ type (
queue *queue
consumeDelay time.Duration
update func(th *discovery.TabletHealth) bool
reloadKeyspace func(th *discovery.TabletHealth) bool
reloadKeyspace func(th *discovery.TabletHealth) error
signal func()
loaded bool

// we'll only log a failed keyspace loading once
ignore bool
}
)

Expand All @@ -54,20 +59,39 @@ func (u *updateController) consume() {

// todo: scan queue for multiple update from the same shard, be clever
item := u.getItemFromQueueLocked()
loaded := u.loaded
u.mu.Unlock()

var success bool
if u.loaded {
if loaded {
success = u.update(item)
} else {
success = u.reloadKeyspace(item)
if err := u.reloadKeyspace(item); err == nil {
success = true
} else {
if checkIfWeShouldIgnoreKeyspace(err) {
u.setIgnore(true)
}
success = false
}
}
if success && u.signal != nil {
u.signal()
}
}
}

// checkIfWeShouldIgnoreKeyspace inspects an error and
// will mark a keyspace as failed and won't try to load more information from it
func checkIfWeShouldIgnoreKeyspace(err error) bool {
sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError)
if sqlErr.Num == mysql.ERBadDb || sqlErr.Num == mysql.ERNoSuchTable {
// if we are missing the db or table, not point in retrying
return true
}
return false
}

func (u *updateController) getItemFromQueueLocked() *discovery.TabletHealth {
item := u.queue.items[0]
itemsCount := len(u.queue.items)
Expand Down Expand Up @@ -114,6 +138,17 @@ func (u *updateController) add(th *discovery.TabletHealth) {
return
}

if len(th.Stats.TableSchemaChanged) > 0 && u.ignore {
// we got an update for this keyspace - we need to stop ignoring it, and reload everything
u.ignore = false
u.loaded = false
}

if u.ignore {
// keyspace marked as not working correctly, so we are ignoring it for now
return
}

if u.queue == nil {
u.queue = &queue{}
go u.consume()
Expand All @@ -126,3 +161,9 @@ func (u *updateController) setLoaded(loaded bool) {
defer u.mu.Unlock()
u.loaded = loaded
}

func (u *updateController) setIgnore(i bool) {
u.mu.Lock()
defer u.mu.Unlock()
u.ignore = i
}
9 changes: 6 additions & 3 deletions go/vt/vtgate/schema/uptate_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) {
update: update,
signal: signal,
consumeDelay: 5 * time.Millisecond,
reloadKeyspace: func(th *discovery.TabletHealth) bool {
reloadKeyspace: func(th *discovery.TabletHealth) error {
initNb++
return !test.initFail
var err error
if test.initFail {
err = fmt.Errorf("error")
}
return err
},
loaded: !test.init,
}
Expand Down Expand Up @@ -170,7 +174,6 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) {
assert.Equal(t, test.signalExpected, signalNb, "signal required")
assert.Equal(t, test.initExpected, initNb, "init required")
assert.Equal(t, test.updateTables, updatedTables, "tables to update")

})
}
}
4 changes: 2 additions & 2 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,19 +306,19 @@ func resolveAndLoadKeyspace(ctx context.Context, srvResolver *srvtopo.Resolver,
log.Warningf("Unable to resolve destination: %v", err)
return
}

timeout := time.After(5 * time.Second)
for {
select {
case <-timeout:
log.Warningf("Unable to get initial schema reload")
log.Warningf("Unable to get initial schema reload for keyspace: %s", keyspace)
return
case <-time.After(500 * time.Millisecond):
for _, shard := range dest {
err := st.AddNewKeyspace(gw, shard.Target)
if err == nil {
return
}
log.Warningf("Unable to add keyspace to tracker: %v", err)
}
}
}
Expand Down

0 comments on commit 4091720

Please sign in to comment.