diff --git a/nomad/client_meta_endpoint_test.go b/nomad/client_meta_endpoint_test.go
new file mode 100644
index 00000000000..64e56c26950
--- /dev/null
+++ b/nomad/client_meta_endpoint_test.go
@@ -0,0 +1,129 @@
+package nomad
+
+import (
+	"testing"
+
+	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/client"
+	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/helper/pointer"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/shoenig/test/must"
+)
+
+// TestNodeMeta_Forward asserts that Client RPCs do not result in infinite
+// loops. For example, in a cluster with 1 Leader, 2 Followers, and a Node
+// connected to Follower 1:
+//
+// If a NodeMeta.Apply RPC with AllowStale=false is received by Follower 1, it
+// will honor AllowStale=false and forward the request to the Leader.
+//
+// The Leader will accept the RPC, notice that Follower 1 has a connection to
+// the Node, and the Leader will send the request back to Follower 1.
+//
+// Follower 1, ever respectful of AllowStale=false, will forward it back to the
+// Leader.
+//
+// The Leader, being unable to forward to the Node, will send it back to
+// Follower 1.
+//
+// This argument will continue until one of the Servers runs out of memory or
+// patience and stomps away in anger (crashes). Like any good argument, the
+// ending is never pretty as the Servers will suffer CPU starvation and
+// potentially Raft flapping before anyone actually OOMs.
+//
+// See https://github.com/hashicorp/nomad/issues/16517 for details.
+//
+// If this test fails it will do so spectacularly by consuming all available
+// CPU and potentially all available memory. Running it in a VM or container
+// is suggested.
+func TestNodeMeta_Forward(t *testing.T) {
+	ci.Parallel(t)
+
+	servers := []*Server{}
+	for i := 0; i < 3; i++ {
+		s, cleanup := TestServer(t, func(c *Config) {
+			c.BootstrapExpect = 3
+			c.NumSchedulers = 0
+		})
+		t.Cleanup(cleanup)
+		servers = append(servers, s)
+	}
+
+	TestJoin(t, servers...)
+	leader := testutil.WaitForLeaders(t, servers[0].RPC, servers[1].RPC, servers[2].RPC)
+
+	followers := []string{}
+	for _, s := range servers {
+		if addr := s.config.RPCAddr.String(); addr != leader {
+			followers = append(followers, addr)
+		}
+	}
+	t.Logf("leader=%s followers=%q", leader, followers)
+
+	clients := []*client.Client{}
+	for i := 0; i < 4; i++ {
+		c, cleanup := client.TestClient(t, func(c *config.Config) {
+			// Clients will rebalance across all servers, but try to get them to use
+			// followers to ensure we don't hit the loop in #16517
+			c.Servers = followers
+		})
+		defer cleanup()
+		clients = append(clients, c)
+	}
+	for _, c := range clients {
+		testutil.WaitForClient(t, servers[0].RPC, c.NodeID(), c.Region())
+	}
+
+	agentRPCs := []func(string, any, any) error{}
+	nodeIDs := make([]string, 0, len(clients))
+
+	// Build the list of agent RPC handlers and node IDs
+	for _, s := range servers {
+		agentRPCs = append(agentRPCs, s.RPC)
+	}
+
+	for _, c := range clients {
+		agentRPCs = append(agentRPCs, c.RPC)
+		nodeIDs = append(nodeIDs, c.NodeID())
+	}
+
+	region := clients[0].Region()
+
+	// Apply metadata to every client through every agent to ensure forwarding
+	// always works regardless of the path taken.
+	for _, rpc := range agentRPCs {
+		for _, nodeID := range nodeIDs {
+			args := &structs.NodeMetaApplyRequest{
+				// Intentionally don't set QueryOptions.AllowStale to exercise #16517
+				QueryOptions: structs.QueryOptions{
+					Region: region,
+				},
+				NodeID: nodeID,
+				Meta:   map[string]*string{"testing": pointer.Of("123")},
+			}
+			reply := &structs.NodeMetaResponse{}
+			must.NoError(t, rpc("NodeMeta.Apply", args, reply))
+			must.MapNotEmpty(t, reply.Meta)
+		}
+	}
+
+	for _, rpc := range agentRPCs {
+		for _, nodeID := range nodeIDs {
+			args := &structs.NodeSpecificRequest{
+				// Intentionally don't set QueryOptions.AllowStale to exercise #16517
+				QueryOptions: structs.QueryOptions{
+					Region: region,
+				},
+				NodeID: nodeID,
+			}
+			reply := &structs.NodeMetaResponse{}
+			must.NoError(t, rpc("NodeMeta.Read", args, reply))
+			must.MapNotEmpty(t, reply.Meta)
+			must.Eq(t, reply.Meta["testing"], "123")
+			must.MapNotEmpty(t, reply.Dynamic)
+			must.Eq(t, *reply.Dynamic["testing"], "123")
+		}
+	}
+}
diff --git a/nomad/testing.go b/nomad/testing.go
index 6e7d31bb886..d2ec5d6165e 100644
--- a/nomad/testing.go
+++ b/nomad/testing.go
@@ -6,7 +6,6 @@ import (
 	"math/rand"
 	"net"
 	"sync/atomic"
-	"testing"
 	"time"
 
 	"github.com/hashicorp/nomad/ci"
@@ -15,6 +14,7 @@ import (
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/version"
+	testing "github.com/mitchellh/go-testing-interface"
 	"github.com/shoenig/test/must"
 )
 
@@ -22,7 +22,7 @@ var (
 	nodeNumber int32 = 0
 )
 
-func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
+func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
 	server, cleanup := TestServer(t, func(c *Config) {
 		c.ACLEnabled = true
 		if cb != nil {
@@ -37,13 +37,13 @@ func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken,
 	return server, token, cleanup
 }
 
-func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) {
+func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
 	s, c, err := TestServerErr(t, cb)
 	must.NoError(t, err, must.Sprint("failed to start test server"))
 	return s, c
 }
 
-func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
+func TestServerErr(t testing.T, cb func(*Config)) (*Server, func(), error) {
 	// Setup the default settings
 	config := DefaultConfig()
 
@@ -150,19 +150,15 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
 	return nil, nil, errors.New("unable to acquire ports for test server")
 }
 
-func TestJoin(t *testing.T, servers ...*Server) {
+func TestJoin(t testing.T, servers ...*Server) {
 	for i := 0; i < len(servers)-1; i++ {
 		addr := fmt.Sprintf("127.0.0.1:%d",
 			servers[i].config.SerfConfig.MemberlistConfig.BindPort)
 
 		for j := i + 1; j < len(servers); j++ {
 			num, err := servers[j].Join([]string{addr})
-			if err != nil {
-				t.Fatalf("err: %v", err)
-			}
-			if num != 1 {
-				t.Fatalf("bad: %d", num)
-			}
+			must.NoError(t, err)
+			must.Eq(t, 1, num)
		}
	}
}
diff --git a/testutil/wait.go b/testutil/wait.go
index 3cb8ee3a058..6f239b9081d 100644
--- a/testutil/wait.go
+++ b/testutil/wait.go
@@ -143,15 +143,16 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
 	})
 }
 
-// WaitForLeaders blocks until each serverRPC knows the leader.
-func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
+// WaitForLeaders blocks until each rpc knows the leader, returning its address.
+func WaitForLeaders(t testing.TB, rpcs ...rpcFn) string {
 	t.Helper()
 
-	for i := 0; i < len(serverRPCs); i++ {
+	var leader string
+	for i := 0; i < len(rpcs); i++ {
 		ok := func() (bool, error) {
+			leader = ""
 			args := &structs.GenericRequest{}
-			var leader string
-			err := serverRPCs[i]("Status.Leader", args, &leader)
+			err := rpcs[i]("Status.Leader", args, &leader)
 			return leader != "", err
 		}
 		must.Wait(t, wait.InitialSuccess(
@@ -160,6 +161,8 @@ func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
 			wait.Gap(1*time.Second),
 		))
 	}
+
+	return leader
 }
 
 // WaitForClient blocks until the client can be found
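The forwarding loop described in the TestNodeMeta_Forward doc comment can be condensed into a toy model. The sketch below is illustrative only, not Nomad code: it hard-codes the two routing rules from the comment (a follower honoring AllowStale=false always defers to the leader; the leader, lacking a connection to the Node, defers back to the follower that has one) and shows why the RPC never terminates. The maxHops cap exists only so the sketch halts.

```go
package main

import "fmt"

// A real cluster has no such cap; without it the RPC bounces forever.
const maxHops = 8

// forward applies the two routing rules from the doc comment verbatim.
func forward(server string, hops int) {
	if hops == maxHops {
		fmt.Printf("still looping after %d hops; no server ever serves the RPC\n", hops)
		return
	}
	switch server {
	case "follower1":
		// AllowStale=false: a follower must forward to the leader.
		forward("leader", hops+1)
	case "leader":
		// The leader cannot reach the Node directly, so it sends the
		// request back to the follower holding the Node connection.
		forward("follower1", hops+1)
	}
}

func main() {
	forward("follower1", 0)
}
```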
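The nomad/testing.go hunks swap *testing.T for the interface from github.com/mitchellh/go-testing-interface, which is what lets helpers like TestServer be driven from outside `go test`. A minimal sketch of that pattern, assuming that package's RuntimeT implementation; the startServer function and the commented-out nomad.TestServer call are hypothetical:

```go
package main

import (
	// Aliasing the import to "testing" means the existing signatures read
	// the same while accepting any implementation of the interface.
	testing "github.com/mitchellh/go-testing-interface"
)

// startServer stands in for any code that wants to reuse the Test*
// helpers at runtime, e.g. an agent dev mode or a benchmark harness.
func startServer(t testing.T) {
	t.Log("starting server via the testing interface")
	// server, cleanup := nomad.TestServer(t, nil) // hypothetical call site
	// defer cleanup()
}

func main() {
	// RuntimeT satisfies the interface outside `go test`; its Fatal and
	// FailNow panic rather than signaling a test runner.
	startServer(&testing.RuntimeT{})
}
```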