Move filebeat to new publisher pipeline #4644
Conversation
Force-pushed from cbb9b68 to 03c57bf
filebeat/harvester/forwarder.go
Outdated
	Module     string                  `config:"_module_name"`  // hidden option to set the module name
	Fileset    string                  `config:"_fileset_name"` // hidden option to set the fileset name
	Processors processors.PluginConfig `config:"processors"`
	Type       string                  `config:"type"`
}

// NewForwarder creates a new forwarder instance and initialises processors if configured
func NewForwarder(cfg *common.Config, outlet Outlet) (*Forwarder, error) {
Note: the forwarder is going to be removed in future iterations.
LGTM. It feels strange to get rid of the spooler, but it's a great step forward to have this in the publisher now; it means other beats can make use of it too. Small step for beats, big step for filebeat :-)
During review I left a few comments/questions that popped up. Most of them got answered as I read more code, but it would be good if you could have a quick look and just confirm that this is the case.
filebeat/beater/filebeat.go
Outdated
	// Stopping publisher (might potentially drop items)
	defer func() {
		// Close the registrar logger first to make sure no more events arrive at the registrar
		// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
		registrarChannel.Close()
		publisher.Stop()
		defer close(outDone) // finally close all active connections to publisher pipeline
Why is this a `defer` inside the defer?
Oops, copy'n'paste. Interestingly, the effect will be the same :)
	Module     string                  `config:"_module_name"`  // hidden option to set the module name
	Fileset    string                  `config:"_fileset_name"` // hidden option to set the fileset name
	Processors processors.PluginConfig `config:"processors"`
	Type       string                  `config:"type"`
Trying to understand why `type` is still here while all the others were moved.
The forwarder is subject to removal in future iterations. It's needed for the PutValue when publishing. In the meantime I added ClientConfig.Fields to register fields to be added to every event on publish.
Basically the Forwarder will be removed in favor of the channel.Outleter, which will in turn be removed in favor of beat.Client (once we get a more sane registry handling).
filebeat/harvester/forwarder.go
Outdated
	// run the filters before sending to spooler
	data.Event = f.Processors.RunBC(data.Event)
	data.Event.PutValue("prospector.type", f.Config.Type)
Could we also add the type in the outlet? Probably we need the type in other places too?
filebeat/harvester/forwarder.go
Outdated
	}

-	ok := f.Outlet.OnEventSignal(data)
+	ok := f.Outlet.OnEvent(data)
For my own note: I must check that there are still 2 different methods here, one blocking and one non-blocking, to make sure the harvester is only closed when data is acked (or in the queue ...)
The harvester gets a publishState, which uses the stateOutlet. The stateOutlet (owned by the prospector) and the Outlet of the harvesters are closed on different signals, each... so many hoops...
filebeat/prospector/log/harvester.go
Outdated
	var err error

	outlet = channel.CloseOnSignal(outlet, h.done)
The comment on line 113 is now in the wrong place. I put this one as early as possible because of our still unsolved race condition.
fixing.
@@ -43,27 +42,31 @@ def test_shutdown_wait_ok(self):

        # Wait until first flush
        self.wait_until(
-           lambda: self.log_contains_count("Flushing spooler") > 1,
+           lambda: self.log_contains_count("Publish event") > 200,
The problem with setting 200 here instead of one is that the likelihood of all log lines already being read increases. I want to try to shut down as early as possible. What is the reason you increased this one?
Before, it was checking for the 'Flushing spooler' message, which can stand for any number of events (up to the spooler size). As the spooler and any kind of flushing are gone, I had to use a replacement based on the number of events being published. The test file contains around 50k events. 200 should be OK, but we can decrease it.
	@unittest.skip("Skipping unreliable test")
	# we allow for a potential race in the harvester shutdown here.
Can you elaborate on this one?
see outlet.OnEvent in channel/outlet.go:
// Note: race condition on shutdown:
// The underlying beat.Client is asynchronous. Without proper ACK
// handler we can not tell if the event made it 'through' or the client
// close has been completed before sending. In either case,
// we report 'false' here, indicating the event eventually being dropped.
// Returning false here, prevents the harvester from updating the state
// to the most recently published events. Therefore, on shutdown the harvester
// might report an old/outdated state update to the registry, overwriting the
// most recently published offset in the registry on shutdown.
			result[k] = innerMap.Clone()
		} else {
			result[k] = v
		if innerMap, ok := tryToMapStr(v); ok {
Kind of surprised to see a mapstr change in this PR. Why was that needed?
Oops, that kind of leaked into this PR :)
I found the old processors ordering potentially overwriting a globally shared MapStr and was forced to introduce some Clone() on shared MapStr instances. So the processors adding/removing fields always operate on copies, not on the original MapStr. The amount of fmt.Errorf was so expensive it totally killed throughput.
@@ -219,9 +219,9 @@ func (b *Broker) eventLoop() {
		events = b.events
	}

-	b.logger.Debug("active events: ", activeEvents)
+	// b.logger.Debug("active events: ", activeEvents)
Commented out on purpose? Also below.
Yeah, loads of debug statements commented out in the fast path. I still kept them, so I can re-enable them if I really need them.
@@ -264,7 +264,7 @@ func (b *eventBuffer) Len() int {
}

func (b *eventBuffer) Set(idx int, event publisher.Event, st clientState) {
-	b.logger.Debugf("insert event: idx=%v, seq=%v\n", idx, st.seq)
+	// b.logger.Debugf("insert event: idx=%v, seq=%v\n", idx, st.seq)
comment?
- remove filebeat/spooler and filebeat/publisher package -> all spooling and
  reporting published events is moved to the publisher pipeline. Difference
  between spooler/publisher and the new pipeline: the new publisher pipeline
  operates fully asynchronously
- have filebeat register an eventACKer with the publisher pipeline. The
  eventACKer will forward state updates of ACKed events to the registrar
- filebeat uses beat.Event for events
- update util.Data to use beat.Event:
  - store state in event.Private field for consumption by registry
- changes to filebeat/channel package:
  - introduce OutletFactory: connect to publisher pipeline, applying common
    prospector settings
  - remove Outleter.SetSignal and Outleter.OnEventSignal
  - add Outleter.Close
  - introduce SubOutlet (separate closing):
    - when a suboutlet is closed, the original outlet is still active
    - if the underlying outlet is closed, the suboutlet becomes closed as well
      (can not forward anymore)
  - introduce CloseOnSignal: close Outlet once a 'done' channel is closed
- most functionality from harvester.Forwarder is moved into the
  outlet/publisher pipeline client
- fix: ensure client events listener is properly installed

Note:
Outlet shutdown with prospectors and harvesters is somewhat delicate. There are
3 shutdown signals to take into account:
- filebeat.done
- harvester.done
- outDone (signal used to unblock prospectors from registrar on shutdown)

An outlet is shared between all harvesters of a prospector and the prospector
itself. If outDone is closed, all outlets will be closed, unblocking
potentially waiting harvesters and prospectors on filebeat shutdown.
The prospector uses a sub-outlet for sending state updates (being closed on
filebeat.done). The harvester's sub-outlet is closed when harvester.done is
closed.
The signals are only required to unblock a harvester/prospector on exit. On
normal shutdown, the outlets are closed after all workers have finished.