test: add test for client rpc infinite loop
schmichael committed Mar 20, 2023
1 parent 0b45af3 commit 495c6ff
Showing 3 changed files with 144 additions and 16 deletions.
129 changes: 129 additions & 0 deletions nomad/client_meta_endpoint_test.go
@@ -0,0 +1,129 @@
package nomad

import (
	"testing"

	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/client"
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/testutil"
	"github.com/shoenig/test/must"
)

// TestNodeMeta_Forward asserts that Client RPCs do not result in infinite
// loops. For example in a cluster with 1 Leader, 2 Followers, and a Node
// connected to Follower 1:
//
// If a NodeMeta.Apply RPC with AllowStale=false is received by Follower 1, it
// will honor AllowStale=false and forward the request to the Leader.
//
// The Leader will accept the RPC, notice that Follower 1 has a connection to
// the Node, and the Leader will send the request back to Follower 1.
//
// Follower 1, ever respectful of AllowStale=false, will forward it back to the
// Leader.
//
// The Leader, being unable to forward to the Node, will send it back to
// Follower 1.
//
// This argument will continue until one of the Servers runs out of memory or
// patience and stomps away in anger (crashes). Like any good argument the
// ending is never pretty as the Servers will suffer CPU starvation and
// potentially Raft flapping before anyone actually OOMs.
//
// See https://github.com/hashicorp/nomad/issues/16517 for details.
//
// If this test fails it will do so spectacularly by consuming all available
// CPU and potentially all available memory. Running it in a VM or container
// is suggested.
func TestNodeMeta_Forward(t *testing.T) {
	ci.Parallel(t)

	servers := []*Server{}
	for i := 0; i < 3; i++ {
		s, cleanup := TestServer(t, func(c *Config) {
			c.BootstrapExpect = 3
			c.NumSchedulers = 0
		})
		t.Cleanup(cleanup)
		servers = append(servers, s)
	}

	TestJoin(t, servers...)
	leader := testutil.WaitForLeaders(t, servers[0].RPC, servers[1].RPC, servers[2].RPC)

	followers := []string{}
	for _, s := range servers {
		if addr := s.config.RPCAddr.String(); addr != leader {
			followers = append(followers, addr)
		}
	}
	t.Logf("leader=%s followers=%q", leader, followers)

	clients := []*client.Client{}
	for i := 0; i < 4; i++ {
		c, cleanup := client.TestClient(t, func(c *config.Config) {
			// Clients will rebalance across all servers, but try to get
			// them to use followers to ensure we don't hit the loop in #16517
			c.Servers = followers
		})
		defer cleanup()
		clients = append(clients, c)
	}

	for _, c := range clients {
		testutil.WaitForClient(t, servers[0].RPC, c.NodeID(), c.Region())
	}

	agentRPCs := []func(string, any, any) error{}
	nodeIDs := make([]string, 0, len(clients))

	// Build list of agents and node IDs
	for _, s := range servers {
		agentRPCs = append(agentRPCs, s.RPC)
	}

	for _, c := range clients {
		agentRPCs = append(agentRPCs, c.RPC)
		nodeIDs = append(nodeIDs, c.NodeID())
	}

	region := clients[0].Region()

	// Apply metadata to every client through every agent to ensure forwarding
	// always works regardless of path taken.
	for _, rpc := range agentRPCs {
		for _, nodeID := range nodeIDs {
			args := &structs.NodeMetaApplyRequest{
				// Intentionally don't set QueryOptions.AllowStale to exercise #16517
				QueryOptions: structs.QueryOptions{
					Region: region,
				},
				NodeID: nodeID,
				Meta:   map[string]*string{"testing": pointer.Of("123")},
			}
			reply := &structs.NodeMetaResponse{}
			must.NoError(t, rpc("NodeMeta.Apply", args, reply))
			must.MapNotEmpty(t, reply.Meta)
		}
	}

	for _, rpc := range agentRPCs {
		for _, nodeID := range nodeIDs {
			args := &structs.NodeSpecificRequest{
				// Intentionally don't set QueryOptions.AllowStale to exercise #16517
				QueryOptions: structs.QueryOptions{
					Region: region,
				},
				NodeID: nodeID,
			}
			reply := &structs.NodeMetaResponse{}
			must.NoError(t, rpc("NodeMeta.Read", args, reply))
			must.MapNotEmpty(t, reply.Meta)
			must.Eq(t, reply.Meta["testing"], "123")
			must.MapNotEmpty(t, reply.Dynamic)
			must.Eq(t, *reply.Dynamic["testing"], "123")
		}
	}
}
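
Editor's note: the scenario described in the test's doc comment is easiest to see as a small simulation. Below is a minimal sketch, not Nomad's actual RPC code; `request`, `server`, and the `breakLoop` guard are all hypothetical names, and the guard (allowing the relayed hop to be served stale) is only one plausible way to terminate the cycle. With the guard off, the request ping-pongs between leader and follower until the hop budget runs out, mirroring the CPU/memory blow-up described above.

// Toy model of the client-RPC forwarding loop; illustrative only.
package main

import "fmt"

type request struct {
	AllowStale bool
	hops       int
}

type server struct {
	name     string
	isLeader bool
	hasNode  bool    // the target node is connected to this server
	leader   *server // where non-stale requests get forwarded
	nodeConn *server // server believed to hold the node connection
}

func (s *server) handle(req *request, breakLoop bool) string {
	for {
		req.hops++
		if req.hops > 10 {
			return "gave up: forwarding loop"
		}
		switch {
		case !s.isLeader && !req.AllowStale:
			// Bug path: the follower honors AllowStale=false and forwards
			// to the leader even though it holds the node connection itself.
			s = s.leader
		case s.hasNode:
			return fmt.Sprintf("served by %s after %d hop(s)", s.name, req.hops)
		default:
			// The leader relays toward the server holding the node connection.
			if breakLoop {
				// Hypothetical guard: the relayed hop may be served stale,
				// so the next server doesn't bounce it back to the leader.
				req.AllowStale = true
			}
			s = s.nodeConn
		}
	}
}

func main() {
	leader := &server{name: "leader", isLeader: true}
	follower1 := &server{name: "follower1", hasNode: true, leader: leader}
	leader.nodeConn = follower1
	follower1.nodeConn = follower1

	fmt.Println(follower1.handle(&request{}, false)) // gave up: forwarding loop
	fmt.Println(follower1.handle(&request{}, true))  // served by follower1 after 3 hop(s)
}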
18 changes: 7 additions & 11 deletions nomad/testing.go
Expand Up @@ -6,7 +6,6 @@ import (
"math/rand"
"net"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
@@ -15,14 +14,15 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/version"
+	testing "github.com/mitchellh/go-testing-interface"
 	"github.com/shoenig/test/must"
 )
 
 var (
 	nodeNumber int32 = 0
 )
 
-func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
+func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
 	server, cleanup := TestServer(t, func(c *Config) {
 		c.ACLEnabled = true
 		if cb != nil {
@@ -37,13 +37,13 @@ func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
 	return server, token, cleanup
 }
 
-func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) {
+func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
 	s, c, err := TestServerErr(t, cb)
 	must.NoError(t, err, must.Sprint("failed to start test server"))
 	return s, c
 }
 
-func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
+func TestServerErr(t testing.T, cb func(*Config)) (*Server, func(), error) {
 	// Setup the default settings
 	config := DefaultConfig()
@@ -150,19 +150,15 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
 	return nil, nil, errors.New("unable to acquire ports for test server")
 }
 
-func TestJoin(t *testing.T, servers ...*Server) {
+func TestJoin(t testing.T, servers ...*Server) {
 	for i := 0; i < len(servers)-1; i++ {
 		addr := fmt.Sprintf("127.0.0.1:%d",
 			servers[i].config.SerfConfig.MemberlistConfig.BindPort)
 
 		for j := i + 1; j < len(servers); j++ {
 			num, err := servers[j].Join([]string{addr})
-			if err != nil {
-				t.Fatalf("err: %v", err)
-			}
-			if num != 1 {
-				t.Fatalf("bad: %d", num)
-			}
+			must.NoError(t, err)
+			must.Eq(t, 1, num)
 		}
 	}
 }
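
Editor's note: the signature changes in this file swap the concrete `*testing.T` for the `testing.T` interface from github.com/mitchellh/go-testing-interface, which lets these helpers be driven by any implementation rather than only by `go test`. A minimal sketch of what that enables, assuming a caller in ordinary non-test code; `RuntimeT` is that package's stock implementation of the interface:

// Hypothetical non-test caller; assumes go-testing-interface's RuntimeT.
package main

import (
	testing "github.com/mitchellh/go-testing-interface"

	"github.com/hashicorp/nomad/nomad"
)

func main() {
	// RuntimeT satisfies the testing.T interface outside of `go test`.
	t := &testing.RuntimeT{}

	srv, cleanup := nomad.TestServer(t, nil)
	defer cleanup()

	_ = srv // e.g., drive an in-process test server from a dev tool or demo
}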
13 changes: 8 additions & 5 deletions testutil/wait.go
@@ -143,15 +143,16 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
 	})
 }
 
-// WaitForLeaders blocks until each serverRPC knows the leader.
-func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
+// WaitForLeaders blocks until each rpc knows the leader and returns its address.
+func WaitForLeaders(t testing.TB, rpcs ...rpcFn) string {
 	t.Helper()
 
-	for i := 0; i < len(serverRPCs); i++ {
+	var leader string
+	for i := 0; i < len(rpcs); i++ {
 		ok := func() (bool, error) {
+			leader = ""
 			args := &structs.GenericRequest{}
-			var leader string
-			err := serverRPCs[i]("Status.Leader", args, &leader)
+			err := rpcs[i]("Status.Leader", args, &leader)
 			return leader != "", err
 		}
 		must.Wait(t, wait.InitialSuccess(
@@ -160,6 +161,8 @@ func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
 			wait.Gap(1*time.Second),
 		))
 	}
+
+	return leader
 }
 
 // WaitForClient blocks until the client can be found
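
Editor's note: the rewrite above moves `leader` out of the poll closure so the closure writes to a variable captured from the enclosing function, and the added `leader = ""` reset keeps a value from an earlier iteration from satisfying `leader != ""` when a later poll fails without writing. A standalone sketch of this capture-and-reset pattern, with illustrative names only:

// Sketch of the capture-and-reset closure pattern; not Nomad code.
package main

import (
	"errors"
	"fmt"
)

func waitAll(polls []func() (string, error)) string {
	var leader string
	for _, poll := range polls {
		ok := func() bool {
			leader = "" // reset: a failed poll must not inherit the prior value
			if v, err := poll(); err == nil {
				leader = v
			}
			return leader != ""
		}
		// Retry until this poll succeeds (a real implementation would time out).
		for !ok() {
		}
	}
	return leader // address reported by the final successful poll
}

func flaky(addr string) func() (string, error) {
	calls := 0
	return func() (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("no leader yet")
		}
		return addr, nil
	}
}

func main() {
	fmt.Println(waitAll([]func() (string, error){
		flaky("10.0.0.1:4647"),
		flaky("10.0.0.1:4647"),
	})) // 10.0.0.1:4647
}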
