Skip to content

Commit

Permalink
client monitor endpoint tests
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 25, 2019
1 parent 7d6ffdc commit 542e68f
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 20 deletions.
68 changes: 49 additions & 19 deletions api/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package api

import (
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"

"github.com/kr/pretty"
"github.com/stretchr/testify/require"

"github.com/hashicorp/go-uuid"
"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -262,17 +264,47 @@ func TestAgent_Health(t *testing.T) {
assert.True(health.Server.Ok)
}

func TestAgent_Monitor(t *testing.T) {
func TestAgent_MonitorWithNode(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
rpcPort := 0
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
rpcPort = c.Ports.RPC
c.Client = &testutil.ClientConfig{
Enabled: true,
}
})
defer s.Stop()

require.NoError(t, c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))

agent := c.Agent()

index := uint64(0)
var node *NodeListStub
// grab a node
testutil.WaitForResult(func() (bool, error) {
nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
index = qm.LastIndex
if len(nodes) != 1 {
return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
}
if nodes[0].Status != "ready" {
return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
}
node = nodes[0]
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

doneCh := make(chan struct{})
q := &QueryOptions{
Params: map[string]string{
"log-level": "debug",
"node-id": node.ID,
},
}

Expand All @@ -283,34 +315,32 @@ func TestAgent_Monitor(t *testing.T) {
}

// make a request to generate some logs
_, err = agent.Region()
_, err = agent.NodeName()
require.NoError(t, err)

// Wait for the first log message and validate it
// Wait for a log message
OUTER:
for {
select {
case log := <-logCh:
if log == " " {
return
if strings.Contains(log, "[DEBUG]") {
break OUTER
}
require.Contains(t, log, "[DEBUG]")
case <-time.After(10 * time.Second):
require.Fail(t, "failed to get a log message")
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}
func TestAgent_MonitorWithNode(t *testing.T) {
func TestAgent_Monitor(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()

agent := c.Agent()
id, _ := uuid.GenerateUUID()

q := &QueryOptions{
Params: map[string]string{
"log-level": "debug",
"node-id": id,
},
}

Expand All @@ -326,16 +356,16 @@ func TestAgent_MonitorWithNode(t *testing.T) {
_, err = agent.Region()
require.NoError(t, err)

// Wait for the first log message and validate it
// Wait for a log message
OUTER:
for {
select {
case log := <-logCh:
if log == " " {
return
if strings.Contains(log, "[DEBUG]") {
break OUTER
}
require.Contains(t, log, "[DEBUG]")
case <-time.After(10 * time.Second):
require.Fail(t, "failed to get a log message")
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}
3 changes: 2 additions & 1 deletion nomad/client_monitor_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {
// NodeID was empty, so monitor this current server
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer close(stopCh)
defer cancel()

monitor := monitor.New(512, m.srv.logger, &log.LoggerOptions{
Expand All @@ -142,7 +143,7 @@ func (m *Monitor) monitor(conn io.ReadWriteCloser) {

go func() {
if _, err := conn.Read(nil); err != nil {
close(stopCh)
// One end of the pipe closed, exit
cancel()
return
}
Expand Down
189 changes: 189 additions & 0 deletions nomad/client_monitor_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package nomad

import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
)

func TestMonitor_Monitor_Remote_Server(t *testing.T) {
t.Parallel()
require := require.New(t)

// start server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)

c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.GetConfig().RPCAddr.String()}
})
defer cleanup()

testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})

// No node ID to monitor the remote server
req := cstructs.MonitorRequest{
LogLevel: "debug",
NodeID: c.NodeID(),
}

handler, err := s1.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)

// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()

errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)

go handler(p2)

// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}

streamMsg <- &msg
}
}()

// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))

timeout := time.After(1 * time.Second)
expected := "[DEBUG]"
received := ""

OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}

received += string(msg.Payload)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
}
}
}
}

func TestMonitor_MonitorServer(t *testing.T) {
t.Parallel()
require := require.New(t)

// start server and client
s := TestServer(t, nil)
defer s.Shutdown()
testutil.WaitForLeader(t, s.RPC)

// No node ID to monitor the remote server
req := cstructs.MonitorRequest{
LogLevel: "debug",
}

handler, err := s.StreamingRpcHandler("Agent.Monitor")
require.Nil(err)

// create pipe
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()

errCh := make(chan error)
streamMsg := make(chan *cstructs.StreamErrWrapper)

go handler(p2)

// Start decoder
go func() {
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
for {
var msg cstructs.StreamErrWrapper
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF || strings.Contains(err.Error(), "closed") {
return
}
errCh <- fmt.Errorf("error decoding: %v", err)
}

streamMsg <- &msg
}
}()

// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
require.Nil(encoder.Encode(req))

timeout := time.After(1 * time.Second)
expected := "[DEBUG]"
received := ""

// send logs
go func() {
for {
s.logger.Debug("test log")
time.Sleep(100 * time.Millisecond)
}
}()

OUTER:
for {
select {
case <-timeout:
t.Fatal("timeout waiting for logs")
case err := <-errCh:
t.Fatal(err)
case msg := <-streamMsg:
if msg.Error != nil {
t.Fatalf("Got error: %v", msg.Error.Error())
}

received += string(msg.Payload)
if strings.Contains(received, expected) {
require.Nil(p2.Close())
break OUTER
}
}
}
}

0 comments on commit 542e68f

Please sign in to comment.