diff --git a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go index d5f37dc8604..813e9fc23f5 100644 --- a/go/test/endtoend/reparent/newfeaturetest/reparent_test.go +++ b/go/test/endtoend/reparent/newfeaturetest/reparent_test.go @@ -19,10 +19,13 @@ package newfeaturetest import ( "context" "fmt" + "sync" "testing" + "time" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/reparent/utils" ) @@ -146,3 +149,63 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) { err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "replica") require.NoError(t, err) } + +func TestBufferingWithMultipleDisruptions(t *testing.T) { + defer cluster.PanicHandler(t) + clusterInstance := utils.SetupShardedReparentCluster(t) + defer utils.TeardownCluster(clusterInstance) + + // Stop all VTOrc instances, so that they don't interfere with the test. + for _, vtorc := range clusterInstance.VTOrcProcesses { + err := vtorc.TearDown() + require.NoError(t, err) + } + + // Start by reparenting all the shards to the first tablet. + keyspace := clusterInstance.Keyspaces[0] + shards := keyspace.Shards + for _, shard := range shards { + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shard.Name, shard.Vttablets[0].Alias) + require.NoError(t, err) + } + + // We simulate start of external reparent or a PRS where the healthcheck update from the tablet gets lost in transit + // to vtgate by just setting the primary read only. This is also why we needed to shutdown all VTOrcs, so that they don't + // fix this. + utils.RunSQL(context.Background(), t, "set global read_only=1", shards[0].Vttablets[0]) + utils.RunSQL(context.Background(), t, "set global read_only=1", shards[1].Vttablets[0]) + + wg := sync.WaitGroup{} + rowCount := 10 + vtParams := clusterInstance.GetVTParams(keyspace.Name) + // We now spawn writes for a bunch of go routines. + // The ones going to shard 1 and shard 2 should block, since + // they're in the midst of a reparenting operation (as seen by the buffering code). + for i := 1; i <= rowCount; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + return + } + defer conn.Close() + _, err = conn.ExecuteFetch(utils.GetInsertQuery(i), 0, false) + require.NoError(t, err) + }(i) + } + + // Now, run a PRS call on the last shard. This shouldn't unbuffer the queries that are buffered for shards 1 and 2 + // since the disruption on the two shards hasn't stopped. + err := clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[2].Name, shards[2].Vttablets[1].Alias) + require.NoError(t, err) + // We wait a second just to make sure the PRS changes are processed by the buffering logic in vtgate. + time.Sleep(1 * time.Second) + // Finally, we'll now make the 2 shards healthy again by running PRS. + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[0].Name, shards[0].Vttablets[1].Alias) + require.NoError(t, err) + err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspace.Name, shards[1].Name, shards[1].Vttablets[1].Alias) + require.NoError(t, err) + // Wait for all the writes to have succeeded. + wg.Wait() +} diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 0f48f2b3fa8..d604d9f20f9 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -75,6 +75,52 @@ func SetupRangeBasedCluster(ctx context.Context, t *testing.T) *cluster.LocalPro return setupCluster(ctx, t, ShardName, []string{cell1}, []int{2}, "semi_sync") } +// SetupShardedReparentCluster is used to setup a sharded cluster for testing +func SetupShardedReparentCluster(t *testing.T) *cluster.LocalProcessCluster { + clusterInstance := cluster.NewCluster(cell1, Hostname) + // Start topo server + err := clusterInstance.StartTopo() + require.NoError(t, err) + + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--lock_tables_timeout", "5s", + // Fast health checks help find corner cases. + "--health_check_interval", "1s", + "--track_schema_versions=true", + "--queryserver_enable_online_ddl=false") + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--enable_buffer", + // Long timeout in case failover is slow. + "--buffer_window", "10m", + "--buffer_max_failover_duration", "10m", + "--buffer_min_time_between_failovers", "20m", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: sqlSchema, + VSchema: `{"sharded": true, "vindexes": {"hash_index": {"type": "hash"}}, "tables": {"vt_insert_test": {"column_vindexes": [{"column": "id", "name": "hash_index"}]}}}`, + } + err = clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false) + require.NoError(t, err) + + // Start Vtgate + err = clusterInstance.StartVtgate() + require.NoError(t, err) + return clusterInstance +} + +// GetInsertQuery returns a built insert query to insert a row. +func GetInsertQuery(idx int) string { + return fmt.Sprintf(insertSQL, idx, idx) +} + +// GetSelectionQuery returns a built selection query read the data. +func GetSelectionQuery() string { + return `select * from vt_insert_test` +} + // TeardownCluster is used to teardown the reparent cluster. When // run in a CI environment -- which is considered true when the // "CI" env variable is set to "true" -- the teardown also removes diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index cb959902c19..823ad4e5503 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -172,6 +172,21 @@ func (fhc *FakeHealthCheck) SetTabletType(tablet *topodatapb.Tablet, tabletType item.ts.Target.TabletType = tabletType } +// SetPrimaryTimestamp sets the primary timestamp for the given tablet +func (fhc *FakeHealthCheck) SetPrimaryTimestamp(tablet *topodatapb.Tablet, timestamp int64) { + if fhc.ch == nil { + return + } + fhc.mu.Lock() + defer fhc.mu.Unlock() + key := TabletToMapKey(tablet) + item, isPresent := fhc.items[key] + if !isPresent { + return + } + item.ts.PrimaryTermStartTime = timestamp +} + // Unsubscribe is not implemented. func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) { } diff --git a/go/vt/discovery/keyspace_events.go b/go/vt/discovery/keyspace_events.go index 014284ed5ee..91f9e67ce3e 100644 --- a/go/vt/discovery/keyspace_events.go +++ b/go/vt/discovery/keyspace_events.go @@ -173,8 +173,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool { } type shardState struct { - target *querypb.Target - serving bool + target *querypb.Target + serving bool + // waitForReparent is used to tell the keyspace event watcher + // that this shard should be marked serving only after a reparent + // operation has succeeded. + waitForReparent bool externallyReparented int64 currentPrimary *topodatapb.TabletAlias } @@ -355,8 +359,34 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) { // if the shard went from serving to not serving, or the other way around, the keyspace // is undergoing an availability event if sstate.serving != th.Serving { - sstate.serving = th.Serving kss.consistent = false + switch { + case th.Serving && sstate.waitForReparent: + // While waiting for a reparent, if we receive a serving primary, + // we should check if the primary term start time is greater than the externally reparented time. + // We mark the shard serving only if it is. This is required so that we don't prematurely stop + // buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the + // same old primary tablet that has already been turned read-only. + if th.PrimaryTermStartTime > sstate.externallyReparented { + sstate.waitForReparent = false + sstate.serving = true + } + case th.Serving && !sstate.waitForReparent: + sstate.serving = true + case !th.Serving: + sstate.serving = false + } + } + if !th.Serving { + // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait + // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop + // buffering when we receive a serving healthcheck from the primary that is being demoted. + // However, if we receive a non-serving check, then we know that we won't receive any more serving + // health checks until reparent finishes. Specifically, this helps us when PRS fails, but + // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote + // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop + // buffering for that case. + sstate.waitForReparent = false } // if the primary for this shard has been externally reparented, we're undergoing a failover, @@ -651,3 +681,36 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string { } return servingKeyspaces } + +// MarkShardNotServing marks the given shard not serving. +// We use this when we start buffering for a given shard. This helps +// coordinate between the sharding logic and the keyspace event watcher. +// We take in a boolean as well to tell us whether this error is because +// a reparent is ongoing. If it is, we also mark the shard to wait for a reparent. +// The return argument is whether the shard was found and marked not serving successfully or not. +func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspace string, shard string, isReparentErr bool) bool { + kss := kew.getKeyspaceStatus(ctx, keyspace) + if kss == nil { + // Only happens if the keyspace was deleted. + return false + } + kss.mu.Lock() + defer kss.mu.Unlock() + sstate := kss.shards[shard] + if sstate == nil { + // This only happens if the shard is deleted, or if + // the keyspace event watcher hasn't seen the shard at all. + return false + } + // Mark the keyspace inconsistent and the shard not serving. + kss.consistent = false + sstate.serving = false + if isReparentErr { + // If the error was triggered because a reparent operation has started. + // We mark the shard to wait for a reparent to finish before marking it serving. + // This is required to prevent premature stopping of buffering if we receive + // a serving healthcheck from a primary that is being demoted. + sstate.waitForReparent = true + } + return true +} diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go index 43af4bf49de..86b4c756119 100644 --- a/go/vt/discovery/keyspace_events_test.go +++ b/go/vt/discovery/keyspace_events_test.go @@ -19,6 +19,7 @@ package discovery import ( "context" "encoding/hex" + "sync" "testing" "time" @@ -277,6 +278,234 @@ func TestKeyspaceEventTypes(t *testing.T) { } } +func TestOnHealthCheck(t *testing.T) { + testcases := []struct { + name string + ss *shardState + th *TabletHealth + wantServing bool + wantWaitForReparent bool + wantExternallyReparented int64 + wantUID uint32 + }{ + { + name: "Non primary tablet health ignored", + ss: &shardState{ + serving: false, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_REPLICA, + }, + Serving: true, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "Serving primary seen in non-serving shard", + ss: &shardState{ + serving: false, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "New serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "Old serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: true, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "Old non-serving primary seen while waiting for reparent", + ss: &shardState{ + serving: false, + waitForReparent: true, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: false, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, { + name: "New serving primary while already serving", + ss: &shardState{ + serving: true, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: true, + PrimaryTermStartTime: 20, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 2, + }, + }, + }, + wantServing: true, + wantWaitForReparent: false, + wantExternallyReparented: 20, + wantUID: 2, + }, { + name: "Primary goes non serving", + ss: &shardState{ + serving: true, + waitForReparent: false, + externallyReparented: 10, + currentPrimary: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + th: &TabletHealth{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_PRIMARY, + }, + Serving: false, + PrimaryTermStartTime: 10, + Tablet: &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: testCell, + Uid: 1, + }, + }, + }, + wantServing: false, + wantWaitForReparent: false, + wantExternallyReparented: 10, + wantUID: 1, + }, + } + + ksName := "ks" + shard := "-80" + kss := &keyspaceState{ + mu: sync.Mutex{}, + keyspace: ksName, + shards: make(map[string]*shardState), + } + // Adding this so that we don't run any topo calls from ensureConsistentLocked. + kss.moveTablesState = &MoveTablesState{ + Typ: MoveTablesRegular, + State: MoveTablesSwitching, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + kss.shards[shard] = tt.ss + tt.th.Target.Keyspace = ksName + tt.th.Target.Shard = shard + kss.onHealthCheck(tt.th) + require.Equal(t, tt.wantServing, tt.ss.serving) + require.Equal(t, tt.wantWaitForReparent, tt.ss.waitForReparent) + require.Equal(t, tt.wantExternallyReparented, tt.ss.externallyReparented) + require.Equal(t, tt.wantUID, tt.ss.currentPrimary.Uid) + }) + } +} + type fakeTopoServer struct { } diff --git a/go/vt/vtgate/buffer/buffer.go b/go/vt/vtgate/buffer/buffer.go index 622bb03b082..0900709145f 100644 --- a/go/vt/vtgate/buffer/buffer.go +++ b/go/vt/vtgate/buffer/buffer.go @@ -94,6 +94,18 @@ func CausedByFailover(err error) bool { return isFailover } +// isErrorDueToReparenting is a stronger check than CausedByFailover, meant to return +// if the failure is caused because of a reparent. +func isErrorDueToReparenting(err error) bool { + if vterrors.Code(err) != vtrpcpb.Code_CLUSTER_EVENT { + return false + } + if strings.Contains(err.Error(), ClusterEventReshardingInProgress) { + return false + } + return true +} + // for debugging purposes func getReason(err error) string { for _, ce := range ClusterEvents { @@ -171,7 +183,7 @@ func New(cfg *Config) *Buffer { // It returns an error if buffering failed (e.g. buffer full). // If it does not return an error, it may return a RetryDoneFunc which must be // called after the request was retried. -func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // If an err is given, it must be related to a failover. // We never buffer requests with other errors. if err != nil && !CausedByFailover(err) { @@ -188,7 +200,7 @@ func (b *Buffer) WaitForFailoverEnd(ctx context.Context, keyspace, shard string, requestsSkipped.Add([]string{keyspace, shard, skippedDisabled}, 1) return nil, nil } - return sb.waitForFailoverEnd(ctx, keyspace, shard, err) + return sb.waitForFailoverEnd(ctx, keyspace, shard, kev, err) } func (b *Buffer) HandleKeyspaceEvent(ksevent *discovery.KeyspaceEvent) { diff --git a/go/vt/vtgate/buffer/buffer_helper_test.go b/go/vt/vtgate/buffer/buffer_helper_test.go index 2deb460fc39..1276f0cd751 100644 --- a/go/vt/vtgate/buffer/buffer_helper_test.go +++ b/go/vt/vtgate/buffer/buffer_helper_test.go @@ -50,7 +50,7 @@ func issueRequestAndBlockRetry(ctx context.Context, t *testing.T, b *Buffer, err bufferingStopped := make(chan error) go func() { - retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, failoverErr) + retryDone, err := b.WaitForFailoverEnd(ctx, keyspace, shard, nil, failoverErr) if err != nil { bufferingStopped <- err } diff --git a/go/vt/vtgate/buffer/buffer_test.go b/go/vt/vtgate/buffer/buffer_test.go index 7f32364d57f..b152f0dbfa9 100644 --- a/go/vt/vtgate/buffer/buffer_test.go +++ b/go/vt/vtgate/buffer/buffer_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "gotest.tools/assert" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vterrors" @@ -68,6 +70,32 @@ var ( } ) +func TestIsErrorDueToReparenting(t *testing.T) { + testcases := []struct { + err error + want bool + }{ + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReshardingInProgress), + want: false, + }, + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, ClusterEventReparentInProgress), + want: true, + }, + { + err: vterrors.Errorf(vtrpcpb.Code_CLUSTER_EVENT, "The MySQL server is running with the --super-read-only option"), + want: true, + }, + } + for _, tt := range testcases { + t.Run(tt.err.Error(), func(t *testing.T) { + got := isErrorDueToReparenting(tt.err) + assert.Equal(t, tt.want, got) + }) + } +} + func TestBuffering(t *testing.T) { testAllImplementations(t, testBuffering1) } @@ -107,7 +135,7 @@ func testBuffering1(t *testing.T, fail failover) { } // Subsequent requests with errors not related to the failover are not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -155,7 +183,7 @@ func testBuffering1(t *testing.T, fail failover) { } // Second failover: Buffering is skipped because last failover is too recent. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("subsequent failovers must be skipped due to -buffer_min_time_between_failovers setting. err: %v retryDone: %v", err, retryDone) } if got, want := requestsSkipped.Counts()[statsKeyJoinedLastFailoverTooRecent], int64(1); got != want { @@ -213,7 +241,7 @@ func testDryRun1(t *testing.T, fail failover) { b := New(cfg) // Request does not get buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests must not be buffered during dry-run. err: %v retryDone: %v", err, retryDone) } // But the internal state changes though. @@ -259,10 +287,10 @@ func testPassthrough1(t *testing.T, fail failover) { b := New(cfg) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must never be buffered. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nonFailoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nonFailoverErr); err != nil || retryDone != nil { t.Fatalf("requests with non-failover errors must never be buffered. err: %v retryDone: %v", err, retryDone) } @@ -298,7 +326,7 @@ func testLastReparentTooRecentBufferingSkipped1(t *testing.T, fail failover) { now = now.Add(1 * time.Second) fail(b, newPrimary, keyspace, shard, now) - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests where the failover end was recently detected before the start must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -395,10 +423,10 @@ func testPassthroughDuringDrain1(t *testing.T, fail failover) { } // Requests during the drain will be passed through and not buffered. - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, nil); err != nil || retryDone != nil { t.Fatalf("requests with no error must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests with failover errors must not be buffered during a drain. err: %v retryDone: %v", err, retryDone) } @@ -430,7 +458,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { b := New(cfg) ignoredKeyspace := "ignored_ks" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), ignoredKeyspace, shard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored keyspaces must not be buffered. err: %v retryDone: %v", err, retryDone) } statsKeyJoined := strings.Join([]string{ignoredKeyspace, shard, skippedDisabled}, ".") @@ -439,7 +467,7 @@ func testPassthroughIgnoredKeyspaceOrShard1(t *testing.T, fail failover) { } ignoredShard := "ff-" - if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, failoverErr); err != nil || retryDone != nil { + if retryDone, err := b.WaitForFailoverEnd(context.Background(), keyspace, ignoredShard, nil, failoverErr); err != nil || retryDone != nil { t.Fatalf("requests for ignored shards must not be buffered. err: %v retryDone: %v", err, retryDone) } if err := waitForPoolSlots(b, cfg.Size); err != nil { @@ -621,7 +649,7 @@ func testEvictionNotPossible1(t *testing.T, fail failover) { // Newer requests of the second failover cannot evict anything because // they have no entries buffered. - retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, failoverErr) + retryDone, bufferErr := b.WaitForFailoverEnd(context.Background(), keyspace, shard2, nil, failoverErr) if bufferErr == nil || retryDone != nil { t.Fatalf("buffer should have returned an error because it's full: err: %v retryDone: %v", bufferErr, retryDone) } diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index ae33aabb399..ef3ae5afcf9 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -137,7 +137,7 @@ func (sb *shardBuffer) disabled() bool { return sb.mode == bufferModeDisabled } -func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, err error) (RetryDoneFunc, error) { +func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard string, kev *discovery.KeyspaceEventWatcher, err error) (RetryDoneFunc, error) { // We assume if err != nil then it's always caused by a failover. // Other errors must be filtered at higher layers. failoverDetected := err != nil @@ -211,7 +211,11 @@ func (sb *shardBuffer) waitForFailoverEnd(ctx context.Context, keyspace, shard s return nil, nil } - sb.startBufferingLocked(err) + // Try to start buffering. If we're unsuccessful, then we exit early. + if !sb.startBufferingLocked(ctx, kev, err) { + sb.mu.Unlock() + return nil, nil + } } if sb.mode == bufferModeDryRun { @@ -255,7 +259,16 @@ func (sb *shardBuffer) shouldBufferLocked(failoverDetected bool) bool { panic("BUG: All possible states must be covered by the switch expression above.") } -func (sb *shardBuffer) startBufferingLocked(err error) { +func (sb *shardBuffer) startBufferingLocked(ctx context.Context, kev *discovery.KeyspaceEventWatcher, err error) bool { + if kev != nil { + if !kev.MarkShardNotServing(ctx, sb.keyspace, sb.shard, isErrorDueToReparenting(err)) { + // We failed to mark the shard as not serving. Do not buffer the request. + // This can happen if the keyspace has been deleted or if the keyspace even watcher + // hasn't yet seen the shard. Keyspace event watcher might not stop buffering for this + // request at all until it times out. It's better to not buffer this request. + return false + } + } // Reset monitoring data from previous failover. lastRequestsInFlightMax.Set(sb.statsKey, 0) lastRequestsDryRunMax.Set(sb.statsKey, 0) @@ -281,6 +294,7 @@ func (sb *shardBuffer) startBufferingLocked(err error) { sb.buf.config.MaxFailoverDuration, errorsanitizer.NormalizeError(err.Error()), ) + return true } // logErrorIfStateNotLocked logs an error if the current state is not "state". diff --git a/go/vt/vtgate/buffer/variables_test.go b/go/vt/vtgate/buffer/variables_test.go index a0640bde9e4..30d2426c639 100644 --- a/go/vt/vtgate/buffer/variables_test.go +++ b/go/vt/vtgate/buffer/variables_test.go @@ -51,7 +51,7 @@ func TestVariablesAreInitialized(t *testing.T) { // Create a new buffer and make a call which will create the shardBuffer object. // After that, the variables should be initialized for that shard. b := New(NewDefaultConfig()) - _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil /* err */) + _, err := b.WaitForFailoverEnd(context.Background(), "init_test", "0", nil, nil) if err != nil { t.Fatalf("buffer should just passthrough and not return an error: %v", err) } diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index de63da87907..df6db786471 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -260,7 +260,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // b) no transaction was created yet. if gw.buffer != nil && !bufferedOnce && !inTransaction && target.TabletType == topodatapb.TabletType_PRIMARY { // The next call blocks if we should buffer during a failover. - retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, err) + retryDone, bufferErr := gw.buffer.WaitForFailoverEnd(ctx, target.Keyspace, target.Shard, gw.kev, err) // Request may have been buffered. if retryDone != nil { diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go index f625b5599cd..151fb557591 100644 --- a/go/vt/vtgate/tabletgateway_flaky_test.go +++ b/go/vt/vtgate/tabletgateway_flaky_test.go @@ -219,6 +219,7 @@ func TestGatewayBufferingWhileReparenting(t *testing.T) { hc.SetTabletType(primaryTablet, topodatapb.TabletType_REPLICA) hc.Broadcast(primaryTablet) hc.SetTabletType(replicaTablet, topodatapb.TabletType_PRIMARY) + hc.SetPrimaryTimestamp(replicaTablet, 100) // We set a higher timestamp than before to simulate a PRS. hc.SetServing(replicaTablet, true) hc.Broadcast(replicaTablet) @@ -230,7 +231,7 @@ outer: require.Fail(t, "timed out - could not verify the new primary") case <-time.After(10 * time.Millisecond): newPrimary, notServing := tg.kev.PrimaryIsNotServing(ctx, target) - if newPrimary != nil && newPrimary.Uid == 1 && !notServing { + if newPrimary != nil && newPrimary.Uid == replicaTablet.Alias.Uid && !notServing { break outer } }