diff --git a/api/fs.go b/api/fs.go index 45d2a676a62..107e553030c 100644 --- a/api/fs.go +++ b/api/fs.go @@ -254,10 +254,15 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, // * cancel: A channel that when closed, streaming will end. // // The return value is a channel that will emit StreamFrames as they are read. +// The chan will be closed when follow=false and the end of the file is +// reached. +// +// Unexpected (non-EOF) errors will be sent on the error chan. func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { errCh := make(chan error, 1) + nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) if err != nil { errCh <- err @@ -315,8 +320,11 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str // Decode the next frame var frame StreamFrame if err := dec.Decode(&frame); err != nil { - errCh <- err - close(frames) + if err == io.EOF || err == io.ErrClosedPipe { + close(frames) + } else { + errCh <- err + } return } diff --git a/api/fs_test.go b/api/fs_test.go index bd9219fc28a..790d2ca7c79 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -1,14 +1,162 @@ package api import ( + "bytes" "fmt" "io" "reflect" "strings" "testing" "time" + + units "github.com/docker/go-units" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +func TestFS_Logs(t *testing.T) { + t.Parallel() + require := require.New(t) + rpcPort := 0 + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + rpcPort = c.Ports.RPC + c.Client = &testutil.ClientConfig{ + Enabled: true, + } + }) + defer s.Stop() + + //TODO There should be a way to connect the client to the servers in + //makeClient above + require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)})) + + index := uint64(0) + testutil.WaitForResult(func() (bool, error) { + nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + index = qm.LastIndex + if len(nodes) != 1 { + return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes)) + } + if nodes[0].Status != "ready" { + return false, fmt.Errorf("node not ready: %s", nodes[0].Status) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + var input strings.Builder + input.Grow(units.MB) + lines := 80 * units.KB + for i := 0; i < lines; i++ { + fmt.Fprintf(&input, "%d\n", i) + } + + job := &Job{ + ID: helper.StringToPtr("TestFS_Logs"), + Region: helper.StringToPtr("global"), + Datacenters: []string{"dc1"}, + Type: helper.StringToPtr("batch"), + TaskGroups: []*TaskGroup{ + { + Name: helper.StringToPtr("TestFS_LogsGroup"), + Tasks: []*Task{ + { + Name: "logger", + Driver: "mock_driver", + Config: map[string]interface{}{ + "stdout_string": input.String(), + }, + }, + }, + }, + }, + } + + jobs := c.Jobs() + jobResp, _, err := jobs.Register(job, nil) + require.NoError(err) + + index = jobResp.EvalCreateIndex + evals := c.Evaluations() + testutil.WaitForResult(func() (bool, error) { + evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + if evalResp.BlockedEval != "" { + t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp)) + } + index = qm.LastIndex + if evalResp.Status != "complete" { + return false, fmt.Errorf("eval status: %v", evalResp.Status) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + allocID := "" + testutil.WaitForResult(func() (bool, error) { + allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index}) + if err != nil { + return false, err + } + if len(allocs) != 1 { + return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs)) + } + if allocs[0].ClientStatus != "complete" { + return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus) + } + allocID = allocs[0].ID + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + alloc, _, err := c.Allocations().Info(allocID, nil) + require.NoError(err) + + for i := 0; i < 3; i++ { + stopCh := make(chan struct{}) + defer close(stopCh) + + frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil) + + var result bytes.Buffer + READ_FRAMES: + for { + select { + case f := <-frames: + if f == nil { + break READ_FRAMES + } + result.Write(f.Data) + case err := <-errors: + // Don't Fatal here as the other assertions may + // contain helpeful information. + t.Errorf("Error: %v", err) + } + } + + // Check length + assert.Equal(t, input.Len(), result.Len(), "file size mismatch") + + // Check complete ordering + for i := 0; i < lines; i++ { + line, err := result.ReadBytes('\n') + require.NoErrorf(err, "unexpected error on line %d: %v", i, err) + require.Equal(fmt.Sprintf("%d\n", i), string(line)) + } + } +} + func TestFS_FrameReader(t *testing.T) { t.Parallel() // Create a channel of the frames and a cancel channel diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index eaff009c7df..904c5cf49e8 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -294,6 +294,7 @@ OUTER: streamErr = err break OUTER } + encoder.Reset(conn) case <-ctx.Done(): break OUTER } @@ -405,8 +406,6 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { frames := make(chan *sframer.StreamFrame, streamFramesBuffer) errCh := make(chan error) - var buf bytes.Buffer - frameCodec := codec.NewEncoder(&buf, structs.JsonHandle) // Start streaming go func() { @@ -423,20 +422,23 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { go func() { for { if _, err := conn.Read(nil); err != nil { - if err == io.EOF { + if err == io.EOF || err == io.ErrClosedPipe { + // One end of the pipe was explicitly closed, exit cleanly cancel() return } select { case errCh <- err: case <-ctx.Done(): - return } + return } } }() var streamErr error + buf := new(bytes.Buffer) + frameCodec := codec.NewEncoder(buf, structs.JsonHandle) OUTER: for { select { @@ -455,6 +457,7 @@ OUTER: streamErr = err break OUTER } + frameCodec.Reset(buf) resp.Payload = buf.Bytes() buf.Reset() @@ -464,6 +467,7 @@ OUTER: streamErr = err break OUTER } + encoder.Reset(conn) } } @@ -576,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // #3342 select { case <-framer.ExitCh(): - err := parseFramerErr(framer.Err()) - if err == syscall.EPIPE { - // EPIPE just means the connection was closed - return nil - } - return err + return nil default: } @@ -705,7 +704,7 @@ OUTER: lastEvent = truncateEvent continue OUTER case <-framer.ExitCh(): - return parseFramerErr(framer.Err()) + return nil case <-ctx.Done(): return nil case err, ok := <-eofCancelCh: diff --git a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index b0caa4a047b..812f775c643 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -55,8 +55,18 @@ func (s *StreamFrame) IsCleared() bool { } } +func (s *StreamFrame) Copy() *StreamFrame { + n := new(StreamFrame) + *n = *s + n.Data = make([]byte, len(s.Data)) + copy(n.Data, s.Data) + return n +} + // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { + // out is where frames are sent and is closed when no more frames will + // be sent. out chan<- *StreamFrame frameSize int @@ -64,21 +74,26 @@ type StreamFramer struct { heartbeat *time.Ticker flusher *time.Ticker - shutdown bool + // shutdown is true when a shutdown is triggered + shutdown bool + + // shutdownCh is closed when no more Send()s will be called and run() + // should flush pending frames before closing exitCh shutdownCh chan struct{} - exitCh chan struct{} + + // exitCh is closed when the run() goroutine exits and no more frames + // will be sent. + exitCh chan struct{} // The mutex protects everything below l sync.Mutex // The current working frame - f StreamFrame + f *StreamFrame data *bytes.Buffer - // Captures whether the framer is running and any error that occurred to - // cause it to stop. + // Captures whether the framer is running running bool - err error } // NewStreamFramer creates a new stream framer that will output StreamFrames to @@ -95,6 +110,7 @@ func NewStreamFramer(out chan<- *StreamFrame, frameSize: frameSize, heartbeat: heartbeat, flusher: flusher, + f: new(StreamFrame), data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)), shutdownCh: make(chan struct{}), exitCh: make(chan struct{}), @@ -121,6 +137,8 @@ func (s *StreamFramer) Destroy() { if running { <-s.exitCh } + + // Close out chan only after exitCh has exited if !wasShutdown { close(s.out) } @@ -144,21 +162,12 @@ func (s *StreamFramer) ExitCh() <-chan struct{} { return s.exitCh } -// Err returns the error that caused the StreamFramer to exit -func (s *StreamFramer) Err() error { - s.l.Lock() - defer s.l.Unlock() - return s.err -} - // run is the internal run method. It exits if Destroy is called or an error // occurs, in which case the exit channel is closed. func (s *StreamFramer) run() { - var err error defer func() { s.l.Lock() s.running = false - s.err = err s.l.Unlock() close(s.exitCh) }() @@ -177,40 +186,46 @@ OUTER: } // Read the data for the frame, and send it - s.f.Data = s.readData() - err = s.send(&s.f) - s.f.Clear() + s.send() s.l.Unlock() - if err != nil { - return - } case <-s.heartbeat.C: // Send a heartbeat frame - if err = s.send(HeartbeatStreamFrame); err != nil { - return + select { + case s.out <- HeartbeatStreamFrame: + case <-s.shutdownCh: } } } s.l.Lock() + // Send() may have left a partial frame. Send it now. if !s.f.IsCleared() { s.f.Data = s.readData() - err = s.send(&s.f) - s.f.Clear() + + // Only send if there's actually data left + if len(s.f.Data) > 0 { + // Cannot select on shutdownCh as it's already closed + // Cannot select on exitCh as it's only closed after this exits + s.out <- s.f.Copy() + } } s.l.Unlock() } // send takes a StreamFrame, encodes and sends it -func (s *StreamFramer) send(f *StreamFrame) error { - sending := *f - f.Data = nil +func (s *StreamFramer) send() { + // Ensure s.out has not already been closd by Destroy + select { + case <-s.exitCh: + return + default: + } + s.f.Data = s.readData() select { - case s.out <- &sending: - return nil + case s.out <- s.f.Copy(): + s.f.Clear() case <-s.exitCh: - return nil } } @@ -236,31 +251,16 @@ func (s *StreamFramer) readData() []byte { func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error { s.l.Lock() defer s.l.Unlock() - // If we are not running, return the error that caused us to not run or // indicated that it was never started. if !s.running { - if s.err != nil { - return s.err - } - return fmt.Errorf("StreamFramer not running") } // Check if not mergeable if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame - s.f.Data = s.readData() - select { - case <-s.exitCh: - return nil - default: - } - err := s.send(&s.f) - s.f.Clear() - if err != nil { - return err - } + s.send() } // Store the new data as the current frame. @@ -285,25 +285,24 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e force = false } - // Create a new frame to send it - s.f.Data = s.readData() + // Ensure s.out has not already been closed by Destroy select { case <-s.exitCh: return nil default: } - if err := s.send(&s.f); err != nil { - return err + // Create a new frame to send it + s.f.Data = s.readData() + select { + case s.out <- s.f.Copy(): + case <-s.exitCh: + return nil } // Update the offset s.f.Offset += int64(len(s.f.Data)) } - if s.data.Len() == 0 { - s.f.Clear() - } - return nil } diff --git a/client/lib/streamframer/framer_test.go b/client/lib/streamframer/framer_test.go index 13be32141ab..12d5fefaa88 100644 --- a/client/lib/streamframer/framer_test.go +++ b/client/lib/streamframer/framer_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/hashicorp/nomad/testutil" + "github.com/kr/pretty" ) // This test checks, that even if the frame size has not been hit, a flush will @@ -132,8 +133,8 @@ func TestStreamFramer_Batch(t *testing.T) { t.Fatalf("exit channel should close") } - if _, ok := <-frames; ok { - t.Fatal("out channel should be closed") + if f, ok := <-frames; ok { + t.Fatalf("out channel should be closed. recv: %s", pretty.Sprint(f)) } } diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index c19684c8983..9da2c4f6c47 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -338,15 +338,16 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, return nil, CodedError(500, handlerErr.Error()) } - p1, p2 := net.Pipe() - decoder := codec.NewDecoder(p1, structs.MsgpackHandle) - encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + // Create a pipe connecting the (possibly remote) handler to the http response + httpPipe, handlerPipe := net.Pipe() + decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle) + encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle) // Create a goroutine that closes the pipe if the connection closes. ctx, cancel := context.WithCancel(req.Context()) go func() { <-ctx.Done() - p1.Close() + httpPipe.Close() }() // Create an output that gets flushed on every write @@ -355,10 +356,11 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, // Create a channel that decodes the results errCh := make(chan HTTPCodedError) go func() { + defer cancel() + // Send the request if err := encoder.Encode(args); err != nil { errCh <- CodedError(500, err.Error()) - cancel() return } @@ -366,7 +368,6 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, select { case <-ctx.Done(): errCh <- nil - cancel() return default: } @@ -374,29 +375,29 @@ func (s *HTTPServer) fsStreamImpl(resp http.ResponseWriter, var res cstructs.StreamErrWrapper if err := decoder.Decode(&res); err != nil { errCh <- CodedError(500, err.Error()) - cancel() return } + decoder.Reset(httpPipe) if err := res.Error; err != nil { if err.Code != nil { errCh <- CodedError(int(*err.Code), err.Error()) - cancel() return } } - if _, err := io.Copy(output, bytes.NewBuffer(res.Payload)); err != nil { + if _, err := io.Copy(output, bytes.NewReader(res.Payload)); err != nil { errCh <- CodedError(500, err.Error()) - cancel() return } } }() - handler(p2) + handler(handlerPipe) cancel() codedErr := <-errCh + + // Ignore EOF and ErrClosedPipe errors. if codedErr != nil && (codedErr == io.EOF || strings.Contains(codedErr.Error(), "closed") ||