From 3012060515af8f1997e1e8e28511115025b4372a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 5 Jul 2022 09:39:28 +0300 Subject: [PATCH] VReplication: throttling info for both source and target; Online DDL propagates said info (#10601) * vreplication throttling information Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * using RateLimiter Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * rowstreamer reports throttling status, captured by vcopier and logged to vreplication table Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * remove magic hint, add 'throttled' field Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * propagate throttling information into schema_migrations table Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * onlineddl/vrepl endtoend test: verify propagation of throttling info Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * test vstreamer throttling Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * simplify: reuse RateLimiter Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * vtadmin_web_proto_types Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * fix EOF return path Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * expose the two new column in 'vtctl Workflow ... Show' Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * adapt test queries to change in wrangler query Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * fix test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * proto: add Heartbeat indicator in VStreamRowsResponse Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * fix unit tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * RateLimiter.Stop() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * rowstreamer now sends heartbeats, received by vcopier, and updated in _vt.vreplication Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * whoops. Removed debug Sleep() Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * make vtadmin_web_proto_types Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * skip throttled events Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * f allowed to be nil Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * fix 'now' calculation Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * remove numeric precision Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * ix typo Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * indicate Unix timestamp in field name Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * grammar Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * refactor WaitForThrottledTimestamp Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * formalize ComponentName Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * throttled app is vstreamer Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl/onlineddl_vrepl_test.go | 93 ++++-- go/test/endtoend/onlineddl/vtgate_util.go | 25 ++ go/timer/rate_limiter.go | 81 +++++ go/timer/rate_limiter_test.go | 74 +++++ go/vt/binlog/binlogplayer/binlog_player.go | 10 + go/vt/proto/binlogdata/binlogdata.pb.go | 292 ++++++++++-------- .../proto/binlogdata/binlogdata_vtproto.pb.go | 101 ++++++ go/vt/vttablet/endtoend/vstreamer_test.go | 3 + go/vt/vttablet/onlineddl/executor.go | 37 +++ go/vt/vttablet/onlineddl/schema.go | 17 +- go/vt/vttablet/onlineddl/vrepl.go | 13 +- .../tabletmanager/vreplication/engine_test.go | 2 + .../vreplication/framework_test.go | 8 +- .../tabletmanager/vreplication/vcopier.go | 12 +- .../tabletmanager/vreplication/vplayer.go | 22 +- .../tabletmanager/vreplication/vreplicator.go | 40 +++ .../tabletserver/vstreamer/rowstreamer.go | 35 ++- .../vstreamer/uvstreamer_flaky_test.go | 3 + .../tabletserver/vstreamer/vstreamer.go | 51 +-- .../vstreamer/vstreamer_flaky_test.go | 6 + go/vt/wrangler/traffic_switcher_env_test.go | 24 +- go/vt/wrangler/vexec.go | 35 ++- go/vt/wrangler/vexec_test.go | 4 + go/vt/wrangler/workflow_test.go | 6 +- go/vt/wrangler/wrangler_env_test.go | 12 +- proto/binlogdata.proto | 7 +- web/vtadmin/src/proto/vtadmin.d.ts | 18 ++ web/vtadmin/src/proto/vtadmin.js | 66 ++++ 28 files changed, 873 insertions(+), 224 deletions(-) create mode 100644 go/timer/rate_limiter.go create mode 100644 go/timer/rate_limiter_test.go diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go index 7280231dd0a..7942d30b6b2 100644 --- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go +++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" "github.com/stretchr/testify/assert" @@ -39,11 +40,12 @@ import ( ) var ( - clusterInstance *cluster.LocalProcessCluster - shards []cluster.Shard - vtParams mysql.ConnParams - httpClient = throttlebase.SetupHTTPClient(time.Second) - throttlerAppName = "online-ddl" + clusterInstance *cluster.LocalProcessCluster + shards []cluster.Shard + vtParams mysql.ConnParams + httpClient = throttlebase.SetupHTTPClient(time.Second) + onlineDDLThrottlerAppName = "online-ddl" + vstreamerThrottlerAppName = "vstreamer" normalMigrationWait = 20 * time.Second extendedMigrationWait = 20 * time.Second @@ -358,17 +360,64 @@ func TestSchemaChange(t *testing.T) { // begin throttling: onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) testRows(t) + // gotta give the migration a few seconds to read throttling info from _vt.vreplication and write + // to _vt.schema_migrations + row, startedTimestamp, lastThrottledTimestamp := onlineddl.WaitForThrottledTimestamp(t, &vtParams, uuid, normalMigrationWait) + require.NotNil(t, row) + // vplayer and vcopier update throttle timestamp every second, so we expect the value + // to be strictly higher than started_timestamp + assert.Greater(t, lastThrottledTimestamp, startedTimestamp) + component := row.AsString("component_throttled", "") + assert.Contains(t, []string{string(vreplication.VCopierComponentName), string(vreplication.VPlayerComponentName)}, component) + // unthrottle onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, false) + onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, false) + + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + + t.Run("throttled and unthrottled migration via vstreamer", func(t *testing.T) { + insertRows(t, 2) + var uuid string + func() { + for _, shard := range shards { + // technically we only need to throttle on a REPLICA, because that's the + // vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't + // change the essence of this test. + for _, tablet := range shard.Vttablets { + _, body, err := throttleApp(tablet, vstreamerThrottlerAppName) + defer unthrottleApp(tablet, vstreamerThrottlerAppName) + + assert.NoError(t, err) + assert.Contains(t, body, vstreamerThrottlerAppName) + } + } + + uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) + _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) + testRows(t) + + // gotta give the migration a few seconds to read throttling info from _vt.vreplication and write + // to _vt.schema_migrations + row, startedTimestamp, lastThrottledTimestamp := onlineddl.WaitForThrottledTimestamp(t, &vtParams, uuid, normalMigrationWait) + require.NotNil(t, row) + // rowstreamer throttle timestamp only updates once in 10 seconds, so greater or equals" is good enough here. + assert.GreaterOrEqual(t, lastThrottledTimestamp, startedTimestamp) + component := row.AsString("component_throttled", "") + assert.Contains(t, []string{string(vreplication.VStreamerComponentName), string(vreplication.RowStreamerComponentName)}, component) + }() + // now unthrottled _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) }) @@ -394,7 +443,7 @@ func TestSchemaChange(t *testing.T) { // Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...` onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) // spawn n migrations; cancel them via cancel-all var wg sync.WaitGroup @@ -413,7 +462,7 @@ func TestSchemaChange(t *testing.T) { // Use VTGate for throttling, issue a `ALTER VITESS_MIGRATION THROTTLE ALL ...` onlineddl.ThrottleAllMigrations(t, &vtParams) defer onlineddl.UnthrottleAllMigrations(t, &vtParams) - onlineddl.CheckThrottledApps(t, &vtParams, throttlerAppName, true) + onlineddl.CheckThrottledApps(t, &vtParams, onlineDDLThrottlerAppName, true) // spawn n migrations; cancel them via cancel-all var wg sync.WaitGroup @@ -445,16 +494,16 @@ func TestSchemaChange(t *testing.T) { case 0: // this is the shard where we run PRS // Use per-tablet throttling API - _, body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName) - defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName) + _, body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName) + defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName) case 1: // no PRS on this shard // Use per-tablet throttling API - _, body, err = throttleApp(shards[i].Vttablets[0], throttlerAppName) - defer unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + _, body, err = throttleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) + defer unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) } assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) + assert.Contains(t, body, onlineDDLThrottlerAppName) } uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true) @@ -502,14 +551,14 @@ func TestSchemaChange(t *testing.T) { case 0: // this is the shard where we run PRS // Use per-tablet throttling API - _, body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], throttlerAppName) + _, body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName) case 1: // no PRS on this shard // Use per-tablet throttling API - _, body, err = unthrottleApp(shards[i].Vttablets[0], throttlerAppName) + _, body, err = unthrottleApp(shards[i].Vttablets[0], onlineDDLThrottlerAppName) } assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) + assert.Contains(t, body, onlineDDLThrottlerAppName) } _ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) @@ -625,11 +674,11 @@ func TestSchemaChange(t *testing.T) { // - tablet throttling t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) { // shard 0 will run normally, shard 1 will be throttled - defer unthrottleApp(shards[1].Vttablets[0], throttlerAppName) + defer unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) t.Run("throttle shard 1", func(t *testing.T) { - _, body, err := throttleApp(shards[1].Vttablets[0], throttlerAppName) + _, body, err := throttleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) + assert.Contains(t, body, onlineDDLThrottlerAppName) }) var uuid string @@ -651,9 +700,9 @@ func TestSchemaChange(t *testing.T) { onlineddl.CheckCancelAllMigrations(t, &vtParams, 1) }) t.Run("unthrottle shard 1", func(t *testing.T) { - _, body, err := unthrottleApp(shards[1].Vttablets[0], throttlerAppName) + _, body, err := unthrottleApp(shards[1].Vttablets[0], onlineDDLThrottlerAppName) assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) + assert.Contains(t, body, onlineDDLThrottlerAppName) }) var revertUUID string t.Run("issue revert migration", func(t *testing.T) { diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 1034fd6f670..5a324650ed6 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -270,3 +270,28 @@ func CheckThrottledApps(t *testing.T, vtParams *mysql.ConnParams, appName string } assert.Equal(t, expectFind, found, "check app %v in throttled apps: %v", appName, found) } + +// WaitForThrottledTimestamp waits for a migration to have a non-empty last_throttled_timestamp +func WaitForThrottledTimestamp(t *testing.T, vtParams *mysql.ConnParams, uuid string, timeout time.Duration) ( + row sqltypes.RowNamedValues, + startedTimestamp string, + lastThrottledTimestamp string, +) { + startTime := time.Now() + for time.Since(startTime) < timeout { + rs := ReadMigrations(t, vtParams, uuid) + require.NotNil(t, rs) + for _, row = range rs.Named().Rows { + startedTimestamp = row.AsString("started_timestamp", "") + require.NotEmpty(t, startedTimestamp) + lastThrottledTimestamp = row.AsString("last_throttled_timestamp", "") + if lastThrottledTimestamp != "" { + // good. This is what we've been waiting for. + return row, startedTimestamp, lastThrottledTimestamp + } + } + time.Sleep(1 * time.Second) + } + t.Error("timeout waiting for last_throttled_timestamp to have nonempty value") + return +} diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go new file mode 100644 index 00000000000..20304a03b3a --- /dev/null +++ b/go/timer/rate_limiter.go @@ -0,0 +1,81 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timer + +import ( + "context" + "math" + "sync" + "sync/atomic" + "time" +) + +// RateLimiter runs given tasks, at no more than one per defined duration. +// For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many +// tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored. +type RateLimiter struct { + tickerValue int64 + lastDoValue int64 + + mu sync.Mutex + cancel context.CancelFunc +} + +// NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks. +func NewRateLimiter(d time.Duration) *RateLimiter { + r := &RateLimiter{tickerValue: 1} + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel + go func() { + ticker := time.NewTicker(d) + for { + select { + case <-ctx.Done(): + ticker.Stop() + case <-ticker.C: + atomic.StoreInt64(&r.tickerValue, r.tickerValue+1) + } + } + }() + return r +} + +// Do runs a given func assuming rate limiting allows. This function is thread safe. +// f may be nil, in which case it is not invoked. +func (r *RateLimiter) Do(f func() error) (err error) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) { + return nil // rate limited. Skipped. + } + if f != nil { + err = f() + } + r.lastDoValue = atomic.LoadInt64(&r.tickerValue) + return err +} + +// Stop terminates rate limiter's operation and will not allow any more Do() executions. +func (r *RateLimiter) Stop() { + r.cancel() + + r.mu.Lock() + defer r.mu.Unlock() + + r.lastDoValue = math.MaxInt64 +} diff --git a/go/timer/rate_limiter_test.go b/go/timer/rate_limiter_test.go new file mode 100644 index 00000000000..ec70ed243d2 --- /dev/null +++ b/go/timer/rate_limiter_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRateLimiterLong(t *testing.T) { + r := NewRateLimiter(time.Hour) + require.NotNil(t, r) + val := 0 + incr := func() error { val++; return nil } + for i := 0; i < 10; i++ { + err := r.Do(incr) + assert.NoError(t, err) + } + assert.Equal(t, 1, val) +} + +func TestRateLimiterShort(t *testing.T) { + r := NewRateLimiter(time.Millisecond * 250) + require.NotNil(t, r) + val := 0 + incr := func() error { val++; return nil } + for i := 0; i < 10; i++ { + time.Sleep(time.Millisecond * 100) + err := r.Do(incr) + assert.NoError(t, err) + } + // we expect some 3-5 entries; this depends on the CI server performance. + assert.Greater(t, val, 2) + assert.Less(t, val, 10) +} + +func TestRateLimiterStop(t *testing.T) { + r := NewRateLimiter(time.Millisecond * 10) + require.NotNil(t, r) + val := 0 + incr := func() error { val++; return nil } + for i := 0; i < 5; i++ { + time.Sleep(time.Millisecond * 10) + err := r.Do(incr) + assert.NoError(t, err) + } + // we expect some 3-5 entries; this depends on the CI server performance. + assert.Greater(t, val, 2) + valSnapshot := val + r.Stop() + for i := 0; i < 5; i++ { + time.Sleep(time.Millisecond * 10) + err := r.Do(incr) + assert.NoError(t, err) + } + assert.Equal(t, valSnapshot, val) +} diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index e9b1913a68f..7cfd92fa042 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -568,6 +568,8 @@ var AlterVReplicationTable = []string{ // records the time of the last heartbeat. Heartbeats are only received if the source has no recent events "ALTER TABLE _vt.vreplication ADD COLUMN time_heartbeat BIGINT(20) NOT NULL DEFAULT 0", "ALTER TABLE _vt.vreplication ADD COLUMN workflow_type int NOT NULL DEFAULT 0", + "ALTER TABLE _vt.vreplication ADD COLUMN time_throttled BIGINT NOT NULL DEFAULT 0", + "ALTER TABLE _vt.vreplication ADD COLUMN component_throttled VARCHAR(255) NOT NULL DEFAULT ''", } // WithDDLInitialQueries contains the queries that: @@ -684,6 +686,14 @@ func GenerateUpdateHeartbeat(uid uint32, timeUpdated int64) (string, error) { return fmt.Sprintf("update _vt.vreplication set time_updated=%v, time_heartbeat=%v where id=%v", timeUpdated, timeUpdated, uid), nil } +// GenerateUpdateTimeThrottled returns a statement to record the latest throttle time in the _vt.vreplication table. +func GenerateUpdateTimeThrottled(uid uint32, timeThrottledUnix int64, componentThrottled string) (string, error) { + if timeThrottledUnix == 0 { + return "", fmt.Errorf("timeUpdated cannot be zero") + } + return fmt.Sprintf("update _vt.vreplication set time_updated=%v, time_throttled=%v, component_throttled='%v' where id=%v", timeThrottledUnix, timeThrottledUnix, componentThrottled, uid), nil +} + // StartVReplication returns a statement to start the replication. func StartVReplication(uid uint32) string { return fmt.Sprintf( diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 8f5b12ffcd1..545e29ec8f2 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -1741,6 +1741,8 @@ type VEvent struct { Keyspace string `protobuf:"bytes,22,opt,name=keyspace,proto3" json:"keyspace,omitempty"` // the source shard Shard string `protobuf:"bytes,23,opt,name=shard,proto3" json:"shard,omitempty"` + // indicate that we are being throttled right now + Throttled bool `protobuf:"varint,24,opt,name=throttled,proto3" json:"throttled,omitempty"` } func (x *VEvent) Reset() { @@ -1866,6 +1868,13 @@ func (x *VEvent) GetShard() string { return "" } +func (x *VEvent) GetThrottled() bool { + if x != nil { + return x.Throttled + } + return false +} + type MinimalTable struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2203,6 +2212,10 @@ type VStreamRowsResponse struct { Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` Rows []*query.Row `protobuf:"bytes,4,rep,name=rows,proto3" json:"rows,omitempty"` Lastpk *query.Row `protobuf:"bytes,5,opt,name=lastpk,proto3" json:"lastpk,omitempty"` + // Throttled indicates that rowstreamer is being throttled right now + Throttled bool `protobuf:"varint,6,opt,name=throttled,proto3" json:"throttled,omitempty"` + // Heartbeat indicates that this is a heartbeat message + Heartbeat bool `protobuf:"varint,7,opt,name=heartbeat,proto3" json:"heartbeat,omitempty"` } func (x *VStreamRowsResponse) Reset() { @@ -2272,6 +2285,20 @@ func (x *VStreamRowsResponse) GetLastpk() *query.Row { return nil } +func (x *VStreamRowsResponse) GetThrottled() bool { + if x != nil { + return x.Throttled + } + return false +} + +func (x *VStreamRowsResponse) GetHeartbeat() bool { + if x != nil { + return x.Heartbeat + } + return false +} + type LastPKEvent struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2808,7 +2835,7 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x29, 0x0a, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x57, 0x6f, 0x72, - 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x22, 0xed, 0x03, 0x0a, 0x06, 0x56, 0x45, 0x76, 0x65, 0x6e, + 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x22, 0x8b, 0x04, 0x0a, 0x06, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2a, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x16, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, @@ -2839,138 +2866,143 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x16, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6b, 0x65, 0x79, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x17, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x22, 0x68, 0x0a, 0x0c, 0x4d, 0x69, 0x6e, 0x69, 0x6d, 0x61, - 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, - 0x65, 0x6c, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, + 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x68, 0x72, 0x6f, 0x74, 0x74, + 0x6c, 0x65, 0x64, 0x18, 0x18, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x74, 0x68, 0x72, 0x6f, 0x74, + 0x74, 0x6c, 0x65, 0x64, 0x22, 0x68, 0x0a, 0x0c, 0x4d, 0x69, 0x6e, 0x69, 0x6d, 0x61, 0x6c, 0x54, + 0x61, 0x62, 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, + 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, + 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x1e, + 0x0a, 0x0b, 0x70, 0x5f, 0x6b, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x03, 0x20, + 0x03, 0x28, 0x03, 0x52, 0x09, 0x70, 0x4b, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x22, 0x41, + 0x0a, 0x0d, 0x4d, 0x69, 0x6e, 0x69, 0x6d, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x12, + 0x30, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x18, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x4d, 0x69, 0x6e, + 0x69, 0x6d, 0x61, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x73, 0x22, 0xc7, 0x02, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x13, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, + 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, + 0x49, 0x44, 0x52, 0x11, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x45, 0x0a, 0x13, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, + 0x74, 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x56, 0x54, 0x47, 0x61, 0x74, + 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x69, 0x6d, 0x6d, 0x65, 0x64, + 0x69, 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x06, + 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, + 0x67, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, + 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x3e, 0x0a, 0x0f, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x5f, 0x6b, 0x73, 0x18, 0x06, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, + 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x52, 0x0c, 0x74, + 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, + 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, + 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x85, 0x02, 0x0a, 0x12, 0x56, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x3f, 0x0a, 0x13, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x63, + 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, + 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, + 0x11, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, + 0x49, 0x64, 0x12, 0x45, 0x0a, 0x13, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x5f, + 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x15, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x56, 0x54, 0x47, 0x61, 0x74, 0x65, 0x43, 0x61, + 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, + 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x06, 0x74, 0x61, 0x72, + 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x2a, 0x0a, 0x06, 0x6c, 0x61, 0x73, 0x74, 0x70, 0x6b, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x51, + 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x6c, 0x61, 0x73, 0x74, + 0x70, 0x6b, 0x22, 0xf9, 0x01, 0x0a, 0x13, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x6f, + 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, + 0x65, 0x6c, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, - 0x12, 0x1e, 0x0a, 0x0b, 0x70, 0x5f, 0x6b, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, - 0x03, 0x20, 0x03, 0x28, 0x03, 0x52, 0x09, 0x70, 0x4b, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x22, 0x41, 0x0a, 0x0d, 0x4d, 0x69, 0x6e, 0x69, 0x6d, 0x61, 0x6c, 0x53, 0x63, 0x68, 0x65, 0x6d, - 0x61, 0x12, 0x30, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x18, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x4d, - 0x69, 0x6e, 0x69, 0x6d, 0x61, 0x6c, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x06, 0x74, 0x61, 0x62, - 0x6c, 0x65, 0x73, 0x22, 0xc7, 0x02, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x13, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, - 0x69, 0x76, 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, - 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x43, - 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x45, 0x0a, 0x13, 0x69, 0x6d, 0x6d, 0x65, 0x64, - 0x69, 0x61, 0x74, 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x56, 0x54, 0x47, - 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x69, 0x6d, 0x6d, - 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x25, - 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, - 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, - 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, - 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x3e, 0x0a, - 0x0f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x5f, 0x6b, 0x73, - 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, - 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x52, - 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x73, 0x22, 0x3d, 0x0a, - 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x85, 0x02, 0x0a, - 0x12, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x13, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, - 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, - 0x44, 0x52, 0x11, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, 0x6c, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x45, 0x0a, 0x13, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, - 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x15, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x56, 0x54, 0x47, 0x61, 0x74, 0x65, - 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, - 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x06, 0x74, - 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, 0x75, - 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, - 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x2a, 0x0a, 0x06, 0x6c, 0x61, 0x73, 0x74, - 0x70, 0x6b, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, - 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x6c, 0x61, - 0x73, 0x74, 0x70, 0x6b, 0x22, 0xbd, 0x01, 0x0a, 0x13, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x52, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x06, + 0x12, 0x28, 0x0a, 0x08, 0x70, 0x6b, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, + 0x52, 0x08, 0x70, 0x6b, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x67, 0x74, + 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x74, 0x69, 0x64, 0x12, 0x1e, + 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, 0x6f, 0x77, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x12, 0x22, + 0x0a, 0x06, 0x6c, 0x61, 0x73, 0x74, 0x70, 0x6b, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, + 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, 0x6f, 0x77, 0x52, 0x06, 0x6c, 0x61, 0x73, 0x74, + 0x70, 0x6b, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x74, 0x68, 0x72, 0x6f, 0x74, 0x74, 0x6c, 0x65, 0x64, + 0x12, 0x1c, 0x0a, 0x09, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x18, 0x07, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x09, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x22, 0x69, + 0x0a, 0x0b, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, + 0x0e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x70, 0x5f, 0x6b, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, + 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x52, 0x0b, + 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x12, 0x1c, 0x0a, 0x09, 0x63, + 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, + 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x22, 0x58, 0x0a, 0x0b, 0x54, 0x61, 0x62, + 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, + 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x61, + 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x6c, 0x61, 0x73, 0x74, 0x70, + 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x6c, 0x61, 0x73, + 0x74, 0x70, 0x6b, 0x22, 0xdc, 0x01, 0x0a, 0x15, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, + 0x13, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, + 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x65, 0x66, 0x66, + 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x45, + 0x0a, 0x13, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x71, 0x75, + 0x65, 0x72, 0x79, 0x2e, 0x56, 0x54, 0x47, 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, + 0x49, 0x44, 0x52, 0x11, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, + 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, + 0x72, 0x79, 0x22, 0x72, 0x0a, 0x16, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, - 0x64, 0x73, 0x12, 0x28, 0x0a, 0x08, 0x70, 0x6b, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x02, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, - 0x6c, 0x64, 0x52, 0x08, 0x70, 0x6b, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, - 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x74, 0x69, 0x64, - 0x12, 0x1e, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0a, - 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, 0x6f, 0x77, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, - 0x12, 0x22, 0x0a, 0x06, 0x6c, 0x61, 0x73, 0x74, 0x70, 0x6b, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0a, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, 0x6f, 0x77, 0x52, 0x06, 0x6c, 0x61, - 0x73, 0x74, 0x70, 0x6b, 0x22, 0x69, 0x0a, 0x0b, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x12, 0x3c, 0x0a, 0x0e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6c, 0x61, 0x73, - 0x74, 0x5f, 0x70, 0x5f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x62, 0x69, - 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, - 0x73, 0x74, 0x50, 0x4b, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, - 0x4b, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x22, - 0x58, 0x0a, 0x0b, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x50, 0x4b, 0x12, 0x1d, - 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2a, 0x0a, - 0x06, 0x6c, 0x61, 0x73, 0x74, 0x70, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x52, 0x06, 0x6c, 0x61, 0x73, 0x74, 0x70, 0x6b, 0x22, 0xdc, 0x01, 0x0a, 0x15, 0x56, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x13, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, - 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, - 0x44, 0x52, 0x11, 0x65, 0x66, 0x66, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, 0x6c, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x45, 0x0a, 0x13, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, 0x61, 0x74, - 0x65, 0x5f, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x15, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x56, 0x54, 0x47, 0x61, 0x74, 0x65, - 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x11, 0x69, 0x6d, 0x6d, 0x65, 0x64, 0x69, - 0x61, 0x74, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x06, 0x74, - 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x71, 0x75, - 0x65, 0x72, 0x79, 0x2e, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x52, 0x06, 0x74, 0x61, 0x72, 0x67, - 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0x72, 0x0a, 0x16, 0x56, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, - 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x67, 0x74, 0x69, 0x64, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x74, 0x69, 0x64, 0x12, 0x1e, 0x0a, 0x04, - 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x71, 0x75, 0x65, - 0x72, 0x79, 0x2e, 0x52, 0x6f, 0x77, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x2a, 0x3e, 0x0a, 0x0b, - 0x4f, 0x6e, 0x44, 0x44, 0x4c, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x49, - 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x54, 0x4f, 0x50, 0x10, - 0x01, 0x12, 0x08, 0x0a, 0x04, 0x45, 0x58, 0x45, 0x43, 0x10, 0x02, 0x12, 0x0f, 0x0a, 0x0b, 0x45, - 0x58, 0x45, 0x43, 0x5f, 0x49, 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x03, 0x2a, 0x7b, 0x0a, 0x18, - 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x57, 0x6f, 0x72, 0x6b, - 0x66, 0x6c, 0x6f, 0x77, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x4d, 0x41, 0x54, 0x45, - 0x52, 0x49, 0x41, 0x4c, 0x49, 0x5a, 0x45, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x4d, 0x4f, 0x56, - 0x45, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x01, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x52, 0x45, - 0x41, 0x54, 0x45, 0x4c, 0x4f, 0x4f, 0x4b, 0x55, 0x50, 0x49, 0x4e, 0x44, 0x45, 0x58, 0x10, 0x02, - 0x12, 0x0b, 0x0a, 0x07, 0x4d, 0x49, 0x47, 0x52, 0x41, 0x54, 0x45, 0x10, 0x03, 0x12, 0x0b, 0x0a, - 0x07, 0x52, 0x45, 0x53, 0x48, 0x41, 0x52, 0x44, 0x10, 0x04, 0x12, 0x0d, 0x0a, 0x09, 0x4f, 0x4e, - 0x4c, 0x49, 0x4e, 0x45, 0x44, 0x44, 0x4c, 0x10, 0x05, 0x2a, 0xf9, 0x01, 0x0a, 0x0a, 0x56, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, - 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x47, 0x54, 0x49, 0x44, 0x10, 0x01, 0x12, - 0x09, 0x0a, 0x05, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, - 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, - 0x43, 0x4b, 0x10, 0x04, 0x12, 0x07, 0x0a, 0x03, 0x44, 0x44, 0x4c, 0x10, 0x05, 0x12, 0x0a, 0x0a, - 0x06, 0x49, 0x4e, 0x53, 0x45, 0x52, 0x54, 0x10, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, - 0x4c, 0x41, 0x43, 0x45, 0x10, 0x07, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, - 0x10, 0x08, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x09, 0x12, 0x07, - 0x0a, 0x03, 0x53, 0x45, 0x54, 0x10, 0x0a, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x54, 0x48, 0x45, 0x52, - 0x10, 0x0b, 0x12, 0x07, 0x0a, 0x03, 0x52, 0x4f, 0x57, 0x10, 0x0c, 0x12, 0x09, 0x0a, 0x05, 0x46, - 0x49, 0x45, 0x4c, 0x44, 0x10, 0x0d, 0x12, 0x0d, 0x0a, 0x09, 0x48, 0x45, 0x41, 0x52, 0x54, 0x42, - 0x45, 0x41, 0x54, 0x10, 0x0e, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x47, 0x54, 0x49, 0x44, 0x10, 0x0f, - 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x55, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x10, 0x12, 0x0b, 0x0a, - 0x07, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x11, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, - 0x53, 0x54, 0x50, 0x4b, 0x10, 0x12, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, - 0x49, 0x4e, 0x54, 0x10, 0x13, 0x2a, 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, - 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, - 0x5a, 0x27, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, - 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x64, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x67, 0x74, 0x69, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x52, 0x6f, 0x77, + 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x2a, 0x3e, 0x0a, 0x0b, 0x4f, 0x6e, 0x44, 0x44, 0x4c, 0x41, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0a, 0x0a, 0x06, 0x49, 0x47, 0x4e, 0x4f, 0x52, 0x45, 0x10, + 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53, 0x54, 0x4f, 0x50, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x45, + 0x58, 0x45, 0x43, 0x10, 0x02, 0x12, 0x0f, 0x0a, 0x0b, 0x45, 0x58, 0x45, 0x43, 0x5f, 0x49, 0x47, + 0x4e, 0x4f, 0x52, 0x45, 0x10, 0x03, 0x2a, 0x7b, 0x0a, 0x18, 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x4d, 0x41, 0x54, 0x45, 0x52, 0x49, 0x41, 0x4c, 0x49, 0x5a, + 0x45, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x4d, 0x4f, 0x56, 0x45, 0x54, 0x41, 0x42, 0x4c, 0x45, + 0x53, 0x10, 0x01, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x4c, 0x4f, 0x4f, + 0x4b, 0x55, 0x50, 0x49, 0x4e, 0x44, 0x45, 0x58, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x4d, 0x49, + 0x47, 0x52, 0x41, 0x54, 0x45, 0x10, 0x03, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x53, 0x48, 0x41, + 0x52, 0x44, 0x10, 0x04, 0x12, 0x0d, 0x0a, 0x09, 0x4f, 0x4e, 0x4c, 0x49, 0x4e, 0x45, 0x44, 0x44, + 0x4c, 0x10, 0x05, 0x2a, 0xf9, 0x01, 0x0a, 0x0a, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, + 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, + 0x08, 0x0a, 0x04, 0x47, 0x54, 0x49, 0x44, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x45, 0x47, + 0x49, 0x4e, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, + 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x4f, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x04, 0x12, 0x07, + 0x0a, 0x03, 0x44, 0x44, 0x4c, 0x10, 0x05, 0x12, 0x0a, 0x0a, 0x06, 0x49, 0x4e, 0x53, 0x45, 0x52, + 0x54, 0x10, 0x06, 0x12, 0x0b, 0x0a, 0x07, 0x52, 0x45, 0x50, 0x4c, 0x41, 0x43, 0x45, 0x10, 0x07, + 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x10, 0x08, 0x12, 0x0a, 0x0a, 0x06, + 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x10, 0x09, 0x12, 0x07, 0x0a, 0x03, 0x53, 0x45, 0x54, 0x10, + 0x0a, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x54, 0x48, 0x45, 0x52, 0x10, 0x0b, 0x12, 0x07, 0x0a, 0x03, + 0x52, 0x4f, 0x57, 0x10, 0x0c, 0x12, 0x09, 0x0a, 0x05, 0x46, 0x49, 0x45, 0x4c, 0x44, 0x10, 0x0d, + 0x12, 0x0d, 0x0a, 0x09, 0x48, 0x45, 0x41, 0x52, 0x54, 0x42, 0x45, 0x41, 0x54, 0x10, 0x0e, 0x12, + 0x09, 0x0a, 0x05, 0x56, 0x47, 0x54, 0x49, 0x44, 0x10, 0x0f, 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, + 0x55, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x10, 0x12, 0x0b, 0x0a, 0x07, 0x56, 0x45, 0x52, 0x53, 0x49, + 0x4f, 0x4e, 0x10, 0x11, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x53, 0x54, 0x50, 0x4b, 0x10, 0x12, + 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, 0x49, 0x4e, 0x54, 0x10, 0x13, 0x2a, + 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, + 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x76, 0x69, 0x74, 0x65, + 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, + 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go b/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go index 60ba2fd9d62..cda4788cb6f 100644 --- a/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go +++ b/go/vt/proto/binlogdata/binlogdata_vtproto.pb.go @@ -1192,6 +1192,18 @@ func (m *VEvent) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Throttled { + i-- + if m.Throttled { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0xc0 + } if len(m.Shard) > 0 { i -= len(m.Shard) copy(dAtA[i:], m.Shard) @@ -1668,6 +1680,26 @@ func (m *VStreamRowsResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.Heartbeat { + i-- + if m.Heartbeat { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x38 + } + if m.Throttled { + i-- + if m.Throttled { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x30 + } if m.Lastpk != nil { size, err := m.Lastpk.MarshalToSizedBufferVT(dAtA[:i]) if err != nil { @@ -2552,6 +2584,9 @@ func (m *VEvent) SizeVT() (n int) { if l > 0 { n += 2 + l + sov(uint64(l)) } + if m.Throttled { + n += 3 + } if m.unknownFields != nil { n += len(m.unknownFields) } @@ -2725,6 +2760,12 @@ func (m *VStreamRowsResponse) SizeVT() (n int) { l = m.Lastpk.SizeVT() n += 1 + l + sov(uint64(l)) } + if m.Throttled { + n += 2 + } + if m.Heartbeat { + n += 2 + } if m.unknownFields != nil { n += len(m.unknownFields) } @@ -6368,6 +6409,26 @@ func (m *VEvent) UnmarshalVT(dAtA []byte) error { } m.Shard = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 24: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Throttled", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Throttled = bool(v != 0) default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) @@ -7461,6 +7522,46 @@ func (m *VStreamRowsResponse) UnmarshalVT(dAtA []byte) error { return err } iNdEx = postIndex + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Throttled", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Throttled = bool(v != 0) + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Heartbeat", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Heartbeat = bool(v != 0) default: iNdEx = preIndex skippy, err := skip(dAtA[iNdEx:]) diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index d56137eca9e..7b6251f3281 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -369,6 +369,9 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan [] if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue } + if ev.Throttled { + continue + } // Also ignore begin/commit to reduce list of events to expect, for readability ... if ev.Type == binlogdatapb.VEventType_BEGIN { continue diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 18078318f95..e33d507bdc1 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -132,6 +132,7 @@ var ( onlineDDLUser = "vt-online-ddl-internal" onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%") throttlerOnlineDDLApp = "online-ddl" + throttleCheckFlags = &throttle.CheckFlags{} ) type mysqlVariables struct { @@ -3018,6 +3019,8 @@ func (e *Executor) readVReplStream(ctx context.Context, uuid string, okIfMissing pos: row.AsString("pos", ""), timeUpdated: row.AsInt64("time_updated", 0), timeHeartbeat: row.AsInt64("time_heartbeat", 0), + timeThrottled: row.AsInt64("time_throttled", 0), + componentThrottled: row.AsString("component_throttled", ""), transactionTimestamp: row.AsInt64("transaction_timestamp", 0), state: row.AsString("state", ""), message: row.AsString("message", ""), @@ -3128,6 +3131,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i } } + var throttlerOnce sync.Once r, err := e.execQuery(ctx, sqlSelectRunningMigrations) if err != nil { return countRunnning, cancellable, err @@ -3198,6 +3202,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i _ = e.updateRowsCopied(ctx, uuid, s.rowsCopied) _ = e.updateMigrationProgressByRowsCopied(ctx, uuid, s.rowsCopied) _ = e.updateMigrationETASecondsByProgress(ctx, uuid) + _ = e.updateMigrationLastThrottled(ctx, uuid, s.timeThrottled, s.componentThrottled) isReady, err := e.isVReplMigrationReadyToCutOver(ctx, s) if err != nil { @@ -3223,6 +3228,25 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i return countRunnning, cancellable, err } } + go throttlerOnce.Do(func() { + if e.lagThrottler.CheckIsReady() != nil { + return + } + // Self healing: in the following scenario: + // - a vitess migration + // - with on demand heartbeats + // - the streamer running on a replica + // - the streamer was throttled for long enough + // - then vplayer and vcopier are locked, waiting for the streamer to do something + // - since they are blocked, they're not running throttler checks + // - since streamer runs on replica, it only checks that replica + // - therefore no one asking for on-demand heartbeats + // - then, if the conditions for the streamer's throttling are done, the streamer then thinks there's replication lag, with nothing to remediate it. + // - it's a deadlock. + // And so, once per reviewRunningMigrations(), and assuming there _are_ running migrations, we ensure to hit a throttler check. This will kick + // on-demand heartbeats, unlocking the deadlock. + e.lagThrottler.CheckByType(ctx, throttlerOnlineDDLApp, "", throttleCheckFlags, throttle.ThrottleCheckPrimaryWrite) + }) } } case schema.DDLStrategyPTOSC: @@ -3759,6 +3783,19 @@ func (e *Executor) updateMigrationETASecondsByProgress(ctx context.Context, uuid return err } +func (e *Executor) updateMigrationLastThrottled(ctx context.Context, uuid string, lastThrottledUnixTime int64, throttledCompnent string) error { + query, err := sqlparser.ParseAndBind(sqlUpdateLastThrottled, + sqltypes.Int64BindVariable(lastThrottledUnixTime), + sqltypes.StringBindVariable(throttledCompnent), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + func (e *Executor) updateMigrationTableRows(ctx context.Context, uuid string, tableRows int64) error { query, err := sqlparser.ParseAndBind(sqlUpdateMigrationTableRows, sqltypes.Int64BindVariable(tableRows), diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index ac9fff7b93b..6ae1728e8b1 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -77,6 +77,8 @@ const ( alterSchemaMigrationsTableVreplLivenessIndicator = "ALTER TABLE _vt.schema_migrations add column vitess_liveness_indicator bigint NOT NULL DEFAULT 0" alterSchemaMigrationsTableUserThrottleRatio = "ALTER TABLE _vt.schema_migrations add column user_throttle_ratio float NOT NULL DEFAULT 0" alterSchemaMigrationsTableSpecialPlan = "ALTER TABLE _vt.schema_migrations add column special_plan text NOT NULL" + alterSchemaMigrationsLastThrottled = "ALTER TABLE _vt.schema_migrations add column last_throttled_timestamp timestamp NULL DEFAULT NULL" + alterSchemaMigrationsComponentThrottled = "ALTER TABLE _vt.schema_migrations add column component_throttled tinytext NOT NULL" sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations ( migration_uuid, @@ -98,7 +100,7 @@ const ( reverted_uuid, is_view ) VALUES ( - %a, %a, %a, %a, %a, %a, %a, %a, %a, FROM_UNIXTIME(NOW()), %a, %a, %a, %a, %a, %a, %a, %a + %a, %a, %a, %a, %a, %a, %a, %a, %a, NOW(), %a, %a, %a, %a, %a, %a, %a, %a )` sqlSelectQueuedMigrations = `SELECT @@ -255,6 +257,11 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateLastThrottled = `UPDATE _vt.schema_migrations + SET last_throttled_timestamp=FROM_UNIXTIME(%a), component_throttled=%a + WHERE + migration_uuid=%a + ` sqlRetryMigrationWhere = `UPDATE _vt.schema_migrations SET migration_status='queued', @@ -401,6 +408,9 @@ const ( stowaway_table, rows_copied, vitess_liveness_indicator, + user_throttle_ratio, + last_throttled_timestamp, + component_throttled, postpone_completion FROM _vt.schema_migrations WHERE @@ -536,6 +546,9 @@ const ( pos, time_updated, transaction_timestamp, + time_heartbeat, + time_throttled, + component_throttled, state, message, rows_copied @@ -612,4 +625,6 @@ var ApplyDDL = []string{ alterSchemaMigrationsTableVreplLivenessIndicator, alterSchemaMigrationsTableUserThrottleRatio, alterSchemaMigrationsTableSpecialPlan, + alterSchemaMigrationsLastThrottled, + alterSchemaMigrationsComponentThrottled, } diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 5a379e196dc..905f53f92b9 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -52,6 +52,8 @@ type VReplStream struct { pos string timeUpdated int64 timeHeartbeat int64 + timeThrottled int64 + componentThrottled string transactionTimestamp int64 state string message string @@ -60,13 +62,12 @@ type VReplStream struct { } // livenessTimeIndicator returns a time indicator for last known healthy state. -// vreplication uses two indicators: time_updates and time_heartbeat. Either one making progress is good news. The greater of the two indicates the -// latest progress. Note that both indicate timestamp of events in the binary log stream, rather than time "now". -// A vreplication stream health is determined by "is there any progress in either of the two counters in the past X minutes" +// vreplication uses three indicators: +// - transaction_timestamp +// - time_heartbeat +// - time_throttled. +// Updating any of them, also updates time_updated, indicating liveness. func (v *VReplStream) livenessTimeIndicator() int64 { - if v.timeHeartbeat > v.timeUpdated { - return v.timeHeartbeat - } return v.timeUpdated } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go index ddf2c280a40..2ceed4b5900 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go @@ -508,6 +508,8 @@ func TestCreateDBAndTable(t *testing.T) { "ALTER TABLE _vt.vreplication ADD COLUMN tags.*", "ALTER TABLE _vt.vreplication ADD COLUMN time_heartbeat.*", "ALTER TABLE _vt.vreplication ADD COLUMN workflow_type int NOT NULL DEFAULT 0", + "ALTER TABLE _vt.vreplication ADD COLUMN time_throttled.*", + "ALTER TABLE _vt.vreplication ADD COLUMN component_throttled.*", "create table if not exists _vt.resharding_journal.*", "create table if not exists _vt.copy_state.*", } diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 80aeb1f36f8..dc78f92c10c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -477,9 +477,11 @@ func expectLogsAndUnsubscribe(t *testing.T, logs []LogExpectation, logCh chan an func shouldIgnoreQuery(query string) bool { queriesToIgnore := []string{ - "_vt.vreplication_log", // ignore all selects, updates and inserts into this table - "@@session.sql_mode", // ignore all selects, and sets of this variable - ", time_heartbeat=", // update of last heartbeat time, can happen out-of-band, so can't test for it + "_vt.vreplication_log", // ignore all selects, updates and inserts into this table + "@@session.sql_mode", // ignore all selects, and sets of this variable + ", time_heartbeat=", // update of last heartbeat time, can happen out-of-band, so can't test for it + ", time_throttled=", // update of last throttle time, can happen out-of-band, so can't test for it + ", component_throttled=", // update of last throttle time, can happen out-of-band, so can't test for it "context cancel", } for _, q := range queriesToIgnore { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index ef8a40b70e9..7a08b3cb510 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -238,9 +238,19 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma return io.EOF default: } + if rows.Throttled { + _ = vc.vr.updateTimeThrottled(RowStreamerComponentName) + return nil + } + if rows.Heartbeat { + _ = vc.vr.updateHeartbeatTime(time.Now().Unix()) + return nil + } // verify throttler is happy, otherwise keep looping if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vc.throttlerAppName) { - break + break // out of 'for' loop + } else { // we're throttled + _ = vc.vr.updateTimeThrottled(VCopierComponentName) } } if vc.tablePlan == nil { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index d214b6fa3d2..aa862513076 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -254,17 +254,6 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { return posReached, nil } -func (vp *vplayer) updateHeartbeat(tm int64) error { - update, err := binlogplayer.GenerateUpdateHeartbeat(vp.vr.id, tm) - if err != nil { - return err - } - if _, err := withDDL.Exec(vp.vr.vre.ctx, update, vp.vr.dbClient.ExecuteFetch, vp.vr.dbClient.ExecuteFetch); err != nil { - return fmt.Errorf("error %v updating time", err) - } - return nil -} - func (vp *vplayer) mustUpdateHeartbeat() bool { return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval || vp.numAccumulatedHeartbeats >= vreplicationMinimumHeartbeatUpdateInterval @@ -277,7 +266,7 @@ func (vp *vplayer) recordHeartbeat() error { return nil } vp.numAccumulatedHeartbeats = 0 - return vp.updateHeartbeat(tm) + return vp.vr.updateHeartbeatTime(tm) } // applyEvents is the main thread that applies the events. It has the following use @@ -343,6 +332,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { } // check throttler. if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, vp.throttlerAppName) { + _ = vp.vr.updateTimeThrottled(VPlayerComponentName) continue } @@ -633,10 +623,14 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m stats.Send(fmt.Sprintf("%v", event.Journal)) return io.EOF case binlogdatapb.VEventType_HEARTBEAT: + if event.Throttled { + if err := vp.vr.updateTimeThrottled(VStreamerComponentName); err != nil { + return err + } + } if !vp.vr.dbClient.InTransaction { vp.numAccumulatedHeartbeats++ - err := vp.recordHeartbeat() - if err != nil { + if err := vp.recordHeartbeat(); err != nil { return err } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index b9e9602531c..e736885a6f8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -25,6 +25,7 @@ import ( "strings" "time" + "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/sqlparser" querypb "vitess.io/vitess/go/vt/proto/query" @@ -87,6 +88,15 @@ const ( setSQLModeQueryf = `SET @@session.sql_mode='%s'` ) +type ComponentName string + +const ( + VPlayerComponentName ComponentName = "vplayer" + VCopierComponentName ComponentName = "vcopier" + VStreamerComponentName ComponentName = "vstreamer" + RowStreamerComponentName ComponentName = "rowstreamer" +) + // vreplicator provides the core logic to start vreplication streams type vreplicator struct { vre *Engine @@ -106,6 +116,8 @@ type vreplicator struct { WorkflowType int64 WorkflowName string + + throttleUpdatesRateLimiter *timer.RateLimiter } // newVReplicator creates a new vreplicator. The valid fields from the source are: @@ -142,6 +154,8 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame stats: stats, dbClient: newVDBClient(dbClient, stats), mysqld: mysqld, + + throttleUpdatesRateLimiter: timer.NewRateLimiter(time.Second), } } @@ -502,6 +516,32 @@ func (vr *vreplicator) throttlerAppName() string { return strings.Join(names, ":") } +func (vr *vreplicator) updateTimeThrottled(componentThrottled ComponentName) error { + err := vr.throttleUpdatesRateLimiter.Do(func() error { + tm := time.Now().Unix() + update, err := binlogplayer.GenerateUpdateTimeThrottled(vr.id, tm, string(componentThrottled)) + if err != nil { + return err + } + if _, err := withDDL.Exec(vr.vre.ctx, update, vr.dbClient.ExecuteFetch, vr.dbClient.ExecuteFetch); err != nil { + return fmt.Errorf("error %v updating time throttled", err) + } + return nil + }) + return err +} + +func (vr *vreplicator) updateHeartbeatTime(tm int64) error { + update, err := binlogplayer.GenerateUpdateHeartbeat(vr.id, tm) + if err != nil { + return err + } + if _, err := withDDL.Exec(vr.vre.ctx, update, vr.dbClient.ExecuteFetch, vr.dbClient.ExecuteFetch); err != nil { + return fmt.Errorf("error %v updating time", err) + } + return nil +} + func (vr *vreplicator) clearFKCheck() error { _, err := vr.dbClient.Execute("set foreign_key_checks=0;") return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index c0042867e92..ea7b85b63ba 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -19,12 +19,14 @@ package vstreamer import ( "context" "fmt" + "sync" "time" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" + "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -36,6 +38,10 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" ) +var ( + rowStreamertHeartbeatInterval = 10 * time.Second +) + // RowStreamer exposes an externally usable interface to rowStreamer. type RowStreamer interface { Stream() error @@ -72,6 +78,8 @@ type rowStreamer struct { sendQuery string vse *Engine pktsize PacketSizer + + throttleResponseRateLimiter *timer.RateLimiter } func newRowStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, query string, lastpk []sqltypes.Value, vschema *localVSchema, send func(*binlogdatapb.VStreamRowsResponse) error, vse *Engine) *rowStreamer { @@ -87,6 +95,8 @@ func newRowStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engi vschema: vschema, vse: vse, pktsize: DefaultPacketSizer(), + + throttleResponseRateLimiter: timer.NewRateLimiter(rowStreamertHeartbeatInterval), } } @@ -266,6 +276,13 @@ func (rs *rowStreamer) buildSelect() (string, error) { } func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.VStreamRowsResponse) error) error { + + var sendMu sync.Mutex + safeSend := func(r *binlogdatapb.VStreamRowsResponse) error { + sendMu.Lock() + defer sendMu.Unlock() + return send(r) + } // Let's wait until MySQL is in good shape to stream rows if err := rs.vse.waitForMySQL(rs.ctx, rs.cp, rs.plan.Table.Name); err != nil { return err @@ -295,7 +312,7 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V charsets[i] = collations.ID(fld.Charset) } - err = send(&binlogdatapb.VStreamRowsResponse{ + err = safeSend(&binlogdatapb.VStreamRowsResponse{ Fields: rs.plan.fields(), Pkfields: pkfields, Gtid: gtid, @@ -304,6 +321,15 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V return fmt.Errorf("stream send error: %v", err) } + // streamQuery sends heartbeats as long as it operates + heartbeatTicker := time.NewTicker(rowStreamertHeartbeatInterval) + defer heartbeatTicker.Stop() + go func() { + for range heartbeatTicker.C { + safeSend(&binlogdatapb.VStreamRowsResponse{Heartbeat: true}) + } + }() + var response binlogdatapb.VStreamRowsResponse var rows []*querypb.Row var rowCount int @@ -320,6 +346,9 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V // check throttler. if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { + rs.throttleResponseRateLimiter.Do(func() error { + return safeSend(&binlogdatapb.VStreamRowsResponse{Throttled: true}) + }) continue } @@ -358,7 +387,7 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V rs.vse.rowStreamerNumRows.Add(int64(len(response.Rows))) rs.vse.rowStreamerNumPackets.Add(int64(1)) startSend := time.Now() - err = send(&response) + err = safeSend(&response) if err != nil { return err } @@ -373,7 +402,7 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V response.Lastpk = sqltypes.RowToProto3(lastpk) rs.vse.rowStreamerNumRows.Add(int64(len(response.Rows))) - err = send(&response) + err = safeSend(&response) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index c654136b97c..fdd60b8207f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -440,6 +440,9 @@ func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Fi if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue } + if ev.Throttled { + continue + } cb := getEventCallback(ev) if cb != nil { cb() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index dc8b5b54e11..fe53c8cf5c9 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -26,6 +26,7 @@ import ( "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/timer" vtschema "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/mysql" @@ -268,24 +269,36 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } // Main loop: calls bufferAndTransmit as events arrive. - timer := time.NewTimer(HeartbeatTime) - defer timer.Stop() + hbTimer := time.NewTimer(HeartbeatTime) + defer hbTimer.Stop() + + injectHeartbeat := func(throttled bool) error { + now := time.Now().UnixNano() + err := bufferAndTransmit(&binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_HEARTBEAT, + Timestamp: now / 1e9, + CurrentTime: now, + Throttled: throttled, + }) + return err + } - // throttledEvents can be read just like you would read from events - // throttledEvents pulls data from events, but throttles pulling data, - // which in turn blocks the BinlogConnection from pushing events to the channel - throttledEvents := make(chan mysql.BinlogEvent) - go func() { + throttleEvents := func(throttledEvents chan mysql.BinlogEvent) { + throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) for { // check throttler. if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { - select { // make sure to leave if context is cancelled + select { case <-ctx.Done(): return default: // do nothing special } + throttledHeartbeatsRateLimiter.Do(func() error { + return injectHeartbeat(true) + }) + // we won't process events, until we're no longer throttling continue } select { @@ -303,14 +316,19 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog case <-ctx.Done(): return } - } - }() + } + // throttledEvents can be read just like you would read from events + // throttledEvents pulls data from events, but throttles pulling data, + // which in turn blocks the BinlogConnection from pushing events to the channel + throttledEvents := make(chan mysql.BinlogEvent) + go throttleEvents(throttledEvents) + for { - timer.Reset(HeartbeatTime) + hbTimer.Reset(HeartbeatTime) // Drain event if timer fired before reset. select { - case <-timer.C: + case <-hbTimer.C: default: } @@ -346,13 +364,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog vschemaUpdateCount.Add(1) case <-ctx.Done(): return nil - case <-timer.C: - now := time.Now().UnixNano() - if err := bufferAndTransmit(&binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_HEARTBEAT, - Timestamp: now / 1e9, - CurrentTime: now, - }); err != nil { + case <-hbTimer.C: + if err := injectHeartbeat(false); err != nil { if err == io.EOF { return nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index 4d3e9bbf331..191ba408f97 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -544,6 +544,9 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue } + if ev.Throttled { + continue + } allEvents = append(allEvents, ev) } if len(allEvents) == len(expectedEvents) { @@ -2128,6 +2131,9 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue } + if ev.Throttled { + continue + } evs = append(evs, ev) } case <-ctx.Done(): diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 434ff6168b5..d01dbd9a927 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -45,7 +45,7 @@ import ( const ( streamInfoQuery = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='%s' and db_name='vt_%s'" - streamExtInfoQuery = "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, message, tags from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'" + streamExtInfoQuery = "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'" copyStateQuery = "select table_name, lastpk from _vt.copy_state where vrepl_id = %d" ) @@ -201,7 +201,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, }, } streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", j+1, bls)) - streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0||", j+1, now, now)) + streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0|||", j+1, now, now)) tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, j+1), noResult) } tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -209,12 +209,12 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, "int64|varchar|varchar|varchar|varchar"), streamInfoRows...)) tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|message|tags", - "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|varchar|varchar"), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|varchar|varchar"), streamExtInfoRows...)) tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|message|tags", - "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|varchar|varchar"), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|varchar|varchar"), streamExtInfoRows...)) } @@ -358,7 +358,7 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe } rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls)) rowsRdOnly = append(rows, fmt.Sprintf("%d|%v|||RDONLY", j+1, bls)) - streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0||", j+1, now, now)) + streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0|||", j+1, now, now)) tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, j+1), noResult) } tme.dbTargetClients[i].addInvariant(streamInfoKs, sqltypes.MakeTestResult(sqltypes.MakeTestFields( @@ -372,8 +372,8 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe rowsRdOnly...), ) tme.dbTargetClients[i].addInvariant(streamExtInfoKs, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|message|tags", - "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|varchar|varchar"), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|varchar|varchar"), streamExtInfoRows...)) } @@ -382,12 +382,12 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe var streamExtInfoRows []string dbclient.addInvariant(streamInfoKs, &sqltypes.Result{}) for j := range targetShards { - streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks|%d|%d|0||", j+1, now, now)) + streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks|%d|%d|0|0|||", j+1, now, now)) tme.dbSourceClients[i].addInvariant(fmt.Sprintf(copyStateQuery, j+1), noResult) } tme.dbSourceClients[i].addInvariant(streamExtInfoKs, sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|message|tags", - "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|varchar|varchar"), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|varchar|varchar"), streamExtInfoRows...)) } return tme diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index ff638e5fa44..4b036d2d341 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -418,6 +418,10 @@ type ReplicationStatus struct { TimeUpdated int64 // TimeHeartbeat represents the time_heartbeat column from the _vt.vreplication table. TimeHeartbeat int64 + // TimeThrottled represents the time_throttled column from the _vt.vreplication table. + TimeThrottled int64 + // ComponentThrottled represents the component_throttled column from the _vt.vreplication table. + ComponentThrottled string // Message represents the message column from the _vt.vreplication table. Message string // Tags contain the tags specified for this stream @@ -432,8 +436,8 @@ type ReplicationStatus struct { func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row sqltypes.RowNamedValues, primary *topo.TabletInfo) (*ReplicationStatus, string, error) { var err error - var id, timeUpdated, transactionTimestamp, timeHeartbeat int64 - var state, dbName, pos, stopPos, message, tags string + var id, timeUpdated, transactionTimestamp, timeHeartbeat, timeThrottled int64 + var state, dbName, pos, stopPos, message, tags, componentThrottled string var bls binlogdatapb.BinlogSource var mpos mysql.Position @@ -485,6 +489,14 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row sqltype if err != nil { return nil, "", err } + timeThrottled, err = row.ToInt64("time_throttled") + if err != nil { + return nil, "", err + } + componentThrottled, err = row.ToString("component_throttled") + if err != nil { + return nil, "", err + } message, err = row.ToString("message") if err != nil { return nil, "", err @@ -505,6 +517,8 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row sqltype TransactionTimestamp: transactionTimestamp, TimeUpdated: timeUpdated, TimeHeartbeat: timeHeartbeat, + TimeThrottled: timeThrottled, + ComponentThrottled: componentThrottled, Message: message, Tags: tags, sourceTimeZone: bls.SourceTimeZone, @@ -524,7 +538,22 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( rsr.ShardStatuses = make(map[string]*ShardReplicationStatus) rsr.Workflow = workflow var results map[*topo.TabletInfo]*querypb.QueryResult - query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, message, tags from _vt.vreplication" + query := `select + id, + source, + pos, + stop_pos, + max_replication_lag, + state, + db_name, + time_updated, + transaction_timestamp, + time_heartbeat, + time_throttled, + component_throttled, + message, + tags + from _vt.vreplication` results, err := wr.runVexec(ctx, workflow, keyspace, query, false) if err != nil { return nil, err diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 3ffed07c752..341de8118be 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -238,6 +238,8 @@ func TestWorkflowListStreams(t *testing.T) { "TransactionTimestamp": 0, "TimeUpdated": 1234, "TimeHeartbeat": 1234, + "TimeThrottled": 0, + "ComponentThrottled": "", "Message": "", "Tags": "", "CopyState": [ @@ -275,6 +277,8 @@ func TestWorkflowListStreams(t *testing.T) { "TransactionTimestamp": 0, "TimeUpdated": 1234, "TimeHeartbeat": 1234, + "TimeThrottled": 0, + "ComponentThrottled": "", "Message": "", "Tags": "", "CopyState": [ diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index fa83b11679d..fe8d8e90c5c 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -99,11 +99,11 @@ func TestReshardingWorkflowErrorsAndMisc(t *testing.T) { func expectCanSwitchQueries(t *testing.T, tme *testMigraterEnv, keyspace, state string, currentLag int64) { now := time.Now().Unix() - rowTemplate := "1|||||%s|vt_%s|%d|%d|0||" + rowTemplate := "1|||||%s|vt_%s|%d|%d|0|0|||" row := fmt.Sprintf(rowTemplate, state, keyspace, now, now-currentLag) replicationResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|message|tags", - "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|varchar|varchar"), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags", + "int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|varchar|varchar"), row) copyStateResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields( "table|lastpk", diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index 5e8b27ab2d8..742b8333323 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -143,11 +143,11 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit env.tmc.setVRResults(primary.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2}) result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|message|tags", - "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|int64|varchar|varchar"), - fmt.Sprintf("1|%v|MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-3||0|Running|vt_target|%d|0|%d||", bls, timeUpdated, timeUpdated), + "id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags", + "int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|int64|int64|varchar|varchar|varchar"), + fmt.Sprintf("1|%v|MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-3||0|Running|vt_target|%d|0|%d|0|||", bls, timeUpdated, timeUpdated), ) - env.tmc.setVRResults(primary.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) + env.tmc.setVRResults(primary.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) env.tmc.setVRResults( primary.tablet, "select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'", @@ -172,9 +172,9 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit env.tmc.setVRResults(primary.tablet, "select table_name, lastpk from _vt.copy_state where vrepl_id = 1", result) - env.tmc.setVRResults(primary.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", &sqltypes.Result{}) + env.tmc.setVRResults(primary.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", &sqltypes.Result{}) - env.tmc.setVRResults(primary.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'badwf'", &sqltypes.Result{}) + env.tmc.setVRResults(primary.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, time_heartbeat, time_throttled, component_throttled, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'badwf'", &sqltypes.Result{}) env.tmc.vrpos[tabletID] = testSourceGtid env.tmc.pos[tabletID] = testTargetPrimaryPosition diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index f8af9069117..8433e105025 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -410,7 +410,8 @@ message VEvent { string keyspace = 22; // the source shard string shard = 23; - + // indicate that we are being throttled right now + bool throttled = 24; } message MinimalTable { @@ -456,6 +457,10 @@ message VStreamRowsResponse { string gtid = 3; repeated query.Row rows = 4; query.Row lastpk = 5; + // Throttled indicates that rowstreamer is being throttled right now + bool throttled = 6; + // Heartbeat indicates that this is a heartbeat message + bool heartbeat = 7; } message LastPKEvent { diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index bf6a8d07531..7b0907a59df 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -50156,6 +50156,9 @@ export namespace binlogdata { /** VEvent shard */ shard?: (string|null); + + /** VEvent throttled */ + throttled?: (boolean|null); } /** Represents a VEvent. */ @@ -50206,6 +50209,9 @@ export namespace binlogdata { /** VEvent shard. */ public shard: string; + /** VEvent throttled. */ + public throttled: boolean; + /** * Creates a new VEvent instance using the specified properties. * @param [properties] Properties to set @@ -50810,6 +50816,12 @@ export namespace binlogdata { /** VStreamRowsResponse lastpk */ lastpk?: (query.IRow|null); + + /** VStreamRowsResponse throttled */ + throttled?: (boolean|null); + + /** VStreamRowsResponse heartbeat */ + heartbeat?: (boolean|null); } /** Represents a VStreamRowsResponse. */ @@ -50836,6 +50848,12 @@ export namespace binlogdata { /** VStreamRowsResponse lastpk. */ public lastpk?: (query.IRow|null); + /** VStreamRowsResponse throttled. */ + public throttled: boolean; + + /** VStreamRowsResponse heartbeat. */ + public heartbeat: boolean; + /** * Creates a new VStreamRowsResponse instance using the specified properties. * @param [properties] Properties to set diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index 13336330618..6db9f5984bd 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -118957,6 +118957,7 @@ $root.binlogdata = (function() { * @property {binlogdata.ILastPKEvent|null} [last_p_k_event] VEvent last_p_k_event * @property {string|null} [keyspace] VEvent keyspace * @property {string|null} [shard] VEvent shard + * @property {boolean|null} [throttled] VEvent throttled */ /** @@ -119078,6 +119079,14 @@ $root.binlogdata = (function() { */ VEvent.prototype.shard = ""; + /** + * VEvent throttled. + * @member {boolean} throttled + * @memberof binlogdata.VEvent + * @instance + */ + VEvent.prototype.throttled = false; + /** * Creates a new VEvent instance using the specified properties. * @function create @@ -119128,6 +119137,8 @@ $root.binlogdata = (function() { writer.uint32(/* id 22, wireType 2 =*/178).string(message.keyspace); if (message.shard != null && Object.hasOwnProperty.call(message, "shard")) writer.uint32(/* id 23, wireType 2 =*/186).string(message.shard); + if (message.throttled != null && Object.hasOwnProperty.call(message, "throttled")) + writer.uint32(/* id 24, wireType 0 =*/192).bool(message.throttled); return writer; }; @@ -119201,6 +119212,9 @@ $root.binlogdata = (function() { case 23: message.shard = reader.string(); break; + case 24: + message.throttled = reader.bool(); + break; default: reader.skipType(tag & 7); break; @@ -119308,6 +119322,9 @@ $root.binlogdata = (function() { if (message.shard != null && message.hasOwnProperty("shard")) if (!$util.isString(message.shard)) return "shard: string expected"; + if (message.throttled != null && message.hasOwnProperty("throttled")) + if (typeof message.throttled !== "boolean") + return "throttled: boolean expected"; return null; }; @@ -119458,6 +119475,8 @@ $root.binlogdata = (function() { message.keyspace = String(object.keyspace); if (object.shard != null) message.shard = String(object.shard); + if (object.throttled != null) + message.throttled = Boolean(object.throttled); return message; }; @@ -119496,6 +119515,7 @@ $root.binlogdata = (function() { object.last_p_k_event = null; object.keyspace = ""; object.shard = ""; + object.throttled = false; } if (message.type != null && message.hasOwnProperty("type")) object.type = options.enums === String ? $root.binlogdata.VEventType[message.type] : message.type; @@ -119529,6 +119549,8 @@ $root.binlogdata = (function() { object.keyspace = message.keyspace; if (message.shard != null && message.hasOwnProperty("shard")) object.shard = message.shard; + if (message.throttled != null && message.hasOwnProperty("throttled")) + object.throttled = message.throttled; return object; }; @@ -120897,6 +120919,8 @@ $root.binlogdata = (function() { * @property {string|null} [gtid] VStreamRowsResponse gtid * @property {Array.|null} [rows] VStreamRowsResponse rows * @property {query.IRow|null} [lastpk] VStreamRowsResponse lastpk + * @property {boolean|null} [throttled] VStreamRowsResponse throttled + * @property {boolean|null} [heartbeat] VStreamRowsResponse heartbeat */ /** @@ -120957,6 +120981,22 @@ $root.binlogdata = (function() { */ VStreamRowsResponse.prototype.lastpk = null; + /** + * VStreamRowsResponse throttled. + * @member {boolean} throttled + * @memberof binlogdata.VStreamRowsResponse + * @instance + */ + VStreamRowsResponse.prototype.throttled = false; + + /** + * VStreamRowsResponse heartbeat. + * @member {boolean} heartbeat + * @memberof binlogdata.VStreamRowsResponse + * @instance + */ + VStreamRowsResponse.prototype.heartbeat = false; + /** * Creates a new VStreamRowsResponse instance using the specified properties. * @function create @@ -120994,6 +121034,10 @@ $root.binlogdata = (function() { $root.query.Row.encode(message.rows[i], writer.uint32(/* id 4, wireType 2 =*/34).fork()).ldelim(); if (message.lastpk != null && Object.hasOwnProperty.call(message, "lastpk")) $root.query.Row.encode(message.lastpk, writer.uint32(/* id 5, wireType 2 =*/42).fork()).ldelim(); + if (message.throttled != null && Object.hasOwnProperty.call(message, "throttled")) + writer.uint32(/* id 6, wireType 0 =*/48).bool(message.throttled); + if (message.heartbeat != null && Object.hasOwnProperty.call(message, "heartbeat")) + writer.uint32(/* id 7, wireType 0 =*/56).bool(message.heartbeat); return writer; }; @@ -121049,6 +121093,12 @@ $root.binlogdata = (function() { case 5: message.lastpk = $root.query.Row.decode(reader, reader.uint32()); break; + case 6: + message.throttled = reader.bool(); + break; + case 7: + message.heartbeat = reader.bool(); + break; default: reader.skipType(tag & 7); break; @@ -121119,6 +121169,12 @@ $root.binlogdata = (function() { if (error) return "lastpk." + error; } + if (message.throttled != null && message.hasOwnProperty("throttled")) + if (typeof message.throttled !== "boolean") + return "throttled: boolean expected"; + if (message.heartbeat != null && message.hasOwnProperty("heartbeat")) + if (typeof message.heartbeat !== "boolean") + return "heartbeat: boolean expected"; return null; }; @@ -121171,6 +121227,10 @@ $root.binlogdata = (function() { throw TypeError(".binlogdata.VStreamRowsResponse.lastpk: object expected"); message.lastpk = $root.query.Row.fromObject(object.lastpk); } + if (object.throttled != null) + message.throttled = Boolean(object.throttled); + if (object.heartbeat != null) + message.heartbeat = Boolean(object.heartbeat); return message; }; @@ -121195,6 +121255,8 @@ $root.binlogdata = (function() { if (options.defaults) { object.gtid = ""; object.lastpk = null; + object.throttled = false; + object.heartbeat = false; } if (message.fields && message.fields.length) { object.fields = []; @@ -121215,6 +121277,10 @@ $root.binlogdata = (function() { } if (message.lastpk != null && message.hasOwnProperty("lastpk")) object.lastpk = $root.query.Row.toObject(message.lastpk, options); + if (message.throttled != null && message.hasOwnProperty("throttled")) + object.throttled = message.throttled; + if (message.heartbeat != null && message.hasOwnProperty("heartbeat")) + object.heartbeat = message.heartbeat; return object; };