Skip to content

Commit

Permalink
Fix PRS from being blocked because of misbehaving clients (#15339)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Feb 28, 2024
1 parent 236f84c commit 059e50d
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 45 deletions.
12 changes: 11 additions & 1 deletion changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
### Table of Contents

- **[Major Changes](#major-changes)**
- **[Breaking changes](#breaking-changes)**
- [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
- **[Query Compatibility](#query-compatibility)**
- [Vindex Hints](#vindex-hints)
- [Update with Limit Support](#update-limit)
Expand All @@ -16,6 +18,15 @@

## <a id="major-changes"/>Major Changes

### <a id="breaking-changes"/>Breaking Changes

#### <a id="shutdown-grace-period-default"/>`shutdown_grace_period` Default Change

The `--shutdown_grace_period` flag, which was introduced in v2 with a default of `0 seconds`, has now been changed to default to `3 seconds`.
This makes reparenting in Vitess resilient to client errors, and prevents PlannedReparentShard from timing out.

In order to preserve the old behaviour, the users can set the flag back to `0 seconds` causing open transactions to never be shutdown, but in that case, they run the risk of PlannedReparentShard calls timing out.

### <a id="query-compatibility"/>Query Compatibility

#### <a id="vindex-hints"/> Vindex Hints
Expand Down Expand Up @@ -61,7 +72,6 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h
This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out.
To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components.


## <a id="minor-changes"/>Minor Changes

### <a id="new-stats"/>New Stats
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
Expand Down
15 changes: 9 additions & 6 deletions go/test/endtoend/tabletgateway/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package healthcheck

import (
"flag"
"fmt"
"os"
"testing"

Expand All @@ -26,11 +27,12 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "commerce"
cell = "zone1"
sqlSchema = `create table product(
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "commerce"
vtgateGrpcAddress string
cell = "zone1"
sqlSchema = `create table product(
sku varbinary(128),
description varbinary(128),
price bigint,
Expand Down Expand Up @@ -64,7 +66,7 @@ func TestMain(m *testing.M) {

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, "localhost")
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s"}
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s", "--shutdown_grace_period", "3s"}
defer clusterInstance.Teardown()

// Start topo server
Expand Down Expand Up @@ -96,6 +98,7 @@ func TestMain(m *testing.M) {
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)
return m.Run()
}()
os.Exit(exitCode)
Expand Down
39 changes: 39 additions & 0 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -283,6 +284,44 @@ func TestReplicaTransactions(t *testing.T) {
assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr4.Rows), "we are not able to reconnect after restart")
}

// TestStreamingRPCStuck tests that StreamExecute calls don't get stuck on the vttablets if a client stop reading from a stream.
func TestStreamingRPCStuck(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer vtConn.Close()

// We want the table to have enough rows such that a streaming call returns multiple packets.
// Therefore, we insert one row and keep doubling it.
utils.Exec(t, vtConn, "insert into customer(email) values('testemail')")
for i := 0; i < 15; i++ {
// Double the number of rows in customer table.
utils.Exec(t, vtConn, "insert into customer (email) select email from customer")
}

// Connect to vtgate and run a streaming query.
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "test_user", "")
require.NoError(t, err)
stream, err := vtgateConn.Session("", &querypb.ExecuteOptions{}).StreamExecute(ctx, "select * from customer", map[string]*querypb.BindVariable{})
require.NoError(t, err)

// We read packets until we see the first set of results. This ensures that the stream is working.
for {
res, err := stream.Recv()
require.NoError(t, err)
if res != nil && len(res.Rows) > 0 {
// breaking here stops reading from the stream.
break
}
}

// We simulate a misbehaving client that doesn't read from the stream anymore.
// This however shouldn't block PlannedReparentShard calls.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, "0", clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].Alias)
require.NoError(t, err)
}

func getMapFromJSON(JSON map[string]any, key string) map[string]any {
result := make(map[string]any)
object := reflect.ValueOf(JSON[key])
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/transaction/restart/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--shutdown_grace_period=0s",
)
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
if err != nil {
return 1
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction

// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
// weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go.
// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
// once their grace period is over.
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
Expand Down
78 changes: 78 additions & 0 deletions go/vt/vttablet/tabletserver/requests_waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import "sync"

// requestsWaiter is used to wait for requests. It stores the count of the requests pending,
// and also the number of waiters currently waiting. It has a mutex as well to protects its fields.
type requestsWaiter struct {
mu sync.Mutex
wg sync.WaitGroup
// waitCounter is the number of goroutines that are waiting for wg to be empty.
// If this value is greater than zero, then we have to ensure that we don't Add to the requests
// to avoid any panics in the wait.
waitCounter int
// counter is the count of the number of outstanding requests.
counter int
}

// newRequestsWaiter creates a new requestsWaiter.
func newRequestsWaiter() *requestsWaiter {
return &requestsWaiter{}
}

// Add adds to the requestsWaiter.
func (r *requestsWaiter) Add(val int) {
r.mu.Lock()
defer r.mu.Unlock()
r.counter += val
r.wg.Add(val)
}

// Done subtracts 1 from the requestsWaiter.
func (r *requestsWaiter) Done() {
r.Add(-1)
}

// addToWaitCounter adds to the waitCounter while being protected by a mutex.
func (r *requestsWaiter) addToWaitCounter(val int) {
r.mu.Lock()
defer r.mu.Unlock()
r.waitCounter += val
}

// WaitToBeEmpty waits for requests to be empty. It also increments and decrements the waitCounter as required.
func (r *requestsWaiter) WaitToBeEmpty() {
r.addToWaitCounter(1)
r.wg.Wait()
r.addToWaitCounter(-1)
}

// GetWaiterCount gets the number of go routines currently waiting on the wait group.
func (r *requestsWaiter) GetWaiterCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.waitCounter
}

// GetOutstandingRequestsCount gets the number of requests outstanding.
func (r *requestsWaiter) GetOutstandingRequestsCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.counter
}
57 changes: 57 additions & 0 deletions go/vt/vttablet/tabletserver/requests_waiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tabletserver

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestRequestWaiter tests the functionality of request waiter.
func TestRequestWaiter(t *testing.T) {
rw := newRequestsWaiter()
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())

rw.Add(3)
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 3, rw.GetOutstandingRequestsCount())

rw.Done()
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())

go func() {
rw.WaitToBeEmpty()
}()
go func() {
rw.WaitToBeEmpty()
}()

time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 2, rw.GetWaiterCount())
require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())

rw.Done()
rw.Done()

time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 0, rw.GetWaiterCount())
require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())
}
Loading

0 comments on commit 059e50d

Please sign in to comment.