diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md
index bb376c6e721..cd3b9718503 100644
--- a/changelog/20.0/20.0.0/summary.md
+++ b/changelog/20.0/20.0.0/summary.md
@@ -3,6 +3,8 @@
### Table of Contents
- **[Major Changes](#major-changes)**
+ - **[Breaking Changes](#breaking-changes)**
+ - [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
- **[Query Compatibility](#query-compatibility)**
- [Vindex Hints](#vindex-hints)
- [Update with Limit Support](#update-limit)
@@ -16,6 +18,15 @@
## Major Changes
+### Breaking Changes
+
+#### `shutdown_grace_period` Default Change
+
+The `--shutdown_grace_period` flag, which was introduced in v2 with a default of `0 seconds`, has now been changed to default to `3 seconds`.
+This makes reparenting in Vitess resilient to client errors, and prevents PlannedReparentShard from timing out.
+
+To preserve the old behaviour, users can set the flag back to `0 seconds`. Open transactions will then never be shut down, but in that case users run the risk of PlannedReparentShard calls timing out.
+
### Query Compatibility
#### Vindex Hints
@@ -61,7 +72,6 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h
This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out.
To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components.
-
## Minor Changes
### New Stats
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 04a46d9e754..26e145e6930 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
- --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
+ --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index 6ff475badfa..ea023318b01 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
- --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
+ --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
diff --git a/go/test/endtoend/tabletgateway/main_test.go b/go/test/endtoend/tabletgateway/main_test.go
index da4fe711f64..354be6969d3 100644
--- a/go/test/endtoend/tabletgateway/main_test.go
+++ b/go/test/endtoend/tabletgateway/main_test.go
@@ -18,6 +18,7 @@ package healthcheck
import (
"flag"
+ "fmt"
"os"
"testing"
@@ -26,11 +27,12 @@ import (
)
var (
- clusterInstance *cluster.LocalProcessCluster
- vtParams mysql.ConnParams
- keyspaceName = "commerce"
- cell = "zone1"
- sqlSchema = `create table product(
+ clusterInstance *cluster.LocalProcessCluster
+ vtParams mysql.ConnParams
+ keyspaceName = "commerce"
+ vtgateGrpcAddress string
+ cell = "zone1"
+ sqlSchema = `create table product(
sku varbinary(128),
description varbinary(128),
price bigint,
@@ -64,7 +66,7 @@ func TestMain(m *testing.M) {
exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, "localhost")
- clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s"}
+ clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s", "--shutdown_grace_period", "3s"}
defer clusterInstance.Teardown()
// Start topo server
@@ -96,6 +98,7 @@ func TestMain(m *testing.M) {
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
+ vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)
return m.Run()
}()
os.Exit(exitCode)
diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go
index c48aa6c2131..d9cedc04b69 100644
--- a/go/test/endtoend/tabletgateway/vtgate_test.go
+++ b/go/test/endtoend/tabletgateway/vtgate_test.go
@@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
+ querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)
@@ -283,6 +284,44 @@ func TestReplicaTransactions(t *testing.T) {
assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr4.Rows), "we are not able to reconnect after restart")
}
+// TestStreamingRPCStuck tests that StreamExecute calls don't get stuck on the vttablets if a client stops reading from a stream.
+func TestStreamingRPCStuck(t *testing.T) {
+ defer cluster.PanicHandler(t)
+ ctx := context.Background()
+ vtConn, err := mysql.Connect(ctx, &vtParams)
+ require.NoError(t, err)
+ defer vtConn.Close()
+
+ // We want the table to have enough rows such that a streaming call returns multiple packets.
+ // Therefore, we insert one row and then double the row count 15 times.
+ utils.Exec(t, vtConn, "insert into customer(email) values('testemail')")
+ for i := 0; i < 15; i++ {
+ // Double the number of rows in the customer table.
+ utils.Exec(t, vtConn, "insert into customer (email) select email from customer")
+ }
+
+ // Connect to vtgate and run a streaming query.
+ vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "test_user", "")
+ require.NoError(t, err)
+ stream, err := vtgateConn.Session("", &querypb.ExecuteOptions{}).StreamExecute(ctx, "select * from customer", map[string]*querypb.BindVariable{})
+ require.NoError(t, err)
+
+ // We read packets until we see the first set of results. This ensures that the stream is working.
+ for {
+ res, err := stream.Recv()
+ require.NoError(t, err)
+ if res != nil && len(res.Rows) > 0 {
+ // Breaking here stops reading from the stream while it is still open.
+ break
+ }
+ }
+
+ // We simulate a misbehaving client that doesn't read from the stream anymore.
+ // This, however, shouldn't block PlannedReparentShard calls.
+ err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, "0", clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].Alias)
+ require.NoError(t, err)
+}
+
func getMapFromJSON(JSON map[string]any, key string) map[string]any {
result := make(map[string]any)
object := reflect.ValueOf(JSON[key])
diff --git a/go/test/endtoend/vtgate/transaction/restart/main_test.go b/go/test/endtoend/vtgate/transaction/restart/main_test.go
index 3c7ac710e9d..de52a3e8870 100644
--- a/go/test/endtoend/vtgate/transaction/restart/main_test.go
+++ b/go/test/endtoend/vtgate/transaction/restart/main_test.go
@@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
+ clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
+ "--shutdown_grace_period=0s",
+ )
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
if err != nil {
return 1
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 5690c209ebb..f371d62006c 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -1117,7 +1117,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
- // weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
+ // weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go.
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
diff --git a/go/vt/vttablet/tabletserver/requests_waiter.go b/go/vt/vttablet/tabletserver/requests_waiter.go
new file mode 100644
index 00000000000..39e08f924cc
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/requests_waiter.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tabletserver
+
+import "sync"
+
+// requestsWaiter is used to wait for requests. It stores the count of the pending requests
+// and the number of waiters currently waiting. It also has a mutex to protect its fields.
+type requestsWaiter struct {
+ mu sync.Mutex
+ wg sync.WaitGroup
+ // waitCounter is the number of goroutines that are waiting for wg to be empty.
+ // While it is greater than zero, we must not Add to the requests, to avoid any panics in the wait.
+ // NOTE(review): GetWaiterCount and Add lock mu separately, so a check-then-Add is not atomic here — confirm callers serialize it.
+ waitCounter int
+ // counter is the count of the number of outstanding requests.
+ counter int
+}
+
+// newRequestsWaiter returns a new requestsWaiter with no outstanding requests and no waiters.
+func newRequestsWaiter() *requestsWaiter {
+ return &requestsWaiter{}
+}
+
+// Add adds val to the outstanding requests counter and to the wait group while holding the mutex. Done passes -1.
+func (r *requestsWaiter) Add(val int) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.counter += val
+ r.wg.Add(val)
+}
+
+// Done marks one request as finished by subtracting 1 from the requestsWaiter via Add(-1).
+func (r *requestsWaiter) Done() {
+ r.Add(-1)
+}
+
+// addToWaitCounter adds val (which may be negative) to the waitCounter while holding the mutex.
+func (r *requestsWaiter) addToWaitCounter(val int) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.waitCounter += val
+}
+
+// WaitToBeEmpty blocks until the wait group is empty. It increments the waitCounter before waiting and decrements it afterwards.
+func (r *requestsWaiter) WaitToBeEmpty() {
+ r.addToWaitCounter(1)
+ r.wg.Wait()
+ r.addToWaitCounter(-1)
+}
+
+// GetWaiterCount gets the number of goroutines currently waiting on the wait group in WaitToBeEmpty.
+func (r *requestsWaiter) GetWaiterCount() int {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return r.waitCounter
+}
+
+// GetOutstandingRequestsCount gets the number of outstanding requests, i.e. the running sum of the values passed to Add.
+func (r *requestsWaiter) GetOutstandingRequestsCount() int {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return r.counter
+}
diff --git a/go/vt/vttablet/tabletserver/requests_waiter_test.go b/go/vt/vttablet/tabletserver/requests_waiter_test.go
new file mode 100644
index 00000000000..078e32e92ca
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/requests_waiter_test.go
@@ -0,0 +1,57 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tabletserver
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+)
+
+// TestRequestWaiter verifies the waiter and outstanding-request counts of a requestsWaiter across Add, Done and WaitToBeEmpty calls.
+func TestRequestWaiter(t *testing.T) {
+ rw := newRequestsWaiter()
+ require.EqualValues(t, 0, rw.GetWaiterCount())
+ require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())
+
+ rw.Add(3)
+ require.EqualValues(t, 0, rw.GetWaiterCount())
+ require.EqualValues(t, 3, rw.GetOutstandingRequestsCount())
+
+ rw.Done()
+ require.EqualValues(t, 0, rw.GetWaiterCount())
+ require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())
+
+ go func() {
+ rw.WaitToBeEmpty()
+ }()
+ go func() {
+ rw.WaitToBeEmpty()
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+ require.EqualValues(t, 2, rw.GetWaiterCount())
+ require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())
+
+ rw.Done()
+ rw.Done()
+
+ time.Sleep(100 * time.Millisecond)
+ require.EqualValues(t, 0, rw.GetWaiterCount())
+ require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())
+}
diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go
index 9c01610f770..60b1f1281d0 100644
--- a/go/vt/vttablet/tabletserver/state_manager.go
+++ b/go/vt/vttablet/tabletserver/state_manager.go
@@ -99,12 +99,8 @@ type stateManager struct {
alsoAllow []topodatapb.TabletType
reason string
transitionErr error
- // requestsWaitCounter is the number of goroutines that are waiting for requests to be empty.
- // If this value is greater than zero, then we have to ensure that we don't Add to the requests
- // to avoid any panics in the wait.
- requestsWaitCounter int
- requests sync.WaitGroup
+ rw *requestsWaiter
// QueryList does not have an Open or Close.
statelessql *QueryList
@@ -358,20 +354,6 @@ func (sm *stateManager) checkMySQL() {
}()
}
-// addRequestsWaitCounter adds to the requestsWaitCounter while being protected by a mutex.
-func (sm *stateManager) addRequestsWaitCounter(val int) {
- sm.mu.Lock()
- defer sm.mu.Unlock()
- sm.requestsWaitCounter += val
-}
-
-// waitForRequestsToBeEmpty waits for requests to be empty. It also increments and decrements the requestsWaitCounter as required.
-func (sm *stateManager) waitForRequestsToBeEmpty() {
- sm.addRequestsWaitCounter(1)
- sm.requests.Wait()
- sm.addRequestsWaitCounter(-1)
-}
-
func (sm *stateManager) setWantState(stateWanted servingState) {
sm.mu.Lock()
defer sm.mu.Unlock()
@@ -410,9 +392,9 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
}
shuttingDown := sm.wantState != StateServing
- // If requestsWaitCounter is not zero, then there are go-routines blocked on waiting for requests to be empty.
+ // If the wait counter for the requests is not zero, then there are goroutines blocked waiting for requests to be empty.
// We cannot allow adding to the requests to prevent any panics from happening.
- if (shuttingDown && !allowOnShutdown) || sm.requestsWaitCounter > 0 {
+ if (shuttingDown && !allowOnShutdown) || sm.rw.GetWaiterCount() > 0 {
// This specific error string needs to be returned for vtgate buffering to work.
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown)
}
@@ -421,13 +403,13 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
if err != nil {
return err
}
- sm.requests.Add(1)
+ sm.rw.Add(1)
return nil
}
// EndRequest unregisters the current request (a waitgroup) as done.
func (sm *stateManager) EndRequest() {
- sm.requests.Done()
+ sm.rw.Done()
}
// VerifyTarget allows requests to be executed even in non-serving state.
@@ -507,7 +489,7 @@ func (sm *stateManager) unservePrimary() error {
func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) error {
// We are likely transitioning from primary. We have to honor
// the shutdown grace period.
- cancel := sm.handleShutdownGracePeriod()
+ cancel := sm.terminateAllQueries(nil)
defer cancel()
sm.ddle.Close()
@@ -560,9 +542,12 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error {
}
func (sm *stateManager) unserveCommon() {
+ // We create a wait group that tracks whether all the queries have been terminated or not.
+ wg := sync.WaitGroup{}
+ wg.Add(1)
log.Infof("Started execution of unserveCommon")
- cancel := sm.handleShutdownGracePeriod()
- log.Infof("Finished execution of handleShutdownGracePeriod")
+ cancel := sm.terminateAllQueries(&wg)
+ log.Infof("Finished execution of terminateAllQueries")
defer cancel()
log.Infof("Started online ddl executor close")
@@ -580,22 +565,45 @@ func (sm *stateManager) unserveCommon() {
log.Info("Finished Killing all OLAP queries. Started tracker close")
sm.tracker.Close()
log.Infof("Finished tracker close. Started wait for requests")
- sm.waitForRequestsToBeEmpty()
- log.Infof("Finished wait for requests. Finished execution of unserveCommon")
+ sm.handleShutdownGracePeriod(&wg)
+ log.Infof("Finished handling grace period. Finished execution of unserveCommon")
+}
+
+// handleShutdownGracePeriod checks if we have a shutdownGracePeriod specified.
+// If it's not, then we have to wait for all the requests to be empty.
+// Otherwise, we only wait on wg, i.e. for all the queries against MySQL to be terminated.
+func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) {
+ // If there is no shutdown grace period specified, then we should wait for all the requests to be empty.
+ if sm.shutdownGracePeriod == 0 {
+ sm.rw.WaitToBeEmpty()
+ } else {
+ // We quickly check if the requests are empty or not.
+ // If they are, then we don't need to wait for the shutdown to complete.
+ count := sm.rw.GetOutstandingRequestsCount()
+ if count == 0 {
+ return
+ }
+ // Otherwise, we wait on wg, which the caller uses to track whether all the queries have been terminated.
+ // We don't need to wait for requests to be empty since we have ensured all the queries against MySQL have been killed.
+ wg.Wait()
+ }
+}
-func (sm *stateManager) handleShutdownGracePeriod() (cancel func()) {
+func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) {
if sm.shutdownGracePeriod == 0 {
return func() {}
}
ctx, cancel := context.WithCancel(context.TODO())
go func() {
+ if wg != nil {
+ defer wg.Done()
+ }
if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil {
return
}
log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod)
sm.statelessql.TerminateAll()
- log.Infof("Killed all stateful OLTP queries.")
+ log.Infof("Killed all stateless OLTP queries.")
sm.statefulql.TerminateAll()
log.Infof("Killed all OLTP queries.")
}()
@@ -645,7 +653,7 @@ func (sm *stateManager) setState(tabletType topodatapb.TabletType, state serving
log.Infof("TabletServer transition: %v -> %v for tablet %s:%s/%s",
sm.stateStringLocked(sm.target.TabletType, sm.state), sm.stateStringLocked(tabletType, state),
sm.target.Cell, sm.target.Keyspace, sm.target.Shard)
- sm.handleGracePeriod(tabletType)
+ sm.handleTransitionGracePeriod(tabletType)
sm.target.TabletType = tabletType
if sm.state == StateNotConnected {
// If we're transitioning out of StateNotConnected, we have
@@ -664,7 +672,7 @@ func (sm *stateManager) stateStringLocked(tabletType topodatapb.TabletType, stat
return fmt.Sprintf("%v: %v, %v", tabletType, state, sm.ptsTimestamp.Local().Format("Jan 2, 2006 at 15:04:05 (MST)"))
}
-func (sm *stateManager) handleGracePeriod(tabletType topodatapb.TabletType) {
+func (sm *stateManager) handleTransitionGracePeriod(tabletType topodatapb.TabletType) {
if tabletType != topodatapb.TabletType_PRIMARY {
// We allow serving of previous type only for a primary transition.
sm.alsoAllow = nil
diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go
index 59909888935..a0ef3557074 100644
--- a/go/vt/vttablet/tabletserver/state_manager_test.go
+++ b/go/vt/vttablet/tabletserver/state_manager_test.go
@@ -721,7 +721,7 @@ func TestPanicInWait(t *testing.T) {
// Simulate going to a not serving state and calling unserveCommon that waits on requests.
sm.wantState = StateNotServing
- sm.waitForRequestsToBeEmpty()
+ sm.rw.WaitToBeEmpty()
}
func verifySubcomponent(t *testing.T, order int64, component any, state testState) {
@@ -752,6 +752,7 @@ func newTestStateManager(t *testing.T) *stateManager {
ddle: &testOnlineDDLExecutor{},
throttler: &testLagThrottler{},
tableGC: &testTableGC{},
+ rw: newRequestsWaiter(),
}
sm.Init(env, &querypb.Target{})
sm.hs.InitDBConfig(&querypb.Target{}, dbconfigs.New(fakesqldb.New(t).ConnParams()))
diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go
index 233f8951227..72682a75e30 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/config.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/config.go
@@ -975,6 +975,9 @@ var defaultConfig = TabletConfig{
Mode: Disable,
HeartbeatInterval: 250 * time.Millisecond,
},
+ GracePeriods: GracePeriodsConfig{
+ Shutdown: 3 * time.Second,
+ },
HotRowProtection: HotRowProtectionConfig{
Mode: Disable,
// Default value is the same as TxPool.Size.
diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go
index 98d4cfceb21..ace094ac899 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go
@@ -123,7 +123,8 @@ func TestDefaultConfig(t *testing.T) {
want := `consolidator: enable
consolidatorStreamQuerySize: 2097152
consolidatorStreamTotalSize: 134217728
-gracePeriods: {}
+gracePeriods:
+ shutdownSeconds: 3s
healthcheck:
degradedThresholdSeconds: 30s
intervalSeconds: 20s
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index dad637a6daf..8a1a45ca4a2 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -204,6 +204,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
ddle: tsv.onlineDDLExecutor,
throttler: tsv.lagThrottler,
tableGC: tsv.tableGC,
+ rw: newRequestsWaiter(),
}
tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) })
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index 9744b971946..df68c8b0a83 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -153,6 +153,10 @@ func TestTabletServerPrimaryToReplica(t *testing.T) {
defer cancel()
// Reuse code from tx_executor_test.
_, tsv, db := newTestTxExecutor(t, ctx)
+ // This is required because the test is verifying that we rollback transactions on changing serving type,
+ // but that only happens immediately if the shut down grace period is not specified.
+ tsv.te.shutdownGracePeriod = 0
+ tsv.sm.shutdownGracePeriod = 0
defer tsv.StopService()
defer db.Close()
target := querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
@@ -180,7 +184,7 @@ func TestTabletServerPrimaryToReplica(t *testing.T) {
select {
case <-ch:
t.Fatal("ch should not fire")
- case <-time.After(10 * time.Millisecond):
+ case <-time.After(100 * time.Millisecond):
}
require.EqualValues(t, 1, tsv.te.txPool.scp.active.Size(), "tsv.te.txPool.scp.active.Size()")