Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resurrect gateway interface and command line option #5992

Merged
merged 2 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func initVtgateExecutor(vSchemaStr string, opts *Options) error {

func newFakeResolver(opts *Options, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vtgate.Resolver {
ctx := context.Background()
gw := vtgate.NewTabletGateway(ctx, hc, serv, cell, 3)
gw := vtgate.GatewayCreator()(ctx, hc, serv, cell, 3)
gw.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA})

txMode := vtgatepb.TransactionMode_MULTI
Expand Down
73 changes: 25 additions & 48 deletions go/vt/vtgate/tabletgateway.go → go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package vtgate
import (
"flag"
"fmt"
"golang.org/x/net/context"
"math/rand"
"sort"
"strings"
"sync"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/topotools"

"vitess.io/vitess/go/flagutil"
Expand All @@ -44,29 +45,27 @@ import (
)

var (
_ = flag.String("gateway_implementation", "", "Deprecated")
initialTabletTimeout = flag.Duration("gateway_initial_tablet_timeout", 30*time.Second, "At startup, the gateway will wait up to that duration to get one tablet per keyspace/shard/tablettype")
// KeyspacesToWatch - if provided this specifies which keyspaces should be
// visible to a vtgate. By default the vtgate will allow access to any
// keyspace.
cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets")
refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval")
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")

allowedTabletTypes []topodatapb.TabletType

tabletFilters flagutil.StringListValue
KeyspacesToWatch flagutil.StringListValue
tabletFilters flagutil.StringListValue
)

const (
gatewayImplementationDiscovery = "discoverygateway"
)

func init() {
flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema")
flag.Var(&tabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch")
topoproto.TabletTypeListVar(&allowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to")
RegisterGatewayCreator(gatewayImplementationDiscovery, createDiscoveryGateway)
}

type tabletGateway struct {
type discoveryGateway struct {
queryservice.QueryService
hc discovery.HealthCheck
tsc *discovery.TabletStatsCache
Expand All @@ -88,7 +87,11 @@ type tabletGateway struct {
buffer *buffer.Buffer
}

func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) *tabletGateway {
func createDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) Gateway {
return NewDiscoveryGateway(ctx, hc, serv, cell, retryCount)
}

func NewDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) *discoveryGateway {
var topoServer *topo.Server
if serv != nil {
var err error
Expand All @@ -98,7 +101,7 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop
}
}

dg := &tabletGateway{
dg := &discoveryGateway{
hc: hc,
tsc: discovery.NewTabletStatsCacheDoNotSetListener(topoServer, cell),
srvTopoServer: serv,
Expand Down Expand Up @@ -142,7 +145,7 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop

// RegisterStats registers the stats to export the lag since the last refresh
// and the checksum of the topology
func (dg *tabletGateway) RegisterStats() {
func (dg *discoveryGateway) RegisterStats() {
stats.NewGaugeDurationFunc(
"TopologyWatcherMaxRefreshLag",
"maximum time since the topology watcher refreshed a cell",
Expand All @@ -158,7 +161,7 @@ func (dg *tabletGateway) RegisterStats() {

// topologyWatcherMaxRefreshLag returns the maximum lag since the watched
// cells were refreshed from the topo server
func (dg *tabletGateway) topologyWatcherMaxRefreshLag() time.Duration {
func (dg *discoveryGateway) topologyWatcherMaxRefreshLag() time.Duration {
var lag time.Duration
for _, tw := range dg.tabletsWatchers {
cellLag := tw.RefreshLag()
Expand All @@ -170,7 +173,7 @@ func (dg *tabletGateway) topologyWatcherMaxRefreshLag() time.Duration {
}

// topologyWatcherChecksum returns a checksum of the topology watcher state
func (dg *tabletGateway) topologyWatcherChecksum() int64 {
func (dg *discoveryGateway) topologyWatcherChecksum() int64 {
var checksum int64
for _, tw := range dg.tabletsWatchers {
checksum = checksum ^ int64(tw.TopoChecksum())
Expand All @@ -180,7 +183,7 @@ func (dg *tabletGateway) topologyWatcherChecksum() int64 {

// StatsUpdate forwards HealthCheck updates to TabletStatsCache and MasterBuffer.
// It is part of the discovery.HealthCheckStatsListener interface.
func (dg *tabletGateway) StatsUpdate(ts *discovery.TabletStats) {
func (dg *discoveryGateway) StatsUpdate(ts *discovery.TabletStats) {
dg.tsc.StatsUpdate(ts)

if ts.Target.TabletType == topodatapb.TabletType_MASTER {
Expand All @@ -189,7 +192,7 @@ func (dg *tabletGateway) StatsUpdate(ts *discovery.TabletStats) {
}

// WaitForTablets is part of the gateway.Gateway interface.
func (dg *tabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error {
func (dg *discoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error {
// Skip waiting for tablets if we are not told to do so.
if len(tabletTypesToWait) == 0 {
return nil
Expand All @@ -206,7 +209,7 @@ func (dg *tabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [

// Close shuts down underlying connections.
// This function hides the inner implementation.
func (dg *tabletGateway) Close(ctx context.Context) error {
func (dg *discoveryGateway) Close(ctx context.Context) error {
dg.buffer.Shutdown()
for _, ctw := range dg.tabletsWatchers {
ctw.Stop()
Expand All @@ -216,7 +219,7 @@ func (dg *tabletGateway) Close(ctx context.Context) error {

// CacheStatus returns a list of TabletCacheStatus per
// keyspace/shard/tablet_type.
func (dg *tabletGateway) CacheStatus() TabletCacheStatusList {
func (dg *discoveryGateway) CacheStatus() TabletCacheStatusList {
dg.mu.RLock()
res := make(TabletCacheStatusList, 0, len(dg.statusAggregators))
for _, aggr := range dg.statusAggregators {
Expand All @@ -232,7 +235,7 @@ func (dg *tabletGateway) CacheStatus() TabletCacheStatusList {
// the middle of a transaction. While returning the error check if it maybe a result of
// a resharding event, and set the re-resolve bit and let the upper layers
// re-resolve and retry.
func (dg *tabletGateway) withRetry(ctx context.Context, target *querypb.Target, unused queryservice.QueryService, name string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
func (dg *discoveryGateway) withRetry(ctx context.Context, target *querypb.Target, unused queryservice.QueryService, name string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error {
var tabletLastUsed *topodatapb.Tablet
var err error
invalidTablets := make(map[string]bool)
Expand Down Expand Up @@ -371,13 +374,13 @@ func nextTablet(cell string, tablets []discovery.TabletStats, offset, length int
return -1
}

func (dg *tabletGateway) updateStats(target *querypb.Target, startTime time.Time, err error) {
func (dg *discoveryGateway) updateStats(target *querypb.Target, startTime time.Time, err error) {
elapsed := time.Since(startTime)
aggr := dg.getStatsAggregator(target)
aggr.UpdateQueryInfo("", target.TabletType, elapsed, err != nil)
}

func (dg *tabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatusAggregator {
func (dg *discoveryGateway) getStatsAggregator(target *querypb.Target) *TabletStatusAggregator {
key := fmt.Sprintf("%v/%v/%v", target.Keyspace, target.Shard, target.TabletType.String())

// get existing aggregator
Expand All @@ -399,32 +402,6 @@ func (dg *tabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatu
return aggr
}

// WaitForTablets is a helper method to wait for the provided tablets,
// up until the *initialTabletTimeout. It will log what it is doing.
// Note it has the same name as the tabletGateway's method, as it
// just calls it.
func WaitForTablets(gw *tabletGateway, tabletTypesToWait []topodatapb.TabletType) error {
log.Infof("Gateway waiting for serving tablets of types %v ...", tabletTypesToWait)
ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout)
defer cancel()

err := gw.WaitForTablets(ctx, tabletTypesToWait)
switch err {
case nil:
// Log so we know everything is fine.
log.Infof("Waiting for tablets completed")
case context.DeadlineExceeded:
// In this scenario, we were able to reach the
// topology service, but some tablets may not be
// ready. We just warn and keep going.
log.Warningf("Timeout waiting for all keyspaces / shards to have healthy tablets of types %v, may be in degraded mode", tabletTypesToWait)
err = nil
default:
// Nothing to do here, the caller will log.Fatalf.
}
return err
}

// NewShardError returns a new error with the shard info amended.
func NewShardError(in error, target *querypb.Target, tablet *topodatapb.Tablet) error {
if in == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package vtgate

import (
"fmt"
"golang.org/x/net/context"
"strings"
"testing"

"golang.org/x/net/context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/srvtopo/srvtopotest"
Expand All @@ -36,31 +37,31 @@ import (
)

func TestDiscoveryGatewayExecute(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error {
_, err := dg.Execute(context.Background(), target, "query", nil, 0, nil)
return err
})
testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error {
_, err := dg.Execute(context.Background(), target, "query", nil, 1, nil)
return err
})
}

func TestDiscoveryGatewayExecuteBatch(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error {
queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}}
_, err := dg.ExecuteBatch(context.Background(), target, queries, false, 0, nil)
return err
})
testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error {
queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}}
_, err := dg.ExecuteBatch(context.Background(), target, queries, false, 1, nil)
return err
})
}

func TestDiscoveryGatewayExecuteStream(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error {
err := dg.StreamExecute(context.Background(), target, "query", nil, 0, nil, func(qr *sqltypes.Result) error {
return nil
})
Expand All @@ -69,33 +70,33 @@ func TestDiscoveryGatewayExecuteStream(t *testing.T) {
}

func TestDiscoveryGatewayBegin(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error {
_, err := dg.Begin(context.Background(), target, nil)
return err
})
}

func TestDiscoveryGatewayCommit(t *testing.T) {
testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error {
return dg.Commit(context.Background(), target, 1)
})
}

func TestDiscoveryGatewayRollback(t *testing.T) {
testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error {
return dg.Rollback(context.Background(), target, 1)
})
}

func TestDiscoveryGatewayBeginExecute(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error {
_, _, err := dg.BeginExecute(context.Background(), target, "query", nil, nil)
return err
})
}

func TestDiscoveryGatewayBeginExecuteBatch(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error {
testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error {
queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}}
_, _, err := dg.BeginExecuteBatch(context.Background(), target, queries, false, nil)
return err
Expand All @@ -106,7 +107,7 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) {
keyspace := "ks"
shard := "0"
hc := discovery.NewFakeHealthCheck()
dg := NewTabletGateway(context.Background(), hc, nil, "local", 2)
dg := NewDiscoveryGateway(context.Background(), hc, nil, "local", 2)

// replica should only use local ones
hc.Reset()
Expand Down Expand Up @@ -214,7 +215,7 @@ func TestDiscoveryGatewayGetTabletsInRegion(t *testing.T) {
Cells: []string{"local-west", "local-east"},
}

dg := NewTabletGateway(context.Background(), hc, srvTopo, "local-west", 2)
dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local-west", 2)

ts.CreateCellsAlias(context.Background(), "local", cellsAlias)

Expand Down Expand Up @@ -244,7 +245,7 @@ func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) {
Cells: []string{"local-west", "local-east"},
}

dg := NewTabletGateway(context.Background(), hc, srvTopo, "local", 2)
dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2)

ts.CreateCellsAlias(context.Background(), "local", cellsAlias)

Expand All @@ -263,7 +264,7 @@ func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) {
}
}

func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *tabletGateway, target *querypb.Target) error) {
func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *discoveryGateway, target *querypb.Target) error) {
keyspace := "ks"
shard := "0"
tabletType := topodatapb.TabletType_REPLICA
Expand All @@ -273,7 +274,7 @@ func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *tabletGateway, target
TabletType: tabletType,
}
hc := discovery.NewFakeHealthCheck()
dg := NewTabletGateway(context.Background(), hc, nil, "cell", 2)
dg := NewDiscoveryGateway(context.Background(), hc, nil, "cell", 2)

// no tablet
hc.Reset()
Expand Down Expand Up @@ -346,7 +347,7 @@ func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *tabletGateway, target
}
}

func testDiscoveryGatewayTransact(t *testing.T, f func(dg *tabletGateway, target *querypb.Target) error) {
func testDiscoveryGatewayTransact(t *testing.T, f func(dg *discoveryGateway, target *querypb.Target) error) {
keyspace := "ks"
shard := "0"
tabletType := topodatapb.TabletType_REPLICA
Expand All @@ -356,7 +357,7 @@ func testDiscoveryGatewayTransact(t *testing.T, f func(dg *tabletGateway, target
TabletType: tabletType,
}
hc := discovery.NewFakeHealthCheck()
dg := NewTabletGateway(context.Background(), hc, nil, "cell", 2)
dg := NewDiscoveryGateway(context.Background(), hc, nil, "cell", 2)

// retry error - no retry
hc.Reset()
Expand Down
Loading