Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plumb message_queue_nonempty callback #164

Merged
merged 3 commits into from
Nov 21, 2019

Conversation

benesch
Copy link
Collaborator

@benesch benesch commented Oct 23, 2019

This makes it possible to receive a notification via a Rust callback when there are new messages available. The bulk of the implementation is in the last two commits; the first commit is just preparatory work.

@benesch benesch requested a review from thijsc October 29, 2019 04:11
@benesch
Copy link
Collaborator Author

benesch commented Oct 29, 2019

@quodlibetor or @thijsc, can I talk one of you into reviewing this?

@thijsc
Copy link
Collaborator

thijsc commented Oct 30, 2019

Wouldn't it be possible to use a callback on the future? Or does that have downsides for you?

@benesch
Copy link
Collaborator Author

benesch commented Nov 5, 2019

Yeah, at @MaterializeInc we're not using the futures consumer. We have our own event loop that calls consumer.poll(0) at regular intervals. When we don't receive any messages, we want to be able to put the thread with the event loop to sleep, but get woken up if a new message arrives, which this PR enables. (Without this PR, the only option is to create another thread that forwards received Kafka messages over a channel, which creates unnecessary overhead.)

@benesch
Copy link
Collaborator Author

benesch commented Nov 5, 2019

Come to think of it, it should be possible to avoid the separate thread in the StreamConsumer entirely with the functionality in this PR. I'm going to poke at that a bit.

This makes some timeout logic in a forthcoming PR much clearer. I think
it also admits a clearer API, as

    consumer.poll(Timeout::Never)

is quite a bit more readable than

    consumer.poll(None).

Note that this is not actually a breaking API change, as
`From<Duration>` and `From<Option<Duration>>` are properly implemented
for `Timeout`, and so `consumer.poll(None)`, `consumer.poll(duration)`,
and `consumer.poll(Some(duration))` all compile as before.

There is one small API change to the admin client. Previously it was not
possible to use the builder API to unset a request timeout or operation
timeout; now it is. (This was an oversight on my part; the API should
previously have taken Option<Option<Duration>>, though that's quite a
mouthful.) Luckily, since the admin client has not yet made it into a
release, we're within rights to change the API.
This makes it possible to receive more targeted notifications when there
is activity on just one queue.
This makes it possible to receive a notification via a Rust callback
when there are new messages available.
@benesch
Copy link
Collaborator Author

benesch commented Nov 21, 2019

I'm going to poke at that a bit.

This turns out to be fairly difficult, so I'm just going to merge as is, now that tests are passing. (Just needed to increase the timeout for CI, because things seem to be a bit slower due to the small Travis VMs.)

@benesch benesch merged commit 7759023 into fede1024:master Nov 21, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants