diff --git a/input_file.go b/input_file.go index 3346aa161..c7a20c7f3 100644 --- a/input_file.go +++ b/input_file.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -20,6 +21,7 @@ type fileInputReader struct { data []byte file *os.File timestamp int64 + closed int32 // Value of 0 indicates that the file is still open. } func (f *fileInputReader) parseNext() error { @@ -36,8 +38,7 @@ func (f *fileInputReader) parseNext() error { } if err == io.EOF { - f.file.Close() - f.file = nil + f.Close() return err } } @@ -64,7 +65,8 @@ func (f *fileInputReader) ReadPayload() []byte { return f.data } func (f *fileInputReader) Close() error { - if f.file != nil { + if atomic.LoadInt32(&f.closed) == 0 { + atomic.StoreInt32(&f.closed, 1) f.file.Close() } @@ -79,7 +81,7 @@ func NewFileInputReader(path string) *fileInputReader { return nil } - r := &fileInputReader{file: file} + r := &fileInputReader{file: file, closed: 0} if strings.HasSuffix(path, ".gz") { gzReader, err := gzip.NewReader(file) if err != nil { @@ -125,12 +127,6 @@ func NewFileInput(path string, loop bool) (i *FileInput) { return } -type NextFileNotFound struct{} - -func (_ *NextFileNotFound) Error() string { - return "There is no new files" -} - func (i *FileInput) init() (err error) { defer i.mu.Unlock() i.mu.Lock() @@ -170,7 +166,7 @@ func (i *FileInput) String() string { // Find reader with smallest timestamp e.g next payload in row func (i *FileInput) nextReader() (next *fileInputReader) { for _, r := range i.readers { - if r == nil || r.file == nil { + if r == nil || atomic.LoadInt32(&r.closed) != 0 { continue }