This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Stream API to get batch of messages from certain sequence with limit #266

Closed
FZambia opened this issue Jul 19, 2020 · 9 comments

@FZambia commented Jul 19, 2020

This is a follow-up issue after a short discussion with @ripienaar in the NATS Slack.

I have a system that uses streams (Centrifugo). At the moment we have two implementations of the stream data structure that fit Centrifugo's internal design: one in-memory, the other based on the Redis Stream data structure. I am investigating the possibility of adding JetStream as another option in the future. I am still weighing whether this is advisable at all, but decided to ask the question here as early as possible, while JetStream is in the tech preview stage.

Centrifugo is designed so that clients can paginate over a stream from a certain offset, asking for a batch of messages. Something like this in pseudocode:

```
messages = stream.Get(sinceSequence, limit)
```

It also needs a way to access the stream's current max sequence, which I suppose is already possible with $JS.API.STREAM.INFO.*.
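A minimal sketch of reading the stream's last sequence with the nats.go JetStream client (an API that arrived after this discussion); the stream name "events" is an assumption:

```go
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// StreamInfo wraps $JS.API.STREAM.INFO.<stream> and includes the
	// last sequence currently in the stream.
	info, err := js.StreamInfo("events") // stream name assumed
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("last sequence:", info.State.LastSeq)
}
```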

But when it comes to getting a batch of messages from a stream, things are not so handy. As far as I understand, at the moment I can only create a consumer to load messages one by one and form a batch to return to the client (pull-based, or maybe push-based to avoid many RTTs for messages). But this does not seem like a good approach for my use case.

For example, Redis Streams allows iterating over a stream with something like:

```
XRANGE <STREAM_KEY> <OFFSET> + COUNT <LIMIT>
```
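For comparison, the same call through go-redis is a single round trip; the key and starting ID are placeholders:

```go
package streampage

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// redisRange reads up to limit entries starting at sinceID in one round
// trip, mirroring XRANGE <key> <sinceID> + COUNT <limit>.
func redisRange(ctx context.Context, rdb *redis.Client, key, sinceID string, limit int64) ([]redis.XMessage, error) {
	return rdb.XRangeN(ctx, key, sinceID, "+", limit).Result()
}
```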

I suppose it could be part of the JetStream admin API; it already has a method to get a single message from a stream: $JS.API.STREAM.MSG.GET.*. But looking at the Store interface, it only has a method to load a single message from memory or disk (LoadMsg). For the in-memory store I suppose this could work fine for requesting many messages, but for disk storage asking for a batch could be a problem (I am not sure how messages are stored on disk).
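To make the round-trip cost concrete, here is a sketch of emulating the desired batch call on top of the single-message API; the helper name and error handling are illustrative:

```go
package streampage

import "github.com/nats-io/nats.go"

// getBatch emulates stream.Get(sinceSequence, limit) using only the
// single-message lookup ($JS.API.STREAM.MSG.GET.<stream>): one request
// per message, which is exactly the cost being discussed here.
func getBatch(js nats.JetStreamContext, stream string, since uint64, limit int) ([]*nats.RawStreamMsg, error) {
	var batch []*nats.RawStreamMsg
	for seq := since; len(batch) < limit; seq++ {
		msg, err := js.GetMsg(stream, seq)
		if err != nil {
			// A sequence may be deleted or past the end of the stream;
			// real code would distinguish these cases.
			return batch, err
		}
		batch = append(batch, msg)
	}
	return batch, nil
}
```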

What I want to ask here is whether an API to iterate over a stream from a certain position with a certain limit is in the JetStream design vision. Should I expect it to become available at some point?

@ripienaar (Collaborator)

@derekcollison tl;dr: they want to be able to show a stream's contents on a web page and need to walk through it in pages. Seems fine, as we already have a get-msg API?

I imagine you ask for 10 messages with an inbox, we respond with each of them, and the last one has a header indicating it's the last, so they don't have to wait for some timeout if there are only 8 messages, not 10.
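A rough client-side sketch of that idea; the request subject and the terminating header are invented for illustration and are not part of any real API:

```go
package streampage

import (
	"time"

	"github.com/nats-io/nats.go"
)

// readPage sketches the proposed flow: one request, replies streamed to an
// inbox, and a (hypothetical) header on the final message ending the batch
// early so the client never has to wait out a timeout.
func readPage(nc *nats.Conn, subject string, req []byte) ([]*nats.Msg, error) {
	inbox := nats.NewInbox()
	sub, err := nc.SubscribeSync(inbox)
	if err != nil {
		return nil, err
	}
	defer sub.Unsubscribe()

	// subject would be something like a batch-get API endpoint; invented here.
	if err := nc.PublishRequest(subject, inbox, req); err != nil {
		return nil, err
	}

	var page []*nats.Msg
	for {
		msg, err := sub.NextMsg(2 * time.Second)
		if err != nil {
			return page, err
		}
		page = append(page, msg)
		if msg.Header.Get("Last") == "true" { // hypothetical terminator header
			return page, nil
		}
	}
}
```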

@derekcollison (Member)

I would do this with a simple pull-based consumer, which supports batching. You can know the last sequence number of the stream, as you pointed out, so you know when to stop. We are also (most likely) going to add that information to the ack reply, so you will not have to ask for it separately.

@ripienaar (Collaborator)

He doesn't want to round-trip to the server 90 times to build a single web page. And the get-msg API we have is awkward too.

A consumer like this was my first suggestion.

@derekcollison (Member)

I would not use the stream message API for this, at least as it exists today.

A pull-based consumer makes the most sense, but I understand the potential issues since it's durable, etc.

You can grab the state of the stream, create a durable pull-based consumer, and use the batching mechanism there. That will work. Make sure to delete the consumer when done.
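A sketch of that flow with the pull API that later landed in nats.go; the stream, subject, and durable names are assumptions:

```go
package streampage

import "github.com/nats-io/nats.go"

// fetchPage binds a durable pull consumer to the stream, fetches one batch,
// and cleans the consumer up afterwards, as suggested above.
func fetchPage(js nats.JetStreamContext, batch int) ([]*nats.Msg, error) {
	sub, err := js.PullSubscribe("events.>", "page-reader", nats.BindStream("EVENTS"))
	if err != nil {
		return nil, err
	}
	defer sub.Unsubscribe()
	// Explicitly delete the consumer when done, per the advice above
	// (recent nats.go versions may already remove consumers they created).
	defer js.DeleteConsumer("EVENTS", "page-reader")

	msgs, err := sub.Fetch(batch)
	if err != nil {
		return nil, err
	}
	for _, m := range msgs {
		m.Ack()
	}
	return msgs, nil
}
```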

@ripienaar (Collaborator)

You mean you wouldn't use it even if it supported a request to retrieve n messages?

The consumer approach will be really difficult, because a durable only works as long as no one ever hits Back :) so such a page would have to make ephemeral ones for every single request; it'll be super tedious.

@derekcollison (Member)

If the stream itself had an API to get batches of messages, that could possibly be useful. But we need to think that through more, along with the issues we have now with push-based backlogs, applications not keeping up, etc.

@ripienaar (Collaborator)

Yes, that API is the feature request in this issue.

We could cap it at 256 messages max, maybe?

@FZambia (Author) commented Jul 21, 2020

One thing to mention: I am planning to do this from the backend side, since clients communicate with the server over a custom protocol with its own existing encoding and mechanics.

I understand that the consumer approach should work, but it seems a bit heavy for my specific use case. As far as I understand, each consumer creation is an extra request to JetStream, each consumer deletion is an extra request to JetStream, each consumer may cost me a goroutine, and I have to wait an arbitrary and not very transparent amount of time while a batch of messages is being collected (actually, I don't understand at the moment whether I can control the batch size in pull consumers, or whether I only have the Next method available). And yes, this has to be done for every client request for stream messages in my case.

With Redis Streams the task is solved in one RTT to Redis (with the help of some Lua scripting that returns both the messages and the stream stats), and no extra goroutine is involved in the process.
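A sketch of that pattern with go-redis; the exact script shape is an assumption about what such a Lua helper looks like:

```go
package streampage

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// One EVAL returns both a page of entries and a stream stat, so the whole
// history request costs a single round trip and no extra goroutine.
var pageScript = redis.NewScript(`
local entries = redis.call('XRANGE', KEYS[1], ARGV[1], '+', 'COUNT', ARGV[2])
local len = redis.call('XLEN', KEYS[1])
return {entries, len}
`)

func redisPage(ctx context.Context, rdb *redis.Client, key, sinceID string, limit int) (interface{}, error) {
	return pageScript.Run(ctx, rdb, []string{key}, sinceID, limit).Result()
}
```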

I also asked a colleague about similar possibilities in Kafka, and it looks like this is only possible with KSQL-like tools or ksqlDB at the moment (I mean, without consumers involved).

@ripienaar (Collaborator)

Pull consumers now do batches, and that is probably what we'd suggest now, so closing this. Repeatedly accessing a stream directly is not the correct approach.
