From 1f1db62e662d86c831ff154b394a9f086550c281 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Fri, 18 Jun 2021 22:46:14 -0400
Subject: [PATCH 01/13] [grpctmclient] Rewrite to move dialing logic to an
 interface

Signed-off-by: Andrew Mason
---
 go/vt/vttablet/grpctmclient/client.go | 283 +++++++++++++++-----------
 1 file changed, 159 insertions(+), 124 deletions(-)

diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go
index ee7d7e7c004..7ca004f6a21 100644
--- a/go/vt/vttablet/grpctmclient/client.go
+++ b/go/vt/vttablet/grpctmclient/client.go
@@ -19,6 +19,7 @@ package grpctmclient
 import (
 	"flag"
 	"fmt"
+	"io"
 	"sync"
 	"time"
 
@@ -61,8 +62,8 @@ type tmc struct {
 	client tabletmanagerservicepb.TabletManagerClient
 }
 
-// Client implements tmclient.TabletManagerClient
-type Client struct {
+// grpcClient implements both dialer and poolDialer.
+type grpcClient struct {
 	// This cache of connections is to maximize QPS for ExecuteFetch.
 	// Note we'll keep the clients open and close them upon Close() only.
 	// But that's OK because usually the tasks that use them are
@@ -72,13 +73,33 @@ type Client struct {
 	rpcClientMap map[string]chan *tmc
 }
 
+type dialer interface {
+	dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error)
+	Close()
+}
+
+type poolDialer interface {
+	dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error)
+}
+
+// Client implements tmclient.TabletManagerClient.
+//
+// Connections are produced by the dialer implementation. The default dialer is
+// grpcClient, which reuses connections only for ExecuteFetch and otherwise
+// makes single-purpose connections that are closed after use.
+type Client struct {
+	dialer dialer
+}
+
 // NewClient returns a new gRPC client.
 func NewClient() *Client {
-	return &Client{}
+	return &Client{
+		dialer: &grpcClient{},
+	}
}
 
 // dial returns a client to use
-func (client *Client) dial(tablet *topodatapb.Tablet) (*grpc.ClientConn, tabletmanagerservicepb.TabletManagerClient, error) {
+func (client *grpcClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
 	addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
 	opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name)
 	if err != nil {
@@ -88,10 +109,11 @@ func (client *Client) dial(tablet *topodatapb.Tablet) (*grpc.ClientConn, tabletm
 	if err != nil {
 		return nil, nil, err
 	}
-	return cc, tabletmanagerservicepb.NewTabletManagerClient(cc), nil
+
+	return tabletmanagerservicepb.NewTabletManagerClient(cc), cc, nil
 }
 
-func (client *Client) dialPool(tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) {
+func (client *grpcClient) dialPool(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, error) {
 	addr := netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"]))
 	opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name)
 	if err != nil {
@@ -127,17 +149,30 @@ func (client *Client) dialPool(tablet *topodatapb.Tablet) (tabletmanagerservicep
 	return result.client, nil
 }
 
+// Close is part of the dialer interface.
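+// It drains and closes every connection cached for ExecuteFetch and clears
+// the connection map.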
+func (client *grpcClient) Close() { + client.mu.Lock() + defer client.mu.Unlock() + for _, c := range client.rpcClientMap { + close(c) + for ch := range c { + ch.cc.Close() + } + } + client.rpcClientMap = nil +} + // // Various read-only methods // // Ping is part of the tmclient.TabletManagerClient interface. func (client *Client) Ping(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() result, err := c.Ping(ctx, &tabletmanagerdatapb.PingRequest{ Payload: "payload", }) @@ -152,11 +187,11 @@ func (client *Client) Ping(ctx context.Context, tablet *topodatapb.Tablet) error // Sleep is part of the tmclient.TabletManagerClient interface. func (client *Client) Sleep(ctx context.Context, tablet *topodatapb.Tablet, duration time.Duration) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.Sleep(ctx, &tabletmanagerdatapb.SleepRequest{ Duration: int64(duration), }) @@ -165,11 +200,11 @@ func (client *Client) Sleep(ctx context.Context, tablet *topodatapb.Tablet, dura // ExecuteHook is part of the tmclient.TabletManagerClient interface. func (client *Client) ExecuteHook(ctx context.Context, tablet *topodatapb.Tablet, hk *hook.Hook) (*hook.HookResult, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() hr, err := c.ExecuteHook(ctx, &tabletmanagerdatapb.ExecuteHookRequest{ Name: hk.Name, Parameters: hk.Parameters, @@ -187,11 +222,11 @@ func (client *Client) ExecuteHook(ctx context.Context, tablet *topodatapb.Tablet // GetSchema is part of the tmclient.TabletManagerClient interface. func (client *Client) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.GetSchema(ctx, &tabletmanagerdatapb.GetSchemaRequest{ Tables: tables, ExcludeTables: excludeTables, @@ -205,11 +240,11 @@ func (client *Client) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, // GetPermissions is part of the tmclient.TabletManagerClient interface. func (client *Client) GetPermissions(ctx context.Context, tablet *topodatapb.Tablet) (*tabletmanagerdatapb.Permissions, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.GetPermissions(ctx, &tabletmanagerdatapb.GetPermissionsRequest{}) if err != nil { return nil, err @@ -223,33 +258,33 @@ func (client *Client) GetPermissions(ctx context.Context, tablet *topodatapb.Tab // SetReadOnly is part of the tmclient.TabletManagerClient interface. func (client *Client) SetReadOnly(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.SetReadOnly(ctx, &tabletmanagerdatapb.SetReadOnlyRequest{}) return err } // SetReadWrite is part of the tmclient.TabletManagerClient interface. 
func (client *Client) SetReadWrite(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.SetReadWrite(ctx, &tabletmanagerdatapb.SetReadWriteRequest{}) return err } // ChangeType is part of the tmclient.TabletManagerClient interface. func (client *Client) ChangeType(ctx context.Context, tablet *topodatapb.Tablet, dbType topodatapb.TabletType) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.ChangeType(ctx, &tabletmanagerdatapb.ChangeTypeRequest{ TabletType: dbType, }) @@ -258,33 +293,33 @@ func (client *Client) ChangeType(ctx context.Context, tablet *topodatapb.Tablet, // RefreshState is part of the tmclient.TabletManagerClient interface. func (client *Client) RefreshState(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.RefreshState(ctx, &tabletmanagerdatapb.RefreshStateRequest{}) return err } // RunHealthCheck is part of the tmclient.TabletManagerClient interface. func (client *Client) RunHealthCheck(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.RunHealthCheck(ctx, &tabletmanagerdatapb.RunHealthCheckRequest{}) return err } // IgnoreHealthError is part of the tmclient.TabletManagerClient interface. func (client *Client) IgnoreHealthError(ctx context.Context, tablet *topodatapb.Tablet, pattern string) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.IgnoreHealthError(ctx, &tabletmanagerdatapb.IgnoreHealthErrorRequest{ Pattern: pattern, }) @@ -293,11 +328,11 @@ func (client *Client) IgnoreHealthError(ctx context.Context, tablet *topodatapb. // ReloadSchema is part of the tmclient.TabletManagerClient interface. func (client *Client) ReloadSchema(ctx context.Context, tablet *topodatapb.Tablet, waitPosition string) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.ReloadSchema(ctx, &tabletmanagerdatapb.ReloadSchemaRequest{ WaitPosition: waitPosition, }) @@ -306,11 +341,11 @@ func (client *Client) ReloadSchema(ctx context.Context, tablet *topodatapb.Table // PreflightSchema is part of the tmclient.TabletManagerClient interface. func (client *Client) PreflightSchema(ctx context.Context, tablet *topodatapb.Tablet, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.PreflightSchema(ctx, &tabletmanagerdatapb.PreflightSchemaRequest{ Changes: changes, @@ -324,11 +359,11 @@ func (client *Client) PreflightSchema(ctx context.Context, tablet *topodatapb.Ta // ApplySchema is part of the tmclient.TabletManagerClient interface. 
func (client *Client) ApplySchema(ctx context.Context, tablet *topodatapb.Tablet, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.ApplySchema(ctx, &tabletmanagerdatapb.ApplySchemaRequest{ Sql: change.SQL, Force: change.Force, @@ -347,11 +382,11 @@ func (client *Client) ApplySchema(ctx context.Context, tablet *topodatapb.Tablet // LockTables is part of the tmclient.TabletManagerClient interface. func (client *Client) LockTables(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.LockTables(ctx, &tabletmanagerdatapb.LockTablesRequest{}) return err @@ -359,11 +394,11 @@ func (client *Client) LockTables(ctx context.Context, tablet *topodatapb.Tablet) // UnlockTables is part of the tmclient.TabletManagerClient interface. func (client *Client) UnlockTables(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.UnlockTables(ctx, &tabletmanagerdatapb.UnlockTablesRequest{}) return err @@ -371,11 +406,11 @@ func (client *Client) UnlockTables(ctx context.Context, tablet *topodatapb.Table // ExecuteQuery is part of the tmclient.TabletManagerClient interface. func (client *Client) ExecuteQuery(ctx context.Context, tablet *topodatapb.Tablet, query []byte, maxrows int) (*querypb.QueryResult, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.ExecuteQuery(ctx, &tabletmanagerdatapb.ExecuteQueryRequest{ Query: query, @@ -393,17 +428,21 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. var c tabletmanagerservicepb.TabletManagerClient var err error if usePool { - c, err = client.dialPool(tablet) - if err != nil { - return nil, err + if poolDialer, ok := client.dialer.(poolDialer); ok { + c, err = poolDialer.dialPool(ctx, tablet) + if err != nil { + return nil, err + } } - } else { - var cc *grpc.ClientConn - cc, c, err = client.dial(tablet) + } + + if !usePool || c == nil { + var closer io.Closer + c, closer, err = client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() } response, err := c.ExecuteFetchAsDba(ctx, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ @@ -421,14 +460,11 @@ func (client *Client) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb. // ExecuteFetchAsAllPrivs is part of the tmclient.TabletManagerClient interface. 
func (client *Client) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topodatapb.Tablet, query []byte, maxRows int, reloadSchema bool) (*querypb.QueryResult, error) { - var c tabletmanagerservicepb.TabletManagerClient - var err error - var cc *grpc.ClientConn - cc, c, err = client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.ExecuteFetchAsAllPrivs(ctx, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{ Query: query, @@ -447,17 +483,21 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. var c tabletmanagerservicepb.TabletManagerClient var err error if usePool { - c, err = client.dialPool(tablet) - if err != nil { - return nil, err + if poolDialer, ok := client.dialer.(poolDialer); ok { + c, err = poolDialer.dialPool(ctx, tablet) + if err != nil { + return nil, err + } } - } else { - var cc *grpc.ClientConn - cc, c, err = client.dial(tablet) + } + + if !usePool || c == nil { + var closer io.Closer + c, closer, err = client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() } response, err := c.ExecuteFetchAsApp(ctx, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ @@ -476,11 +516,11 @@ func (client *Client) ExecuteFetchAsApp(ctx context.Context, tablet *topodatapb. // ReplicationStatus is part of the tmclient.TabletManagerClient interface. func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.Status, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.ReplicationStatus(ctx, &tabletmanagerdatapb.ReplicationStatusRequest{}) if err != nil { return nil, err @@ -490,11 +530,11 @@ func (client *Client) ReplicationStatus(ctx context.Context, tablet *topodatapb. // MasterStatus is part of the tmclient.TabletManagerClient interface. func (client *Client) MasterStatus(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.MasterStatus, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.MasterStatus(ctx, &tabletmanagerdatapb.MasterStatusRequest{}) if err != nil { return nil, err @@ -504,11 +544,11 @@ func (client *Client) MasterStatus(ctx context.Context, tablet *topodatapb.Table // MasterPosition is part of the tmclient.TabletManagerClient interface. func (client *Client) MasterPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return "", err } - defer cc.Close() + defer closer.Close() response, err := c.MasterPosition(ctx, &tabletmanagerdatapb.MasterPositionRequest{}) if err != nil { return "", err @@ -518,33 +558,34 @@ func (client *Client) MasterPosition(ctx context.Context, tablet *topodatapb.Tab // WaitForPosition is part of the tmclient.TabletManagerClient interface. 
func (client *Client) WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, pos string) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.WaitForPosition(ctx, &tabletmanagerdatapb.WaitForPositionRequest{Position: pos}) return err } // StopReplication is part of the tmclient.TabletManagerClient interface. func (client *Client) StopReplication(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.StopReplication(ctx, &tabletmanagerdatapb.StopReplicationRequest{}) return err } // StopReplicationMinimum is part of the tmclient.TabletManagerClient interface. func (client *Client) StopReplicationMinimum(ctx context.Context, tablet *topodatapb.Tablet, minPos string, waitTime time.Duration) (string, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return "", err } - defer cc.Close() + defer closer.Close() + response, err := c.StopReplicationMinimum(ctx, &tabletmanagerdatapb.StopReplicationMinimumRequest{ Position: minPos, WaitTimeout: int64(waitTime), @@ -557,22 +598,22 @@ func (client *Client) StopReplicationMinimum(ctx context.Context, tablet *topoda // StartReplication is part of the tmclient.TabletManagerClient interface. func (client *Client) StartReplication(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.StartReplication(ctx, &tabletmanagerdatapb.StartReplicationRequest{}) return err } // StartReplicationUntilAfter is part of the tmclient.TabletManagerClient interface. func (client *Client) StartReplicationUntilAfter(ctx context.Context, tablet *topodatapb.Tablet, position string, waitTime time.Duration) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.StartReplicationUntilAfter(ctx, &tabletmanagerdatapb.StartReplicationUntilAfterRequest{ Position: position, WaitTimeout: int64(waitTime), @@ -582,11 +623,11 @@ func (client *Client) StartReplicationUntilAfter(ctx context.Context, tablet *to // GetReplicas is part of the tmclient.TabletManagerClient interface. func (client *Client) GetReplicas(ctx context.Context, tablet *topodatapb.Tablet) ([]string, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.GetReplicas(ctx, &tabletmanagerdatapb.GetReplicasRequest{}) if err != nil { return nil, err @@ -596,11 +637,11 @@ func (client *Client) GetReplicas(ctx context.Context, tablet *topodatapb.Tablet // VExec is part of the tmclient.TabletManagerClient interface. 
func (client *Client) VExec(ctx context.Context, tablet *topodatapb.Tablet, query, workflow, keyspace string) (*querypb.QueryResult, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.VExec(ctx, &tabletmanagerdatapb.VExecRequest{Query: query, Workflow: workflow, Keyspace: keyspace}) if err != nil { return nil, err @@ -610,11 +651,11 @@ func (client *Client) VExec(ctx context.Context, tablet *topodatapb.Tablet, quer // VReplicationExec is part of the tmclient.TabletManagerClient interface. func (client *Client) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.VReplicationExec(ctx, &tabletmanagerdatapb.VReplicationExecRequest{Query: query}) if err != nil { return nil, err @@ -624,11 +665,11 @@ func (client *Client) VReplicationExec(ctx context.Context, tablet *topodatapb.T // VReplicationWaitForPos is part of the tmclient.TabletManagerClient interface. func (client *Client) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int, pos string) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() if _, err = c.VReplicationWaitForPos(ctx, &tabletmanagerdatapb.VReplicationWaitForPosRequest{Id: int64(id), Position: pos}); err != nil { return err } @@ -641,22 +682,23 @@ func (client *Client) VReplicationWaitForPos(ctx context.Context, tablet *topoda // ResetReplication is part of the tmclient.TabletManagerClient interface. func (client *Client) ResetReplication(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.ResetReplication(ctx, &tabletmanagerdatapb.ResetReplicationRequest{}) return err } // InitMaster is part of the tmclient.TabletManagerClient interface. func (client *Client) InitMaster(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return "", err } - defer cc.Close() + defer closer.Close() + response, err := c.InitMaster(ctx, &tabletmanagerdatapb.InitMasterRequest{}) if err != nil { return "", err @@ -666,11 +708,11 @@ func (client *Client) InitMaster(ctx context.Context, tablet *topodatapb.Tablet) // PopulateReparentJournal is part of the tmclient.TabletManagerClient interface. func (client *Client) PopulateReparentJournal(ctx context.Context, tablet *topodatapb.Tablet, timeCreatedNS int64, actionName string, masterAlias *topodatapb.TabletAlias, pos string) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.PopulateReparentJournal(ctx, &tabletmanagerdatapb.PopulateReparentJournalRequest{ TimeCreatedNs: timeCreatedNS, ActionName: actionName, @@ -682,11 +724,11 @@ func (client *Client) PopulateReparentJournal(ctx context.Context, tablet *topod // InitReplica is part of the tmclient.TabletManagerClient interface. 
func (client *Client) InitReplica(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, replicationPosition string, timeCreatedNS int64) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.InitReplica(ctx, &tabletmanagerdatapb.InitReplicaRequest{ Parent: parent, ReplicationPosition: replicationPosition, @@ -697,11 +739,11 @@ func (client *Client) InitReplica(ctx context.Context, tablet *topodatapb.Tablet // DemoteMaster is part of the tmclient.TabletManagerClient interface. func (client *Client) DemoteMaster(ctx context.Context, tablet *topodatapb.Tablet) (*replicationdatapb.MasterStatus, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } - defer cc.Close() + defer closer.Close() response, err := c.DemoteMaster(ctx, &tabletmanagerdatapb.DemoteMasterRequest{}) if err != nil { return nil, err @@ -719,33 +761,33 @@ func (client *Client) DemoteMaster(ctx context.Context, tablet *topodatapb.Table // UndoDemoteMaster is part of the tmclient.TabletManagerClient interface. func (client *Client) UndoDemoteMaster(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.UndoDemoteMaster(ctx, &tabletmanagerdatapb.UndoDemoteMasterRequest{}) return err } // ReplicaWasPromoted is part of the tmclient.TabletManagerClient interface. func (client *Client) ReplicaWasPromoted(ctx context.Context, tablet *topodatapb.Tablet) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.ReplicaWasPromoted(ctx, &tabletmanagerdatapb.ReplicaWasPromotedRequest{}) return err } // SetMaster is part of the tmclient.TabletManagerClient interface. func (client *Client) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias, timeCreatedNS int64, waitPosition string, forceStartReplication bool) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.SetMaster(ctx, &tabletmanagerdatapb.SetMasterRequest{ Parent: parent, TimeCreatedNs: timeCreatedNS, @@ -757,11 +799,11 @@ func (client *Client) SetMaster(ctx context.Context, tablet *topodatapb.Tablet, // ReplicaWasRestarted is part of the tmclient.TabletManagerClient interface. func (client *Client) ReplicaWasRestarted(ctx context.Context, tablet *topodatapb.Tablet, parent *topodatapb.TabletAlias) error { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return err } - defer cc.Close() + defer closer.Close() _, err = c.ReplicaWasRestarted(ctx, &tabletmanagerdatapb.ReplicaWasRestartedRequest{ Parent: parent, }) @@ -770,11 +812,11 @@ func (client *Client) ReplicaWasRestarted(ctx context.Context, tablet *topodatap // StopReplicationAndGetStatus is part of the tmclient.TabletManagerClient interface. 
func (client *Client) StopReplicationAndGetStatus(ctx context.Context, tablet *topodatapb.Tablet, stopReplicationMode replicationdatapb.StopReplicationMode) (hybridStatus *replicationdatapb.Status, status *replicationdatapb.StopReplicationStatus, err error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, nil, err } - defer cc.Close() + defer closer.Close() response, err := c.StopReplicationAndGetStatus(ctx, &tabletmanagerdatapb.StopReplicationAndGetStatusRequest{ StopReplicationMode: stopReplicationMode, }) @@ -789,11 +831,12 @@ func (client *Client) StopReplicationAndGetStatus(ctx context.Context, tablet *t // PromoteReplica is part of the tmclient.TabletManagerClient interface. func (client *Client) PromoteReplica(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return "", err } - defer cc.Close() + defer closer.Close() + response, err := c.PromoteReplica(ctx, &tabletmanagerdatapb.PromoteReplicaRequest{}) if err != nil { return "", err @@ -806,13 +849,13 @@ func (client *Client) PromoteReplica(ctx context.Context, tablet *topodatapb.Tab // type backupStreamAdapter struct { stream tabletmanagerservicepb.TabletManager_BackupClient - cc *grpc.ClientConn + closer io.Closer } func (e *backupStreamAdapter) Recv() (*logutilpb.Event, error) { br, err := e.stream.Recv() if err != nil { - e.cc.Close() + e.closer.Close() return nil, err } return br.Event, nil @@ -820,7 +863,7 @@ func (e *backupStreamAdapter) Recv() (*logutilpb.Event, error) { // Backup is part of the tmclient.TabletManagerClient interface. func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, concurrency int, allowMaster bool) (logutil.EventStream, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } @@ -830,24 +873,24 @@ func (client *Client) Backup(ctx context.Context, tablet *topodatapb.Tablet, con AllowMaster: bool(allowMaster), }) if err != nil { - cc.Close() + closer.Close() return nil, err } return &backupStreamAdapter{ stream: stream, - cc: cc, + closer: closer, }, nil } type restoreFromBackupStreamAdapter struct { stream tabletmanagerservicepb.TabletManager_RestoreFromBackupClient - cc *grpc.ClientConn + closer io.Closer } func (e *restoreFromBackupStreamAdapter) Recv() (*logutilpb.Event, error) { br, err := e.stream.Recv() if err != nil { - e.cc.Close() + e.closer.Close() return nil, err } return br.Event, nil @@ -855,31 +898,23 @@ func (e *restoreFromBackupStreamAdapter) Recv() (*logutilpb.Event, error) { // RestoreFromBackup is part of the tmclient.TabletManagerClient interface. func (client *Client) RestoreFromBackup(ctx context.Context, tablet *topodatapb.Tablet) (logutil.EventStream, error) { - cc, c, err := client.dial(tablet) + c, closer, err := client.dialer.dial(ctx, tablet) if err != nil { return nil, err } stream, err := c.RestoreFromBackup(ctx, &tabletmanagerdatapb.RestoreFromBackupRequest{}) if err != nil { - cc.Close() + closer.Close() return nil, err } return &restoreFromBackupStreamAdapter{ stream: stream, - cc: cc, + closer: closer, }, nil } // Close is part of the tmclient.TabletManagerClient interface. 
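+// It delegates to the dialer, which owns any cached connections.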
func (client *Client) Close() { - client.mu.Lock() - defer client.mu.Unlock() - for _, c := range client.rpcClientMap { - close(c) - for ch := range c { - ch.cc.Close() - } - } - client.rpcClientMap = nil + client.dialer.Close() } From 9931b93c3a3a25e3f62f2ece4dace50fb1287db1 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sat, 19 Jun 2021 13:32:09 -0400 Subject: [PATCH 02/13] [tmserver] Restructure tests - Move grpctmserver tests to a testpackage to allow grpctmclient tests to import grpctmserver (this prevents an import cycle) - Change fakeRPCTM to take a testing.TB which both benchmark and test structs satisfy Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmserver/server.go | 2 +- go/vt/vttablet/grpctmserver/server_test.go | 6 +++--- go/vt/vttablet/tmrpctest/test_tm_rpc.go | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/grpctmserver/server.go b/go/vt/vttablet/grpctmserver/server.go index b76cc1e2365..9ba33a20893 100644 --- a/go/vt/vttablet/grpctmserver/server.go +++ b/go/vt/vttablet/grpctmserver/server.go @@ -502,6 +502,6 @@ func init() { } // RegisterForTest will register the RPC, to be used by test instances only -func RegisterForTest(s *grpc.Server, tm *tabletmanager.TabletManager) { +func RegisterForTest(s *grpc.Server, tm tabletmanager.RPCTM) { tabletmanagerservicepb.RegisterTabletManagerServer(s, &server{tm: tm}) } diff --git a/go/vt/vttablet/grpctmserver/server_test.go b/go/vt/vttablet/grpctmserver/server_test.go index e87022764a1..e8be1088604 100644 --- a/go/vt/vttablet/grpctmserver/server_test.go +++ b/go/vt/vttablet/grpctmserver/server_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package grpctmserver +package grpctmserver_test import ( "net" @@ -23,9 +23,9 @@ import ( "google.golang.org/grpc" "vitess.io/vitess/go/vt/vttablet/grpctmclient" + "vitess.io/vitess/go/vt/vttablet/grpctmserver" "vitess.io/vitess/go/vt/vttablet/tmrpctest" - tabletmanagerservicepb "vitess.io/vitess/go/vt/proto/tabletmanagerservice" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -43,7 +43,7 @@ func TestGRPCTMServer(t *testing.T) { // Create a gRPC server and listen on the port. s := grpc.NewServer() fakeTM := tmrpctest.NewFakeRPCTM(t) - tabletmanagerservicepb.RegisterTabletManagerServer(s, &server{tm: fakeTM}) + grpctmserver.RegisterForTest(s, fakeTM) go s.Serve(listener) // Create a gRPC client to talk to the fake tablet. diff --git a/go/vt/vttablet/tmrpctest/test_tm_rpc.go b/go/vt/vttablet/tmrpctest/test_tm_rpc.go index 89fb39de3b6..99609d4488e 100644 --- a/go/vt/vttablet/tmrpctest/test_tm_rpc.go +++ b/go/vt/vttablet/tmrpctest/test_tm_rpc.go @@ -45,7 +45,7 @@ import ( // fakeRPCTM implements tabletmanager.RPCTM and fills in all // possible values in all APIs type fakeRPCTM struct { - t *testing.T + t testing.TB panics bool // slow if true will let Ping() sleep and effectively not respond to an RPC. slow bool @@ -68,7 +68,7 @@ func (fra *fakeRPCTM) setSlow(slow bool) { } // NewFakeRPCTM returns a fake tabletmanager.RPCTM that's just a mirror. 
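+// It takes a testing.TB, the interface common to *testing.T and *testing.B,
+// so tests and benchmarks alike can construct the fake.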
-func NewFakeRPCTM(t *testing.T) tabletmanager.RPCTM { +func NewFakeRPCTM(t testing.TB) tabletmanager.RPCTM { return &fakeRPCTM{ t: t, } @@ -83,7 +83,7 @@ func NewFakeRPCTM(t *testing.T) tabletmanager.RPCTM { var protoMessage = reflect.TypeOf((*proto.Message)(nil)).Elem() -func compare(t *testing.T, name string, got, want interface{}) { +func compare(t testing.TB, name string, got, want interface{}) { t.Helper() typ := reflect.TypeOf(got) if reflect.TypeOf(got) != reflect.TypeOf(want) { @@ -114,7 +114,7 @@ fail: t.Errorf("Unexpected %v:\ngot %#v\nwant %#v", name, got, want) } -func compareBool(t *testing.T, name string, got bool) { +func compareBool(t testing.TB, name string, got bool) { t.Helper() if !got { t.Errorf("Unexpected %v: got false expected true", name) @@ -200,7 +200,7 @@ func tmRPCTestPingPanic(ctx context.Context, t *testing.T, client tmclient.Table // tmRPCTestDialExpiredContext verifies that // the context returns the right DeadlineExceeded Err() for // RPCs failed due to an expired context before .Dial(). -func tmRPCTestDialExpiredContext(ctx context.Context, t *testing.T, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { +func tmRPCTestDialExpiredContext(ctx context.Context, t testing.TB, client tmclient.TabletManagerClient, tablet *topodatapb.Tablet) { // Using a timeout of 0 here such that .Dial() will fail immediately. expiredCtx, cancel := context.WithTimeout(ctx, 0) defer cancel() From c6938ca93167db1e7f45d04b28bfad84994486fb Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Fri, 18 Jun 2021 23:17:18 -0400 Subject: [PATCH 03/13] [grpctmclient] Add a cachedClient dialer implementation Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmclient/cached_client.go | 362 ++++++++++++++++++ .../grpctmclient/cached_client_test.go | 303 +++++++++++++++ 2 files changed, 665 insertions(+) create mode 100644 go/vt/vttablet/grpctmclient/cached_client.go create mode 100644 go/vt/vttablet/grpctmclient/cached_client_test.go diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go new file mode 100644 index 00000000000..3074a54d4db --- /dev/null +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -0,0 +1,362 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpctmclient + +import ( + "context" + "flag" + "io" + "sync" + "time" + + "google.golang.org/grpc" + + "vitess.io/vitess/go/netutil" + "vitess.io/vitess/go/sync2" + "vitess.io/vitess/go/timer" + "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + tabletmanagerservicepb "vitess.io/vitess/go/vt/proto/tabletmanagerservice" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +var ( + defaultPoolCapacity = flag.Int("tablet_manager_grpc_connpool_size", 10, "number of tablets to keep tmclient connections open to") + defaultPoolIdleTimeout = flag.Duration("tablet_manager_grpc_connpool_idle_timeout", time.Second*30, "how long to leave a connection in the tmclient connpool. 
acquiring a connection resets this period for that connection")
+	defaultPoolWaitTimeout   = flag.Duration("tablet_manager_grpc_connpool_wait_timeout", time.Millisecond*50, "how long to wait for a connection from the tmclient connpool")
+	defaultPoolSweepInterval = flag.Duration("tablet_manager_grpc_connpool_sweep_interval", time.Second*30, "how often to clean up and close unused tmclient connections that exceed the idle timeout")
+)
+
+func init() {
+	tmclient.RegisterTabletManagerClientFactory("grpc-cached", func() tmclient.TabletManagerClient {
+		return NewCachedClient(*defaultPoolCapacity, *defaultPoolIdleTimeout, *defaultPoolWaitTimeout, *defaultPoolSweepInterval)
+	})
+}
+
+type pooledTMC struct {
+	tabletmanagerservicepb.TabletManagerClient
+	cc *grpc.ClientConn
+
+	m              sync.RWMutex // protects lastAccessTime and refs
+	lastAccessTime time.Time
+	refs           int
+}
+
+// newPooledConn returns a pooledTMC dialed to the given address, and with the
+// equivalent of having called acquire() on it exactly once.
+func newPooledConn(addr string) (*pooledTMC, error) {
+	opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name)
+	if err != nil {
+		return nil, err
+	}
+
+	cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt)
+	if err != nil {
+		return nil, err
+	}
+
+	return &pooledTMC{
+		TabletManagerClient: tabletmanagerservicepb.NewTabletManagerClient(cc),
+		cc:                  cc,
+		lastAccessTime:      time.Now(),
+		refs:                1,
+	}, nil
+}
+
+func (tmc *pooledTMC) acquire() {
+	tmc.m.Lock()
+	defer tmc.m.Unlock()
+
+	tmc.refs++
+	tmc.lastAccessTime = time.Now()
+}
+
+func (tmc *pooledTMC) release() {
+	tmc.m.Lock()
+	defer tmc.m.Unlock()
+
+	tmc.refs--
+	if tmc.refs < 0 {
+		panic("release() called on unacquired pooled tabletmanager conn")
+	}
+}
+
+// Close implements io.Closer for a pooledTMC. It is a wrapper around release()
+// which never returns an error, but will panic if called on a pooledTMC that
+// has not been acquire()'d.
+func (tmc *pooledTMC) Close() error {
+	tmc.release()
+	return nil
+}
+
+type cachedClient struct {
+	capacity    int
+	idleTimeout time.Duration
+	waitTimeout time.Duration
+
+	// sema gates the addition of new connections to the cache
+	sema *sync2.Semaphore
+
+	m     sync.RWMutex // protects conns map
+	conns map[string]*pooledTMC
+
+	janitor *janitor
+}
+
+// NewCachedClient returns a Client using the cachedClient dialer implementation.
+// Because all connections are cached/pooled, it does not implement poolDialer,
+// and grpctmclient.Client will use pooled connections for all RPCs.
+func NewCachedClient(capacity int, idleTimeout time.Duration, waitTimeout time.Duration, sweepInterval time.Duration) *Client {
+	cc := &cachedClient{
+		capacity:    capacity,
+		idleTimeout: idleTimeout,
+		waitTimeout: waitTimeout,
+		sema:        sync2.NewSemaphore(capacity, waitTimeout),
+		conns:       make(map[string]*pooledTMC, capacity),
+		janitor: &janitor{
+			ch:    make(chan *struct{}, 10), // TODO: flag
+			timer: timer.NewTimer(sweepInterval),
+		},
+	}
+
+	cc.janitor.client = cc
+	go cc.janitor.run()
+
+	return &Client{
+		dialer: cc,
+	}
+}
+
+func (client *cachedClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
+	addr := getTabletAddr(tablet)
+
+	client.m.RLock()
+	if conn, ok := client.conns[addr]; ok {
+		// Fast path, we have a conn for this addr in the cache. Mark it as
+		// acquired and return it.
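+		// Note that the deferred RUnlock fires after acquire(), so the read
+		// lock is held for the whole fast path; eviction requires the write
+		// lock, and so cannot close this conn out from under us.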
+ defer client.m.RUnlock() + conn.acquire() + + return conn, conn, nil + } + client.m.RUnlock() + + // Slow path, we're going to see if there's a free slot. If so, we'll claim + // it and dial a new conn. If not, we're going to have to wait or timeout. + // We don't hold the lock while we're polling. + ctx, cancel := context.WithTimeout(ctx, client.waitTimeout) + defer cancel() + + dial := func(addr string) (conn *pooledTMC, closer io.Closer, err error) { + client.m.Lock() + defer client.m.Unlock() + + defer func() { + // If we failed to dial a new conn for any reason, release our spot + // in the sema so another dial can take its place. + if err != nil { + client.sema.Release() + } + }() + + select { + case <-ctx.Done(): // We timed out waiting for the write lock, bail. + return nil, nil, ctx.Err() // TODO: wrap + default: + } + + conn, err = newPooledConn(addr) + if err != nil { + return nil, nil, err + } + + client.conns[addr] = conn + return conn, conn, nil + } + + if client.sema.TryAcquire() { + return dial(addr) + } + + select { + // Non-blocking signal to the janitor that it should consider sweeping soon. + case client.janitor.ch <- sentinel: + default: + } + + if !client.sema.AcquireContext(ctx) { + return nil, nil, ctx.Err() + } + + return dial(addr) +} + +func (client *cachedClient) Close() { + client.m.Lock() + defer client.m.Unlock() + + close(client.janitor.ch) + client.sweepFnLocked(func(conn *pooledTMC) (bool, bool) { + return true, false + }) +} + +func (client *cachedClient) sweep2(f func(conn *pooledTMC) (bool, bool)) { + client.m.RLock() + + var toFree []string + for key, conn := range client.conns { + conn.m.RLock() + shouldFree, stopSweep := f(conn) + conn.m.RUnlock() + + if shouldFree { + toFree = append(toFree, key) + } + + if stopSweep { + break + } + } + + client.m.RUnlock() + + if len(toFree) > 0 { + client.m.Lock() + defer client.m.Unlock() + + for _, key := range toFree { + conn, ok := client.conns[key] + if !ok { + continue + } + + conn.m.Lock() + // check the condition again, things may have changed since we + // transitioned from the read lock to the write lock + shouldFree, _ := f(conn) + if !shouldFree { + conn.m.Unlock() + continue + } + + conn.cc.Close() + conn.m.Unlock() + delete(client.conns, key) + client.sema.Release() + } + } +} + +func (client *cachedClient) sweep() { + now := time.Now() + client.sweep2(func(conn *pooledTMC) (bool, bool) { + return conn.refs == 0 && conn.lastAccessTime.Add(client.idleTimeout).Before(now), false + }) +} + +func (client *cachedClient) sweepFnLocked(f func(conn *pooledTMC) (shouldFree bool, stopSweep bool)) { + for key, conn := range client.conns { + conn.m.Lock() + shouldFree, stopSweep := f(conn) + if !shouldFree { + conn.m.Unlock() + if stopSweep { + return + } + + continue + } + + conn.cc.Close() + delete(client.conns, key) + client.sema.Release() + conn.m.Unlock() + + if stopSweep { + return + } + } +} + +var sentinel = &struct{}{} + +type janitor struct { + ch chan *struct{} + client *cachedClient + timer *timer.Timer + + m sync.Mutex + sweeping bool + lastSweep time.Time +} + +func (j *janitor) run() { + j.timer.Start(j.sweep) + defer j.timer.Stop() + + for s := range j.ch { + if s == nil { + break + } + + scan := true + t := time.NewTimer(time.Millisecond * 50) // TODO: flag + for scan { + select { + case <-t.C: + scan = false + case s := <-j.ch: + if s == nil { + scan = false + } + default: + scan = false + } + } + + t.Stop() + j.sweep() + } +} + +func (j *janitor) sweep() { + j.m.Lock() + if j.sweeping { + 
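+		// A sweep is already in flight; bail out rather than stacking a
+		// second sweep behind it.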
j.m.Unlock() + return + } + + if j.lastSweep.Add(time.Millisecond * 10 /* TODO: flag */).After(time.Now()) { + j.m.Unlock() + return + } + + j.sweeping = true + j.m.Unlock() + + j.client.sweep() + j.m.Lock() + j.sweeping = false + j.lastSweep = time.Now() + j.m.Unlock() +} + +func getTabletAddr(tablet *topodatapb.Tablet) string { + return netutil.JoinHostPort(tablet.Hostname, int32(tablet.PortMap["grpc"])) +} diff --git a/go/vt/vttablet/grpctmclient/cached_client_test.go b/go/vt/vttablet/grpctmclient/cached_client_test.go new file mode 100644 index 00000000000..c7d5c9b2af0 --- /dev/null +++ b/go/vt/vttablet/grpctmclient/cached_client_test.go @@ -0,0 +1,303 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grpctmclient + +import ( + "context" + "fmt" + "math/rand" + "net" + "os" + "runtime" + "runtime/pprof" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + "google.golang.org/grpc" + + "vitess.io/vitess/go/sync2" + "vitess.io/vitess/go/vt/vttablet/grpctmserver" + "vitess.io/vitess/go/vt/vttablet/tabletmanager" + "vitess.io/vitess/go/vt/vttablet/tmrpctest" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +func grpcTestServer(t testing.TB, tm tabletmanager.RPCTM) (*net.TCPAddr, func()) { + t.Helper() + + lis, err := nettest.NewLocalListener("tcp") + if err != nil { + t.Fatalf("Cannot listen: %v", err) + } + + s := grpc.NewServer() + grpctmserver.RegisterForTest(s, tm) + go s.Serve(lis) + + var shutdownOnce sync.Once + + return lis.Addr().(*net.TCPAddr), func() { + shutdownOnce.Do(func() { + s.Stop() + lis.Close() + }) + } +} + +func TestPooledTMC(t *testing.T) { + tmserv := tmrpctest.NewFakeRPCTM(t) + addr, shutdown := grpcTestServer(t, tmserv) + defer shutdown() + + start := time.Now() + tmc, err := newPooledConn(addr.String()) + require.NoError(t, err) + assert.Equal(t, tmc.refs, 1, "TODO") + assert.True(t, start.Before(tmc.lastAccessTime), "lastAccessTime is not after start; should have %v < %v", start, tmc.lastAccessTime) + + checkpoint := tmc.lastAccessTime + tmc.acquire() + assert.Equal(t, tmc.refs, 2, "TODO") + assert.True(t, checkpoint.Before(tmc.lastAccessTime), "lastAccessTime is not after checkpoint; should have %v < %v", checkpoint, tmc.lastAccessTime) + + checkpoint = tmc.lastAccessTime + tmc.release() + tmc.release() + assert.Equal(t, tmc.refs, 0, "TODO") + assert.True(t, checkpoint.Equal(tmc.lastAccessTime), "releasing pooledTMC should not change access time; should have %v = %v", checkpoint, tmc.lastAccessTime) + + t.Run("release panic", func(t *testing.T) { + defer func() { + err := recover() + assert.NotNil(t, err, "release on unacquired pooledTMC should panic") + }() + + tmc.release() + }) +} + +func BenchmarkCachedClient(b *testing.B) { + tmserv := tmrpctest.NewFakeRPCTM(b) + tablets := make([]*topodatapb.Tablet, 4) + for i := 0; i < len(tablets); i++ { + addr, shutdown := grpcTestServer(b, tmserv) + defer shutdown() + + 
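+		// Point each tablet's gRPC port at the in-process test server so
+		// dialing the tablet connects back to the fake TM.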
tablets[i] = &topodatapb.Tablet{
+			Alias: &topodatapb.TabletAlias{
+				Cell: "test",
+				Uid:  uint32(addr.Port),
+			},
+			Hostname: addr.IP.String(),
+			PortMap: map[string]int32{
+				"grpc": int32(addr.Port),
+			},
+		}
+	}
+
+	b.ResetTimer()
+	client := NewCachedClient(5, time.Second*30, time.Millisecond*50, time.Second*30)
+	defer client.Close()
+
+	for i := 0; i < b.N; i++ {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
+		x := rand.Intn(len(tablets))
+		err := client.Ping(ctx, tablets[x])
+		if err != nil {
+			b.Errorf("error pinging tablet %v: %v", tablets[x].Hostname, err)
+		}
+
+		cancel()
+	}
+}
+
+func TestCachedClient(t *testing.T) {
+	t.Parallel()
+
+	tmserv := tmrpctest.NewFakeRPCTM(t)
+	tablets := make([]*topodatapb.Tablet, 4)
+	for i := 0; i < len(tablets); i++ {
+		addr, shutdown := grpcTestServer(t, tmserv)
+		defer shutdown()
+
+		tablets[i] = &topodatapb.Tablet{
+			Alias: &topodatapb.TabletAlias{
+				Cell: "test",
+				Uid:  uint32(addr.Port),
+			},
+			Hostname: addr.IP.String(),
+			PortMap: map[string]int32{
+				"grpc": int32(addr.Port),
+			},
+		}
+	}
+
+	client := NewCachedClient(5, time.Second*30, time.Millisecond*50, time.Second*30)
+	defer client.Close()
+
+	dialAttempts := sync2.NewAtomicInt64(0)
+	dialErrors := sync2.NewAtomicInt64(0)
+	testCtx, testCancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			attempts := 0
+
+			for {
+				select {
+				case <-testCtx.Done():
+					dialAttempts.Add(int64(attempts))
+					return
+				default:
+					attempts++
+
+					tablet := tablets[rand.Intn(len(tablets))]
+					_, closer, err := client.dialer.dial(context.Background(), tablet)
+					if err != nil {
+						dialErrors.Add(1)
+						continue
+					}
+
+					closer.Close()
+				}
+			}
+		}()
+	}
+
+	time.Sleep(time.Second * 35)
+	testCancel()
+	wg.Wait()
+
+	attempts, errors := dialAttempts.Get(), dialErrors.Get()
+	assert.Less(t, float64(errors)/float64(attempts), 0.001, "fewer than 0.1% of dial attempts should fail")
+}
+
+func TestCachedClientMultipleSweeps(t *testing.T) {
+	t.Parallel()
+
+	file, err := os.Create("profile.out")
+	require.NoError(t, err)
+	defer file.Close()
+	if err := pprof.StartCPUProfile(file); err != nil {
+		t.Errorf("failed to start cpu profile: %v", err)
+		return
+	}
+	defer pprof.StopCPUProfile()
+
+	testCtx, testCancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	procs := 0
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		procs = runtime.NumGoroutine()
+
+		for {
+			select {
+			case <-testCtx.Done():
+				return
+			case <-time.After(time.Millisecond * 100):
+				newProcs := runtime.NumGoroutine()
+				if newProcs > procs {
+					procs = newProcs
+				}
+			}
+		}
+	}()
+
+	numTablets := 100
+	numGoroutines := 8
+
+	tmserv := tmrpctest.NewFakeRPCTM(t)
+	tablets := make([]*topodatapb.Tablet, numTablets)
+	for i := 0; i < len(tablets); i++ {
+		addr, shutdown := grpcTestServer(t, tmserv)
+		defer shutdown()
+
+		tablets[i] = &topodatapb.Tablet{
+			Alias: &topodatapb.TabletAlias{
+				Cell: "test",
+				Uid:  uint32(addr.Port),
+			},
+			Hostname: addr.IP.String(),
+			PortMap: map[string]int32{
+				"grpc": int32(addr.Port),
+			},
+		}
+	}
+
+	// make sure we can dial every tablet
+	for _, tablet := range tablets {
+		err := NewClient().Ping(context.Background(), tablet)
+		require.NoError(t, err, "failed to dial tablet %s", tablet.Hostname)
+	}
+
+	poolSize := int(float64(numTablets) * 0.8)
+	client := NewCachedClient(poolSize, time.Second*10, time.Millisecond*100, time.Second*30)
+	// client := NewPooledDialer()
+	defer 
client.Close() + + dialAttempts := sync2.NewAtomicInt64(0) + dialErrors := sync2.NewAtomicInt64(0) + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + attempts := 0 + jitter := time.Second * 0 + + for { + select { + case <-testCtx.Done(): + dialAttempts.Add(int64(attempts)) + return + case <-time.After(jitter): + jitter = time.Millisecond * (time.Duration(rand.Intn(51) + 100)) + attempts++ + + tablet := tablets[rand.Intn(len(tablets))] + _, closer, err := client.dialer.dial(context.Background(), tablet) + if err != nil { + dialErrors.Add(1) + continue + } + + closer.Close() + } + } + }() + } + + time.Sleep(time.Minute) + testCancel() + wg.Wait() + + attempts, errors := dialAttempts.Get(), dialErrors.Get() + assert.Less(t, float64(errors)/float64(attempts), 0.001, fmt.Sprintf("fewer than 0.1%% of dial attempts should fail (attempts = %d, errors = %d, max running procs = %d)", attempts, errors, procs)) +} From 0533f07d41da4945b97aca1e04163aec2e226231 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Mon, 21 Jun 2021 13:55:32 -0400 Subject: [PATCH 04/13] Add priority queue based dialer cache impl Two bugfixes: - one to prevent leaking open connections (by checking the map again after getting the write lock) - one to prevent leaving around closed connections, resulting in errors on later uses (by deleting the freed conn's addr and not the addr we're attempting to dial) Refactor to not duplicate dialing/queue management I don't want mistaken copy-paste to result in a bug between sections that should be otherwise identical Add one more missing "another goroutine dialed" check Add some stats to the cached conn dialer Remove everything from the old, slower cache implementation, pqueue is the way to go lots and lots and lots and lots of comments Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmclient/cached_client.go | 482 +++++++++--------- .../grpctmclient/cached_client_test.go | 150 ++---- go/vt/vttablet/grpctmclient/client.go | 7 + 3 files changed, 271 insertions(+), 368 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 3074a54d4db..4cd2232c39e 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -17,6 +17,7 @@ limitations under the License. package grpctmclient import ( + "container/heap" "context" "flag" "io" @@ -26,8 +27,8 @@ import ( "google.golang.org/grpc" "vitess.io/vitess/go/netutil" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/sync2" - "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -36,325 +37,302 @@ import ( ) var ( - defaultPoolCapacity = flag.Int("tablet_manager_grpc_connpool_size", 10, "number of tablets to keep tmclient connections open to") - defaultPoolIdleTimeout = flag.Duration("tablet_manager_grpc_connpool_idle_timeout", time.Second*30, "how long to leave a connection in the tmclient connpool. 
acquiring a connection resets this period for that connection")
-	defaultPoolWaitTimeout   = flag.Duration("tablet_manager_grpc_connpool_wait_timeout", time.Millisecond*50, "how long to wait for a connection from the tmclient connpool")
-	defaultPoolSweepInterval = flag.Duration("tablet_manager_grpc_connpool_sweep_interval", time.Second*30, "how often to clean up and close unused tmclient connections that exceed the idle timeout")
+	defaultPoolCapacity = flag.Int("tablet_manager_grpc_connpool_size", 100, "number of tablets to keep tmclient connections open to")
 )
 
 func init() {
 	tmclient.RegisterTabletManagerClientFactory("grpc-cached", func() tmclient.TabletManagerClient {
-		return NewCachedClient(*defaultPoolCapacity, *defaultPoolIdleTimeout, *defaultPoolWaitTimeout, *defaultPoolSweepInterval)
+		return NewCachedConnClient(*defaultPoolCapacity)
 	})
 }
 
+// closeFunc allows a standalone function to implement io.Closer, similar to
+// how http.HandlerFunc allows standalone functions to implement http.Handler.
+type closeFunc func() error
+
+func (fn closeFunc) Close() error {
+	return fn()
+}
+
+var _ io.Closer = (*closeFunc)(nil)
+
+type cachedConn struct {
 	tabletmanagerservicepb.TabletManagerClient
 	cc *grpc.ClientConn
 
-	m              sync.RWMutex // protects lastAccessTime and refs
 	lastAccessTime time.Time
 	refs           int
+
+	index int
+	key   string
+}
+
+// cachedConns provides a priority queue implementation for O(log n) connection
+// eviction management. It is a nearly verbatim copy of the priority queue
+// sample at https://golang.org/pkg/container/heap/#pkg-overview, with the
+// Less function changed such that connections with refs==0 get pushed to the
+// front of the queue, because those are the connections we are going to want
+// to evict.
+type cachedConns []*cachedConn
+
+var _ heap.Interface = (*cachedConns)(nil)
+
+// Len is part of the sort.Interface interface and is used by container/heap
+// functions.
+func (queue cachedConns) Len() int { return len(queue) }
+
+// Less is part of the sort.Interface interface and is used by container/heap
+// functions.
+func (queue cachedConns) Less(i, j int) bool {
+	left, right := queue[i], queue[j]
+	if left.refs == right.refs {
+		// break ties by access time.
+		// more stale connections have higher priority for removal
+		// this condition is equivalent to:
+		//  left.lastAccessTime <= right.lastAccessTime
+		return !left.lastAccessTime.After(right.lastAccessTime)
+	}
+
+	// connections with fewer refs have higher priority for removal
+	return left.refs < right.refs
+}
+
+// Swap is part of the sort.Interface interface and is used by container/heap
+// functions.
+func (queue cachedConns) Swap(i, j int) {
+	queue[i], queue[j] = queue[j], queue[i]
+	queue[i].index = i
+	queue[j].index = j
+}
+
+// Push is part of the container/heap.Interface interface. 
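+// It stores the pushed conn's position in the queue in its index field, so
+// that later heap operations can locate it.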
+func (queue *cachedConns) Push(x interface{}) { + n := len(*queue) + conn := x.(*cachedConn) + conn.index = n + *queue = append(*queue, conn) } -func (tmc *pooledTMC) release() { - tmc.m.Lock() - defer tmc.m.Unlock() +// Pop is part of the container/heap.Interface interface. +func (queue *cachedConns) Pop() interface{} { + old := *queue + n := len(old) + conn := old[n-1] + old[n-1] = nil // avoid memory leak + conn.index = -1 // for safety + *queue = old[0 : n-1] - tmc.refs-- - if tmc.refs < 0 { - panic("release() called on unacquired pooled tabletmanager conn") - } + return conn } -// Close implements io.Closer for a pooledTMC. It is a wrapper around release() -// which never returns an error, but will panic if called on a pooledTMC that -// has not been acquire()'d. -func (tmc *pooledTMC) Close() error { - tmc.release() - return nil +type cachedConnDialer struct { + m sync.RWMutex + conns map[string]*cachedConn + qMu sync.Mutex + queue cachedConns + connWaitSema *sync2.Semaphore } -type cachedClient struct { - capacity int - idleTimeout time.Duration - waitTimeout time.Duration - - // sema gates the addition of new connections to the cache - sema *sync2.Semaphore - - m sync.RWMutex // protects conns map - conns map[string]*pooledTMC - - janitor *janitor +var dialerStats = struct { + ConnReuse *stats.Gauge + ConnNew *stats.Gauge + DialTimeouts *stats.Gauge + DialTimings *stats.Timings + EvictionQueueTimings *stats.Timings +}{ + ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"), + ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"), + DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"), + DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"), + // TODO: add timings for heap operations (push, pop, fix) } -// NewCachedClient returns a Client using the cachedClient dialer implementation. -// Because all connections are cached/pooled, it does not implement poolDialer, -// and grpctmclient.Client will use pooled connections for all RPCs. -func NewCachedClient(capacity int, idleTimeout time.Duration, waitTimeout time.Duration, sweepInterval time.Duration) *Client { - cc := &cachedClient{ - capacity: capacity, - idleTimeout: idleTimeout, - waitTimeout: waitTimeout, - sema: sync2.NewSemaphore(capacity, waitTimeout), - conns: make(map[string]*pooledTMC, capacity), - janitor: &janitor{ - ch: make(chan *struct{}, 10), // TODO: flag - timer: timer.NewTimer(sweepInterval), - }, +// NewCachedConnClient returns a grpc Client using the priority queue cache +// dialer implementation. +func NewCachedConnClient(capacity int) *Client { + dialer := &cachedConnDialer{ + conns: make(map[string]*cachedConn, capacity), + queue: make(cachedConns, 0, capacity), + connWaitSema: sync2.NewSemaphore(capacity, 0), } - cc.janitor.client = cc - go cc.janitor.run() - - return &Client{ - dialer: cc, - } + heap.Init(&dialer.queue) + return &Client{dialer} } -func (client *cachedClient) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { - addr := getTabletAddr(tablet) - - client.m.RLock() - if conn, ok := client.conns[addr]; ok { - // Fast path, we have a conn for this addr in the cache. 
Mark it as - // acquired and return it. - defer client.m.RUnlock() - conn.acquire() - - return conn, conn, nil - } - client.m.RUnlock() - - // Slow path, we're going to see if there's a free slot. If so, we'll claim - // it and dial a new conn. If not, we're going to have to wait or timeout. - // We don't hold the lock while we're polling. - ctx, cancel := context.WithTimeout(ctx, client.waitTimeout) - defer cancel() +var _ dialer = (*cachedConnDialer)(nil) - dial := func(addr string) (conn *pooledTMC, closer io.Closer, err error) { - client.m.Lock() - defer client.m.Unlock() +func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { + start := time.Now() + addr := getTabletAddr(tablet) + dialer.m.RLock() + if conn, ok := dialer.conns[addr]; ok { defer func() { - // If we failed to dial a new conn for any reason, release our spot - // in the sema so another dial can take its place. - if err != nil { - client.sema.Release() - } + dialerStats.DialTimings.Add("rlock_fast", time.Since(start)) }() - - select { - case <-ctx.Done(): // We timed out waiting for the write lock, bail. - return nil, nil, ctx.Err() // TODO: wrap - default: - } - - conn, err = newPooledConn(addr) - if err != nil { - return nil, nil, err - } - - client.conns[addr] = conn - return conn, conn, nil + defer dialer.m.RUnlock() + return dialer.redial(conn) } + dialer.m.RUnlock() - if client.sema.TryAcquire() { - return dial(addr) - } - - select { - // Non-blocking signal to the janitor that it should consider sweeping soon. - case client.janitor.ch <- sentinel: - default: - } - - if !client.sema.AcquireContext(ctx) { - return nil, nil, ctx.Err() - } - - return dial(addr) -} - -func (client *cachedClient) Close() { - client.m.Lock() - defer client.m.Unlock() - - close(client.janitor.ch) - client.sweepFnLocked(func(conn *pooledTMC) (bool, bool) { - return true, false - }) -} - -func (client *cachedClient) sweep2(f func(conn *pooledTMC) (bool, bool)) { - client.m.RLock() - - var toFree []string - for key, conn := range client.conns { - conn.m.RLock() - shouldFree, stopSweep := f(conn) - conn.m.RUnlock() - - if shouldFree { - toFree = append(toFree, key) + if dialer.connWaitSema.TryAcquire() { + defer func() { + dialerStats.DialTimings.Add("sema_fast", time.Since(start)) + }() + dialer.m.Lock() + defer dialer.m.Unlock() + + // Check if another goroutine managed to dial a conn for the same addr + // while we were waiting for the write lock. This is identical to the + // read-lock section above. + if conn, ok := dialer.conns[addr]; ok { + return dialer.redial(conn) } - if stopSweep { - break - } + return dialer.newdial(addr, true /* manage queue lock */) } - client.m.RUnlock() + defer func() { + dialerStats.DialTimings.Add("sema_poll", time.Since(start)) + }() - if len(toFree) > 0 { - client.m.Lock() - defer client.m.Unlock() - - for _, key := range toFree { - conn, ok := client.conns[key] - if !ok { - continue + for { + select { + case <-ctx.Done(): + dialerStats.DialTimeouts.Add(1) + return nil, nil, ctx.Err() + default: + dialer.m.Lock() + if conn, ok := dialer.conns[addr]; ok { + // Someone else dialed this addr while we were polling. No need + // to evict anyone else, just reuse the existing conn. 
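+				// (Re-checking the cache after taking the write lock is the
+				// classic double-checked locking pattern; without it, two
+				// goroutines could both dial the same addr and one of the
+				// two connections would leak.)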
+ defer dialer.m.Unlock() + return dialer.redial(conn) } - conn.m.Lock() - // check the condition again, things may have changed since we - // transitioned from the read lock to the write lock - shouldFree, _ := f(conn) - if !shouldFree { - conn.m.Unlock() + dialer.qMu.Lock() + conn := dialer.queue[0] + if conn.refs != 0 { + dialer.qMu.Unlock() + dialer.m.Unlock() continue } + // We're going to return from this point + defer dialer.m.Unlock() + defer dialer.qMu.Unlock() + heap.Pop(&dialer.queue) + delete(dialer.conns, conn.key) conn.cc.Close() - conn.m.Unlock() - delete(client.conns, key) - client.sema.Release() + + return dialer.newdial(addr, false /* manage queue lock */) } } } -func (client *cachedClient) sweep() { - now := time.Now() - client.sweep2(func(conn *pooledTMC) (bool, bool) { - return conn.refs == 0 && conn.lastAccessTime.Add(client.idleTimeout).Before(now), false - }) -} +// newdial creates a new cached connection, and updates the cache and eviction +// queue accordingly. This must be called only while holding the write lock on +// dialer.m as well as after having successfully acquired the dialer.connWaitSema. If newdial fails to create the underlying +// gRPC connection, it will make a call to Release the connWaitSema for other +// newdial calls. +// +// It returns the three-tuple of client-interface, closer, and error that the +// main dial func returns. +func (dialer *cachedConnDialer) newdial(addr string, manageQueueLock bool) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { + dialerStats.ConnNew.Add(1) -func (client *cachedClient) sweepFnLocked(f func(conn *pooledTMC) (shouldFree bool, stopSweep bool)) { - for key, conn := range client.conns { - conn.m.Lock() - shouldFree, stopSweep := f(conn) - if !shouldFree { - conn.m.Unlock() - if stopSweep { - return - } - - continue - } - - conn.cc.Close() - delete(client.conns, key) - client.sema.Release() - conn.m.Unlock() + opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name) + if err != nil { + dialer.connWaitSema.Release() + return nil, nil, err + } - if stopSweep { - return - } + cc, err := grpcclient.Dial(addr, grpcclient.FailFast(false), opt) + if err != nil { + dialer.connWaitSema.Release() + return nil, nil, err } -} -var sentinel = &struct{}{} + // In the case where dial is evicting a connection from the cache, we + // already have a lock on the eviction queue. Conversely, in the case where + // we are able to create a new connection without evicting (because the + // cache is not yet full), we don't have the queue lock yet. + if manageQueueLock { + dialer.qMu.Lock() + defer dialer.qMu.Unlock() + } -type janitor struct { - ch chan *struct{} - client *cachedClient - timer *timer.Timer + conn := &cachedConn{ + TabletManagerClient: tabletmanagerservicepb.NewTabletManagerClient(cc), + cc: cc, + lastAccessTime: time.Now(), + refs: 1, + index: -1, // gets set by call to Push + key: addr, + } + heap.Push(&dialer.queue, conn) + dialer.conns[addr] = conn - m sync.Mutex - sweeping bool - lastSweep time.Time + return dialer.connWithCloser(conn) } -func (j *janitor) run() { - j.timer.Start(j.sweep) - defer j.timer.Stop() +// redial takes an already-dialed connection in the cache does all the work of +// lending that connection out to one more caller. this should only ever be +// called while holding at least the RLock on dialer.m (but the write lock is +// fine too), to prevent the connection from getting evicted out from under us. 
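+//
+// A typical lending cycle, as a sketch (error handling elided; not part of
+// this patch):
+//
+//	client, closer, err := dialer.dial(ctx, tablet)
+//	if err != nil { /* handle */ }
+//	defer closer.Close() // decrements refs; at refs==0 the conn is evictable again
+//	// ... use client for one or more RPCs ...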
+// +// It returns the three-tuple of client-interface, closer, and error that the +// main dial func returns. +func (dialer *cachedConnDialer) redial(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { + dialerStats.ConnReuse.Add(1) - for s := range j.ch { - if s == nil { - break - } + dialer.qMu.Lock() + defer dialer.qMu.Unlock() - scan := true - t := time.NewTimer(time.Millisecond * 50) // TODO: flag - for scan { - select { - case <-t.C: - scan = false - case s := <-j.ch: - if s == nil { - scan = false - } - default: - scan = false - } - } + conn.lastAccessTime = time.Now() + conn.refs++ + heap.Fix(&dialer.queue, conn.index) - t.Stop() - j.sweep() - } + return dialer.connWithCloser(conn) } -func (j *janitor) sweep() { - j.m.Lock() - if j.sweeping { - j.m.Unlock() - return - } +// connWithCloser returns the three-tuple expected by the main dial func, where +// the closer handles the correct state management for updating the conns place +// in the eviction queue. +func (dialer *cachedConnDialer) connWithCloser(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { + return conn, closeFunc(func() error { + dialer.qMu.Lock() + defer dialer.qMu.Unlock() + + conn.refs-- + heap.Fix(&dialer.queue, conn.index) + return nil + }), nil +} - if j.lastSweep.Add(time.Millisecond * 10 /* TODO: flag */).After(time.Now()) { - j.m.Unlock() - return +// Close closes all currently cached connections, ***regardless of whether +// those connections are in use***. Calling Close therefore will fail any RPCs +// using currently lent-out connections, and, furthermore, will invalidate the +// io.Closer that was returned for that connection from dialer.dial(). +// +// As a result, it is not safe to reuse a cachedConnDialer after calling Close, +// and you should instead obtain a new one by calling either +// tmclient.TabletManagerClient() with +// TabletManagerProtocol set to "grpc-cached", or by calling +// grpctmclient.NewCachedConnClient directly. 
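+//
+// For example, as a sketch:
+//
+//	client := NewCachedConnClient(*defaultPoolCapacity)
+//	defer client.Close()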
+func (dialer *cachedConnDialer) Close() { + dialer.m.Lock() + defer dialer.m.Unlock() + dialer.qMu.Lock() + defer dialer.qMu.Unlock() + + for dialer.queue.Len() > 0 { + conn := dialer.queue.Pop().(*cachedConn) + conn.cc.Close() + delete(dialer.conns, conn.key) + dialer.connWaitSema.Release() } - - j.sweeping = true - j.m.Unlock() - - j.client.sweep() - j.m.Lock() - j.sweeping = false - j.lastSweep = time.Now() - j.m.Unlock() } func getTabletAddr(tablet *topodatapb.Tablet) string { diff --git a/go/vt/vttablet/grpctmclient/cached_client_test.go b/go/vt/vttablet/grpctmclient/cached_client_test.go index c7d5c9b2af0..c25d4cd8758 100644 --- a/go/vt/vttablet/grpctmclient/cached_client_test.go +++ b/go/vt/vttablet/grpctmclient/cached_client_test.go @@ -63,39 +63,7 @@ func grpcTestServer(t testing.TB, tm tabletmanager.RPCTM) (*net.TCPAddr, func()) } } -func TestPooledTMC(t *testing.T) { - tmserv := tmrpctest.NewFakeRPCTM(t) - addr, shutdown := grpcTestServer(t, tmserv) - defer shutdown() - - start := time.Now() - tmc, err := newPooledConn(addr.String()) - require.NoError(t, err) - assert.Equal(t, tmc.refs, 1, "TODO") - assert.True(t, start.Before(tmc.lastAccessTime), "lastAccessTime is not after start; should have %v < %v", start, tmc.lastAccessTime) - - checkpoint := tmc.lastAccessTime - tmc.acquire() - assert.Equal(t, tmc.refs, 2, "TODO") - assert.True(t, checkpoint.Before(tmc.lastAccessTime), "lastAccessTime is not after checkpoint; should have %v < %v", checkpoint, tmc.lastAccessTime) - - checkpoint = tmc.lastAccessTime - tmc.release() - tmc.release() - assert.Equal(t, tmc.refs, 0, "TODO") - assert.True(t, checkpoint.Equal(tmc.lastAccessTime), "releasing pooledTMC should not change access time; should have %v = %v", checkpoint, tmc.lastAccessTime) - - t.Run("release panic", func(t *testing.T) { - defer func() { - err := recover() - assert.NotNil(t, err, "release on unacquired pooledTMC should panic") - }() - - tmc.release() - }) -} - -func BenchmarkCachedClient(b *testing.B) { +func BenchmarkCachedConnClient(b *testing.B) { tmserv := tmrpctest.NewFakeRPCTM(b) tablets := make([]*topodatapb.Tablet, 4) for i := 0; i < len(tablets); i++ { @@ -115,7 +83,7 @@ func BenchmarkCachedClient(b *testing.B) { } b.ResetTimer() - client := NewCachedClient(5, time.Second*30, time.Millisecond*50, time.Second*30) + client := NewCachedConnClient(5) defer client.Close() for i := 0; i < b.N; i++ { @@ -130,83 +98,20 @@ func BenchmarkCachedClient(b *testing.B) { } } -func TestCachedClient(t *testing.T) { +func TestCachedConnClient(t *testing.T) { t.Parallel() - tmserv := tmrpctest.NewFakeRPCTM(t) - tablets := make([]*topodatapb.Tablet, 4) - for i := 0; i < len(tablets); i++ { - addr, shutdown := grpcTestServer(t, tmserv) - defer shutdown() - - tablets[i] = &topodatapb.Tablet{ - Alias: &topodatapb.TabletAlias{ - Cell: "test", - Uid: uint32(addr.Port), - }, - Hostname: addr.IP.String(), - PortMap: map[string]int32{ - "grpc": int32(addr.Port), - }, + if os.Getenv("VT_PPROF_TEST") != "" { + file, err := os.Create(fmt.Sprintf("%s.profile.out", t.Name())) + require.NoError(t, err) + defer file.Close() + if err := pprof.StartCPUProfile(file); err != nil { + t.Errorf("failed to start cpu profile: %v", err) + return } + defer pprof.StopCPUProfile() } - client := NewCachedClient(5, time.Second*30, time.Millisecond*50, time.Second*30) - defer client.Close() - - dialAttempts := sync2.NewAtomicInt64(0) - dialErrors := sync2.NewAtomicInt64(0) - testCtx, testCancel := context.WithCancel(context.Background()) - wg := 
sync.WaitGroup{} - - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - attempts := 0 - - for { - select { - case <-testCtx.Done(): - dialAttempts.Add(int64(attempts)) - return - default: - attempts++ - - tablet := tablets[rand.Intn(len(tablets))] - _, closer, err := client.dialer.dial(context.Background(), tablet) - if err != nil { - dialErrors.Add(1) - continue - } - - closer.Close() - } - } - }() - } - - time.Sleep(time.Second * 35) - testCancel() - wg.Wait() - - attempts, errors := dialAttempts.Get(), dialErrors.Get() - assert.Less(t, float64(errors)/float64(attempts), 0.001, "fewer than 0.1% of dial attempts should fail") -} - -func TestCachedClientMultipleSweeps(t *testing.T) { - t.Parallel() - - file, err := os.Create("profile.out") - require.NoError(t, err) - defer file.Close() - if err := pprof.StartCPUProfile(file); err != nil { - t.Errorf("failed to start cpu profile: %v", err) - return - } - defer pprof.StopCPUProfile() - testCtx, testCancel := context.WithCancel(context.Background()) wg := sync.WaitGroup{} procs := 0 @@ -250,20 +155,15 @@ func TestCachedClientMultipleSweeps(t *testing.T) { } } - // make sure we can dial every tablet - for _, tablet := range tablets { - err := NewClient().Ping(context.Background(), tablet) - require.NoError(t, err, "failed to dial tablet %s", tablet.Hostname) - } - - poolSize := int(float64(numTablets) * 0.8) - client := NewCachedClient(poolSize, time.Second*10, time.Millisecond*100, time.Second*30) - // client := NewPooledDialer() + poolSize := int(float64(numTablets) * 0.5) + client := NewCachedConnClient(poolSize) defer client.Close() dialAttempts := sync2.NewAtomicInt64(0) dialErrors := sync2.NewAtomicInt64(0) + longestDials := make(chan time.Duration, numGoroutines) + for i := 0; i < numGoroutines; i++ { wg.Add(1) go func() { @@ -271,23 +171,31 @@ func TestCachedClientMultipleSweeps(t *testing.T) { attempts := 0 jitter := time.Second * 0 + longestDial := time.Duration(0) for { select { case <-testCtx.Done(): dialAttempts.Add(int64(attempts)) + longestDials <- longestDial return case <-time.After(jitter): - jitter = time.Millisecond * (time.Duration(rand.Intn(51) + 100)) + jitter = time.Millisecond * (time.Duration(rand.Intn(11) + 50)) attempts++ tablet := tablets[rand.Intn(len(tablets))] + start := time.Now() _, closer, err := client.dialer.dial(context.Background(), tablet) if err != nil { dialErrors.Add(1) continue } + dialDuration := time.Since(start) + if dialDuration > longestDial { + longestDial = dialDuration + } + closer.Close() } } @@ -297,7 +205,17 @@ func TestCachedClientMultipleSweeps(t *testing.T) { time.Sleep(time.Minute) testCancel() wg.Wait() + close(longestDials) + + longestDial := time.Duration(0) + for dialDuration := range longestDials { + if dialDuration > longestDial { + longestDial = dialDuration + } + } attempts, errors := dialAttempts.Get(), dialErrors.Get() assert.Less(t, float64(errors)/float64(attempts), 0.001, fmt.Sprintf("fewer than 0.1%% of dial attempts should fail (attempts = %d, errors = %d, max running procs = %d)", attempts, errors, procs)) + assert.Less(t, errors, int64(1), "at least one dial attempt failed (attempts = %d, errors = %d)", attempts, errors) + assert.Less(t, longestDial.Milliseconds(), int64(50)) } diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 7ca004f6a21..6aa26c9a442 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -87,6 +87,13 @@ type poolDialer interface { // 
Connections are produced by the dialer implementation, which is either the // grpcClient implementation, which reuses connections only for ExecuteFetch and // otherwise makes single-purpose connections that are closed after use. +// +// In order to more efficiently use the underlying tcp connections, you can +// instead use the cachedConnDialer implementation by specifying +// -tablet_manager_protocol "grpc-cached" +// The cachedConnDialer keeps connections to up to -tablet_manager_grpc_connpool_size distinct +// tablets open at any given time, for faster per-RPC call time, and less +// connection churn. type Client struct { dialer dialer } From 45a165e56c533939b2857685a5e1262c6a607475 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Tue, 22 Jun 2021 16:26:40 -0400 Subject: [PATCH 05/13] Provide a second flag name to use for the oneshot dialer, to make deprecation easier Refactor sections of the main `dial` method to allow all unlocks to be deferrals Add a test case to exercise evictions and full caches Refactor heap operations to make timing them easier Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmclient/cached_client.go | 150 +++++++++++++----- .../grpctmclient/cached_client_test.go | 63 ++++++++ go/vt/vttablet/grpctmclient/client.go | 3 + 3 files changed, 175 insertions(+), 41 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 4cd2232c39e..17b8fca580c 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -140,11 +140,11 @@ var dialerStats = struct { DialTimings *stats.Timings EvictionQueueTimings *stats.Timings }{ - ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"), - ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"), - DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"), - DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"), - // TODO: add timings for heap operations (push, pop, fix) + ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"), + ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"), + DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"), + DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"), + EvictionQueueTimings: stats.NewTimings("tabletmanagerclient_cachedconn_eviction_queue_timings", "timings for eviction queue management operations", "operation", "init", "push", "pop", "fix"), } // NewCachedConnClient returns a grpc Client using the priority queue cache @@ -156,7 +156,7 @@ func NewCachedConnClient(capacity int) *Client { connWaitSema: sync2.NewSemaphore(capacity, 0), } - heap.Init(&dialer.queue) + dialer.heapInit() return &Client{dialer} } @@ -164,17 +164,12 @@ var _ dialer = (*cachedConnDialer)(nil) func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, 
io.Closer, error) { start := time.Now() - addr := getTabletAddr(tablet) - dialer.m.RLock() - if conn, ok := dialer.conns[addr]; ok { - defer func() { - dialerStats.DialTimings.Add("rlock_fast", time.Since(start)) - }() - defer dialer.m.RUnlock() - return dialer.redial(conn) + + if client, closer, found, err := dialer.tryFromCache(addr, dialer.m.RLocker()); found { + dialerStats.DialTimings.Add("rlock_fast", time.Since(start)) + return client, closer, err } - dialer.m.RUnlock() if dialer.connWaitSema.TryAcquire() { defer func() { @@ -186,8 +181,8 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab // Check if another goroutine managed to dial a conn for the same addr // while we were waiting for the write lock. This is identical to the // read-lock section above. - if conn, ok := dialer.conns[addr]; ok { - return dialer.redial(conn) + if client, closer, found, err := dialer.tryFromCache(addr, nil); found { + return client, closer, err } return dialer.newdial(addr, true /* manage queue lock */) @@ -203,32 +198,77 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab dialerStats.DialTimeouts.Add(1) return nil, nil, ctx.Err() default: - dialer.m.Lock() - if conn, ok := dialer.conns[addr]; ok { - // Someone else dialed this addr while we were polling. No need - // to evict anyone else, just reuse the existing conn. - defer dialer.m.Unlock() - return dialer.redial(conn) + if client, closer, found, err := dialer.pollOnce(addr); found { + return client, closer, err } + } + } +} - dialer.qMu.Lock() - conn := dialer.queue[0] - if conn.refs != 0 { - dialer.qMu.Unlock() - dialer.m.Unlock() - continue - } +// tryFromCache tries to get a connection from the cache, performing a redial +// on that connection if it exists. It returns a TabletManagerClient impl, an +// io.Closer, a flag to indicate whether a connection was found in the cache, +// and an error, which is always nil. +// +// In addition to the addr being dialed, tryFromCache takes a sync.Locker which, +// if not nil, will be used to wrap the lookup and redial in that lock. This +// function can be called in situations where the conns map is locked +// externally (like in pollOnce), so we do not want to manage the locks here. In +// other cases (like in the rlock_fast path of dial()), we pass in the RLocker +// to ensure we have a read lock on the cache for the duration of the call. +func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) { + if locker != nil { + locker.Lock() + defer locker.Unlock() + } - // We're going to return from this point - defer dialer.m.Unlock() - defer dialer.qMu.Unlock() - heap.Pop(&dialer.queue) - delete(dialer.conns, conn.key) - conn.cc.Close() + if conn, ok := dialer.conns[addr]; ok { + client, closer, err := dialer.redial(conn) + return client, closer, ok, err + } - return dialer.newdial(addr, false /* manage queue lock */) - } + return nil, nil, false, nil +} + +// pollOnce is called on each iteration of the polling loop in dial(). It: +// - locks the conns cache for writes +// - attempts to get a connection from the cache. If found, redial() it and exit. +// - locks the queue +// - peeks at the head of the eviction queue. if the peeked conn has no refs, it +// is unused, and can be evicted to make room for the new connection to addr. +// If the peeked conn has refs, exit. 
+// - pops the conn we just peeked from the queue, delete it from the cache, and +// close the underlying ClientConn for that conn. +// - attempt a newdial. if the newdial fails, it will release a slot on the +// connWaitSema, so another dial() call can successfully acquire it to dial +// a new conn. if the newdial succeeds, we will have evicted one conn, but +// added another, so the net change is 0, and no changes to the connWaitSema +// are made. +// +// It returns a TabletManagerClient impl, an io.Closer, a flag to indicate +// whether the dial() poll loop should exit, and an error. +func (dialer *cachedConnDialer) pollOnce(addr string) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) { + dialer.m.Lock() + defer dialer.m.Unlock() + + if client, closer, found, err := dialer.tryFromCache(addr, nil); found { + return client, closer, found, err } + + dialer.qMu.Lock() + defer dialer.qMu.Unlock() + + conn := dialer.queue[0] + if conn.refs != 0 { + return nil, nil, false, nil + } + + dialer.heapPop() + delete(dialer.conns, conn.key) + conn.cc.Close() + + client, closer, err = dialer.newdial(addr, false /* manage queue lock */) + return client, closer, true, err } // newdial creates a new cached connection, and updates the cache and eviction @@ -271,7 +311,7 @@ func (dialer *cachedConnDialer) newdial(addr string, manageQueueLock bool) (tabl index: -1, // gets set by call to Push key: addr, } - heap.Push(&dialer.queue, conn) + dialer.heapPush(conn) dialer.conns[addr] = conn return dialer.connWithCloser(conn) @@ -292,7 +332,7 @@ func (dialer *cachedConnDialer) redial(conn *cachedConn) (tabletmanagerservicepb conn.lastAccessTime = time.Now() conn.refs++ - heap.Fix(&dialer.queue, conn.index) + dialer.heapFix(conn.index) return dialer.connWithCloser(conn) } @@ -306,11 +346,39 @@ func (dialer *cachedConnDialer) connWithCloser(conn *cachedConn) (tabletmanagers defer dialer.qMu.Unlock() conn.refs-- - heap.Fix(&dialer.queue, conn.index) + dialer.heapFix(conn.index) return nil }), nil } +// Functions to wrap queue operations to record timings for them. + +func (dialer *cachedConnDialer) heapInit() { + start := time.Now() + heap.Init(&dialer.queue) + dialerStats.EvictionQueueTimings.Add("init", time.Since(start)) +} + +func (dialer *cachedConnDialer) heapFix(index int) { + start := time.Now() + heap.Fix(&dialer.queue, index) + dialerStats.EvictionQueueTimings.Add("fix", time.Since(start)) +} + +func (dialer *cachedConnDialer) heapPop() interface{} { + start := time.Now() + x := heap.Pop(&dialer.queue) + dialerStats.EvictionQueueTimings.Add("pop", time.Since(start)) + + return x +} + +func (dialer *cachedConnDialer) heapPush(conn *cachedConn) { + start := time.Now() + heap.Push(&dialer.queue, conn) + dialerStats.EvictionQueueTimings.Add("push", time.Since(start)) +} + // Close closes all currently cached connections, ***regardless of whether // those connections are in use***. 
Calling Close therefore will fail any RPCs // using currently lent-out connections, and, furthermore, will invalidate the diff --git a/go/vt/vttablet/grpctmclient/cached_client_test.go b/go/vt/vttablet/grpctmclient/cached_client_test.go index c25d4cd8758..0f9b83c65cf 100644 --- a/go/vt/vttablet/grpctmclient/cached_client_test.go +++ b/go/vt/vttablet/grpctmclient/cached_client_test.go @@ -19,6 +19,7 @@ package grpctmclient import ( "context" "fmt" + "io" "math/rand" "net" "os" @@ -219,3 +220,65 @@ func TestCachedConnClient(t *testing.T) { assert.Less(t, errors, int64(1), "at least one dial attempt failed (attempts = %d, errors = %d)", attempts, errors) assert.Less(t, longestDial.Milliseconds(), int64(50)) } + +func TestCachedConnClient_evictions(t *testing.T) { + tmserv := tmrpctest.NewFakeRPCTM(t) + tablets := make([]*topodatapb.Tablet, 5) + for i := 0; i < len(tablets); i++ { + addr, shutdown := grpcTestServer(t, tmserv) + defer shutdown() + + tablets[i] = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "test", + Uid: uint32(addr.Port), + }, + Hostname: addr.IP.String(), + PortMap: map[string]int32{ + "grpc": int32(addr.Port), + }, + } + } + + testCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + connHoldContext, connHoldCancel := context.WithCancel(testCtx) + + client := NewCachedConnClient(len(tablets) - 1) + for i := 0; i < len(tablets)-1; i++ { + _, closer, err := client.dialer.dial(context.Background(), tablets[i]) + t.Logf("holding connection open to %d", tablets[i].Alias.Uid) + require.NoError(t, err) + + ctx := testCtx + if i == 0 { + ctx = connHoldContext + } + go func(ctx context.Context, closer io.Closer) { + // Hold on to one connection until the test is done. + // In the case of tablets[0], hold on to the connection until we + // signal to close it. 
+ <-ctx.Done() + closer.Close() + }(ctx, closer) + } + + dialCtx, dialCancel := context.WithTimeout(testCtx, time.Millisecond*50) + defer dialCancel() + + err := client.Ping(dialCtx, tablets[0]) // this should take the rlock_fast path + assert.NoError(t, err, "could not redial on inuse cached connection") + + err = client.Ping(dialCtx, tablets[4]) // this will enter the poll loop until context timeout + assert.Error(t, err, "should have timed out waiting for an eviction, while all conns were held") + + // free up a connection + connHoldCancel() + + dialCtx, dialCancel = context.WithTimeout(testCtx, time.Millisecond*100) + defer dialCancel() + + err = client.Ping(dialCtx, tablets[4]) // this will enter the poll loop and evict a connection + assert.NoError(t, err, "should have evicted a conn and succeeded to dial") +} diff --git a/go/vt/vttablet/grpctmclient/client.go b/go/vt/vttablet/grpctmclient/client.go index 6aa26c9a442..c7dd4e3fcd5 100644 --- a/go/vt/vttablet/grpctmclient/client.go +++ b/go/vt/vttablet/grpctmclient/client.go @@ -55,6 +55,9 @@ func init() { tmclient.RegisterTabletManagerClientFactory("grpc", func() tmclient.TabletManagerClient { return NewClient() }) + tmclient.RegisterTabletManagerClientFactory("grpc-oneshot", func() tmclient.TabletManagerClient { + return NewClient() + }) } type tmc struct { From ca41e804d567769899248b3790fbc159299dcc71 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Thu, 24 Jun 2021 11:41:29 -0400 Subject: [PATCH 06/13] Add steadystate benchmarks Signed-off-by: Andrew Mason --- .../grpctmclient/cached_client_test.go | 212 ++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/go/vt/vttablet/grpctmclient/cached_client_test.go b/go/vt/vttablet/grpctmclient/cached_client_test.go index 0f9b83c65cf..068ff4b3118 100644 --- a/go/vt/vttablet/grpctmclient/cached_client_test.go +++ b/go/vt/vttablet/grpctmclient/cached_client_test.go @@ -99,6 +99,218 @@ func BenchmarkCachedConnClient(b *testing.B) { } } +func BenchmarkCachedConnClientSteadyState(b *testing.B) { + tmserv := tmrpctest.NewFakeRPCTM(b) + tablets := make([]*topodatapb.Tablet, 1000) + for i := 0; i < len(tablets); i++ { + addr, shutdown := grpcTestServer(b, tmserv) + // b.Cleanup(shutdown) + defer shutdown() + + tablets[i] = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "test", + Uid: uint32(addr.Port), + }, + Hostname: addr.IP.String(), + PortMap: map[string]int32{ + "grpc": int32(addr.Port), + }, + } + } + + client := NewCachedConnClient(100) + // b.Cleanup(client.Close) + defer client.Close() + + // fill the pool + for i := 0; i < 100; i++ { + err := client.Ping(context.Background(), tablets[i]) + require.NoError(b, err) + } + + procs := runtime.GOMAXPROCS(0) / 4 + if procs == 0 { + procs = 2 + } + + pingsPerProc := len(tablets) / procs + if pingsPerProc == 0 { + pingsPerProc = 2 + } + + b.ResetTimer() + + // Begin the benchmark + for i := 0; i < b.N; i++ { + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + for j := 0; j < procs; j++ { + wg.Add(1) + go func() { + defer wg.Done() + + for k := 0; k < pingsPerProc; k++ { + func() { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + x := rand.Intn(len(tablets)) + err := client.Ping(ctx, tablets[x]) + assert.NoError(b, err) + }() + } + }() + } + + wg.Wait() + cancel() + } +} + +func BenchmarkCachedConnClientSteadyStateRedials(b *testing.B) { + tmserv := tmrpctest.NewFakeRPCTM(b) + tablets := make([]*topodatapb.Tablet, 1000) + for i := 0; i < len(tablets); i++ { + 
addr, shutdown := grpcTestServer(b, tmserv) + // b.Cleanup(shutdown) + defer shutdown() + + tablets[i] = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "test", + Uid: uint32(addr.Port), + }, + Hostname: addr.IP.String(), + PortMap: map[string]int32{ + "grpc": int32(addr.Port), + }, + } + } + + client := NewCachedConnClient(1000) + // b.Cleanup(client.Close) + defer client.Close() + + // fill the pool + for i := 0; i < 1000; i++ { + err := client.Ping(context.Background(), tablets[i]) + require.NoError(b, err) + } + + procs := runtime.GOMAXPROCS(0) / 4 + if procs == 0 { + procs = 2 + } + + pingsPerProc := len(tablets) / procs + if pingsPerProc == 0 { + pingsPerProc = 2 + } + + b.ResetTimer() + + // Begin the benchmark + for i := 0; i < b.N; i++ { + ctx, cancel := context.WithCancel(context.Background()) + + var wg sync.WaitGroup + for j := 0; j < procs; j++ { + wg.Add(1) + go func() { + defer wg.Done() + + for k := 0; k < pingsPerProc; k++ { + func() { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + x := rand.Intn(len(tablets)) + err := client.Ping(ctx, tablets[x]) + assert.NoError(b, err) + }() + } + }() + } + + wg.Wait() + cancel() + } +} + +func BenchmarkCachedConnClientSteadyStateEvictions(b *testing.B) { + tmserv := tmrpctest.NewFakeRPCTM(b) + tablets := make([]*topodatapb.Tablet, 1000) + for i := 0; i < len(tablets); i++ { + addr, shutdown := grpcTestServer(b, tmserv) + b.Cleanup(shutdown) + + tablets[i] = &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "test", + Uid: uint32(addr.Port), + }, + Hostname: addr.IP.String(), + PortMap: map[string]int32{ + "grpc": int32(addr.Port), + }, + } + } + + client := NewCachedConnClient(100) + b.Cleanup(client.Close) + + // fill the pool + for i := 0; i < 100; i++ { + err := client.Ping(context.Background(), tablets[i]) + require.NoError(b, err) + } + + assert.Equal(b, len(client.dialer.(*cachedConnDialer).conns), 100) + + procs := runtime.GOMAXPROCS(0) / 4 + if procs == 0 { + procs = 2 + } + + start := 100 + b.ResetTimer() + + // Begin the benchmark + for i := 0; i < b.N; i++ { + ctx, cancel := context.WithCancel(context.Background()) + ch := make(chan int, 100) // 100 dials per iteration + + var wg sync.WaitGroup + for j := 0; j < procs; j++ { + wg.Add(1) + go func() { + defer wg.Done() + + for idx := range ch { + func() { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + err := client.Ping(ctx, tablets[idx]) + assert.NoError(b, err) + }() + } + }() + } + + for j := 0; j < cap(ch); j++ { + start = (start + j) % 1000 // go in increasing order, wrapping around + ch <- start + } + + close(ch) + wg.Wait() + cancel() + } +} + func TestCachedConnClient(t *testing.T) { t.Parallel() From 2228c62a2b17c5db6d5831a03bbb66ba276fe40c Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Wed, 23 Jun 2021 18:20:40 +0200 Subject: [PATCH 07/13] grpctmclient: simplify locking & queuing Signed-off-by: Vicent Marti --- go/vt/vttablet/grpctmclient/cached_client.go | 216 ++++++------------- 1 file changed, 64 insertions(+), 152 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 17b8fca580c..1b2e5492fa4 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -17,10 +17,10 @@ limitations under the License. 
package grpctmclient import ( - "container/heap" "context" "flag" "io" + "sort" "sync" "time" @@ -60,113 +60,61 @@ type cachedConn struct { tabletmanagerservicepb.TabletManagerClient cc *grpc.ClientConn + addr string lastAccessTime time.Time refs int - - index int - key string -} - -// cachedConns provides a priority queue implementation for O(log n) connection -// eviction management. It is a nearly verbatim copy of the priority queue -// sample at https://golang.org/pkg/container/heap/#pkg-overview, with the -// Less function changed such that connections with refs==0 get pushed to the -// front of the queue, because those are the connections we are going to want -// to evict. -type cachedConns []*cachedConn - -var _ heap.Interface = (*cachedConns)(nil) - -// Len is part of the sort.Interface interface and is used by container/heap -// functions. -func (queue cachedConns) Len() int { return len(queue) } - -// Less is part of the sort.Interface interface and is used by container/heap -// functions. -func (queue cachedConns) Less(i, j int) bool { - left, right := queue[i], queue[j] - if left.refs == right.refs { - // break ties by access time. - // more stale connections have higher priority for removal - // this condition is equvalent to: - // left.lastAccessTime <= right.lastAccessTime - return !left.lastAccessTime.After(right.lastAccessTime) - } - - // connections with fewer refs have higher priority for removal - return left.refs < right.refs -} - -// Swap is part of the sort.Interface interface and is used by container/heap -// functions. -func (queue cachedConns) Swap(i, j int) { - queue[i], queue[j] = queue[j], queue[i] - queue[i].index = i - queue[j].index = j -} - -// Push is part of the container/heap.Interface interface. -func (queue *cachedConns) Push(x interface{}) { - n := len(*queue) - conn := x.(*cachedConn) - conn.index = n - *queue = append(*queue, conn) -} - -// Pop is part of the container/heap.Interface interface. 
-func (queue *cachedConns) Pop() interface{} { - old := *queue - n := len(old) - conn := old[n-1] - old[n-1] = nil // avoid memory leak - conn.index = -1 // for safety - *queue = old[0 : n-1] - - return conn } type cachedConnDialer struct { - m sync.RWMutex + m sync.Mutex conns map[string]*cachedConn - qMu sync.Mutex - queue cachedConns + evict []*cachedConn + evictSorted bool connWaitSema *sync2.Semaphore } var dialerStats = struct { - ConnReuse *stats.Gauge - ConnNew *stats.Gauge - DialTimeouts *stats.Gauge - DialTimings *stats.Timings - EvictionQueueTimings *stats.Timings + ConnReuse *stats.Gauge + ConnNew *stats.Gauge + DialTimeouts *stats.Gauge + DialTimings *stats.Timings }{ - ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"), - ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"), - DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"), - DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"), - EvictionQueueTimings: stats.NewTimings("tabletmanagerclient_cachedconn_eviction_queue_timings", "timings for eviction queue management operations", "operation", "init", "push", "pop", "fix"), + ConnReuse: stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"), + ConnNew: stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"), + DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"), + DialTimings: stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"), } -// NewCachedConnClient returns a grpc Client using the priority queue cache -// dialer implementation. 
+// NewCachedConnClient returns a grpc Client that caches connections to the different tablets func NewCachedConnClient(capacity int) *Client { dialer := &cachedConnDialer{ conns: make(map[string]*cachedConn, capacity), - queue: make(cachedConns, 0, capacity), + evict: make([]*cachedConn, 0, capacity), connWaitSema: sync2.NewSemaphore(capacity, 0), } - - dialer.heapInit() return &Client{dialer} } var _ dialer = (*cachedConnDialer)(nil) +func (dialer *cachedConnDialer) sortEvictionsLocked() { + if !dialer.evictSorted { + sort.Slice(dialer.evict, func(i, j int) bool { + left, right := dialer.evict[i], dialer.evict[j] + if left.refs == right.refs { + return left.lastAccessTime.After(right.lastAccessTime) + } + return left.refs > right.refs + }) + dialer.evictSorted = true + } +} + func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tablet) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) { start := time.Now() addr := getTabletAddr(tablet) - if client, closer, found, err := dialer.tryFromCache(addr, dialer.m.RLocker()); found { + if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found { dialerStats.DialTimings.Add("rlock_fast", time.Since(start)) return client, closer, err } @@ -175,17 +123,14 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab defer func() { dialerStats.DialTimings.Add("sema_fast", time.Since(start)) }() - dialer.m.Lock() - defer dialer.m.Unlock() // Check if another goroutine managed to dial a conn for the same addr // while we were waiting for the write lock. This is identical to the // read-lock section above. - if client, closer, found, err := dialer.tryFromCache(addr, nil); found { + if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found { return client, closer, err } - - return dialer.newdial(addr, true /* manage queue lock */) + return dialer.newdial(addr) } defer func() { @@ -223,7 +168,7 @@ func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (c } if conn, ok := dialer.conns[addr]; ok { - client, closer, err := dialer.redial(conn) + client, closer, err := dialer.redialLocked(conn) return client, closer, ok, err } @@ -249,37 +194,37 @@ func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (c // whether the dial() poll loop should exit, and an error. func (dialer *cachedConnDialer) pollOnce(addr string) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) { dialer.m.Lock() - defer dialer.m.Unlock() if client, closer, found, err := dialer.tryFromCache(addr, nil); found { + dialer.m.Unlock() return client, closer, found, err } - dialer.qMu.Lock() - defer dialer.qMu.Unlock() + dialer.sortEvictionsLocked() - conn := dialer.queue[0] + conn := dialer.evict[len(dialer.evict)-1] if conn.refs != 0 { + dialer.m.Unlock() return nil, nil, false, nil } - dialer.heapPop() - delete(dialer.conns, conn.key) + dialer.evict = dialer.evict[:len(dialer.evict)-1] + delete(dialer.conns, conn.addr) conn.cc.Close() + dialer.m.Unlock() - client, closer, err = dialer.newdial(addr, false /* manage queue lock */) + client, closer, err = dialer.newdial(addr) return client, closer, true, err } // newdial creates a new cached connection, and updates the cache and eviction -// queue accordingly. This must be called only while holding the write lock on -// dialer.m as well as after having successfully acquired the dialer.connWaitSema. If newdial fails to create the underlying +// queue accordingly. 
If newdial fails to create the underlying
 // gRPC connection, it will make a call to Release the connWaitSema for other
 // newdial calls.
 //
 // It returns the three-tuple of client-interface, closer, and error that the
 // main dial func returns.
-func (dialer *cachedConnDialer) newdial(addr string, manageQueueLock bool) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
+func (dialer *cachedConnDialer) newdial(addr string) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
 	dialerStats.ConnNew.Add(1)

 	opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name)
@@ -294,13 +239,16 @@ func (dialer *cachedConnDialer) newdial(addr string, manageQueueLock bool) (tabl
 		return nil, nil, err
 	}

-	// In the case where dial is evicting a connection from the cache, we
-	// already have a lock on the eviction queue. Conversely, in the case where
-	// we are able to create a new connection without evicting (because the
-	// cache is not yet full), we don't have the queue lock yet.
-	if manageQueueLock {
-		dialer.qMu.Lock()
-		defer dialer.qMu.Unlock()
+	dialer.m.Lock()
+	defer dialer.m.Unlock()
+
+	if conn, existing := dialer.conns[addr]; existing {
+		// race condition: some other goroutine has dialed our tablet before we have;
+		// this is not great, but shouldn't happen often (if at all), so we're going to
+		// close this connection and reuse the existing one. by doing this, we can keep
+		// the actual Dial out of the global lock and significantly increase throughput
+		cc.Close()
+		return dialer.redialLocked(conn)
 	}

 	conn := &cachedConn{
@@ -308,32 +256,27 @@ func (dialer *cachedConnDialer) newdial(addr string, manageQueueLock bool) (tabl
 		cc:             cc,
 		lastAccessTime: time.Now(),
 		refs:           1,
-		index:          -1, // gets set by call to Push
-		key:            addr,
+		addr:           addr,
 	}
-	dialer.heapPush(conn)
+	dialer.evict = append(dialer.evict, conn)
+	dialer.evictSorted = false
 	dialer.conns[addr] = conn

 	return dialer.connWithCloser(conn)
 }

-// redial takes an already-dialed connection in the cache does all the work of
+// redialLocked takes an already-dialed connection in the cache and does all the work of
 // lending that connection out to one more caller. this should only ever be
 // called while holding at least the RLock on dialer.m (but the write lock is
 // fine too), to prevent the connection from getting evicted out from under us.
 //
 // It returns the three-tuple of client-interface, closer, and error that the
 // main dial func returns.
-func (dialer *cachedConnDialer) redial(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
+func (dialer *cachedConnDialer) redialLocked(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
 	dialerStats.ConnReuse.Add(1)
-
-	dialer.qMu.Lock()
-	defer dialer.qMu.Unlock()
-
 	conn.lastAccessTime = time.Now()
 	conn.refs++
-	dialer.heapFix(conn.index)
-
+	dialer.evictSorted = false
 	return dialer.connWithCloser(conn)
 }

@@ -342,43 +285,14 @@ func (dialer *cachedConnDialer) redial(conn *cachedConn) (tabletmanagerservicepb
 // in the eviction queue.
 func (dialer *cachedConnDialer) connWithCloser(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
 	return conn, closeFunc(func() error {
-		dialer.qMu.Lock()
-		defer dialer.qMu.Unlock()
-
+		dialer.m.Lock()
+		defer dialer.m.Unlock()
 		conn.refs--
-		dialer.heapFix(conn.index)
+		dialer.evictSorted = false
 		return nil
 	}), nil
 }

-// Functions to wrap queue operations to record timings for them.
- -func (dialer *cachedConnDialer) heapInit() { - start := time.Now() - heap.Init(&dialer.queue) - dialerStats.EvictionQueueTimings.Add("init", time.Since(start)) -} - -func (dialer *cachedConnDialer) heapFix(index int) { - start := time.Now() - heap.Fix(&dialer.queue, index) - dialerStats.EvictionQueueTimings.Add("fix", time.Since(start)) -} - -func (dialer *cachedConnDialer) heapPop() interface{} { - start := time.Now() - x := heap.Pop(&dialer.queue) - dialerStats.EvictionQueueTimings.Add("pop", time.Since(start)) - - return x -} - -func (dialer *cachedConnDialer) heapPush(conn *cachedConn) { - start := time.Now() - heap.Push(&dialer.queue, conn) - dialerStats.EvictionQueueTimings.Add("push", time.Since(start)) -} - // Close closes all currently cached connections, ***regardless of whether // those connections are in use***. Calling Close therefore will fail any RPCs // using currently lent-out connections, and, furthermore, will invalidate the @@ -392,15 +306,13 @@ func (dialer *cachedConnDialer) heapPush(conn *cachedConn) { func (dialer *cachedConnDialer) Close() { dialer.m.Lock() defer dialer.m.Unlock() - dialer.qMu.Lock() - defer dialer.qMu.Unlock() - for dialer.queue.Len() > 0 { - conn := dialer.queue.Pop().(*cachedConn) + for _, conn := range dialer.evict { conn.cc.Close() - delete(dialer.conns, conn.key) + delete(dialer.conns, conn.addr) dialer.connWaitSema.Release() } + dialer.evict = nil } func getTabletAddr(tablet *topodatapb.Tablet) string { From 584396cdae43f6682d0174322fad2970862f15bf Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sat, 26 Jun 2021 08:15:54 -0400 Subject: [PATCH 08/13] Add missing connwaitsema release on a not-actually-new newdial Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmclient/cached_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 1b2e5492fa4..188e608e456 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -248,6 +248,7 @@ func (dialer *cachedConnDialer) newdial(addr string) (tabletmanagerservicepb.Tab // close this connection and reuse the existing one. 
by doing this, we can keep // the actual Dial out of the global lock and significantly increase throughput cc.Close() + dialer.connWaitSema.Release() return dialer.redialLocked(conn) } From ec85098bf5c4ad9ba41249b8426ee0996e792db6 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Tue, 29 Jun 2021 12:43:09 -0400 Subject: [PATCH 09/13] Add eviction optimization By putting evictable conns at the front, we know that when we append to the end we don't need to resort, so multiple successive evictions can be faster Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmclient/cached_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 188e608e456..38383d36020 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -102,9 +102,9 @@ func (dialer *cachedConnDialer) sortEvictionsLocked() { sort.Slice(dialer.evict, func(i, j int) bool { left, right := dialer.evict[i], dialer.evict[j] if left.refs == right.refs { - return left.lastAccessTime.After(right.lastAccessTime) + return right.lastAccessTime.After(left.lastAccessTime) } - return left.refs > right.refs + return right.refs > left.refs }) dialer.evictSorted = true } @@ -202,13 +202,13 @@ func (dialer *cachedConnDialer) pollOnce(addr string) (client tabletmanagerservi dialer.sortEvictionsLocked() - conn := dialer.evict[len(dialer.evict)-1] + conn := dialer.evict[0] if conn.refs != 0 { dialer.m.Unlock() return nil, nil, false, nil } - dialer.evict = dialer.evict[:len(dialer.evict)-1] + dialer.evict = dialer.evict[1:] delete(dialer.conns, conn.addr) conn.cc.Close() dialer.m.Unlock() @@ -260,7 +260,7 @@ func (dialer *cachedConnDialer) newdial(addr string) (tabletmanagerservicepb.Tab addr: addr, } dialer.evict = append(dialer.evict, conn) - dialer.evictSorted = false + // dialer.evictSorted = false -- no need to do this here dialer.conns[addr] = conn return dialer.connWithCloser(conn) From 024965c2971c07ea9656d159730a25fe69f7d837 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Tue, 29 Jun 2021 14:26:56 -0400 Subject: [PATCH 10/13] Add final missing sema release Signed-off-by: Andrew Mason --- go/vt/vttablet/grpctmclient/cached_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 38383d36020..3cb3a0c09d0 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -128,6 +128,7 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab // while we were waiting for the write lock. This is identical to the // read-lock section above. if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found { + dialer.connWaitSema.Release() return client, closer, err } return dialer.newdial(addr) From f3ff5c708e728f83f40c53b590ceb78327cc37c9 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Tue, 29 Jun 2021 14:30:43 -0400 Subject: [PATCH 11/13] Switch to DialContext Signed-off-by: Andrew Mason --- go/vt/grpcclient/client.go | 13 ++++++++++++- go/vt/vttablet/grpctmclient/cached_client.go | 12 ++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b000a542a41..6ad54ae8dea 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -19,6 +19,7 @@ limitations under the License. 
package grpcclient import ( + "context" "flag" "time" @@ -58,6 +59,16 @@ func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([ // failFast is a non-optional parameter because callers are required to specify // what that should be. func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return DialContext(context.Background(), target, failFast, opts...) +} + +// DialContext creates a grpc connection to the given target. Setup steps are +// covered by the context deadline, and, if WithBlock is specified in the dial +// options, connection establishment steps are covered by the context as well. +// +// failFast is a non-optional parameter because callers are required to specify +// what that should be. +func DialContext(ctx context.Context, target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) { grpccommon.EnableTracingOpt() newopts := []grpc.DialOption{ grpc.WithDefaultCallOptions( @@ -98,7 +109,7 @@ func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.Clie newopts = append(newopts, interceptors()...) - return grpc.Dial(target, newopts...) + return grpc.DialContext(ctx, target, newopts...) } func interceptors() []grpc.DialOption { diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go index 3cb3a0c09d0..6e5b7e4b151 100644 --- a/go/vt/vttablet/grpctmclient/cached_client.go +++ b/go/vt/vttablet/grpctmclient/cached_client.go @@ -131,7 +131,7 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab dialer.connWaitSema.Release() return client, closer, err } - return dialer.newdial(addr) + return dialer.newdial(ctx, addr) } defer func() { @@ -144,7 +144,7 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab dialerStats.DialTimeouts.Add(1) return nil, nil, ctx.Err() default: - if client, closer, found, err := dialer.pollOnce(addr); found { + if client, closer, found, err := dialer.pollOnce(ctx, addr); found { return client, closer, err } } @@ -193,7 +193,7 @@ func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (c // // It returns a TabletManagerClient impl, an io.Closer, a flag to indicate // whether the dial() poll loop should exit, and an error. -func (dialer *cachedConnDialer) pollOnce(addr string) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) { +func (dialer *cachedConnDialer) pollOnce(ctx context.Context, addr string) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) { dialer.m.Lock() if client, closer, found, err := dialer.tryFromCache(addr, nil); found { @@ -214,7 +214,7 @@ func (dialer *cachedConnDialer) pollOnce(addr string) (client tabletmanagerservi conn.cc.Close() dialer.m.Unlock() - client, closer, err = dialer.newdial(addr) + client, closer, err = dialer.newdial(ctx, addr) return client, closer, true, err } @@ -225,7 +225,7 @@ func (dialer *cachedConnDialer) pollOnce(addr string) (client tabletmanagerservi // // It returns the three-tuple of client-interface, closer, and error that the // main dial func returns. 
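 //
 // Dialing is bounded by the caller's context: the ctx passed to dial flows
 // into grpcclient.DialContext, so dial setup (and, when WithBlock is set,
 // connection establishment) honors the caller's deadline. As a sketch:
 //
 //	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
 //	defer cancel()
 //	client, closer, err := dialer.dial(ctx, tablet)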
From 4c002015ef31a7e554f5fde14bd84312e436fb60 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 30 Jun 2021 14:33:29 -0400
Subject: [PATCH 12/13] Cleanup test code

Signed-off-by: Andrew Mason
---
 .../grpctmclient/cached_client_test.go        | 56 +------------------
 1 file changed, 2 insertions(+), 54 deletions(-)

diff --git a/go/vt/vttablet/grpctmclient/cached_client_test.go b/go/vt/vttablet/grpctmclient/cached_client_test.go
index 068ff4b3118..096e0278150 100644
--- a/go/vt/vttablet/grpctmclient/cached_client_test.go
+++ b/go/vt/vttablet/grpctmclient/cached_client_test.go
@@ -22,9 +22,7 @@ import (
 	"io"
 	"math/rand"
 	"net"
-	"os"
 	"runtime"
-	"runtime/pprof"
 	"sync"
 	"testing"
 	"time"
@@ -64,47 +62,11 @@ func grpcTestServer(t testing.TB, tm tabletmanager.RPCTM) (*net.TCPAddr, func())
 	}
 }
 
-func BenchmarkCachedConnClient(b *testing.B) {
-	tmserv := tmrpctest.NewFakeRPCTM(b)
-	tablets := make([]*topodatapb.Tablet, 4)
-	for i := 0; i < len(tablets); i++ {
-		addr, shutdown := grpcTestServer(b, tmserv)
-		defer shutdown()
-
-		tablets[i] = &topodatapb.Tablet{
-			Alias: &topodatapb.TabletAlias{
-				Cell: "test",
-				Uid:  uint32(addr.Port),
-			},
-			Hostname: addr.IP.String(),
-			PortMap: map[string]int32{
-				"grpc": int32(addr.Port),
-			},
-		}
-	}
-
-	b.ResetTimer()
-	client := NewCachedConnClient(5)
-	defer client.Close()
-
-	for i := 0; i < b.N; i++ {
-		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
-		x := rand.Intn(len(tablets))
-		err := client.Ping(ctx, tablets[x])
-		if err != nil {
-			b.Errorf("error pinging tablet %v: %w", tablets[x].Hostname, err)
-		}
-
-		cancel()
-	}
-}
-
 func BenchmarkCachedConnClientSteadyState(b *testing.B) {
 	tmserv := tmrpctest.NewFakeRPCTM(b)
 	tablets := make([]*topodatapb.Tablet, 1000)
 	for i := 0; i < len(tablets); i++ {
 		addr, shutdown := grpcTestServer(b, tmserv)
-		// b.Cleanup(shutdown)
 		defer shutdown()
 
 		tablets[i] = &topodatapb.Tablet{
@@ -120,7 +82,6 @@ func BenchmarkCachedConnClientSteadyState(b *testing.B) {
 	}
 
 	client := NewCachedConnClient(100)
-	// b.Cleanup(client.Close)
 	defer client.Close()
 
 	// fill the pool
@@ -174,7 +135,6 @@ func BenchmarkCachedConnClientSteadyStateRedials(b *testing.B) {
 	tablets := make([]*topodatapb.Tablet, 1000)
 	for i := 0; i < len(tablets); i++ {
 		addr, shutdown := grpcTestServer(b, tmserv)
-		// b.Cleanup(shutdown)
 		defer shutdown()
 
 		tablets[i] = &topodatapb.Tablet{
@@ -190,7 +150,6 @@ func BenchmarkCachedConnClientSteadyStateRedials(b *testing.B) {
 	}
 
 	client := NewCachedConnClient(1000)
-	// b.Cleanup(client.Close)
 	defer client.Close()
 
 	// fill the pool
@@ -244,7 +203,7 @@ func BenchmarkCachedConnClientSteadyStateEvictions(b *testing.B) {
 	tablets := make([]*topodatapb.Tablet, 1000)
 	for i := 0; i < len(tablets); i++ {
 		addr, shutdown := grpcTestServer(b, tmserv)
-		b.Cleanup(shutdown)
+		defer shutdown()
 
 		tablets[i] = &topodatapb.Tablet{
 			Alias: &topodatapb.TabletAlias{
@@ -259,7 +218,7 @@ func BenchmarkCachedConnClientSteadyStateEvictions(b *testing.B) {
 	}
 
 	client := NewCachedConnClient(100)
-	b.Cleanup(client.Close)
+	defer client.Close()
 
 	// fill the pool
 	for i := 0; i < 100; i++ {
@@ -314,17 +273,6 @@ func BenchmarkCachedConnClientSteadyStateEvictions(b *testing.B) {
 func TestCachedConnClient(t *testing.T) {
 	t.Parallel()
 
-	if os.Getenv("VT_PPROF_TEST") != "" {
-		file, err := os.Create(fmt.Sprintf("%s.profile.out", t.Name()))
-		require.NoError(t, err)
-		defer file.Close()
-		if err := pprof.StartCPUProfile(file); err != nil {
-			t.Errorf("failed to start cpu profile: %v", err)
-			return
-		}
-		defer pprof.StopCPUProfile()
-	}
-
 	testCtx, testCancel := context.WithCancel(context.Background())
 	wg := sync.WaitGroup{}
 	procs := 0
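The benchmarks PATCH 12 keeps all share the same steady-state shape: pay the
cache-fill cost up front, then reset the timer so the measured iterations
exercise only the warm path. The sketch below shows that pattern with an
invented toy cache (it is not the test code itself; the real benchmarks fill
a pool of gRPC conns and then Ping through it).

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"testing"
)

// cache is a trivial stand-in for the dialer's conn map; the real code
// caches gRPC client conns, this caches strings.
type cache struct {
	mu sync.Mutex
	m  map[string]string
}

func newCache(capacity int) *cache {
	return &cache{m: make(map[string]string, capacity)}
}

func (c *cache) Get(key string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.m[key]
	if !ok {
		v = "conn-" + key // stand-in for an expensive dial
		c.m[key] = v
	}
	return v
}

// BenchmarkWarmCache measures only warm-cache behavior: the fill happens
// before ResetTimer, so every timed Get is a hit.
func BenchmarkWarmCache(b *testing.B) {
	c := newCache(100)
	for i := 0; i < 100; i++ {
		c.Get(fmt.Sprintf("addr-%d", i))
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		c.Get(fmt.Sprintf("addr-%d", rand.Intn(100)))
	}
}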
From b5908a832111e68ba2dac735d93fcd8414207ab1 Mon Sep 17 00:00:00 2001
From: Andrew Mason
Date: Wed, 30 Jun 2021 14:41:17 -0400
Subject: [PATCH 13/13] Clean up comments + stats

Signed-off-by: Andrew Mason
---
 go/vt/vttablet/grpctmclient/cached_client.go | 53 ++++++++++++--------
 1 file changed, 31 insertions(+), 22 deletions(-)

diff --git a/go/vt/vttablet/grpctmclient/cached_client.go b/go/vt/vttablet/grpctmclient/cached_client.go
index 6e5b7e4b151..2e55e62a79f 100644
--- a/go/vt/vttablet/grpctmclient/cached_client.go
+++ b/go/vt/vttablet/grpctmclient/cached_client.go
@@ -71,6 +71,7 @@ type cachedConnDialer struct {
 	evict        []*cachedConn
 	evictSorted  bool
 	connWaitSema *sync2.Semaphore
+	capacity     int
 }
 
 var dialerStats = struct {
@@ -82,15 +83,17 @@ var dialerStats = struct {
 	ConnReuse:    stats.NewGauge("tabletmanagerclient_cachedconn_reuse", "number of times a call to dial() was able to reuse an existing connection"),
 	ConnNew:      stats.NewGauge("tabletmanagerclient_cachedconn_new", "number of times a call to dial() resulted in a dialing a new grpc clientconn"),
 	DialTimeouts: stats.NewGauge("tabletmanagerclient_cachedconn_dial_timeouts", "number of context timeouts during dial()"),
-	DialTimings:  stats.NewTimings("tabletmanagerclient_cachedconn_dialtimings", "timings for various dial paths", "path", "rlock_fast", "sema_fast", "sema_poll"),
+	DialTimings:  stats.NewTimings("tabletmanagerclient_cachedconn_dial_timings", "timings for various dial paths", "path", "cache_fast", "sema_fast", "sema_poll"),
 }
 
-// NewCachedConnClient returns a grpc Client that caches connections to the different tablets
+// NewCachedConnClient returns a grpc Client that caches connections to the
+// different tablets.
 func NewCachedConnClient(capacity int) *Client {
 	dialer := &cachedConnDialer{
 		conns:        make(map[string]*cachedConn, capacity),
 		evict:        make([]*cachedConn, 0, capacity),
 		connWaitSema: sync2.NewSemaphore(capacity, 0),
+		capacity:     capacity,
 	}
 	return &Client{dialer}
 }
@@ -115,7 +118,7 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab
 	addr := getTabletAddr(tablet)
 
 	if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found {
-		dialerStats.DialTimings.Add("rlock_fast", time.Since(start))
+		dialerStats.DialTimings.Add("cache_fast", time.Since(start))
 		return client, closer, err
 	}
 
@@ -126,7 +129,9 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab
 
 		// Check if another goroutine managed to dial a conn for the same addr
 		// while we were waiting for the write lock. This is identical to the
-		// read-lock section above.
+		// read-lock section above, except we release the connWaitSema if we
+		// are able to use the cache, allowing another goroutine to dial a new
+		// conn instead.
 		if client, closer, found, err := dialer.tryFromCache(addr, &dialer.m); found {
 			dialer.connWaitSema.Release()
 			return client, closer, err
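The comment amended above describes the dialer's central dance: an optimistic
cache check, a semaphore acquire, then a second cache check before paying for
a new dial, releasing the slot if the double-check hits. A compressed sketch
of that control flow follows. The pool/get names are invented, and it uses
golang.org/x/sync/semaphore where the real code uses vt/sync2.Semaphore; the
real version also evicts idle conns in its poll loop (pollOnce), which the
sketch replaces with a plain backoff.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

type pool struct {
	mu    sync.Mutex
	cache map[string]string
	sema  *semaphore.Weighted // sized to the pool's capacity
}

func (p *pool) get(ctx context.Context, key string) (string, error) {
	// Fast path: someone already built a value for this key.
	p.mu.Lock()
	if v, ok := p.cache[key]; ok {
		p.mu.Unlock()
		return v, nil
	}
	p.mu.Unlock()

	// Slow path: wait for a capacity slot, re-checking the cache each try,
	// until the caller's deadline expires.
	for {
		if p.sema.TryAcquire(1) {
			// Double-check: another goroutine may have built the value while
			// we raced for the semaphore. If so, hand the slot back so some
			// other caller can spend it on a genuinely new key.
			p.mu.Lock()
			if v, ok := p.cache[key]; ok {
				p.mu.Unlock()
				p.sema.Release(1)
				return v, nil
			}
			v := "built:" + key // stand-in for the expensive grpc dial
			p.cache[key] = v
			p.mu.Unlock()
			return v, nil
		}

		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(time.Millisecond):
			// The real dialer evicts an idle conn here instead of sleeping.
		}
	}
}

func main() {
	p := &pool{cache: make(map[string]string), sema: semaphore.NewWeighted(2)}
	v, _ := p.get(context.Background(), "tablet-1:15991")
	fmt.Println(v)
}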
@@ -160,8 +165,8 @@ func (dialer *cachedConnDialer) dial(ctx context.Context, tablet *topodatapb.Tab
 // if not nil, will be used to wrap the lookup and redial in that lock. This
 // function can be called in situations where the conns map is locked
 // externally (like in pollOnce), so we do not want to manage the locks here. In
-// other cases (like in the rlock_fast path of dial()), we pass in the RLocker
-// to ensure we have a read lock on the cache for the duration of the call.
+// other cases (like in the cache_fast path of dial()), we pass in the dialer.m
+// to ensure we have a lock on the cache for the duration of the call.
 func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (client tabletmanagerservicepb.TabletManagerClient, closer io.Closer, found bool, err error) {
 	if locker != nil {
 		locker.Lock()
@@ -179,11 +184,10 @@ func (dialer *cachedConnDialer) tryFromCache(addr string, locker sync.Locker) (c
 // pollOnce is called on each iteration of the polling loop in dial(). It:
 // - locks the conns cache for writes
 // - attempts to get a connection from the cache. If found, redial() it and exit.
-// - locks the queue
 // - peeks at the head of the eviction queue. if the peeked conn has no refs, it
 //   is unused, and can be evicted to make room for the new connection to addr.
 //   If the peeked conn has refs, exit.
-// - pops the conn we just peeked from the queue, delete it from the cache, and
+// - pops the conn we just peeked from the queue, deletes it from the cache, and
 //   close the underlying ClientConn for that conn.
 // - attempt a newdial. if the newdial fails, it will release a slot on the
 //   connWaitSema, so another dial() call can successfully acquire it to dial
@@ -226,8 +230,6 @@ func (dialer *cachedConnDialer) pollOnce(ctx context.Context, addr string) (clie
 // It returns the three-tuple of client-interface, closer, and error that the
 // main dial func returns.
 func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
-	dialerStats.ConnNew.Add(1)
-
 	opt, err := grpcclient.SecureDialOption(*cert, *key, *ca, *name)
 	if err != nil {
 		dialer.connWaitSema.Release()
@@ -253,6 +255,8 @@ func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (table
 		return dialer.redialLocked(conn)
 	}
 
+	dialerStats.ConnNew.Add(1)
+
 	conn := &cachedConn{
 		TabletManagerClient: tabletmanagerservicepb.NewTabletManagerClient(cc),
 		cc:                  cc,
@@ -260,20 +264,22 @@ func (dialer *cachedConnDialer) newdial(ctx context.Context, addr string) (table
 		refs:                1,
 		addr:                addr,
 	}
+
+	// NOTE: we deliberately do not set dialer.evictSorted=false here. Since
+	// cachedConns are evicted from the front of the queue, and we are appending
+	// to the end, if there is already a second evictable connection, it will be
+	// at the front of the queue, so we can speed up the edge case where we need
+	// to evict multiple connections in a row.
 	dialer.evict = append(dialer.evict, conn)
-	// dialer.evictSorted = false -- no need to do this here
 	dialer.conns[addr] = conn
 
 	return dialer.connWithCloser(conn)
 }
 
-// redialLocked takes an already-dialed connection in the cache does all the work of
-// lending that connection out to one more caller. this should only ever be
-// called while holding at least the RLock on dialer.m (but the write lock is
-// fine too), to prevent the connection from getting evicted out from under us.
-//
-// It returns the three-tuple of client-interface, closer, and error that the
-// main dial func returns.
+// redialLocked takes an already-dialed connection in the cache and does all
+// the work of lending that connection out to one more caller. It returns the
+// three-tuple of client-interface, closer, and error that the main dial func
+// returns.
 func (dialer *cachedConnDialer) redialLocked(conn *cachedConn) (tabletmanagerservicepb.TabletManagerClient, io.Closer, error) {
 	dialerStats.ConnReuse.Add(1)
 	conn.lastAccessTime = time.Now()
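redialLocked and connWithCloser together implement a small lending protocol:
every borrower bumps refs and receives an io.Closer whose Close returns the
loan rather than tearing anything down, so a conn is only evictable once refs
drops back to zero. A stripped-down sketch of that idea, with hypothetical
types rather than the Vitess implementation:

package main

import (
	"fmt"
	"io"
	"sync"
)

// lease counts borrowers of a shared resource; the resource is only
// reclaimable once refs drops back to zero.
type lease struct {
	mu   sync.Mutex
	refs int
}

// closeFunc adapts a plain func() error to io.Closer.
type closeFunc func() error

func (fn closeFunc) Close() error { return fn() }

// borrow lends the resource to one more caller. The returned Closer gives
// the loan back; it does not close any underlying connection.
func (l *lease) borrow() io.Closer {
	l.mu.Lock()
	l.refs++
	l.mu.Unlock()
	return closeFunc(func() error {
		l.mu.Lock()
		l.refs--
		l.mu.Unlock()
		return nil
	})
}

func main() {
	l := &lease{}
	c1, c2 := l.borrow(), l.borrow()
	c1.Close()
	c2.Close()
	fmt.Println(l.refs) // 0: safe to evict
}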
@@ -298,10 +304,13 @@ func (dialer *cachedConnDialer) connWithCloser(conn *cachedConn) (tabletmanagers
 // Close closes all currently cached connections, ***regardless of whether
 // those connections are in use***. Calling Close therefore will fail any RPCs
 // using currently lent-out connections, and, furthermore, will invalidate the
-// io.Closer that was returned for that connection from dialer.dial().
+// io.Closer that was returned for that connection from dialer.dial(). When
+// calling those io.Closers, they will still lock the dialer's mutex, and then
+// perform needless operations that will slow down dial throughput, but not
+// actually impact the correctness of the internal state of the dialer.
 //
-// As a result, it is not safe to reuse a cachedConnDialer after calling Close,
-// and you should instead obtain a new one by calling either
+// As a result, while it is safe to reuse a cachedConnDialer after calling Close,
+// it will be less performant than getting a new one, either by calling
 // tmclient.TabletManagerClient() with
 // TabletManagerProtocol set to "grpc-cached", or by calling
 // grpctmclient.NewCachedConnClient directly.
@@ -314,7 +323,7 @@ func (dialer *cachedConnDialer) Close() {
 		delete(dialer.conns, conn.addr)
 		dialer.connWaitSema.Release()
 	}
-	dialer.evict = nil
+	dialer.evict = make([]*cachedConn, 0, dialer.capacity)
 }
 
 func getTabletAddr(tablet *topodatapb.Tablet) string {
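Taken together, the series leaves the cached client usable as a drop-in for
the dial-per-RPC client. A short usage sketch follows; the hostname, port,
and capacity values are placeholders, and the import paths are as laid out in
the Vitess repo.

package main

import (
	"context"
	"log"
	"time"

	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	"vitess.io/vitess/go/vt/vttablet/grpctmclient"
)

func main() {
	// Up to 100 tablet conns are kept open and reused across RPCs; dialing
	// a 101st distinct tablet evicts the least recently used idle conn.
	client := grpctmclient.NewCachedConnClient(100)
	defer client.Close()

	tablet := &topodatapb.Tablet{
		Hostname: "tablet-101.example",             // placeholder
		PortMap:  map[string]int32{"grpc": 15991}, // placeholder
	}

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	// Ping reuses a cached conn when one exists; the returned error covers
	// both dial failures (including dial timeouts) and the RPC itself.
	if err := client.Ping(ctx, tablet); err != nil {
		log.Printf("ping failed: %v", err)
	}
}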