Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad Monitor #6499

Merged
merged 34 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5e03fec
Update gziphandler to latest version
drewbailey Oct 7, 2019
91c0184
Adds AgentMonitor Endpoint
drewbailey Oct 7, 2019
dc32864
Add Agent Monitor to receive streaming logs
drewbailey Oct 7, 2019
74cfdf5
Adds nomad monitor command
drewbailey Oct 7, 2019
a828c92
Display error when remote side ended monitor
drewbailey Oct 9, 2019
1281997
remove log_writer
drewbailey Oct 10, 2019
890b8a4
get local rpc endpoint working
drewbailey Oct 15, 2019
8095b48
New monitor pkg for shared monitor functionality
drewbailey Oct 15, 2019
e758930
enable json formatting, use queryoptions
drewbailey Oct 24, 2019
2362008
new hclog with standardlogger intercept
drewbailey Oct 24, 2019
3c0082f
use intercepting loggers for rpchandlers
drewbailey Oct 24, 2019
735530c
client monitor endpoint tests
drewbailey Oct 25, 2019
cd60628
rpc acl tests for both monitor endpoints
drewbailey Oct 25, 2019
c8d60dd
only look up rpchandler for node if we have nodeid
drewbailey Oct 25, 2019
92d6a30
agent:read acl policy for monitor
drewbailey Oct 25, 2019
9bb606a
update go-hclog dep
drewbailey Oct 28, 2019
a72bd5c
use channel instead of empty string to determine close
drewbailey Oct 30, 2019
1176fc0
address feedback, use agent_endpoint instead of monitor
drewbailey Oct 30, 2019
6bf8617
rename function, initialize log level better
drewbailey Oct 30, 2019
873969c
return 400 if invalid log_json param is given
drewbailey Oct 30, 2019
58117c0
fix deadlock issue, switch to frames envelope
drewbailey Oct 30, 2019
f8eaf1f
lock in sub select
drewbailey Oct 31, 2019
9a96c10
moving endpoints over to frames
drewbailey Nov 1, 2019
f74bd99
monitor command takes no args
drewbailey Nov 1, 2019
8423ccf
allow more time for streaming message
drewbailey Nov 1, 2019
676800f
address feedback
drewbailey Nov 4, 2019
dc977dc
move forwarded monitor request into helper
drewbailey Nov 4, 2019
4f618eb
simplify assert message
drewbailey Nov 4, 2019
bb2a7f4
address feedback, fix gauge metric name
drewbailey Nov 4, 2019
33ba36a
log-json -> json
drewbailey Nov 4, 2019
79411c5
coordinate closing of doneCh, use interface to simplify callers
drewbailey Nov 5, 2019
1585179
wireup plain=true|false query param
drewbailey Nov 5, 2019
8ccb770
simplify logch goroutine
drewbailey Nov 5, 2019
03f0aff
unlock before returning, no need for label
drewbailey Nov 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,57 @@ func (a *Agent) Health() (*AgentHealthResponse, error) {
return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
}

// Monitor returns a channel which will receive streaming logs from the agent
// Providing a non-nil stopCh can be used to close the connection and stop log streaming
func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
r, err := a.client.newRequest("GET", "/v1/agent/monitor")
if err != nil {
errCh <- err
return nil, errCh
}

r.setQueryOptions(q)
_, resp, err := requireOK(a.client.doRequest(r))
if err != nil {
errCh <- err
return nil, errCh
}

frames := make(chan *StreamFrame, 10)
go func() {
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)

for {
select {
case <-stopCh:
drewbailey marked this conversation as resolved.
Show resolved Hide resolved
close(frames)
return
default:
}

// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
errCh <- err
return
}

// Discard heartbeat frame
if frame.IsHeartbeat() {
continue
}

frames <- &frame
}
}()

return frames, errCh
}

// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {
Expand Down
119 changes: 119 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package api

import (
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"

"github.com/kr/pretty"
"github.com/stretchr/testify/require"

"github.com/hashicorp/nomad/api/internal/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -257,3 +263,116 @@ func TestAgent_Health(t *testing.T) {
assert.Nil(err)
assert.True(health.Server.Ok)
}

// TestAgent_MonitorWithNode tests the Monitor endpoint
// passing in a log level and node ie, which tests monitor
// functionality for a specific client node
func TestAgent_MonitorWithNode(t *testing.T) {
t.Parallel()
rpcPort := 0
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
rpcPort = c.Ports.RPC
c.Client = &testutil.ClientConfig{
Enabled: true,
}
})
defer s.Stop()

require.NoError(t, c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))

agent := c.Agent()

index := uint64(0)
var node *NodeListStub
// grab a node
testutil.WaitForResult(func() (bool, error) {
nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
index = qm.LastIndex
if len(nodes) != 1 {
return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
}
if nodes[0].Status != "ready" {
return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
}
node = nodes[0]
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

doneCh := make(chan struct{})
q := &QueryOptions{
Params: map[string]string{
"log_level": "debug",
"node_id": node.ID,
},
}

frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)

// make a request to generate some logs
_, err := agent.NodeName()
require.NoError(t, err)

// Wait for a log message
OUTER:
for {
select {
case f := <-frames:
if strings.Contains(string(f.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Errorf("Error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}

// TestAgent_Monitor tests the Monitor endpoint
// passing in only a log level, which tests the servers
// monitor functionality
func TestAgent_Monitor(t *testing.T) {
drewbailey marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
c, s := makeClient(t, nil, nil)
defer s.Stop()

agent := c.Agent()

q := &QueryOptions{
Params: map[string]string{
"log_level": "debug",
},
}

doneCh := make(chan struct{})
frames, errCh := agent.Monitor(doneCh, q)
defer close(doneCh)

// make a request to generate some logs
_, err := agent.Region()
require.NoError(t, err)

// Wait for a log message
OUTER:
for {
select {
case log := <-frames:
if log == nil {
continue
}
if strings.Contains(string(log.Data), "[DEBUG]") {
break OUTER
}
case err := <-errCh:
t.Fatalf("error: %v", err)
case <-time.After(2 * time.Second):
require.Fail(t, "failed to get a DEBUG log message")
}
}
}
163 changes: 163 additions & 0 deletions client/agent_endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package client

import (
"bytes"
"context"
"errors"
"io"
"time"

"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"

metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
sframer "github.com/hashicorp/nomad/client/lib/streamframer"
cstructs "github.com/hashicorp/nomad/client/structs"
)

type Agent struct {
c *Client
}

func NewAgentEndpoint(c *Client) *Agent {
m := &Agent{c: c}
m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
return m
}

func (m *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()

// Decode arguments
var args cstructs.MonitorRequest
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

if err := decoder.Decode(&args); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}

// Check acl
if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
handleStreamResultError(structs.ErrPermissionDenied, helper.Int64ToPtr(403), encoder)
return
}

logLevel := log.LevelFromString(args.LogLevel)
if args.LogLevel == "" {
logLevel = log.LevelFromString("INFO")
}

if logLevel == log.NoLevel {
handleStreamResultError(errors.New("Unknown log level"), helper.Int64ToPtr(400), encoder)
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
})

frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()
defer framer.Destroy()

// goroutine to detect remote side closing
go func() {
if _, err := conn.Read(nil); err != nil {
// One end of the pipe explicitly closed, exit
cancel()
return
}
select {
case <-ctx.Done():
return
}
}()

logCh := monitor.Start()
defer monitor.Stop()
initialOffset := int64(0)

// receive logs and build frames
go func() {
defer framer.Destroy()
LOOP:
for {
select {
case log := <-logCh:
if err := framer.Send("", "log", log, initialOffset); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
}
break LOOP
}
case <-ctx.Done():
break LOOP
}
}
}()

var streamErr error
OUTER:
for {
select {
case frame, ok := <-frames:
if !ok {
// frame may have been closed when an error
// occurred. Check once more for an error.
select {
case streamErr = <-errCh:
// There was a pending error!
default:
// No error, continue on
}

break OUTER
}

var resp cstructs.StreamErrWrapper
if args.PlainText {
resp.Payload = frame.Data
} else {
if err := frameCodec.Encode(frame); err != nil {
streamErr = err
break OUTER
}

resp.Payload = buf.Bytes()
buf.Reset()
}

if err := encoder.Encode(resp); err != nil {
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
}

if streamErr != nil {
handleStreamResultError(streamErr, helper.Int64ToPtr(500), encoder)
return
}
}
Loading