Skip to content

Commit

Permalink
[release-18.0] Throttler: Use tmclient pool for CheckThrottler tablet…
Browse files Browse the repository at this point in the history
…manager RPC (#15087)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
shlomi-noach and mattlord authored Jan 30, 2024
1 parent f4d1487 commit 4896a26
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ Flags:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Flags:
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Flags:
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Flags:
--tablet_hostname string The hostname to use for the tablet otherwise it will be derived from OS' hostname (default "localhost")
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App}) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
45 changes: 31 additions & 14 deletions go/vt/vttablet/grpctmclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (
)

func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,AllPrivs,App})")
fs.IntVar(&concurrency, "tablet_manager_grpc_concurrency", concurrency, "concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler)")
fs.StringVar(&cert, "tablet_manager_grpc_cert", cert, "the cert to use to connect")
fs.StringVar(&key, "tablet_manager_grpc_key", key, "the key to use to connect")
fs.StringVar(&ca, "tablet_manager_grpc_ca", ca, "the server ca to use to validate servers when connecting")
Expand Down Expand Up @@ -94,10 +94,9 @@ type tmc struct {

// grpcClient implements both dialer and poolDialer.
type grpcClient struct {
// This cache of connections is to maximize QPS for ExecuteFetch.
// Note we'll keep the clients open and close them upon Close() only.
// But that's OK because usually the tasks that use them are
// one-purpose only.
// This cache of connections is to maximize QPS for ExecuteFetchAs{Dba,App} and
// CheckThrottler. Note we'll keep the clients open and close them upon Close() only.
// But that's OK because usually the tasks that use them are one-purpose only.
// The map is protected by the mutex.
mu sync.Mutex
rpcClientMap map[string]chan *tmc
Expand All @@ -115,16 +114,17 @@ type poolDialer interface {
// Client implements tmclient.TabletManagerClient.
//
// Connections are produced by the dialer implementation, which is either the
// grpcClient implementation, which reuses connections only for ExecuteFetch and
// otherwise makes single-purpose connections that are closed after use.
// grpcClient implementation, which reuses connections only for ExecuteFetchAs{Dba,App}
// and CheckThrottler, otherwise making single-purpose connections that are closed
// after use.
//
// In order to more efficiently use the underlying tcp connections, you can
// instead use the cachedConnDialer implementation by specifying
//
// -tablet_manager_protocol "grpc-cached"
// --tablet_manager_protocol "grpc-cached"
//
// The cachedConnDialer keeps connections to up to -tablet_manager_grpc_connpool_size distinct
// tablets open at any given time, for faster per-RPC call time, and less
// The cachedConnDialer keeps connections to up to --tablet_manager_grpc_connpool_size
// distinct tablets open at any given time, for faster per-RPC call time, and less
// connection churn.
type Client struct {
dialer dialer
Expand Down Expand Up @@ -1002,12 +1002,29 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, req
}

// CheckThrottler is part of the tmclient.TabletManagerClient interface.
// It always tries to use a cached client via the dialer pool as this is
// called very frequently between tablets when the throttler is enabled in
// a keyspace and the overhead of creating a new gRPC connection/channel
// and dialing the other tablet every time is not practical.
func (client *Client) CheckThrottler(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CheckThrottlerRequest) (*tabletmanagerdatapb.CheckThrottlerResponse, error) {
c, closer, err := client.dialer.dial(ctx, tablet)
if err != nil {
return nil, err
var c tabletmanagerservicepb.TabletManagerClient
var err error
if poolDialer, ok := client.dialer.(poolDialer); ok {
c, err = poolDialer.dialPool(ctx, tablet)
if err != nil {
return nil, err
}
}
defer closer.Close()

if c == nil {
var closer io.Closer
c, closer, err = client.dialer.dial(ctx, tablet)
if err != nil {
return nil, err
}
defer closer.Close()
}

response, err := c.CheckThrottler(ctx, req)
if err != nil {
return nil, err
Expand Down

0 comments on commit 4896a26

Please sign in to comment.