diff --git a/input_file.go b/input_file.go index d1e42308..fee47786 100644 --- a/input_file.go +++ b/input_file.go @@ -63,6 +63,8 @@ type fileInputReader struct { s3 bool queue payloadQueue readDepth int + dryRun bool + path string } func (f *fileInputReader) parse(init chan struct{}) error { @@ -70,8 +72,11 @@ func (f *fileInputReader) parse(init chan struct{}) error { var buffer bytes.Buffer var initialized bool + lineNum := 0 + for { line, err := f.reader.ReadBytes('\n') + lineNum++ if err != nil { if err != io.EOF { @@ -92,6 +97,12 @@ func (f *fileInputReader) parse(init chan struct{}) error { asBytes := buffer.Bytes() meta := payloadMeta(asBytes) + if len(meta) < 3 { + Debug(1, fmt.Sprintf("Found malformed record, file: %s, line %d", f.path, lineNum)) + buffer = bytes.Buffer{} + continue + } + timestamp, _ := strconv.ParseInt(string(meta[2]), 10, 64) data := asBytes[:len(asBytes)-1] @@ -112,7 +123,9 @@ func (f *fileInputReader) parse(init chan struct{}) error { initialized = true } - time.Sleep(100 * time.Millisecond) + if !f.dryRun { + time.Sleep(100 * time.Millisecond) + } } buffer = bytes.Buffer{} @@ -133,7 +146,9 @@ func (f *fileInputReader) wait() { return } - time.Sleep(100 * time.Millisecond) + if !f.dryRun { + time.Sleep(100 * time.Millisecond) + } } return @@ -149,7 +164,7 @@ func (f *fileInputReader) Close() error { return nil } -func newFileInputReader(path string, readDepth int) *fileInputReader { +func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReader { var file io.ReadCloser var err error @@ -164,7 +179,7 @@ func newFileInputReader(path string, readDepth int) *fileInputReader { return nil } - r := &fileInputReader{file: file, closed: 0, readDepth: readDepth} + r := &fileInputReader{path: path, file: file, closed: 0, readDepth: readDepth, dryRun: dryRun} if strings.HasSuffix(path, ".gz") { gzReader, err := gzip.NewReader(file) if err != nil { @@ -262,7 +277,7 @@ func (i *FileInput) init() (err error) { i.readers = make([]*fileInputReader, len(matches)) for idx, p := range matches { - i.readers[idx] = newFileInputReader(p, i.readDepth) + i.readers[idx] = newFileInputReader(p, i.readDepth, i.dryRun) } i.stats.Add("reader_count", int64(len(matches)))