Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nomad monitor command to lookup server UUID #7015

Merged
merged 2 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ IMPROVEMENTS:
BUG FIXES:

* agent: Fixed race condition in logging when using `nomad monitor` command [[GH-6872](https://github.com/hashicorp/nomad/issues/6872)]
* agent: Fixed a bug where `nomad monitor -server-id` only work for a server's name instead of uuid or name. [[GH-7015](https://github.com/hashicorp/nomad/issues/7015)]
* cli: Fixed a bug where `nomad monitor -node-id` would cause a cli panic when no nodes where found. [[GH-6828](https://github.com/hashicorp/nomad/issues/6828)]
* config: Fixed a bug where agent startup would fail if the `consul.timeout` configuration was set. [[GH-6907](https://github.com/hashicorp/nomad/issues/6907)]
* consul: Fixed a bug where script-based health checks would fail if the service configuration included interpolation. [[GH-6916](https://github.com/hashicorp/nomad/issues/6916)]
* consul: Fixed a bug where script-based health checks would fail if the service configuration included interpolation. [[GH-6916](https://github.com/hashicorp/nomad/issues/6916)]
* consul/connect: Fixed a bug where Connect-enabled jobs failed to validate when service names used interpolation. [[GH-6855](https://github.com/hashicorp/nomad/issues/6855)]
* scheduler: Fixed a bug that caused evicted allocs on a lost node to be stuck in running. [[GH-6902](https://github.com/hashicorp/nomad/issues/6902)]
* scheduler: Fixed a bug where `nomad job plan/apply` returned errors instead of a partial placement warning for ineligible nodes. [[GH-6968](https://github.com/hashicorp/nomad/issues/6968)]
Expand Down
2 changes: 1 addition & 1 deletion api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func TestAgentCPUProfile(t *testing.T) {
}
resp, err := agent.CPUProfile(opts, q)
require.Error(t, err)
require.Contains(t, err.Error(), "500 (unknown nomad server unknown.global)")
require.Contains(t, err.Error(), "500 (unknown Nomad server unknown.global)")
require.Nil(t, resp)
}

Expand Down
4 changes: 2 additions & 2 deletions e2e/connect/input/multi-service.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ job "multi-service" {

config {
image = "hashicorp/http-echo"
args = ["-listen=:9001", "-text=echo1"]
args = ["-listen=:9001", "-text=echo1"]
}
}

Expand All @@ -43,7 +43,7 @@ job "multi-service" {

config {
image = "hashicorp/http-echo"
args = ["-listen=:9002", "-text=echo2"]
args = ["-listen=:9002", "-text=echo2"]
}
}
}
Expand Down
84 changes: 25 additions & 59 deletions nomad/client_agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,30 @@ func (a *Agent) monitor(conn io.ReadWriteCloser) {
return
}

currentServer := a.srv.serf.LocalMember().Name
var forwardServer bool
// Targeting a remote server which is not the leader and not this server
if args.ServerID != "" && args.ServerID != "leader" && args.ServerID != currentServer {
forwardServer = true
region := args.RequestRegion()
if region == "" {
handleStreamResultError(fmt.Errorf("missing target RPC"), helper.Int64ToPtr(400), encoder)
return
}

// Targeting leader and this server is not current leader
if args.ServerID == "leader" && !a.srv.IsLeader() {
forwardServer = true
if region != a.srv.config.Region {
// Mark that we are forwarding
args.SetForwarded()
}

if forwardServer {
a.forwardMonitorServer(conn, args, encoder, decoder)
return
// Try to forward request to remote region/server
if args.ServerID != "" {
serverToFwd, err := a.forwardFor(args.ServerID, region)
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if serverToFwd != nil {
a.forwardMonitorServer(conn, serverToFwd, args, encoder, decoder)
return
}
}

// NodeID was empty, so monitor this current server
// NodeID was empty, ServerID was equal to this server, monitor this server
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -273,6 +279,7 @@ OUTER:
// serverID and region so the request should not be forwarded.
func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
var target *serverParts
var err error

if serverID == "leader" {
isLeader, remoteLeader := a.srv.getLeader()
Expand All @@ -285,19 +292,9 @@ func (a *Agent) forwardFor(serverID, region string) (*serverParts, error) {
return nil, nil
}
} else {
members := a.srv.Members()
for _, mem := range members {
if mem.Name == serverID || mem.Tags["id"] == serverID {
if ok, srv := isNomadServer(mem); ok {
if srv.Region != region {
return nil,
fmt.Errorf(
"Requested server:%s region:%s does not exist in requested region: %s",
serverID, srv.Region, region)
}
target = srv
}
}
target, err = a.srv.getServer(region, serverID)
if err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -384,42 +381,11 @@ func (a *Agent) forwardMonitorClient(conn io.ReadWriteCloser, args cstructs.Moni
return
}

func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
var target *serverParts
serverID := args.ServerID

func (a *Agent) forwardMonitorServer(conn io.ReadWriteCloser, server *serverParts, args cstructs.MonitorRequest, encoder *codec.Encoder, decoder *codec.Decoder) {
// empty ServerID to prevent forwarding loop
args.ServerID = ""

if serverID == "leader" {
isLeader, remoteServer := a.srv.getLeader()
if !isLeader && remoteServer != nil {
target = remoteServer
}
if !isLeader && remoteServer == nil {
handleStreamResultError(structs.ErrNoLeader, helper.Int64ToPtr(400), encoder)
return
}
} else {
// See if the server ID is a known member
serfMembers := a.srv.Members()
for _, mem := range serfMembers {
if mem.Name == serverID {
if ok, srv := isNomadServer(mem); ok {
target = srv
}
}
}
}

// Unable to find a server
if target == nil {
err := fmt.Errorf("unknown nomad server %s", serverID)
handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}

serverConn, err := a.srv.streamingRpc(target, "Agent.Monitor")
serverConn, err := a.srv.streamingRpc(server, "Agent.Monitor")
if err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
Expand Down
87 changes: 69 additions & 18 deletions nomad/client_agent_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ OUTER:

func TestMonitor_Monitor_RemoteServer(t *testing.T) {
t.Parallel()
foreignRegion := "foo"

// start servers
s1, cleanupS1 := TestServer(t, nil)
Expand All @@ -130,9 +131,17 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
c.DevDisableBootstrap = true
})
defer cleanupS2()
TestJoin(t, s1, s2)

s3, cleanupS3 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
c.Region = foreignRegion
})
defer cleanupS3()

TestJoin(t, s1, s2, s3)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
testutil.WaitForLeader(t, s3.RPC)

// determine leader and nonleader
servers := []*Server{s1, s2}
Expand All @@ -152,34 +161,65 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
expectedLog string
logger hclog.InterceptLogger
origin *Server
region string
expectedErr string
}{
{
desc: "remote leader",
serverID: "leader",
expectedLog: "leader log",
logger: leader.logger,
origin: nonLeader,
region: "global",
},
{
desc: "remote server",
desc: "remote server, server name",
serverID: nonLeader.serf.LocalMember().Name,
expectedLog: "nonleader log",
logger: nonLeader.logger,
origin: leader,
region: "global",
},
{
desc: "remote server, server UUID",
serverID: nonLeader.serf.LocalMember().Tags["id"],
expectedLog: "nonleader log",
logger: nonLeader.logger,
origin: leader,
region: "global",
},
{
desc: "serverID is current leader",
serverID: "leader",
expectedLog: "leader log",
logger: leader.logger,
origin: leader,
region: "global",
},
{
desc: "serverID is current server",
serverID: nonLeader.serf.LocalMember().Name,
expectedLog: "non leader log",
logger: nonLeader.logger,
origin: nonLeader,
region: "global",
},
{
desc: "remote server, different region",
serverID: s3.serf.LocalMember().Name,
expectedLog: "remote region logger",
logger: s3.logger,
origin: nonLeader,
region: foreignRegion,
},
{
desc: "different region, region mismatch",
serverID: s3.serf.LocalMember().Name,
expectedLog: "remote region logger",
logger: s3.logger,
origin: nonLeader,
region: "bar",
expectedErr: "No path to region",
},
}

Expand All @@ -204,6 +244,9 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
req := cstructs.MonitorRequest{
LogLevel: "warn",
ServerID: tc.serverID,
QueryOptions: structs.QueryOptions{
Region: tc.region,
},
}

handler, err := tc.origin.StreamingRpcHandler("Agent.Monitor")
Expand Down Expand Up @@ -246,23 +289,28 @@ func TestMonitor_Monitor_RemoteServer(t *testing.T) {
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
require.Fail("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
require.Fail(err.Error())
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}

var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)

received += string(frame.Data)
if strings.Contains(received, tc.expectedLog) {
close(doneCh)
require.Nil(p2.Close())
break OUTER
if tc.expectedErr != "" {
require.Contains(msg.Error.Error(), tc.expectedErr)
break OUTER
} else {
require.Failf("Got error: %v", msg.Error.Error())
}
} else {
var frame sframer.StreamFrame
err := json.Unmarshal(msg.Payload, &frame)
assert.NoError(t, err)

received += string(frame.Data)
if strings.Contains(received, tc.expectedLog) {
close(doneCh)
require.Nil(p2.Close())
break OUTER
}
}
}
}
Expand All @@ -282,6 +330,9 @@ func TestMonitor_MonitorServer(t *testing.T) {
// No node ID to monitor the remote server
req := cstructs.MonitorRequest{
LogLevel: "debug",
QueryOptions: structs.QueryOptions{
Region: "global",
},
}

handler, err := s.StreamingRpcHandler("Agent.Monitor")
Expand Down Expand Up @@ -541,7 +592,7 @@ func TestAgentProfile_RemoteRegionMisMatch(t *testing.T) {
reply := structs.AgentPprofResponse{}

err := s1.RPC("Agent.Profile", &req, &reply)
require.Contains(err.Error(), "does not exist in requested region")
require.Contains(err.Error(), "unknown Nomad server")
require.Nil(reply.Payload)
}

Expand Down Expand Up @@ -656,7 +707,7 @@ func TestAgentProfile_Server(t *testing.T) {
serverID: uuid.Generate(),
origin: nonLeader,
reqType: pprof.CmdReq,
expectedErr: "unknown nomad server",
expectedErr: "unknown Nomad server",
expectedAgentID: "",
},
}
Expand Down
21 changes: 21 additions & 0 deletions nomad/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,27 @@ func (r *rpcHandler) forwardRegion(region, method string, args interface{}, repl
return r.connPool.RPC(region, server.Addr, server.MajorVersion, method, args, reply)
}

func (r *rpcHandler) getServer(region, serverID string) (*serverParts, error) {
// Bail if we can't find any servers
r.peerLock.RLock()
drewbailey marked this conversation as resolved.
Show resolved Hide resolved
defer r.peerLock.RUnlock()

servers := r.peers[region]
if len(servers) == 0 {
r.logger.Warn("no path found to region", "region", region)
return nil, structs.ErrNoRegionPath
}

// Lookup server by id or name
for _, server := range servers {
if server.Name == serverID || server.ID == serverID {
return server, nil
}
}

return nil, fmt.Errorf("unknown Nomad server %s", serverID)
}

// streamingRpc creates a connection to the given server and conducts the
// initial handshake, returning the connection or an error. It is the callers
// responsibility to close the connection if there is no returned error.
Expand Down
26 changes: 26 additions & 0 deletions nomad/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,32 @@ func TestRPC_forwardRegion(t *testing.T) {
}
}

func TestRPC_getServer(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.Region = "global"
})
defer cleanupS2()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)

// Lookup by name
srv, err := s1.getServer("global", s2.serf.LocalMember().Name)
require.NoError(t, err)

require.Equal(t, srv.Name, s2.serf.LocalMember().Name)

// Lookup by id
srv, err = s2.getServer("global", s1.serf.LocalMember().Tags["id"])
require.NoError(t, err)

require.Equal(t, srv.Name, s1.serf.LocalMember().Name)
}

func TestRPC_PlaintextRPCSucceedsWhenInUpgradeMode(t *testing.T) {
t.Parallel()
assert := assert.New(t)
Expand Down