Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19632 to 7.x: Composable ACKer #19742

Merged
merged 1 commit into from
Jul 8, 2020

Conversation

urso
Copy link

@urso urso commented Jul 8, 2020

Cherry-pick of PR #19632 to 7.x branch. Original message:

  • Refactoring

What does this PR do?

This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Review Notes

Although the PR is quite big, the main difference is that the ACKCount, ACKEvents, and ACKLastEvents handlers have been replaced by a single interface (beat.ACKer). The original ACKer implementations from libbeat/publisher/pipeline/acker.go and libbeat/publisher/pipeline/client_acker.go have been moved libbeat/common/acker. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The acker.Combine and acker.ConnectionOnly are the only new additions to the code base.

Why is it important?

tl;dr This change is required to integrate the v2 input API.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [ ] I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • [ ]

How to test this PR locally

ACKer implementations should be handled mostly by unit tests. Still, this change can have great impact and some Beats should be tested to double check that we have not introduced any regressions here.

  • Functionbeat:
    • check lambda function does not hang after publishing events (any kind of input will work)
    • check lambda function correctly stops in presence of processors that filter out events. E.g. setup kinesis and filter out events using drop_event with conditional. Check non-filtered events are published
    • check lambda function correctly stops if all events are filtered out
  • Choose one of the end-to-end ACKing inputs in filebeat (they all register similar handlers) and verify that end-to-end ACK works correctly. (e.g. Kafka input by monitoring the consumer group).

Related issues

Dev Docs:

Calls to pipeline.ConnectWith that used to setup an ACK callback need to use the acker helpers. Previously the callbacks have been ignored if the input is shutdown, but not all events have been ACKed yet. For end-to-end ACKers that loose the connection to the source-system on shutdown this behavior can be preserved by using ack.ConnectionOnly(<acker>).

Instead of directly passing callbacks, the callbacks should be wrapped using some of the utility functions in the libbeat/common/acker package:

  • Replace ACKCount: func(n int) { ... } with ACKHandler: acker.Counting(func(n int) { ... }).
  • Replace ACKEvents: func(private []interface{}) { ... } with ACKHandler: acker.EventPrivateReporter(func(_ int, private []interface{}) { ... }).
  • Replace ACKLastEvent: func(private interface{}) { ... } with ACKHandler: acker.LastEventPrivateReporter(func(_ int, interface{}) { ... })

The (beat.Pipeline).SetACKHandler method has been removed. libbeat/common/acker and libbeat/publisher/pipetool provide some helpers to modify and combine ACKers for all new beat.Client connections. For example this will use the global and local ACK handler for each event published.

pipeline = pipetool.WithACKer(pipeline, globalACKHandler)

...

client, err := pipeline.ConnectWith(beat.ClientConfig{
  ACKHandler: localACKHandler,
})

The WithACKer helper can be used arbitrarily often. ACKers are combined level by level via acker.Combine.

This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per input ACK handlers with their own ones.

Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvents` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved `libbeat/common/acker`. The former private implementation is now exposed as Helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. The `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base.

The global ACK handler support was introduced for filebeat, that did require some support for combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement will go away, as per input type managers are responsible for handling state update and ACKs.

In order to run old and new architecture in parallel, we need to combine ACK handling from input managers, existing input, custom registrar ACKer in filebeat, and event counting support (also via ACK handling) for shutdown. Exposing the interface and providing combinators (acker.Combine) for merging ACK handlers into one helps with the integration.

The v2 Input API gives implementors more flexibility in how to handle event publishing, coordination, and state handling shall be implemented. With the original ACK support the callbacks have been deregistered the moment inputs are stopped automatically. But for cursor based inputs we need to continue handling ACKs, even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.

(cherry picked from commit bb89344)
@urso urso added [zube]: In Review backport Team:Services (Deprecated) Label for the former Integrations-Services team labels Jul 8, 2020
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Jul 8, 2020
@elasticmachine
Copy link
Collaborator

Pinging @elastic/integrations-services (Team:Services)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Jul 8, 2020
@urso
Copy link
Author

urso commented Jul 8, 2020

relates tests are green. CI failed in kafka container setup and x-pack/winlogbeat environment seems to be missing venv

@urso urso merged commit a36910a into elastic:7.x Jul 8, 2020
@urso urso deleted the backport_19632_7.x branch July 8, 2020 20:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport Team:Services (Deprecated) Label for the former Integrations-Services team [zube]: Done
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants