diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md
index 473bc2733bcb9..71544aa814772 100644
--- a/sdk/spring/CHANGELOG.md
+++ b/sdk/spring/CHANGELOG.md
@@ -11,6 +11,18 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
 
 #### Bugs Fixed
 - Fix error: Event Hubs connection string is still used when it's configured to empty string. [#42880](https://github.com/Azure/azure-sdk-for-java/issues/42880).
+### Spring Integration Azure Core
+This section includes changes in the `spring-integration-azure-core` module.
+
+#### Bugs Fixed
+- Fix `DefaultMessageHandler.handleMessageInternal` error: When using a parallel Reactor Scheduler, blocking calls are prohibited. [#35215](https://github.com/Azure/azure-sdk-for-java/issues/35215).
+
+### Spring Integration Azure Event Hubs
+This section includes changes in the `spring-integration-azure-eventhubs` module.
+
+#### Bugs Fixed
+- Fix `EventHubsTemplate.doSend` error: When using a parallel Reactor Scheduler, blocking calls are prohibited. Thanks to [@AlanKrueger](https://github.com/AlanKrueger) for his contribution. [#40772](https://github.com/Azure/azure-sdk-for-java/pull/40772).
+
 ## 5.18.0 (2024-11-05)
 - This release is compatible with Spring Boot 3.0.0-3.0.13, 3.1.0-3.1.12, 3.2.0-3.2.11, 3.3.0-3.3.5. (Note: 3.0.x (x>13), 3.1.y (y>12), 3.2.z (z>11) and 3.3.m (m>5) should be supported, but they aren't tested with this release.)
 - This release is compatible with Spring Cloud 2022.0.0-2022.0.5, 2023.0.0-2023.0.3. (Note: 2022.0.x (x>5) and 2023.0.y (y>3) should be supported, but they aren't tested with this release.)
diff --git a/sdk/spring/spring-integration-azure-core/src/main/java/com/azure/spring/integration/core/handler/DefaultMessageHandler.java b/sdk/spring/spring-integration-azure-core/src/main/java/com/azure/spring/integration/core/handler/DefaultMessageHandler.java
index 93b818e91b55d..a4717939ec532 100644
--- a/sdk/spring/spring-integration-azure-core/src/main/java/com/azure/spring/integration/core/handler/DefaultMessageHandler.java
+++ b/sdk/spring/spring-integration-azure-core/src/main/java/com/azure/spring/integration/core/handler/DefaultMessageHandler.java
@@ -25,6 +25,7 @@
 import org.springframework.util.Assert;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
@@ -87,7 +88,19 @@ protected void handleMessageInternal(Message<?> message) {
 
         final Mono<Void> mono = this.sendOperation.sendAsync(dest, messageToSend);
 
-        if (this.sync) {
+        // When StreamBridge.send is called from a non-blocking reactive chain and the producer's sync mode is enabled,
+        // it results in an IllegalStateException (block()/blockFirst()/blockLast() are blocking, which is not supported).
+        // The steps below reproduce the IllegalStateException scenario:
+        // 1. Update the configuration file: spring.cloud.azure.stream.eventhubs.bindings.supply-out-0.producer.sync=true
+        // 2. Use the following code to send a message:
+        //    @PostMapping("/reproducer")
+        //    public Mono<ResponseEntity<String>> reproducer() {
+        //        return Mono.defer(() -> Mono.just("reproducer"))
+        //            .publishOn(Schedulers.parallel())
+        //            .doOnSuccess(message -> streamBridge.send("supply-out-0", message))
+        //            .map(ResponseEntity::ok);
+        //    }
+        if (this.sync && !Schedulers.isInNonBlockingThread()) {
             waitingSendResponse(mono, message);
         } else {
             handleSendResponseAsync(mono, message);
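Note on the new guard: `Schedulers.isInNonBlockingThread()` reports whether the current thread is one of Reactor's `NonBlocking` threads (for example a `parallel()` worker), which is exactly where `block()` throws the `IllegalStateException` described above. The following standalone sketch, which is not part of this change and whose class and method names are invented for illustration, shows the same decision in isolation:

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not SDK code.
public class SyncGuardDemo {

    public static void main(String[] args) {
        // The main thread is an ordinary thread: blocking is allowed here.
        deliver(Mono.just("from main"));

        // A parallel() worker implements Reactor's NonBlocking marker interface,
        // so the same call must take the asynchronous branch instead of block().
        Mono.just("from parallel")
            .publishOn(Schedulers.parallel())
            .doOnNext(value -> deliver(Mono.just(value)))
            .block();
    }

    // Mirrors the shape of the fixed condition: only wait synchronously
    // when the caller is not on a non-blocking Reactor thread.
    private static void deliver(Mono<String> sendResult) {
        if (!Schedulers.isInNonBlockingThread()) {
            System.out.println("blocking for: " + sendResult.block(Duration.ofSeconds(5)));
        } else {
            sendResult.subscribe(value -> System.out.println("async completion for: " + value));
        }
    }
}
```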
diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplate.java b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplate.java
index 480d80952b140..79504cbca20ff 100644
--- a/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplate.java
+++ b/sdk/spring/spring-messaging-azure-eventhubs/src/main/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplate.java
@@ -102,52 +102,43 @@ private Mono<Void> doSend(String destination, List<EventData> events, PartitionS
         EventHubProducerAsyncClient producer = producerFactory.createProducer(destination);
         CreateBatchOptions options = buildCreateBatchOptions(partitionSupplier);
-        EventDataBatch eventDataBatch = null;
-        try {
-            eventDataBatch = producer.createBatch(options).block();
-        } catch (Exception e) {
-            LOGGER.error("EventDataBatch create error.", e);
-            return Mono.error(e);
-        }
-        AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>(eventDataBatch);
-
-        Flux.fromIterable(events).flatMap(event -> {
-            final EventDataBatch batch = currentBatch.get();
-            try {
-                if (batch.tryAdd(event)) {
+        AtomicReference<EventDataBatch> currentBatch = new AtomicReference<>();
+        return producer.createBatch(options)
+            .doOnSuccess(eventDataBatch -> {
+                currentBatch.set(eventDataBatch);
+            })
+            .then(Flux.fromIterable(events).flatMap(event -> {
+                final EventDataBatch batch = currentBatch.get();
+                try {
+                    if (batch.tryAdd(event)) {
+                        return Mono.empty();
+                    } else {
+                        LOGGER.warn("EventDataBatch is full in the collect process or the first event is "
+                            + "too large to fit in an empty batch! Max size: {}", batch.getMaxSizeInBytes());
+                    }
+                } catch (AmqpException e) {
+                    LOGGER.error("Event is larger than maximum allowed size.", e);
                     return Mono.empty();
-                } else {
-                    LOGGER.warn("EventDataBatch is full in the collect process or the first event is "
-                        + "too large to fit in an empty batch! Max size: {}", batch.getMaxSizeInBytes());
                 }
-            } catch (AmqpException e) {
-                LOGGER.error("Event is larger than maximum allowed size.", e);
-                return Mono.empty();
-            }
-
-            return Mono.when(
-                producer.send(batch),
-                producer.createBatch(options).map(newBatch -> {
-                    currentBatch.set(newBatch);
-                    // Add the event that did not fit in the previous batch.
-                    try {
-                        if (!newBatch.tryAdd(event)) {
-                            LOGGER.error("Event was too large to fit in an empty batch. Max size:{} ",
-                                newBatch.getMaxSizeInBytes());
-                        }
-                    } catch (AmqpException e) {
-                        LOGGER.error("Event was too large to fit in an empty batch. Max size:{}",
-                            newBatch.getMaxSizeInBytes(), e);
-                    }
-                    return newBatch;
-                }));
-        })
-            .then()
-            .block();
+                return Mono.when(
+                    producer.send(batch),
+                    producer.createBatch(options).map(newBatch -> {
+                        currentBatch.set(newBatch);
+                        // Add the event that did not fit in the previous batch.
+                        try {
+                            if (!newBatch.tryAdd(event)) {
+                                LOGGER.error("Event was too large to fit in an empty batch. Max size:{} ",
+                                    newBatch.getMaxSizeInBytes());
+                            }
+                        } catch (AmqpException e) {
+                            LOGGER.error("Event was too large to fit in an empty batch. Max size:{}",
+                                newBatch.getMaxSizeInBytes(), e);
+                        }
-        final EventDataBatch batch = currentBatch.getAndSet(null);
-        return producer.send(batch);
+                        return newBatch;
+                    }));
+            }).then(Mono.defer(() -> producer.send(currentBatch.getAndSet(null)))));
     }
 
     private CreateBatchOptions buildCreateBatchOptions(PartitionSupplier partitionSupplier) {
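For context on the refactored `doSend`: the old code blocked twice (once to create the initial batch and once to drain the `Flux`), while the new code keeps everything inside a single reactive chain and wraps the final send in `Mono.defer` so that `currentBatch.getAndSet(null)` is evaluated only at subscription time, after the upstream `Flux` has completed. Below is a simplified, hypothetical sketch of that shape, not SDK code: a plain `StringBuilder` stands in for `EventDataBatch`, and the helper names are made up.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical sketch of the "collect then deferred flush" pattern.
public class DeferredFlushSketch {

    static Mono<Void> sendAll(List<String> items) {
        AtomicReference<StringBuilder> buffer = new AtomicReference<>();
        return Mono.fromSupplier(StringBuilder::new)      // analogous to producer.createBatch(options)
            .doOnSuccess(buffer::set)                     // stash the "batch" without blocking
            .then(Flux.fromIterable(items)
                .flatMap(item -> {
                    buffer.get().append(item).append(' '); // analogous to batch.tryAdd(event)
                    return Mono.empty();
                })
                // defer(): read the buffer only after the Flux above has completed,
                // which is why the final send no longer needs a block() outside the chain.
                .then(Mono.defer(() -> flush(buffer.getAndSet(null)))));
    }

    static Mono<Void> flush(StringBuilder batch) {
        return Mono.fromRunnable(() -> System.out.println("flushing: " + batch));
    }

    public static void main(String[] args) {
        sendAll(List.of("a", "b", "c")).block(); // blocking only in this demo's main method
    }
}
```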
diff --git a/sdk/spring/spring-messaging-azure-eventhubs/src/test/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplateTests.java b/sdk/spring/spring-messaging-azure-eventhubs/src/test/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplateTests.java
index 0a6109aa71dbb..bd21baf8c5c31 100644
--- a/sdk/spring/spring-messaging-azure-eventhubs/src/test/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplateTests.java
+++ b/sdk/spring/spring-messaging-azure-eventhubs/src/test/java/com/azure/spring/messaging/eventhubs/core/EventHubsTemplateTests.java
@@ -15,7 +15,9 @@
 import org.junit.jupiter.api.Test;
 import org.springframework.messaging.Message;
 import org.springframework.messaging.support.GenericMessage;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.test.StepVerifier;
 
 import java.util.ArrayList;
@@ -67,7 +69,7 @@ void testSendAsyncForMessagesWithThreeBatch() {
         when(eventDataBatch.getCount()).thenReturn(2, 2, 1);
         List<String> messagesList = new ArrayList<>();
         for (int i = 0; i < 5; i++) {
-            messagesList.add("abcde");
+            messagesList.add("test");
         }
         List<Message<String>> messages =
             messagesList.stream().map((Function<String, Message<String>>) GenericMessage::new).collect(Collectors.toList());
@@ -78,6 +80,35 @@ void testSendAsyncForMessagesWithThreeBatch() {
         verify(this.mockProducerClient, times(3)).send(any(EventDataBatch.class));
     }
 
+    /**
+     * test the three batches case in parallel
+     */
+    @Test
+    void testSendAsyncForMessagesWithThreeBatchParallel() {
+        EventDataBatch eventDataBatch = mock(EventDataBatch.class);
+
+        when(this.mockProducerClient.createBatch(any(CreateBatchOptions.class)))
+            .thenReturn(Mono.just(eventDataBatch));
+        when(eventDataBatch.tryAdd(any(EventData.class))).thenReturn(true, true, false, true, true,
+            false, true);
+        when(eventDataBatch.getCount()).thenReturn(2, 2, 1);
+        List<String> messagesList = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            messagesList.add("test");
+        }
+        List<Message<String>> messages =
+            messagesList.stream().map((Function<String, Message<String>>) GenericMessage::new).collect(Collectors.toList());
+
+        Mono<Void> mono = Flux.just(messages)
+            .parallel()
+            .runOn(Schedulers.parallel())
+            .flatMap(msgs -> this.eventHubsTemplate.sendAsync(this.destination, messages, null))
+            .then();
+        StepVerifier.create(mono)
+            .verifyComplete();
+        verify(this.mockProducerClient, times(3)).send(any(EventDataBatch.class));
+    }
+
     /**
      * test the normal one batch case
      */
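The new parallel test matters because, with the old blocking implementation, a send issued from a `Schedulers.parallel()` worker failed before the batch logic even ran. The snippet below is independent of this PR and assumes only that `reactor-core` and `reactor-test` are on the classpath; it shows the failure mode the test now guards against.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

// Standalone demo, not part of the PR.
public class ParallelBlockingFailureDemo {

    public static void main(String[] args) {
        Mono<Object> blockingOnParallel = Mono.just("event")
            .publishOn(Schedulers.parallel())
            // Calling block() on a parallel() worker is rejected by Reactor.
            .flatMap(value -> Mono.fromRunnable(() -> Mono.delay(Duration.ofMillis(1)).block()));

        StepVerifier.create(blockingOnParallel)
            .verifyError(IllegalStateException.class);

        System.out.println("old-style blocking send failed on a non-blocking thread, as expected");
    }
}
```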