Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update for "multiline processing for tail input plugin" #7309

Closed
wants to merge 11 commits into from

Conversation

semigroupoid
Copy link
Contributor

@semigroupoid semigroupoid commented Apr 10, 2020

This PR contains the content of the existing PR 5603 but rebased on top of the current master branch of this repository.

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

@zibuyu1995
Copy link

@ssoroka Hello, would really like to see this PR merged and released. Any idea on when that may be ?

Copy link
Contributor

@ssoroka ssoroka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking up this PR and running with it!
I'm mainly concerned about the new code branch with a second receiver function. I know this wasn't your design, but maybe we can improve it. See notes below.

FYI if you want help getting this over the line, just say the word and we will jump in where/when possible.

return nil, err
}
if m.Timeout == nil || m.Timeout.Duration.Nanoseconds() == int64(0) {
duration, _ := time.ParseDuration("5s")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like 5 * time.Second will give you type safety here.

## The pattern should be a regexp which matches what you believe to be an indicator that the field is part of an event consisting of multiple lines of log data.
#pattern = "^\s"

## The what must be previous or next and indicates the relation to the multi-line event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be clearer. "what" is already not a great name (I understand it was borrowed from elastic; something like "MatchWhichLine" would be better). You could add some context and examples; eg "previous means any line matching the pattern belongs to the previous line"

Copy link
Contributor Author

@semigroupoid semigroupoid Apr 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree on the unclear naming. I renamed the config element and added more detailed description for the two possible values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the docs to match? It looks like they still say "what" in multiple places. Need to keep the docs in sync with the sample config


acc telegraf.TrackingAccumulator

sync.Mutex
Copy link
Contributor

@ssoroka ssoroka Apr 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this was removed in another commit and came back from the merge (rebase is safer in this respect). We can remove this line.

go func() {
defer t.wg.Done()
t.receiver(parser, tailer)
if t.multiline.IsEnabled() {
go t.receiverMultiline(parser, tailer)
Copy link
Contributor

@ssoroka ssoroka Apr 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like that the conditional around receiverMultiline forks the behavior here (I'm not talking about the go routine). It definitely adds a lot of code complexity and likely introduces bugs in how the two receivers process input. eg, search for "Block until plugin is stopping or room is available to add metrics" and you'll find that this other receiver code has already changed, and multiline isn't supporting coordination with downstream outputs to not overwhelm them. Now the multiline receiver is out of sync and won't respect this behavior. In the best case scenario, future updates will have to update two versions of the code.

How difficult do you think it'd be to merge the two receiver functions? Any concerns?

var firstLine = true
for {
var line *tail.Line
timer := time.NewTimer(t.MultilineConfig.Timeout.Duration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can define this once and then do Reset(t.MultilineConfig.Timeout.Duration)

Merge receiver functions in the tail input plugin.
@semigroupoid
Copy link
Contributor Author

Hi @ssoroka, thanks for your detailed review. I have merged the two receiver functions into one, respecting the changes regarding downstream consumers.

@semigroupoid semigroupoid requested a review from ssoroka April 13, 2020 23:02
@@ -246,12 +247,12 @@ func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) {

acc := testutil.Accumulator{}
assert.NoError(t, tt.Start(&acc))
time.Sleep(20) // will force timeout
time.Sleep(5 * time.Second) // will force timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I understand these sleeps in the tests. Are we really expecting this test to sleep for 5x2 seconds?

plugins/inputs/tail/tail.go Show resolved Hide resolved
Copy link
Contributor

@ssoroka ssoroka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I think we're good here from my view. I'd like to test it locally and get one more review before merging.

Thanks so much for the work you've put into this; this is one of our most popular plugins, and it is much appreciated!

@ssoroka ssoroka requested a review from danielnelson April 22, 2020 15:09
@zibuyu1995
Copy link

When you plan to merge this PR?

@ssoroka
Copy link
Contributor

ssoroka commented May 15, 2020

Waiting on a review from @danielnelson poke

@zibuyu1995
Copy link

@danielnelson We‘d really appreciate if you could review the code.

@semigroupoid
Copy link
Contributor Author

@ssoroka, @binchow-ai I've fixed the conflicts with the current master branch, this should be ready to be merged now.

@kemuning
Copy link

kemuning commented Aug 7, 2020

Any example of the syntax how to use this multiline feature ?

@zibuyu1995
Copy link

@kemuning
This is my use case:

[[inputs.tail]]
  files = ["/opt/emqx/log/emqx.log.[1-9]"]
  from_beginning = false
  data_format = "grok"
  grok_patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} \[%{LOGLEVEL:level}\] (?m)%{GREEDYDATA:messages}$']
  [inputs.tail.multiline]
   pattern = '^%{TIMESTAMP_ISO8601:timestamp:ts-"2006-01-02 15:04:05.000"} \[%{LOGLEVEL:level}\]'
   match_which_line = "previous"
   negate=true
   timeout = "1s"

@kemuning
Copy link

@zibuyu1995 Thanks.

@kemuning
Copy link

kemuning commented Aug 10, 2020

@zibuyu1995 Strange ... I have this config :

[inputs.tail.multiline]
   pattern = '^%{GREEDYDATA:logmessage}'
   match_which_line = "previous"
   negate=true
   timeout = "1s"

but then I got an error Error parsing tail, line 70: field corresponding to `multiline' is not defined in tail.Tail , where line 70 is [inputs.tail.multiline].

I am using telegraf 1.15.2. Any idea ?

@zibuyu1995
Copy link

@kemuning No, version 1.5.2 does not support multiline feature , you need to select https://github.com/semigroupoid/telegraf

@tamilselvan-v
Copy link

Is there any timeline when will this PR be merged ?

Comment on lines +155 to +157
_, err = tmpfile.WriteString(" ")
require.NoError(t, err)
require.NoError(t, tmpfile.Sync())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to understand more about why this is necessary.

## The pattern should be a regexp which matches what you believe to be an indicator that the field is part of an event consisting of multiple lines of log data.
#pattern = "^\s"

## The what must be previous or next and indicates the relation to the multi-line event.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update the docs to match? It looks like they still say "what" in multiple places. Need to keep the docs in sync with the sample config

Comment on lines 116 to 120
case `PREVIOUS`:
fallthrough
case `"PREVIOUS"`:
fallthrough
case `'PREVIOUS'`:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just fyi, you can do this:

Suggested change
case `PREVIOUS`:
fallthrough
case `"PREVIOUS"`:
fallthrough
case `'PREVIOUS'`:
case `PREVIOUS`, `"PREVIOUS"`, `'PREVIOUS'`:

Comment on lines 297 to 298
case <-t.ctx.Done():
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't remove this, or the plugin can hang and not shutdown properly.

channelOpen := true

for {
var line *tail.Line
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the compiler does here, but it looks like this is being reallocated every iteration, where we could just be setting it to nil.

## The negate field can be true or false (defaults to false).
## If true, a message not matching the pattern will constitute a match of the multiline
## filter and the what will be applied. (vice-versa is also true)
#negate = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about changing this name to invert_match?

@ssoroka
Copy link
Contributor

ssoroka commented Aug 19, 2020

Would like to see a few more things before this gets merged. Also if anyone is really interested in it, please check out this branch and build it, try it out locally and see how it works for you.

@lsitzmann
Copy link

Hello everybody interested in this feature.

we are using this feature (build locally) in a production environment for more than one year by now.
We are gathering logfiles from about 20 machines. Currently at a rate of about 250k lines per hour.
About 1% of the lines are "multilines".
The feature simply works without any problems.
I really wouldn't want to miss this feature and I'm looking forward to switch to an officially supported release.

So please merge this PR soon.

@ssoroka
Copy link
Contributor

ssoroka commented Aug 20, 2020

@lsitzmann I really appreciate hearing about your experience! Your vote of confidence means a lot.
@semigroupoid I'm happy to make any remaining changes if you'd like me to get it merged in quickly.

@tamilselvan-v
Copy link

Is there any updates on this PR ?

@sjwang90
Copy link
Contributor

Hey @semigroupoid, would you like make the changes @ssoroka mentioned above. If anyone would like to check out this branch and complete the fixes that would be awesome too. We'd love to get this merged in before the next release.

@ssoroka
Copy link
Contributor

ssoroka commented Sep 22, 2020

Look like this introduces some kind of race in TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer test.

panic: test timed out after 10m0s

goroutine 65 [running]:
testing.(*M).startAlarm.func1()
	/usr/local/go/src/testing/testing.go:1377 +0x11c
created by time.goFunc
	/usr/local/go/src/time/sleep.go:168 +0x52

goroutine 1 [chan receive, 9 minutes]:
testing.(*T).Run(0xc0001ac000, 0xc2d855, 0x43, 0xc368c8, 0x1)
	/usr/local/go/src/testing/testing.go:961 +0x68a
testing.runTests.func1(0xc0001ac000)
	/usr/local/go/src/testing/testing.go:1202 +0xa7
testing.tRunner(0xc0001ac000, 0xc00015fd40)
	/usr/local/go/src/testing/testing.go:909 +0x19a
testing.runTests(0xc000106760, 0x1097300, 0x16, 0x16, 0x0)
	/usr/local/go/src/testing/testing.go:1200 +0x522
testing.(*M).Run(0xc000154400, 0x0)
	/usr/local/go/src/testing/testing.go:1117 +0x300
main.main()
	_testmain.go:86 +0x224

goroutine 146 [sync.Cond.Wait, 9 minutes]:
runtime.goparkunlock(...)
	/usr/local/go/src/runtime/proc.go:310
sync.runtime_notifyListWait(0xc0004fa050, 0x3)
	/usr/local/go/src/runtime/sema.go:510 +0xf8
sync.(*Cond).Wait(0xc0004fa040)
	/usr/local/go/src/sync/cond.go:56 +0x8e
github.com/influxdata/telegraf/testutil.(*Accumulator).Wait(0xc000480000, 0x4)
	/go/src/github.com/influxdata/telegraf/testutil/accumulator.go:333 +0xb8
github.com/influxdata/telegraf/plugins/inputs/tail.TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(0xc000474000)
	/go/src/github.com/influxdata/telegraf/plugins/inputs/tail/tail_test.go:237 +0x4b9
testing.tRunner(0xc000474000, 0xc368c8)
	/usr/local/go/src/testing/testing.go:909 +0x19a
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:960 +0x652

goroutine 50 [select, 9 minutes]:
github.com/influxdata/tail/watch.(*InotifyTracker).run(0xc000242040)
	/go/pkg/mod/github.com/influxdata/[email protected]/watch/inotify_tracker.go:224 +0x2f4
created by github.com/influxdata/tail/watch.glob..func1
	/go/pkg/mod/github.com/influxdata/[email protected]/watch/inotify_tracker.go:54 +0x28a

goroutine 31 [syscall, 9 minutes]:
syscall.Syscall6(0xe8, 0x7, 0xc0001eeb64, 0x7, 0xffffffffffffffff, 0x0, 0x0, 0xc000036500, 0xc0001eeae8, 0x46107c)
	/usr/local/go/src/syscall/asm_linux_amd64.s:44 +0x5
golang.org/x/sys/unix.EpollWait(0x7, 0xc0001eeb64, 0x7, 0x7, 0xffffffffffffffff, 0x8ce01c, 0x468430, 0x411a90)
	/go/pkg/mod/golang.org/x/[email protected]/unix/zsyscall_linux_amd64.go:1881 +0x86
gopkg.in/fsnotify%2ev1.(*fdPoller).wait(0xc000122740, 0xc000008000, 0x1, 0x10c4520)
	/go/pkg/mod/gopkg.in/[email protected]/inotify_poller.go:86 +0xb9
gopkg.in/fsnotify%2ev1.(*Watcher).readEvents(0xc000118a00)
	/go/pkg/mod/gopkg.in/[email protected]/inotify.go:192 +0x2b2
created by gopkg.in/fsnotify%2ev1.NewWatcher
	/go/pkg/mod/gopkg.in/[email protected]/inotify.go:59 +0x31d
FAIL	github.com/influxdata/telegraf/plugins/inputs/tail	600.021s

@sjwang90 sjwang90 added this to the 1.16.0 milestone Sep 22, 2020
@ssoroka
Copy link
Contributor

ssoroka commented Sep 22, 2020

Ok, I have a fix for this failure. I've moved it into another PR to keep it clean. #8167

@ssoroka ssoroka mentioned this pull request Sep 22, 2020
@ssoroka
Copy link
Contributor

ssoroka commented Sep 28, 2020

merged in #8190

@ssoroka ssoroka closed this Sep 28, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants