Skip to content

Commit

Permalink
Resume from last known offset when reloading in tail input (influxdat…
Browse files Browse the repository at this point in the history
  • Loading branch information
sgtsquiggs authored and idohalevi committed Sep 23, 2020
1 parent f7acfd6 commit ed183d1
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 33 deletions.
88 changes: 72 additions & 16 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
package logparser

import (
"fmt"
"log"
"strings"
"sync"

"github.com/influxdata/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
Expand All @@ -19,6 +21,11 @@ const (
defaultWatchMethod = "inotify"
)

var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)

// LogParser in the primary interface for the plugin
type GrokConfig struct {
MeasurementName string `toml:"measurement"`
Expand All @@ -42,6 +49,7 @@ type LogParserPlugin struct {
WatchMethod string

tailers map[string]*tail.Tail
offsets map[string]int64
lines chan logEntry
done chan struct{}
wg sync.WaitGroup
Expand All @@ -53,6 +61,20 @@ type LogParserPlugin struct {
GrokConfig GrokConfig `toml:"grok"`
}

func NewLogParser() *LogParserPlugin {
offsetsMutex.Lock()
offsetsCopy := make(map[string]int64, len(offsets))
for k, v := range offsets {
offsetsCopy[k] = v
}
offsetsMutex.Unlock()

return &LogParserPlugin{
WatchMethod: defaultWatchMethod,
offsets: offsetsCopy,
}
}

const sampleConfig = `
## Log files to parse.
## These accept standard unix glob matching rules, but with the addition of
Expand Down Expand Up @@ -161,18 +183,21 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
err = l.tailNewfiles(l.FromBeginning)

// clear offsets
l.offsets = make(map[string]int64)
// assumption that once Start is called, all parallel plugins have already been initialized
offsetsMutex.Lock()
offsets = make(map[string]int64)
offsetsMutex.Unlock()

return err
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

var poll bool
if l.WatchMethod == "poll" {
poll = true
Expand All @@ -182,7 +207,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
log.Printf("E! [inputs.logparser] Error Glob %s failed to compile, %s", filepath, err)
continue
}
files := g.Match()
Expand All @@ -193,11 +218,27 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
continue
}

var seek *tail.SeekInfo
if !fromBeginning {
if offset, ok := l.offsets[file]; ok {
log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
Location: seek,
MustExist: true,
Poll: poll,
Logger: tail.DiscardingLogger,
Expand Down Expand Up @@ -228,7 +269,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
for line = range tailer.Lines {

if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n",
log.Printf("E! [inputs.logparser] Error tailing file %s, Error: %s",
tailer.Filename, line.Err)
continue
}
Expand Down Expand Up @@ -274,7 +315,7 @@ func (l *LogParserPlugin) parser() {
l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
log.Println("E! Error parsing log line: " + err.Error())
log.Println("E! [inputs.logparser] Error parsing log line: " + err.Error())
}

}
Expand All @@ -286,23 +327,38 @@ func (l *LogParserPlugin) Stop() {
defer l.Unlock()

for _, t := range l.tailers {
if !l.FromBeginning {
// store offset for resume
offset, err := t.Tell()
if err == nil {
l.offsets[t.Filename] = offset
log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename)
} else {
l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename))
}
}
err := t.Stop()

//message for a stopped tailer
log.Printf("D! tail dropped for file: %v", t.Filename)
log.Printf("D! [inputs.logparser] tail dropped for file: %v", t.Filename)

if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
log.Printf("E! [inputs.logparser] Error stopping tail on file %s", t.Filename)
}
}
close(l.done)
l.wg.Wait()

// persist offsets
offsetsMutex.Lock()
for k, v := range l.offsets {
offsets[k] = v
}
offsetsMutex.Unlock()
}

func init() {
inputs.Add("logparser", func() telegraf.Input {
return &LogParserPlugin{
WatchMethod: defaultWatchMethod,
}
return NewLogParser()
})
}
78 changes: 62 additions & 16 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/influxdata/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
Expand All @@ -19,13 +20,19 @@ const (
defaultWatchMethod = "inotify"
)

var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)

type Tail struct {
Files []string
FromBeginning bool
Pipe bool
WatchMethod string

tailers map[string]*tail.Tail
offsets map[string]int64
parserFunc parsers.ParserFunc
wg sync.WaitGroup
acc telegraf.Accumulator
Expand All @@ -34,8 +41,16 @@ type Tail struct {
}

func NewTail() *Tail {
offsetsMutex.Lock()
offsetsCopy := make(map[string]int64, len(offsets))
for k, v := range offsets {
offsetsCopy[k] = v
}
offsetsMutex.Unlock()

return &Tail{
FromBeginning: false,
offsets: offsetsCopy,
}
}

Expand Down Expand Up @@ -87,18 +102,19 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
t.acc = acc
t.tailers = make(map[string]*tail.Tail)

return t.tailNewFiles(t.FromBeginning)
err := t.tailNewFiles(t.FromBeginning)

// clear offsets
t.offsets = make(map[string]int64)
// assumption that once Start is called, all parallel plugins have already been initialized
offsetsMutex.Lock()
offsets = make(map[string]int64)
offsetsMutex.Unlock()

return err
}

func (t *Tail) tailNewFiles(fromBeginning bool) error {
var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}

var poll bool
if t.WatchMethod == "poll" {
poll = true
Expand All @@ -108,14 +124,30 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
for _, filepath := range t.Files {
g, err := globpath.Compile(filepath)
if err != nil {
t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
t.acc.AddError(fmt.Errorf("glob %s failed to compile, %s", filepath, err))
}
for _, file := range g.Match() {
if _, ok := t.tailers[file]; ok {
// we're already tailing this file
continue
}

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Expand Down Expand Up @@ -159,8 +191,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
var line *tail.Line
for line = range tailer.Lines {
if line.Err != nil {
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err))
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
continue
}
// Fix up files with Windows line endings.
Expand Down Expand Up @@ -188,16 +219,15 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
tailer.Filename, line.Text, err))
}
}

log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename)

if err := tailer.Err(); err != nil {
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err))
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
}
}

Expand All @@ -206,13 +236,29 @@ func (t *Tail) Stop() {
defer t.Unlock()

for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
// store offset for resume
offset, err := tailer.Tell()
if err == nil {
log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename)
} else {
t.acc.AddError(fmt.Errorf("error recording offset for file %s", tailer.Filename))
}
}
err := tailer.Stop()
if err != nil {
t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
t.acc.AddError(fmt.Errorf("error stopping tail on file %s", tailer.Filename))
}
}

t.wg.Wait()

// persist offsets
offsetsMutex.Lock()
for k, v := range t.offsets {
offsets[k] = v
}
offsetsMutex.Unlock()
}

func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestTailBadLine(t *testing.T) {
require.NoError(t, err)

acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
assert.Contains(t, acc.Errors[0].Error(), "malformed log line")
}

func TestTailDosLineendings(t *testing.T) {
Expand Down

0 comments on commit ed183d1

Please sign in to comment.