-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix blocking calls #8
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please work on the suggested changes?
PS: Sorry for the late reply and thanks for the PR 👍🏼
long activeUserCount = activeUserCounter.decrementAndGet(); | ||
log.info("User '{}' Disconnected. Total Active Users: {}", webSocketSession.getId(), activeUserCount); | ||
chatMessageSink.tryEmitNext(new ChatMessage(0, "DISCONNECTED", "DISCONNECTED", activeUserCount)); | ||
}).subscribeOn(Schedulers.boundedElastic()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are missing .subscribe()
here. The code inside Mono.fromRunnable
won't execute unless explicitly subscribed or blocked.
@@ -23,7 +24,9 @@ public RouterFunction<ServerResponse> htmlRouter(@Value("classpath:/static/index | |||
return route(GET("/"), request -> ok().contentType(MediaType.TEXT_HTML).bodyValue(html)) | |||
.andRoute(POST("/message"), request -> request.bodyToMono(Message.class) | |||
.flatMap(message -> redisChatMessagePublisher.publishChatMessage(message.getMessage())) | |||
.flatMap(aLong -> ServerResponse.ok().bodyValue(new Message("Message Sent Successfully!.")))); | |||
.flatMap(aLong -> ServerResponse.ok().bodyValue(new Message("Message Sent Successfully!."))) | |||
.subscribeOn(Schedulers.boundedElastic())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to make this run on Schedulers.boundedElastic()
Did the BlockHound detected blocking call over here or inside the RedisChatMessagePublisher
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the first stack trace (first screenshot in the post), we see that the error propagates to RedisChatMessagePublisher but was triggered by WebHttpHandler, line 26 right? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the first stack trace (first screenshot in the post), we see that the error propagates to RedisChatMessagePublisher but was triggered by WebHttpHandler, line 26 right? :)
The other way round. The actual blocking call is at line - https://github.com/RawSanj/spring-redis-websocket/blob/master/src/main/java/com/github/rawsanj/messaging/RedisChatMessagePublisher.java#L35C11-L35C28 the Atomic Increment is blocking.
I think we should be good if we move that blocking call Integer totalChatMessage = chatMessageCounter.incrementAndGet()
inside the Mono.fromCallable()
and then add the .subscribeOn(Schedulers.boundedElastic())
in the end.
c4f6135
to
371c6a9
Compare
Hi! 👋
Apparently the chat module has some blocking calls (as detected by BlockHound):
This PR addresses these blocking code to ensure the pipeline remain reactive.