Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue with subscribeAsMessages Method in Quarkus Redis Client #42079

Closed
otavioprado opened this issue Jul 23, 2024 · 4 comments · Fixed by #42091
Closed

Issue with subscribeAsMessages Method in Quarkus Redis Client #42079

otavioprado opened this issue Jul 23, 2024 · 4 comments · Fixed by #42091
Labels
area/redis kind/bug Something isn't working triage/needs-reproducer We are waiting for a reproducer.
Milestone

Comments

@otavioprado
Copy link

otavioprado commented Jul 23, 2024

Describe the bug

I'm experiencing an issue with the subscribeAsMessages method in Quarkus. When attempting to subscribe to Redis channels using subscribeAsMessages, the subscription appears to occur, but the processMessage method is never called when the channel receives events. In contrast, using the subscribe method works correctly.

Expected behavior

The subscribeAsMessages method should successfully subscribe to the Redis channels and trigger the processMessage method when events are received on the subscribed channels.

Actual behavior

The subscribeAsMessages method subscribes to the channels, but the processMessage method is never invoked when events are received on those channels. The subscribe method, however, works as expected and correctly invokes processMessage.

How to Reproduce?

Both methods utilize a
private final ReactivePubSubCommands<byte[]> redisPubSub.

Working code:

private void subscribeToChannels() {
    redisPubSub.subscribe(channelsString)
            .onItem().invoke(message -> processMessage(message))
            .subscribe().with(
                    item -> LOG.infof("Subscribed to channel: item %s", item),
                    failure -> LOG.errorf("Failed to subscribe to channel: %s, due to: %s", failure.getMessage())
            );
}

Non-working code:

private void subscribeToChannels() {
    redisPubSub.subscribeAsMessages(channelsString)
            .onItem().invoke(message -> processMessage(message.getPayload()))
            .subscribe().with(
                    item -> LOG.infof("Subscribed to channel: item %s", item),
                    failure -> LOG.errorf("Failed to subscribe to channel: %s, due to: %s", failure.getMessage())
            );
}
  1. Implement the subscribeToChannels method using subscribeAsMessages.
  2. Attempt to subscribe to the Redis channels.
  3. Send events to the subscribed channels.
  4. Observe that the processMessage method is never called for subscribeAsMessages on Non-working code.

Output of uname -a or ver

Linux latitude 6.5.0-44-generic #44~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue Jun 18 14:36:16 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Output of java -version

openjdk version "21.0.1" 2023-10-17 LTS

Quarkus version or git rev

Quarkus 3.12.3

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.7

Additional information

The documentation lacks examples for Pub/Sub using subscribeAsMessages, which could be very helpful in understanding and correctly implementing this feature.

@otavioprado otavioprado added the kind/bug Something isn't working label Jul 23, 2024
Copy link

quarkus-bot bot commented Jul 23, 2024

/cc @Ladicek (redis), @cescoffier (redis), @machi1990 (redis)

@cescoffier
Copy link
Member

Do you have a reproducer? We have test covering this features, I would like to see what is not tested on our side.

@cescoffier cescoffier added the triage/needs-reproducer We are waiting for a reproducer. label Jul 23, 2024
@otavioprado
Copy link
Author

otavioprado commented Jul 23, 2024

@cescoffier

Sure, here is the repository with the reproducer: https://github.com/otavioprado/issue-subscribe-asmessages-reproducer

In the repository, the subscribeToChannelsWorks implementation works correctly, while the subscribeToChannelsNonWorking implementation does not. The subscribeToChannelsNonWorking method uses subscribeAsMessages, but the processMessage method is never called when events are received on the channels.

You can switch between the working and non-working implementations in the MainService class by simply uncommenting the one you want to use:

void onStart(@Observes StartupEvent ev) {
    LOG.info("MainService is starting");
    reproducer.subscribeToChannelsNonWorking();
    //reproducer.subscribeToChannelsWorks();
}

Thank you for your help!

@cescoffier
Copy link
Member

Oh gosh, this is embarrassing...

Thanks for the reproducer, I found the issue, adding a test and opening a PR...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/redis kind/bug Something isn't working triage/needs-reproducer We are waiting for a reproducer.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants