Implement the shipper's gRPC API #34

Closed
9 of 10 tasks
Tracked by #8 ...

cmacknz opened this issue May 11, 2022 · 7 comments

@cmacknz (Member) commented May 11, 2022

Implement the shipper's gRPC API, which is currently a skeleton defined in https://github.com/elastic/elastic-agent-shipper/blob/main/server/server.go

The API specification is currently defined in https://github.com/elastic/elastic-agent-shipper/blob/main/api/shipper.proto

Acceptance Criteria:

@rdner (Member) commented Jul 12, 2022

@faec I have some questions regarding the last shipper API change:

These are statements to double-check that I understood the docs correctly:

  1. accepted_count (in Shipper.Publish) is the number of events from a single request that went through queue.Publish without errors.
  2. accepted_index is the sum of all accepted_count values across all shipper Publish requests.
  3. From the queue Metrics we can calculate persisted_index = accepted_index - Metrics.UnackedConsumedEvents. Currently I run a background goroutine that polls the queue metrics at an interval and updates persisted_index with this formula (see the sketch below).
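
Here is a rough sketch of that background goroutine; the types below are simplified stand-ins, not the actual shipper or queue API:

```go
// Sketch of the polling loop that recomputes persisted_index.
package shipper

import (
	"sync/atomic"
	"time"
)

// queueMetrics is a hypothetical subset of the queue's metrics.
type queueMetrics struct {
	UnackedConsumedEvents uint64
}

// metricsSource stands in for whatever exposes the queue metrics.
type metricsSource interface {
	Metrics() (queueMetrics, error)
}

type indexTracker struct {
	acceptedIndex  atomic.Uint64 // incremented by accepted_count on every Publish
	persistedIndex atomic.Uint64 // updated by the polling goroutine below
}

// pollPersistedIndex periodically recomputes
// persisted_index = accepted_index - UnackedConsumedEvents.
func (t *indexTracker) pollPersistedIndex(q metricsSource, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			m, err := q.Metrics()
			if err != nil {
				continue // log and try again on the next tick
			}
			t.persistedIndex.Store(t.acceptedIndex.Load() - m.UnackedConsumedEvents)
		}
	}
}
```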

Q1: I faced one edge case I'm not sure about: what should the server do when it fails to enqueue a single event? I'm talking about the case when Queue.Publish returns an error other than queue.ErrQueueIsFull. It seems I cannot return this error to the client, since that would lead to data duplication. I think we should just log the error, stop processing the request, and accept all the previously published events from this request. For example, if we have 20 events and the 13th returns an error from Queue.Publish, we accept 12 (AcceptedCount=12) and return the PublishReply normally (see the sketch below).
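
To make the proposal concrete, a minimal sketch of that handling, assuming simplified stand-in event/queue types rather than the real ones:

```go
// Sketch of the Publish handling described above: enqueue events one by one,
// stop at the first unexpected error, and report how many were accepted.
package shipper

import (
	"errors"
	"log"
)

// errQueueIsFull stands in for queue.ErrQueueIsFull.
var errQueueIsFull = errors.New("queue is full")

// event and publisher are simplified stand-ins for the real types.
type event struct{}

type publisher interface {
	Publish(e event) error
}

// publishBatch returns how many events the queue accepted. On an error other
// than queue-full it logs, stops processing the request, and keeps the events
// accepted so far (e.g. AcceptedCount=12 out of 20 if the 13th fails).
func publishBatch(q publisher, events []event) (acceptedCount uint32, err error) {
	for _, e := range events {
		if pubErr := q.Publish(e); pubErr != nil {
			if errors.Is(pubErr, errQueueIsFull) {
				// A full queue is reported back so the client can retry the rest.
				return acceptedCount, pubErr
			}
			// Any other error: log it and return the reply with what we accepted.
			log.Printf("failed to enqueue event: %v", pubErr)
			return acceptedCount, nil
		}
		acceptedCount++
	}
	return acceptedCount, nil
}
```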

Q2: What's the idea behind having the PersistedIndex endpoint as a stream? Should I be pushing persisted_index updates every time this value changes?

@faec (Contributor) commented Jul 12, 2022

Q1: A full queue is the only way an event can fail to enqueue -- the queue API itself (e.g. Publish) doesn't return an error. We could imagine other error conditions, e.g. in the disk queue the input event might fail to serialize, but that doesn't arise until after the client's Publish request has already returned. So, the client calling Publish should never get an error other than queue-full once it's gotten as far as adding something to the queue. Any errors after that stage should be reported internally within the shipper, not to the originating client.

Q2: That's probably an oversight -- I didn't intentionally make those different, and didn't realize what the stream qualifier did, so that was likely left over from the previous version of the call and can be removed.

Statement 3 doesn't look right, though -- UnackedConsumedEvents gives the number of events that have been read by a consumer but not yet acknowledged. This means that if a queue has 1000 events, and none of them have been read by a consumer (output worker) yet, then UnackedConsumedEvents will be 0, but this doesn't mean persisted_index should be 1000 (if it's the memory queue then none of those events have been persisted). I'm not sure I understand the need here, though -- why is there a background goroutine to poll this value?

@rdner (Member) commented Jul 13, 2022

@faec

Regarding Q1 – that's what I thought as well.

Q2: I've already implemented the streaming API there, and after a closer look I think it can be beneficial for the clients. They can subscribe to persistedIndex changes and listen until persistedIndex >= the last acceptedIndex they received, then ack their current batch and advance (see the sketch below). From the consumer's perspective I think it would be a good experience.
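
Roughly, the consumer flow I have in mind looks like this; the stream type below is an illustrative stand-in, not the exact generated gRPC client:

```go
// Sketch of the consumer-side flow: listen to persisted_index updates until
// the last batch is fully persisted, then ack and advance.
package client

// persistedIndexStream stands in for the generated server-streaming
// PersistedIndex client.
type persistedIndexStream interface {
	Recv() (uint64, error) // returns the latest persisted_index
}

// waitForPersisted blocks until the stream reports a persisted_index that has
// caught up with the accepted_index returned for the client's last batch.
// Once it returns nil, the client can ack that batch and move on.
func waitForPersisted(stream persistedIndexStream, acceptedIndex uint64) error {
	for {
		persistedIndex, err := stream.Recv()
		if err != nil {
			return err
		}
		if persistedIndex >= acceptedIndex {
			return nil
		}
		// Not there yet; keep listening for the next update.
	}
}
```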

Regarding 3:

I think I'm missing something here: how does one get the persisted_index value, then? I see no current way in the queue to get this information, so I started polling the queue metrics at intervals and updating the index in the background.

See #76 for more context.

@faec (Contributor) commented Jul 13, 2022

Hmm, you're right -- I could see it making sense for the persisted index call to be a stream, so let's try it that way.

The actual value of persisted_index requires support from the queue itself, which I'm working on -- I'll add you on the PR as soon as it's ready.
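
For illustration, a minimal sketch of what the streaming handler could look like once that queue support exists; the index source and stream types are hypothetical stand-ins, not the generated gRPC API:

```go
// Sketch of a server-streaming PersistedIndex handler that only pushes
// updates when the value actually changes.
package server

import "time"

// indexSource stands in for however the shipper will expose the current
// persisted index once the queue supports it.
type indexSource interface {
	PersistedIndex() uint64
}

// indexStream stands in for the generated gRPC server stream.
type indexStream interface {
	Send(persistedIndex uint64) error
	Done() <-chan struct{} // closed when the client disconnects
}

func streamPersistedIndex(src indexSource, stream indexStream, pollEvery time.Duration) error {
	ticker := time.NewTicker(pollEvery)
	defer ticker.Stop()

	var lastSent uint64
	sentAny := false
	for {
		select {
		case <-stream.Done():
			return nil
		case <-ticker.C:
			current := src.PersistedIndex()
			if sentAny && current == lastSent {
				continue // unchanged, nothing to push
			}
			if err := stream.Send(current); err != nil {
				return err
			}
			lastSent, sentAny = current, true
		}
	}
}
```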

@rdner (Member) commented Aug 9, 2022

As @cmacknz and I discussed face to face, we can remove #63 from the acceptance criteria checklist and leave it for future research.

@cmacknz (Member, Author) commented Aug 24, 2022

Removed #97 as it doesn't block completion of this issue.

@cmacknz (Member, Author) commented Sep 14, 2022

Closing this one; testing is tracked as part of the 8.5 milestone.
