Composable ACKer (elastic#19632)
This change replaces the ACK handler functions with a single interface
that makes it easier to combine ACK handlers.
The global ACK handler is removed from the pipeline, requiring Beats to
wrap and compose per-input ACK handlers with their own.

Although the PR is quite big, the main difference is that the `ACKCount`, `ACKEvents`, and `ACKLastEvent` handlers have been replaced by a single interface (`beat.ACKer`). The original ACKer implementations from `libbeat/publisher/pipeline/acker.go` and `libbeat/publisher/pipeline/client_acker.go` have been moved to `libbeat/common/acker`. The formerly private implementations are now exposed as helpers for writing and combining ACK handlers. Support for global ACK handlers has been removed. `acker.Combine` and `acker.ConnectionOnly` are the only new additions to the code base.
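
For illustration only, a minimal sketch of how a pipeline client is configured against the new interface (the `connect` helper and the callback bodies are hypothetical placeholders, not code from this PR):

```go
package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
)

// connect replaces the old ACKCount/ACKEvents/ACKLastEvent callbacks with a
// single, composed ACK handler on beat.ClientConfig.
func connect(pipeline beat.PipelineConnector) (beat.Client, error) {
	return pipeline.ConnectWith(beat.ClientConfig{
		ACKHandler: acker.Combine(
			// counts every event the client tried to publish, dropped events included
			acker.Counting(func(n int) { /* e.g. decrement an in-flight counter */ }),
			// receives the Private field of events once they have been ACKed (or dropped)
			acker.EventPrivateReporter(func(acked int, data []interface{}) { /* e.g. persist state */ }),
		),
	})
}
```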

Global ACK handler support was originally introduced for Filebeat, which needed to combine events from multiple inputs before applying state updates. With the introduction of the v2 input API this requirement goes away, as per-input-type managers are responsible for handling state updates and ACKs.

In order to run the old and new architectures in parallel, we need to combine ACK handling from input managers, existing inputs, the custom registrar ACKer in Filebeat, and event counting support (also implemented via ACK handling) for shutdown. Exposing the interface and providing combinators (`acker.Combine`) for merging ACK handlers into one helps with this integration.
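
A rough sketch of how that composition plays out (`beatWideACKer` and `inputACKer` are hypothetical stand-ins): the Beat-wide handler is attached to the pipeline once via `pipetool.WithACKer`, and any handler an individual client configures is merged with it through `acker.Combine`.

```go
package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
	"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

// connectInput wires a Beat-wide ACKer (e.g. registrar plus shutdown counter)
// and a per-input ACKer into a single client connection.
func connectInput(publisher beat.PipelineConnector, beatWideACKer, inputACKer beat.ACKer) (beat.Client, error) {
	// pipetool.WithACKer combines beatWideACKer with whatever ACKHandler the
	// client sets below, using acker.Combine under the hood.
	pipeline := pipetool.WithACKer(publisher, beatWideACKer)

	// The per-input handler stops receiving ACKs once this client is closed.
	return pipeline.ConnectWith(beat.ClientConfig{
		ACKHandler: acker.ConnectionOnly(inputACKer),
	})
}
```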

The v2 input API gives implementors more flexibility in how event publishing, coordination, and state handling are implemented. With the original ACK support, callbacks were automatically deregistered the moment an input was stopped. But for cursor-based inputs we need to continue handling ACKs even after the input is gone. The interface and helpers provide greater control over ACK handling after shutdown, which is required for the journald, winlog, and file/log inputs.
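
To sketch that last point (the `stateUpdate` type below is a hypothetical stand-in for the cursor update a real input attaches to `event.Private`): because the handler is not wrapped in `acker.ConnectionOnly`, cursor updates keep being applied while the pipeline drains events after the input's client has already been closed.

```go
package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
)

// stateUpdate stands in for the cursor update carried in event.Private.
type stateUpdate interface{ Execute() }

// cursorACKer persists cursor state for every ACKed event, including events
// that are still in flight after the input itself has shut down.
func cursorACKer() beat.ACKer {
	return acker.EventPrivateReporter(func(_ int, private []interface{}) {
		for _, p := range private {
			if op, ok := p.(stateUpdate); ok {
				op.Execute()
			}
		}
	})
}
```
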
Steffen Siering authored Jul 6, 2020

1 parent e88743f commit bb89344
Showing 34 changed files with 955 additions and 1,480 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -49,6 +49,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]
- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135]
- Replace `ACKCount`, `ACKEvents`, and `ACKLastEvent` callbacks with `ACKHandler` and interface in `beat.ClientConfig`. {pull}19632[19632]
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]

==== Bugfixes

64 changes: 30 additions & 34 deletions filebeat/beater/acker.go
@@ -19,17 +19,11 @@ package beater

import (
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
)

// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
type eventACKer struct {
stateful statefulLogger
stateless statelessLogger
log *logp.Logger
}

type statefulLogger interface {
Published(states []file.State)
}
@@ -38,35 +32,37 @@ type statelessLogger interface {
Published(c int) bool
}

func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}
// eventAcker handles publisher pipeline ACKs and forwards
// them to the registrar or directly to the stateless logger.
func eventACKer(statelessOut statelessLogger, statefulOut statefulLogger) beat.ACKer {
log := logp.NewLogger("acker")

func (a *eventACKer) ackEvents(data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}
return acker.EventPrivateReporter(func(_ int, data []interface{}) {
stateless := 0
states := make([]file.State, 0, len(data))
for _, datum := range data {
if datum == nil {
stateless++
continue
}

st, ok := datum.(file.State)
if !ok {
stateless++
continue
}
st, ok := datum.(file.State)
if !ok {
stateless++
continue
}

states = append(states, st)
}
states = append(states, st)
}

if len(states) > 0 {
a.log.Debugw("stateful ack", "count", len(states))
a.stateful.Published(states)
}
if len(states) > 0 {
log.Debugw("stateful ack", "count", len(states))
statefulOut.Published(states)
}

if stateless > 0 {
a.log.Debugw("stateless ack", "count", stateless)
a.stateless.Published(stateless)
}
if stateless > 0 {
log.Debugw("stateless ack", "count", stateless)
statelessOut.Published(stateless)
}
})
}
9 changes: 7 additions & 2 deletions filebeat/beater/acker_test.go
@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
)

type mockStatefulLogger struct {
@@ -78,9 +79,13 @@ func TestACKer(t *testing.T) {
sl := &mockStatelessLogger{}
sf := &mockStatefulLogger{}

h := newEventACKer(sl, sf)
h := eventACKer(sl, sf)

h.ackEvents(test.data)
for _, datum := range test.data {
h.AddEvent(beat.Event{Private: datum}, true)
}

h.ACKEvents(len(test.data))
assert.Equal(t, test.stateless, sl.count)
assert.Equal(t, test.stateful, sf.states)
})
24 changes: 14 additions & 10 deletions filebeat/beater/filebeat.go
@@ -229,16 +229,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Make sure all events that were published in
registrarChannel := newRegistrarLogger(registrar)

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
return err
}

fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend)
fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents)
// set up event counting for startup and a global common ACKer, such that all events will be
// routed to the registrar after they've been ACKed.
// Events with Private==nil or the type of private != file.State are directly
// forwarded to `finishedLogger`. Events from the `logs` input will first be forwarded
// to the registrar via `registrarChannel`, which finally forwards the events to finishedLogger as well.
// The finishedLogger decrements the counters in wgEvents after all events have been securely processed
// by the registry.
fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents)
fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel))

// Filebeat by default required infinite retry. Let's configure this for all
// inputs by default. Inputs (and InputController) can overwrite the sending
// guarantees explicitly when connecting with the pipeline.
fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone).Create
15 changes: 9 additions & 6 deletions filebeat/input/kafka/input.go
@@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/logp"
@@ -69,13 +70,15 @@ func NewInput(
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
ACKEvents: func(events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.handler.ack(meta.message)
ACKHandler: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, events []interface{}) {
for _, event := range events {
if meta, ok := event.(eventMeta); ok {
meta.handler.ack(meta.message)
}
}
}
},
}),
),
CloseRef: doneChannelContext(inputContext.Done),
WaitClose: config.WaitClose,
})
11 changes: 6 additions & 5 deletions filebeat/input/v2/input-cursor/input.go
@@ -30,6 +30,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
)

@@ -145,8 +146,8 @@ func (inp *managedInput) runSource(
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKEvents: newInputACKHandler(ctx.Logger),
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
@@ -174,8 +175,8 @@ func (inp *managedInput) createSourceID(s Source) string {
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return func(private []interface{}) {
func newInputACKHandler(log *logp.Logger) beat.ACKer {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
for i := 0; i < len(private); i++ {
@@ -196,5 +197,5 @@ func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return
}
private[last].(*updateOp).Execute(n)
}
})
}
22 changes: 7 additions & 15 deletions filebeat/input/v2/input-cursor/manager_test.go
@@ -434,24 +434,16 @@ func TestManager_InputsRun(t *testing.T) {
defer cancel()

// setup publishing pipeline and capture ACKer, so we can simulate progress in the Output
var acker func([]interface{})
var activeEventPrivate []interface{}

ackEvents := func(n int) {
data, rest := activeEventPrivate[:n], activeEventPrivate[n:]
activeEventPrivate = rest
acker(data)
}

var acker beat.ACKer
var wgACKer sync.WaitGroup
wgACKer.Add(1)
pipeline := &pubtest.FakeConnector{
ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) {
defer wgACKer.Done()
acker = cfg.ACKEvents
acker = cfg.ACKHandler
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
activeEventPrivate = append(activeEventPrivate, event.Private)
acker.AddEvent(event, true)
},
}, nil
},
@@ -478,19 +470,19 @@ func TestManager_InputsRun(t *testing.T) {
require.Equal(t, nil, store.snapshot()["test::key"].Cursor)

// ACK first 2 events and check snapshot state
ackEvents(2)
acker.ACKEvents(2)
require.Equal(t, "test-cursor-state2", store.snapshot()["test::key"].Cursor)

// ACK 1 events and check snapshot state (3 events published)
ackEvents(1)
acker.ACKEvents(1)
require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor)

// ACK event without cursor update and check snapshot state not modified
ackEvents(1)
acker.ACKEvents(1)
require.Equal(t, "test-cursor-state3", store.snapshot()["test::key"].Cursor)

// ACK rest
ackEvents(3)
acker.ACKEvents(3)
require.Equal(t, "test-cursor-state6", store.snapshot()["test::key"].Cursor)
})
}
28 changes: 12 additions & 16 deletions journalbeat/beater/journalbeat.go
@@ -27,8 +27,10 @@ import (
"github.com/elastic/beats/v7/journalbeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/beats/v7/journalbeat/config"
_ "github.com/elastic/beats/v7/journalbeat/include"
@@ -67,7 +69,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {

var inputs []*input.Input
for _, c := range config.Inputs {
i, err := input.New(c, b, done, cp.States())
i, err := input.New(c, b.Info, done, cp.States())
if err != nil {
return nil, err
}
@@ -91,34 +93,28 @@ func (bt *Journalbeat) Run(b *beat.Beat) error {
bt.logger.Info("journalbeat is running! Hit CTRL-C to stop it.")
defer bt.logger.Info("journalbeat is stopping")

err := bt.pipeline.SetACKHandler(beat.PipelineACKHandler{
ACKLastEvents: func(data []interface{}) {
for _, datum := range data {
if st, ok := datum.(checkpoint.JournalState); ok {
bt.checkpoint.PersistState(st)
}
}
},
})
if err != nil {
return err
}
defer bt.checkpoint.Shutdown()

pipeline := pipetool.WithACKer(b.Publisher, acker.LastEventPrivateReporter(func(_ int, private interface{}) {
if st, ok := private.(checkpoint.JournalState); ok {
bt.checkpoint.PersistState(st)
}
}))

var wg sync.WaitGroup
for _, i := range bt.inputs {
wg.Add(1)
go bt.runInput(i, &wg)
go bt.runInput(i, &wg, pipeline)
}

wg.Wait()

return nil
}

func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup) {
func (bt *Journalbeat) runInput(i *input.Input, wg *sync.WaitGroup, pipeline beat.Pipeline) {
defer wg.Done()
i.Run()
i.Run(pipeline)
}

// Stop stops the beat and its inputs.
15 changes: 7 additions & 8 deletions journalbeat/input/input.go
@@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"

"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/elastic/beats/v7/journalbeat/checkpoint"
@@ -38,7 +39,6 @@ type Input struct {
readers []*reader.Reader
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
logger *logp.Logger
@@ -49,7 +49,7 @@ type Input struct {
// New returns a new Input
func New(
c *common.Config,
b *beat.Beat,
info beat.Info,
done chan struct{},
states map[string]checkpoint.JournalState,
) (*Input, error) {
@@ -104,7 +104,7 @@ func New(
readers = append(readers, r)
}

inputProcessors, err := processorsForInput(b.Info, config)
inputProcessors, err := processorsForInput(info, config)
if err != nil {
return nil, err
}
@@ -115,7 +115,6 @@ func New(
readers: readers,
done: done,
config: config,
pipeline: b.Publisher,
states: states,
logger: logger,
eventMeta: config.EventMetadata,
@@ -125,18 +124,18 @@ func New(

// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
func (i *Input) Run(pipeline beat.Pipeline) {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
i.client, err = pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
ACKHandler: acker.Counting(func(n int) {
i.logger.Debugw("journalbeat successfully published events", "event.count", n)
},
}),
})
if err != nil {
i.logger.Error("Error connecting to output: %v", err)
73 changes: 33 additions & 40 deletions libbeat/beat/pipeline.go
@@ -23,17 +23,15 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

// Pipeline provides access to libbeat event publishing by creating a Client
// instance.
type Pipeline interface {
PipelineConnector
SetACKHandler(PipelineACKHandler) error
}

// PipelineConnector creates a publishing Client. This is typically backed by a Pipeline.
type PipelineConnector interface {
ConnectWith(ClientConfig) (Client, error)
Connect() (Client, error)
}

type PipelineConnector = Pipeline

// Client holds a connection to the beats publisher pipeline
type Client interface {
Publish(Event)
@@ -56,28 +54,38 @@ type ClientConfig struct {
// is configured
WaitClose time.Duration

// Configure ACK callback.
ACKHandler ACKer

// Events configures callbacks for common client callbacks
Events ClientEventer
}

// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for to long, to not block the internal buffers for
// too long (buffers can only be freed after ACK has been processed).
// Note: It's not supported to configure multiple ack handler types. Use at
// most one.

// ACKCount reports the number of published events recently acknowledged
// by the pipeline.
ACKCount func(int)

// ACKEvents reports the events private data of recently acknowledged events.
// Note: The slice passed must be copied if the events are to be processed
// after the handler returns.
ACKEvents func([]interface{})

// ACKLastEvent reports the last ACKed event out of a batch of ACKed events only.
// Only the events 'Private' field will be reported.
ACKLastEvent func(interface{})
// ACKer can be registered with a Client when connecting to the pipeline.
// The ACKer will be informed when events are added or dropped by the processors,
// and when an event has been ACKed by the outputs.
//
// Because event publishing and ACKing are asynchronous operations, the
// operations on ACKer are normally executed in different go routines. ACKers
// are required to be multi-threading safe.
type ACKer interface {
// AddEvent informs the ACKer that a new event has been sent to the client.
// AddEvent is called after the processors have handled the event. If the
// event has been dropped by the processors, `published` will be set to false.
// This allows the ACKer to do some bookkeeping for dropped events.
AddEvent(event Event, published bool)

// ACK Events from the output and pipeline queue are forwarded to ACKEvents.
// The number of reported events only matches the known number of events downstream.
// ACKers might need to keep track of dropped events by themselves.
ACKEvents(n int)

// Close informs the ACKer that the Client used to publish to the pipeline has been closed.
// No new events should be published anymore. The ACKEvents method will still be actively called
// as long as there are pending events for the client in the pipeline. The Close signal can be used
// to suppress any ACK event propagation if required.
// Close might be called from another go-routine than AddEvent and ACKEvents.
Close()
}

// CloseRef allows users to close the client asynchronously.
@@ -128,21 +136,6 @@ type ClientEventer interface {
DroppedOnPublish(Event) // event has been dropped, while waiting for the queue
}

// PipelineACKHandler configures some pipeline-wide event ACK handler.
type PipelineACKHandler struct {
// ACKCount reports the number of published events recently acknowledged
// by the pipeline.
ACKCount func(int)

// ACKEvents reports the events recently acknowledged by the pipeline.
// Only the events 'Private' field will be reported.
ACKEvents func([]interface{})

// ACKLastEvent reports the last ACKed event per pipeline client.
// Only the events 'Private' field will be reported.
ACKLastEvents func([]interface{})
}

type ProcessorList interface {
Processor
All() []Processor
341 changes: 341 additions & 0 deletions libbeat/common/acker/acker.go
@@ -0,0 +1,341 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package acker

import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

// Nil creates an ACKer that does nothing.
func Nil() beat.ACKer {
return nilACKer{}
}

type nilACKer struct{}

func (nilACKer) AddEvent(event beat.Event, published bool) {}
func (nilACKer) ACKEvents(n int) {}
func (nilACKer) Close() {}

// RawCounting reports the number of ACKed events as has been reported by the outputs or queue.
// The ACKer does not keep track of dropped events. Events after the client has
// been closed will still be reported.
func RawCounting(fn func(int)) beat.ACKer {
return countACKer(fn)
}

type countACKer func(int)

func (countACKer) AddEvent(_ beat.Event, _ bool) {}
func (fn countACKer) ACKEvents(n int) { fn(n) }
func (countACKer) Close() {}

// TrackingCounter keeps track of published and dropped events. It reports
// the number of acked events from the queue in the 'acked' argument and the
// total number of events published via the Client in the 'total' argument.
// The TrackingCounter keeps track of the order of events being sent and events being acked.
// If N events have been acked by the output, then `total` will include all events dropped in between
// the last forwarded N events and the 'tail' of dropped events. For example (X = send, D = dropped):
//
// index: 0 1 2 3 4 5 6 7 8 9 10 11
// event: X X D D X D D X D X X X
//
// If the output ACKs 3 events, then all events from index 0 to 6 will be reported because:
// - the drop sequence for events 2 and 3 is in between the number of forwarded and ACKed events
// - events 5-6 have been dropped as well, but event 7 is not ACKed yet
//
// If there is no event currently tracked by this ACKer and the next event is dropped by the processors,
// then `fn` will be called immediately with acked=0 and total=1.
func TrackingCounter(fn func(acked, total int)) beat.ACKer {
a := &trackingACKer{fn: fn}
init := &gapInfo{}
a.lst.head = init
a.lst.tail = init
return a
}

// Counting returns an ACK count for all events a client has tried to publish.
// The ACKer keeps track of dropped events as well, and adjusts the ACK from the outputs accordingly.
func Counting(fn func(n int)) beat.ACKer {
return TrackingCounter(func(_ int, total int) {
fn(total)
})
}

type trackingACKer struct {
fn func(acked, total int)
events atomic.Uint32
lst gapList
}

type gapList struct {
sync.Mutex
head, tail *gapInfo
}

type gapInfo struct {
sync.Mutex
next *gapInfo
send, dropped int
}

func (a *trackingACKer) AddEvent(_ beat.Event, published bool) {
a.events.Inc()
if published {
a.addPublishedEvent()
} else {
a.addDropEvent()
}
}

// addPublishedEvent increments the 'send' counter in the current gapInfo
// element in the tail of the list. If events have been dropped, we append a
// new empty gapInfo element.
func (a *trackingACKer) addPublishedEvent() {
a.lst.Lock()

current := a.lst.tail
current.Lock()
if current.dropped > 0 {
tmp := &gapInfo{}
tmp.Lock()

a.lst.tail.next = tmp
a.lst.tail = tmp
current.Unlock()
current = tmp
}
a.lst.Unlock()

current.send++
current.Unlock()
}

// addDropEvent increments the 'dropped' counter in the gapInfo element in the
// tail of the list. The callback will be run with total=1 and acked=0 if the
// acker state is empty and no events have been sent yet.
func (a *trackingACKer) addDropEvent() {
a.lst.Lock()

current := a.lst.tail
current.Lock()

if current.send == 0 && current.next == nil {
// send can only be 0 if no events/gaps are present yet
if a.lst.head != a.lst.tail {
panic("gap list expected to be empty")
}

a.fn(0, 1)
a.lst.Unlock()
current.Unlock()

a.events.Dec()
return
}

a.lst.Unlock()
current.dropped++
current.Unlock()
}

func (a *trackingACKer) ACKEvents(n int) {
var (
total = 0
acked = n
emptyLst bool
)

for n > 0 {
if emptyLst {
panic("too many events acked")
}

a.lst.Lock()
current := a.lst.head
current.Lock()

// advance list if we detect that the current head will be completely consumed
// by this ACK event.
if n >= current.send {
next := current.next
emptyLst = next == nil
if !emptyLst {
// advance the list if all events in the current entry have been sent and the
// list has more than 1 gapInfo entry. If only 1 entry is present, the list
// item will be reset and reused
a.lst.head = next
}
}
// hand over lock list-entry, so ACK handler and producer can operate
// on potentially different list ends
a.lst.Unlock()

if n < current.send {
current.send -= n
total += n
n = 0
} else {
total += current.send + current.dropped
n -= current.send
current.dropped = 0
current.send = 0
}
current.Unlock()
}

a.events.Sub(uint32(total))
a.fn(acked, total)
}

func (a *trackingACKer) Close() {}

// EventPrivateReporter reports all private fields from all events that have
// been published or removed.
//
// The EventPrivateReporter keeps track of the order of events being sent
// and events being acked. If N events have been acked by the output, then
// `total` will include all events dropped in between the last forwarded N
// events and the 'tail' of dropped events. For example (X = send, D =
// dropped):
//
// index: 0 1 2 3 4 5 6 7 8 9 10 11
// event: X X D D X D D X D X X X
//
// If the output ACKs 3 events, then all events from index 0 to 6 will be reported because:
// - the drop sequence for events 2 and 3 is in between the number of forwarded and ACKed events
// - events 5-6 have been dropped as well, but event 7 is not ACKed yet
func EventPrivateReporter(fn func(acked int, data []interface{})) beat.ACKer {
a := &eventDataACKer{fn: fn}
a.ACKer = TrackingCounter(a.onACK)
return a
}

type eventDataACKer struct {
beat.ACKer
mu sync.Mutex
data []interface{}
fn func(acked int, data []interface{})
}

func (a *eventDataACKer) AddEvent(event beat.Event, published bool) {
a.mu.Lock()
a.data = append(a.data, event.Private)
a.mu.Unlock()
a.ACKer.AddEvent(event, published)
}

func (a *eventDataACKer) onACK(acked, total int) {
if total == 0 {
return
}

a.mu.Lock()
data := a.data[:total]
a.data = a.data[total:]
a.mu.Unlock()

if len(data) > 0 {
a.fn(acked, data)
}
}

// LastEventPrivateReporter reports only the 'latest' published and acked
// event if a batch of events have been ACKed.
func LastEventPrivateReporter(fn func(acked int, data interface{})) beat.ACKer {
ignored := 0
return EventPrivateReporter(func(acked int, data []interface{}) {
for i := len(data) - 1; i >= 0; i-- {
if d := data[i]; d != nil {
fn(ignored+acked, d)
ignored = 0
return
}
}

// complete batch has been ignored due to missing data -> add count
ignored += acked
})
}

// Combine forwards events to a list of ackers.
func Combine(as ...beat.ACKer) beat.ACKer {
return ackerList(as)
}

type ackerList []beat.ACKer

func (l ackerList) AddEvent(event beat.Event, published bool) {
for _, a := range l {
a.AddEvent(event, published)
}
}

func (l ackerList) ACKEvents(n int) {
for _, a := range l {
a.ACKEvents(n)
}
}

func (l ackerList) Close() {
for _, a := range l {
a.Close()
}
}

// ConnectionOnly ensures that the given ACKer is only used for as long as the
// pipeline Client is active. Once the Client is closed, the ACKer will drop
// its internal state and no more ACK events will be processed.
func ConnectionOnly(a beat.ACKer) beat.ACKer {
return &clientOnlyACKer{acker: a}
}

type clientOnlyACKer struct {
mu sync.Mutex
acker beat.ACKer
}

func (a *clientOnlyACKer) AddEvent(event beat.Event, published bool) {
a.mu.Lock()
defer a.mu.Unlock()
if sub := a.acker; sub != nil {
sub.AddEvent(event, published)
}
}

func (a *clientOnlyACKer) ACKEvents(n int) {
a.mu.Lock()
sub := a.acker
a.mu.Unlock()
if sub != nil {
sub.ACKEvents(n)
}
}

func (a *clientOnlyACKer) Close() {
a.mu.Lock()
sub := a.acker
a.acker = nil // drop the internal ACKer on Close and allow the runtime to gc accumulated state.
a.mu.Unlock()
if sub != nil {
sub.Close()
}
}
250 changes: 250 additions & 0 deletions libbeat/common/acker/acker_test.go
@@ -0,0 +1,250 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package acker

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
)

type fakeACKer struct {
AddEventFunc func(event beat.Event, published bool)
ACKEventsFunc func(n int)
CloseFunc func()
}

func TestNil(t *testing.T) {
acker := Nil()
require.NotNil(t, acker)

// check acker can be used without panic:
acker.AddEvent(beat.Event{}, false)
acker.AddEvent(beat.Event{}, true)
acker.ACKEvents(3)
acker.Close()
}

func TestCounting(t *testing.T) {
t.Run("ack count is passed through", func(t *testing.T) {
var n int
acker := RawCounting(func(acked int) { n = acked })
acker.ACKEvents(3)
require.Equal(t, 3, n)
})
}

func TestTracking(t *testing.T) {
t.Run("dropped event is acked immediately if empty", func(t *testing.T) {
var acked, total int
TrackingCounter(func(a, t int) { acked, total = a, t }).AddEvent(beat.Event{}, false)
require.Equal(t, 0, acked)
require.Equal(t, 1, total)
})

t.Run("no dropped events", func(t *testing.T) {
var acked, total int
acker := TrackingCounter(func(a, t int) { acked, total = a, t })
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, true)
acker.ACKEvents(2)
require.Equal(t, 2, acked)
require.Equal(t, 2, total)
})

t.Run("acking published includes dropped events in middle", func(t *testing.T) {
var acked, total int
acker := TrackingCounter(func(a, t int) { acked, total = a, t })
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, false)
acker.AddEvent(beat.Event{}, false)
acker.AddEvent(beat.Event{}, true)
acker.ACKEvents(2)
require.Equal(t, 2, acked)
require.Equal(t, 4, total)
})

t.Run("acking published includes dropped events at end of ACK interval", func(t *testing.T) {
var acked, total int
acker := TrackingCounter(func(a, t int) { acked, total = a, t })
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, false)
acker.AddEvent(beat.Event{}, false)
acker.AddEvent(beat.Event{}, true)
acker.ACKEvents(2)
require.Equal(t, 2, acked)
require.Equal(t, 4, total)
})

t.Run("partial ACKs", func(t *testing.T) {
var acked, total int
acker := TrackingCounter(func(a, t int) { acked, total = a, t })
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, false)
acker.AddEvent(beat.Event{}, true)
acker.AddEvent(beat.Event{}, true)

acker.ACKEvents(2)
require.Equal(t, 2, acked)
require.Equal(t, 2, total)

acker.ACKEvents(2)
require.Equal(t, 2, acked)
require.Equal(t, 3, total)
})
}

func TestEventPrivateReporter(t *testing.T) {
t.Run("dropped event is acked immediately if empty", func(t *testing.T) {
var acked int
var data []interface{}
acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d })
acker.AddEvent(beat.Event{Private: 1}, false)
require.Equal(t, 0, acked)
require.Equal(t, []interface{}{1}, data)
})

t.Run("no dropped events", func(t *testing.T) {
var acked int
var data []interface{}
acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d })
acker.AddEvent(beat.Event{Private: 1}, true)
acker.AddEvent(beat.Event{Private: 2}, true)
acker.AddEvent(beat.Event{Private: 3}, true)
acker.ACKEvents(3)
require.Equal(t, 3, acked)
require.Equal(t, []interface{}{1, 2, 3}, data)
})

t.Run("private of dropped events is included", func(t *testing.T) {
var acked int
var data []interface{}
acker := EventPrivateReporter(func(a int, d []interface{}) { acked, data = a, d })
acker.AddEvent(beat.Event{Private: 1}, true)
acker.AddEvent(beat.Event{Private: 2}, false)
acker.AddEvent(beat.Event{Private: 3}, true)
acker.ACKEvents(2)
require.Equal(t, 2, acked)
require.Equal(t, []interface{}{1, 2, 3}, data)
})
}

func TestLastEventPrivateReporter(t *testing.T) {
t.Run("dropped event with private is acked immediately if empty", func(t *testing.T) {
var acked int
var datum interface{}
acker := LastEventPrivateReporter(func(a int, d interface{}) { acked, datum = a, d })
acker.AddEvent(beat.Event{Private: 1}, false)
require.Equal(t, 0, acked)
require.Equal(t, 1, datum)
})

t.Run("dropped event without private is ignored", func(t *testing.T) {
var called bool
acker := LastEventPrivateReporter(func(_ int, _ interface{}) { called = true })
acker.AddEvent(beat.Event{Private: nil}, false)
require.False(t, called)
})

t.Run("no dropped events", func(t *testing.T) {
var acked int
var data interface{}
acker := LastEventPrivateReporter(func(a int, d interface{}) { acked, data = a, d })
acker.AddEvent(beat.Event{Private: 1}, true)
acker.AddEvent(beat.Event{Private: 2}, true)
acker.AddEvent(beat.Event{Private: 3}, true)
acker.ACKEvents(3)
require.Equal(t, 3, acked)
require.Equal(t, 3, data)
})
}

func TestCombine(t *testing.T) {
t.Run("AddEvent distributes", func(t *testing.T) {
var a1, a2 int
acker := Combine(countACKerOps(&a1, nil, nil), countACKerOps(&a2, nil, nil))
acker.AddEvent(beat.Event{}, true)
require.Equal(t, 1, a1)
require.Equal(t, 1, a2)
})

t.Run("ACKEvents distributes", func(t *testing.T) {
var a1, a2 int
acker := Combine(countACKerOps(nil, &a1, nil), countACKerOps(nil, &a2, nil))
acker.ACKEvents(1)
require.Equal(t, 1, a1)
require.Equal(t, 1, a2)
})

t.Run("Close distributes", func(t *testing.T) {
var c1, c2 int
acker := Combine(countACKerOps(nil, nil, &c1), countACKerOps(nil, nil, &c2))
acker.Close()
require.Equal(t, 1, c1)
require.Equal(t, 1, c2)
})
}

func TestConnectionOnly(t *testing.T) {
t.Run("passes ACKs if not closed", func(t *testing.T) {
var n int
acker := ConnectionOnly(RawCounting(func(acked int) { n = acked }))
acker.ACKEvents(3)
require.Equal(t, 3, n)
})

t.Run("ignores ACKs after close", func(t *testing.T) {
var n int
acker := ConnectionOnly(RawCounting(func(acked int) { n = acked }))
acker.Close()
acker.ACKEvents(3)
require.Equal(t, 0, n)
})
}

func countACKerOps(add, acked, close *int) beat.ACKer {
return &fakeACKer{
AddEventFunc: func(_ beat.Event, _ bool) { *add++ },
ACKEventsFunc: func(_ int) { *acked++ },
CloseFunc: func() { *close++ },
}
}

func (f *fakeACKer) AddEvent(event beat.Event, published bool) {
if f.AddEventFunc != nil {
f.AddEventFunc(event, published)
}
}

func (f *fakeACKer) ACKEvents(n int) {
if f.ACKEventsFunc != nil {
f.ACKEventsFunc(n)
}
}

func (f *fakeACKer) Close() {
if f.CloseFunc != nil {
f.CloseFunc()
}
}
499 changes: 0 additions & 499 deletions libbeat/publisher/pipeline/acker.go

This file was deleted.

53 changes: 0 additions & 53 deletions libbeat/publisher/pipeline/acker_test.go

This file was deleted.

113 changes: 89 additions & 24 deletions libbeat/publisher/pipeline/client.go
@@ -19,6 +19,7 @@ package pipeline

import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
@@ -37,7 +38,8 @@ type client struct {
processors beat.Processor
producer queue.Producer
mutex sync.Mutex
acker acker
acker beat.ACKer
waiter *clientCloseWaiter

eventFlags publisher.EventFlags
canDrop bool
@@ -52,6 +54,15 @@ type client struct {
eventer beat.ClientEventer
}

type clientCloseWaiter struct {
events atomic.Uint32
closing atomic.Bool

signalAll chan struct{} // ack loop notifies `close` that all events have been acked
signalDone chan struct{} // shutdown handler telling `wait` that shutdown has been completed
waitClose time.Duration
}

func (c *client) PublishAll(events []beat.Event) {
c.mutex.Lock()
defer c.mutex.Unlock()
@@ -99,13 +110,7 @@ func (c *client) publish(e beat.Event) {
e = *event
}

open := c.acker.addEvent(e, publish)
if !open {
// client is closing down -> report event as dropped and return
c.onDroppedOnPublish(e)
return
}

c.acker.AddEvent(e, publish)
if !publish {
c.onFilteredOut(e)
return
@@ -143,34 +148,30 @@ func (c *client) Close() error {

// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.doClose()
log.Debug("client: wait for acker to finish")
c.acker.wait()
log.Debug("client: acker shut down")
return nil
}

func (c *client) doClose() {
c.closeOnce.Do(func() {
close(c.done)

log := c.logger()

c.isOpen.Store(false)
c.onClosing()

log.Debug("client: closing acker")
c.acker.close() // this must trigger a direct/indirect call to 'unlink'
c.waiter.signalClose()
c.waiter.wait()

c.acker.Close()
log.Debug("client: done closing acker")

log.Debug("client: unlink from queue")
c.unlink()
log.Debug("client: done unlink")
})
return nil
}

// unlink is the final step of closing a client. It must be executed only after
// it is guaranteed that the underlying acker has been closed and will not
// accept any new publish or ACK events.
// This method is normally registered with the ACKer and triggered by it.
// unlink is the final step of closing a client. It cancels the client's
// connection as a producer to the queue.
func (c *client) unlink() {
log := c.logger()
log.Debug("client: done closing acker")

n := c.producer.Cancel() // close connection to queue
log.Debugf("client: cancelled %v events", n)
@@ -233,3 +234,67 @@ func (c *client) onDroppedOnPublish(e beat.Event) {
c.eventer.DroppedOnPublish(e)
}
}

func newClientCloseWaiter(timeout time.Duration) *clientCloseWaiter {
return &clientCloseWaiter{
signalAll: make(chan struct{}, 1),
signalDone: make(chan struct{}),
waitClose: timeout,
}
}

func (w *clientCloseWaiter) AddEvent(_ beat.Event, published bool) {
if published {
w.events.Inc()
}
}

func (w *clientCloseWaiter) ACKEvents(n int) {
value := w.events.Sub(uint32(n))
if value != 0 {
return
}

// send done signal, if close is waiting
if w.closing.Load() {
w.signalAll <- struct{}{}
}
}

// The Close signal from the pipeline is ignored. Instead the client
// explicitly uses `signalClose` and `wait` before it continues with the
// closing sequence.
func (w *clientCloseWaiter) Close() {}

func (w *clientCloseWaiter) signalClose() {
if w == nil {
return
}

w.closing.Store(true)
if w.events.Load() == 0 {
w.finishClose()
return
}

// start routine to propagate shutdown signals or timeouts to anyone
// being blocked in wait.
go func() {
defer w.finishClose()

select {
case <-w.signalAll:
case <-time.After(w.waitClose):
}
}()
}

func (w *clientCloseWaiter) finishClose() {
close(w.signalDone)
}

func (w *clientCloseWaiter) wait() {
if w != nil {
<-w.signalDone
}
}
118 changes: 0 additions & 118 deletions libbeat/publisher/pipeline/client_ack.go

This file was deleted.

16 changes: 1 addition & 15 deletions libbeat/publisher/pipeline/config.go
@@ -48,23 +48,9 @@ func validateClientConfig(c *beat.ClientConfig) error {
return fmt.Errorf("unknown publish mode %v", m)
}

fnCount := 0
countPtr := func(b bool) {
if b {
fnCount++
}
}

countPtr(c.ACKCount != nil)
countPtr(c.ACKEvents != nil)
countPtr(c.ACKLastEvent != nil)
if fnCount > 1 {
return fmt.Errorf("At most one of ACKCount, ACKEvents, ACKLastEvent can be configured")
}

// ACK handlers can not be registered DropIfFull is set, as dropping events
// due to full broker can not be accounted for in the clients acker.
if fnCount != 0 && withDrop {
if c.ACKHandler != nil && withDrop {
return errors.New("ACK handlers with DropIfFull mode not supported")
}

39 changes: 27 additions & 12 deletions libbeat/publisher/pipeline/consumer.go
@@ -18,6 +18,9 @@
package pipeline

import (
"errors"
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
@@ -29,13 +32,12 @@ import (
// is receiving cancelled batches from outputs to be closed on output reloading.
type eventConsumer struct {
logger *logp.Logger
done chan struct{}

ctx *batchContext
ctx *batchContext

pause atomic.Bool
wait atomic.Bool
sig chan consumerSignal
wg sync.WaitGroup

queue queue.Queue
consumer queue.Consumer
@@ -55,16 +57,18 @@ const (
sigConsumerCheck consumerEventTag = iota
sigConsumerUpdateOutput
sigConsumerUpdateInput
sigStop
)

var errStopped = errors.New("stopped")

func newEventConsumer(
log *logp.Logger,
queue queue.Queue,
ctx *batchContext,
) *eventConsumer {
c := &eventConsumer{
logger: log,
done: make(chan struct{}),
sig: make(chan consumerSignal, 3),
out: nil,

@@ -74,13 +78,19 @@ func newEventConsumer(
}

c.pause.Store(true)
go c.loop(c.consumer)

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.loop(c.consumer)
}()
return c
}

func (c *eventConsumer) close() {
c.consumer.Close()
close(c.done)
c.sig <- consumerSignal{tag: sigStop}
c.wg.Wait()
}

func (c *eventConsumer) sigWait() {
@@ -142,8 +152,11 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
paused = true
)

handleSignal := func(sig consumerSignal) {
handleSignal := func(sig consumerSignal) error {
switch sig.tag {
case sigStop:
return errStopped

case sigConsumerCheck:

case sigConsumerUpdateOutput:
@@ -159,6 +172,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
} else {
out = nil
}
return nil
}

for {
@@ -182,17 +196,18 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {

select {
case sig := <-c.sig:
handleSignal(sig)
if err := handleSignal(sig); err != nil {
return
}
continue
default:
}

select {
case <-c.done:
log.Debug("stop pipeline event consumer")
return
case sig := <-c.sig:
handleSignal(sig)
if err := handleSignal(sig); err != nil {
return
}
case out <- batch:
batch = nil
if paused {
27 changes: 8 additions & 19 deletions libbeat/publisher/pipeline/nilpipeline.go
@@ -24,10 +24,8 @@ import (
type nilPipeline struct{}

type nilClient struct {
eventer beat.ClientEventer
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})
eventer beat.ClientEventer
acker beat.ACKer
}

var _nilPipeline = (*nilPipeline)(nil)
@@ -44,10 +42,8 @@ func (p *nilPipeline) Connect() (beat.Client, error) {

func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return &nilClient{
eventer: cfg.Events,
ackCount: cfg.ACKCount,
ackEvents: cfg.ACKEvents,
ackLastEvent: cfg.ACKLastEvent,
eventer: cfg.Events,
acker: cfg.ACKHandler,
}, nil
}

@@ -61,18 +57,11 @@ func (c *nilClient) PublishAll(events []beat.Event) {
return
}

if c.ackLastEvent != nil {
c.ackLastEvent(events[L-1].Private)
}
if c.ackEvents != nil {
tmp := make([]interface{}, L)
for i := range events {
tmp[i] = events[i].Private
if c.acker != nil {
for _, event := range events {
c.acker.AddEvent(event, true)
}
c.ackEvents(tmp)
}
if c.ackCount != nil {
c.ackCount(L)
c.acker.ACKEvents(len(events))
}
}

92 changes: 26 additions & 66 deletions libbeat/publisher/pipeline/pipeline.go
@@ -21,13 +21,13 @@
package pipeline

import (
"errors"
"reflect"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/logp"
@@ -71,11 +71,7 @@ type Pipeline struct {
waitCloser *waitCloser

// pipeline ack
ackMode pipelineACKMode
ackActive atomic.Bool
ackDone chan struct{}
ackBuilder ackBuilder
eventSema *sema
eventSema *sema

// closeRef signal propagation support
guardStartSigPropagation sync.Once
@@ -128,7 +124,6 @@ type pipelineEventer struct {

observer queueObserver
waitClose *waitCloser
cb *pipelineEventCB
}

type waitCloser struct {
@@ -162,8 +157,6 @@ func New(
waitCloseTimeout: settings.WaitClose,
processors: settings.Processors,
}
p.ackBuilder = &pipelineEmptyACK{p}
p.ackActive = atomic.MakeBool(true)

if monitors.Metrics != nil {
p.observer = newMetricsObserver(monitors.Metrics)
@@ -197,45 +190,6 @@ func New(
return p, nil
}

// SetACKHandler sets a global ACK handler on all events published to the pipeline.
// SetACKHandler must be called before any connection is made.
func (p *Pipeline) SetACKHandler(handler beat.PipelineACKHandler) error {
p.eventer.mutex.Lock()
defer p.eventer.mutex.Unlock()

if !p.eventer.modifyable {
return errors.New("can not set ack handler on already active pipeline")
}

// TODO: check only one type being configured

cb, err := newPipelineEventCB(handler)
if err != nil {
return err
}

if cb == nil {
p.ackBuilder = &pipelineEmptyACK{p}
p.eventer.cb = nil
return nil
}

p.eventer.cb = cb
if cb.mode == countACKMode {
p.ackBuilder = &pipelineCountACK{
pipeline: p,
cb: cb.onCounts,
}
} else {
p.ackBuilder = &pipelineEventsACK{
pipeline: p,
cb: cb.onEvents,
}
}

return nil
}

// Close stops the pipeline, outputs and queue.
// If WaitClose with WaitOnPipelineClose mode is configured, Close will block
// for a duration of WaitClose, if there are still active events in the pipeline.
@@ -292,9 +246,8 @@ func (p *Pipeline) Connect() (beat.Client, error) {
// If not set otherwise the default publish mode is OutputChooses.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
var (
canDrop bool
dropOnCancel bool
eventFlags publisher.EventFlags
canDrop bool
eventFlags publisher.EventFlags
)

err := validateClientConfig(&cfg)
@@ -309,7 +262,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
switch cfg.PublishMode {
case beat.GuaranteedSend:
eventFlags = publisher.GuaranteedSend
dropOnCancel = true
case beat.DropIfFull:
canDrop = true
}
@@ -343,12 +295,9 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
reportEvents: reportEvents,
}

acker := p.makeACKer(processors != nil, &cfg, waitClose, client.unlink)
producerCfg := queue.ProducerConfig{
// Cancel events from queue if acker is configured
// and no pipeline-wide ACK handler is registered.
DropOnCancel: dropOnCancel && acker != nil && p.eventer.cb == nil,
}
ackHandler := cfg.ACKHandler

producerCfg := queue.ProducerConfig{}

if reportEvents || cfg.Events != nil {
producerCfg.OnDrop = func(event beat.Event) {
@@ -361,13 +310,27 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
}
}

if acker != nil {
producerCfg.ACK = acker.ackEvents
var waiter *clientCloseWaiter
if waitClose > 0 {
waiter = newClientCloseWaiter(waitClose)
}

if waiter != nil {
if ackHandler == nil {
ackHandler = waiter
} else {
ackHandler = acker.Combine(waiter, ackHandler)
}
}

if ackHandler != nil {
producerCfg.ACK = ackHandler.ACKEvents
} else {
acker = newCloseACKer(nilACKer, client.unlink)
ackHandler = acker.Nil()
}

client.acker = acker
client.acker = ackHandler
client.waiter = waiter
client.producer = p.queue.Producer(producerCfg)

p.observer.clientConnected()
@@ -429,7 +392,7 @@ func (p *Pipeline) runSignalPropagation() {
isSig := (chosen & 1) == 1
if isSig {
client := clients[i]
client.doClose()
client.Close()
}

// remove:
@@ -470,9 +433,6 @@ func (e *pipelineEventer) OnACK(n int) {
if wc := e.waitClose; wc != nil {
wc.dec(n)
}
if e.cb != nil {
e.cb.reportQueueACK(n)
}
}

func (e *waitCloser) inc() {
323 changes: 0 additions & 323 deletions libbeat/publisher/pipeline/pipeline_ack.go

This file was deleted.

5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/stress/gen.go
@@ -28,6 +28,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

@@ -68,9 +69,9 @@ func generate(

logger := logp.NewLogger("publisher_pipeline_stress_generate")
if config.ACK {
settings.ACKCount = func(n int) {
settings.ACKHandler = acker.Counting(func(n int) {
logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
}
})
}

if m := config.PublishMode; m != "" {
12 changes: 12 additions & 0 deletions libbeat/publisher/pipetool/pipetool.go
@@ -20,6 +20,7 @@ package pipetool
import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
)

// connectEditPipeline modifies the client configuration using edit before calling
@@ -86,6 +87,17 @@ func WithDefaultGuarantees(pipeline beat.PipelineConnector, mode beat.PublishMod
})
}

func WithACKer(pipeline beat.PipelineConnector, a beat.ACKer) beat.PipelineConnector {
return WithClientConfigEdit(pipeline, func(cfg beat.ClientConfig) (beat.ClientConfig, error) {
if h := cfg.ACKHandler; h != nil {
cfg.ACKHandler = acker.Combine(a, h)
} else {
cfg.ACKHandler = a
}
return cfg, nil
})
}

// WithClientWrapper calls wrap on beat.Client instance, after a successful
// call to `pipeline.Connect` or `pipeline.ConnectWith`. The wrap function can
// wrap the client to provide additional functionality.
4 changes: 0 additions & 4 deletions libbeat/publisher/testing/testing.go
@@ -44,10 +44,6 @@ func (pub *TestPublisher) ConnectWith(_ beat.ClientConfig) (beat.Client, error)
return pub.client, nil
}

func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error {
panic("Not supported")
}

func NewChanClient(bufSize int) *ChanClient {
return NewChanClientWith(make(chan beat.Event, bufSize))
}
5 changes: 0 additions & 5 deletions metricbeat/cmd/test/modules.go
@@ -90,8 +90,3 @@ type publisher struct {
func newPublisher() *publisher {
return &publisher{pipeline.NewNilPipeline()}
}

// SetACKHandler is a dummy implementation of the ack handler for the test publisher.
func (*publisher) SetACKHandler(beat.PipelineACKHandler) error {
return nil
}
14 changes: 10 additions & 4 deletions winlogbeat/beater/eventlogger.go
@@ -23,10 +23,12 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/beats/v7/winlogbeat/checkpoint"
"github.com/elastic/beats/v7/winlogbeat/eventlog"
@@ -81,24 +83,28 @@ func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) {
Processor: e.processors,
KeepNull: e.keepNull,
},
ACKCount: func(n int) {
ACKHandler: acker.Counting(func(n int) {
addPublished(api, n)
logp.Info("EventLog[%s] successfully published %d events", api, n)
},
}),
})
}

func (e *eventLogger) run(
done <-chan struct{},
pipeline beat.Pipeline,
state checkpoint.EventLogState,
acker *eventACKer,
eventACKer *eventACKer,
) {
api := e.source

// Initialize per event log metrics.
initMetrics(api.Name())

pipeline = pipetool.WithACKer(pipeline, acker.EventPrivateReporter(func(_ int, private []interface{}) {
eventACKer.ACKEvents(private)
}))

client, err := e.connect(pipeline)
if err != nil {
logp.Warn("EventLog[%s] Pipeline error. Failed to connect to publisher pipeline",
@@ -155,7 +161,7 @@ func (e *eventLogger) run(
continue
}

acker.Add(len(records))
eventACKer.Add(len(records))
for _, lr := range records {
client.Publish(lr.ToEvent())
}
8 changes: 0 additions & 8 deletions winlogbeat/beater/winlogbeat.go
@@ -133,14 +133,6 @@ func (eb *Winlogbeat) Run(b *beat.Beat) error {
// Initialize metrics.
initMetrics("total")

// setup global event ACK handler
err := eb.pipeline.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: acker.ACKEvents,
})
if err != nil {
return err
}

var wg sync.WaitGroup
for _, log := range eb.eventLogs {
state, _ := persistedState[log.source.Name()]
5 changes: 3 additions & 2 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -14,6 +14,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
helper "github.com/elastic/beats/v7/libbeat/common/docker"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader"
@@ -38,9 +39,9 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
WaitClose: 0,
}
clientLogger := logp.NewLogger("clientLogReader")
settings.ACKCount = func(n int) {
settings.ACKHandler = acker.Counting(func(n int) {
clientLogger.Debugf("Pipeline client ACKS; %v", n)
}
})
settings.PublishMode = beat.DefaultGuarantees
client, err := pipeline.ConnectWith(settings)
if err != nil {
21 changes: 12 additions & 9 deletions x-pack/filebeat/input/googlepubsub/input.go
@@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
@@ -92,16 +93,18 @@ func NewInput(

// Build outlet for events.
in.outlet, err = connector.ConnectWith(cfg, beat.ClientConfig{
ACKEvents: func(privates []interface{}) {
for _, priv := range privates {
if msg, ok := priv.(*pubsub.Message); ok {
msg.Ack()
in.ackedCount.Inc()
} else {
in.log.Error("Failed ACKing pub/sub event")
ACKHandler: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, priv := range privates {
if msg, ok := priv.(*pubsub.Message); ok {
msg.Ack()
in.ackedCount.Inc()
} else {
in.log.Error("Failed ACKing pub/sub event")
}
}
}
},
}),
),
})
if err != nil {
return nil, err
35 changes: 15 additions & 20 deletions x-pack/filebeat/input/googlepubsub/pubsub_test.go
@@ -217,7 +217,7 @@ func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, i
runTestWithACKer(t, cfg, ackEvent, run)
}

func runTestWithACKer(t *testing.T, cfg *common.Config, acker acker, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) {
func runTestWithACKer(t *testing.T, cfg *common.Config, onEvent eventHandler, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) {
if !isInDockerIntegTestEnv() {
// Don't test goroutines when using our compose.EnsureUp.
defer resources.NewGoroutinesChecker().Check(t)
@@ -233,7 +233,7 @@ func runTestWithACKer(t *testing.T, cfg *common.Config, acker acker, run func(cl
defer close(inputCtx.Done)

// Stub outlet for receiving events generated by the input.
eventOutlet := newStubOutlet(acker)
eventOutlet := newStubOutlet(onEvent)
defer eventOutlet.Close()

connector := channel.ConnectorFunc(func(_ *common.Config, cliCfg beat.ClientConfig) (channel.Outleter, error) {
@@ -257,37 +257,32 @@ func newInputContext() input.Context {
}
}

type acker func(beat.Event, beat.ClientConfig) bool
type eventHandler func(beat.Event, beat.ClientConfig) bool

type stubOutleter struct {
sync.Mutex
cond *sync.Cond
done bool
Events []beat.Event
clientCfg beat.ClientConfig
acker acker
cond *sync.Cond
done bool
Events []beat.Event
clientCfg beat.ClientConfig
eventHandler eventHandler
}

func newStubOutlet(acker acker) *stubOutleter {
func newStubOutlet(onEvent eventHandler) *stubOutleter {
o := &stubOutleter{
acker: acker,
eventHandler: onEvent,
}
o.cond = sync.NewCond(o)
return o
}

func ackEvent(ev beat.Event, cfg beat.ClientConfig) bool {
switch {
case cfg.ACKCount != nil:
cfg.ACKCount(1)
case cfg.ACKEvents != nil:
evs := [1]interface{}{ev.Private}
cfg.ACKEvents(evs[:])
case cfg.ACKLastEvent != nil:
cfg.ACKLastEvent(ev.Private)
default:
if cfg.ACKHandler == nil {
return false
}

cfg.ACKHandler.AddEvent(ev, true)
cfg.ACKHandler.ACKEvents(1)
return true
}

@@ -327,7 +322,7 @@ func (o *stubOutleter) Done() <-chan struct{} { return nil }
func (o *stubOutleter) OnEvent(event beat.Event) bool {
o.Lock()
defer o.Unlock()
acked := o.acker(event, o.clientCfg)
acked := o.eventHandler(event, o.clientCfg)
if acked {
o.Events = append(o.Events, event)
o.cond.Broadcast()
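
The test above drives the handler through the `beat.ACKer` interface directly: `AddEvent` records an event entering the pipeline and `ACKEvents` reports how many of them the outputs acknowledged. A sketch of a test double, assuming the interface also carries a `Close` method:

```go
package example

import "github.com/elastic/beats/v7/libbeat/beat"

// countingStub is a sketch of a test double for beat.ACKer.
type countingStub struct {
	published int // events that made it past the processors
	acked     int // events confirmed by the outputs
}

func (s *countingStub) AddEvent(_ beat.Event, published bool) {
	if published {
		s.published++
	}
}

func (s *countingStub) ACKEvents(n int) { s.acked += n }

// Close is assumed to be part of the interface; it signals that the client closed.
func (s *countingStub) Close() {}

// compile-time check that the stub satisfies beat.ACKer
var _ beat.ACKer = (*countingStub)(nil)
```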
19 changes: 11 additions & 8 deletions x-pack/filebeat/input/o365audit/input.go
@@ -17,6 +17,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
@@ -83,15 +84,17 @@ func newInput(

var out channel.Outleter
out, err = connector.ConnectWith(cfg, beat.ClientConfig{
ACKLastEvent: func(private interface{}) {
// Errors don't have a cursor.
if cursor, ok := private.(cursor); ok {
log.Debugf("ACKed cursor %+v", cursor)
if err := storage.Save(cursor); err != nil && err != errNoUpdate {
log.Errorf("Error saving state: %v", err)
ACKHandler: acker.ConnectionOnly(
acker.LastEventPrivateReporter(func(_ int, private interface{}) {
// Errors don't have a cursor.
if cursor, ok := private.(cursor); ok {
log.Debugf("ACKed cursor %+v", cursor)
if err := storage.Save(cursor); err != nil && err != errNoUpdate {
log.Errorf("Error saving state: %v", err)
}
}
}
},
}),
),
})
if err != nil {
return nil, err
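
`acker.LastEventPrivateReporter` reports only the `Private` value of the most recently acknowledged event, which suits cursor persistence; a sketch assuming a simple `cursorStore.Save` API (both names are illustrative):

```go
package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
	"github.com/elastic/beats/v7/libbeat/logp"
)

// cursorStore is a hypothetical persistence layer for input cursors.
type cursorStore interface {
	Save(cursor interface{}) error
}

// saveCursorOnACK persists the cursor attached to the last ACKed event.
func saveCursorOnACK(store cursorStore, log *logp.Logger) beat.ACKer {
	return acker.ConnectionOnly(
		acker.LastEventPrivateReporter(func(_ int, private interface{}) {
			if private == nil {
				return // events published without a cursor, e.g. errors
			}
			if err := store.Save(private); err != nil {
				log.Errorf("Error saving cursor: %v", err)
			}
		}),
	)
}
```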
15 changes: 9 additions & 6 deletions x-pack/filebeat/input/s3/input.go
@@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
@@ -137,13 +138,15 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con
}

out, err := connector.ConnectWith(cfg, beat.ClientConfig{
ACKEvents: func(privates []interface{}) {
for _, private := range privates {
if s3Context, ok := private.(*s3Context); ok {
s3Context.done()
ACKHandler: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, private := range privates {
if s3Context, ok := private.(*s3Context); ok {
s3Context.done()
}
}
}
},
}),
),
})
if err != nil {
return nil, err
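
Because handlers are plain `beat.ACKer` values they compose; a sketch of how `acker.Combine` (assumed variadic over ACKers) could fan one ACK stream out to resource cleanup and a counter at the same time (the `releaser` interface and callback are illustrative):

```go
package example

import (
	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
)

// releaser is a hypothetical stand-in for per-event bookkeeping,
// similar in spirit to the s3Context above.
type releaser interface{ done() }

// combinedACKer releases per-event resources and counts ACKs in one handler.
func combinedACKer(onACKed func(n int)) beat.ACKer {
	return acker.Combine(
		acker.EventPrivateReporter(func(_ int, privates []interface{}) {
			for _, priv := range privates {
				if r, ok := priv.(releaser); ok {
					r.done()
				}
			}
		}),
		acker.Counting(onACKed),
	)
}
```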
58 changes: 8 additions & 50 deletions x-pack/functionbeat/function/core/sync_client.go
@@ -8,7 +8,9 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

// Client implements the interface used by all the functionbeat function, we only implement a synchronous
@@ -49,32 +51,10 @@ func NewSyncClient(log *logp.Logger, pipeline beat.Pipeline, cfg beat.ClientConf
}
s := &SyncClient{log: log.Named("sync client")}

// Proxy any callbacks to the original client.
//
// Notes: it's not supported to have multiple callback defined, but to support any configuration
// we map all of them.
if cfg.ACKCount != nil {
s.ackCount = cfg.ACKCount
cfg.ACKCount = s.onACKCount
}

if cfg.ACKEvents != nil {
s.ackEvents = cfg.ACKEvents
cfg.ACKEvents = s.onACKEvents
}

if cfg.ACKLastEvent != nil {
s.ackLastEvent = cfg.ACKLastEvent
cfg.ACKLastEvent = nil
cfg.ACKEvents = s.onACKEvents
}

// No calls is defined on the target on the config but we still need to track
// the ack to unblock.
hasACK := cfg.ACKCount != nil || cfg.ACKEvents != nil || cfg.ACKLastEvent != nil
if !hasACK {
cfg.ACKCount = s.onACKCount
}
pipeline = pipetool.WithACKer(pipeline, acker.TrackingCounter(func(_, total int) {
log.Debugf("ack callback receives with events count of %d", total)
s.onACK(total)
}))

c, err := pipeline.ConnectWith(cfg)
if err != nil {
@@ -114,28 +94,6 @@ func (s *SyncClient) Wait() {
s.wg.Wait()
}

// AckEvents receives an array with all the event acked for this client.
func (s *SyncClient) onACKEvents(data []interface{}) {
s.log.Debugf("onACKEvents callback receives with events count of %d", len(data))
count := len(data)
if count == 0 {
return
}

s.onACKCount(count)
if s.ackEvents != nil {
s.ackEvents(data)
}

if s.ackLastEvent != nil {
s.ackLastEvent(data[len(data)-1])
}
}

func (s *SyncClient) onACKCount(c int) {
s.log.Debugf("onACKCount callback receives with events count of %d", c)
s.wg.Add(c * -1)
if s.ackCount != nil {
s.ackCount(c)
}
func (s *SyncClient) onACK(n int) {
s.wg.Add(-1 * n)
}
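
`pipetool.WithACKer` pre-installs an ACKer for every client connected through the wrapped pipeline, and `acker.TrackingCounter` also accounts for events dropped before they reach the outputs, so the wait group can always drain. A condensed sketch of the wiring (function name and return values are illustrative):

```go
package example

import (
	"sync"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common/acker"
	"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

// connectTracked returns a client plus a wait group that callers bump
// before publishing and that drains as ACKs (or drops) are reported.
func connectTracked(pipeline beat.PipelineConnector, cfg beat.ClientConfig) (beat.Client, *sync.WaitGroup, error) {
	var wg sync.WaitGroup

	// Install the ACKer for any client connected through the wrapped pipeline.
	wrapped := pipetool.WithACKer(pipeline, acker.TrackingCounter(func(_, total int) {
		wg.Add(-total) // dropped and ACKed events count alike
	}))

	client, err := wrapped.ConnectWith(cfg)
	if err != nil {
		return nil, nil, err
	}
	return client, &wg, nil
}
```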
100 changes: 3 additions & 97 deletions x-pack/functionbeat/function/core/sync_client_test.go
@@ -49,15 +49,11 @@ func (d *dummyPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error)
return d.client, nil
}

func (d *dummyPipeline) SetACKHandler(ackhandler beat.PipelineACKHandler) error {
return nil
}

func TestSyncClient(t *testing.T) {
receiver := func(c *dummyClient, sc *SyncClient) {
select {
case i := <-c.Received:
sc.onACKEvents(make([]interface{}, i))
sc.onACK(i)
return
}
}
@@ -114,8 +110,8 @@ func TestSyncClient(t *testing.T) {
select {
case <-c.Received:
// simulate multiple acks
sc.onACKEvents(make([]interface{}, 5))
sc.onACKEvents(make([]interface{}, 5))
sc.onACK(5)
sc.onACK(5)
return
}
}(c, sc)
@@ -127,93 +123,3 @@ func TestSyncClient(t *testing.T) {
sc.Wait()
})
}

func TestCallbacksPropagation(t *testing.T) {
testCallback := func(done <-chan struct{}, config beat.ClientConfig, events []beat.Event) {
c := newDummyClient()

pipeline := newDummyPipeline(c)
sc, err := NewSyncClient(nil, pipeline, config)
if !assert.NoError(t, err) {
return
}
defer sc.Close()

go func(c *dummyClient, sc *SyncClient, events []beat.Event) {
select {
case <-c.Received:
elements := make([]interface{}, len(events))
for i, e := range events {
elements[i] = e.Private
}
sc.onACKEvents(elements)
return
}
}(c, sc, events)

err = sc.PublishAll(events)
if !assert.NoError(t, err) {
return
}

sc.Wait()
select {
case <-done:
}
}

t.Run("propagate ACKCount", func(t *testing.T) {
done := make(chan struct{})

callback := func(count int) {
assert.Equal(t, 2, count)
close(done)
}

clientConfig := beat.ClientConfig{
ACKCount: callback,
}

testCallback(done, clientConfig, make([]beat.Event, 2))
})

t.Run("propagate ACKEvents", func(t *testing.T) {
done := make(chan struct{})

callback := func(data []interface{}) {
assert.Equal(t, 2, len(data))
close(done)
}

clientConfig := beat.ClientConfig{
ACKEvents: callback,
}

testCallback(done, clientConfig, make([]beat.Event, 2))
})

t.Run("propagate ACKLastEvent", func(t *testing.T) {
done := make(chan struct{})

type s struct{ test string }

semaphore := &s{test: "hello"}

events := []beat.Event{
beat.Event{},
beat.Event{
Private: semaphore,
},
}
callback := func(data interface{}) {
assert.Equal(t, semaphore, data)
close(done)
}

clientConfig := beat.ClientConfig{
ACKLastEvent: callback,
}

testCallback(done, clientConfig, events)
})
}
