From fbd50806a1c1a46debe813259a840b91015f6f0d Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Wed, 29 Jan 2020 11:22:43 -0500 Subject: [PATCH 1/2] Allow nomad monitor command to lookup server UUID Allows addressing servers with nomad monitor using the servers name or ID. Also unifies logic for addressing servers for client_agent_endpoint commands and makes addressing logic region aware. rpc getServer test --- nomad/client_agent_endpoint.go | 84 +++++++++-------------------- nomad/client_agent_endpoint_test.go | 80 +++++++++++++++++++++------ nomad/rpc.go | 21 ++++++++ nomad/rpc_test.go | 26 +++++++++ 4 files changed, 136 insertions(+), 75 deletions(-) diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 4d56d14aec5..dd38830d2c7 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -149,24 +149,30 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) { return } - currentServer := a.srv.serf.LocalMember().Name - var forwardServer bool - // Targeting a remote server which is not the leader and not this server - if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer { - forwardServer = true + region := args.RequestRegion() + if region == "" { + handleStreamResultError(fmt.Errorf("missing target RPC"), helper.Int64ToPtr(400), encoder) + return } - - // Targeting leader and this server is not current leader - if args.ServerID == "leader" && !a.srv.IsLeader() { - forwardServer = true + if region != a.srv.config.Region { + // Mark that we are forwarding + args.SetForwarded() } - if forwardServer { - a.forwardMonitorServer(conn, args, encoder, decoder) - return + // Try to forward request to remote region/server + if args.ServerID != "" { + serverToFwd, err := a.forwardFor(args.ServerID, region) + if err != nil { + handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + if serverToFwd != nil { + 
a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder) + return + } } - // NodeID was empty, so monitor this current server + // NodeID was empty, ServerID was equal to this server, monitor this server ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -273,6 +279,7 @@ OUTER: // serverID and region so the request should not be forwarded. func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) { var target *serverParts + var err error if serverID == "leader" { isLeader, remoteLeader := a.srv.getLeader() @@ -285,19 +292,9 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) { return nil, nil } } else { - members := a.srv.Members() - for _, mem := range members { - if mem.Name == serverID || mem.Tags["id"] == serverID { - if ok, srv := isNomadServer(mem); ok { - if srv.Region != region { - return nil, - fmt.Errorf( - "Requested server:%s region:%s does not exist in requested region: %s", - serverID, srv.Region, region) - } - target = srv - } - } + target, err = a.srv.getServer(region, serverID) + if err != nil { + return nil, err } } @@ -384,42 +381,11 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni return } -func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { - var target *serverParts - serverID := args.ServerID - +func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) { // empty ServerID to prevent forwarding loop args.ServerID = "" - if serverID == "leader" { - isLeader, remoteServer := a.srv.getLeader() - if !isLeader && remoteServer != nil { - target = remoteServer - } - if !isLeader && remoteServer == nil { - handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder) - return - } - } else { - // See if the server ID is a known member - 
serfMembers := a.srv.Members() - for _, mem := range serfMembers { - if mem.Name == serverID { - if ok, srv := isNomadServer(mem); ok { - target = srv - } - } - } - } - - // Unable to find a server - if target == nil { - err := fmt.Errorf("unknown nomad server %s", serverID) - handleStreamResultError(err, helper.Int64ToPtr(400), encoder) - return - } - - serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor") + serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor") if err != nil { handleStreamResultError(err, helper.Int64ToPtr(500), encoder) return diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 931d03a6080..7e8b1349431 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -122,6 +122,7 @@ OUTER: func TestMonitor_Monitor_RemoteServer(t *testing.T) { t.Parallel() + foreignRegion := "foo" // start servers s1, cleanupS1 := TestServer(t, nil) @@ -130,9 +131,17 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { c.DevDisableBootstrap = true }) defer cleanupS2() - TestJoin(t, s1, s2) + + s3, cleanupS3 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + c.Region = foreignRegion + }) + defer cleanupS3() + + TestJoin(t, s1, s2, s3) testutil.WaitForLeader(t, s1.RPC) testutil.WaitForLeader(t, s2.RPC) + testutil.WaitForLeader(t, s3.RPC) // determine leader and nonleader servers := []*Server{s1, s2} @@ -152,6 +161,8 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { expectedLog string logger hclog.InterceptLogger origin *Server + region string + expectedErr string }{ { desc: "remote leader", @@ -159,13 +170,23 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { expectedLog: "leader log", logger: leader.logger, origin: nonLeader, + region: "global", }, { - desc: "remote server", + desc: "remote server, server name", serverID: nonLeader.serf.LocalMember().Name, expectedLog: "nonleader log", logger: nonLeader.logger, origin: leader, + region: 
"global", + }, + { + desc: "remote server, server UUID", + serverID: nonLeader.serf.LocalMember().Tags["id"], + expectedLog: "nonleader log", + logger: nonLeader.logger, + origin: leader, + region: "global", }, { desc: "serverID is current leader", @@ -173,6 +194,7 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { expectedLog: "leader log", logger: leader.logger, origin: leader, + region: "global", }, { desc: "serverID is current server", @@ -180,6 +202,24 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { expectedLog: "non leader log", logger: nonLeader.logger, origin: nonLeader, + region: "global", + }, + { + desc: "remote server, different region", + serverID: s3.serf.LocalMember().Name, + expectedLog: "remote region logger", + logger: s3.logger, + origin: nonLeader, + region: foreignRegion, + }, + { + desc: "different region, region mismatch", + serverID: s3.serf.LocalMember().Name, + expectedLog: "remote region logger", + logger: s3.logger, + origin: nonLeader, + region: "bar", + expectedErr: "No path to region", }, } @@ -204,6 +244,9 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { req := cstructs.MonitorRequest{ LogLevel: "warn", ServerID: tc.serverID, + QueryOptions: structs.QueryOptions{ + Region: tc.region, + }, } handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor") @@ -246,23 +289,28 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) { for { select { case <-timeout: - t.Fatal("timeout waiting for logs") + require.Fail("timeout waiting for logs") case err := <-errCh: - t.Fatal(err) + require.Fail(err.Error()) case msg := <-streamMsg: if msg.Error != nil { - t.Fatalf("Got error: %v", msg.Error.Error()) - } - - var frame sframer.StreamFrame - err := json.Unmarshal(msg.Payload, &frame) - assert.NoError(t, err) - - received += string(frame.Data) - if strings.Contains(received, tc.expectedLog) { - close(doneCh) - require.Nil(p2.Close()) - break OUTER + if tc.expectedErr != "" { + require.Contains(msg.Error.Error(), 
tc.expectedErr) + break OUTER + } else { + require.Failf("Got error: %v", msg.Error.Error()) + } + } else { + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) + if strings.Contains(received, tc.expectedLog) { + close(doneCh) + require.Nil(p2.Close()) + break OUTER + } } } } diff --git a/nomad/rpc.go b/nomad/rpc.go index 41d47fe731a..228157d4cc5 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -528,6 +528,27 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply) } +func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) { + // Bail if we can't find any servers + r.peerLock.RLock() + defer r.peerLock.RUnlock() + + servers := r.peers[region] + if len(servers) == 0 { + r.logger.Warn("no path found to region", "region", region) + return nil, structs.ErrNoRegionPath + } + + // Lookup server by id or name + for _, server := range servers { + if server.Name == serverID || server.ID == serverID { + return server, nil + } + } + + return nil, fmt.Errorf("unknown Nomad server %s", serverID) +} + // streamingRpc creates a connection to the given server and conducts the // initial handshake, returning the connection or an error. It is the callers // responsibility to close the connection if there is no returned error. 
diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index a4b8a6183a5..e32fba14ab0 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -145,6 +145,32 @@ func TestRPC_forwardRegion(t *testing.T) { } } +func TestRPC_getServer(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.Region = "global" + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // Lookup by name + srv, err := s1.getServer("global", s2.serf.LocalMember().Name) + require.NoError(t, err) + + require.Equal(t, srv.Name, s2.serf.LocalMember().Name) + + // Lookup by id + srv, err = s2.getServer("global", s1.serf.LocalMember().Tags["id"]) + require.NoError(t, err) + + require.Equal(t, srv.Name, s1.serf.LocalMember().Name) +} + func TestRPC_PlaintextRPCSucceedsWhenInUpgradeMode(t *testing.T) { t.Parallel() assert := assert.New(t) From 2dbcad3f4504e35c4347553bfa29ebeb888e4e19 Mon Sep 17 00:00:00 2001 From: Drew Bailey <2614075+drewbailey@users.noreply.github.com> Date: Wed, 29 Jan 2020 13:55:14 -0500 Subject: [PATCH 2/2] fix tests, update changelog --- CHANGELOG.md | 3 ++- api/agent_test.go | 2 +- e2e/connect/input/multi-service.nomad | 4 ++-- nomad/client_agent_endpoint_test.go | 7 +++++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5e61f029d0..2ef2cad71b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,9 +18,10 @@ IMPROVEMENTS: BUG FIXES: * agent: Fixed race condition in logging when using `nomad monitor` command [[GH-6872](https://github.com/hashicorp/nomad/issues/6872)] + * agent: Fixed a bug where `nomad monitor -server-id` only worked with a server's name, not its UUID. [[GH-7015](https://github.com/hashicorp/nomad/issues/7015)] * cli: Fixed a bug where `nomad monitor -node-id` would cause a cli panic when no nodes were found.
[[GH-6828](https://github.com/hashicorp/nomad/issues/6828)] * config: Fixed a bug where agent startup would fail if the `consul.timeout` configuration was set. [[GH-6907](https://github.com/hashicorp/nomad/issues/6907)] - * consul: Fixed a bug where script-based health checks would fail if the service configuration included interpolation. [[GH-6916](https://github.com/hashicorp/nomad/issues/6916)] + * consul: Fixed a bug where script-based health checks would fail if the service configuration included interpolation. [[GH-6916](https://github.com/hashicorp/nomad/issues/6916)] * consul/connect: Fixed a bug where Connect-enabled jobs failed to validate when service names used interpolation. [[GH-6855](https://github.com/hashicorp/nomad/issues/6855)] * scheduler: Fixed a bug that caused evicted allocs on a lost node to be stuck in running. [[GH-6902](https://github.com/hashicorp/nomad/issues/6902)] * scheduler: Fixed a bug where `nomad job plan/apply` returned errors instead of a partial placement warning for ineligible nodes.
[[GH-6968](https://github.com/hashicorp/nomad/issues/6968)] diff --git a/api/agent_test.go b/api/agent_test.go index 078da8fa852..8fff6cfb0c5 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -407,7 +407,7 @@ func TestAgentCPUProfile(t *testing.T) { } resp, err := agent.CPUProfile(opts, q) require.Error(t, err) - require.Contains(t, err.Error(), "500 (unknown nomad server unknown.global)") + require.Contains(t, err.Error(), "500 (unknown Nomad server unknown.global)") require.Nil(t, resp) } diff --git a/e2e/connect/input/multi-service.nomad b/e2e/connect/input/multi-service.nomad index 1699dbf85cf..2d57c13b41d 100644 --- a/e2e/connect/input/multi-service.nomad +++ b/e2e/connect/input/multi-service.nomad @@ -25,7 +25,7 @@ job "multi-service" { config { image = "hashicorp/http-echo" - args = ["-listen=:9001", "-text=echo1"] + args = ["-listen=:9001", "-text=echo1"] } } @@ -43,7 +43,7 @@ job "multi-service" { config { image = "hashicorp/http-echo" - args = ["-listen=:9002", "-text=echo2"] + args = ["-listen=:9002", "-text=echo2"] } } } diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 7e8b1349431..c87cf51b2d9 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -330,6 +330,9 @@ func TestMonitor_MonitorServer(t *testing.T) { // No node ID to monitor the remote server req := cstructs.MonitorRequest{ LogLevel: "debug", + QueryOptions: structs.QueryOptions{ + Region: "global", + }, } handler, err := s.StreamingRpcHandler("Agent.Monitor") @@ -589,7 +592,7 @@ func TestAgentProfile_RemoteRegionMisMatch(t *testing.T) { reply := structs.AgentPprofResponse{} err := s1.RPC("Agent.Profile", &req, &reply) - require.Contains(err.Error(), "does not exist in requested region") + require.Contains(err.Error(), "unknown Nomad server") require.Nil(reply.Payload) } @@ -704,7 +707,7 @@ func TestAgentProfile_Server(t *testing.T) { serverID: uuid.Generate(), origin: nonLeader, reqType: 
pprof.CmdReq, - expectedErr: "unknown nomad server", + expectedErr: "unknown Nomad server", expectedAgentID: "", }, }