From 2cfaaea2de1277d5d9720712d5b6fd567bcb2aa0 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 17 Oct 2023 06:39:00 -0600 Subject: [PATCH] [pkg/stanza] Fix issue where batching caused incorrect starting point (#27773) --- ...-fileconsumer-reader-from-end-batches.yaml | 27 ++++++++++ pkg/stanza/fileconsumer/file.go | 6 +-- pkg/stanza/fileconsumer/file_test.go | 50 +++++++++++++++++++ 3 files changed, 80 insertions(+), 3 deletions(-) create mode 100755 .chloggen/pkg-stanza-fileconsumer-reader-from-end-batches.yaml diff --git a/.chloggen/pkg-stanza-fileconsumer-reader-from-end-batches.yaml b/.chloggen/pkg-stanza-fileconsumer-reader-from-end-batches.yaml new file mode 100755 index 000000000000..7997f7242946 --- /dev/null +++ b/.chloggen/pkg-stanza-fileconsumer-reader-from-end-batches.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where batching of files could result in ignoring start_at setting. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27773] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 7c32574e19f7..4a2ebb0480c2 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -118,6 +118,9 @@ func (m *Manager) poll(ctx context.Context) { matches = matches[m.maxBatchFiles:] } m.consume(ctx, matches) + + // Any new files that appear should be consumed entirely + m.readerFactory.FromBeginning = true } func (m *Manager) consume(ctx context.Context, paths []string) { @@ -145,9 +148,6 @@ func (m *Manager) consume(ctx context.Context, paths []string) { } wg.Wait() - // Any new files that appear should be consumed entirely - m.readerFactory.FromBeginning = true - m.roller.roll(ctx, readers) m.saveCurrent(readers) m.syncLastPollFiles(ctx) diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 06aea9b2cf81..d0dfaca5dee1 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1098,6 +1098,56 @@ func TestFileBatching(t *testing.T) { } } +func TestFileBatchingRespectsStartAtEnd(t *testing.T) { + t.Parallel() + + initFiles := 10 + moreFiles := 10 + maxConcurrentFiles := 2 + + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + cfg.StartAt = "end" + cfg.MaxConcurrentFiles = maxConcurrentFiles + + operator, emitChan := buildTestManager(t, cfg) + operator.persister = testutil.NewMockPersister("test") + + temps := make([]*os.File, 0, initFiles+moreFiles) + for i := 0; i < initFiles; i++ { + temps = append(temps, openTemp(t, tempDir)) + } + + // Write one log to each file + for i, temp := range temps { + message := fmt.Sprintf("file %d: %s", i, "written before start") + _, err := temp.WriteString(message + "\n") + require.NoError(t, err) + } + + // Poll and expect no logs + operator.poll(context.Background()) + expectNoTokens(t, emitChan) + + // Create some more files + for i := 0; i < moreFiles; i++ { + temps = append(temps, openTemp(t, tempDir)) + } + + // Write a log to each file + expectedTokens := make([][]byte, 0, initFiles+moreFiles) + for i, temp := range temps { + message := fmt.Sprintf("file %d: %s", i, "written after start") + _, err := temp.WriteString(message + "\n") + require.NoError(t, err) + expectedTokens = append(expectedTokens, []byte(message)) + } + + // Poll again and expect one line from each file. + operator.poll(context.Background()) + waitForTokens(t, emitChan, expectedTokens) +} + func TestFileReader_FingerprintUpdated(t *testing.T) { t.Parallel()