Skip to content

Commit

Permalink
Merge pull request #7324 from planetscale/vstreamer-throttle-source-c…
Browse files Browse the repository at this point in the history
…heck-self

vstreamer to throttle on source endpoint
  • Loading branch information
shlomi-noach authored Jan 26, 2021
2 parents 1514987 + 22088b4 commit fbb4300
Show file tree
Hide file tree
Showing 22 changed files with 732 additions and 442 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/cluster_endtoend_vreplication_basic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Cluster (vreplication_basic)
on: [push, pull_request]
jobs:

build:
name: Run endtoend tests on Cluster (vreplication_basic)
runs-on: ubuntu-latest

steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.15

- name: Check out code
uses: actions/checkout@v2

- name: Get dependencies
run: |
sudo apt-get update
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get install -y gnupg2
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get update
sudo apt-get install percona-xtrabackup-24
- name: Run cluster endtoend test
timeout-minutes: 30
run: |
source build.env
eatmydata -- go run test.go -docker=false -print-log -follow -shard vreplication_basic
8 changes: 7 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,14 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace,
vc.Topo.Port,
globalConfig.hostname,
globalConfig.tmpDir,
[]string{"-queryserver-config-schema-reload-time", "5"}, //FIXME: for multi-cell initial schema doesn't seem to load without this
[]string{
"-queryserver-config-schema-reload-time", "5",
"-enable-lag-throttler",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}, //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time"
false)

require.NotNil(t, vttablet)
vttablet.SupportsBackup = false

Expand Down
786 changes: 450 additions & 336 deletions go/test/endtoend/vreplication/vreplication_test.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func TestSchemaVersioning(t *testing.T) {
tsv.EnableHistorian(false)
tsv.SetTracking(false)
tsv.EnableHeartbeat(false)
tsv.EnableThrottler(false)
defer tsv.EnableThrottler(true)
defer tsv.EnableHeartbeat(true)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (ec *externalConnector) Get(name string) (*mysqlConnector, error) {
c := &mysqlConnector{}
c.env = tabletenv.NewEnv(config, name)
c.se = schema.NewEngine(c.env)
c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, "")
c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, nil, "")
c.vstreamer.InitDBConfig("")
c.se.InitDBConfig(c.env.Config().DB.AllPrivsWithDB())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestMain(m *testing.M) {

// engines cannot be initialized in testenv because it introduces
// circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, env.Cells[0])
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])
streamerEngine.InitDBConfig(env.KeyspaceName)
streamerEngine.Open()
defer streamerEngine.Close()
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog {
return rl
}

// Send writes events to the relay log
func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand All @@ -83,6 +84,7 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
return nil
}

// Fetch returns all existing items in the relay log, and empties the log
func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewReplicaConnector(connParams *mysql.ConnParams) *replicaConnector {
env := tabletenv.NewEnv(config, "source")
c.se = schema.NewEngine(env)
c.se.SkipMetaCheck = true
c.vstreamer = vstreamer.NewEngine(env, nil, c.se, "")
c.vstreamer = vstreamer.NewEngine(env, nil, c.se, nil, "")
c.se.InitDBConfig(dbconfigs.New(connParams))

// Open
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
}
}

// applyStmtEvent applies an actual DML statement received from the source, directly onto the backend database
func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
sql := event.Statement
if sql == "" {
Expand Down
27 changes: 18 additions & 9 deletions go/vt/vttablet/tabletserver/gc/tablegc.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,21 @@ func (collector *TableGC) checkTables(ctx context.Context) error {
return nil
}

func (collector *TableGC) throttleStatusOK(ctx context.Context) bool {
if time.Since(collector.lastSuccessfulThrottleCheck) <= throttleCheckDuration {
// if last check was OK just very recently there is no need to check again
return true
}
// It's time to run a throttler check
checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags)
if checkResult.StatusCode != http.StatusOK {
// sorry, we got throttled.
return false
}
collector.lastSuccessfulThrottleCheck = time.Now()
return true
}

// purge continuously purges rows from a table.
// This function is non-reentrant: there's only one instance of this function running at any given time.
// A timer keeps calling this function, so if it bails out (e.g. on error) it will later resume work
Expand Down Expand Up @@ -451,15 +466,9 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro

log.Infof("TableGC: purge begin for %s", tableName)
for {
if time.Since(collector.lastSuccessfulThrottleCheck) > throttleCheckDuration {
// It's time to run a throttler check
checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags)
if checkResult.StatusCode != http.StatusOK {
// sorry, we got throttled. Back off, sleep, try again
time.Sleep(throttleCheckDuration)
continue
}
collector.lastSuccessfulThrottleCheck = time.Now()
for !collector.throttleStatusOK(ctx) {
// Sorry, got throttled. Sleep some time, then check again
time.Sleep(throttleCheckDuration)
}
// OK, we're clear to go!

Expand Down
48 changes: 40 additions & 8 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,29 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to

tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(topoServer, "TabletSrvTopo") })

tabletTypeFunc := func() topodatapb.TabletType {
if tsv.sm == nil {
return topodatapb.TabletType_UNKNOWN
}
return tsv.sm.Target().TabletType
}

tsv.statelessql = NewQueryList("oltp-stateless")
tsv.statefulql = NewQueryList("oltp-stateful")
tsv.olapql = NewQueryList("olap")
tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc)
tsv.hs = newHealthStreamer(tsv, alias)
tsv.se = schema.NewEngine(tsv)
tsv.rt = repltracker.NewReplTracker(tsv, alias)
tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, alias.Cell)
tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.lagThrottler, alias.Cell)
tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se)
tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config)
tsv.qe = NewQueryEngine(tsv, tsv.se)
tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer)
tsv.te = NewTxEngine(tsv)
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)

tabletTypeFunc := func() topodatapb.TabletType {
if tsv.sm == nil {
return topodatapb.TabletType_UNKNOWN
}
return tsv.sm.Target().TabletType
}
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tabletTypeFunc)
tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tabletTypeFunc, tsv.lagThrottler)

tsv.sm = &stateManager{
Expand Down Expand Up @@ -1621,10 +1622,34 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() {
})
}

// registerThrottlerThrottleAppHandler registers a throttler "throttle-app" request
func (tsv *TabletServer) registerThrottlerThrottleAppHandler() {
tsv.exporter.HandleFunc("/throttler/throttle-app", func(w http.ResponseWriter, r *http.Request) {
appName := r.URL.Query().Get("app")
d, err := time.ParseDuration(r.URL.Query().Get("duration"))
if err != nil {
http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError)
return
}
appThrottle := tsv.lagThrottler.ThrottleApp(appName, time.Now().Add(d), 1)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(appThrottle)
})
tsv.exporter.HandleFunc("/throttler/unthrottle-app", func(w http.ResponseWriter, r *http.Request) {
appName := r.URL.Query().Get("app")
appThrottle := tsv.lagThrottler.UnthrottleApp(appName)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(appThrottle)
})
}

// registerThrottlerHandlers registers all throttler handlers
func (tsv *TabletServer) registerThrottlerHandlers() {
tsv.registerThrottlerCheckHandlers()
tsv.registerThrottlerStatusHandler()
tsv.registerThrottlerThrottleAppHandler()
}

func (tsv *TabletServer) registerDebugEnvHandler() {
Expand All @@ -1639,6 +1664,13 @@ func (tsv *TabletServer) EnableHeartbeat(enabled bool) {
tsv.rt.EnableHeartbeat(enabled)
}

// EnableThrottler forces throttler to be on or off.
// When throttler is off, it responds to all check requests with HTTP 200 OK
// Only to be used for testing.
func (tsv *TabletServer) EnableThrottler(enabled bool) {
tsv.Config().EnableLagThrottler = enabled
}

// SetTracking forces tracking to be on or off.
// Only to be used for testing.
func (tsv *TabletServer) SetTracking(enabled bool) {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/base/app_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
// AppThrottle is the definition for an app throttling instruction
// - Ratio: [0..1], 0 == no throttle, 1 == fully throttle
type AppThrottle struct {
AppName string
ExpireAt time.Time
Ratio float64
}

// NewAppThrottle creates an AppThrottle struct
func NewAppThrottle(expireAt time.Time, ratio float64) *AppThrottle {
func NewAppThrottle(appName string, expireAt time.Time, ratio float64) *AppThrottle {
result := &AppThrottle{
AppName: appName,
ExpireAt: expireAt,
Ratio: ratio,
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,10 @@ func (throttler *Throttler) expireThrottledApps() {
}

// ThrottleApp instructs the throttler to begin throttling an app, to som eperiod and with some ratio.
func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) {
func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) (appThrottle *base.AppThrottle) {
throttler.throttledAppsMutex.Lock()
defer throttler.throttledAppsMutex.Unlock()

var appThrottle *base.AppThrottle
now := time.Now()
if object, found := throttler.throttledApps.Get(appName); found {
appThrottle = object.(*base.AppThrottle)
Expand All @@ -666,18 +665,20 @@ func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, rati
if ratio < 0 {
ratio = defaultThrottleRatio
}
appThrottle = base.NewAppThrottle(expireAt, ratio)
appThrottle = base.NewAppThrottle(appName, expireAt, ratio)
}
if now.Before(appThrottle.ExpireAt) {
throttler.throttledApps.Set(appName, appThrottle, cache.DefaultExpiration)
} else {
throttler.UnthrottleApp(appName)
}
return appThrottle
}

// UnthrottleApp cancels any throttling, if any, for a given app
func (throttler *Throttler) UnthrottleApp(appName string) {
func (throttler *Throttler) UnthrottleApp(appName string) (appThrottle *base.AppThrottle) {
throttler.throttledApps.Delete(appName)
return base.NewAppThrottle(appName, time.Now(), 0)
}

// IsAppThrottled tells whether some app should be throttled.
Expand Down
Loading

0 comments on commit fbb4300

Please sign in to comment.