Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to provide receive offset or enqueued time filter? #388

Open
jo-me opened this issue Apr 1, 2021 · 1 comment
Open

How to provide receive offset or enqueued time filter? #388

jo-me opened this issue Apr 1, 2021 · 1 comment

Comments

@jo-me
Copy link

jo-me commented Apr 1, 2021

Hi,
I'm looking for a way to filter the messages received by providing an offset or time frame that I want to receive.
As far as I can see the Event hub supports something like this in combination with AMQP.
(https://azure.github.io/amqpnetlite/articles/azure_eventhubs.html)
I could not find a hint on how to do it with this lib.

How can this be done in uAMQP?

My use case is that I want to limit the messages received to only the last 10 minutes as older messages are irrelevant.

Thanks!

@jo-me
Copy link
Author

jo-me commented Nov 2, 2021

So, this thing seems to be dead, but answering to myself:

It is possible using amqp filter sets.
In other AMQP implementations this is documented better and I was able to expand upon the uampq receive example to receive only messages arriving after starting the receiver.

Basically, the simple "source" definition (containing the amqp address) needs to be replaced with a more elaborate config as the source will also contain a filter set which contains a query string (see filterByEnqueuedTime below)

So it works like this:

auto filterSet = amqpvalue_create_filter_set(amqpvalue_create_map());

std::uint64_t timeNow = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::string filterByEnqueuedTime = "amqp.annotation.x-opt-enqueuedtimeutc > " + std::to_string(timeNow);
auto selectorFilterKey = amqpvalue_create_symbol("apache.org:selector-filter:string");
auto selectorKey = amqpvalue_create_symbol("apache.org:selector-filter:string");
auto filterEntryValue = amqpvalue_create_string(filterByEnqueuedTime.c_str());
auto filterEntry =  amqpvalue_create_described(selectorFilterKey, filterEntryValue);
amqpvalue_set_map_value(filterSet, selectorKey,filterEntry);

auto newSource = source_create();
source_set_address(newSource, amqpvalue_create_string(("amqps://.....).c_str()));
source_set_filter(newSource, filterSet);
auto newSourceValue = amqpvalue_create_source(newSource);

link = link_create(session, "receiver-link", role_receiver, newSourceValue, target);

Reference links:
https://github.com/Azure/azure-event-hubs-c/blob/master/eventhub_client/devdoc/requirement_docs/eventhubreceiver_requirements_ll.md
https://docs.microsoft.com/de-de/azure/iot-hub/iot-hub-amqp-support
https://qpid.apache.org/releases/qpid-proton-0.18.0/proton/cpp/examples/selected_recv.cpp.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant