Skip to content

Commit

Permalink
Merge pull request #764 from buger/fix-race-fileinput
Browse files Browse the repository at this point in the history
Fix race input_file.go while closing the file and remove dead code.
  • Loading branch information
arijitAD authored Jun 4, 2020
2 parents dd92459 + ccdf96c commit 4987dcd
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -20,6 +21,7 @@ type fileInputReader struct {
data []byte
file *os.File
timestamp int64
closed int32 // Value of 0 indicates that the file is still open.
}

func (f *fileInputReader) parseNext() error {
Expand All @@ -36,8 +38,7 @@ func (f *fileInputReader) parseNext() error {
}

if err == io.EOF {
f.file.Close()
f.file = nil
f.Close()
return err
}
}
Expand All @@ -64,7 +65,8 @@ func (f *fileInputReader) ReadPayload() []byte {
return f.data
}
func (f *fileInputReader) Close() error {
if f.file != nil {
if atomic.LoadInt32(&f.closed) == 0 {
atomic.StoreInt32(&f.closed, 1)
f.file.Close()
}

Expand All @@ -79,7 +81,7 @@ func NewFileInputReader(path string) *fileInputReader {
return nil
}

r := &fileInputReader{file: file}
r := &fileInputReader{file: file, closed: 0}
if strings.HasSuffix(path, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
Expand Down Expand Up @@ -125,12 +127,6 @@ func NewFileInput(path string, loop bool) (i *FileInput) {
return
}

type NextFileNotFound struct{}

func (_ *NextFileNotFound) Error() string {
return "There is no new files"
}

func (i *FileInput) init() (err error) {
defer i.mu.Unlock()
i.mu.Lock()
Expand Down Expand Up @@ -170,7 +166,7 @@ func (i *FileInput) String() string {
// Find reader with smallest timestamp e.g next payload in row
func (i *FileInput) nextReader() (next *fileInputReader) {
for _, r := range i.readers {
if r == nil || r.file == nil {
if r == nil || atomic.LoadInt32(&r.closed) != 0 {
continue
}

Expand Down

0 comments on commit 4987dcd

Please sign in to comment.