Skip to content

Commit

Permalink
otlpjsonfilereceiver: add a replay_file config option to support hear…
Browse files Browse the repository at this point in the history
…tbeats (#31534)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This change adds a config option to allow users to replay static
telemetry at an interval (poll_interval is the configurable interval).

This is useful for some  usecases like:
- Sending a heartbeat log to a logging backend, as a signal that the
logging agent is alive
- Sending static metadata as metrics to metrics backend. The metadata
can be read using the `otlpjsonfilereceiver` that uses this package and
config.

**Link to tracking Issue:** 

#31533

**Testing:**
- Manual testing
- unit test for the fileconsumer
- test for the `otlpjsonfilereceiver`

**Documentation:** <Describe the documentation added.>

---------

Signed-off-by: Ridwan Sharif <[email protected]>
  • Loading branch information
ridwanmsharif authored Apr 18, 2024
1 parent c542a03 commit 5bdedd0
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 30 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otlpjsonfilereceiver-replay.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otlpjsonfilereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a replay_file config option to support replaying static telemetry

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31533]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
19 changes: 17 additions & 2 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,20 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt
DeleteAtEOF: c.DeleteAfterRead,
}

var t tracker.Tracker
if o.noTracking {
t = tracker.NewNoStateTracker(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2)
} else {
t = tracker.NewFileTracker(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2)
}
return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
readerFactory: readerFactory,
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
tracker: tracker.New(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2),
tracker: t,
}, nil
}

Expand Down Expand Up @@ -229,7 +235,8 @@ func (c Config) validate() error {
}

type options struct {
splitFunc bufio.SplitFunc
splitFunc bufio.SplitFunc
noTracking bool
}

type Option func(*options)
Expand All @@ -240,3 +247,11 @@ func WithSplitFunc(f bufio.SplitFunc) Option {
o.splitFunc = f
}
}

// WithNoTracking forces the readerFactory to not keep track of files in memory. When used, the reader will
// read from the beginning of each file every time it is polled.
func WithNoTracking() Option {
return func(o *options) {
o.noTracking = true
}
}
11 changes: 7 additions & 4 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Manager struct {

readerFactory reader.Factory
fileMatcher *matcher.Matcher
tracker *tracker.Tracker
tracker tracker.Tracker

pollInterval time.Duration
persister operator.Persister
Expand Down Expand Up @@ -129,8 +129,11 @@ func (m *Manager) poll(ctx context.Context) {
// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true
if m.persister != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.Errorw("save offsets", zap.Error(err))
metadata := m.tracker.GetMetadata()
if metadata != nil {
if err := checkpoint.Save(context.Background(), m.persister, metadata); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
}
// rotate at end of every poll()
Expand Down Expand Up @@ -219,7 +222,7 @@ func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

// Cleck for closed files for match
// Check for closed files for match
if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}
Expand Down
45 changes: 45 additions & 0 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,3 +1423,48 @@ func TestNoLostPartial(t *testing.T) {
return foundSameFromOtherFile && foundNewFromFileOne
}, time.Second, 100*time.Millisecond)
}

func TestNoTracking(t *testing.T) {
testCases := []struct {
testName string
noTracking bool
expectReplay bool
}{
{"tracking_enabled", false, false},
{"tracking_disabled", true, true},
}

for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.PollInterval = 1000 * time.Hour // We control the polling within the test.

opts := make([]Option, 0)
if tc.noTracking {
opts = append(opts, WithNoTracking())
}
operator, sink := testManager(t, cfg, opts...)

temp := filetest.OpenTemp(t, tempDir)
filetest.WriteString(t, temp, " testlog1 \n")

require.NoError(t, operator.Start(testutil.NewUnscopedMockPersister()))
defer func() {
require.NoError(t, operator.Stop())
}()

operator.poll(context.Background())
sink.ExpectToken(t, []byte("testlog1"))

// Poll again and see if the file is replayed.
operator.poll(context.Background())
if tc.expectReplay {
sink.ExpectToken(t, []byte("testlog1"))
} else {
sink.ExpectNoCalls(t)
}
})
}
}
99 changes: 84 additions & 15 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,24 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type Tracker struct {
// Interface for tracking files that are being consumed.
type Tracker interface {
Add(reader *reader.Reader)
GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader
GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader
GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata
GetMetadata() []*reader.Metadata
LoadMetadata(metadata []*reader.Metadata)
CurrentPollFiles() []*reader.Reader
PreviousPollFiles() []*reader.Reader
ClosePreviousFiles()
EndPoll()
EndConsume()
TotalReaders() int
}

// fileTracker tracks known offsets for files that are being consumed by the manager.
type fileTracker struct {
*zap.SugaredLogger

maxBatchFiles int
Expand All @@ -21,34 +38,34 @@ type Tracker struct {
knownFiles []*fileset.Fileset[*reader.Metadata]
}

func New(logger *zap.SugaredLogger, maxBatchFiles int) *Tracker {
func NewFileTracker(logger *zap.SugaredLogger, maxBatchFiles int) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
return &Tracker{
SugaredLogger: logger.With("component", "fileconsumer"),
return &fileTracker{
SugaredLogger: logger.With("tracker", "fileTracker"),
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
knownFiles: knownFiles,
}
}

func (t *Tracker) Add(reader *reader.Reader) {
func (t *fileTracker) Add(reader *reader.Reader) {
// add a new reader for tracking
t.currentPollFiles.Add(reader)
}

func (t *Tracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
func (t *fileTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *Tracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader {
func (t *fileTracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.previousPollFiles.Match(fp, fileset.StartsWith)
}

func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata {
func (t *fileTracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata {
for i := 0; i < len(t.knownFiles); i++ {
if oldMetadata := t.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
return oldMetadata
Expand All @@ -57,7 +74,7 @@ func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata {
return nil
}

func (t *Tracker) GetMetadata() []*reader.Metadata {
func (t *fileTracker) GetMetadata() []*reader.Metadata {
// return all known metadata for checkpoining
allCheckpoints := make([]*reader.Metadata, 0, t.TotalReaders())
for _, knownFiles := range t.knownFiles {
Expand All @@ -70,37 +87,89 @@ func (t *Tracker) GetMetadata() []*reader.Metadata {
return allCheckpoints
}

func (t *Tracker) LoadMetadata(metadata []*reader.Metadata) {
func (t *fileTracker) LoadMetadata(metadata []*reader.Metadata) {
t.knownFiles[0].Add(metadata...)
}

func (t *Tracker) CurrentPollFiles() []*reader.Reader {
func (t *fileTracker) CurrentPollFiles() []*reader.Reader {
return t.currentPollFiles.Get()
}

func (t *Tracker) PreviousPollFiles() []*reader.Reader {
func (t *fileTracker) PreviousPollFiles() []*reader.Reader {
return t.previousPollFiles.Get()
}

func (t *Tracker) ClosePreviousFiles() {
func (t *fileTracker) ClosePreviousFiles() {
// t.previousPollFiles -> t.knownFiles[0]

for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() {
t.knownFiles[0].Add(r.Close())
}
}

func (t *Tracker) EndPoll() {
func (t *fileTracker) EndPoll() {
// shift the filesets at end of every poll() call
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}

func (t *Tracker) TotalReaders() int {
func (t *fileTracker) TotalReaders() int {
total := t.previousPollFiles.Len()
for i := 0; i < len(t.knownFiles); i++ {
total += t.knownFiles[i].Len()
}
return total
}

// noStateTracker only tracks the current polled files. Once the poll is
// complete and telemetry is consumed, the tracked files are closed. The next
// poll will create fresh readers with no previously tracked offsets.
type noStateTracker struct {
*zap.SugaredLogger
maxBatchFiles int
currentPollFiles *fileset.Fileset[*reader.Reader]
}

func NewNoStateTracker(logger *zap.SugaredLogger, maxBatchFiles int) Tracker {
return &noStateTracker{
SugaredLogger: logger.With("tracker", "noStateTracker"),
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
}
}

func (t *noStateTracker) Add(reader *reader.Reader) {
// add a new reader for tracking
t.currentPollFiles.Add(reader)
}

func (t *noStateTracker) CurrentPollFiles() []*reader.Reader {
return t.currentPollFiles.Get()
}

func (t *noStateTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *noStateTracker) EndConsume() {
for r, _ := t.currentPollFiles.Pop(); r != nil; r, _ = t.currentPollFiles.Pop() {
r.Close()
}
}

func (t *noStateTracker) GetOpenFile(_ *fingerprint.Fingerprint) *reader.Reader { return nil }

func (t *noStateTracker) GetClosedFile(_ *fingerprint.Fingerprint) *reader.Metadata { return nil }

func (t *noStateTracker) GetMetadata() []*reader.Metadata { return nil }

func (t *noStateTracker) LoadMetadata(_ []*reader.Metadata) {}

func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil }

func (t *noStateTracker) ClosePreviousFiles() {}

func (t *noStateTracker) EndPoll() {}

func (t *noStateTracker) TotalReaders() int { return 0 }
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/tracker/tracker_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// On non-windows platforms, we keep files open between poll cycles so that we can detect
// and read "lost" files, which have been moved out of the matching pattern.
func (t *Tracker) EndConsume() {
func (t *fileTracker) EndConsume() {
t.ClosePreviousFiles()

// t.currentPollFiles -> t.previousPollFiles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// On windows, we close files immediately after reading because they cannot be moved while open.
func (t *Tracker) EndConsume() {
func (t *fileTracker) EndConsume() {
// t.currentPollFiles -> t.previousPollFiles
t.previousPollFiles = t.currentPollFiles
t.ClosePreviousFiles()
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
)

func testManager(t *testing.T, cfg *Config) (*Manager, *emittest.Sink) {
func testManager(t *testing.T, cfg *Config, opts ...Option) (*Manager, *emittest.Sink) {
sink := emittest.NewSink()
return testManagerWithSink(t, cfg, sink), sink
return testManagerWithSink(t, cfg, sink, opts...), sink
}

func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink) *Manager {
input, err := cfg.Build(testutil.Logger(t), sink.Callback)
func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink, opts ...Option) *Manager {
input, err := cfg.Build(testutil.Logger(t), sink.Callback, opts...)
require.NoError(t, err)
t.Cleanup(func() { input.tracker.ClosePreviousFiles() })
return input
Expand Down
Loading

0 comments on commit 5bdedd0

Please sign in to comment.