[BUG] Reactor IllegalStateException thrown when using StreamBridge with Spring Cloud Azure Stream Binder #35215

Closed
3 tasks done
itsmariodias opened this issue May 31, 2023 · 9 comments
Labels: azure-spring, azure-spring-eventhubs, Client, customer-reported, Event Hubs, needs-team-attention, question

itsmariodias commented May 31, 2023

Describe the bug
When using StreamBridge to publish messages to Azure Event Hubs inside a reactive chain (.map, .flatMap, .doOnSuccess, etc.), Reactor throws an IllegalStateException indicating that a .block() call is being made on a non-blocking thread. The offending call is at line 106 of the com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.doSend method.

Exception or Stack Trace

2023-05-31 11:20:24.976 ERROR 8032 --- [ctor-http-nio-3] c.a.s.m.e.core.EventHubsTemplate         : EventDataBatch create error.

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
	at reactor.core.publisher.Mono.block(Mono.java:1742)
	at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.doSend(EventHubsTemplate.java:106)
	at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:60)
	at com.azure.spring.messaging.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:97)
	at com.azure.spring.integration.core.handler.DefaultMessageHandler.handleMessageInternal(DefaultMessageHandler.java:86)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1105)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:239)
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:171)
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:151)
	at com.sample.eventhubs.rest.EventhubsController.lambda$publishMessage$0(EventhubsController.java:27)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:171)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:134)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:192)
	at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181)
	at reactor.core.publisher.Operators.complete(Operators.java:137)
	at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:121)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282)
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:451)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4490)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
	at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1002)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:707)
	at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:481)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:621)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:230)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

To Reproduce
Steps to reproduce the behavior:

  1. Publish a message via StreamBridge.send in any reactive chain.

Code Snippet
RestController.java

@PostMapping("/publishMessage")
public Mono<ResponseEntity<String>> publishMessage() {
    return eventhubsService.someMethod()
            .doOnSuccess(message -> {
                log.info("Sending message: {}", message);
                streamBridge.send("supply-out-0", message);
                log.info("Sent {}.", message);
            }).map(ResponseEntity::ok);
}

application.yaml

spring:
  cloud:
    azure:
      eventhubs:
        namespace: ${AZURE_EVENTHUBS_NAMESPACE}
    stream:
      output-bindings: supply
      bindings:
        supply-out-0:
          destination: ${AZURE_EVENTHUB_NAME}
      eventhubs:
        bindings:
          supply-out-0:
            producer:
              sync: true
      default:
        producer:
          errorChannelEnabled: true

Expected behavior
The message should be published to the event hub without any errors.

Screenshots
N/A

Setup (please complete the following information):

  • OS: Windows 11
  • IDE: IntelliJ IDEA 2023.1.2 (Community Edition)
  • Library/Libraries:
    • com.azure.spring:spring-cloud-azure-stream-binder-eventhubs:4.8.0
    • com.azure.spring:spring-cloud-azure-dependencies:4.8.0
    • org.springframework.cloud:spring-cloud-dependencies:2021.0.6
    • org.springframework.boot:spring-boot-starter-webflux:2.7.12
  • Java version: 17
  • App Server/Environment: [e.g. Tomcat, WildFly, Azure Function, Apache Spark, Databricks, IDE plugin or anything special]
  • Frameworks: Spring Boot 2.7.12

Additional context
This seems to be identical to #12500. Adding .publishOn(Schedulers.boundedElastic()) to the reactive chain before the streamBridge call resolves the issue; however, this ought to be handled in the library itself.
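
To illustrate why moving the call to another thread helps, here is a minimal sketch that uses only the JDK, not Reactor or the binder. Everything in it is hypothetical: `OffloadSketch`, `blockingSend`, `sendDirectly`, `sendOffloaded`, `pool`, and the thread names are made up for the example, and the thread-name check is only a simplified stand-in for Reactor's refusal to block on a non-blocking thread. The hop to the worker pool plays the role that publishOn(Schedulers.boundedElastic()) plays in the reactive chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffloadSketch {

    // Hypothetical stand-in for a send that blocks internally. It refuses to
    // run on a thread whose name marks it as an event-loop thread.
    static String blockingSend(String message) {
        String thread = Thread.currentThread().getName();
        if (thread.startsWith("event-loop")) {
            throw new IllegalStateException("blocking is not supported in thread " + thread);
        }
        return "sent " + message + " on " + thread;
    }

    // Calls the blocking send directly on the event-loop thread and reports the outcome.
    static String sendDirectly(Executor eventLoop, String message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return blockingSend(message);
            } catch (IllegalStateException e) {
                return "rejected: " + e.getMessage();
            }
        }, eventLoop).join();
    }

    // Starts on the event-loop thread, then hops to the worker pool before
    // the blocking send runs.
    static String sendOffloaded(Executor eventLoop, Executor workers, String message) {
        return CompletableFuture
                .supplyAsync(() -> message, eventLoop)
                .thenApplyAsync(OffloadSketch::blockingSend, workers)
                .join();
    }

    // Single daemon thread with a fixed name, so the JVM can exit without an explicit shutdown.
    static ExecutorService pool(String threadName) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = pool("event-loop-1");
        ExecutorService workers = pool("worker-1");
        System.out.println(sendDirectly(eventLoop, "hello"));
        System.out.println(sendOffloaded(eventLoop, workers, "hello"));
    }
}
```

The direct call is rejected on `event-loop-1`, while the offloaded call completes on `worker-1`. The cost of this approach is an extra thread hop per message, which is why the workaround is acceptable in application code but a fix inside the library is still preferable.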

Information Checklist

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@github-actions bot added the customer-reported, needs-triage, and question labels on May 31, 2023
@itsmariodias reopened this on May 31, 2023
@itsmariodias
Author

Closed by mistake, reopened it now.

@joshfree added the Event Hubs and Client labels on May 31, 2023
@github-actions bot removed the needs-triage label on May 31, 2023
@joshfree
Member

Thank you for the detailed issue, @itsmariodias! @conniey @Azure/azsdk-sb-java can you please take a look?

@anuchandy
Member

anuchandy commented May 31, 2023

Hi Yi, this is due to this line and this line in our Spring layer. We may want to make the first batch creation part of the async chain.

@anuchandy
Member

@yiliuTo I was thinking of something like this: https://gist.github.com/anuchandy/9b46fb005e783f16fad38e04c1f7ed4a . I didn't test it, but it may be useful as a reference, or there may be a better way to do it.
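
As a rough, JDK-only illustration of what "making batch creation part of the async chain" means in general, the sketch below contrasts waiting for a batch on the calling thread with composing batch creation as the first stage of the pipeline. It does not reproduce the linked gist or the binder's code. All names (`AsyncBatchSketch`, `createBatch`, `send`, `sendBlocking`, `sendComposed`) are hypothetical, and `CompletableFuture` stands in for Reactor's `Mono`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncBatchSketch {

    // Hypothetical stand-in for asynchronous batch creation.
    static CompletableFuture<List<String>> createBatch() {
        return CompletableFuture.supplyAsync(() -> new ArrayList<String>());
    }

    // Hypothetical stand-in for an asynchronous send; yields the number of events sent.
    static CompletableFuture<Integer> send(List<String> batch) {
        return CompletableFuture.supplyAsync(() -> batch.size());
    }

    // Blocking shape: the calling thread waits for the batch before sending.
    // On an event-loop thread this is the kind of wait that gets rejected.
    static CompletableFuture<Integer> sendBlocking(List<String> events) {
        List<String> batch = createBatch().join(); // caller waits here
        batch.addAll(events);
        return send(batch);
    }

    // Composed shape: batch creation is the first stage of the chain, so the
    // calling thread only assembles the pipeline and never waits on it.
    static CompletableFuture<Integer> sendComposed(List<String> events) {
        return createBatch().thenCompose(batch -> {
            batch.addAll(events);
            return send(batch);
        });
    }

    public static void main(String[] args) {
        // join() is used here only because main is allowed to block.
        System.out.println(sendComposed(List.of("a", "b", "c")).join());
    }
}
```

Both shapes send the same three events; the difference is only where the waiting happens. In the composed shape the caller gets a future back immediately and decides for itself whether and where to wait.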

@itsmariodias
Author

Is there any activity on this? The Additional context section above should be useful for debugging and for arriving at the correct fix.

@itsmariodias
Author

Hi, this still seems to exist in v5.5.0. Is there any timeline for when it will be fixed? @joshfree @anuchandy @conniey @yiliuTo

@conniey
Member

conniey commented Oct 16, 2023

@yiliuTo Please feel free to redirect this if you do not own the event hubs spring binder.

@conniey removed their assignment on Oct 16, 2023
@AlanKrueger

This seems like the same issue as #31358. Is there a known workaround for this?

@Netyyyy added the azure-spring label on Jun 24, 2024
@github-actions bot added the needs-team-attention label on Jun 24, 2024
@Netyyyy added the azure-spring-eventhubs label and removed the needs-team-attention label on Jun 24, 2024
@github-actions bot added the needs-team-attention label on Jun 24, 2024
@saragluna added this to the Backlog milestone on Jul 29, 2024
@moarychan removed this from the Backlog milestone on Nov 21, 2024
@moarychan
Member

Fixed, closing it now.
