Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: retain information about failed connections #99191

Merged
merged 17 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ require (
github.com/edsrzf/mmap-go v1.0.0
github.com/elastic/gosigar v0.14.1
github.com/emicklei/dot v0.15.0
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
github.com/fatih/color v1.9.0
github.com/fraugster/parquet-go v0.10.0
github.com/fsnotify/fsnotify v1.5.1
Expand Down Expand Up @@ -278,6 +277,7 @@ require (
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion pkg/gossip/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
Expand Down
30 changes: 15 additions & 15 deletions pkg/gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sync"
"time"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -65,15 +64,13 @@ func newClient(ambient log.AmbientContext, addr net.Addr, nodeMetrics Metrics) *
}
}

var logFailedStartEvery = log.Every(5 * time.Second)

// start dials the remote addr and commences gossip once connected. Upon exit,
// the client is sent on the disconnected channel. This method starts client
// processing in a goroutine and returns immediately.
func (c *client) startLocked(
g *Gossip,
disconnected chan *client,
rpcCtx *rpc.Context,
stopper *stop.Stopper,
breaker *circuit.Breaker,
g *Gossip, disconnected chan *client, rpcCtx *rpc.Context, stopper *stop.Stopper,
) {
// Add a placeholder for the new outgoing connection because we may not know
// the ID of the node we're connecting to yet. This will be resolved in
Expand All @@ -98,23 +95,26 @@ func (c *client) startLocked(
disconnected <- c
}()

consecFailures := breaker.ConsecFailures()
var stream Gossip_GossipClient
if err := breaker.Call(func() error {
stream, err := func() (Gossip_GossipClient, error) {
// Note: avoid using `grpc.WithBlock` here. This code is already
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends ups up making `kv` tests take twice as long.
conn, err := rpcCtx.GRPCUnvalidatedDial(c.addr.String()).Connect(ctx)
if err != nil {
return err
return nil, err
}
if stream, err = NewGossipClient(conn).Gossip(ctx); err != nil {
return err
stream, err := NewGossipClient(conn).Gossip(ctx)
if err != nil {
return nil, err
}
if err := c.requestGossip(g, stream); err != nil {
return nil, err
}
return c.requestGossip(g, stream)
}, 0); err != nil {
if consecFailures == 0 {
return stream, nil
}()
if err != nil {
if logFailedStartEvery.ShouldLog() {
log.Warningf(ctx, "failed to start gossip client to %s: %s", c.addr, err)
}
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/gossip/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func gossipSucceedsSoon(
// If the client wasn't able to connect, restart it.
g := gossip[client]
g.mu.Lock()
client.startLocked(g, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
client.startLocked(g, disconnected, rpcContext, stopper)
g.mu.Unlock()
default:
}
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestClientNodeID(t *testing.T) {
case <-disconnected:
// The client hasn't been started or failed to start, loop and try again.
local.mu.Lock()
c.startLocked(local, disconnected, rpcContext, stopper, rpcContext.NewBreaker(""))
c.startLocked(local, disconnected, rpcContext, stopper)
local.mu.Unlock()
}
}
Expand Down
12 changes: 1 addition & 11 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"time"
"unsafe"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
Expand Down Expand Up @@ -233,8 +232,6 @@ type Gossip struct {
clientsMu struct {
syncutil.Mutex
clients []*client
// One breaker per client for the life of the process.
breakers map[string]*circuit.Breaker
}

disconnected chan *client // Channel of disconnected clients
Expand Down Expand Up @@ -307,7 +304,6 @@ func New(
stopper.AddCloser(stop.CloserFn(g.server.AmbientContext.FinishEventLog))

registry.AddMetric(g.outgoing.gauge)
g.clientsMu.breakers = map[string]*circuit.Breaker{}

g.mu.Lock()
// Add ourselves as a SystemConfig watcher.
Expand Down Expand Up @@ -1456,17 +1452,11 @@ func (g *Gossip) signalConnectedLocked() {
func (g *Gossip) startClientLocked(addr util.UnresolvedAddr, rpcContext *rpc.Context) {
g.clientsMu.Lock()
defer g.clientsMu.Unlock()
breaker, ok := g.clientsMu.breakers[addr.String()]
if !ok {
name := fmt.Sprintf("gossip %v->%v", rpcContext.Config.Addr, addr)
breaker = rpcContext.NewBreaker(name)
g.clientsMu.breakers[addr.String()] = breaker
}
ctx := g.AnnotateCtx(context.TODO())
log.VEventf(ctx, 1, "starting new client to %s", addr)
c := newClient(g.server.AmbientContext, &addr, g.serverMetrics)
g.clientsMu.clients = append(g.clientsMu.clients, c)
c.startLocked(g, g.disconnected, rpcContext, g.server.stopper, breaker)
c.startLocked(g, g.disconnected, rpcContext, g.server.stopper)
}

// removeClientLocked removes the specified client. Called when a client
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
localAddr := local.GetNodeAddr()
c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), localAddr, makeMetrics())
peer.mu.Lock()
c.startLocked(peer, disconnectedCh, peerCtx, stopper, peerCtx.NewBreaker(""))
c.startLocked(peer, disconnectedCh, peerCtx, stopper)
peer.mu.Unlock()

disconnectedClient := <-disconnectedCh
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"data.go",
"errors.go",
"method.go",
"node_decommissioned_error.go",
"replica_unavailable_error.go",
":gen-batch-generated", # keep
":gen-errordetailtype-stringer", # keep
Expand All @@ -42,7 +43,10 @@ go_library(
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_gogo_status//:status",
"@com_github_golang_mock//gomock", # keep
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata", # keep
],
)
Expand All @@ -53,6 +57,7 @@ go_test(
"api_test.go",
"batch_test.go",
"errors_test.go",
"node_decommissioned_error_test.go",
"replica_unavailable_error_test.go",
"string_test.go",
],
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ message NotLeaseHolderError {
message NodeUnavailableError {
}

// A NodeDecommissionedError is returned when trying to connect from or to a
// peer that is known to have been decommissioned.
message NodeDecommissionedError {
}

// An UnsupportedRequestError indicates that the recipient node
// does not know how to handle the type of request received.
message UnsupportedRequestError {
Expand Down
58 changes: 58 additions & 0 deletions pkg/kv/kvpb/node_decommissioned_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvpb

import (
"github.com/cockroachdb/errors"
_ "github.com/cockroachdb/errors/extgrpc" // for error encoder/decoder
"github.com/cockroachdb/redact"
ptypes "github.com/gogo/protobuf/types"
gogostatus "github.com/gogo/status"
"google.golang.org/grpc/codes"
)

// NewDecommissionedStatusErrorf returns a GRPC status with the given error code
// and formatted message whose detail is a *NodeDecommissionedError.
func NewDecommissionedStatusErrorf(errorCode codes.Code, format string, args ...interface{}) error {
// Important: gogoproto ptypes and status, not google protobuf, see extgrpc pkg.
st := gogostatus.Newf(errorCode, format, args...).Proto()
det, err := ptypes.MarshalAny(&NodeDecommissionedError{})
if err != nil {
return err
}
st.Details = append(st.Details, det)
return gogostatus.FromProto(st).Err()
}

func (err *NodeDecommissionedError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("node is decommissioned")
return nil
}

func (err *NodeDecommissionedError) Error() string {
return redact.Sprint(err).StripMarkers()
}

// IsDecommissionedStatusErr returns true if the error wraps a gRPC status error
// with a NodeDecommissionedError detail, i.e. it was created using
// NewDecommissionedStatusErrorf.
func IsDecommissionedStatusErr(err error) bool {
s, ok := gogostatus.FromError(errors.UnwrapAll(err))
if !ok {
return false
}
for _, det := range s.Details() {
if _, ok := det.(*NodeDecommissionedError); ok {
return true
}
}
return false
}
28 changes: 28 additions & 0 deletions pkg/kv/kvpb/node_decommissioned_error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvpb

import (
"context"
"testing"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
)

func TestNewDecommissionedStatusErrorf(t *testing.T) {
ctx := context.Background()
err := errors.Wrap(NewDecommissionedStatusErrorf(codes.Unauthenticated, "hello %s", "world"), "!")
require.True(t, IsDecommissionedStatusErr(err))
ee := errors.EncodeError(ctx, err)
require.True(t, IsDecommissionedStatusErr(errors.DecodeError(ctx, ee)))
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ go_test(
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
Expand Down
20 changes: 11 additions & 9 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
Expand Down Expand Up @@ -2613,9 +2614,11 @@ func TestReportUnreachableHeartbeats(t *testing.T) {
// Shut down a raft transport via the circuit breaker, and wait for two
// election timeouts to trigger an election if reportUnreachable broke
// heartbeat transmission to the other store.
cb := tc.Servers[followerIdx].RaftTransport().GetCircuitBreaker(
b, ok := tc.Servers[followerIdx].RaftTransport().GetCircuitBreaker(
tc.Target(followerIdx).NodeID, rpc.DefaultClass)
cb.Break()
require.True(t, ok)
undo := circuit.TestingSetTripped(b, errors.New("boom"))
defer undo()

// Send a command to ensure Raft is aware of lost follower so that it won't
// quiesce (which would prevent heartbeats).
Expand Down Expand Up @@ -2698,22 +2701,21 @@ func TestReportUnreachableRemoveRace(t *testing.T) {
// Pseudo-partition partitionedMaybeLeaseholderIdx away from everyone else. We do this by tripping
// the circuit breaker on all other nodes.
t.Logf("partitioning")
var undos []func()
for i := range tc.Servers {
if i != partitionedMaybeLeaseholderIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(partitionedMaybeLeaseholderIdx).NodeID, rpc.DefaultClass)
cb.Break()
b, ok := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(partitionedMaybeLeaseholderIdx).NodeID, rpc.DefaultClass)
require.True(t, ok)
undos = append(undos, circuit.TestingSetTripped(b, errors.New("boom")))
}
}

// Wait out the heartbeat interval and resolve the partition.
heartbeatInterval := tc.GetFirstStoreFromServer(t, partitionedMaybeLeaseholderIdx).GetStoreConfig().CoalescedHeartbeatsInterval
time.Sleep(heartbeatInterval)
t.Logf("resolving partition")
for i := range tc.Servers {
if i != partitionedMaybeLeaseholderIdx {
cb := tc.Servers[i].RaftTransport().GetCircuitBreaker(tc.Target(partitionedMaybeLeaseholderIdx).NodeID, rpc.DefaultClass)
cb.Reset()
}
for _, undo := range undos {
undo()
}

t.Logf("waiting for replicaGC of removed leader replica")
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
Expand All @@ -40,7 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
circuit2 "github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
Expand Down Expand Up @@ -237,7 +236,7 @@ func (r *Replica) Store() *Store {
return r.store
}

func (r *Replica) Breaker() *circuit2.Breaker {
func (r *Replica) Breaker() *circuit.Breaker {
return r.breaker.wrapped
}

Expand Down Expand Up @@ -498,7 +497,7 @@ func (r *Replica) TripBreaker() {
// connection attempts to the specified node.
func (t *RaftTransport) GetCircuitBreaker(
nodeID roachpb.NodeID, class rpc.ConnectionClass,
) *circuit.Breaker {
) (*circuit.Breaker, bool) {
return t.dialer.GetCircuitBreaker(nodeID, class)
}

Expand Down
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (t *RaftTransport) SendAsync(
panic("snapshots must be sent using SendSnapshot")
}

if !t.dialer.GetCircuitBreaker(toNodeID, class).Ready() {
if b, ok := t.dialer.GetCircuitBreaker(toNodeID, class); ok && b.Signal().Err() != nil {
return false
}

Expand Down Expand Up @@ -582,10 +582,7 @@ func (t *RaftTransport) startProcessNewQueue(
}
defer cleanup(q)
defer t.queues[class].Delete(int64(toNodeID))
// NB: we dial without a breaker here because the caller has already
// checked the breaker. Checking it again can cause livelock, see:
// https://github.com/cockroachdb/cockroach/issues/68419
conn, err := t.dialer.DialNoBreaker(ctx, toNodeID, class)
conn, err := t.dialer.Dial(ctx, toNodeID, class)
if err != nil {
// DialNode already logs sufficiently, so just return.
return
Expand Down
Loading