From 22d471938a19469c635dc150feae119e460e3845 Mon Sep 17 00:00:00 2001 From: Hemant Tanwar Date: Wed, 28 Oct 2020 22:51:53 -0700 Subject: [PATCH] Sb t2 schedule multiple message validate batch size (#16959) added check for maxsize for schedule message api --- .../ServiceBusSenderAsyncClient.java | 21 +++++++++---------- .../ServiceBusSenderAsyncClientTest.java | 14 ++++++------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 3332ced69344d..b026fec9de706 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -371,26 +371,25 @@ public Flux scheduleMessages(Iterable messages, OffsetD return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null.")); } - return getSendLink().flatMapMany(link -> createMessageBatch() - .flatMapMany(messageBatch -> { + return createMessageBatch() + .map(messageBatch -> { int index = 0; for (ServiceBusMessage message : messages) { if (!messageBatch.tryAddMessage(message)) { final String error = String.format(Locale.US, "Messages exceed max allowed size for all the messages together. " + "Failed to add message at index '%s'.", index); - throw logger.logExceptionAsError(new AmqpException(false, - AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, error, link.getErrorContext())); + throw logger.logExceptionAsError(new IllegalArgumentException(error)); } ++index; } - - return connectionProcessor - .flatMap(connection -> connection.getManagementNode(entityName, entityType)) - .flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), - scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), link.getLinkName(), - transactionContext)); - })); + return messageBatch; + }) + .flatMapMany(messageBatch -> connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityName, entityType)) + .flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime, + messageBatch.getMaxSizeInBytes(), linkName.get(), transactionContext)) + ); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 1f832c0702116..07a821cd22eec 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -70,9 +70,8 @@ import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; @@ -308,13 +307,12 @@ void scheduleMessageSizeTooBig() { // Act & Assert StepVerifier.create(sender.scheduleMessages(messages, instant)) - .verifyErrorMatches(throwable -> { - assertTrue(throwable instanceof AmqpException); - assertSame(((AmqpException) throwable).getErrorCondition(), AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED); - return true; - }); + .verifyError(IllegalArgumentException.class); + + verify(managementNode, never()).schedule(any(), eq(instant), anyInt(), eq(LINK_NAME), isNull()); } - + + /** * Verifies that sending multiple message will result in calling sender.send(MessageBatch, transaction). */