Skip to content

Commit

Permalink
rpc: improve test coverage of initial heartbeat failures
Browse files Browse the repository at this point in the history
This patch improves test coverage for `InitialHeartbeatFailedError` and
`grpcutil.RequestDidNotStart()`.

It also removes some stale references about wanting gRPC to return
unambiguious errors for certain types of connection failures.
`InitialHeartbeatFailedError` has largerly replaced this need.

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jun 30, 2023
1 parent 1e222ff commit 24537db
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 127 deletions.
101 changes: 101 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2498,3 +2498,104 @@ func checkMetrics(m *Metrics, healthy, unhealthy, inactive int64, checkDurations

return nil
}

// TestInitialHeartbeatFailedError tests that InitialHeartbeatFailedError is
// returned for various scenarios. This is important for
// grpcutil.RequestDidNotStart to properly detect unambiguous failures.
func TestInitialHeartbeatFailedError(t *testing.T) {
defer leaktest.AfterTest(t)()

const maxOffset = 0
const nodeID = 1

hbErrType := (*netutil.InitialHeartbeatFailedError)(nil)
requireHeartbeatError := func(t *testing.T, err error) {
t.Helper()
require.Error(t, err)
require.True(t, errors.HasType(err, hbErrType), "got %T: %s", err, err)
}

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := timeutil.NewManualTime(timeutil.Unix(0, 20))
serverCtx := newTestContext(uuid.MakeV4(), clock, maxOffset, stopper)
serverCtx.NodeID.Set(ctx, nodeID)
clientCtx := newTestContext(serverCtx.StorageClusterID.Get(), clock, maxOffset, stopper)
clientCtx.AddTestingDialOpts(grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: time.Second,
}))

// Set up a ping handler that can error out.
var failPing, hangPing atomic.Bool
onHandlePing := func(ctx context.Context, req *PingRequest, resp *PingResponse) error {
if failPing.Load() {
return errors.New("error")
}
for hangPing.Load() {
time.Sleep(100 * time.Millisecond)
}
return nil
}

// Rejected connection errors with InitialHeartbeatFailedError.
remoteAddr := "127.0.0.99:64072"
_, err := clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
requireHeartbeatError(t, err)

// Hung listener errors with InitialHeartbeatFailedError.
hungLn, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer func() {
_ = hungLn.Close()
}()
remoteAddr = hungLn.Addr().String()

_, err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
requireHeartbeatError(t, err)

// Start server listener.
s := newTestServer(t, serverCtx)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
clusterID: serverCtx.StorageClusterID,
nodeID: serverCtx.NodeID,
version: serverCtx.Settings.Version,
onHandlePing: onHandlePing,
})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
require.NoError(t, err)
remoteAddr = ln.Addr().String()

// Before connecting, health does not return an InitialHeartbeatFailedError,
// it returns ErrNotHeartbeated.
err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Health()
require.Error(t, err)
require.True(t, errors.HasType(err, ErrNotHeartbeated))
require.False(t, errors.HasType(err, hbErrType))

// Ping errors result in InitialHeartbeatFailedError.
failPing.Store(true)
_, err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
requireHeartbeatError(t, err)
failPing.Store(false)

// Stalled pings result in InitialHeartbeatFailedError.
hangPing.Store(true)
_, err = clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
requireHeartbeatError(t, err)
hangPing.Store(false)

// RPC circuit breakers will now be tripped. They should result in
// InitialHeartbeatFailedError until we finally recover.
testutils.SucceedsSoon(t, func() error {
_, err := clientCtx.GRPCDialNode(remoteAddr, nodeID, SystemClass).Connect(ctx)
if err != nil {
requireHeartbeatError(t, err)
}
return err
})
}
5 changes: 1 addition & 4 deletions pkg/util/grpcutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,16 @@ go_test(
embed = [":grpcutil"],
deps = [
"//pkg/server",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/log/severity",
"//pkg/util/netutil",
"@com_github_cockroachdb_circuitbreaker//:circuitbreaker",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_status//:status",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//health/grpc_health_v1",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
],
Expand Down
26 changes: 7 additions & 19 deletions pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,26 +150,14 @@ func IsWaitingForInit(err error) bool {
return ok && s.Code() == codes.Unavailable && strings.Contains(err.Error(), "node waiting for init")
}

// RequestDidNotStart returns true if the given error from gRPC
// means that the request definitely could not have started on the
// remote server.
// RequestDidNotStart returns true if the given RPC error means that the request
// definitely could not have started on the remote server.
func RequestDidNotStart(err error) bool {
if errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
// NB: gRPC doesn't provide a way to distinguish unambiguous failures, but
// InitialHeartbeatFailedError serves mostly the same purpose. See also
// https://github.com/grpc/grpc-go/issues/1443.
return errors.HasType(err, (*netutil.InitialHeartbeatFailedError)(nil)) ||
errors.Is(err, circuit.ErrBreakerOpen) ||
IsConnectionRejected(err) ||
IsWaitingForInit(err) {
return true
}
_, ok := status.FromError(errors.Cause(err))
if !ok {
// This is a non-gRPC error; assume nothing.
return false
}
// This is where you'd hope to treat some gRPC errors as unambiguous.
// Unfortunately, gRPC provides no good way to distinguish ambiguous from
// unambiguous failures.
//
// https://github.com/grpc/grpc-go/issues/1443
// https://github.com/cockroachave hdb/cockroach/issues/19708#issuecomment-343891640
return false
IsWaitingForInit(err)
}
114 changes: 10 additions & 104 deletions pkg/util/grpcutil/grpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,21 @@
package grpcutil_test

import (
"context"
"fmt"
"net"
"strings"
"testing"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/netutil"
"github.com/cockroachdb/errors"
"github.com/gogo/status"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// Implement the grpc health check interface (just because it's the
// simplest predefined RPC service I could find that seems unlikely to
// change out from under us) with an implementation that shuts itself
// down whenever anything calls it. This lets us distinguish errors
// caused by server shutdowns during the request from those when the
// server was already down.
type healthServer struct {
grpcServer *grpc.Server
}

func (hs healthServer) Check(
ctx context.Context, req *healthpb.HealthCheckRequest,
) (*healthpb.HealthCheckResponse, error) {
hs.grpcServer.Stop()

// Wait for the shutdown to happen before returning from this
// method.
<-ctx.Done()
return nil, errors.New("no one should see this")
}

func (hs healthServer) Watch(*healthpb.HealthCheckRequest, healthpb.Health_WatchServer) error {
panic("not implemented")
}

func TestIsWaitingForInit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -76,75 +45,6 @@ func TestIsWaitingForInit(t *testing.T) {
}
}

func TestRequestDidNotStart(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.WithIssue(t, 19708)

lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer func() {
_ = lis.Close()
}()

server := grpc.NewServer()
hs := healthServer{server}
healthpb.RegisterHealthServer(server, hs)
go func() {
_ = server.Serve(lis)
}()
//lint:ignore SA1019 grpc.WithInsecure is deprecated
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
defer func() {
_ = conn.Close() // nolint:grpcconnclose
}()
client := healthpb.NewHealthClient(conn)

// The first time, the request will start and we'll get a
// "connection is closing" message.
_, err = client.Check(context.Background(), &healthpb.HealthCheckRequest{})
if err == nil {
t.Fatal("did not get expected error")
} else if grpcutil.RequestDidNotStart(err) {
t.Fatalf("request should have started, but got %s", err)
} else if !strings.Contains(err.Error(), "is closing") {
// This assertion is not essential to this test, but since this
// logic is sensitive to grpc error handling details it's safer to
// make the test fail when anything changes. This error could be
// either "transport is closing" or "connection is closing"
t.Fatalf("expected 'is closing' error but got %s", err)
}

// Afterwards, the request will fail immediately without being sent.
// But if we try too soon, there's a chance the transport hasn't
// been put in the "transient failure" state yet and we get a
// different error.
testutils.SucceedsSoon(t, func() error {
_, err = client.Check(context.Background(), &healthpb.HealthCheckRequest{})
if err == nil {
return errors.New("did not get expected error")
} else if !grpcutil.RequestDidNotStart(err) {
return errors.Wrap(err, "request should not have started, but got error")
}
return nil
})

// Once the transport is in the "transient failure" state it should
// stay that way, and every subsequent request will fail
// immediately.
_, err = client.Check(context.Background(), &healthpb.HealthCheckRequest{})
if err == nil {
t.Fatal("did not get expected error")
} else if !grpcutil.RequestDidNotStart(err) {
t.Fatalf("request should not have started, but got %s", err)
}
}

func TestRequestDidNotStart_Errors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -153,13 +53,19 @@ func TestRequestDidNotStart_Errors(t *testing.T) {
err error
expect bool
}{
"breaker": {errors.Wrapf(circuit.ErrBreakerOpen, "unable to dial n%d", 42), true},
"waiting for init": {errors.Wrapf(server.NewWaitingForInitError("foo"), "failed"), true},
"plain": {errors.New("foo"), false},
"failed heartbeat": {&netutil.InitialHeartbeatFailedError{}, true},
"waiting for init": {server.NewWaitingForInitError("foo"), true},
"unauthenticated": {status.Error(codes.Unauthenticated, "unauthenticated"), true},
"permission denied": {status.Error(codes.PermissionDenied, "permission denied"), true},
"failed precondition": {status.Error(codes.FailedPrecondition, "failed precondition"), true},
"circuit breaker": {circuit.ErrBreakerOpen, true},
"plain": {errors.New("foo"), false},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
// Make sure the error is properly detected both bare and wrapped.
require.Equal(t, tc.expect, grpcutil.RequestDidNotStart(tc.err))
require.Equal(t, tc.expect, grpcutil.RequestDidNotStart(errors.Wrap(tc.err, "wrapped")))
})
}
}

0 comments on commit 24537db

Please sign in to comment.