Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with output reloading #17381

Merged
merged 34 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ccefba0
Refactoring: extracting common fields into worker struct
ycombinator Mar 31, 2020
7e5e57c
More refactoring
ycombinator Mar 31, 2020
89aa8cb
Address goroutine leak in publisher
ycombinator Apr 1, 2020
03db189
Workaround: add Connection: close header to prevent FD leaks
ycombinator Apr 1, 2020
6d160f4
Adding CHANGELOG entry
ycombinator Apr 1, 2020
c8a16c6
Adding IdleConnTimeout setting
ycombinator Apr 1, 2020
f28d0b7
Close idle connections when ES client is closed
ycombinator Apr 1, 2020
d9bc4e5
When closing worker, make sure to cancel in-flight batches
ycombinator Apr 9, 2020
5d3b10e
Cancel batch + guard
ycombinator Apr 10, 2020
5c4daff
[WIP] Adding output reload test
ycombinator Apr 10, 2020
71b1d55
More WIP
ycombinator Apr 10, 2020
9ce4b54
Update test
ycombinator Apr 16, 2020
014fd25
Try to get test passing for client first
ycombinator Apr 16, 2020
5c5e48a
Make workqueue shared
ycombinator Apr 16, 2020
6227a1f
Making tests pass
ycombinator Apr 16, 2020
ffd0822
Clean up
ycombinator Apr 16, 2020
b97becc
Moving SeedFlag var to correct place
ycombinator Apr 16, 2020
3aeebda
Clarifying comments
ycombinator Apr 16, 2020
1208332
Reducing the number of quick iterations
ycombinator Apr 16, 2020
2039344
Reducing quick iterations even more
ycombinator Apr 17, 2020
8a03547
Trying just 1 iteration
ycombinator Apr 17, 2020
e31e53a
Setting out to nil after sending batch if paused
ycombinator Apr 17, 2020
dbf275a
Restoring old order of operations in Set()
ycombinator Apr 17, 2020
a36cbb1
proposal
Apr 17, 2020
229623f
Do not copy mutex
ycombinator Apr 17, 2020
f09c5e0
Remove debugging statements
ycombinator Apr 17, 2020
896505c
Bumping up testing/quick max count on TestOutputReload
ycombinator Apr 17, 2020
a59a56e
Removing commented out helper function
ycombinator Apr 17, 2020
041922b
Simplifying retryer now that workqueue is used across output loads
ycombinator Apr 17, 2020
5ff7219
Renaming parameter
ycombinator Apr 20, 2020
15a1383
Removing debugging variable
ycombinator Apr 21, 2020
6e10331
Reducing lower bound of random # of batches
ycombinator Apr 21, 2020
6a6680c
Removing sigUnWait from controller
ycombinator Apr 21, 2020
09ccd8d
Removing unused field
ycombinator Apr 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix bug with `monitoring.cluster_uuid` setting not always being exposed via GET /state Beats API. {issue}16732[16732] {pull}17420[17420]
- Fix building on FreeBSD by removing build flags from `add_cloudfoundry_metadata` processor. {pull}17486[17486]
- Do not rotate log files on startup when interval is configured and rotateonstartup is disabled. {pull}17613[17613]
- Fix goroutine leak and Elasticsearch output file descriptor leak when output reloading is in use. {issue}10491[10491] {pull}17381[17381]

*Auditbeat*

Expand Down
17 changes: 16 additions & 1 deletion libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ type ConnectionSettings struct {
Parameters map[string]string
CompressionLevel int
EscapeHTML bool
Timeout time.Duration

Timeout time.Duration
IdleConnTimeout time.Duration
}

// NewConnection returns a new Elasticsearch client
func NewConnection(s ConnectionSettings) (*Connection, error) {
s = settingsWithDefaults(s)

u, err := url.Parse(s.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err)
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
DialTLS: tlsDialer.Dial,
TLSClientConfig: s.TLS.ToConfig(),
Proxy: proxy,
IdleConnTimeout: s.IdleConnTimeout,
},
Timeout: s.Timeout,
},
Expand All @@ -132,6 +137,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
}, nil
}

func settingsWithDefaults(s ConnectionSettings) ConnectionSettings {
settings := s
if settings.IdleConnTimeout == 0 {
settings.IdleConnTimeout = 1 * time.Minute
}

return settings
}

// NewClients returns a list of Elasticsearch clients based on the given
// configuration. It accepts the same configuration parameters as the Elasticsearch
// output, except for the output specific configuration options. If multiple hosts
Expand Down Expand Up @@ -266,6 +280,7 @@ func (conn *Connection) Ping() (string, error) {

// Close closes a connection.
func (conn *Connection) Close() error {
conn.HTTP.CloseIdleConnections()
return nil
}

Expand Down
70 changes: 54 additions & 16 deletions libbeat/publisher/pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

type Batch struct {
type Batch interface {
publisher.Batch

reduceTTL() bool
}

type batch struct {
original queue.Batch
ctx *batchContext
ttl int
Expand All @@ -38,17 +44,17 @@ type batchContext struct {

var batchPool = sync.Pool{
New: func() interface{} {
return &Batch{}
return &batch{}
},
}

func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
func newBatch(ctx *batchContext, original queue.Batch, ttl int) *batch {
if original == nil {
panic("empty batch")
}

b := batchPool.Get().(*Batch)
*b = Batch{
b := batchPool.Get().(*batch)
*b = batch{
original: original,
ctx: ctx,
ttl: ttl,
Expand All @@ -57,45 +63,47 @@ func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
return b
}

func releaseBatch(b *Batch) {
*b = Batch{} // clear batch
func releaseBatch(b *batch) {
*b = batch{} // clear batch
batchPool.Put(b)
}

func (b *Batch) Events() []publisher.Event {
func (b *batch) Events() []publisher.Event {
return b.events
}

func (b *Batch) ACK() {
b.ctx.observer.outBatchACKed(len(b.events))
func (b *batch) ACK() {
if b.ctx != nil {
b.ctx.observer.outBatchACKed(len(b.events))
}
b.original.ACK()
releaseBatch(b)
}

func (b *Batch) Drop() {
func (b *batch) Drop() {
b.original.ACK()
releaseBatch(b)
}

func (b *Batch) Retry() {
func (b *batch) Retry() {
b.ctx.retryer.retry(b)
}

func (b *Batch) Cancelled() {
func (b *batch) Cancelled() {
b.ctx.retryer.cancelled(b)
}

func (b *Batch) RetryEvents(events []publisher.Event) {
func (b *batch) RetryEvents(events []publisher.Event) {
b.updEvents(events)
b.Retry()
}

func (b *Batch) CancelledEvents(events []publisher.Event) {
func (b *batch) CancelledEvents(events []publisher.Event) {
b.updEvents(events)
b.Cancelled()
}

func (b *Batch) updEvents(events []publisher.Event) {
func (b *batch) updEvents(events []publisher.Event) {
l1 := len(b.events)
l2 := len(events)
if l1 > l2 {
Expand All @@ -105,3 +113,33 @@ func (b *Batch) updEvents(events []publisher.Event) {

b.events = events
}

// reduceTTL reduces the time to live for all events that have no 'guaranteed'
// sending requirements. reduceTTL returns true if the batch is still alive.
func (b *batch) reduceTTL() bool {
if b.ttl <= 0 {
return true
}

b.ttl--
if b.ttl > 0 {
return true
}

// filter for evens with guaranteed send flags
events := b.events[:0]
for _, event := range b.events {
if event.Guaranteed() {
events = append(events, event)
}
}
b.events = events

if len(b.events) > 0 {
b.ttl = -1 // we need infinite retry for all events left in this batch
return true
}

// all events have been dropped:
return false
}
7 changes: 5 additions & 2 deletions libbeat/publisher/pipeline/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {

var (
out workQueue
batch *Batch
batch Batch
paused = true
)

Expand All @@ -154,7 +154,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
}

paused = c.paused()
if !paused && c.out != nil && batch != nil {
if c.out != nil && batch != nil {
Copy link
Contributor Author

@ycombinator ycombinator Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This turned out to be a key change in getting the test to pass (i.e. ensuring that all batches are eventually published despite rapid reloading of outputs multiple times).

Consider the case where the consumer is paused (so paused == true here) but it still has a batch of events that it has consumed from the queue but not yet sent to the workqueue. In this scenario because of the !paused clause here, we would fall into the else clause, which would set out = nil. This would then cause the last select in this method to block (unless, by luck of timing, a signal to unpause the consumer just happened to come along).

Now, if we have a valid output group c.out and a batch, we don't pay attention to the paused state; this ensures that this batch will be send to that output group's work queue via that final select's final case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow nice fix!

out = c.out.workQueue
} else {
out = nil
Expand Down Expand Up @@ -195,6 +195,9 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
handleSignal(sig)
case out <- batch:
batch = nil
if paused {
out = nil
}
}
}
}
Expand Down
30 changes: 16 additions & 14 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

Expand All @@ -34,7 +35,8 @@ type outputController struct {
monitors Monitors
observer outputObserver

queue queue.Queue
queue queue.Queue
workQueue workQueue
Copy link
Contributor Author

@ycombinator ycombinator Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another change I made, to simplify things. Now the output controller creates a work queue in it's constructor and the same one is used by all output workers across time. This removes the need to drain the old work queue to a new one when outputs are reloaded and new output workers are created in the process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good change!


retryer *retryer
consumer *eventConsumer
Expand All @@ -50,7 +52,7 @@ type outputGroup struct {
timeToLive int // event lifetime
}

type workQueue chan *Batch
type workQueue chan publisher.Batch

// outputWorker instances pass events from the shared workQueue to the outputs.Client
// instances.
Expand All @@ -65,15 +67,16 @@ func newOutputController(
b queue.Queue,
) *outputController {
c := &outputController{
beat: beat,
monitors: monitors,
observer: observer,
queue: b,
beat: beat,
monitors: monitors,
observer: observer,
queue: b,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's funny. Why is the parameter named b ? As we've modified this piece of code, leats clean up the naming a little :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 5ff7219.

workQueue: makeWorkQueue(),
}

ctx := &batchContext{}
c.consumer = newEventConsumer(monitors.Logger, b, ctx)
c.retryer = newRetryer(monitors.Logger, observer, nil, c.consumer)
c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer)
ctx.observer = observer
ctx.retryer = c.retryer

Expand All @@ -86,27 +89,26 @@ func (c *outputController) Close() error {
c.consumer.sigPause()
c.consumer.close()
c.retryer.close()
close(c.workQueue)

if c.out != nil {
for _, out := range c.out.outputs {
out.Close()
}
close(c.out.workQueue)
}

return nil
}

func (c *outputController) Set(outGrp outputs.Group) {
// create new outputGroup with shared work queue
// create new output group with the shared work queue
clients := outGrp.Clients
queue := makeWorkQueue()
worker := make([]outputWorker, len(clients))
for i, client := range clients {
worker[i] = makeClientWorker(c.observer, queue, client)
worker[i] = makeClientWorker(c.observer, c.workQueue, client)
}
grp := &outputGroup{
workQueue: queue,
workQueue: c.workQueue,
outputs: worker,
timeToLive: outGrp.Retry + 1,
batchSize: outGrp.BatchSize,
Expand All @@ -119,7 +121,6 @@ func (c *outputController) Set(outGrp outputs.Group) {
c.retryer.sigOutputRemoved()
}
}
c.retryer.updOutput(queue)
for range clients {
c.retryer.sigOutputAdded()
}
Expand All @@ -136,12 +137,13 @@ func (c *outputController) Set(outGrp outputs.Group) {

// restart consumer (potentially blocked by retryer)
c.consumer.sigContinue()
c.consumer.sigUnWait()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigUnWait is used by the retryer, no? Why add it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that this was needed to make the TestReload test pass. Without it the retryer never un-waited the consumer by itself and so events stopped being consumed and sent to the output.

That said, I agree that this is something the retryer should automatically call internally rather than "leak" this responsibility to the controller. I'll look into why it's not doing that.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this sounds like a potential bug to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 6a6680c.


c.observer.updateOutputGroup()
}

func makeWorkQueue() workQueue {
return workQueue(make(chan *Batch, 0))
return workQueue(make(chan publisher.Batch, 0))
}

// Reload the output
Expand Down
Loading