diff --git a/.chloggen/otlpjsonfilereceiver-replay.yaml b/.chloggen/otlpjsonfilereceiver-replay.yaml new file mode 100755 index 000000000000..6adb63fb5f8c --- /dev/null +++ b/.chloggen/otlpjsonfilereceiver-replay.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otlpjsonfilereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a replay_file config option to support replaying static telemetry + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31533] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 02de7d59e9e7..26fc161a8418 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -163,6 +163,12 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt DeleteAtEOF: c.DeleteAfterRead, } + var t tracker.Tracker + if o.noTracking { + t = tracker.NewNoStateTracker(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2) + } else { + t = tracker.NewFileTracker(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2) + } return &Manager{ SugaredLogger: logger.With("component", "fileconsumer"), readerFactory: readerFactory, @@ -170,7 +176,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt pollInterval: c.PollInterval, maxBatchFiles: c.MaxConcurrentFiles / 2, maxBatches: c.MaxBatches, - tracker: tracker.New(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2), + tracker: t, }, nil } @@ -229,7 +235,8 @@ func (c Config) validate() error { } type options struct { - splitFunc bufio.SplitFunc + splitFunc bufio.SplitFunc + noTracking bool } type Option func(*options) @@ -240,3 +247,11 @@ func WithSplitFunc(f bufio.SplitFunc) Option { o.splitFunc = f } } + +// WithNoTracking makes Build use a tracker that keeps no file state between polls. When used, the reader will +// read from the beginning of each file every time it is polled. 
+func WithNoTracking() Option { + return func(o *options) { + o.noTracking = true + } +} diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index 94b72b132b71..d1b4f34fa44c 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -27,7 +27,7 @@ type Manager struct { readerFactory reader.Factory fileMatcher *matcher.Matcher - tracker *tracker.Tracker + tracker tracker.Tracker pollInterval time.Duration persister operator.Persister @@ -129,8 +129,11 @@ func (m *Manager) poll(ctx context.Context) { // Any new files that appear should be consumed entirely m.readerFactory.FromBeginning = true if m.persister != nil { - if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil { - m.Errorw("save offsets", zap.Error(err)) + metadata := m.tracker.GetMetadata() + if metadata != nil { + if err := checkpoint.Save(context.Background(), m.persister, metadata); err != nil { + m.Errorw("save offsets", zap.Error(err)) + } } } // rotate at end of every poll() @@ -219,7 +222,7 @@ func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close()) } - // Cleck for closed files for match + // Check for closed files for match if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil { return m.readerFactory.NewReaderFromMetadata(file, oldMetadata) } diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 7017516fed95..51c45f0a638f 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1423,3 +1423,48 @@ func TestNoLostPartial(t *testing.T) { return foundSameFromOtherFile && foundNewFromFileOne }, time.Second, 100*time.Millisecond) } + +func TestNoTracking(t *testing.T) { + testCases := []struct { + testName string + noTracking bool + expectReplay bool + }{ + {"tracking_enabled", false, false}, + {"tracking_disabled", true, 
true}, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + tempDir := t.TempDir() + cfg := NewConfig().includeDir(tempDir) + cfg.StartAt = "beginning" + cfg.PollInterval = 1000 * time.Hour // We control the polling within the test. + + opts := make([]Option, 0) + if tc.noTracking { + opts = append(opts, WithNoTracking()) + } + operator, sink := testManager(t, cfg, opts...) + + temp := filetest.OpenTemp(t, tempDir) + filetest.WriteString(t, temp, " testlog1 \n") + + require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister())) + defer func() { + require.NoError(t, operator.Stop()) + }() + + operator.poll(context.Background()) + sink.ExpectToken(t, []byte("testlog1")) + + // Poll again and see if the file is replayed. + operator.poll(context.Background()) + if tc.expectReplay { + sink.ExpectToken(t, []byte("testlog1")) + } else { + sink.ExpectNoCalls(t) + } + }) + } +} diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go index 6c255f2fc02b..b371a3b873c1 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go @@ -11,7 +11,24 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" ) -type Tracker struct { +// Tracker is the interface for tracking files that are being consumed. +type Tracker interface { + Add(reader *reader.Reader) + GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader + GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader + GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata + GetMetadata() []*reader.Metadata + LoadMetadata(metadata []*reader.Metadata) + CurrentPollFiles() []*reader.Reader + PreviousPollFiles() []*reader.Reader + ClosePreviousFiles() + EndPoll() + EndConsume() + TotalReaders() int +} + +// fileTracker tracks known offsets for files that are being consumed by the manager. 
+type fileTracker struct { *zap.SugaredLogger maxBatchFiles int @@ -21,13 +38,13 @@ type Tracker struct { knownFiles []*fileset.Fileset[*reader.Metadata] } -func New(logger *zap.SugaredLogger, maxBatchFiles int) *Tracker { +func NewFileTracker(logger *zap.SugaredLogger, maxBatchFiles int) Tracker { knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) for i := 0; i < len(knownFiles); i++ { knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles) } - return &Tracker{ - SugaredLogger: logger.With("component", "fileconsumer"), + return &fileTracker{ + SugaredLogger: logger.With("tracker", "fileTracker"), maxBatchFiles: maxBatchFiles, currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles), previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles), @@ -35,20 +52,20 @@ func New(logger *zap.SugaredLogger, maxBatchFiles int) *Tracker { } } -func (t *Tracker) Add(reader *reader.Reader) { +func (t *fileTracker) Add(reader *reader.Reader) { // add a new reader for tracking t.currentPollFiles.Add(reader) } -func (t *Tracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader { +func (t *fileTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader { return t.currentPollFiles.Match(fp, fileset.Equal) } -func (t *Tracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader { +func (t *fileTracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader { return t.previousPollFiles.Match(fp, fileset.StartsWith) } -func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata { +func (t *fileTracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata { for i := 0; i < len(t.knownFiles); i++ { if oldMetadata := t.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil { return oldMetadata @@ -57,7 +74,7 @@ func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata { return nil } -func (t *Tracker) GetMetadata() []*reader.Metadata { +func (t *fileTracker) GetMetadata() []*reader.Metadata { 
// return all known metadata for checkpoining allCheckpoints := make([]*reader.Metadata, 0, t.TotalReaders()) for _, knownFiles := range t.knownFiles { @@ -70,19 +87,19 @@ func (t *Tracker) GetMetadata() []*reader.Metadata { return allCheckpoints } -func (t *Tracker) LoadMetadata(metadata []*reader.Metadata) { +func (t *fileTracker) LoadMetadata(metadata []*reader.Metadata) { t.knownFiles[0].Add(metadata...) } -func (t *Tracker) CurrentPollFiles() []*reader.Reader { +func (t *fileTracker) CurrentPollFiles() []*reader.Reader { return t.currentPollFiles.Get() } -func (t *Tracker) PreviousPollFiles() []*reader.Reader { +func (t *fileTracker) PreviousPollFiles() []*reader.Reader { return t.previousPollFiles.Get() } -func (t *Tracker) ClosePreviousFiles() { +func (t *fileTracker) ClosePreviousFiles() { // t.previousPollFiles -> t.knownFiles[0] for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() { @@ -90,17 +107,69 @@ func (t *Tracker) ClosePreviousFiles() { } } -func (t *Tracker) EndPoll() { +func (t *fileTracker) EndPoll() { // shift the filesets at end of every poll() call // t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2] copy(t.knownFiles[1:], t.knownFiles) t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles) } -func (t *Tracker) TotalReaders() int { +func (t *fileTracker) TotalReaders() int { total := t.previousPollFiles.Len() for i := 0; i < len(t.knownFiles); i++ { total += t.knownFiles[i].Len() } return total } + +// noStateTracker only tracks the current polled files. Once the poll is +// complete and telemetry is consumed, the tracked files are closed. The next +// poll will create fresh readers with no previously tracked offsets. 
+type noStateTracker struct { + *zap.SugaredLogger + maxBatchFiles int + currentPollFiles *fileset.Fileset[*reader.Reader] +} + +func NewNoStateTracker(logger *zap.SugaredLogger, maxBatchFiles int) Tracker { + return &noStateTracker{ + SugaredLogger: logger.With("tracker", "noStateTracker"), + maxBatchFiles: maxBatchFiles, + currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles), + } +} + +func (t *noStateTracker) Add(reader *reader.Reader) { + // add a new reader for tracking + t.currentPollFiles.Add(reader) +} + +func (t *noStateTracker) CurrentPollFiles() []*reader.Reader { + return t.currentPollFiles.Get() +} + +func (t *noStateTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader { + return t.currentPollFiles.Match(fp, fileset.Equal) +} + +func (t *noStateTracker) EndConsume() { + for r, _ := t.currentPollFiles.Pop(); r != nil; r, _ = t.currentPollFiles.Pop() { + r.Close() + } +} + +func (t *noStateTracker) GetOpenFile(_ *fingerprint.Fingerprint) *reader.Reader { return nil } + +func (t *noStateTracker) GetClosedFile(_ *fingerprint.Fingerprint) *reader.Metadata { return nil } + +func (t *noStateTracker) GetMetadata() []*reader.Metadata { return nil } + +func (t *noStateTracker) LoadMetadata(_ []*reader.Metadata) {} + +func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil } + +func (t *noStateTracker) ClosePreviousFiles() {} + +func (t *noStateTracker) EndPoll() {} + +func (t *noStateTracker) TotalReaders() int { return 0 } diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go index d23ebf9521b6..f1df53fe6783 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go @@ -12,7 +12,7 @@ import ( // On non-windows platforms, we keep files open between poll cycles so that we can detect // and read "lost" files, which have been moved out of the matching pattern. 
-func (t *Tracker) EndConsume() { +func (t *fileTracker) EndConsume() { t.ClosePreviousFiles() // t.currentPollFiles -> t.previousPollFiles diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go index 98cb0d6e57ea..75ce2d876d9c 100644 --- a/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go @@ -12,7 +12,7 @@ import ( ) // On windows, we close files immediately after reading because they cannot be moved while open. -func (t *Tracker) EndConsume() { +func (t *fileTracker) EndConsume() { // t.currentPollFiles -> t.previousPollFiles t.previousPollFiles = t.currentPollFiles t.ClosePreviousFiles() diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index 550ed1ee0245..a3d233dc85c3 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -12,13 +12,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" ) -func testManager(t *testing.T, cfg *Config) (*Manager, *emittest.Sink) { +func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest.Sink) { sink := emittest.NewSink() - return testManagerWithSink(t, cfg, sink), sink + return testManagerWithSink(t, cfg, sink, opts...), sink } -func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink) *Manager { - input, err := cfg.Build(testutil.Logger(t), sink.Callback) +func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager { + input, err := cfg.Build(testutil.Logger(t), sink.Callback, opts...) 
require.NoError(t, err) t.Cleanup(func() { input.tracker.ClosePreviousFiles() }) return input diff --git a/receiver/otlpjsonfilereceiver/file.go b/receiver/otlpjsonfilereceiver/file.go index b8f8d243bf35..e9cad1010bf1 100644 --- a/receiver/otlpjsonfilereceiver/file.go +++ b/receiver/otlpjsonfilereceiver/file.go @@ -36,6 +36,7 @@ func NewFactory() receiver.Factory { type Config struct { fileconsumer.Config `mapstructure:",squash"` StorageID *component.ID `mapstructure:"storage"` + ReplayFile bool `mapstructure:"replay_file"` } func createDefaultConfig() component.Config { @@ -73,6 +74,10 @@ func createLogsReceiver(_ context.Context, settings receiver.CreateSettings, con return nil, err } cfg := configuration.(*Config) + opts := make([]fileconsumer.Option, 0) + if cfg.ReplayFile { + opts = append(opts, fileconsumer.WithNoTracking()) + } input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error { ctx = obsrecv.StartLogsOp(ctx) var l plog.Logs @@ -87,7 +92,7 @@ func createLogsReceiver(_ context.Context, settings receiver.CreateSettings, con obsrecv.EndLogsOp(ctx, metadata.Type.String(), logRecordCount, err) } return nil - }) + }, opts...) if err != nil { return nil, err } @@ -106,6 +111,10 @@ func createMetricsReceiver(_ context.Context, settings receiver.CreateSettings, return nil, err } cfg := configuration.(*Config) + opts := make([]fileconsumer.Option, 0) + if cfg.ReplayFile { + opts = append(opts, fileconsumer.WithNoTracking()) + } input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error { ctx = obsrecv.StartMetricsOp(ctx) var m pmetric.Metrics @@ -119,7 +128,7 @@ func createMetricsReceiver(_ context.Context, settings receiver.CreateSettings, obsrecv.EndMetricsOp(ctx, metadata.Type.String(), m.MetricCount(), err) } return nil - }) + }, opts...) 
if err != nil { return nil, err } @@ -138,6 +147,10 @@ func createTracesReceiver(_ context.Context, settings receiver.CreateSettings, c return nil, err } cfg := configuration.(*Config) + opts := make([]fileconsumer.Option, 0) + if cfg.ReplayFile { + opts = append(opts, fileconsumer.WithNoTracking()) + } input, err := cfg.Config.Build(settings.Logger.Sugar(), func(ctx context.Context, token []byte, _ map[string]any) error { ctx = obsrecv.StartTracesOp(ctx) var t ptrace.Traces @@ -151,7 +164,7 @@ func createTracesReceiver(_ context.Context, settings receiver.CreateSettings, c obsrecv.EndTracesOp(ctx, metadata.Type.String(), t.SpanCount(), err) } return nil - }) + }, opts...) if err != nil { return nil, err } diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go index e985818e7331..45693fddfdef 100644 --- a/receiver/otlpjsonfilereceiver/file_test.go +++ b/receiver/otlpjsonfilereceiver/file_test.go @@ -89,6 +89,44 @@ func TestFileMetricsReceiver(t *testing.T) { assert.NoError(t, err) } +func TestFileMetricsReceiverWithReplay(t *testing.T) { + tempFolder := t.TempDir() + factory := NewFactory() + cfg := createDefaultConfig().(*Config) + cfg.Config.Include = []string{filepath.Join(tempFolder, "*")} + cfg.Config.StartAt = "beginning" + cfg.ReplayFile = true + cfg.Config.PollInterval = 5 * time.Second + + sink := new(consumertest.MetricsSink) + receiver, err := factory.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink) + assert.NoError(t, err) + err = receiver.Start(context.Background(), nil) + assert.NoError(t, err) + + md := testdata.GenerateMetrics(5) + marshaler := &pmetric.JSONMarshaler{} + b, err := marshaler.MarshalMetrics(md) + assert.NoError(t, err) + b = append(b, '\n') + err = os.WriteFile(filepath.Join(tempFolder, "metrics.json"), b, 0600) + assert.NoError(t, err) + + // Wait for the first poll to complete. 
+ time.Sleep(cfg.Config.PollInterval + time.Second) + require.Len(t, sink.AllMetrics(), 1) + assert.EqualValues(t, md, sink.AllMetrics()[0]) + + // Reset the sink and assert that the next poll replays all the existing metrics. + sink.Reset() + time.Sleep(cfg.Config.PollInterval + time.Second) + require.Len(t, sink.AllMetrics(), 1) + assert.EqualValues(t, md, sink.AllMetrics()[0]) + + err = receiver.Shutdown(context.Background()) + assert.NoError(t, err) +} + func TestFileLogsReceiver(t *testing.T) { tempFolder := t.TempDir() factory := NewFactory()