From 8c74e8994bf313ed30da9b5724b5e1400cd7bcf8 Mon Sep 17 00:00:00 2001 From: Wen Rex <> Date: Thu, 28 Nov 2024 09:59:24 +0800 Subject: [PATCH] fix: fix from_beginning options behavior --- plugins/inputs/tail/README.md | 2 +- plugins/inputs/tail/tail.go | 44 ++++++++++++++--------- plugins/inputs/tail/tail_test.go | 60 ++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md index 7e635812916ae..e4948b35d2961 100644 --- a/plugins/inputs/tail/README.md +++ b/plugins/inputs/tail/README.md @@ -56,7 +56,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## files = ["/var/mymetrics.out"] - ## Read file from beginning. + ## Read file from beginning for new discovered files (without a persisted offset). # from_beginning = false ## Whether file is a named pipe diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index c2d8ac6589781..48c36a13a162d 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -162,6 +162,32 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { return err } +func (t *Tail) getSeekInfo(file string, fromBeginning bool) *tail.SeekInfo { + if t.Pipe { + return nil + } + + if offset, ok := t.offsets[file]; ok { + t.Log.Debugf("Using offset %d for %q", offset, file) + return &tail.SeekInfo{ + Whence: 0, + Offset: offset, + } + } + + if fromBeginning { + return &tail.SeekInfo{ + Whence: 0, + Offset: 0, + } + } + + return &tail.SeekInfo{ + Whence: 2, + Offset: 0, + } +} + func (t *Tail) tailNewFiles(fromBeginning bool) error { var poll bool if t.WatchMethod == "poll" { @@ -180,21 +206,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { continue } - var seek *tail.SeekInfo - if !t.Pipe && !fromBeginning { - if offset, ok := t.offsets[file]; ok { - t.Log.Debugf("Using offset %d for %q", offset, file) - seek = &tail.SeekInfo{ - Whence: 0, - Offset: offset, - } - } else { - seek = &tail.SeekInfo{ - Whence: 2, - Offset: 0, - } - } - } + seek := t.getSeekInfo(file, fromBeginning) tailer, err := tail.TailFile(file, tail.Config{ @@ -379,7 +391,7 @@ func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) { func (t *Tail) Stop() { for _, tailer := range t.tailers { - if !t.Pipe && !t.FromBeginning { + if !t.Pipe { // store offset for resume offset, err := tailer.Tell() if err == nil { diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index f954b7d8e0d3e..314c141060ab4 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -9,6 +9,8 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/influxdata/tail" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -800,3 +802,61 @@ func TestStatePersistence(t *testing.T) { require.True(t, ok, "state is not a map[string]int64") require.Equal(t, expectedState, actualState) } + +func TestGetSeekInfo(t *testing.T) { + tests := []struct { + name string + offsets map[string]int64 + file string + fromBeginning bool + expected *tail.SeekInfo + }{ + { + name: "Offset exists", + offsets: map[string]int64{"test.log": 100}, + file: "test.log", + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 100, + }, + }, + { + name: "Ignore from beginning for already discoverd files", + offsets: map[string]int64{"test.log": 100}, + file: "test.log", + fromBeginning: true, + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 100, + }, + }, + { + name: "From beginning", + file: "test.log", + fromBeginning: true, + expected: &tail.SeekInfo{ + Whence: 0, + Offset: 0, + }, + }, + { + name: "Default case", + file: "test.log", + expected: &tail.SeekInfo{ + Whence: 2, + Offset: 0, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tail := NewTestTail() + tail.offsets = test.offsets + logger := &testutil.CaptureLogger{} + tail.Log = logger + seekInfo := tail.getSeekInfo(test.file, test.fromBeginning) + assert.Equal(t, test.expected, seekInfo) + }) + } +}