Skip to content

Commit

Permalink
Improvements to the way delete_after_read works (#432)
Browse files Browse the repository at this point in the history
- Wait until end of poll cycle and then delete files from the primary
  goroutine
- Update checkpoints, even though in thoery files are consumed fully
- If file.Close fails, log at Warn level instead of Debug
  • Loading branch information
djaglowski authored Sep 17, 2021
1 parent f596ffb commit d677502
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
26 changes: 12 additions & 14 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,29 +162,27 @@ OUTER:
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
if f.deleteAfterRead {
r.Close()
if err := os.Remove(r.file.Name()); err != nil {
f.Errorf("could not delete %s", r.file.Name())
}
}
}(reader)
}

// Wait until all the reader goroutines are finished
wg.Wait()

if f.deleteAfterRead {
// No need to track files, since we only consume them once
return
}

for _, reader := range f.lastPollReaders {
reader.Close()
f.Debug("cleaning up log files that have been consumed")
for _, reader := range readers {
reader.Close()
if err := os.Remove(reader.file.Name()); err != nil {
f.Errorf("could not delete %s", reader.file.Name())
}
}
} else {
for _, reader := range f.lastPollReaders {
reader.Close()
}
f.lastPollReaders = readers
}

f.lastPollReaders = readers

f.saveCurrent(readers)
f.syncLastPollFiles()
}
Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (f *Reader) readHeaders(ctx context.Context, msgBuf []byte) error {
func (f *Reader) Close() {
if f.file != nil {
if err := f.file.Close(); err != nil {
f.Debugf("Problem closing reader", "Error", err.Error())
f.Warnf("Problem closing reader", "Error", err.Error())
}
}
}
Expand Down

0 comments on commit d677502

Please sign in to comment.