From 386c845ed1b04803a9c0c471a3cc5d90c0aed0ae Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 20 Jan 2022 14:12:56 +0100 Subject: [PATCH 1/5] rpc: add a context arg to grpcDialRaw() Release note: None --- pkg/rpc/context.go | 15 +++++++++------ pkg/rpc/context_test.go | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index babf390fc358..6041424fa32d 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1139,11 +1139,14 @@ type delayingHeader struct { // This method implies a DefaultClass ConnectionClass for the returned // ClientConn. func (rpcCtx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) { - return rpcCtx.grpcDialRaw(target, 0, DefaultClass) + return rpcCtx.grpcDialRaw(rpcCtx.masterCtx, target, 0, DefaultClass) } +// grpcDialRaw connects to the remote node. +// The ctx passed as argument must be derived from rpcCtx.masterCtx, so +// that it respects the same cancellation policy. func (rpcCtx *Context) grpcDialRaw( - target string, remoteNodeID roachpb.NodeID, class ConnectionClass, + ctx context.Context, target string, remoteNodeID roachpb.NodeID, class ConnectionClass, ) (*grpc.ClientConn, <-chan struct{}, error) { dialOpts, err := rpcCtx.grpcDialOptions(target, class) if err != nil { @@ -1174,7 +1177,7 @@ func (rpcCtx *Context) grpcDialRaw( dialerFunc := dialer.dial if rpcCtx.Knobs.ArtificialLatencyMap != nil { latency := rpcCtx.Knobs.ArtificialLatencyMap[target] - log.VEventf(rpcCtx.masterCtx, 1, "connecting to node %s (%d) with simulated latency %dms", target, remoteNodeID, + log.VEventf(ctx, 1, "connecting to node %s (%d) with simulated latency %dms", target, remoteNodeID, latency) dialer := artificialLatencyDialer{ dialerFunc: dialerFunc, @@ -1189,8 +1192,8 @@ func (rpcCtx *Context) grpcDialRaw( // behavior and redialChan will never be closed). dialOpts = append(dialOpts, rpcCtx.testingDialOpts...) 
- log.Health.Infof(rpcCtx.masterCtx, "dialing n%v: %s (%v)", remoteNodeID, target, class) - conn, err := grpc.DialContext(rpcCtx.masterCtx, target, dialOpts...) + log.Health.Infof(ctx, "dialing n%v: %s (%v)", remoteNodeID, target, class) + conn, err := grpc.DialContext(ctx, target, dialOpts...) if err != nil && rpcCtx.masterCtx.Err() != nil { // If the node is draining, discard the error (which is likely gRPC's version // of context.Canceled) and return errDialRejected which instructs callers not @@ -1273,7 +1276,7 @@ func (rpcCtx *Context) grpcDialNodeInternal( // Either we kick off the heartbeat loop (and clean up when it's done), // or we clean up the connKey entries immediately. var redialChan <-chan struct{} - conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(target, remoteNodeID, class) + conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(rpcCtx.masterCtx, target, remoteNodeID, class) if conn.dialErr == nil { if err := rpcCtx.Stopper.RunAsyncTask( rpcCtx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 1b0a76878aac..cb08250be9b3 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1889,7 +1889,7 @@ func TestRunHeartbeatSetsHeartbeatStateWhenExitingBeforeFirstHeartbeat(t *testin redialChan := make(chan struct{}) close(redialChan) - c.grpcConn, _, c.dialErr = rpcCtx.grpcDialRaw(remoteAddr, serverNodeID, DefaultClass) + c.grpcConn, _, c.dialErr = rpcCtx.grpcDialRaw(rpcCtx.masterCtx, remoteAddr, serverNodeID, DefaultClass) require.NoError(t, c.dialErr) // It is possible that the redial chan being closed is not seen on the first // pass through the loop. 
From 592d5c26f242666519c455c55305a71314596470 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 20 Jan 2022 14:20:04 +0100 Subject: [PATCH 2/5] rpc: add context arg to grpcDialNodeInternal() Release note: None --- pkg/rpc/context.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 6041424fa32d..f06d23259ef4 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1208,7 +1208,7 @@ func (rpcCtx *Context) grpcDialRaw( // used with the gossip client and CLI commands which can talk to any // node. This method implies a SystemClass. func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection { - return rpcCtx.grpcDialNodeInternal(target, 0, SystemClass) + return rpcCtx.grpcDialNodeInternal(rpcCtx.masterCtx, target, 0, SystemClass) } // GRPCDialNode calls grpc.Dial with options appropriate for the @@ -1224,7 +1224,7 @@ func (rpcCtx *Context) GRPCDialNode( if remoteNodeID == 0 && !rpcCtx.TestingAllowNamedRPCToAnonymousServer { log.Fatalf(context.TODO(), "%v", errors.AssertionFailedf("invalid node ID 0 in GRPCDialNode()")) } - return rpcCtx.grpcDialNodeInternal(target, remoteNodeID, class) + return rpcCtx.grpcDialNodeInternal(rpcCtx.masterCtx, target, remoteNodeID, class) } // GRPCDialPod wraps GRPCDialNode and treats the `remoteInstanceID` @@ -1240,8 +1240,11 @@ func (rpcCtx *Context) GRPCDialPod( return rpcCtx.GRPCDialNode(target, roachpb.NodeID(remoteInstanceID), class) } +// grpcDialNodeInternal connects to the remote node and sets up the async heartbeater. +// The ctx passed as argument must be derived from rpcCtx.masterCtx, so +// that it respects the same cancellation policy. 
func (rpcCtx *Context) grpcDialNodeInternal( - target string, remoteNodeID roachpb.NodeID, class ConnectionClass, + ctx context.Context, target string, remoteNodeID roachpb.NodeID, class ConnectionClass, ) *Connection { thisConnKeys := []connKey{{target, remoteNodeID, class}} value, ok := rpcCtx.conns.Load(thisConnKeys[0]) @@ -1276,10 +1279,10 @@ func (rpcCtx *Context) grpcDialNodeInternal( // Either we kick off the heartbeat loop (and clean up when it's done), // or we clean up the connKey entries immediately. var redialChan <-chan struct{} - conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(rpcCtx.masterCtx, target, remoteNodeID, class) + conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(ctx, target, remoteNodeID, class) if conn.dialErr == nil { if err := rpcCtx.Stopper.RunAsyncTask( - rpcCtx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { + ctx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { err := rpcCtx.runHeartbeat(conn, target, redialChan) if err != nil && !grpcutil.IsClosedConnection(err) && !grpcutil.IsConnectionRejected(err) { From 8bd737f8d9669c9959dfee11e2537add604359e6 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 20 Jan 2022 14:25:00 +0100 Subject: [PATCH 3/5] rpc: add a context arg to runHeartbeat() Release note: None --- pkg/rpc/context.go | 25 ++++++++++++++----------- pkg/rpc/context_test.go | 4 +++- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index f06d23259ef4..690987f932c1 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1282,11 +1282,11 @@ func (rpcCtx *Context) grpcDialNodeInternal( conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(ctx, target, remoteNodeID, class) if conn.dialErr == nil { if err := rpcCtx.Stopper.RunAsyncTask( - ctx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { - err := rpcCtx.runHeartbeat(conn, target, redialChan) + ctx, "rpc.Context: grpc 
heartbeat", func(ctx context.Context) { + err := rpcCtx.runHeartbeat(ctx, conn, target, redialChan) if err != nil && !grpcutil.IsClosedConnection(err) && !grpcutil.IsConnectionRejected(err) { - log.Health.Errorf(masterCtx, "removing connection to %s due to error: %v", target, err) + log.Health.Errorf(ctx, "removing connection to %s due to error: %v", target, err) } rpcCtx.removeConn(conn, thisConnKeys...) }); err != nil { @@ -1322,8 +1322,11 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated") // the node. var ErrNoConnection = errors.New("no connection found") +// runHeartbeat runs the heartbeat loop for the given RPC connection. +// The ctx passed as argument must be derived from rpcCtx.masterCtx, so +// that it respects the same cancellation policy. func (rpcCtx *Context) runHeartbeat( - conn *Connection, target string, redialChan <-chan struct{}, + ctx context.Context, conn *Connection, target string, redialChan <-chan struct{}, ) (retErr error) { rpcCtx.metrics.HeartbeatLoopsStarted.Inc(1) // setInitialHeartbeatDone is idempotent and is critical to notify Connect @@ -1371,7 +1374,7 @@ func (rpcCtx *Context) runHeartbeat( heartbeatTimer.Read = true } - if err := rpcCtx.Stopper.RunTaskWithErr(rpcCtx.masterCtx, "rpc heartbeat", func(goCtx context.Context) error { + if err := rpcCtx.Stopper.RunTaskWithErr(ctx, "rpc heartbeat", func(ctx context.Context) error { // We re-mint the PingRequest to pick up any asynchronous update to clusterID. clusterID := rpcCtx.ClusterID.Get() request := &PingRequest{ @@ -1390,7 +1393,7 @@ func (rpcCtx *Context) runHeartbeat( var response *PingResponse sendTime := rpcCtx.Clock.PhysicalTime() - ping := func(goCtx context.Context) error { + ping := func(ctx context.Context) error { // NB: We want the request to fail-fast (the default), otherwise we won't // be notified of transport failures. 
if err := interceptor(request); err != nil { @@ -1398,14 +1401,14 @@ func (rpcCtx *Context) runHeartbeat( return err } var err error - response, err = heartbeatClient.Ping(goCtx, request) + response, err = heartbeatClient.Ping(ctx, request) return err } var err error if rpcCtx.heartbeatTimeout > 0 { - err = contextutil.RunWithTimeout(goCtx, "rpc heartbeat", rpcCtx.heartbeatTimeout, ping) + err = contextutil.RunWithTimeout(ctx, "rpc heartbeat", rpcCtx.heartbeatTimeout, ping) } else { - err = ping(goCtx) + err = ping(ctx) } if grpcutil.IsConnectionRejected(err) { @@ -1431,7 +1434,7 @@ func (rpcCtx *Context) runHeartbeat( if err == nil { err = errors.Wrap( - checkVersion(goCtx, rpcCtx.Settings, response.ServerVersion), + checkVersion(ctx, rpcCtx.Settings, response.ServerVersion), "version compatibility check failed on ping response") if err != nil { returnErr = true @@ -1459,7 +1462,7 @@ func (rpcCtx *Context) runHeartbeat( remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2) request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds() } - rpcCtx.RemoteClocks.UpdateOffset(rpcCtx.masterCtx, target, request.Offset, pingDuration) + rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration) if cb := rpcCtx.HeartbeatCB; cb != nil { cb() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index cb08250be9b3..0fab06554512 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -1893,7 +1893,9 @@ func TestRunHeartbeatSetsHeartbeatStateWhenExitingBeforeFirstHeartbeat(t *testin require.NoError(t, c.dialErr) // It is possible that the redial chan being closed is not seen on the first // pass through the loop. - err = rpcCtx.runHeartbeat(c, "", redialChan) + // NB: we use rpcCtx.masterCtx and not just ctx because we need + // this to be cancelled when the RPC context is closed. 
+ err = rpcCtx.runHeartbeat(rpcCtx.masterCtx, c, "", redialChan) require.EqualError(t, err, grpcutil.ErrCannotReuseClientConn.Error()) // Even when the runHeartbeat returns, we could have heartbeated successfully. // If we did not, then we expect the `not yet heartbeated` error. From cc38dab576f600b125aa191899cf75dea06a3301 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 20 Jan 2022 14:29:46 +0100 Subject: [PATCH 4/5] rpc: annotate logs with remote addr, nodeID, class This ensures that the log messages sent to the DEV channel relating to RPC low-level activity (e.g. conn failures, heartbeat etc) include the remote node ID and address and connection class. Example before: ``` [n6] 87 dialing ``` After: ``` [n6,rnode=4,raddr=127.0.0.1:41706,class=default] 87 dialing ``` Release note: None --- pkg/rpc/context.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 690987f932c1..95807b89a0ce 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/syncmap" @@ -1132,6 +1133,20 @@ type delayingHeader struct { DelayMS int32 } +func (rpcCtx *Context) makeDialCtx( + target string, remoteNodeID roachpb.NodeID, class ConnectionClass, +) context.Context { + dialCtx := rpcCtx.masterCtx + var rnodeID interface{} = remoteNodeID + if remoteNodeID == 0 { + rnodeID = redact.SafeString("?") + } + dialCtx = logtags.AddTag(dialCtx, "rnode", rnodeID) + dialCtx = logtags.AddTag(dialCtx, "raddr", target) + dialCtx = logtags.AddTag(dialCtx, "class", class) + return dialCtx +} + // GRPCDialRaw calls grpc.Dial with options appropriate for the context. 
// Unlike GRPCDialNode, it does not start an RPC heartbeat to validate the // connection. This connection will not be reconnected automatically; @@ -1139,7 +1154,8 @@ type delayingHeader struct { // This method implies a DefaultClass ConnectionClass for the returned // ClientConn. func (rpcCtx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) { - return rpcCtx.grpcDialRaw(rpcCtx.masterCtx, target, 0, DefaultClass) + ctx := rpcCtx.makeDialCtx(target, 0, DefaultClass) + return rpcCtx.grpcDialRaw(ctx, target, 0, DefaultClass) } // grpcDialRaw connects to the remote node. @@ -1208,7 +1224,8 @@ func (rpcCtx *Context) grpcDialRaw( // used with the gossip client and CLI commands which can talk to any // node. This method implies a SystemClass. func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection { - return rpcCtx.grpcDialNodeInternal(rpcCtx.masterCtx, target, 0, SystemClass) + ctx := rpcCtx.makeDialCtx(target, 0, SystemClass) + return rpcCtx.grpcDialNodeInternal(ctx, target, 0, SystemClass) } // GRPCDialNode calls grpc.Dial with options appropriate for the @@ -1221,10 +1238,11 @@ func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection { func (rpcCtx *Context) GRPCDialNode( target string, remoteNodeID roachpb.NodeID, class ConnectionClass, ) *Connection { + ctx := rpcCtx.makeDialCtx(target, remoteNodeID, class) if remoteNodeID == 0 && !rpcCtx.TestingAllowNamedRPCToAnonymousServer { - log.Fatalf(context.TODO(), "%v", errors.AssertionFailedf("invalid node ID 0 in GRPCDialNode()")) + log.Fatalf(ctx, "%v", errors.AssertionFailedf("invalid node ID 0 in GRPCDialNode()")) } - return rpcCtx.grpcDialNodeInternal(rpcCtx.masterCtx, target, remoteNodeID, class) + return rpcCtx.grpcDialNodeInternal(ctx, target, remoteNodeID, class) } // GRPCDialPod wraps GRPCDialNode and treats the `remoteInstanceID` From 256f2b7a6493aad9f444f701827b329b6400cfbe Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 20 Jan 2022 
14:33:07 +0100 Subject: [PATCH 5/5] rpc: minor polish to log messages This removes now-redundant logging output (since the same data is included in tags) and adds a tag for the heartbeat background task. For example, before: ``` [n6,rnode=4,raddr=127.0.0.1:41706,class=default] 87 dialing n4: 127.0.0.1:41706 (default) ``` After: ``` [n6,rnode=4,raddr=127.0.0.1:41706,class=default] 87 dialing ``` Release note: None --- pkg/rpc/context.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 95807b89a0ce..4cdd89f8e131 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -1193,7 +1193,7 @@ func (rpcCtx *Context) grpcDialRaw( dialerFunc := dialer.dial if rpcCtx.Knobs.ArtificialLatencyMap != nil { latency := rpcCtx.Knobs.ArtificialLatencyMap[target] - log.VEventf(ctx, 1, "connecting to node %s (%d) with simulated latency %dms", target, remoteNodeID, + log.VEventf(ctx, 1, "connecting with simulated latency %dms", latency) dialer := artificialLatencyDialer{ dialerFunc: dialerFunc, @@ -1208,7 +1208,7 @@ func (rpcCtx *Context) grpcDialRaw( // behavior and redialChan will never be closed). dialOpts = append(dialOpts, rpcCtx.testingDialOpts...) - log.Health.Infof(ctx, "dialing n%v: %s (%v)", remoteNodeID, target, class) + log.Health.Infof(ctx, "dialing") conn, err := grpc.DialContext(ctx, target, dialOpts...) 
if err != nil && rpcCtx.masterCtx.Err() != nil { // If the node is draining, discard the error (which is likely gRPC's version @@ -1300,7 +1300,8 @@ func (rpcCtx *Context) grpcDialNodeInternal( conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(ctx, target, remoteNodeID, class) if conn.dialErr == nil { if err := rpcCtx.Stopper.RunAsyncTask( - ctx, "rpc.Context: grpc heartbeat", func(ctx context.Context) { + logtags.AddTag(ctx, "heartbeat", nil), + "rpc.Context: grpc heartbeat", func(ctx context.Context) { err := rpcCtx.runHeartbeat(ctx, conn, target, redialChan) if err != nil && !grpcutil.IsClosedConnection(err) && !grpcutil.IsConnectionRejected(err) {