Skip to content

Commit

Permalink
[tail/logparser] resume from last known offset when reloading
Browse files Browse the repository at this point in the history
* Fixes #3522 - Not able to read rotated log file without missing lines
* Fixes #3573 - logparser/tail plugin stop after reload (removes unnecessary tailer.Cleanup() calls)
  • Loading branch information
sgtsquiggs committed Jul 5, 2019
1 parent 370d54b commit 8e159a8
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 22 deletions.
54 changes: 45 additions & 9 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package logparser

import (
"fmt"
"log"
"strings"
"sync"
Expand All @@ -19,6 +20,12 @@ const (
defaultWatchMethod = "inotify"
)

var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)


// LogParser in the primary interface for the plugin
type GrokConfig struct {
MeasurementName string `toml:"measurement"`
Expand Down Expand Up @@ -161,23 +168,27 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
err = l.tailNewfiles(l.FromBeginning)

// clear resume offsets after starting
offsetsMutex.Lock()
offsets = make(map[string]int64)
offsetsMutex.Unlock()

return err
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

var poll bool
if l.WatchMethod == "poll" {
poll = true
}

offsetsMutex.Lock()
defer offsetsMutex.Unlock()

// Create a "tailer" for each file
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
Expand All @@ -193,11 +204,27 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
continue
}

var seek *tail.SeekInfo
if !fromBeginning {
if offset, ok := offsets[file]; ok {
log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
Location: seek,
MustExist: true,
Poll: poll,
Logger: tail.DiscardingLogger,
Expand Down Expand Up @@ -286,6 +313,16 @@ func (l *LogParserPlugin) Stop() {
defer l.Unlock()

for _, t := range l.tailers {
if !l.FromBeginning {
// store offset for resume
offset, err := t.Tell()
if err == nil {
offsets[t.Filename] = offset
log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename)
} else {
l.acc.AddError(fmt.Errorf("E! Error recording offset for file %s\n", t.Filename))
}
}
err := t.Stop()

//message for a stopped tailer
Expand All @@ -294,7 +331,6 @@ func (l *LogParserPlugin) Stop() {
if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
}
t.Cleanup()
}
close(l.done)
l.wg.Wait()
Expand Down
58 changes: 45 additions & 13 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ const (
defaultWatchMethod = "inotify"
)

var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)

type Tail struct {
Files []string
FromBeginning bool
Expand Down Expand Up @@ -88,23 +93,25 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
t.acc = acc
t.tailers = make(map[string]*tail.Tail)

return t.tailNewFiles(t.FromBeginning)
err := t.tailNewFiles(t.FromBeginning)

// clear resume offsets after starting
offsetsMutex.Lock()
offsets = make(map[string]int64)
offsetsMutex.Unlock()

return err
}

func (t *Tail) tailNewFiles(fromBeginning bool) error {
var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}

var poll bool
if t.WatchMethod == "poll" {
poll = true
}

offsetsMutex.Lock()
defer offsetsMutex.Unlock()

// Create a "tailer" for each file
for _, filepath := range t.Files {
g, err := globpath.Compile(filepath)
Expand All @@ -117,6 +124,22 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
continue
}

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := offsets[file]; ok {
log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Expand Down Expand Up @@ -206,16 +229,25 @@ func (t *Tail) Stop() {
t.Lock()
defer t.Unlock()

offsetsMutex.Lock()
defer offsetsMutex.Unlock()

for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
// store offset for resume
offset, err := tailer.Tell()
if err == nil {
offsets[tailer.Filename] = offset
log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename)
} else {
t.acc.AddError(fmt.Errorf("E! Error recording offset for file %s\n", tailer.Filename))
}
}
err := tailer.Stop()
if err != nil {
t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
}
}

for _, tailer := range t.tailers {
tailer.Cleanup()
}
t.wg.Wait()
}

Expand Down

0 comments on commit 8e159a8

Please sign in to comment.