diff --git a/api/agent.go b/api/agent.go index bccb2c19f3d..936ac1e4c19 100644 --- a/api/agent.go +++ b/api/agent.go @@ -289,7 +289,7 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Stream } }() - return frames, nil + return frames, errCh } // joinResponse is used to decode the response we get while diff --git a/api/agent_test.go b/api/agent_test.go index 4a0304f094f..079ba44c1b6 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "fmt" "reflect" "sort" @@ -311,22 +312,24 @@ func TestAgent_MonitorWithNode(t *testing.T) { }, } - logCh, err := agent.Monitor(doneCh, q) - require.NoError(t, err) + frames, errCh := agent.Monitor(doneCh, q) defer close(doneCh) // make a request to generate some logs - _, err = agent.NodeName() + _, err := agent.NodeName() require.NoError(t, err) // Wait for a log message + var result bytes.Buffer OUTER: for { select { - case log := <-logCh: - if strings.Contains(string(log.Data), "[DEBUG]") { + case f := <-frames: + if strings.Contains(string(f.Data), "[DEBUG]") { break OUTER } + case err := <-errCh: + t.Errorf("Error: %v", err) case <-time.After(2 * time.Second): require.Fail(t, "failed to get a DEBUG log message") } @@ -350,22 +353,26 @@ func TestAgent_Monitor(t *testing.T) { } doneCh := make(chan struct{}) - logCh, err := agent.Monitor(doneCh, q) - require.NoError(t, err) + frames, errCh := agent.Monitor(doneCh, q) defer close(doneCh) // make a request to generate some logs - _, err = agent.Region() + _, err := agent.Region() require.NoError(t, err) // Wait for a log message OUTER: for { select { - case log := <-logCh: + case log := <-frames: + if log == nil { + continue + } if strings.Contains(string(log.Data), "[DEBUG]") { break OUTER } + case err := <-errCh: + t.Fatalf("error: %v", err) case <-time.After(2 * time.Second): require.Fail(t, "failed to get a DEBUG log message") } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index feeb8a8289f..504fc16348b 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -298,15 +298,13 @@ func TestHTTP_AgentMonitor(t *testing.T) { tried := 0 testutil.WaitForResult(func() (bool, error) { if tried < maxLogAttempts { - s.Server.logger.Debug("log that should not be sent") s.Server.logger.Warn("log that should be sent") tried++ } got := resp.Body.String() - want := "[WARN] http: log that should be sent" + want := `{"Data":"` if strings.Contains(got, want) { - require.NotContains(t, resp.Body.String(), "[DEBUG]") return true, nil } @@ -344,9 +342,8 @@ func TestHTTP_AgentMonitor(t *testing.T) { } out += string(output) - want := "[WARN] http: log that should be sent" + want := `{"Data":"` if strings.Contains(out, want) { - require.NotContains(t, resp.Body.String(), "[DEBUG]") return true, nil } diff --git a/command/agent_monitor_test.go b/command/agent_monitor_test.go index eb49e92c33f..24109b28048 100644 --- a/command/agent_monitor_test.go +++ b/command/agent_monitor_test.go @@ -14,6 +14,8 @@ func TestMonitorCommand_Implements(t *testing.T) { func TestMonitorCommand_Fails(t *testing.T) { t.Parallel() + srv, _, _ := testServer(t, false, nil) + defer srv.Shutdown() ui := new(cli.MockUi) cmd := &MonitorCommand{Meta: Meta{Ui: ui}} diff --git a/nomad/client_agent_endpoint.go b/nomad/client_agent_endpoint.go index 6c93706530f..d3d3d4d5508 100644 --- a/nomad/client_agent_endpoint.go +++ b/nomad/client_agent_endpoint.go @@ -1,14 +1,17 @@ package nomad import ( + "bytes" "context" "errors" "fmt" "io" "net" "strings" + "time" log "github.com/hashicorp/go-hclog" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/command/agent/monitor" "github.com/hashicorp/nomad/helper" @@ -138,9 +141,20 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { JSONFormat: args.LogJSON, }) + frames := make(chan *sframer.StreamFrame, 32) + errCh := make(chan error) + var buf bytes.Buffer + frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) + + // framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 64*1024) + framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024) + framer.Run() + defer framer.Destroy() + + // goroutine to detect remote side closing go func() { if _, err := conn.Read(nil); err != nil { - // One end of the pipe closed, exit + // One end of the pipe explicitly closed, exit cancel() return } @@ -151,14 +165,59 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) { }() logCh := monitor.Start(stopCh) + initialOffset := int64(0) + + // receive logs and build frames + go func() { + defer framer.Destroy() + LOOP: + for { + select { + case log := <-logCh: + if err := framer.Send("", "log", log, initialOffset); err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + break LOOP + } + case <-ctx.Done(): + break LOOP + } + } + }() var streamErr error OUTER: for { select { - case log := <-logCh: + case frame, ok := <-frames: + if !ok { + // frame may have been closed when an error + // occurred. Check once more for an error. + select { + case streamErr = <-errCh: + // There was a pending error! + default: + // No error, continue on + } + + break OUTER + } + var resp cstructs.StreamErrWrapper - resp.Payload = log + if args.PlainText { + resp.Payload = frame.Data + } else { + if err := frameCodec.Encode(frame); err != nil { + streamErr = err + break OUTER + } + + resp.Payload = buf.Bytes() + buf.Reset() + } + if err := encoder.Encode(resp); err != nil { streamErr = err break OUTER @@ -174,11 +233,5 @@ OUTER: if streamErr == io.EOF || strings.Contains(streamErr.Error(), "closed") { return } - - // Attempt to send the error - encoder.Encode(&cstructs.StreamErrWrapper{ - Error: cstructs.NewRpcError(streamErr, helper.Int64ToPtr(500)), - }) - return } } diff --git a/nomad/client_agent_endpoint_test.go b/nomad/client_agent_endpoint_test.go index 27e4cb84f70..201e21fb937 100644 --- a/nomad/client_agent_endpoint_test.go +++ b/nomad/client_agent_endpoint_test.go @@ -1,6 +1,7 @@ package nomad import ( + "encoding/json" "fmt" "io" "net" @@ -11,10 +12,12 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/client" "github.com/hashicorp/nomad/client/config" + sframer "github.com/hashicorp/nomad/client/lib/streamframer" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ugorji/go/codec" ) @@ -85,7 +88,7 @@ func TestMonitor_Monitor_Remote_Server(t *testing.T) { encoder := codec.NewEncoder(p1, structs.MsgpackHandle) require.Nil(encoder.Encode(req)) - timeout := time.After(1 * time.Second) + timeout := time.After(3 * time.Second) expected := "[DEBUG]" received := "" @@ -101,7 +104,11 @@ OUTER: t.Fatalf("Got error: %v", msg.Error.Error()) } - received += string(msg.Payload) + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) if strings.Contains(received, expected) { require.Nil(p2.Close()) break OUTER @@ -181,7 +188,11 @@ OUTER: t.Fatalf("Got error: %v", msg.Error.Error()) } - received += string(msg.Payload) + var frame sframer.StreamFrame + err := json.Unmarshal(msg.Payload, &frame) + assert.NoError(t, err) + + received += string(frame.Data) if strings.Contains(received, expected) { require.Nil(p2.Close()) break OUTER