Skip to content

Commit

Permalink
VReplication: throttling info for both source and target; Online DDL …
Browse files Browse the repository at this point in the history
…propagates said info (#10601)

* vreplication throttling information

Signed-off-by: Shlomi Noach <[email protected]>

* using RateLimiter

Signed-off-by: Shlomi Noach <[email protected]>

* rowstreamer reports throttling status, captured by vcopier and logged to vreplication table

Signed-off-by: Shlomi Noach <[email protected]>

* remove magic hint, add 'throttled' field

Signed-off-by: Shlomi Noach <[email protected]>

* propagate throttling information into schema_migrations table

Signed-off-by: Shlomi Noach <[email protected]>

* onlineddl/vrepl endtoend test: verify propagation of throttling info

Signed-off-by: Shlomi Noach <[email protected]>

* test vstreamer throttling

Signed-off-by: Shlomi Noach <[email protected]>

* simplify: reuse RateLimiter

Signed-off-by: Shlomi Noach <[email protected]>

* vtadmin_web_proto_types

Signed-off-by: Shlomi Noach <[email protected]>

* fix EOF return path

Signed-off-by: Shlomi Noach <[email protected]>

* expose the two new columns in 'vtctl Workflow ... Show'

Signed-off-by: Shlomi Noach <[email protected]>

* adapt test queries to change in wrangler query

Signed-off-by: Shlomi Noach <[email protected]>

* fix test

Signed-off-by: Shlomi Noach <[email protected]>

* proto: add Heartbeat indicator in VStreamRowsResponse

Signed-off-by: Shlomi Noach <[email protected]>

* fix unit tests

Signed-off-by: Shlomi Noach <[email protected]>

* RateLimiter.Stop()

Signed-off-by: Shlomi Noach <[email protected]>

* rowstreamer now sends heartbeats, received by vcopier, and updated in _vt.vreplication

Signed-off-by: Shlomi Noach <[email protected]>

* whoops. Removed debug Sleep()

Signed-off-by: Shlomi Noach <[email protected]>

* make vtadmin_web_proto_types

Signed-off-by: Shlomi Noach <[email protected]>

* skip throttled events

Signed-off-by: Shlomi Noach <[email protected]>

* f allowed to be nil

Signed-off-by: Shlomi Noach <[email protected]>

* fix 'now' calculation

Signed-off-by: Shlomi Noach <[email protected]>

* remove numeric precision

Signed-off-by: Shlomi Noach <[email protected]>

* ix typo

Signed-off-by: Shlomi Noach <[email protected]>

* indicate Unix timestamp in field name

Signed-off-by: Shlomi Noach <[email protected]>

* grammar

Signed-off-by: Shlomi Noach <[email protected]>

* refactor WaitForThrottledTimestamp

Signed-off-by: Shlomi Noach <[email protected]>

* formalize ComponentName

Signed-off-by: Shlomi Noach <[email protected]>

* throttled app is vstreamer

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jul 5, 2022
1 parent 01e5005 commit 3012060
Show file tree
Hide file tree
Showing 28 changed files with 873 additions and 224 deletions.
93 changes: 71 additions & 22 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)
throttlerAppName = "online-ddl"
clusterInstance *cluster.LocalProcessCluster
shards []cluster.Shard
vtParams mysql.ConnParams
httpClient = throttlebase.SetupHTTPClient(time.Second)
onlineDDLThrottlerAppName = "online-ddl"
vstreamerThrottlerAppName = "vstreamer"

normalMigrationWait = 20 * time.Second
extendedMigrationWait = 20 * time.Second
Expand Down Expand Up @@ -358,17 +360,64 @@ func TestSchemaChange(t *testing.T) {
// begin throttling:
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)

uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
testRows(t)

// gotta give the migration a few seconds to read throttling info from _vt.vreplication and write
// to _vt.schema_migrations
row, startedTimestamp, lastThrottledTimestamp := onlineddl.WaitForThrottledTimestamp(t, &vtParams, uuid, normalMigrationWait)
require.NotNil(t, row)
// vplayer and vcopier update throttle timestamp every second, so we expect the value
// to be strictly higher than started_timestamp
assert.Greater(t, lastThrottledTimestamp, startedTimestamp)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component)

// unthrottle
onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, false)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false)

_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})

t.Run("throttled and unthrottled migration via vstreamer", func(t *testing.T) {
insertRows(t, 2)
var uuid string

func() {
for _, shard := range shards {
// technically we only need to throttle on a REPLICA, because that's the
// vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't
// change the essence of this test.
for _, tablet := range shard.Vttablets {
_, body, err := throttleApp(tablet, vstreamerThrottlerAppName)
defer unthrottleApp(tablet, vstreamerThrottlerAppName)

assert.NoError(t, err)
assert.Contains(t, body, vstreamerThrottlerAppName)
}
}

uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
testRows(t)

// gotta give the migration a few seconds to read throttling info from _vt.vreplication and write
// to _vt.schema_migrations
row, startedTimestamp, lastThrottledTimestamp := onlineddl.WaitForThrottledTimestamp(t, &vtParams, uuid, normalMigrationWait)
require.NotNil(t, row)
// rowstreamer throttle timestamp only updates once in 10 seconds, so "greater or equals" is good enough here.
assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp)
component := row.AsString("component_throttled", "")
assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component)
}()
// now unthrottled
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})
Expand All @@ -394,7 +443,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand All @@ -413,7 +462,7 @@ func TestSchemaChange(t *testing.T) {
// Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...`
onlineddl.ThrottleAllMigrations(t, &vtParams)
defer onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, true)
onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true)

// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
Expand Down Expand Up @@ -445,16 +494,16 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
_, body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName)
defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName)
_, body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
_, body, err = throttleApp(shards[i].Vttablets[0], throttlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
_, body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
}
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, onlineDDLThrottlerAppName)
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

Expand Down Expand Up @@ -502,14 +551,14 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
_, body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName)
_, body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
_, body, err = unthrottleApp(shards[i].Vttablets[0], throttlerAppName)
_, body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName)
}
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, onlineDDLThrottlerAppName)
}

_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
Expand Down Expand Up @@ -625,11 +674,11 @@ func TestSchemaChange(t *testing.T) {
// - tablet throttling
t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) {
// shard 0 will run normally, shard 1 will be throttled
defer unthrottleApp(shards[1].Vttablets[0], throttlerAppName)
defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
t.Run("throttle shard 1", func(t *testing.T) {
_, body, err := throttleApp(shards[1].Vttablets[0], throttlerAppName)
_, body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, onlineDDLThrottlerAppName)
})

var uuid string
Expand All @@ -651,9 +700,9 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckCancelAllMigrations(t, &vtParams, 1)
})
t.Run("unthrottle shard 1", func(t *testing.T) {
_, body, err := unthrottleApp(shards[1].Vttablets[0], throttlerAppName)
_, body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, onlineDDLThrottlerAppName)
})
var revertUUID string
t.Run("issue revert migration", func(t *testing.T) {
Expand Down
25 changes: 25 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,3 +270,28 @@ func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, appName string
}
assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", appName, found)
}

// WaitForThrottledTimestamp polls the migration identified by uuid, once per second, until one of
// its rows reports a non-empty last_throttled_timestamp or until timeout elapses.
//
// On success it returns the matching row together with its started_timestamp and
// last_throttled_timestamp values. On timeout it flags the test as failed via t.Error and returns
// whatever was read last (lastThrottledTimestamp is then empty, or all values are zero if no row
// was ever read). The test is aborted if the migration cannot be read or if a row has an empty
// started_timestamp.
func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid string, timeout time.Duration) (
	row sqltypes.RowNamedValues,
	startedTimestamp string,
	lastThrottledTimestamp string,
) {
	began := time.Now()
	for {
		if time.Since(began) >= timeout {
			break
		}
		result := ReadMigrations(t, vtParams, uuid)
		require.NotNil(t, result)
		// Iterate straight into the named result so that the last inspected row is
		// what gets returned should we time out.
		for _, row = range result.Named().Rows {
			startedTimestamp = row.AsString("started_timestamp", "")
			require.NotEmpty(t, startedTimestamp)
			lastThrottledTimestamp = row.AsString("last_throttled_timestamp", "")
			if lastThrottledTimestamp == "" {
				// not throttled (yet) as far as this row is concerned
				continue
			}
			// This is the value we have been waiting for.
			return row, startedTimestamp, lastThrottledTimestamp
		}
		time.Sleep(time.Second)
	}
	t.Error("timeout waiting for last_throttled_timestamp to have nonempty value")
	return row, startedTimestamp, lastThrottledTimestamp
}
81 changes: 81 additions & 0 deletions go/timer/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timer

import (
"context"
"math"
"sync"
"sync/atomic"
"time"
)

// RateLimiter runs given tasks, at no more than one per defined duration.
// For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many
// tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored.
//
// A RateLimiter owns a background goroutine; call Stop() once the limiter is no longer needed so
// that the goroutine and its ticker are released.
type RateLimiter struct {
	// tickerValue is incremented by the background goroutine once per interval.
	// It is only ever accessed through sync/atomic.
	tickerValue int64
	// lastDoValue is the value of tickerValue observed when a task last ran. Guarded by mu.
	lastDoValue int64

	mu     sync.Mutex
	cancel context.CancelFunc
}

// NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks.
// d must be positive, as required by time.NewTicker.
func NewRateLimiter(d time.Duration) *RateLimiter {
	// tickerValue starts ahead of lastDoValue (0) so that the very first Do() is allowed.
	r := &RateLimiter{tickerValue: 1}
	ctx, cancel := context.WithCancel(context.Background())
	r.cancel = cancel
	go func() {
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				// Must return here: ctx.Done() stays readable forever once closed, so
				// looping again would turn this goroutine into a busy spin.
				return
			case <-ticker.C:
				atomic.AddInt64(&r.tickerValue, 1)
			}
		}
	}()
	return r
}

// Do runs a given func assuming rate limiting allows. This function is thread safe.
// f may be nil, in which case it is not invoked.
//
// When the call is rate limited, f is skipped and nil is returned. Otherwise the error returned
// by f (if any) is returned as is; the interval is considered consumed even if f fails.
func (r *RateLimiter) Do(f func() error) (err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) {
		return nil // rate limited. Skipped.
	}
	if f != nil {
		err = f()
	}
	// Re-read the ticker after f() returns: if f was slow and ticks elapsed meanwhile,
	// those ticks are consumed by this execution as well.
	r.lastDoValue = atomic.LoadInt64(&r.tickerValue)
	return err
}

// Stop terminates rate limiter's operation and will not allow any more Do() executions.
// It is safe to call Stop more than once.
func (r *RateLimiter) Stop() {
	r.cancel()

	r.mu.Lock()
	defer r.mu.Unlock()

	// No ticker value can ever exceed this, hence every subsequent Do() is skipped.
	r.lastDoValue = math.MaxInt64
}
74 changes: 74 additions & 0 deletions go/timer/rate_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package timer

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestRateLimiterLong uses an interval far longer than the test itself, so out of
// many Do() attempts only the very first one is expected to execute.
func TestRateLimiterLong(t *testing.T) {
	limiter := NewRateLimiter(time.Hour)
	require.NotNil(t, limiter)

	var executions int
	bump := func() error {
		executions++
		return nil
	}
	for attempt := 0; attempt < 10; attempt++ {
		assert.NoError(t, limiter.Do(bump))
	}
	assert.Equal(t, 1, executions)
}

// TestRateLimiterShort attempts a task every 100ms against a 250ms limiter and
// expects that some, but not all, of the attempts execute.
func TestRateLimiterShort(t *testing.T) {
	limiter := NewRateLimiter(time.Millisecond * 250)
	require.NotNil(t, limiter)

	var executions int
	bump := func() error {
		executions++
		return nil
	}
	for attempt := 0; attempt < 10; attempt++ {
		time.Sleep(time.Millisecond * 100)
		assert.NoError(t, limiter.Do(bump))
	}
	// we expect some 3-5 entries; this depends on the CI server performance.
	assert.Greater(t, executions, 2)
	assert.Less(t, executions, 10)
}

// TestRateLimiterStop verifies that tasks execute while the limiter is live, and
// that no task executes anymore once Stop() has been called.
func TestRateLimiterStop(t *testing.T) {
	limiter := NewRateLimiter(time.Millisecond * 10)
	require.NotNil(t, limiter)

	var executions int
	bump := func() error {
		executions++
		return nil
	}
	// attemptFive makes five Do() attempts, pausing 10ms ahead of each one.
	attemptFive := func() {
		for attempt := 0; attempt < 5; attempt++ {
			time.Sleep(time.Millisecond * 10)
			assert.NoError(t, limiter.Do(bump))
		}
	}

	attemptFive()
	// we expect some 3-5 entries; this depends on the CI server performance.
	assert.Greater(t, executions, 2)

	executionsBeforeStop := executions
	limiter.Stop()
	attemptFive()
	assert.Equal(t, executionsBeforeStop, executions)
}
10 changes: 10 additions & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ var AlterVReplicationTable = []string{
// records the time of the last heartbeat. Heartbeats are only received if the source has no recent events
"ALTER TABLE _vt.vreplication ADD COLUMN time_heartbeat BIGINT(20) NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN workflow_type int NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN time_throttled BIGINT NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN component_throttled VARCHAR(255) NOT NULL DEFAULT ''",
}

// WithDDLInitialQueries contains the queries that:
Expand Down Expand Up @@ -684,6 +686,14 @@ func GenerateUpdateHeartbeat(uid uint32, timeUpdated int64) (string, error) {
return fmt.Sprintf("update _vt.vreplication set time_updated=%v, time_heartbeat=%v where id=%v", timeUpdated, timeUpdated, uid), nil
}

// GenerateUpdateTimeThrottled returns a statement to record the latest throttle time in the _vt.vreplication table.
//
// The statement sets both time_updated and time_throttled to timeThrottledUnix, and
// component_throttled to componentThrottled, on the row whose id is uid.
// An error (and an empty statement) is returned when timeThrottledUnix is zero.
//
// NOTE(review): componentThrottled is interpolated into the statement as is, with no escaping.
// Presumably callers only ever pass fixed, internal component names; confirm before passing
// any externally supplied value.
func GenerateUpdateTimeThrottled(uid uint32, timeThrottledUnix int64, componentThrottled string) (string, error) {
	if timeThrottledUnix == 0 {
		return "", fmt.Errorf("timeThrottledUnix cannot be zero")
	}
	return fmt.Sprintf("update _vt.vreplication set time_updated=%v, time_throttled=%v, component_throttled='%v' where id=%v", timeThrottledUnix, timeThrottledUnix, componentThrottled, uid), nil
}

// StartVReplication returns a statement to start the replication.
func StartVReplication(uid uint32) string {
return fmt.Sprintf(
Expand Down
Loading

0 comments on commit 3012060

Please sign in to comment.