Skip to content

Commit

Permalink
Sbtrack2 cancel schedule plural messages 14138 (#14585)
Browse files Browse the repository at this point in the history
* Continute to implement

* continue implementation

* Schedule and cancel schedule plural messages
* Removed API for cancelScheduledMessage using transaction.
  • Loading branch information
hemanttanwar authored Sep 30, 2020
1 parent 16355ee commit e982376
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -376,7 +375,7 @@ public ServiceBusMessage setTimeToLive(Duration timeToLive) {
public OffsetDateTime getScheduledEnqueueTime() {
Object value = amqpAnnotatedMessage.getMessageAnnotations().get(SCHEDULED_ENQUEUE_UTC_TIME_NAME.getValue());
return value != null
? ((Date) value).toInstant().atOffset(ZoneOffset.UTC)
? ((OffsetDateTime) value).toInstant().atOffset(ZoneOffset.UTC)
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -46,6 +47,7 @@
import static com.azure.core.amqp.implementation.RetryUtil.getRetryPolicy;
import static com.azure.core.amqp.implementation.RetryUtil.withRetry;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
import static com.azure.core.util.tracing.Tracer.SPAN_CONTEXT_KEY;
Expand Down Expand Up @@ -326,6 +328,52 @@ public Mono<Long> scheduleMessage(ServiceBusMessage message, OffsetDateTime sche
return scheduleMessageInternal(message, scheduledEnqueueTime, null);
}

/**
* Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is
* enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue.
* @param scheduledEnqueueTime OffsetDateTime at which the message should appear in the Service Bus queue or topic.
*
* @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
*
* @throws NullPointerException if {@code messages} or {@code scheduledEnqueueTime} is {@code null}.
*/
public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime) {
return scheduleMessages(messages, scheduledEnqueueTime, null);
}

/**
* Sends a scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled message is
* enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue.
* @param scheduledEnqueueTime Instant at which the message should appear in the Service Bus queue or topic.
* @param transactionContext to be set on batch message before scheduling them on Service Bus.
*
* @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
*
* @throws NullPointerException if {@code message} or {@code scheduledEnqueueTime} is {@code null}.
*/
public Flux<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime,
ServiceBusTransactionContext transactionContext) {
if (Objects.isNull(messages)) {
return fluxError(logger, new NullPointerException("'messages' cannot be null."));
}

if (Objects.isNull(scheduledEnqueueTime)) {
return fluxError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null."));
}

return createBatch().flatMapMany(messageBatch -> {
messages.forEach(message -> messageBatch.tryAdd(message));
return getSendLink().flatMapMany(link -> connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMapMany(managementNode -> managementNode.schedule(messageBatch.getMessages(), scheduledEnqueueTime,
messageBatch.getMaxSizeInBytes(), link.getLinkName(), transactionContext)));
});
}

/**
* Cancels the enqueuing of an already scheduled message, if it was not already enqueued.
*
Expand All @@ -342,7 +390,33 @@ public Mono<Void> cancelScheduledMessage(long sequenceNumber) {

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMap(managementNode -> managementNode.cancelScheduledMessage(sequenceNumber, linkName.get()));
.flatMap(managementNode -> managementNode.cancelScheduledMessages(Arrays.asList(sequenceNumber),
linkName.get()));
}

/**
* Cancels the enqueuing of an already scheduled message, if it was not already enqueued.
*
* @param sequenceNumbers of the scheduled messages to cancel.
*
* @return The {@link Mono} that finishes this operation on service bus resource.
*
* @throws NullPointerException if {@code sequenceNumbers} is null.
*/
public Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers) {

if (isDisposed.get()) {
return monoError(logger, new IllegalStateException(
String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "cancelScheduledMessages")));
}

if (Objects.isNull(sequenceNumbers)) {
return monoError(logger, new NullPointerException("'messages' cannot be null."));
}

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMap(managementNode -> managementNode.cancelScheduledMessages(sequenceNumbers, linkName.get()));
}

/**
Expand Down Expand Up @@ -426,6 +500,8 @@ private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBus
});
}



private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime,
ServiceBusTransactionContext transactionContext) {
if (Objects.isNull(message)) {
Expand All @@ -444,8 +520,9 @@ private Mono<Long> scheduleMessageInternal(ServiceBusMessage message, OffsetDate

return connectionProcessor
.flatMap(connection -> connection.getManagementNode(entityName, entityType))
.flatMap(managementNode -> managementNode.schedule(message, scheduledEnqueueTime, maxSize,
link.getLinkName(), transactionContext));
.flatMap(managementNode -> managementNode.schedule(Arrays.asList(message), scheduledEnqueueTime,
maxSize, link.getLinkName(), transactionContext)
.next());
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.IterableStream;
import com.azure.messaging.servicebus.models.CreateBatchOptions;

import java.time.Duration;
Expand Down Expand Up @@ -53,6 +54,17 @@ public void cancelScheduledMessage(long sequenceNumber) {
asyncClient.cancelScheduledMessage(sequenceNumber).block(tryTimeout);
}

/**
* Cancels the enqueuing of already scheduled messages, if they were not already enqueued.
*
* @param sequenceNumbers of the scheduled message to cancel.
*
* @throws NullPointerException if {@code sequenceNumbers} is null.
*/
public void cancelScheduledMessages(Iterable<Long> sequenceNumbers) {
asyncClient.cancelScheduledMessages(sequenceNumbers).block(tryTimeout);
}

/**
* Creates a {@link ServiceBusMessageBatch} that can fit as many messages as the transport allows.
*
Expand Down Expand Up @@ -205,6 +217,42 @@ public Long scheduleMessage(ServiceBusMessage message, OffsetDateTime scheduledE
return asyncClient.scheduleMessage(message, scheduledEnqueueTime, transactionContext).block(tryTimeout);
}


/**
* Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled
* message is enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Message to be sent to the Service Bus Queue or Topic.
* @param scheduledEnqueueTime Instant at which the message should appear in the Service Bus queue or topic.
*
* @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
*
* @throws NullPointerException if {@code messages}, {@code scheduledEnqueueTime}, {@code transactionContext} or
* {@code transactionContext.transactionId} is {@code null}.
*/
public Iterable<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime) {
return new IterableStream<>(asyncClient.scheduleMessages(messages, scheduledEnqueueTime));
}


/**
* Sends a batch of scheduled messages to the Azure Service Bus entity this sender is connected to. A scheduled
* message is enqueued and made available to receivers only at the scheduled enqueue time.
*
* @param messages Messages to be sent to the Service Bus Queue or Topic.
* @param scheduledEnqueueTime Instant at which the message should appear in the Service Bus queue or topic.
* @param transactionContext to be set on message before sending to Service Bus.
*
* @return The sequence number of the scheduled message which can be used to cancel the scheduling of the message.
*
* @throws NullPointerException if {@code messages}, {@code scheduledEnqueueTime}, {@code transactionContext} or
* {@code transactionContext.transactionId} is {@code null}.
*/
public Iterable<Long> scheduleMessages(Iterable<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime,
ServiceBusTransactionContext transactionContext) {
return new IterableStream<>(asyncClient.scheduleMessages(messages, scheduledEnqueueTime, transactionContext));
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Objects;
import java.util.UUID;

import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_GET_SESSION_STATE;
import static com.azure.messaging.servicebus.implementation.ManagementConstants.OPERATION_PEEK;
Expand Down Expand Up @@ -87,14 +88,17 @@ public class ManagementChannel implements ServiceBusManagementNode {
* {@inheritDoc}
*/
@Override
public Mono<Void> cancelScheduledMessage(long sequenceNumber, String associatedLinkName) {
public Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers, String associatedLinkName) {
return isAuthorized(ManagementConstants.OPERATION_CANCEL_SCHEDULED_MESSAGE)
.then(createChannel.flatMap(channel -> {
final Message requestMessage = createManagementMessage(
ManagementConstants.OPERATION_CANCEL_SCHEDULED_MESSAGE, associatedLinkName);

final List<Long> numbers = new ArrayList<>();
sequenceNumbers.forEach(s -> numbers.add(s));
final Long[] longs = numbers.toArray(new Long[0]);
requestMessage.setBody(new AmqpValue(Collections.singletonMap(ManagementConstants.SEQUENCE_NUMBERS,
new Long[]{sequenceNumber})));
longs)));

return sendWithVerify(channel, requestMessage, null);
})).then();
Expand Down Expand Up @@ -286,53 +290,57 @@ public Mono<OffsetDateTime> renewSessionLock(String sessionId, String associated
* {@inheritDoc}
*/
@Override
public Mono<Long> schedule(ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime, int maxLinkSize,
String associatedLinkName, ServiceBusTransactionContext transactionContext) {
message.setScheduledEnqueueTime(scheduledEnqueueTime);

return isAuthorized(OPERATION_SCHEDULE_MESSAGE).then(createChannel.flatMap(channel -> {
// Serialize the request.
final Message amqpMessage = messageSerializer.serialize(message);

// The maxsize allowed logic is from ReactorSender, this logic should be kept in sync.
final int payloadSize = messageSerializer.getSize(amqpMessage);
final int allocationSize =
Math.min(payloadSize + ManagementConstants.MAX_MESSAGING_AMQP_HEADER_SIZE_BYTES, maxLinkSize);
final byte[] bytes = new byte[allocationSize];

int encodedSize;
try {
encodedSize = amqpMessage.encode(bytes, 0, allocationSize);
} catch (BufferOverflowException exception) {
final String errorMessage = String.format(
"Error sending. Size of the payload exceeded maximum message size: %s kb", maxLinkSize / 1024);
final AmqpErrorContext errorContext = channel.getErrorContext();

return monoError(logger, Exceptions.propagate(new AmqpException(false,
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, errorMessage, exception, errorContext)));
}
public Flux<Long> schedule(List<ServiceBusMessage> messages, OffsetDateTime scheduledEnqueueTime,
int maxLinkSize, String associatedLinkName, ServiceBusTransactionContext transactionContext) {

final Map<String, Object> messageEntry = new HashMap<>();
messageEntry.put(ManagementConstants.MESSAGE, new Binary(bytes, 0, encodedSize));
messageEntry.put(ManagementConstants.MESSAGE_ID, amqpMessage.getMessageId());
return isAuthorized(OPERATION_SCHEDULE_MESSAGE).thenMany(createChannel.flatMap(channel -> {

final String sessionId = amqpMessage.getGroupId();
if (!CoreUtils.isNullOrEmpty(sessionId)) {
messageEntry.put(ManagementConstants.SESSION_ID, sessionId);
}
final Collection<Map<String, Object>> messageList = new LinkedList<>();

final String partitionKey = message.getPartitionKey();
if (!CoreUtils.isNullOrEmpty(partitionKey)) {
messageEntry.put(ManagementConstants.PARTITION_KEY, partitionKey);
}
for (ServiceBusMessage message : messages) {
message.setScheduledEnqueueTime(scheduledEnqueueTime);
// Serialize the request.
final Message amqpMessage = messageSerializer.serialize(message);

// The maxsize allowed logic is from ReactorSender, this logic should be kept in sync.
final int payloadSize = messageSerializer.getSize(amqpMessage);
final int allocationSize =
Math.min(payloadSize + ManagementConstants.MAX_MESSAGING_AMQP_HEADER_SIZE_BYTES, maxLinkSize);
final byte[] bytes = new byte[allocationSize];

int encodedSize;
try {
encodedSize = amqpMessage.encode(bytes, 0, allocationSize);
} catch (BufferOverflowException exception) {
final String errorMessage = String.format(
"Error sending. Size of the payload exceeded maximum message size: %s kb", maxLinkSize / 1024);
final AmqpErrorContext errorContext = channel.getErrorContext();

return monoError(logger, Exceptions.propagate(new AmqpException(false,
AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, errorMessage, exception, errorContext)));
}

final String viaPartitionKey = message.getViaPartitionKey();
if (!CoreUtils.isNullOrEmpty(viaPartitionKey)) {
messageEntry.put(ManagementConstants.VIA_PARTITION_KEY, viaPartitionKey);
}
final Map<String, Object> messageEntry = new HashMap<>();
messageEntry.put(ManagementConstants.MESSAGE, new Binary(bytes, 0, encodedSize));
messageEntry.put(ManagementConstants.MESSAGE_ID, amqpMessage.getMessageId());

final Collection<Map<String, Object>> messageList = new LinkedList<>();
messageList.add(messageEntry);
final String sessionId = amqpMessage.getGroupId();
if (!CoreUtils.isNullOrEmpty(sessionId)) {
messageEntry.put(ManagementConstants.SESSION_ID, sessionId);
}

final String partitionKey = message.getPartitionKey();
if (!CoreUtils.isNullOrEmpty(partitionKey)) {
messageEntry.put(ManagementConstants.PARTITION_KEY, partitionKey);
}

final String viaPartitionKey = message.getViaPartitionKey();
if (!CoreUtils.isNullOrEmpty(viaPartitionKey)) {
messageEntry.put(ManagementConstants.VIA_PARTITION_KEY, viaPartitionKey);
}

messageList.add(messageEntry);
}

final Map<String, Object> requestBodyMap = new HashMap<>();
requestBodyMap.put(ManagementConstants.MESSAGES, messageList);
Expand All @@ -348,17 +356,15 @@ public Mono<Long> schedule(ServiceBusMessage message, OffsetDateTime scheduledEn
}

return sendWithVerify(channel, requestMessage, transactionalState);
}).handle((response, sink) -> {
final List<Long> sequenceNumbers = messageSerializer.deserializeList(response, Long.class);

if (CoreUtils.isNullOrEmpty(sequenceNumbers)) {
sink.error(logger.logExceptionAsError(new AmqpException(false, String.format(
"Service Bus response was empty. Could not schedule message with message id: '%s'.",
message.getMessageId()), getErrorContext())));
} else {
sink.next(sequenceNumbers.get(0));
}
}));
})
.flatMapMany(response -> {
final List<Long> sequenceNumbers = messageSerializer.deserializeList(response, Long.class);
if (CoreUtils.isNullOrEmpty(sequenceNumbers)) {
fluxError(logger, new AmqpException(false, String.format(
"Service Bus response was empty. Could not schedule message()s."), getErrorContext()));
}
return Flux.fromIterable(sequenceNumbers);
}));
}

@Override
Expand Down
Loading

0 comments on commit e982376

Please sign in to comment.