
Add a proxy queue to avoid double-queueing every event when using the shipper output #34377

Merged: 35 commits, merged Feb 23, 2023. The diff below shows changes from 29 commits.

Commits (35)
2521070
proxy queue in progress
faec Jan 19, 2023
406fabe
iterating on implementation, currently builds
faec Jan 23, 2023
a6feed0
updating the shipper output
faec Jan 24, 2023
1c66088
Add FreeEvents to publisher.Event
faec Jan 24, 2023
c05e3cd
finish shipper output refactor
faec Jan 25, 2023
efe68df
a little more cleanup
faec Jan 25, 2023
1c73bcf
finish proxy queue implementation
faec Jan 25, 2023
7ac49b0
remove queue from active configurations
faec Jan 25, 2023
c2ce99a
lint
faec Jan 25, 2023
98f83ab
remove event ID checking in memqueue tests
faec Jan 25, 2023
e210595
remove unused queue factories for global queue registry
faec Jan 25, 2023
ef8cf5c
remove queue entry ids from the proxy queue
faec Jan 25, 2023
d5012a9
lint
faec Jan 25, 2023
9a432ed
lint
faec Jan 25, 2023
42cce28
license headers
faec Jan 25, 2023
c8693dc
remove unused config code
faec Jan 26, 2023
f101f91
Rework internal api to avoid race conditions
faec Jan 27, 2023
9c6c6b0
Merge branch 'main' of github.com:elastic/beats into proxy-queue
faec Jan 30, 2023
7397b67
Some major fixes / simplifications
faec Feb 1, 2023
09623b8
broker diagram draft
faec Feb 8, 2023
d7e0b2a
More documentation, more fixed race conditions
faec Feb 10, 2023
c5b5bd4
Add more tests, fix the bugs they found
faec Feb 10, 2023
84333a3
propagate errors in the shipper output
faec Feb 10, 2023
72be5f9
lint
faec Feb 13, 2023
132e474
lint
faec Feb 14, 2023
e652cb6
make update
faec Feb 15, 2023
2e90778
Merge branch 'main' of github.com:elastic/beats into proxy-queue
faec Feb 16, 2023
87e9470
Merge branch 'main' of github.com:elastic/beats into proxy-queue
faec Feb 21, 2023
86a524b
Merge branch 'main' of github.com:elastic/beats into proxy-queue
faec Feb 22, 2023
284f848
synchronize shutdown on Close call
faec Feb 22, 2023
e9f328e
Merge branch 'main' of github.com:elastic/beats into proxy-queue
faec Feb 22, 2023
aacd2d0
fix diagram EOLs
faec Feb 23, 2023
c836beb
add test for write-after-close
faec Feb 23, 2023
202f6bb
Merge branch 'main' of github.com:elastic/beats into proxy-queue
faec Feb 23, 2023
2e0714c
fix error updating remaining acks
faec Feb 23, 2023
2 changes: 2 additions & 0 deletions libbeat/outputs/outest/batch.go
@@ -71,6 +71,8 @@ func (b *Batch) RetryEvents(events []publisher.Event) {
b.doSignal(BatchSignal{Tag: BatchRetryEvents, Events: events})
}

func (b *Batch) FreeEntries() {}

func (b *Batch) Cancelled() {
b.doSignal(BatchSignal{Tag: BatchCancelled})
}
227 changes: 135 additions & 92 deletions libbeat/outputs/shipper/shipper.go
@@ -46,26 +46,37 @@ import (
type pendingBatch struct {
batch publisher.Batch
index uint64
serverID string
eventCount int
droppedCount int
}

type shipper struct {
log *logp.Logger
observer outputs.Observer

config Config
config Config

conn *grpc.ClientConn
client sc.ProducerClient
ackClient sc.Producer_PersistedIndexClient

serverID string

pending []pendingBatch
pendingMutex sync.Mutex
// The publish function sends to ackLoopChan to notify the ack worker of
// new pending batches
ackBatchChan chan pendingBatch

// The ack RPC listener sends to ackIndexChan to notify the ack worker
// of the new persisted index
ackIndexChan chan uint64

conn *grpc.ClientConn
client sc.ProducerClient
clientMutex sync.Mutex
// ackWaitGroup is used to synchronize the shutdown of the ack listener
// and the ack worker when a connection is closed.
ackWaitGroup sync.WaitGroup

backgroundCtx context.Context
backgroundCancel func()
// ackCancel cancels the context for the ack listener and the ack worker,
// notifying them to shut down.
ackCancel context.CancelFunc
}

func init() {
@@ -91,9 +91,6 @@ func makeShipper(
config: config,
}

// for `Close` function to stop all the background work like acknowledgment loop
s.backgroundCtx, s.backgroundCancel = context.WithCancel(context.Background())

swb := outputs.WithBackoff(s, config.Backoff.Init, config.Backoff.Max)

return outputs.Success(config.BulkMaxSize, config.MaxRetries, swb)
@@ -121,68 +129,39 @@ func (s *shipper) Connect() error {
grpc.WithTransportCredentials(creds),
}

ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
defer cancel()

s.log.Debugf("trying to connect to %s...", s.config.Server)

ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, s.config.Server, opts...)
if err != nil {
return fmt.Errorf("shipper connection failed with: %w", err)
}

s.conn = conn
s.clientMutex.Lock()
defer s.clientMutex.Unlock()

s.client = sc.NewProducerClient(conn)

// we don't need a timeout context here anymore, we use the
// `s.backgroundCtx` instead, it's going to be a long running client
ackCtx, ackCancel := context.WithCancel(s.backgroundCtx)
defer func() {
// in case we return an error before we start the `ackLoop`
// then we don't need this client anymore and must close the stream
if err != nil {
ackCancel()
}
}()

indexClient, err := s.client.PersistedIndex(ackCtx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(s.config.AckPollingInterval),
})
if err != nil {
return fmt.Errorf("failed to connect to the server: %w", err)
}
indexReply, err := indexClient.Recv()
if err != nil {
return fmt.Errorf("failed to fetch server information: %w", err)
}
s.serverID = indexReply.GetUuid()

s.log.Debugf("connection to %s (%s) established.", s.config.Server, s.serverID)

go func() {
defer ackCancel()
s.log.Debugf("starting acknowledgment loop with server %s", s.serverID)
// the loop returns only in case of error
err := s.ackLoop(s.backgroundCtx, indexClient)
s.log.Errorf("acknowledgment loop stopped: %s", err)
}()

return nil
return s.startACKLoop()
}

// Publish converts and sends a batch of events to the shipper server.
// Also, implements `outputs.Client`
func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {
if s.client == nil {
err := s.publish(ctx, batch)
if err != nil {
// If there was an error then we are dropping our connection.
s.Close()
}
return err
}

func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error {
if s.conn == nil {
return fmt.Errorf("connection is not established")
}

st := s.observer
events := batch.Events()
st.NewBatch(len(events))
s.observer.NewBatch(len(events))

toSend := make([]*messages.Event, 0, len(events))

@@ -204,7 +183,7 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {

convertedCount := len(toSend)

st.Dropped(droppedCount)
s.observer.Dropped(droppedCount)
s.log.Debugf("%d events converted to protobuf, %d dropped", convertedCount, droppedCount)

var lastAcceptedIndex uint64
@@ -219,8 +198,8 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {
})

if status.Code(err) != codes.OK {
batch.Cancelled() // does not decrease the TTL
st.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
batch.Cancelled() // does not decrease the TTL
s.observer.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events
return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(toSend), err)
}

@@ -239,29 +218,31 @@ func (s *shipper) Publish(ctx context.Context, batch publisher.Batch) error {

s.log.Debugf("total of %d events have been accepted from batch, %d dropped", convertedCount, droppedCount)

s.pendingMutex.Lock()
s.pending = append(s.pending, pendingBatch{
// We've sent as much as we can to the shipper, release the batch's events and
// save it in the queue of batches awaiting acknowledgment.
batch.FreeEntries()
s.ackBatchChan <- pendingBatch{
batch: batch,
index: lastAcceptedIndex,
serverID: s.serverID,
eventCount: len(events),
droppedCount: droppedCount,
})
s.pendingMutex.Unlock()
}

return nil
}

// Close closes the connection to the shipper server.
// Also, implements `outputs.Client`
func (s *shipper) Close() error {
if s.client == nil {
if s.conn == nil {
return fmt.Errorf("connection is not established")
}
s.backgroundCancel()
s.ackCancel()
s.ackWaitGroup.Wait()

err := s.conn.Close()
s.conn = nil
s.client = nil
s.pending = nil

return err
}
@@ -271,53 +252,115 @@ func (s *shipper) String() string {
return "shipper"
}

func (s *shipper) ackLoop(ctx context.Context, ackClient sc.Producer_PersistedIndexClient) error {
st := s.observer
func (s *shipper) startACKLoop() error {
ctx, cancel := context.WithCancel(context.Background())
s.ackCancel = cancel

indexClient, err := s.client.PersistedIndex(ctx, &messages.PersistedIndexRequest{
PollingInterval: durationpb.New(s.config.AckPollingInterval),
})
if err != nil {
return fmt.Errorf("failed to connect to the server: %w", err)
}
indexReply, err := indexClient.Recv()
if err != nil {
return fmt.Errorf("failed to fetch server information: %w", err)
}
s.serverID = indexReply.GetUuid()

s.log.Debugf("connection to %s (%s) established.", s.config.Server, s.serverID)

s.ackClient = indexClient
s.ackBatchChan = make(chan pendingBatch)
s.ackIndexChan = make(chan uint64)
s.ackWaitGroup.Add(2)

go func() {
s.ackWorker(ctx)
s.ackWaitGroup.Done()
}()

go func() {
err := s.ackListener(ctx)
s.ackWaitGroup.Done()
if err != nil {
s.log.Errorf("acknowledgment listener stopped: %s", err)
Review comment (Contributor):
Maybe we should store the status of the ackLoop in the shipper struct. That way we can check it before a publish.

Reply (Contributor Author):
I handled this by instead calling s.Close(), which will produce an error the next time Publish is called.


// Shut down the connection and clear the output metadata.
// This will not propagate back to the pipeline immediately,
// but the next time Publish is called it will return an error
// because there is no connection, which will signal the pipeline
// to try to revive this output worker via Connect().
s.Close()
}
}()

return nil
}

// ackListener's only job is to listen to the persisted index RPC stream
// and forward its values to the ack worker.
func (s *shipper) ackListener(ctx context.Context) error {
Review comment (Contributor):
Is there a reason to have a dedicated little listener-thread-thing that just forwards events from that RPC stream? Are we just trying to make the select statement in ackWorker cleaner?

Reply (Contributor Author):
I can go into more detail in the shipper sync, but yes: because we can't directly select on the result of this call, making it commits us to a 30+ second window where we can't handle signals from publish calls, which would mean using a very large channel buffer to avoid spurious blocking (there's no particular limit on how many batches could go through, and they send fast). I'm unhappy with the extra goroutine conceptually, but it is cheap and robust to bad scheduling and other config interactions. What I'd really like is to fix the shipper API so this is all unnecessary, so let's talk about that at the sync.
(A standalone sketch of this forwarder pattern appears after the ackListener body below.)

s.log.Debugf("starting acknowledgment listener with server %s", s.serverID)
for {
select {
indexReply, err := s.ackClient.Recv()
if err != nil {
select {
case <-ctx.Done():
// If our context has been closed, this is an intentional closed
// connection, so don't return the error.
return nil
default:
// If the context itself is not closed then this means a real
// connection error.
return fmt.Errorf("ack listener closed connection: %w", err)
}
}
s.ackIndexChan <- indexReply.PersistedIndex
}
}
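
To make the trade-off discussed in the review thread above concrete, here is a minimal, self-contained sketch of the forwarder pattern, not the shipper code itself: a listener goroutine owns the blocking Recv call and feeds a channel, so the worker can select over shutdown, new batches, and new indexes without ever blocking on the stream. The indexStream interface and the runListener/runWorker names are hypothetical and simplified for illustration.

```go
package ackexample

import (
	"context"
	"fmt"
)

// indexStream is a stand-in for the gRPC PersistedIndex stream: Recv blocks
// until the server reports a new persisted index.
type indexStream interface {
	Recv() (uint64, error)
}

// runListener owns the blocking Recv call and forwards each value to a
// channel the worker can select on.
func runListener(ctx context.Context, stream indexStream, indexes chan<- uint64) error {
	for {
		idx, err := stream.Recv()
		if err != nil {
			select {
			case <-ctx.Done():
				return nil // intentional shutdown, not a real error
			default:
				return fmt.Errorf("listener closed connection: %w", err)
			}
		}
		indexes <- idx
	}
}

// runWorker never calls Recv directly, so it stays responsive to shutdown and
// to newly published batches even when the stream is quiet for a long time.
func runWorker(ctx context.Context, batches <-chan string, indexes <-chan uint64) {
	for {
		select {
		case <-ctx.Done():
			return
		case b := <-batches:
			fmt.Println("new pending batch:", b)
		case idx := <-indexes:
			fmt.Println("new persisted index:", idx)
		}
	}
}
```

The cost is one extra goroutine per connection; the benefit is that the worker never needs a large channel buffer to stay responsive.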

case <-ctx.Done():
return ctx.Err()
// ackWorker listens for newly published batches awaiting acknowledgment,
// and for new persisted indexes that should be forwarded to already-published
// batches.
func (s *shipper) ackWorker(ctx context.Context) {
s.log.Debugf("starting acknowledgment loop with server %s", s.serverID)

default:
// this sends an update and unblocks only if the `PersistedIndex` value has changed
indexReply, err := ackClient.Recv()
if err != nil {
return fmt.Errorf("acknowledgment failed due to the connectivity error: %w", err)
pending := []pendingBatch{}
for {
select {
case <-ctx.Done():
// If there are any pending batches left when the ack loop returns, then
// they will never be acknowledged, so send the cancel signal.
for _, p := range pending {
p.batch.Cancelled()
}
return

s.pendingMutex.Lock()
lastProcessed := 0
for _, p := range s.pending {
if p.serverID != indexReply.Uuid {
s.log.Errorf("acknowledgment failed due to a connection to a different server %s, batch was accepted by %s", indexReply.Uuid, p.serverID)
p.batch.Cancelled()
st.Cancelled(len(p.batch.Events()))
lastProcessed++
continue
}
case newBatch := <-s.ackBatchChan:
pending = append(pending, newBatch)

case newIndex := <-s.ackIndexChan:
lastProcessed := 0
for _, p := range pending {
// if we met a batch that is ahead of the persisted index
// we stop iterating and wait for another update from the server.
// The latest pending batch has the max(AcceptedIndex).
if p.index > indexReply.PersistedIndex {
if p.index > newIndex {
break
}

p.batch.ACK()
ackedCount := len(p.batch.Events()) - p.droppedCount
st.Acked(ackedCount)
ackedCount := p.eventCount - p.droppedCount
s.observer.Acked(ackedCount)
s.log.Debugf("%d events have been acknowledged, %d dropped", ackedCount, p.droppedCount)
lastProcessed++
}
// so we don't perform any manipulation when the pending list is empty
// or none of the batches were acknowledged by this persisted index update
if lastProcessed != 0 {
copy(s.pending[0:], s.pending[lastProcessed:])
s.pending = s.pending[lastProcessed:]
copy(pending[0:], pending[lastProcessed:])
}
s.pendingMutex.Unlock()
}
}
}
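
As an aside, the acknowledgment handling in ackWorker relies on one invariant: batches enter the pending list in acceptance order, so their indexes are non-decreasing, and a persisted-index update acknowledges exactly the leading batches whose index is at or below the new value. Below is a standalone sketch of just that rule; the acceptedBatch and ackUpTo names are hypothetical and not part of the shipper code.

```go
package ackexample

import "fmt"

type acceptedBatch struct{ index uint64 }

// ackUpTo returns how many leading batches are covered by the persisted index
// and the batches that are still waiting. The input is sorted by index, which
// holds automatically because batches are appended in acceptance order.
func ackUpTo(pending []acceptedBatch, persisted uint64) (acked int, remaining []acceptedBatch) {
	for _, p := range pending {
		if p.index > persisted {
			break // this batch and everything after it are newer; wait for the next update
		}
		acked++
	}
	return acked, pending[acked:]
}

func demo() {
	pending := []acceptedBatch{{index: 5}, {index: 9}, {index: 12}}
	acked, remaining := ackUpTo(pending, 10)
	fmt.Println(acked, len(remaining)) // 2 1: the batch accepted at index 12 waits for a later update
}
```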
13 changes: 13 additions & 0 deletions libbeat/publisher/event.go
@@ -41,6 +41,17 @@ type Batch interface {
// Try sending the events in this list again; all others are acknowledged.
RetryEvents(events []Event)

// Release the internal pointer to this batch's events but do not yet
// acknowledge this batch. This exists specifically for the shipper output,
// where there is potentially a long gap between when events are handed off
// to the shipper and when they are acknowledged upstream; during that time,
// we need to preserve batch metadata for producer end-to-end acknowledgments,
// but we do not need the events themselves since they are already queued by
// the shipper. It is only guaranteed to release event pointers when using the
// proxy queue.
// Never call this on a batch that might be retried.
FreeEntries()

// Send was aborted, try again but don't decrease the batch's TTL counter.
Cancelled()
}
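
A minimal sketch of how an output might use FreeEntries, assuming the standard beats import path and a hypothetical sendUpstream helper; only the ordering matters here: send the events, then release them with FreeEntries, then hold on to the batch until the upstream acknowledgment arrives.

```go
package exampleoutput

import (
	"context"

	"github.com/elastic/beats/v7/libbeat/publisher"
)

// sendUpstream is a placeholder for the real RPC call; assume it returns only
// once the remote end has queued the events.
func sendUpstream(ctx context.Context, events []publisher.Event) error { return nil }

// publishOnce demonstrates the intended call ordering for FreeEntries.
func publishOnce(ctx context.Context, batch publisher.Batch, pendingAcks chan<- publisher.Batch) error {
	if err := sendUpstream(ctx, batch.Events()); err != nil {
		// Never call FreeEntries on a batch that might be retried.
		batch.Cancelled()
		return err
	}
	// The events are queued upstream; release our copy of them, but keep the
	// batch itself so it can be ACKed once the upstream acknowledgment arrives.
	batch.FreeEntries()
	pendingAcks <- batch
	return nil
}
```

This mirrors the shipper output above, which frees the batch's entries and then hands the batch to the ack worker via ackBatchChan.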
@@ -64,6 +75,7 @@ type EventCache struct {

// Put lets outputs put key-value pairs into the event cache
func (ec *EventCache) Put(key string, value interface{}) (interface{}, error) {
//nolint:typecheck // Nil checks are ok here
if ec.m == nil {
// uninitialized map
ec.m = mapstr.M{}
@@ -74,6 +86,7 @@ func (ec *EventCache) Put(key string, value interface{}) (interface{}, error) {

// GetValue lets outputs retrieve values from the event cache by key
func (ec *EventCache) GetValue(key string) (interface{}, error) {
//nolint:typecheck // Nil checks are ok here
if ec.m == nil {
// uninitialized map
return nil, mapstr.ErrKeyNotFound