diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e6611da1dd9..d1a162565fe0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -75,6 +75,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix bug with `monitoring.cluster_uuid` setting not always being exposed via GET /state Beats API. {issue}16732[16732] {pull}17420[17420] - Fix building on FreeBSD by removing build flags from `add_cloudfoundry_metadata` processor. {pull}17486[17486] - Do not rotate log files on startup when interval is configured and rotateonstartup is disabled. {pull}17613[17613] +- Fix goroutine leak and Elasticsearch output file descriptor leak when output reloading is in use. {issue}10491[10491] {pull}17381[17381] *Auditbeat* diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index b591307c444b..e1c20f795bde 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -63,11 +63,15 @@ type ConnectionSettings struct { Parameters map[string]string CompressionLevel int EscapeHTML bool - Timeout time.Duration + + Timeout time.Duration + IdleConnTimeout time.Duration } // NewConnection returns a new Elasticsearch client func NewConnection(s ConnectionSettings) (*Connection, error) { + s = settingsWithDefaults(s) + u, err := url.Parse(s.URL) if err != nil { return nil, fmt.Errorf("failed to parse elasticsearch URL: %v", err) @@ -124,6 +128,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { DialTLS: tlsDialer.Dial, TLSClientConfig: s.TLS.ToConfig(), Proxy: proxy, + IdleConnTimeout: s.IdleConnTimeout, }, Timeout: s.Timeout, }, @@ -132,6 +137,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { }, nil } +func settingsWithDefaults(s ConnectionSettings) ConnectionSettings { + settings := s + if settings.IdleConnTimeout == 0 { + settings.IdleConnTimeout = 1 * time.Minute + } + + return settings +} + // NewClients returns a list of Elasticsearch clients based on the given // configuration. It accepts the same configuration parameters as the Elasticsearch // output, except for the output specific configuration options. If multiple hosts @@ -266,6 +280,7 @@ func (conn *Connection) Ping() (string, error) { // Close closes a connection. func (conn *Connection) Close() error { + conn.HTTP.CloseIdleConnections() return nil } diff --git a/libbeat/publisher/pipeline/batch.go b/libbeat/publisher/pipeline/batch.go index 5a8903c58140..54ba2058d746 100644 --- a/libbeat/publisher/pipeline/batch.go +++ b/libbeat/publisher/pipeline/batch.go @@ -24,7 +24,13 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" ) -type Batch struct { +type Batch interface { + publisher.Batch + + reduceTTL() bool +} + +type batch struct { original queue.Batch ctx *batchContext ttl int @@ -38,17 +44,17 @@ type batchContext struct { var batchPool = sync.Pool{ New: func() interface{} { - return &Batch{} + return &batch{} }, } -func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch { +func newBatch(ctx *batchContext, original queue.Batch, ttl int) *batch { if original == nil { panic("empty batch") } - b := batchPool.Get().(*Batch) - *b = Batch{ + b := batchPool.Get().(*batch) + *b = batch{ original: original, ctx: ctx, ttl: ttl, @@ -57,45 +63,47 @@ func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch { return b } -func releaseBatch(b *Batch) { - *b = Batch{} // clear batch +func releaseBatch(b *batch) { + *b = batch{} // clear batch batchPool.Put(b) } -func (b *Batch) Events() []publisher.Event { +func (b *batch) Events() []publisher.Event { return b.events } -func (b *Batch) ACK() { - b.ctx.observer.outBatchACKed(len(b.events)) +func (b *batch) ACK() { + if b.ctx != nil { + b.ctx.observer.outBatchACKed(len(b.events)) + } b.original.ACK() releaseBatch(b) } -func (b *Batch) Drop() { +func (b *batch) Drop() { b.original.ACK() releaseBatch(b) } -func (b *Batch) Retry() { +func (b *batch) Retry() { b.ctx.retryer.retry(b) } -func (b *Batch) Cancelled() { +func (b *batch) Cancelled() { b.ctx.retryer.cancelled(b) } -func (b *Batch) RetryEvents(events []publisher.Event) { +func (b *batch) RetryEvents(events []publisher.Event) { b.updEvents(events) b.Retry() } -func (b *Batch) CancelledEvents(events []publisher.Event) { +func (b *batch) CancelledEvents(events []publisher.Event) { b.updEvents(events) b.Cancelled() } -func (b *Batch) updEvents(events []publisher.Event) { +func (b *batch) updEvents(events []publisher.Event) { l1 := len(b.events) l2 := len(events) if l1 > l2 { @@ -105,3 +113,33 @@ func (b *Batch) updEvents(events []publisher.Event) { b.events = events } + +// reduceTTL reduces the time to live for all events that have no 'guaranteed' +// sending requirements. reduceTTL returns true if the batch is still alive. +func (b *batch) reduceTTL() bool { + if b.ttl <= 0 { + return true + } + + b.ttl-- + if b.ttl > 0 { + return true + } + + // filter for evens with guaranteed send flags + events := b.events[:0] + for _, event := range b.events { + if event.Guaranteed() { + events = append(events, event) + } + } + b.events = events + + if len(b.events) > 0 { + b.ttl = -1 // we need infinite retry for all events left in this batch + return true + } + + // all events have been dropped: + return false +} diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index 4dd211052c25..a5c4a97e25ab 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -138,7 +138,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { var ( out workQueue - batch *Batch + batch Batch paused = true ) @@ -154,7 +154,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { } paused = c.paused() - if !paused && c.out != nil && batch != nil { + if c.out != nil && batch != nil { out = c.out.workQueue } else { out = nil @@ -195,6 +195,9 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { handleSignal(sig) case out <- batch: batch = nil + if paused { + out = nil + } } } } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 05bd65338a98..837a70eab77b 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" ) @@ -34,7 +35,8 @@ type outputController struct { monitors Monitors observer outputObserver - queue queue.Queue + queue queue.Queue + workQueue workQueue retryer *retryer consumer *eventConsumer @@ -50,7 +52,7 @@ type outputGroup struct { timeToLive int // event lifetime } -type workQueue chan *Batch +type workQueue chan publisher.Batch // outputWorker instances pass events from the shared workQueue to the outputs.Client // instances. @@ -62,18 +64,19 @@ func newOutputController( beat beat.Info, monitors Monitors, observer outputObserver, - b queue.Queue, + queue queue.Queue, ) *outputController { c := &outputController{ - beat: beat, - monitors: monitors, - observer: observer, - queue: b, + beat: beat, + monitors: monitors, + observer: observer, + queue: queue, + workQueue: makeWorkQueue(), } ctx := &batchContext{} - c.consumer = newEventConsumer(monitors.Logger, b, ctx) - c.retryer = newRetryer(monitors.Logger, observer, nil, c.consumer) + c.consumer = newEventConsumer(monitors.Logger, queue, ctx) + c.retryer = newRetryer(monitors.Logger, observer, c.workQueue, c.consumer) ctx.observer = observer ctx.retryer = c.retryer @@ -86,27 +89,26 @@ func (c *outputController) Close() error { c.consumer.sigPause() c.consumer.close() c.retryer.close() + close(c.workQueue) if c.out != nil { for _, out := range c.out.outputs { out.Close() } - close(c.out.workQueue) } return nil } func (c *outputController) Set(outGrp outputs.Group) { - // create new outputGroup with shared work queue + // create new output group with the shared work queue clients := outGrp.Clients - queue := makeWorkQueue() worker := make([]outputWorker, len(clients)) for i, client := range clients { - worker[i] = makeClientWorker(c.observer, queue, client) + worker[i] = makeClientWorker(c.observer, c.workQueue, client) } grp := &outputGroup{ - workQueue: queue, + workQueue: c.workQueue, outputs: worker, timeToLive: outGrp.Retry + 1, batchSize: outGrp.BatchSize, @@ -119,7 +121,6 @@ func (c *outputController) Set(outGrp outputs.Group) { c.retryer.sigOutputRemoved() } } - c.retryer.updOutput(queue) for range clients { c.retryer.sigOutputAdded() } @@ -141,7 +142,7 @@ func (c *outputController) Set(outGrp outputs.Group) { } func makeWorkQueue() workQueue { - return workQueue(make(chan *Batch, 0)) + return workQueue(make(chan publisher.Batch, 0)) } // Reload the output diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go new file mode 100644 index 000000000000..32bdc54109a2 --- /dev/null +++ b/libbeat/publisher/pipeline/controller_test.go @@ -0,0 +1,114 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "sync" + "testing" + "testing/quick" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" + "github.com/elastic/beats/v7/libbeat/tests/resources" + + "github.com/stretchr/testify/require" +) + +func TestOutputReload(t *testing.T) { + tests := map[string]func(mockPublishFn) outputs.Client{ + "client": newMockClient, + "network_client": newMockNetworkClient, + } + + for name, ctor := range tests { + t.Run(name, func(t *testing.T) { + seedPRNG(t) + + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + err := quick.Check(func(q uint) bool { + numEventsToPublish := 15000 + (q % 500) // 15000 to 19999 + numOutputReloads := 350 + (q % 150) // 350 to 499 + + queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) { + return memqueue.NewQueue( + logp.L(), + memqueue.Settings{ + ACKListener: ackListener, + Events: int(numEventsToPublish), + }), nil + } + + var publishedCount atomic.Uint + countingPublishFn := func(batch publisher.Batch) error { + publishedCount.Add(uint(len(batch.Events()))) + return nil + } + + pipeline, err := New( + beat.Info{}, + Monitors{}, + queueFactory, + outputs.Group{}, + Settings{}, + ) + require.NoError(t, err) + defer pipeline.Close() + + pipelineClient, err := pipeline.Connect() + require.NoError(t, err) + defer pipelineClient.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + for i := uint(0); i < numEventsToPublish; i++ { + pipelineClient.Publish(beat.Event{}) + } + wg.Done() + }() + + for i := uint(0); i < numOutputReloads; i++ { + outputClient := ctor(countingPublishFn) + out := outputs.Group{ + Clients: []outputs.Client{outputClient}, + } + pipeline.output.Set(out) + } + + wg.Wait() + + timeout := 20 * time.Second + return waitUntilTrue(timeout, func() bool { + return uint(numEventsToPublish) == publishedCount.Load() + }) + }, &quick.Config{MaxCount: 25}) + + if err != nil { + t.Error(err) + } + }) + } +} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 02ec2975db6f..fa2ce73a28ce 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -18,25 +18,27 @@ package pipeline import ( - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" ) -// clientWorker manages output client of type outputs.Client, not supporting reconnect. -type clientWorker struct { +type worker struct { + id uint observer outputObserver qu workQueue - client outputs.Client - closed atomic.Bool + done chan struct{} +} + +// clientWorker manages output client of type outputs.Client, not supporting reconnect. +type clientWorker struct { + worker + client outputs.Client } // netClientWorker manages reconnectable output clients of type outputs.NetworkClient. type netClientWorker struct { - observer outputObserver - qu workQueue - client outputs.NetworkClient - closed atomic.Bool + worker + client outputs.NetworkClient batchSize int batchSizer func() int @@ -44,96 +46,114 @@ type netClientWorker struct { } func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { + w := worker{ + observer: observer, + qu: qu, + done: make(chan struct{}), + } + + var c interface { + outputWorker + run() + } + if nc, ok := client.(outputs.NetworkClient); ok { - c := &netClientWorker{ - observer: observer, - qu: qu, - client: nc, - logger: logp.NewLogger("publisher_pipeline_output"), + c = &netClientWorker{ + worker: w, + client: nc, + logger: logp.NewLogger("publisher_pipeline_output"), } - go c.run() - return c + } else { + c = &clientWorker{worker: w, client: client} } - c := &clientWorker{observer: observer, qu: qu, client: client} + go c.run() return c } +func (w *worker) close() { + close(w.done) +} + func (w *clientWorker) Close() error { - w.closed.Store(true) + w.worker.close() return w.client.Close() } func (w *clientWorker) run() { - for !w.closed.Load() { - for batch := range w.qu { - if w.closed.Load() { - if batch != nil { - batch.Cancelled() - } - return - } + for { + // We wait for either the worker to be closed or for there to be a batch of + // events to publish. + select { + + case <-w.done: + return - w.observer.outBatchSend(len(batch.events)) + case batch := <-w.qu: + if batch == nil { + continue + } + w.observer.outBatchSend(len(batch.Events())) if err := w.client.Publish(batch); err != nil { - break + return } } } } func (w *netClientWorker) Close() error { - w.closed.Store(true) + w.worker.close() return w.client.Close() } func (w *netClientWorker) run() { - for !w.closed.Load() { - reconnectAttempts := 0 - - // start initial connect loop from first batch, but return - // batch to pipeline for other outputs to catch up while we're trying to connect - for batch := range w.qu { - batch.Cancelled() + var ( + connected = false + reconnectAttempts = 0 + ) - if w.closed.Load() { - w.logger.Infof("Closed connection to %v", w.client) - return - } + for { + // We wait for either the worker to be closed or for there to be a batch of + // events to publish. + select { - if reconnectAttempts > 0 { - w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) - } else { - w.logger.Infof("Connecting to %v", w.client) - } + case <-w.done: + return - err := w.client.Connect() - if err != nil { - w.logger.Errorf("Failed to connect to %v: %v", w.client, err) - reconnectAttempts++ + case batch := <-w.qu: + if batch == nil { continue } - w.logger.Infof("Connection to %v established", w.client) - reconnectAttempts = 0 - break - } + // Try to (re)connect so we can publish batch + if !connected { + // Return batch to other output workers while we try to (re)connect + batch.Cancelled() - // send loop - for batch := range w.qu { - if w.closed.Load() { - if batch != nil { - batch.Cancelled() + if reconnectAttempts == 0 { + w.logger.Infof("Connecting to %v", w.client) + } else { + w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) } - return + + err := w.client.Connect() + connected = err == nil + if connected { + w.logger.Infof("Connection to %v established", w.client) + reconnectAttempts = 0 + } else { + w.logger.Errorf("Failed to connect to %v: %v", w.client, err) + reconnectAttempts++ + } + + continue } - err := w.client.Publish(batch) - if err != nil { + if err := w.client.Publish(batch); err != nil { w.logger.Errorf("Failed to publish events: %v", err) // on error return to connect loop - break + connected = false } } } diff --git a/libbeat/publisher/pipeline/output_test.go b/libbeat/publisher/pipeline/output_test.go index d89c166ee155..5f471ddf3961 100644 --- a/libbeat/publisher/pipeline/output_test.go +++ b/libbeat/publisher/pipeline/output_test.go @@ -18,9 +18,7 @@ package pipeline import ( - "flag" "math" - "math/rand" "sync" "testing" "testing/quick" @@ -32,11 +30,6 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" - "github.com/elastic/beats/v7/libbeat/publisher/queue" -) - -var ( - SeedFlag = flag.Int64("seed", 0, "Randomization seed") ) func TestMakeClientWorker(t *testing.T) { @@ -51,6 +44,11 @@ func TestMakeClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 300 + (i % 100) // between 300 and 399 + numEvents := atomic.MakeUint(0) + + wqu := makeWorkQueue() + retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + defer retryer.close() var published atomic.Uint publishFn := func(batch publisher.Batch) error { @@ -58,13 +56,13 @@ func TestMakeClientWorker(t *testing.T) { return nil } - wqu := makeWorkQueue() client := ctor(publishFn) - makeClientWorker(nilObserver, wqu, client) - numEvents := atomic.MakeUint(0) - for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { - batch := randomBatch(50, 150, wqu) + worker := makeClientWorker(nilObserver, wqu, client) + defer worker.Close() + + for i := uint(0); i < numBatches; i++ { + batch := randomBatch(50, 150).withRetryer(retryer) numEvents.Add(uint(len(batch.Events()))) wqu <- batch } @@ -85,13 +83,14 @@ func TestMakeClientWorker(t *testing.T) { } } -func TestMakeClientWorkerAndClose(t *testing.T) { +func TestReplaceClientWorker(t *testing.T) { tests := map[string]func(mockPublishFn) outputs.Client{ "client": newMockClient, "network_client": newMockNetworkClient, } const minEventsInBatch = 50 + const maxEventsInBatch = 150 for name, ctor := range tests { t.Run(name, func(t *testing.T) { @@ -101,21 +100,28 @@ func TestMakeClientWorkerAndClose(t *testing.T) { numBatches := 1000 + (i % 100) // between 1000 and 1099 wqu := makeWorkQueue() - numEvents := atomic.MakeUint(0) + retryer := newRetryer(logp.NewLogger("test"), nilObserver, wqu, nil) + defer retryer.close() + + var batches []publisher.Batch + var numEvents int + for i := uint(0); i < numBatches; i++ { + batch := randomBatch(minEventsInBatch, maxEventsInBatch).withRetryer(retryer) + numEvents += batch.Len() + batches = append(batches, batch) + } var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - for batchIdx := uint(0); batchIdx <= numBatches; batchIdx++ { - batch := randomBatch(minEventsInBatch, 150, wqu) - numEvents.Add(uint(len(batch.Events()))) + for _, batch := range batches { wqu <- batch } }() // Publish at least 1 batch worth of events but no more than 20% events - publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents.Load())*0.2)) + publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents)*0.2)) var publishedFirst atomic.Uint blockCtrl := make(chan struct{}) @@ -145,6 +151,7 @@ func TestMakeClientWorkerAndClose(t *testing.T) { // Close worker before all batches have had time to be published err := worker.Close() require.NoError(t, err) + close(blockCtrl) // Start new worker to drain work queue @@ -161,7 +168,7 @@ func TestMakeClientWorkerAndClose(t *testing.T) { // Make sure that all events have eventually been published timeout = 20 * time.Second return waitUntilTrue(timeout, func() bool { - return numEvents.Load() == publishedFirst.Load()+publishedLater.Load() + return numEvents == int(publishedFirst.Load()+publishedLater.Load()) }) }, &quick.Config{MaxCount: 25}) @@ -171,91 +178,3 @@ func TestMakeClientWorkerAndClose(t *testing.T) { }) } } - -type mockPublishFn func(publisher.Batch) error - -func newMockClient(publishFn mockPublishFn) outputs.Client { - return &mockClient{publishFn: publishFn} -} - -type mockClient struct { - publishFn mockPublishFn -} - -func (c *mockClient) String() string { return "mock_client" } -func (c *mockClient) Close() error { return nil } -func (c *mockClient) Publish(batch publisher.Batch) error { - return c.publishFn(batch) -} - -func newMockNetworkClient(publishFn mockPublishFn) outputs.Client { - return &mockNetworkClient{newMockClient(publishFn)} -} - -type mockNetworkClient struct { - outputs.Client -} - -func (c *mockNetworkClient) Connect() error { return nil } - -type mockQueue struct{} - -func (q mockQueue) Close() error { return nil } -func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} } -func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} } -func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} } - -type mockProducer struct{} - -func (p mockProducer) Publish(event publisher.Event) bool { return true } -func (p mockProducer) TryPublish(event publisher.Event) bool { return true } -func (p mockProducer) Cancel() int { return 0 } - -type mockConsumer struct{} - -func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil } -func (c mockConsumer) Close() error { return nil } - -func randomBatch(min, max int, wqu workQueue) *Batch { - numEvents := randIntBetween(min, max) - events := make([]publisher.Event, numEvents) - - consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{}) - retryer := newRetryer(logp.L(), nilObserver, wqu, consumer) - - batch := Batch{ - events: events, - ctx: &batchContext{ - observer: nilObserver, - retryer: retryer, - }, - } - - return &batch -} - -// randIntBetween returns a random integer in [min, max) -func randIntBetween(min, max int) int { - return rand.Intn(max-min) + min -} - -func seedPRNG(t *testing.T) { - seed := *SeedFlag - if seed == 0 { - seed = time.Now().UnixNano() - } - - t.Logf("reproduce test with `go test ... -seed %v`", seed) - rand.Seed(seed) -} - -func waitUntilTrue(duration time.Duration, fn func() bool) bool { - end := time.Now().Add(duration) - for time.Now().Before(end) { - if fn() { - return true - } - time.Sleep(1 * time.Millisecond) - } - return false -} diff --git a/libbeat/publisher/pipeline/retry.go b/libbeat/publisher/pipeline/retry.go index a65a7d227c8c..0d724e80278b 100644 --- a/libbeat/publisher/pipeline/retry.go +++ b/libbeat/publisher/pipeline/retry.go @@ -36,7 +36,7 @@ type retryer struct { done chan struct{} - consumer *eventConsumer + consumer interruptor sig chan retryerSignal out workQueue @@ -44,6 +44,11 @@ type retryer struct { doneWaiter sync.WaitGroup } +type interruptor interface { + sigWait() + sigUnWait() +} + type retryQueue chan batchEvent type retryerSignal struct { @@ -53,7 +58,7 @@ type retryerSignal struct { type batchEvent struct { tag retryerBatchTag - batch *Batch + batch Batch } type retryerEventTag uint8 @@ -75,7 +80,7 @@ func newRetryer( log *logp.Logger, observer outputObserver, out workQueue, - c *eventConsumer, + c interruptor, ) *retryer { r := &retryer{ logger: log, @@ -106,18 +111,11 @@ func (r *retryer) sigOutputRemoved() { r.sig <- retryerSignal{tag: sigRetryerOutputRemoved} } -func (r *retryer) updOutput(ch workQueue) { - r.sig <- retryerSignal{ - tag: sigRetryerUpdateOutput, - channel: ch, - } -} - -func (r *retryer) retry(b *Batch) { +func (r *retryer) retry(b Batch) { r.in <- batchEvent{tag: retryBatch, batch: b} } -func (r *retryer) cancelled(b *Batch) { +func (r *retryer) cancelled(b Batch) { r.in <- batchEvent{tag: cancelledBatch, batch: b} } @@ -127,9 +125,9 @@ func (r *retryer) loop() { out workQueue consumerBlocked bool - active *Batch + active Batch activeSize int - buffer []*Batch + buffer []Batch numOutputs int log = r.logger @@ -144,21 +142,22 @@ func (r *retryer) loop() { countFailed int countDropped int batch = evt.batch - countRetry = len(batch.events) + countRetry = len(batch.Events()) + alive = true ) if evt.tag == retryBatch { - countFailed = len(batch.events) + countFailed = len(batch.Events()) r.observer.eventsFailed(countFailed) - decBatch(batch) + alive = batch.reduceTTL() - countRetry = len(batch.events) + countRetry = len(batch.Events()) countDropped = countFailed - countRetry r.observer.eventsDropped(countDropped) } - if len(batch.events) == 0 { + if !alive { log.Info("Drop batch") batch.Drop() } else { @@ -166,14 +165,9 @@ func (r *retryer) loop() { buffer = append(buffer, batch) out = r.out active = buffer[0] - activeSize = len(active.events) + activeSize = len(active.Events()) if !consumerBlocked { - consumerBlocked = blockConsumer(numOutputs, len(buffer)) - if consumerBlocked { - log.Info("retryer: send wait signal to consumer") - r.consumer.sigWait() - log.Info(" done") - } + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) } } @@ -187,51 +181,53 @@ func (r *retryer) loop() { out = nil } else { active = buffer[0] - activeSize = len(active.events) + activeSize = len(active.Events()) } if consumerBlocked { - consumerBlocked = blockConsumer(numOutputs, len(buffer)) - if !consumerBlocked { - log.Info("retryer: send unwait-signal to consumer") - r.consumer.sigUnWait() - log.Info(" done") - } + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) } case sig := <-r.sig: switch sig.tag { - case sigRetryerUpdateOutput: - r.out = sig.channel case sigRetryerOutputAdded: numOutputs++ + if consumerBlocked { + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) + } case sigRetryerOutputRemoved: numOutputs-- + if !consumerBlocked { + consumerBlocked = r.checkConsumerBlock(numOutputs, len(buffer)) + } } } } } -func blockConsumer(numOutputs, numBatches int) bool { - return numBatches/3 >= numOutputs -} - -func decBatch(batch *Batch) { - if batch.ttl <= 0 { - return +func (r *retryer) checkConsumerBlock(numOutputs, numBatches int) bool { + consumerBlocked := blockConsumer(numOutputs, numBatches) + if r.consumer == nil { + return consumerBlocked } - batch.ttl-- - if batch.ttl > 0 { - return - } - - // filter for evens with guaranteed send flags - events := batch.events[:0] - for _, event := range batch.events { - if event.Guaranteed() { - events = append(events, event) + if consumerBlocked { + r.logger.Info("retryer: send wait signal to consumer") + if r.consumer != nil { + r.consumer.sigWait() + } + r.logger.Info(" done") + } else { + r.logger.Info("retryer: send unwait signal to consumer") + if r.consumer != nil { + r.consumer.sigUnWait() } + r.logger.Info(" done") } - batch.events = events + + return consumerBlocked +} + +func blockConsumer(numOutputs, numBatches int) bool { + return numBatches/3 >= numOutputs } diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go new file mode 100644 index 000000000000..1d5c2b908ff0 --- /dev/null +++ b/libbeat/publisher/pipeline/testing.go @@ -0,0 +1,176 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import ( + "flag" + "math/rand" + "sync" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +var ( + SeedFlag = flag.Int64("seed", 0, "Randomization seed") +) + +type mockPublishFn func(publisher.Batch) error + +func newMockClient(publishFn mockPublishFn) outputs.Client { + return &mockClient{publishFn: publishFn} +} + +type mockClient struct { + publishFn mockPublishFn +} + +func (c *mockClient) String() string { return "mock_client" } +func (c *mockClient) Close() error { return nil } +func (c *mockClient) Publish(batch publisher.Batch) error { + return c.publishFn(batch) +} + +func newMockNetworkClient(publishFn mockPublishFn) outputs.Client { + return &mockNetworkClient{newMockClient(publishFn)} +} + +type mockNetworkClient struct { + outputs.Client +} + +func (c *mockNetworkClient) Connect() error { return nil } + +type mockQueue struct{} + +func (q mockQueue) Close() error { return nil } +func (q mockQueue) BufferConfig() queue.BufferConfig { return queue.BufferConfig{} } +func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} } +func (q mockQueue) Consumer() queue.Consumer { return mockConsumer{} } + +type mockProducer struct{} + +func (p mockProducer) Publish(event publisher.Event) bool { return true } +func (p mockProducer) TryPublish(event publisher.Event) bool { return true } +func (p mockProducer) Cancel() int { return 0 } + +type mockConsumer struct{} + +func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &batch{}, nil } +func (c mockConsumer) Close() error { return nil } + +type mockBatch struct { + mu sync.Mutex + events []publisher.Event + + onEvents func() + onACK func() + onDrop func() + onRetry func() + onCancelled func() + onReduceTTL func() bool +} + +func (b *mockBatch) Events() []publisher.Event { + b.mu.Lock() + defer b.mu.Unlock() + signalFn(b.onEvents) + return b.events +} + +func (b *mockBatch) ACK() { signalFn(b.onACK) } +func (b *mockBatch) Drop() { signalFn(b.onDrop) } +func (b *mockBatch) Retry() { signalFn(b.onRetry) } +func (b *mockBatch) Cancelled() { signalFn(b.onCancelled) } +func (b *mockBatch) RetryEvents(events []publisher.Event) { b.updateEvents(events); signalFn(b.onRetry) } + +func (b *mockBatch) reduceTTL() bool { + if b.onReduceTTL != nil { + return b.onReduceTTL() + } + return true +} + +func (b *mockBatch) CancelledEvents(events []publisher.Event) { + b.updateEvents(events) + signalFn(b.onCancelled) +} + +func (b *mockBatch) updateEvents(events []publisher.Event) { + b.mu.Lock() + defer b.mu.Unlock() + b.events = events +} + +func (b *mockBatch) Len() int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.events) +} + +func (b *mockBatch) withRetryer(r *retryer) *mockBatch { + return &mockBatch{ + events: b.events, + onACK: b.onACK, + onDrop: b.onDrop, + onRetry: func() { r.retry(b) }, + onCancelled: func() { r.cancelled(b) }, + onReduceTTL: b.onReduceTTL, + } +} + +func signalFn(fn func()) { + if fn != nil { + fn() + } +} + +func randomBatch(min, max int) *mockBatch { + return &mockBatch{ + events: make([]publisher.Event, randIntBetween(min, max)), + } +} + +// randIntBetween returns a random integer in [min, max) +func randIntBetween(min, max int) int { + return rand.Intn(max-min) + min +} + +func seedPRNG(t *testing.T) { + seed := *SeedFlag + if seed == 0 { + seed = time.Now().UnixNano() + } + + t.Logf("reproduce test with `go test ... -seed %v`", seed) + rand.Seed(seed) +} + +func waitUntilTrue(duration time.Duration, fn func() bool) bool { + end := time.Now().Add(duration) + for time.Now().Before(end) { + if fn() { + return true + } + time.Sleep(10 * time.Millisecond) + } + return false +}