Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: add logging tags for rpc connect events #71243

Merged
merged 5 commits into the base branch from the feature branch
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 49 additions & 21 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/syncmap"
Expand Down Expand Up @@ -1132,18 +1133,36 @@ type delayingHeader struct {
DelayMS int32
}

func (rpcCtx *Context) makeDialCtx(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) context.Context {
dialCtx := rpcCtx.masterCtx
var rnodeID interface{} = remoteNodeID
if remoteNodeID == 0 {
rnodeID = '?'
}
dialCtx = logtags.AddTag(dialCtx, "rnode", rnodeID)
dialCtx = logtags.AddTag(dialCtx, "raddr", target)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets marked as sensitive and redacted in logs. Is that intentional? I'm not sure if we consider IP addresses to be sensitive, but it would make debugging a bit harder.

dialCtx = logtags.AddTag(dialCtx, "class", class)
return dialCtx
}

// GRPCDialRaw calls grpc.Dial with options appropriate for the context.
// Unlike GRPCDialNode, it does not start an RPC heartbeat to validate the
// connection. This connection will not be reconnected automatically;
// the returned channel is closed when a reconnection is attempted.
// This method implies a DefaultClass ConnectionClass for the returned
// ClientConn.
func (rpcCtx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) {
	// Tag the dial context for logging; the remote node ID is unknown (0)
	// for raw dials.
	ctx := rpcCtx.makeDialCtx(target, 0, DefaultClass)
	return rpcCtx.grpcDialRaw(ctx, target, 0, DefaultClass)
}

// grpcDialRaw connects to the remote node.
// The ctx passed as argument must be derived from rpcCtx.masterCtx, so
// that it respects the same cancellation policy.
func (rpcCtx *Context) grpcDialRaw(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
ctx context.Context, target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) (*grpc.ClientConn, <-chan struct{}, error) {
dialOpts, err := rpcCtx.grpcDialOptions(target, class)
if err != nil {
Expand Down Expand Up @@ -1174,7 +1193,7 @@ func (rpcCtx *Context) grpcDialRaw(
dialerFunc := dialer.dial
if rpcCtx.Knobs.ArtificialLatencyMap != nil {
latency := rpcCtx.Knobs.ArtificialLatencyMap[target]
log.VEventf(rpcCtx.masterCtx, 1, "connecting to node %s (%d) with simulated latency %dms", target, remoteNodeID,
log.VEventf(ctx, 1, "connecting with simulated latency %dms",
latency)
dialer := artificialLatencyDialer{
dialerFunc: dialerFunc,
Expand All @@ -1189,8 +1208,8 @@ func (rpcCtx *Context) grpcDialRaw(
// behavior and redialChan will never be closed).
dialOpts = append(dialOpts, rpcCtx.testingDialOpts...)

log.Health.Infof(rpcCtx.masterCtx, "dialing n%v: %s (%v)", remoteNodeID, target, class)
conn, err := grpc.DialContext(rpcCtx.masterCtx, target, dialOpts...)
log.Health.Infof(ctx, "dialing")
conn, err := grpc.DialContext(ctx, target, dialOpts...)
if err != nil && rpcCtx.masterCtx.Err() != nil {
// If the node is draining, discard the error (which is likely gRPC's version
// of context.Canceled) and return errDialRejected which instructs callers not
Expand All @@ -1205,7 +1224,8 @@ func (rpcCtx *Context) grpcDialRaw(
// used with the gossip client and CLI commands which can talk to any
// node. This method implies a SystemClass.
// GRPCUnvalidatedDial dials the given target without validating the
// remote node ID. This method implies a SystemClass.
func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection {
	// Tag the dial context for logging; the remote node ID is unknown (0)
	// for unvalidated dials.
	ctx := rpcCtx.makeDialCtx(target, 0, SystemClass)
	return rpcCtx.grpcDialNodeInternal(ctx, target, 0, SystemClass)
}

// GRPCDialNode calls grpc.Dial with options appropriate for the
Expand All @@ -1218,10 +1238,11 @@ func (rpcCtx *Context) GRPCUnvalidatedDial(target string) *Connection {
// GRPCDialNode calls grpc.Dial with options appropriate for the given
// target, remote node ID, and connection class. A node ID of 0 is only
// permitted when TestingAllowNamedRPCToAnonymousServer is set; otherwise
// it is treated as an assertion failure.
func (rpcCtx *Context) GRPCDialNode(
	target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
	// Derive a tagged dial context up front so the fatal message below
	// (and all subsequent dial/heartbeat events) carry the rnode/raddr/
	// class logging tags.
	ctx := rpcCtx.makeDialCtx(target, remoteNodeID, class)
	if remoteNodeID == 0 && !rpcCtx.TestingAllowNamedRPCToAnonymousServer {
		log.Fatalf(ctx, "%v", errors.AssertionFailedf("invalid node ID 0 in GRPCDialNode()"))
	}
	return rpcCtx.grpcDialNodeInternal(ctx, target, remoteNodeID, class)
}

// GRPCDialPod wraps GRPCDialNode and treats the `remoteInstanceID`
Expand All @@ -1237,8 +1258,11 @@ func (rpcCtx *Context) GRPCDialPod(
return rpcCtx.GRPCDialNode(target, roachpb.NodeID(remoteInstanceID), class)
}

// grpcDialNodeInternal connects to the remote node and sets up the async heartbeater.
// The ctx passed as argument must be derived from rpcCtx.masterCtx, so
// that it respects the same cancellation policy.
func (rpcCtx *Context) grpcDialNodeInternal(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
ctx context.Context, target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
thisConnKeys := []connKey{{target, remoteNodeID, class}}
value, ok := rpcCtx.conns.Load(thisConnKeys[0])
Expand Down Expand Up @@ -1273,14 +1297,15 @@ func (rpcCtx *Context) grpcDialNodeInternal(
// Either we kick off the heartbeat loop (and clean up when it's done),
// or we clean up the connKey entries immediately.
var redialChan <-chan struct{}
conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(target, remoteNodeID, class)
conn.grpcConn, redialChan, conn.dialErr = rpcCtx.grpcDialRaw(ctx, target, remoteNodeID, class)
if conn.dialErr == nil {
if err := rpcCtx.Stopper.RunAsyncTask(
rpcCtx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
err := rpcCtx.runHeartbeat(conn, target, redialChan)
logtags.AddTag(ctx, "heartbeat", nil),
"rpc.Context: grpc heartbeat", func(ctx context.Context) {
err := rpcCtx.runHeartbeat(ctx, conn, target, redialChan)
if err != nil && !grpcutil.IsClosedConnection(err) &&
!grpcutil.IsConnectionRejected(err) {
log.Health.Errorf(masterCtx, "removing connection to %s due to error: %v", target, err)
log.Health.Errorf(ctx, "removing connection to %s due to error: %v", target, err)
}
rpcCtx.removeConn(conn, thisConnKeys...)
}); err != nil {
Expand Down Expand Up @@ -1316,8 +1341,11 @@ var ErrNotHeartbeated = errors.New("not yet heartbeated")
// the node.
var ErrNoConnection = errors.New("no connection found")

// runHeartbeat runs the heartbeat loop for the given RPC connection.
// The ctx passed as argument must be derived from rpcCtx.masterCtx, so
// that it respects the same cancellation policy.
func (rpcCtx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
ctx context.Context, conn *Connection, target string, redialChan <-chan struct{},
) (retErr error) {
rpcCtx.metrics.HeartbeatLoopsStarted.Inc(1)
// setInitialHeartbeatDone is idempotent and is critical to notify Connect
Expand Down Expand Up @@ -1365,7 +1393,7 @@ func (rpcCtx *Context) runHeartbeat(
heartbeatTimer.Read = true
}

if err := rpcCtx.Stopper.RunTaskWithErr(rpcCtx.masterCtx, "rpc heartbeat", func(goCtx context.Context) error {
if err := rpcCtx.Stopper.RunTaskWithErr(ctx, "rpc heartbeat", func(ctx context.Context) error {
// We re-mint the PingRequest to pick up any asynchronous update to clusterID.
clusterID := rpcCtx.ClusterID.Get()
request := &PingRequest{
Expand All @@ -1384,22 +1412,22 @@ func (rpcCtx *Context) runHeartbeat(

var response *PingResponse
sendTime := rpcCtx.Clock.PhysicalTime()
ping := func(goCtx context.Context) error {
ping := func(ctx context.Context) error {
// NB: We want the request to fail-fast (the default), otherwise we won't
// be notified of transport failures.
if err := interceptor(request); err != nil {
returnErr = true
return err
}
var err error
response, err = heartbeatClient.Ping(goCtx, request)
response, err = heartbeatClient.Ping(ctx, request)
return err
}
var err error
if rpcCtx.heartbeatTimeout > 0 {
err = contextutil.RunWithTimeout(goCtx, "rpc heartbeat", rpcCtx.heartbeatTimeout, ping)
err = contextutil.RunWithTimeout(ctx, "rpc heartbeat", rpcCtx.heartbeatTimeout, ping)
} else {
err = ping(goCtx)
err = ping(ctx)
}

if grpcutil.IsConnectionRejected(err) {
Expand All @@ -1425,7 +1453,7 @@ func (rpcCtx *Context) runHeartbeat(

if err == nil {
err = errors.Wrap(
checkVersion(goCtx, rpcCtx.Settings, response.ServerVersion),
checkVersion(ctx, rpcCtx.Settings, response.ServerVersion),
"version compatibility check failed on ping response")
if err != nil {
returnErr = true
Expand Down Expand Up @@ -1453,7 +1481,7 @@ func (rpcCtx *Context) runHeartbeat(
remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2)
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
}
rpcCtx.RemoteClocks.UpdateOffset(rpcCtx.masterCtx, target, request.Offset, pingDuration)
rpcCtx.RemoteClocks.UpdateOffset(ctx, target, request.Offset, pingDuration)

if cb := rpcCtx.HeartbeatCB; cb != nil {
cb()
Expand Down
6 changes: 4 additions & 2 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1889,11 +1889,13 @@ func TestRunHeartbeatSetsHeartbeatStateWhenExitingBeforeFirstHeartbeat(t *testin
redialChan := make(chan struct{})
close(redialChan)

c.grpcConn, _, c.dialErr = rpcCtx.grpcDialRaw(remoteAddr, serverNodeID, DefaultClass)
c.grpcConn, _, c.dialErr = rpcCtx.grpcDialRaw(rpcCtx.masterCtx, remoteAddr, serverNodeID, DefaultClass)
require.NoError(t, c.dialErr)
// It is possible that the redial chan being closed is not seen on the first
// pass through the loop.
err = rpcCtx.runHeartbeat(c, "", redialChan)
// NB: we use rpcCtx.masterCtx and not just ctx because we need
// this to be cancelled when the RPC context is closed.
err = rpcCtx.runHeartbeat(rpcCtx.masterCtx, c, "", redialChan)
require.EqualError(t, err, grpcutil.ErrCannotReuseClientConn.Error())
// Even when the runHeartbeat returns, we could have heartbeated successfully.
// If we did not, then we expect the `not yet heartbeated` error.
Expand Down