Behavior of Subscriber::consume() #240

Closed
vcampmany opened this issue Jun 7, 2021 · 7 comments
@vcampmany

vcampmany commented Jun 7, 2021

I'm trying to implement a subscriber. My code looks as follows:

Subscriber sub = redis.subscriber();
sub.on_message( [&](std::string channel, std::string msg) {
   // do some task
});
sub.subscribe("my-channel");

while (!stop_condition) {
    try {
      sub.consume();
      std::cout << "loop \n";
    } catch (const TimeoutError& e) {
      continue;
    } catch (const ReplyError& e) {
      continue;
    } catch (const Error& e) {
      // logic to create a new Subscriber
    }
}

When I run this code, it usually works as expected: on each iteration of the while loop it tries to consume a message and, if no message is available, it moves on to the next iteration (I can see the message printed in the console). However, in some executions, after a few iterations, the Subscriber::consume() method seems to block (i.e. I stop seeing new lines printed in the console). I've been able to "unblock" it by sending a PUBLISH command from redis-cli, which wakes up the consume method so that the program continues execution.

The "blocking" of Subscriber::consume() seems to happen randomly. What is the intended behavior of this method? I couldn't find more details in the documentation. I've been playing with socket_timeout from ConnectionOptions, but that didn't make any difference. Some help would be much appreciated 🙂

Thanks for this awesome library!

@wingunder
Contributor

Hi @vcampmany
Could you please tell us which version of Redis server you are using, as well as your operating system and compiler versions?

Could you also please run the following command in the directory where you built redis-plus-plus, to check whether all tests pass? Like so:

$ ./test/test_redis++ -h localhost -p 6379
Testing Redis...
Pass sanity tests
Pass connection commands tests
Pass keys commands tests
Pass string commands tests
Pass list commands tests
Pass hash commands tests
Pass set commands tests
Pass zset commands tests
Pass hyperloglog commands tests
Pass geo commands tests
Pass script commands tests
Pass pubsub tests
Pass pipeline and transaction tests
Pass stream commands tests
Pass all tests

Lastly, please take a look at the following pub-sub unit test file for more examples:
https://github.com/sewenew/redis-plus-plus/blob/master/test/src/sw/redis%2B%2B/pubsub_test.hpp

Thanks & regards

@sewenew
Owner

sewenew commented Jun 8, 2021

@vcampmany The subscribe operation, i.e. the consume method, blocks until a new message is published to a subscribed channel, or until the connection times out. If you want to mitigate the problem, you can set a small socket_timeout, say 10ms, so that consume blocks for at most about 10ms. Note that the actual timeout also depends on the precision of the system clock.
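
A minimal sketch of the resulting loop, written against standard C++ only so that it compiles without the library. The TimeoutError struct and the consume_until_stopped helper are hypothetical names introduced for illustration; with redis-plus-plus, Sub would be a Subscriber created from a Redis object whose ConnectionOptions has socket_timeout set, and the exception would be the library's own TimeoutError, as in the loop from the original question.

```cpp
#include <atomic>
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for the library's TimeoutError, so the sketch
// compiles on its own. With redis-plus-plus, catch its TimeoutError instead.
struct TimeoutError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Calls sub.consume() until `stop` becomes true.
// Assumption: consume() either returns after handling one message, or
// throws TimeoutError when nothing arrived within the socket timeout.
// Returns how many times consume() timed out.
template <typename Sub>
std::size_t consume_until_stopped(Sub& sub, const std::atomic<bool>& stop) {
    std::size_t timeouts = 0;
    while (!stop.load()) {
        try {
            sub.consume();
        } catch (const TimeoutError&) {
            // No message within the timeout: count it and re-check `stop`.
            ++timeouts;
        }
    }
    return timeouts;
}
```

With a short timeout the stop flag is re-checked at roughly that interval; without a timeout, the loop only re-checks it after a message arrives, which matches the "blocking" described above.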

Regards

@vcampmany
Author

Thanks for your replies @sewenew and @wingunder. I spotted the problem and it was on my side. I wasn't setting the socket_timeout correctly. As a follow up question: I have a Redis object with a set of connection options and I want to create a Subscriber object with a different set of connection options (i.e. with timeout). Do I need to create a second Redis object with timeout or is there a way to modify the connection options when creating a Subscriber?

Right now I have something like this:

ConnectionOptions opts;
opts.host = "host";
opts.port = 6379;
Redis r(opts);
// ...
// Use r
// ...

// Same options as above, plus a socket timeout for the subscriber
ConnectionOptions sub_opts = opts;
sub_opts.socket_timeout = std::chrono::milliseconds(100);
Subscriber sub = Redis(sub_opts).subscriber();

Thanks!

@wingunder
Contributor

Hi @vcampmany

I have a Redis object with a set of connection options and I want to create a Subscriber object with a different set of connection options (i.e. with timeout). Do I need to create a second Redis object with timeout or is there a way to modify the connection options when creating a Subscriber?

It's funny, I also stumbled across this before. I ended up using one Redis instance per distinct timeout. This is expensive, but you can (and should) reuse the Redis instances. If you have, say, 3 subscribers with a 10 second timeout and 50 subscribers with a 3 second timeout, you'll only need 2 Redis instances.
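
One way to sketch this reuse is a small cache keyed by timeout. The ClientPerTimeout class and its factory callback are hypothetical names, not part of redis-plus-plus; the sketch is generic over the client type and is not thread-safe. With redis-plus-plus, the factory would presumably copy the base ConnectionOptions, set socket_timeout, and construct a Redis from the result.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <utility>

// Hypothetical helper: keeps one client per distinct timeout and hands
// out the same shared instance on repeated requests.
template <typename Client>
class ClientPerTimeout {
public:
    using Factory =
        std::function<std::shared_ptr<Client>(std::chrono::milliseconds)>;

    explicit ClientPerTimeout(Factory make) : make_(std::move(make)) {}

    // Returns the client for `timeout`, creating it on first use.
    std::shared_ptr<Client> get(std::chrono::milliseconds timeout) {
        auto it = clients_.find(timeout);
        if (it == clients_.end()) {
            it = clients_.emplace(timeout, make_(timeout)).first;
        }
        return it->second;
    }

    // Number of distinct clients created so far.
    std::size_t size() const { return clients_.size(); }

private:
    Factory make_;
    std::map<std::chrono::milliseconds, std::shared_ptr<Client>> clients_;
};
```

In the example above, 53 subscribers asking for either a 10 second or a 3 second timeout would end up sharing 2 cached instances.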

However, I do not know of another way to solve this. Maybe @sewenew has some better ideas.
Regards

@sewenew
Owner

sewenew commented Jun 8, 2021

Yes, as @wingunder mentioned, you need to create another Redis object with different connection options. So far, there's no way to reset the connection options for subscriber. There's a plan to share the connection between Redis and Subscriber, so they need to have the same connection options.

Regards

@vcampmany
Author

Thanks for clarifying @sewenew. Closing the issue as doubts are resolved.

Regards

@wingunder
Contributor

A similar issue is covered in #307, with a good explanation and example, by @sewenew.
