From 7e938bd3adf259bb8fecfceca9cadf83f998304c Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Wed, 3 Jun 2020 22:35:47 +0530 Subject: [PATCH 1/2] Fix race input_file.go while closing the file and remove dead code. --- input_file.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/input_file.go b/input_file.go index 3346aa161..a0aa3e30f 100644 --- a/input_file.go +++ b/input_file.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -20,6 +21,7 @@ type fileInputReader struct { data []byte file *os.File timestamp int64 + closed int32 // Value of 0 indicates open. } func (f *fileInputReader) parseNext() error { @@ -36,8 +38,7 @@ func (f *fileInputReader) parseNext() error { } if err == io.EOF { - f.file.Close() - f.file = nil + f.Close() return err } } @@ -64,7 +65,9 @@ func (f *fileInputReader) ReadPayload() []byte { return f.data } func (f *fileInputReader) Close() error { - if f.file != nil { + closed := atomic.LoadInt32(&f.closed) + if closed == 0 { + atomic.StoreInt32(&f.closed, 1) f.file.Close() } @@ -79,7 +82,7 @@ func NewFileInputReader(path string) *fileInputReader { return nil } - r := &fileInputReader{file: file} + r := &fileInputReader{file: file, closed: 0} if strings.HasSuffix(path, ".gz") { gzReader, err := gzip.NewReader(file) if err != nil { @@ -125,12 +128,6 @@ func NewFileInput(path string, loop bool) (i *FileInput) { return } -type NextFileNotFound struct{} - -func (_ *NextFileNotFound) Error() string { - return "There is no new files" -} - func (i *FileInput) init() (err error) { defer i.mu.Unlock() i.mu.Lock() @@ -170,7 +167,7 @@ func (i *FileInput) String() string { // Find reader with smallest timestamp e.g next payload in row func (i *FileInput) nextReader() (next *fileInputReader) { for _, r := range i.readers { - if r == nil || r.file == nil { + if r == nil || atomic.LoadInt32(&r.closed) != 0 { continue } From ccdf96ccaceb73de7a1fefbbbc1a1ad68628b3d5 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Wed, 3 Jun 2020 22:41:12 +0530 Subject: [PATCH 2/2] Minor changes. --- input_file.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/input_file.go b/input_file.go index a0aa3e30f..c7a20c7f3 100644 --- a/input_file.go +++ b/input_file.go @@ -21,7 +21,7 @@ type fileInputReader struct { data []byte file *os.File timestamp int64 - closed int32 // Value of 0 indicates open. + closed int32 // Value of 0 indicates that the file is still open. } func (f *fileInputReader) parseNext() error { @@ -65,8 +65,7 @@ func (f *fileInputReader) ReadPayload() []byte { return f.data } func (f *fileInputReader) Close() error { - closed := atomic.LoadInt32(&f.closed) - if closed == 0 { + if atomic.LoadInt32(&f.closed) == 0 { atomic.StoreInt32(&f.closed, 1) f.file.Close() }