Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pickfirst: Stop test servers without closing listeners #7872

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 56 additions & 43 deletions balancer/pickfirst/pickfirstleaf/pickfirstleaf_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,54 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// testServer is a server that can be stopped and resumed without closing
// the listener. This guarantees the same port number (and address) is used
// after restart. When a server is stopped, it accepts and closes all tcp
// connections from clients.
type testServer struct {
	stubserver.StubServer
	lis *testutils.RestartableListener // restartable wrapper around the server's listener
}

// stop pauses the server by stopping its restartable listener. The listening
// socket stays open (preserving the address), and per the listener's contract
// incoming TCP connections are accepted and immediately closed while stopped.
func (s *testServer) stop() {
	s.lis.Stop()
}

// resume restarts the underlying listener so the server begins accepting
// connections again on the same address it used before stop.
func (s *testServer) resume() {
	s.lis.Restart()
}

// newTestServer returns a testServer backed by a restartable listener on a
// local TCP port, with an EmptyCall handler that always succeeds. It fails
// the test immediately if the listener cannot be created.
func newTestServer(t *testing.T) *testServer {
	lis, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("Failed to create listener: %v", err)
	}
	restartable := testutils.NewRestartableListener(lis)
	return &testServer{
		StubServer: stubserver.StubServer{
			EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
			Listener:   restartable,
		},
		lis: restartable,
	}
}

// setupPickFirstLeaf performs steps required for pick_first tests. It starts a
// bunch of backends exporting the TestService, and creates a ClientConn to them.
func setupPickFirstLeaf(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, *backendManager) {
t.Helper()
r := manual.NewBuilderWithScheme("whatever")
backends := make([]*stubserver.StubServer, backendCount)
backends := make([]*testServer, backendCount)
addrs := make([]resolver.Address, backendCount)

for i := 0; i < backendCount; i++ {
backend := stubserver.StartTestService(t, nil)
server := newTestServer(t)
backend := stubserver.StartTestService(t, &server.StubServer)
t.Cleanup(func() {
backend.Stop()
})
backends[i] = backend
backends[i] = server
addrs[i] = resolver.Address{Addr: backend.Address}
}

Expand Down Expand Up @@ -263,8 +297,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) {
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand All @@ -286,8 +319,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_DisjointLists(t *testing.T) {
t.Errorf("SubConn states mismatch (-want +got):\n%s", diff)
}

bm.backends[2].S.Stop()
bm.backends[2].S = nil
bm.backends[2].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[3]}})

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[3]); err != nil {
Expand Down Expand Up @@ -326,8 +358,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand All @@ -349,8 +380,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_ActiveBackendInUpdatedList(t *testing
t.Errorf("SubConn states mismatch (-want +got):\n%s", diff)
}

bm.backends[2].S.Stop()
bm.backends[2].S = nil
bm.backends[2].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[1]}})

// Verify that the ClientConn stays in READY.
Expand Down Expand Up @@ -390,8 +420,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand All @@ -413,11 +442,9 @@ func (s) TestPickFirstLeaf_ResolverUpdates_InActiveBackendInUpdatedList(t *testi
t.Errorf("SubConn states mismatch (-want +got):\n%s", diff)
}

bm.backends[2].S.Stop()
bm.backends[2].S = nil
if err := bm.backends[0].StartServer(); err != nil {
t.Fatalf("Failed to re-start test backend: %v", err)
}
bm.backends[2].stop()
bm.backends[0].resume()

r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[2]}})

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
Expand Down Expand Up @@ -455,8 +482,7 @@ func (s) TestPickFirstLeaf_ResolverUpdates_IdenticalLists(t *testing.T) {
stateSubscriber := &ccStateSubscriber{}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, stateSubscriber)

bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0], addrs[1]}})
var bal *stateStoringBalancer
select {
Expand Down Expand Up @@ -553,14 +579,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerRestart(t *testing.T)
}

// Shut down the connected server.
bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[0].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[0].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -619,14 +642,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerRestart(t *testing.T)
}

// Shut down the connected server.
bm.backends[1].S.Stop()
bm.backends[1].S = nil
bm.backends[1].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[1].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[1].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -691,14 +711,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_SecondServerToFirst(t *testing.T)
}

// Shut down the connected server.
bm.backends[1].S.Stop()
bm.backends[1].S = nil
bm.backends[1].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[0].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[0].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -762,14 +779,11 @@ func (s) TestPickFirstLeaf_StopConnectedServer_FirstServerToSecond(t *testing.T)
}

// Shut down the connected server.
bm.backends[0].S.Stop()
bm.backends[0].S = nil
bm.backends[0].stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)

// Start the new target server.
if err := bm.backends[1].StartServer(); err != nil {
t.Fatalf("Failed to start server: %v", err)
}
bm.backends[1].resume()

if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1266,14 +1280,13 @@ type scState struct {
}

type backendManager struct {
backends []*stubserver.StubServer
backends []*testServer
}

// stopAllExcept stops every managed backend except the one at the given
// index. Listeners remain open, so stopped backends keep their addresses
// and can be resumed later.
func (b *backendManager) stopAllExcept(index int) {
	// Use a distinct loop variable name: the original shadowed the
	// receiver b inside the loop body.
	for i, backend := range b.backends {
		if i != index {
			backend.stop()
		}
	}
}
Expand Down
Loading