Skip to content

Commit

Permalink
Track files that are rotated out of Include parameter by keeping file…
Browse files Browse the repository at this point in the history
…s open till next poll cycle

Instead of closing all files at the end of a poll cycle, keep them open until the next poll cycle. Only after all matched files have been opened are the files kept open from the previous poll consumed and closed. This "overlap" between cycles ensures that we keep track of files that were moved away, and that we consume any logs written to them but not yet read before they were rotated.

In addition, some "Move/Create" rotation tests were not being skipped on Windows; they are now skipped.

Fix test cases for new implementation and skip move-create tests for windows (#182)
  • Loading branch information
rockb1017 authored Jun 16, 2021
1 parent 44b6bf5 commit cdbb6d6
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 43 deletions.
7 changes: 6 additions & 1 deletion docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |

Expand All @@ -37,6 +37,11 @@ match either the beginning of a new log entry, or the end of a log entry.

Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.

### File rotation

When files are rotated and their new names are no longer matched by the `include` pattern (e.g. when tailing symlinked files), data loss can occur.
To avoid this data loss, use the move/create rotation method and set `max_concurrent_files` to more than twice the number of files to tail.

### Supported encodings

| Key | Description
Expand Down
2 changes: 1 addition & 1 deletion operator/builtin/input/file/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func BenchmarkFileInput(b *testing.B) {
cfg.Include = []string{
"file*.log",
}
cfg.MaxConcurrentFiles = 1
cfg.MaxConcurrentFiles = 2
return cfg
},
},
Expand Down
4 changes: 2 additions & 2 deletions operator/builtin/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
return nil, fmt.Errorf("`max_log_size` must be positive")
}

if c.MaxConcurrentFiles <= 0 {
return nil, fmt.Errorf("`max_concurrent_files` must be positive")
if c.MaxConcurrentFiles <= 1 {
return nil, fmt.Errorf("`max_concurrent_files` must be greater than 1")
}

if c.FingerprintSize == 0 {
Expand Down
59 changes: 44 additions & 15 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ type InputOperator struct {

persister operator.Persister

knownFiles []*Reader
queuedMatches []string
knownFiles []*Reader
queuedMatches []string
maxBatchFiles int
lastPollReaders []*Reader

startAtBeginning bool

Expand Down Expand Up @@ -85,6 +87,12 @@ func (f *InputOperator) Start(persister operator.Persister) error {
func (f *InputOperator) Stop() error {
f.cancel()
f.wg.Wait()
for _, reader := range f.lastPollReaders {
reader.Close()
}
for _, reader := range f.knownFiles {
reader.Close()
}
f.knownFiles = nil
f.cancel = nil
return nil
Expand Down Expand Up @@ -113,9 +121,10 @@ func (f *InputOperator) startPoller(ctx context.Context) {

// poll checks all the watched paths for new entries
func (f *InputOperator) poll(ctx context.Context) {
f.maxBatchFiles = f.MaxConcurrentFiles / 2
var matches []string
if len(f.queuedMatches) > f.MaxConcurrentFiles {
matches, f.queuedMatches = f.queuedMatches[:f.MaxConcurrentFiles], f.queuedMatches[f.MaxConcurrentFiles:]
if len(f.queuedMatches) > f.maxBatchFiles {
matches, f.queuedMatches = f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:]
} else {
if len(f.queuedMatches) > 0 {
matches, f.queuedMatches = f.queuedMatches, make([]string, 0)
Expand All @@ -130,16 +139,36 @@ func (f *InputOperator) poll(ctx context.Context) {
matches = getMatches(f.Include, f.Exclude)
if f.firstCheck && len(matches) == 0 {
f.Warnw("no files match the configured include patterns", "include", f.Include)
} else if len(matches) > f.MaxConcurrentFiles {
matches, f.queuedMatches = matches[:f.MaxConcurrentFiles], matches[f.MaxConcurrentFiles:]
} else if len(matches) > f.maxBatchFiles {
matches, f.queuedMatches = matches[:f.maxBatchFiles], matches[f.maxBatchFiles:]
}
}
}

readers := f.makeReaders(matches)
f.firstCheck = false

// Detect files that have been rotated out of matching pattern
lostReaders := make([]*Reader, 0, len(f.lastPollReaders))
OUTER:
for _, oldReader := range f.lastPollReaders {
for _, reader := range readers {
if reader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
}
lostReaders = append(lostReaders, oldReader)
}

var wg sync.WaitGroup
for _, reader := range lostReaders {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
}(reader)
}

for _, reader := range readers {
wg.Add(1)
go func(r *Reader) {
Expand All @@ -151,6 +180,13 @@ func (f *InputOperator) poll(ctx context.Context) {
// Wait until all the reader goroutines are finished
wg.Wait()

// Close all files
for _, reader := range f.lastPollReaders {
reader.Close()
}

f.lastPollReaders = readers

f.saveCurrent(readers)
f.syncLastPollFiles(ctx)
}
Expand Down Expand Up @@ -217,7 +253,7 @@ func (f *InputOperator) makeReaders(filesPaths []string) []*Reader {

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
for i := 0; i < len(fps); {
for i := 0; i < len(fps); i++ {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
if err := files[i].Close(); err != nil {
Expand All @@ -227,13 +263,7 @@ OUTER:
fps = append(fps[:i], fps[i+1:]...)
files = append(files[:i], files[i+1:]...)
}

for j := 0; j < len(fps); j++ {
if i == j {
// Skip checking itself
continue
}

for j := i + 1; j < len(fps); j++ {
fp2 := fps[j]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
Expand All @@ -245,7 +275,6 @@ OUTER:
continue OUTER
}
}
i++
}

readers := make([]*Reader, 0, len(fps))
Expand Down
19 changes: 12 additions & 7 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestReadExistingAndNewLogs(t *testing.T) {
t.Parallel()
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
operator.persister = testutil.NewMockPersister("test")
defer operator.Stop()

// Start with a file with an entry in it, and expect that entry
// to come through when we poll for the first time
Expand All @@ -134,6 +135,7 @@ func TestStartAtEnd(t *testing.T) {
cfg.StartAt = "end"
}, nil)
operator.persister = testutil.NewMockPersister("test")
defer operator.Stop()

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\n")
Expand All @@ -156,9 +158,9 @@ func TestStartAtEndNewFile(t *testing.T) {
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
operator.persister = testutil.NewMockPersister("test")
operator.startAtBeginning = false
defer operator.Stop()

operator.poll(context.Background())

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1\ntestlog2\n")

Expand Down Expand Up @@ -205,6 +207,7 @@ func TestSplitWrite(t *testing.T) {
t.Parallel()
operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
operator.persister = testutil.NewMockPersister("test")
defer operator.Stop()

temp := openTemp(t, tempDir)
writeString(t, temp, "testlog1")
Expand Down Expand Up @@ -429,10 +432,11 @@ func TestFileBatching(t *testing.T) {

files := 100
linesPerFile := 10
maxConcurrentFiles := 10
maxConcurrentFiles := 20
maxBatchFiles := maxConcurrentFiles / 2

expectedBatches := files / maxConcurrentFiles // assumes no remainder
expectedLinesPerBatch := maxConcurrentFiles * linesPerFile
expectedBatches := files / maxBatchFiles // assumes no remainder
expectedLinesPerBatch := maxBatchFiles * linesPerFile

expectedMessages := make([]string, 0, files*linesPerFile)
actualMessages := make([]string, 0, files*linesPerFile)
Expand All @@ -442,10 +446,11 @@ func TestFileBatching(t *testing.T) {
cfg.MaxConcurrentFiles = maxConcurrentFiles
},
func(out *testutil.FakeOutput) {
out.Received = make(chan *entry.Entry, expectedLinesPerBatch)
out.Received = make(chan *entry.Entry, expectedLinesPerBatch*2)
},
)
operator.persister = testutil.NewMockPersister("test")
defer operator.Stop()

temps := make([]*os.File, 0, files)
for i := 0; i < files; i++ {
Expand All @@ -465,7 +470,6 @@ func TestFileBatching(t *testing.T) {
// poll once so we can validate that files were batched
operator.poll(context.Background())
actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...)
expectNoMessagesUntil(t, logReceived, 10*time.Millisecond)
}

require.ElementsMatch(t, expectedMessages, actualMessages)
Expand All @@ -483,7 +487,6 @@ func TestFileBatching(t *testing.T) {
// poll once so we can validate that files were batched
operator.poll(context.Background())
actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...)
expectNoMessagesUntil(t, logReceived, 10*time.Millisecond)
}

require.ElementsMatch(t, expectedMessages, actualMessages)
Expand All @@ -493,13 +496,15 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
t.Parallel()

operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
defer operator.Stop()

temp := openTemp(t, tempDir)
tempCopy := openFile(t, temp.Name())
fp, err := operator.NewFingerprint(temp)
require.NoError(t, err)
reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

writeString(t, temp, "testlog1\n")
reader.ReadToEnd(context.Background())
Expand Down
14 changes: 6 additions & 8 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error {

// ReadToEnd will read until the end of the file
func (f *Reader) ReadToEnd(ctx context.Context) {
defer func() {
if err := f.file.Close(); err != nil {
f.Errorw("Failed to close", zap.Error(err))
}
}()

if _, err := f.file.Seek(f.Offset, 0); err != nil {
f.Errorw("Failed to seek", zap.Error(err))
return
Expand Down Expand Up @@ -122,8 +116,12 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
}

// Close will close the file
func (f *Reader) Close() error {
return f.file.Close()
func (f *Reader) Close() {
if f.file != nil {
if err := f.file.Close(); err != nil {
f.Debugf("Problem closing reader", "Error", err.Error())
}
}
}

// Emit creates an entry with the decoded message and sends it to the next
Expand Down
55 changes: 51 additions & 4 deletions operator/builtin/input/file/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

const WINDOWS_OS = "windows"

func TestMultiFileRotate(t *testing.T) {
if runtime.GOOS == WINDOWS_OS {
// Windows has very poor support for moving active files, so rotation is less commonly used
// This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358
t.Skip()
}
t.Parallel()

getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) }
Expand Down Expand Up @@ -82,7 +89,7 @@ func TestMultiFileRotate(t *testing.T) {
}

func TestMultiFileRotateSlow(t *testing.T) {
if runtime.GOOS == "windows" {
if runtime.GOOS == WINDOWS_OS {
// Windows has very poor support for moving active files, so rotation is less commonly used
// This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358
t.Skip()
Expand Down Expand Up @@ -322,15 +329,19 @@ func TestRotation(t *testing.T) {
}

for _, tc := range cases {
t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false))
t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true))
if runtime.GOOS != WINDOWS_OS {
// Windows has very poor support for moving active files, so rotation is less commonly used
// This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358
t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false))
t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true))
}
t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false))
t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true))
}
}

func TestMoveFile(t *testing.T) {
if runtime.GOOS == "windows" {
if runtime.GOOS == WINDOWS_OS {
t.Skip("Moving files while open is unsupported on Windows")
}
t.Parallel()
Expand All @@ -355,6 +366,42 @@ func TestMoveFile(t *testing.T) {
expectNoMessages(t, logReceived)
}

// TestTrackMovedAwayFiles checks that a file which is renamed into a different
// directory between two polls is still consumed: a line appended to the file
// after the rename must be emitted by the second poll.
//
// The test is skipped on Windows, where moving an open file is unsupported.
func TestTrackMovedAwayFiles(t *testing.T) {
	if runtime.GOOS == WINDOWS_OS {
		t.Skip("Moving files while open is unsupported on Windows")
	}
	t.Parallel()
	operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
	operator.persister = testutil.NewMockPersister("test")

	// Seed the file with one entry and release the writer handle before polling.
	temp1 := openTemp(t, tempDir)
	writeString(t, temp1, "testlog1\n")
	temp1.Close()

	operator.poll(context.Background())
	defer operator.Stop()

	waitForMessage(t, logReceived, "testlog1")

	// Wait until all goroutines are finished before renaming
	operator.wg.Wait()

	// Build a sibling directory name by dropping the last character of tempDir
	// and appending "_new/".
	// NOTE(review): this presumes tempDir ends with a path separator — confirm
	// against newTestFileOperator.
	newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/")
	err := os.Mkdir(newDir, 0777)
	require.NoError(t, err)
	newFileName := fmt.Sprintf("%s%s", newDir, "newfile.log")

	// Move the file to its new location (presumably no longer matched by the
	// operator's include pattern).
	err = os.Rename(temp1.Name(), newFileName)
	require.NoError(t, err)

	// Append a second entry through a fresh handle on the moved file.
	movedFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	require.NoError(t, err)
	// Close the write handle when the test finishes so the descriptor is not leaked.
	defer movedFile.Close()
	writeString(t, movedFile, "testlog2\n")
	operator.poll(context.Background())

	waitForMessage(t, logReceived, "testlog2")
}

// TruncateThenWrite tests that, after a file has been truncated,
// any new writes are picked up
func TestTruncateThenWrite(t *testing.T) {
Expand Down
Loading

0 comments on commit cdbb6d6

Please sign in to comment.