Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split historical and incoming message retrieval #542

Closed
steven-sheehy opened this issue Feb 18, 2020 · 0 comments · Fixed by #551
Closed

Split historical and incoming message retrieval #542

steven-sheehy opened this issue Feb 18, 2020 · 0 comments · Fixed by #551
Assignees
Labels
enhancement Type: New feature grpc Area: GRPC API P2
Milestone

Comments

@steven-sheehy
Copy link
Member

steven-sheehy commented Feb 18, 2020

Problem
We had to combine historical and incoming message retrieval temporarily in v0.5.4. These flows should be split again so that their implementations can vary. Specifically the historical flow we want to throttle and poll while the incoming flow we want to use the shared poller to scale up.

Solution

  • Wrap the historical flow in a new class so its implementation can vary in the future:
public interface TopicMessageRetriever {
  Flux<TopicMessage> retrieve(TopicMessageFilter filter);
}
  • Rename or copy PollingTopicListener to PollingTopicMessageRetriever and make it implement TopicMessageRetriever instead
  • Enhance PollingTopicMessageRetriever to complete when next poll returns zero rows (or optionally < maxPageSize)
  • Add RetrieverProperties with options enabled and pollingFrequency
  • Use a more efficient/less racy polling mechanism than Flux.interval() like Flux.repeatWhen()
  • Change TopicMessageRepository.findByConsensusTimestampGreaterThan() to:
  Flux<TopicMessage> findLatest(long realm, Collection<Integer> topicIds, long consensusTimestamp);
  • Enhance SharedPollingTopicListener to keep track of subscribed topic ids in a concurrent set and pass to TopicMessageRepository.findLatest(...)
  • Change SharedPollingTopicListener to default

Alternatives

Additional Context

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Type: New feature grpc Area: GRPC API P2
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant