Skip to content

Commit

Permalink
Fix log streaming missing frames (#11721)
Browse files Browse the repository at this point in the history
Perform one more read after receiving cancel when streaming file from the allocation API
  • Loading branch information
arkadiuss authored Jan 4, 2022
1 parent 500cef5 commit aa21628
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .changelog/11721.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where the allocation log streaming API was missing log frames that spanned log file rotation
```
43 changes: 29 additions & 14 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,14 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
defer framer.Destroy()

// If we aren't following, end as soon as we hit EOF
var eofCancelCh chan error
if !req.Follow {
eofCancelCh = make(chan error)
close(eofCancelCh)
}
cancelAfterFirstEof := !req.Follow

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start streaming
go func() {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
Expand Down Expand Up @@ -578,21 +574,21 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
}

var eofCancelCh chan error
cancelAfterFirstEof := false
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
cancelAfterFirstEof = true
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
}

p := filepath.Join(logPath, logEntry.Name)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof)

// Check if the context is cancelled
select {
Expand Down Expand Up @@ -637,10 +633,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in

// streamFile is the internal method to stream the content of a file. If limit
// is greater than zero, the stream will end once that many bytes have been
// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If
// the connection is broken an EPIPE error is returned
// read. If eofCancelCh is triggered while at EOF, read one more frame and
// cancel the stream on the next EOF. If the connection is broken, an EPIPE
// error is returned.
func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64,
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error {
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error {

// Get the reader
file, err := fs.ReadAt(path, offset)
Expand All @@ -667,6 +664,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string,
// read and reach EOF.
var changes *watch.FileChanges

// Whether to stop at the next EOF; starts true when the stream should
// end after the first EOF (i.e. we are not following)
cancelReceived := cancelAfterFirstEof

// Start streaming the data
bufSize := int64(streamFrameSize)
if limit > 0 && limit < streamFrameSize {
Expand Down Expand Up @@ -704,6 +704,14 @@ OUTER:
continue
}

// At this point we can stop without waiting for more changes,
// because we have hit EOF and either we're not following at all,
// or we received an event on the eofCancelCh channel
// and the final read has been executed
if cancelReceived {
return nil
}

// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(waitCtx, path, offset)
Expand Down Expand Up @@ -752,12 +760,19 @@ OUTER:
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
case _, ok := <-eofCancelCh:
if !ok {
return nil
}

return err
if err != nil {
return err
}

// try to read one more frame to avoid dropped entries
// during log rotation
cancelReceived = true
continue OUTER
}
}
}
Expand Down
34 changes: 26 additions & 8 deletions client/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) {
defer framer.Destroy()

err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, "foo", 0, ad, framer, nil)
context.Background(), 0, "foo", 0, ad, framer, nil, false)
require.Error(t, err)
if runtime.GOOS == "windows" {
require.Contains(t, err.Error(), "cannot find the file")
Expand Down Expand Up @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1918,14 +1918,30 @@ func TestFS_logsImpl_Follow(t *testing.T) {
expected := []byte("012345")
initialWrites := 3

writeToFile := func(index int, data []byte) {
filePath := func(index int) string {
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
logFilePath := filepath.Join(logDir, logFile)
return filepath.Join(logDir, logFile)
}
writeToFile := func(index int, data []byte) {
logFilePath := filePath(index)
err := ioutil.WriteFile(logFilePath, data, 0777)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
}
appendToFile := func(index int, data []byte) {
logFilePath := filePath(index)
f, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}

defer f.Close()

if _, err = f.Write(data); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
}
for i := 0; i < initialWrites; i++ {
writeToFile(i, expected[i:i+1])
}
Expand Down Expand Up @@ -1967,11 +1983,13 @@ func TestFS_logsImpl_Follow(t *testing.T) {
t.Fatalf("did not receive data: got %q", string(received))
}

// We got the first chunk of data, write out the rest split
// between the last file and the next file,
// at an index far ahead, to check that it is following and detecting
// skips
skipTo := initialWrites + 10
writeToFile(skipTo, expected[initialWrites:])
appendToFile(initialWrites-1, expected[initialWrites:initialWrites+1])
writeToFile(skipTo, expected[initialWrites+1:])

select {
case <-fullResultCh:
Expand Down

0 comments on commit aa21628

Please sign in to comment.