Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix log streaming missing frames #11721

Merged
merged 15 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/11721.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where the allocation log streaming API was missing log frames that spanned log file rotation
```
43 changes: 29 additions & 14 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,14 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
defer framer.Destroy()

// If we aren't following end as soon as we hit EOF
var eofCancelCh chan error
if !req.Follow {
eofCancelCh = make(chan error)
close(eofCancelCh)
}
cancelAfterFirstEof := !req.Follow

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start streaming
go func() {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, eofCancelCh); err != nil {
if err := f.streamFile(ctx, req.Offset, req.Path, req.Limit, fs, framer, nil, cancelAfterFirstEof); err != nil {
select {
case errCh <- err:
case <-ctx.Done():
Expand Down Expand Up @@ -578,21 +574,21 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
}

var eofCancelCh chan error
cancelAfterFirstEof := false
exitAfter := false
if !follow && idx > maxIndex {
// Exceeded what was there initially so return
return nil
} else if !follow && idx == maxIndex {
// At the end
eofCancelCh = make(chan error)
close(eofCancelCh)
cancelAfterFirstEof = true
exitAfter = true
} else {
eofCancelCh = blockUntilNextLog(ctx, fs, logPath, task, logType, idx+1)
}

p := filepath.Join(logPath, logEntry.Name)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh)
err = f.streamFile(ctx, openOffset, p, 0, fs, framer, eofCancelCh, cancelAfterFirstEof)

// Check if the context is cancelled
select {
Expand Down Expand Up @@ -637,10 +633,11 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in

// streamFile is the internal method to stream the content of a file. If limit
// is greater than zero, the stream will end once that many bytes have been
// read. eofCancelCh is used to cancel the stream if triggered while at EOF. If
// the connection is broken an EPIPE error is returned
// read. If eofCancelCh is triggered while at EOF, read one more frame and
// cancel the stream on the next EOF. If the connection is broken an EPIPE
// error is returned.
func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string, limit int64,
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error) error {
fs allocdir.AllocDirFS, framer *sframer.StreamFramer, eofCancelCh chan error, cancelAfterFirstEof bool) error {

// Get the reader
file, err := fs.ReadAt(path, offset)
Expand All @@ -667,6 +664,9 @@ func (f *FileSystem) streamFile(ctx context.Context, offset int64, path string,
// read and reach EOF.
var changes *watch.FileChanges

// Only watch file when there is a need for it
cancelReceived := cancelAfterFirstEof

// Start streaming the data
bufSize := int64(streamFrameSize)
if limit > 0 && limit < streamFrameSize {
Expand Down Expand Up @@ -704,6 +704,14 @@ OUTER:
continue
}

// At this point we can stop without waiting for more changes,
// because we have EOF and either we're not following at all,
// or we received an event from the eofCancelCh channel
// and last read was executed
if cancelReceived {
return nil
}

// If EOF is hit, wait for a change to the file
if changes == nil {
changes, err = fs.ChangeEvents(waitCtx, path, offset)
Expand Down Expand Up @@ -752,12 +760,19 @@ OUTER:
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
case _, ok := <-eofCancelCh:
if !ok {
return nil
}

return err
tgross marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

// try to read one more frame to avoid dropped entries
// during log rotation
cancelReceived = true
continue OUTER
arkadiuss marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
34 changes: 26 additions & 8 deletions client/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func TestFS_streamFile_NoFile(t *testing.T) {
defer framer.Destroy()

err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, "foo", 0, ad, framer, nil)
context.Background(), 0, "foo", 0, ad, framer, nil, false)
require.Error(t, err)
if runtime.GOOS == "windows" {
require.Contains(t, err.Error(), "cannot find the file")
Expand Down Expand Up @@ -1629,7 +1629,7 @@ func TestFS_streamFile_Modify(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1704,7 +1704,7 @@ func TestFS_streamFile_Truncate(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1808,7 +1808,7 @@ func TestFS_streamImpl_Delete(t *testing.T) {
// Start streaming
go func() {
if err := c.endpoints.FileSystem.streamFile(
context.Background(), 0, streamFile, 0, ad, framer, nil); err != nil {
context.Background(), 0, streamFile, 0, ad, framer, nil, false); err != nil {
t.Fatalf("stream() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1918,14 +1918,30 @@ func TestFS_logsImpl_Follow(t *testing.T) {
expected := []byte("012345")
initialWrites := 3

writeToFile := func(index int, data []byte) {
filePath := func(index int) string {
logFile := fmt.Sprintf("%s.%s.%d", task, logType, index)
logFilePath := filepath.Join(logDir, logFile)
return filepath.Join(logDir, logFile)
}
writeToFile := func(index int, data []byte) {
logFilePath := filePath(index)
err := ioutil.WriteFile(logFilePath, data, 0777)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}
}
appendToFile := func(index int, data []byte) {
logFilePath := filePath(index)
f, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
t.Fatalf("Failed to create file: %v", err)
}

defer f.Close()

if _, err = f.Write(data); err != nil {
t.Fatalf("Failed to write file: %v", err)
}
}
for i := 0; i < initialWrites; i++ {
writeToFile(i, expected[i:i+1])
}
Expand Down Expand Up @@ -1967,11 +1983,13 @@ func TestFS_logsImpl_Follow(t *testing.T) {
t.Fatalf("did not receive data: got %q", string(received))
}

// We got the first chunk of data, write out the rest to the next file
// We got the first chunk of data, write out the rest splitted
// between the last file and to the next file
// at an index much ahead to check that it is following and detecting
// skips
skipTo := initialWrites + 10
writeToFile(skipTo, expected[initialWrites:])
appendToFile(initialWrites-1, expected[initialWrites:initialWrites+1])
writeToFile(skipTo, expected[initialWrites+1:])

select {
case <-fullResultCh:
Expand Down