Skip to content

Commit

Permalink
fix: fix from_beginning options behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
Wen Rex committed Nov 28, 2024
1 parent 2636612 commit 8c74e89
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 17 deletions.
2 changes: 1 addition & 1 deletion plugins/inputs/tail/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
##
files = ["/var/mymetrics.out"]

## Read file from beginning.
## Read file from beginning for new discovered files (without a persisted offset).
# from_beginning = false

## Whether file is a named pipe
Expand Down
44 changes: 28 additions & 16 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,32 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
return err
}

func (t *Tail) getSeekInfo(file string, fromBeginning bool) *tail.SeekInfo {
if t.Pipe {
return nil
}

if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
return &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
}

if fromBeginning {
return &tail.SeekInfo{
Whence: 0,
Offset: 0,
}
}

return &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}

func (t *Tail) tailNewFiles(fromBeginning bool) error {
var poll bool
if t.WatchMethod == "poll" {
Expand All @@ -180,21 +206,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
continue
}

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
t.Log.Debugf("Using offset %d for %q", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}
seek := t.getSeekInfo(file, fromBeginning)

tailer, err := tail.TailFile(file,
tail.Config{
Expand Down Expand Up @@ -379,7 +391,7 @@ func (t *Tail) receiver(parser telegraf.Parser, tailer *tail.Tail) {

func (t *Tail) Stop() {
for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
if !t.Pipe {
// store offset for resume
offset, err := tailer.Tell()
if err == nil {
Expand Down
60 changes: 60 additions & 0 deletions plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/tail"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -800,3 +802,61 @@ func TestStatePersistence(t *testing.T) {
require.True(t, ok, "state is not a map[string]int64")
require.Equal(t, expectedState, actualState)
}

func TestGetSeekInfo(t *testing.T) {
tests := []struct {
name string
offsets map[string]int64
file string
fromBeginning bool
expected *tail.SeekInfo
}{
{
name: "Offset exists",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
{
name: "Ignore from beginning for already discoverd files",
offsets: map[string]int64{"test.log": 100},
file: "test.log",
fromBeginning: true,
expected: &tail.SeekInfo{
Whence: 0,
Offset: 100,
},
},
{
name: "From beginning",
file: "test.log",
fromBeginning: true,
expected: &tail.SeekInfo{
Whence: 0,
Offset: 0,
},
},
{
name: "Default case",
file: "test.log",
expected: &tail.SeekInfo{
Whence: 2,
Offset: 0,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
tail := NewTestTail()
tail.offsets = test.offsets
logger := &testutil.CaptureLogger{}
tail.Log = logger
seekInfo := tail.getSeekInfo(test.file, test.fromBeginning)
assert.Equal(t, test.expected, seekInfo)
})
}
}

0 comments on commit 8c74e89

Please sign in to comment.