Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race in StreamFramer and truncation in api/AllocFS.Logs #4234

Merged
merged 10 commits into from
May 4, 2018
12 changes: 10 additions & 2 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,15 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
// The chan will be closed when follow=false and the end of the file is
// reached.
//
// Unexpected (non-EOF) errors will be sent on the error chan.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
Expand Down Expand Up @@ -315,8 +320,11 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
errCh <- err
close(frames)
if err == io.EOF || err == io.ErrClosedPipe {
close(frames)
} else {
errCh <- err
}
return
}

Expand Down
148 changes: 148 additions & 0 deletions api/fs_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,162 @@
package api

import (
"bytes"
"fmt"
"io"
"reflect"
"strings"
"testing"
"time"

units "github.com/docker/go-units"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFS_Logs(t *testing.T) {
t.Parallel()
require := require.New(t)
rpcPort := 0
c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
rpcPort = c.Ports.RPC
c.Client = &testutil.ClientConfig{
Enabled: true,
}
})
defer s.Stop()

//TODO There should be a way to connect the client to the servers in
//makeClient above
require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't makeClient already do this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't! This makeClient helper is very strange and all of the tests in api/ that expect a client node seem to be commented out! For example: https://github.com/hashicorp/nomad/blob/master/api/allocations_test.go#L33-L59

I'm honestly not sure what the right approach is here. Either we need to improve this test client or we need to remove it and use one of the other test client helpers we have.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That test client just serializes a config and starts the agent with it. By default it only creates the server. You may want to consider just using a dev agent.


index := uint64(0)
testutil.WaitForResult(func() (bool, error) {
nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
index = qm.LastIndex
if len(nodes) != 1 {
return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
}
if nodes[0].Status != "ready" {
return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

var input strings.Builder
input.Grow(units.MB)
lines := 80 * units.KB
for i := 0; i < lines; i++ {
fmt.Fprintf(&input, "%d\n", i)
}

job := &Job{
ID: helper.StringToPtr("TestFS_Logs"),
Region: helper.StringToPtr("global"),
Datacenters: []string{"dc1"},
Type: helper.StringToPtr("batch"),
TaskGroups: []*TaskGroup{
{
Name: helper.StringToPtr("TestFS_LogsGroup"),
Tasks: []*Task{
{
Name: "logger",
Driver: "mock_driver",
Config: map[string]interface{}{
"stdout_string": input.String(),
},
},
},
},
},
}

jobs := c.Jobs()
jobResp, _, err := jobs.Register(job, nil)
require.NoError(err)

index = jobResp.EvalCreateIndex
evals := c.Evaluations()
testutil.WaitForResult(func() (bool, error) {
evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
if evalResp.BlockedEval != "" {
t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp))
}
index = qm.LastIndex
if evalResp.Status != "complete" {
return false, fmt.Errorf("eval status: %v", evalResp.Status)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

allocID := ""
testutil.WaitForResult(func() (bool, error) {
allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index})
if err != nil {
return false, err
}
if len(allocs) != 1 {
return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs))
}
if allocs[0].ClientStatus != "complete" {
return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus)
}
allocID = allocs[0].ID
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

alloc, _, err := c.Allocations().Info(allocID, nil)
require.NoError(err)

for i := 0; i < 3; i++ {
stopCh := make(chan struct{})
defer close(stopCh)

frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil)

var result bytes.Buffer
READ_FRAMES:
for {
select {
case f := <-frames:
if f == nil {
break READ_FRAMES
}
result.Write(f.Data)
case err := <-errors:
// Don't Fatal here as the other assertions may
// contain helpeful information.
t.Errorf("Error: %v", err)
}
}

// Check length
assert.Equal(t, input.Len(), result.Len(), "file size mismatch")

// Check complete ordering
for i := 0; i < lines; i++ {
line, err := result.ReadBytes('\n')
require.NoErrorf(err, "unexpected error on line %d: %v", i, err)
require.Equal(fmt.Sprintf("%d\n", i), string(line))
}
}
}

func TestFS_FrameReader(t *testing.T) {
t.Parallel()
// Create a channel of the frames and a cancel channel
Expand Down
21 changes: 10 additions & 11 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ OUTER:
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
Expand Down Expand Up @@ -405,8 +406,6 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {

frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

// Start streaming
go func() {
Expand All @@ -423,20 +422,23 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
go func() {
for {
if _, err := conn.Read(nil); err != nil {
if err == io.EOF {
if err == io.EOF || err == io.ErrClosedPipe {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@dadgar dadgar May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done here 51440e2

// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
select {
case errCh <- err:
case <-ctx.Done():
return
}
return
}
}
}()

var streamErr error
buf := new(bytes.Buffer)
frameCodec := codec.NewEncoder(buf, structs.JsonHandle)
OUTER:
for {
select {
Expand All @@ -455,6 +457,7 @@ OUTER:
streamErr = err
break OUTER
}
frameCodec.Reset(buf)

resp.Payload = buf.Bytes()
buf.Reset()
Expand All @@ -464,6 +467,7 @@ OUTER:
streamErr = err
break OUTER
}
encoder.Reset(conn)
}
}

Expand Down Expand Up @@ -576,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
// #3342
select {
case <-framer.ExitCh():
err := parseFramerErr(framer.Err())
if err == syscall.EPIPE {
// EPIPE just means the connection was closed
return nil
}
return err
return nil
default:
}

Expand Down Expand Up @@ -705,7 +704,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err())
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
Expand Down
Loading