Skip to content

Commit

Permalink
Implementation of ReceiveBySequenceNumber and Complete, Abandon, Defe…
Browse files Browse the repository at this point in the history
…r, DeadLetter on messages receive by sequence number (#9)

* Chaning directory structure to align maven standard directory structure. Also changes to pom.xml files to align Maven standard build process like running tests.

* Removing an unwanted file that was accidentally checked in.

* Implemented Peek and PeekBatch functionality. Added some tests.

* Implementation of ReceiveBySequenceNumber and Complete,Abandon,Defer,DeadLetter of messages received by sequence numbers.
  • Loading branch information
yvgopal authored Feb 15, 2017
1 parent 7b4b251 commit 4100a1c
Show file tree
Hide file tree
Showing 13 changed files with 600 additions and 104 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,53 @@
import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import com.microsoft.azure.servicebus.primitives.ServiceBusException;

public interface IMessageReceiver extends IMessageEntity{
ReceiveMode getReceiveMode();

void abandon(IBrokeredMessage message) throws InterruptedException, ServiceBusException;
void abandon(UUID lockToken) throws InterruptedException, ServiceBusException;

void abandon(IBrokeredMessage message, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;
void abandon(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;

CompletableFuture<Void> abandonAsync(IBrokeredMessage message);
CompletableFuture<Void> abandonAsync(UUID lockToken);

CompletableFuture<Void> abandonAsync(IBrokeredMessage message, Map<String, Object> propertiesToModify);
CompletableFuture<Void> abandonAsync(UUID lockToken, Map<String, Object> propertiesToModify);

void complete(IBrokeredMessage message) throws InterruptedException, ServiceBusException;
void complete(UUID lockToken) throws InterruptedException, ServiceBusException;

void completeBatch(Collection<? extends IBrokeredMessage> messages);

CompletableFuture<Void> completeAsync(IBrokeredMessage message);
CompletableFuture<Void> completeAsync(UUID lockToken);

CompletableFuture<Void> completeBatchAsync(Collection<? extends IBrokeredMessage> messages);

void defer(IBrokeredMessage message) throws InterruptedException, ServiceBusException;
void defer(UUID lockToken) throws InterruptedException, ServiceBusException;

void defer(IBrokeredMessage message, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;
void defer(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;

CompletableFuture<Void> deferAsync(IBrokeredMessage message);
CompletableFuture<Void> deferAsync(UUID lockToken);

CompletableFuture<Void> deferAsync(IBrokeredMessage message, Map<String, Object> propertiesToModify);
CompletableFuture<Void> deferAsync(UUID lockToken, Map<String, Object> propertiesToModify);

void deadLetter(IBrokeredMessage message) throws InterruptedException, ServiceBusException;
void deadLetter(UUID lockToken) throws InterruptedException, ServiceBusException;

void deadLetter(IBrokeredMessage message, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;
void deadLetter(UUID lockToken, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;

void deadLetter(IBrokeredMessage message, String deadLetterReason, String deadLetterErrorDescription) throws InterruptedException, ServiceBusException;
void deadLetter(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription) throws InterruptedException, ServiceBusException;

void deadLetter(IBrokeredMessage message, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;
void deadLetter(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify) throws InterruptedException, ServiceBusException;

CompletableFuture<Void> deadLetterAsync(IBrokeredMessage message);
CompletableFuture<Void> deadLetterAsync(UUID lockToken);

CompletableFuture<Void> deadLetterAsync(IBrokeredMessage message, Map<String, Object> propertiesToModify);
CompletableFuture<Void> deadLetterAsync(UUID lockToken, Map<String, Object> propertiesToModify);

CompletableFuture<Void> deadLetterAsync(IBrokeredMessage message, String deadLetterReason, String deadLetterErrorDescription);
CompletableFuture<Void> deadLetterAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription);

CompletableFuture<Void> deadLetterAsync(IBrokeredMessage message, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify);
CompletableFuture<Void> deadLetterAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify);

IBrokeredMessage receive() throws InterruptedException, ServiceBusException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
import com.microsoft.azure.servicebus.primitives.MessageWithLockToken;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.Util;

class MessageConverter
public class MessageConverter
{
public static Message convertBrokeredMessageToAmqpMessage(BrokeredMessage brokeredMessage)
{
Expand Down Expand Up @@ -67,7 +68,7 @@ public static Message convertBrokeredMessageToAmqpMessage(BrokeredMessage broker

public static BrokeredMessage convertAmqpMessageToBrokeredMessage(Message amqpMessage)
{
return convertAmqpMessageToBrokeredMessage(amqpMessage, null);
return convertAmqpMessageToBrokeredMessage(amqpMessage, (byte[])null);
}

public static BrokeredMessage convertAmqpMessageToBrokeredMessage(MessageWithDeliveryTag amqpMessageWithDeliveryTag)
Expand All @@ -76,6 +77,13 @@ public static BrokeredMessage convertAmqpMessageToBrokeredMessage(MessageWithDel
byte[] deliveryTag = amqpMessageWithDeliveryTag.getDeliveryTag();
return convertAmqpMessageToBrokeredMessage(amqpMessage, deliveryTag);
}

public static BrokeredMessage convertAmqpMessageToBrokeredMessage(MessageWithLockToken amqpMessageWithLockToken)
{
BrokeredMessage convertedMessage = convertAmqpMessageToBrokeredMessage(amqpMessageWithLockToken.getMessage(), (byte[])null);
convertedMessage.setLockToken(amqpMessageWithLockToken.getLockToken());
return convertedMessage;
}

public static BrokeredMessage convertAmqpMessageToBrokeredMessage(Message amqpMessage, byte[] deliveryTag)
{
Expand Down Expand Up @@ -175,5 +183,5 @@ public static BrokeredMessage convertAmqpMessageToBrokeredMessage(Message amqpMe
brokeredMessage.setDeliveryTag(deliveryTag);

return brokeredMessage;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,17 @@ private ClientConstants() { }
public static final String REQUEST_RESPONSE_OPERATION_NAME = "operation";
public static final String REQUEST_RESPONSE_TIMEOUT = AmqpConstants.VENDOR + ":server-timeout";
public static final String REQUEST_RESPONSE_RENEWLOCK_OPERATION = AmqpConstants.VENDOR + ":renew-lock";
public static final String REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER = AmqpConstants.VENDOR + ":receive-by-sequence-number";
public static final String REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION = AmqpConstants.VENDOR + ":schedule-message";
public static final String REQUEST_RESPONSE_CANCEL_CHEDULE_MESSAGE_OPERATION = AmqpConstants.VENDOR + ":cancel-scheduled-message";
public static final String REQUEST_RESPONSE_PEEK_OPERATION = AmqpConstants.VENDOR + ":peek-message";
public static final String REQUEST_RESPONSE_UPDATE_DISPOSTION = AmqpConstants.VENDOR + ":update-disposition";
public static final String REQUEST_RESPONSE_LOCKTOKENS = "lock-tokens";
public static final String REQUEST_RESPONSE_LOCKTOKEN = "lock-token";
public static final String REQUEST_RESPONSE_EXPIRATIONS = "expirations";
public static final String REQUEST_RESPONSE_SESSIONID = "session-id";
public static final String REQUEST_RESPONSE_SEQUENCE_NUMBERS = "sequence-numbers";
public static final String REQUEST_RESPONSE_RECEIVER_SETTLE_MODE = "receiver-settle-mode";
public static final String REQUEST_RESPONSE_MESSAGES = "messages";
public static final String REQUEST_RESPONSE_MESSAGE = "message";
public static final String REQUEST_RESPONSE_MESSAGE_ID = "message-id";
Expand All @@ -88,6 +92,17 @@ private ClientConstants() { }
public static final String REQUEST_RESPONSE_STATUS_CODE = "statusCode";
public static final String REQUEST_RESPONSE_STATUS_DESCRIPTION = "statusDescription";
public static final String REQUEST_RESPONSE_ERROR_CONDITION = "errorCondition";
public static final String REQUEST_RESPONSE_DISPOSITION_STATUS = "disposition-status";
public static final String REQUEST_RESPONSE_DEADLETTER_REASON = "deadletter-reason";
public static final String REQUEST_RESPONSE_DEADLETTER_DESCRIPTION = "deadletter-description";
public static final String REQUEST_RESPONSE_PROPERTIES_TO_MODIFY = "properties-to-modify";

public static final String DISPOSITION_STATUS_COMPLETED = "completed";
public static final String DISPOSITION_STATUS_DEFERED = "defered";
public static final String DISPOSITION_STATUS_SUSPENDED = "suspended";
public static final String DISPOSITION_STATUS_ABANDONED = "abandoned";
// public static final String DISPOSITION_STATUS_RENEWED = "renewed";
// public static final String DISPOSITION_STATUS_UNLOCKED = "unlocked";

public static final int REQUEST_RESPONSE_OK_STATUS_CODE = 200;
public static final int REQUEST_RESPONSE_NOCONTENT_STATUS_CODE = 0xcc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
Expand All @@ -40,12 +41,18 @@
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;

import com.microsoft.azure.servicebus.BrokeredMessage;
import com.microsoft.azure.servicebus.IBrokeredMessage;
import com.microsoft.azure.servicebus.MessageConverter;
import com.microsoft.azure.servicebus.amqp.AmqpConstants;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
Expand Down Expand Up @@ -736,6 +743,11 @@ public CompletableFuture<Void> completeMessageAsync(byte[] deliveryTag)
return this.updateMessageStateAsync(deliveryTag, outcome);
}

public CompletableFuture<Void> completeMessageAsync(UUID lockToken)
{
return this.updateDispositionAsync(new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_COMPLETED, null, null, null);
}

public CompletableFuture<Void> abandonMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify)
{
Modified outcome = new Modified();
Expand All @@ -746,6 +758,11 @@ public CompletableFuture<Void> abandonMessageAsync(byte[] deliveryTag, Map<Strin
return this.updateMessageStateAsync(deliveryTag, outcome);
}

public CompletableFuture<Void> abandonMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify)
{
return this.updateDispositionAsync(new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_ABANDONED, null, null, propertiesToModify);
}

public CompletableFuture<Void> deferMessageAsync(byte[] deliveryTag, Map<String, Object> propertiesToModify)
{
Modified outcome = new Modified();
Expand All @@ -757,6 +774,11 @@ public CompletableFuture<Void> deferMessageAsync(byte[] deliveryTag, Map<String,
return this.updateMessageStateAsync(deliveryTag, outcome);
}

public CompletableFuture<Void> deferMessageAsync(UUID lockToken, Map<String, Object> propertiesToModify)
{
return this.updateDispositionAsync(new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_DEFERED, null, null, propertiesToModify);
}

public CompletableFuture<Void> deadLetterMessageAsync(byte[] deliveryTag, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify)
{
Rejected outcome = new Rejected();
Expand All @@ -780,6 +802,11 @@ public CompletableFuture<Void> deadLetterMessageAsync(byte[] deliveryTag, String
return this.updateMessageStateAsync(deliveryTag, outcome);
}

public CompletableFuture<Void> deadLetterMessageAsync(UUID lockToken, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify)
{
return this.updateDispositionAsync(new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_SUSPENDED, deadLetterReason, deadLetterErrorDescription, propertiesToModify);
}

private CompletableFuture<Void> updateMessageStateAsync(byte[] deliveryTag, Outcome outcome)
{
this.throwIfClosed(this.lastKnownLinkError);
Expand Down Expand Up @@ -911,4 +938,92 @@ public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] lock
return returningFuture;
});
}

public CompletableFuture<Collection<MessageWithLockToken>> receiveBySequenceNumbersAsync(Long[] sequenceNumbers)
{
HashMap requestBodyMap = new HashMap();
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers);
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1));

Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, requestBodyMap, RequestResponseUtils.adjustServerTimeout(this.operationTimeout));
CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout);
return responseFuture.thenCompose((responseMessage) -> {
CompletableFuture<Collection<MessageWithLockToken>> returningFuture = new CompletableFuture<Collection<MessageWithLockToken>>();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE)
{
List<MessageWithLockToken> receivedMessages = new ArrayList<MessageWithLockToken>();
Object responseBodyMap = ((AmqpValue)responseMessage.getBody()).getValue();
if(responseBodyMap != null && responseBodyMap instanceof Map)
{
Object messages = ((Map)responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES);
if(messages != null && messages instanceof Iterable)
{
for(Object message : (Iterable)messages)
{
if(message instanceof Map)
{
Message receivedMessage = Message.Factory.create();
Binary messagePayLoad = (Binary)((Map)message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE);
receivedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength());
UUID lockToken = ClientConstants.ZEROLOCKTOKEN;
if(((Map)message).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN))
{
lockToken = (UUID)((Map)message).get(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN);
}

receivedMessages.add(new MessageWithLockToken(receivedMessage, lockToken));
}
}
}
}
returningFuture.complete(receivedMessages);
}
else
{
// error response
returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage));
}
return returningFuture;
});
}

public CompletableFuture<Void> updateDispositionAsync(UUID[] lockTokens, String dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify)
{
HashMap requestBodyMap = new HashMap();
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens);
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DISPOSITION_STATUS, dispositionStatus);

if(deadLetterReason != null)
{
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_REASON, deadLetterReason);
}

if(deadLetterErrorDescription != null)
{
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_DESCRIPTION, deadLetterErrorDescription);
}

if(propertiesToModify != null && propertiesToModify.size() > 0)
{
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_PROPERTIES_TO_MODIFY, propertiesToModify);
}

Message requestMessage = RequestResponseUtils.createRequestMessage(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION, requestBodyMap, RequestResponseUtils.adjustServerTimeout(this.operationTimeout));
CompletableFuture<Message> responseFuture = this.requestResponseLink.requestAysnc(requestMessage, this.operationTimeout);
return responseFuture.thenCompose((responseMessage) -> {
CompletableFuture<Void> returningFuture = new CompletableFuture<Void>();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE)
{
returningFuture.complete(null);
}
else
{
// error response
returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage));
}
return returningFuture;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.microsoft.azure.servicebus.primitives;

import java.util.UUID;

import org.apache.qpid.proton.message.Message;

public class MessageWithLockToken {
private final Message message;
private final UUID lockToken;

public MessageWithLockToken(Message message, UUID lockToken)
{
this.message = message;
this.lockToken = lockToken;
}

public Message getMessage() {
return message;
}

public UUID getLockToken() {
return lockToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static Message createRequestMessage(String operation, Map propertyBag, Du
// Pass one second less to the server so client doesn't time out before server times out
public static Duration adjustServerTimeout(Duration clientTimeout)
{
return clientTimeout.minusSeconds(1);
return clientTimeout.minusMillis(100);
}

public static int getResponseStatusCode(Message responseMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public class ServiceBusException extends Exception
private boolean isTransient;
private ErrorContext errorContext;

ServiceBusException(final boolean isTransient)
public ServiceBusException(final boolean isTransient)
{
super();
this.isTransient = isTransient;
}

ServiceBusException(final boolean isTransient, final String message)
public ServiceBusException(final boolean isTransient, final String message)
{
super(message);
this.isTransient = isTransient;
Expand All @@ -34,7 +34,7 @@ public ServiceBusException(final boolean isTransient, final Throwable cause)
this.isTransient = isTransient;
}

ServiceBusException(final boolean isTransient, final String message, final Throwable cause)
public ServiceBusException(final boolean isTransient, final String message, final Throwable cause)
{
super(message, cause);
this.isTransient = isTransient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* An abstraction for a Scheduler functionality - which can later be replaced by a light-weight Thread
*/
final class Timer
final public class Timer
{
private static ScheduledThreadPoolExecutor executor = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
package com.microsoft.azure.servicebus.primitives;

enum TimerType
public enum TimerType
{
OneTimeRun,
RepeatRun
Expand Down
Loading

0 comments on commit 4100a1c

Please sign in to comment.