-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
x-pack/filebeat/input/http_endpoint: make input GA (#39410)
Add new features: * end-to-end ACK * improved logging * allow placement of events at doc root Fix request trace filename handling; the filename coming from integrations may include a * which is intended to be replaced with the data stream ID. The code in place does not do this, so add it.
- Loading branch information
Showing
10 changed files
with
441 additions
and
45 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package http_endpoint | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/elastic/beats/v7/libbeat/beat" | ||
"github.com/elastic/beats/v7/libbeat/common/acker" | ||
) | ||
|
||
// newEventACKHandler returns a beat ACKer that can receive callbacks when | ||
// an event has been ACKed an output. If the event contains a private metadata | ||
// pointing to a batchACKTracker then it will invoke the tracker's ACK() method | ||
// to decrement the number of pending ACKs. | ||
func newEventACKHandler() beat.EventListener { | ||
return acker.ConnectionOnly( | ||
acker.EventPrivateReporter(func(_ int, privates []interface{}) { | ||
for _, private := range privates { | ||
if ack, ok := private.(*batchACKTracker); ok { | ||
ack.ACK() | ||
} | ||
} | ||
}), | ||
) | ||
} | ||
|
||
// batchACKTracker invokes batchACK when all events associated to the batch | ||
// have been published and acknowledged by an output. | ||
type batchACKTracker struct { | ||
batchACK func() | ||
|
||
mu sync.Mutex | ||
pending int64 | ||
} | ||
|
||
// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function | ||
// is invoked after the full batch has been acknowledged. Ready() must be invoked | ||
// after all events in the batch are published. | ||
func newBatchACKTracker(fn func()) *batchACKTracker { | ||
return &batchACKTracker{ | ||
batchACK: fn, | ||
pending: 1, // Ready() must be called to consume this "1". | ||
} | ||
} | ||
|
||
// Ready signals that the batch has been fully consumed. Only | ||
// after the batch is marked as "ready" can the batch be ACKed. | ||
// This prevents the batch from being ACKed prematurely. | ||
func (t *batchACKTracker) Ready() { | ||
t.ACK() | ||
} | ||
|
||
// Add increments the number of pending ACKs. | ||
func (t *batchACKTracker) Add() { | ||
t.mu.Lock() | ||
t.pending++ | ||
t.mu.Unlock() | ||
} | ||
|
||
// ACK decrements the number of pending event ACKs. When all pending ACKs are | ||
// received then the event batch is ACKed. | ||
func (t *batchACKTracker) ACK() { | ||
t.mu.Lock() | ||
defer t.mu.Unlock() | ||
|
||
if t.pending <= 0 { | ||
panic("misuse detected: negative ACK counter") | ||
} | ||
|
||
t.pending-- | ||
if t.pending == 0 { | ||
t.batchACK() | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package http_endpoint | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBatchACKTracker(t *testing.T) { | ||
t.Run("empty", func(t *testing.T) { | ||
tracker := make(ack) | ||
|
||
acker := newBatchACKTracker(tracker.ACK) | ||
require.False(t, tracker.wasACKed()) | ||
|
||
acker.Ready() | ||
require.True(t, tracker.wasACKed()) | ||
}) | ||
|
||
t.Run("single_event", func(t *testing.T) { | ||
tracker := make(ack) | ||
|
||
acker := newBatchACKTracker(tracker.ACK) | ||
acker.Add() | ||
acker.ACK() | ||
require.False(t, tracker.wasACKed()) | ||
|
||
acker.Ready() | ||
require.True(t, tracker.wasACKed()) | ||
}) | ||
} | ||
|
||
type ack chan struct{} | ||
|
||
func (a ack) ACK() { | ||
close(a) | ||
} | ||
|
||
func (a ack) wasACKed() bool { | ||
select { | ||
case <-a: | ||
return true | ||
default: | ||
return false | ||
} | ||
} |
Oops, something went wrong.