This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

The PublishEvents RPC should block when the queue is full #84

Closed
Tracked by #34 ...
cmacknz opened this issue Jul 27, 2022 · 6 comments · Fixed by #91

Comments

@cmacknz (Member) commented Jul 27, 2022

Implementation issue following the discussion in #81.

Specifically, the RPC should block until at least one event is accepted into the queue; it should not wait until all events in the batch have been accepted.

The shipper queue's publish interface already blocks when the queue is full based on the underlying Beats' memory and disk queue implementations:

	producer := eventQueue.Producer(beatsqueue.ProducerConfig{})
	return &Queue{eventQueue: eventQueue, producer: producer}, nil
}

func (queue *Queue) Publish(event *messages.Event) (EntryID, error) {
	if !queue.producer.Publish(event) {
		return EntryID(0), ErrQueueIsFull
	}
	return EntryID(0), nil
}

  1. Beats' memory queue publish implementation: https://github.com/elastic/beats/blob/e6db9a53f2fff60b350153b03407bc38b21bf0b1/libbeat/publisher/queue/memqueue/produce.go#L128-L136
  2. Beats' disk queue publish implementation: https://github.com/elastic/beats/blob/e6db9a53f2fff60b350153b03407bc38b21bf0b1/libbeat/publisher/queue/diskqueue/producer.go#L52-L62

One missing piece with the existing Publish method is that it does not accept a context.Context as input. This means that when a PublishEvents RPC call is made with a timeout, that timeout will be ignored if the RPC is blocked in the queue Publish method. We will need to modify the queue interface to accept a context.Context and propagate the RPC context to it in the PublishEvents method.
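As a rough sketch of the shape this could take (nothing below exists today; PublishWithCancel is a hypothetical stand-in for whatever cancellable publish the underlying queue ends up exposing):

// Hypothetical context-aware wrapper: the PublishEvents handler passes its
// own RPC context here, so a publish blocked on a full queue is released as
// soon as the client's deadline expires or the call is cancelled.
func (queue *Queue) Publish(ctx context.Context, event *messages.Event) (EntryID, error) {
	// PublishWithCancel (hypothetical) returns false when the send was
	// abandoned because the cancel channel fired.
	if !queue.producer.PublishWithCancel(event, ctx.Done()) {
		return EntryID(0), ctx.Err()
	}
	return EntryID(0), nil
}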

@cmacknz cmacknz added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Jul 27, 2022
@cmacknz cmacknz changed the title PublishEvents should block when the queue is full The PublishEvents RPC should block when the queue is full Jul 27, 2022
@rdner (Member) commented Aug 9, 2022

@cmacknz The issue description sounds like this is to be implemented in queue.Publish(event), not in the server.Publish endpoint of the gRPC server. Did I get it right?

If we just blocked on the channel, as it's currently implemented in Beats (the referenced memory and disk queue implementations), then we would not comply with:

The RPC should not block until all events in the batch have been accepted into the queue.

Blocking on the channel inside queue.Publish(event) would mean blocking for all events of the batch, not just the first one.

So, my initial idea after our discussion was: when calling queue.Publish(event) for the very first event of the batch, if the queue returns queue.ErrQueueIsFull, I would just start retrying at configured intervals (set in the server config) until at least the first event goes through. If the same happens for the following events, I would just return with AcceptedCount = 1. It complies with the described requirements and does not require changes in the queue.
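A rough sketch of that retry idea, just to make it concrete (serv.queue, serv.cfg.RetryInterval, and the surrounding handler are assumptions for illustration; queue.ErrQueueIsFull is the error from the shipper's queue wrapper quoted above):

// Keep retrying the first event of the batch at a configured interval until
// the queue accepts it; subsequent events that hit a full queue would simply
// end the batch with the current AcceptedCount.
for {
	_, err := serv.queue.Publish(events[0])
	if err == nil {
		break
	}
	if !errors.Is(err, queue.ErrQueueIsFull) {
		return nil, err
	}
	time.Sleep(serv.cfg.RetryInterval)
}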

@faec any thoughts on this?

@cmacknz (Member, Author) commented Aug 9, 2022

We will need to modify the Publish() interface to pass the RPC context through, at least so that cancellation works properly, so we can consider further changes or additions to the queue Publish interface if they help us get the behaviour we want.

I would like to eliminate retry loops and anything that resembles busy waiting if we can, and just rely on having the queue unblock us when it is ready to accept new events.

Could we expose the underlying channel from the queue to help with this perhaps?

// Unconditionally block for the first event in the batch.
queueCh <- events[0]

// Avoid blocking for subsequent attempts to write.
for i := 1; i < len(events); i++ {
	select {
	case queueCh <- events[i]:
	default:
		// Send would block; return from the RPC with the current accepted count.
	}
}

@faec (Contributor) commented Aug 9, 2022

Well, it's straightforward to handle events after the first one by calling TryPublish (which returns without blocking) instead of Publish. However yes, to wait on the first event without a retry loop we need at least some change to the queue API. I'd rather not expose the underlying channel, that's a distinct implementation detail in each queue, but we could make Publish accept a cancel/done channel? Then the gRPC caller could pass in context.Done() and beats could give it nil to preserve the current behavior. If I remember right, all three-ish implementations of Publish already have select calls it could be added to.
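At the call sites, that proposal might look roughly like this (a sketch only; the extra parameter and the bool return pairing are assumptions about the eventual API):

// In Beats, passing nil preserves today's behavior: a nil channel is never
// ready, so the queue's internal select can only complete the send.
_, ok := producer.Publish(event, nil)

// In the shipper's gRPC handler, passing the RPC context's Done channel lets
// a blocked publish return as soon as the client cancels or times out.
_, ok = producer.Publish(event, ctx.Done())
if !ok {
	// ctx was cancelled while waiting for queue space.
}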

@rdner (Member) commented Aug 10, 2022

@faec I'm somewhat confused now.

So, we have this Publish function in the Queue (in the shipper). It's using the Publish function of the producer, which claims to be blocking and to return false if publication is unsuccessful (whatever that means; I guess it depends on the interface implementation).

But at the same time, if it returns false, the Queue wrapper in the shipper considers the queue full, according to the shipper implementation:

func (queue *Queue) Publish(event *messages.Event) (EntryID, error) {
	if !queue.producer.Publish(event) {
		return EntryID(0), ErrQueueIsFull
	}
	return EntryID(0), nil
}

The behaviour we want is to block on the first event until the queue is free, then (as you said) call TryPublish (which is not currently exposed by the shipper Queue wrapper) on the rest of the events in the batch; if TryPublish fails, we return the current AcceptedCount.

The missing piece is that Publish should block until the queue is no longer full (can accept an event) and the first event is published. For this we also need a way to cancel the call in case the connection with the client is lost.

I can't see how we can achieve this without changes in the implementation of queue in Beats, am I missing something?

What would be the concrete steps we need to take in order to achieve this?

@faec (Contributor) commented Aug 10, 2022

Yes, TryPublish will return directly if the queue is full, but the only way a Publish call actually returns false is if the producer is cancelled by its caller.

I agree, this requires changes to the beats queue. I see a couple of options. We could modify the queue producer interface itself:

type Producer interface {
	// Publish adds an event to the queue, blocking if necessary, and returns
	// the new entry's id and true on success.
	Publish(event interface{}) (EntryID, bool)
	...
}

We could change this prototype to be Publish(event interface{}, cancel chan struct{}) (EntryID, error)

There are three main producers that would need to change to support it: ackProducer, forgetfulProducer, and diskQueueProducer. (Possibly other assorted examples in the tests.) For example, ackProducer.Publish calls a helper openState.publish (in memqueue/produce.go) which currently selects on the queue's publication channel and the producer's done channel -- a caller cancel could easily be added.
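Roughly, the new case would slot into that existing select; a sketch (field and type names here are only indicative of the memqueue code, not copied from it):

func (st *openState) publish(req pushRequest, cancel chan struct{}) bool {
	select {
	case st.events <- req:
		return true
	case <-st.done:
		// The producer itself was closed.
		st.events = nil
		return false
	case <-cancel:
		// New case: the caller (e.g. the shipper's PublishEvents RPC) gave
		// up waiting, typically because its context was cancelled.
		return false
	}
}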

The other option that comes to mind is to modify the queue interface itself. We removed the abstraction of queue "consumers" a while ago since they had no state of their own, but we still have "producers" because they encapsulate state about callbacks / notifications. However, the shipper has no need of these features and they add potential race conditions where there's no essential dependency, so we could add a direct function on the queue for callers that don't need to preserve producer state:

// (beats/libbeat/publisher/queue/queue.go)
type Queue interface {
	Publish(event interface{}, block bool, cancel chan struct{})
	...
}

This publish call would make the same API calls as the producers do, but with no producer state and with an explicit cancel channel to address the needs of the shipper. This way the shipper wouldn't have to worry about queue producers or properly synchronizing them at all, so I like this option somewhat better, but both are probably fine -- go with whatever makes for a more practical implementation.
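From the shipper's side, the call could then look roughly like this (a sketch assuming the call also reports with a bool whether the event was accepted, which the prototype above leaves open):

// First event: block until there is room, but give up if the RPC context is
// cancelled.
if !eventQueue.Publish(events[0], true, ctx.Done()) {
	return // cancelled while waiting for queue space
}

// Remaining events: don't block; stop at the first one that doesn't fit.
for _, e := range events[1:] {
	if !eventQueue.Publish(e, false, nil) {
		break
	}
}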

@rdner (Member) commented Aug 11, 2022

After a meeting, @cmacknz, @faec, and I decided on the following steps:

  • Queue.Publish (the wrapper in the shipper) will support context.Context as an additional parameter to cancel blocking on event publishing if the connection with the gRPC client is lost. Inside Queue.Publish this will be converted into a cancel channel that is then propagated into the libbeat queue implementation and supported throughout the queue implementations.
  • Queue.Publish (the wrapper in the shipper) already blocks until the event is published or until the whole producer is cancelled. The current implementation assumes that false returned from the function means ErrQueueIsFull, which is incorrect and needs to change.
  • The shipper gRPC server will use Queue.Publish (the wrapper in the shipper) for the first event. This will block until space is available in the queue.
  • The shipper gRPC server will use TryPublish for all the following events of the batch. TryPublish needs to be added to the wrapper in the shipper; it does not block and returns false if the queue is full. If TryPublish returns false we assume ErrQueueIsFull and send the reply with the current AcceptedCount back to the client (see the sketch after this list).
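Putting those steps together, the PublishEvents handler would behave roughly as follows (a sketch of the agreed flow; serv.queue, TryPublish's return shape, and the PublishReply/AcceptedCount names are illustrative, not final):

events := req.Events
accepted := uint32(0)

// Block on the first event until there is room in the queue, or until the
// RPC context is cancelled (client timeout, lost connection).
if _, err := serv.queue.Publish(ctx, events[0]); err != nil {
	return nil, err
}
accepted++

// For the rest of the batch, never block: stop at the first event that does
// not fit and report how many were accepted.
for _, e := range events[1:] {
	if _, err := serv.queue.TryPublish(e); err != nil {
		break
	}
	accepted++
}
return &messages.PublishReply{AcceptedCount: accepted}, nil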
