From d677502e37979efeefe5750846c6e964b2f92bd3 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Fri, 17 Sep 2021 16:49:41 -0400 Subject: [PATCH] Improvements to the way delete_after_read works (#432) - Wait until end of poll cycle and then delete files from the primary goroutine - Update checkpoints, even though in thoery files are consumed fully - If file.Close fails, log at Warn level instead of Debug --- operator/builtin/input/file/file.go | 26 ++++++++++++-------------- operator/builtin/input/file/reader.go | 2 +- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 822676655..ea95f9064 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -162,12 +162,6 @@ OUTER: go func(r *Reader) { defer wg.Done() r.ReadToEnd(ctx) - if f.deleteAfterRead { - r.Close() - if err := os.Remove(r.file.Name()); err != nil { - f.Errorf("could not delete %s", r.file.Name()) - } - } }(reader) } @@ -175,16 +169,20 @@ OUTER: wg.Wait() if f.deleteAfterRead { - // No need to track files, since we only consume them once - return - } - - for _, reader := range f.lastPollReaders { - reader.Close() + f.Debug("cleaning up log files that have been consumed") + for _, reader := range readers { + reader.Close() + if err := os.Remove(reader.file.Name()); err != nil { + f.Errorf("could not delete %s", reader.file.Name()) + } + } + } else { + for _, reader := range f.lastPollReaders { + reader.Close() + } + f.lastPollReaders = readers } - f.lastPollReaders = readers - f.saveCurrent(readers) f.syncLastPollFiles() } diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 4cb268b23..35778e45d 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -174,7 +174,7 @@ func (f *Reader) readHeaders(ctx context.Context, msgBuf []byte) error { func (f *Reader) Close() { if f.file != nil { if err := f.file.Close(); err != nil { - f.Debugf("Problem closing reader", "Error", err.Error()) + f.Warnf("Problem closing reader", "Error", err.Error()) } } }