Skip to content

Commit

Permalink
Sb t2 schedule multiple message validate batch size (Azure#16959)
Browse files Browse the repository at this point in the history
 added check for maxsize for schedule message api
  • Loading branch information
hemanttanwar authored Oct 29, 2020
1 parent eb76325 commit 22d4719
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,26 +371,25 @@ public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetD
return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return getSendLink().flatMapMany(link -> createMessageBatch()
.flatMapMany(messageBatch -> {
return createMessageBatch()
.map(messageBatch -> {
int index = 0;
for (ServiceBusMessage message : messages) {
if (!messageBatch.tryAddMessage(message)) {
final String error = String.format(Locale.US,
"Messages exceed max allowed size for all the messages together. "
+ "Failed to add message at index '%s'.", index);
throw logger.logExceptionAsError(new AmqpException(false,
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, error, link.getErrorContext()));
throw logger.logExceptionAsError(new IllegalArgumentException(error));
}
++index;
}

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(),
scheduledEnqueueTime, messageBatch.getMaxSizeInBytes(), link.getLinkName(),
transactionContext));
}));
return messageBatch;
})
.flatMapMany(messageBatch -> connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
messageBatch.getMaxSizeInBytes(), linkName.get(), transactionContext))
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_SERVICE_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -308,13 +307,12 @@ void scheduleMessageSizeTooBig() {

// Act & Assert
StepVerifier.create(sender.scheduleMessages(messages, instant))
.verifyErrorMatches(throwable -> {
assertTrue(throwable instanceof AmqpException);
assertSame(((AmqpException) throwable).getErrorCondition(), AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED);
return true;
});
.verifyError(IllegalArgumentException.class);

verify(managementNode, never()).schedule(any(), eq(instant), anyInt(), eq(LINK_NAME), isNull());
}



/**
* Verifies that sending multiple message will result in calling sender.send(MessageBatch, transaction).
*/
Expand Down

0 comments on commit 22d4719

Please sign in to comment.