From 2edad13482989ad7fcce72eb5883a8dff674aa6b Mon Sep 17 00:00:00 2001 From: Vijaya Gopal Yarramneni Date: Thu, 20 Dec 2018 12:56:58 -0800 Subject: [PATCH] Merging message body fix, hostname verification fix and bug fixes from master branch (#315) * Test improvements (#309) * Adding a large message test. Adding duplicate tests for partitioned entities. Minor tweaks to make message handler tests more reliable. * Invalid merge. * Removed merge conflict files * Some test fixes. * Adding ARM deployment template for CI (#218) build/azuredeploy.json * Test reliability fixes. And RequestResponseLimitTest is modified to drastically reduce its runtime. * More test reliability fixes. * More tweaks. # Conflicts: # azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java # azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java * Adding retry to RequestResponse link when idle connection is closed. (#305) * Adding retry to RequestResponse link when idle connection is closed. Otherwise CBS token sending will fail if a SEND request comes in right at the time of connnection closing. * Minor tweaks to fix 15 minute timeout bug. * Another fix to handle connection while the links are being opened. * Another fix to retry sending SAS token if it times out. There are cases when client sending SAS token and server closing the connection happen at the same time and response never arrives. * More fixes to request-response link to retry on connection close. * A test fix. # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java * Adding support for multiple message body types. (#296) * Adding support for multiple message body types. Deprecated getBody and setBody methods. * Changes to MessageBody class API to support AMQP spec of multiple data sections or sequence sections in a message. * Adding a minor null check. * Change to handle Messages constructed with string body as binary messages to preserve backward compatibility. * Adding message body tests. # Conflicts: # azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java # azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java # azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java * Merging the fix for man-in-the-middle vulnerability from master branch. --- azure-servicebus/pom.xml | 11 +- .../microsoft/azure/servicebus/IMessage.java | 23 +- .../servicebus/IMessageAndSessionPump.java | 2 +- .../microsoft/azure/servicebus/Message.java | 118 ++++++-- .../azure/servicebus/MessageBody.java | 112 ++++++++ .../azure/servicebus/MessageBodyType.java | 20 ++ .../azure/servicebus/MessageConverter.java | 36 ++- .../azure/servicebus/MessageReceiver.java | 5 + .../com/microsoft/azure/servicebus/Utils.java | 42 +++ .../azure/servicebus/amqp/AmqpErrorCode.java | 2 +- .../servicebus/amqp/ConnectionHandler.java | 44 +-- .../servicebus/amqp/IAmqpConnection.java | 2 + .../amqp/ProxyConnectionHandler.java | 4 +- .../servicebus/amqp/StrictTLSContext.java | 14 + .../servicebus/amqp/StrictTLSContextSpi.java | 94 +++++++ .../primitives/ClientConstants.java | 2 +- .../primitives/CoreMessageReceiver.java | 90 +++--- .../primitives/CoreMessageSender.java | 8 +- .../servicebus/primitives/ExceptionUtil.java | 10 +- .../primitives/MessagingFactory.java | 106 ++++--- .../primitives/RequestResponseLink.java | 243 ++++++++-------- .../azure/servicebus/primitives/Util.java | 4 +- .../servicebus/ClientValidationTests.java | 4 +- .../MessageAndSessionPumpTests.java | 8 +- .../azure/servicebus/MessageBodyTests.java | 171 ++++++++++++ .../PartitionedQueueClientSessionTests.java | 18 ++ .../PartitionedQueueClientTests.java | 18 ++ .../PartitionedQueueSendReceiveTests.java | 18 ++ .../PartitionedQueueSessionTests.java | 19 ++ ...itionedSubscriptionClientSessionTests.java | 18 ++ .../PartitionedSubscriptionClientTests.java | 18 ++ .../PartitionedTopicSendReceiveTests.java | 19 ++ .../PartitionedTopicSessionTests.java | 19 ++ .../azure/servicebus/SendReceiveTests.java | 39 ++- .../azure/servicebus/SessionTests.java | 52 +++- .../SubscriptionClientSessionTests.java | 2 +- .../azure/servicebus/TestCommons.java | 260 ++++++++++++++---- .../ConnectionStringBuilderTests.java | 4 +- pom.xml | 2 +- 39 files changed, 1320 insertions(+), 361 deletions(-) create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java create mode 100644 azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java create mode 100644 azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java diff --git a/azure-servicebus/pom.xml b/azure-servicebus/pom.xml index 265ec670924b4..d25ed1a356188 100644 --- a/azure-servicebus/pom.xml +++ b/azure-servicebus/pom.xml @@ -41,6 +41,10 @@ org.apache.maven.plugins maven-surefire-plugin 2.20 + + 0 + true + @@ -51,17 +55,12 @@ proton-j ${proton-j-version} - + com.microsoft.azure qpid-proton-j-extensions 1.1.0 - org.bouncycastle - bcpkix-jdk15on - 1.53 - - com.microsoft.azure adal4j 1.3.0 diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java index 0a7639bc1216a..4aed5611195c6 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java @@ -75,7 +75,7 @@ public interface IMessage { * Gets the content type of this message. * * Optionally describes the payload of the message, with a descriptor following the format of - * RFC2045, Section 5, for example "application/json". + * RFC2045, Section 5, for example "application/json". Note that content type is not same as message body type. * * @return content type of this message */ @@ -213,7 +213,9 @@ public interface IMessage { * * @return body of this message * @see Messages, payloads, and serialization + * @deprecated Message body need not just a byte array. Replaced by {@link #getMessageBody()} */ + @Deprecated public byte[] getBody(); /** @@ -221,9 +223,28 @@ public interface IMessage { * * @param body body of this message * @see #getBody() + * @deprecated Message body need not just a byte array. Replaced by {@link #setMessageBody(MessageBody)} */ + @Deprecated public void setBody(byte[] body); + /** + * Gets the body of this message. Client applications should extract message content based on body type. + * + * @return body of this message + * @see Messages, payloads, and serialization + */ + public MessageBody getMessageBody(); + + /** + * Sets the body of this message. + * + * @param body body of this message + * @see #getMessageBody() + */ + public void setMessageBody(MessageBody body); + + /** * Gets the map of user application properties of this message. Client * applications can set user properties (headers) on the message using this map. diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java index f1d0bee507572..64197d5d8bb28 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java @@ -34,7 +34,7 @@ interface IMessageAndSessionPump { * IMessageHandler methods are executed on the passed executor service. * * @param handler The {@link IMessageHandler} instance - * @param executorService ExecutorService which is used to execute {@link IMessageHandler} methods. If there are + * @param executorService ExecutorService which is used to execute {@link IMessageHandler} methods. * @throws InterruptedException if the current thread was interrupted while waiting * @throws ServiceBusException if register failed */ diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java index ddc07605b62ca..2daabef60d931 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java @@ -11,19 +11,12 @@ import java.util.Map; import java.util.UUID; - -/** - * - * - */ final public class Message implements Serializable, IMessage { private static final long serialVersionUID = 7849508139219590863L; - private static final Charset DEFAULT_CHAR_SET = Charset.forName("UTF-8"); - private static final String DEFAULT_CONTENT_TYPE = null; - private static final byte[] DEFAULT_CONTENT = new byte[0]; + private static final MessageBody DEFAULT_CONTENT = Utils.fromBinay(new byte[0]); private long deliveryCount; @@ -31,7 +24,7 @@ final public class Message implements Serializable, IMessage { private Duration timeToLive; - private byte[] content; + private MessageBody messageBody; private String contentType; @@ -67,40 +60,103 @@ final public class Message implements Serializable, IMessage { private byte[] deliveryTag; + /** + * Creates an empty message with an empty byte array as body. + */ public Message() { this(DEFAULT_CONTENT); } + /** + * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary. + * @param content content of the message. + */ public Message(String content) { this(content.getBytes(DEFAULT_CHAR_SET)); } + /** + * Creates a message from a byte array. Message body type is set to binary. + * @param content content of the message + */ public Message(byte[] content) { - this(content, DEFAULT_CONTENT_TYPE); + this(Utils.fromBinay(content)); + } + + /** + * Creates a message from message body. + * @param body message body + */ + public Message(MessageBody body) + { + this(body, DEFAULT_CONTENT_TYPE); } + /** + * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary. + * @param content content of the message + * @param contentType content type of the message + */ public Message(String content, String contentType) { this(content.getBytes(DEFAULT_CHAR_SET), contentType); } + /** + * Creates a message from a byte array. Message body type is set to binary. + * @param content content of the message + * @param contentType content type of the message + */ public Message(byte[] content, String contentType) { - this(UUID.randomUUID().toString(), content, contentType); + this(Utils.fromBinay(content), contentType); } + /** + * Creates a message from message body. + * @param body message body + * @param contentType content type of the message + */ + public Message(MessageBody body, String contentType) + { + this(UUID.randomUUID().toString(), body, contentType); + } + + /** + * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary. + * @param messageId id of the message + * @param content content of the message + * @param contentType content type of the message + */ public Message(String messageId, String content, String contentType) { this(messageId, content.getBytes(DEFAULT_CHAR_SET), contentType); } - + + /** + * Creates a message from a byte array. Message body type is set to binary. + * @param messageId id of the message + * @param content content of the message + * @param contentType content type of the message + */ public Message(String messageId, byte[] content, String contentType) + { + this(messageId, Utils.fromBinay(content), contentType); + } + + /** + * Creates a message from message body. + * @param messageId id of the message + * @param body message body + * @param contentType content type of the message + */ + public Message(String messageId, MessageBody body, String contentType) { this.messageId = messageId; - this.content = content; + this.messageBody = body; this.contentType = contentType; this.properties = new HashMap<>(); } @@ -186,16 +242,6 @@ public void setSessionId(String sessionId) { this.sessionId = sessionId; } - @Override - public byte[] getBody() { - return this.content; - } - - @Override - public void setBody(byte[] content) { - this.content = content; - } - @Override public Map getProperties() { return this.properties; @@ -325,4 +371,30 @@ void setDeliveryTag(byte[] deliveryTag) { this.deliveryTag = deliveryTag; } + + @Override + @Deprecated + public byte[] getBody() + { + return Utils.getDataFromMessageBody(this.messageBody); + } + + @Override + @Deprecated + public void setBody(byte[] body) + { + this.messageBody = Utils.fromBinay(body); + } + + @Override + public MessageBody getMessageBody() + { + return this.messageBody; + } + + @Override + public void setMessageBody(MessageBody body) + { + this.messageBody = body; + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java new file mode 100644 index 0000000000000..fb697f3b79732 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java @@ -0,0 +1,112 @@ +package com.microsoft.azure.servicebus; + +import java.util.List; + +/** + * This class encapsulates the body of a message. Body types map to AMQP message body types. + * It has getters and setters for multiple body types. + * Client should test for body type before calling corresponding get method. + * Get methods not corresponding to the type of the body return null. + */ +public class MessageBody { + private MessageBodyType bodyType; + private Object valueData; + private List> sequenceData; + private List binaryData; + + private MessageBody() {} + + /** + * Creates message body of AMQPValue type. + * @param value AMQPValue content of the message. It must be of a type supported by AMQP. + * @return MessageBody instance wrapping around the value data. + */ + public static MessageBody fromValueData(Object value) + { + if(value == null) + { + throw new IllegalArgumentException("Value data is null."); + } + + MessageBody body = new MessageBody(); + body.bodyType = MessageBodyType.VALUE; + body.valueData = value; + body.sequenceData = null; + body.binaryData = null; + return body; + } + + /** + * Creates a message body from a list of AMQPSequence sections.Each AMQPSequence section is in turn a list of objects. + * Please note that this version of the SDK supports only one AMQPSequence section in a message. It means only a list of exactly one sequence in it is accepted as message body. + * @param sequenceData a list of AMQPSequence sections. Each AMQPSequence section is in turn a list of objects. Every object in each list must of a type supported by AMQP. + * @return MessageBody instance wrapping around the sequence data. + */ + public static MessageBody fromSequenceData(List> sequenceData) + { + if(sequenceData == null || sequenceData.size() == 0 || sequenceData.size() > 1) + { + throw new IllegalArgumentException("Sequence data is null or has more than one collection in it."); + } + + MessageBody body = new MessageBody(); + body.bodyType = MessageBodyType.SEQUENCE; + body.valueData = null; + body.sequenceData = sequenceData; + body.binaryData = null; + return body; + } + + /** + * Creates a message body from a list of Data sections.Each Data section is a byte array. + * Please note that this version of the SDK supports only one Data section in a message. It means only a list of exactly one byte array in it is accepted as message body. + * @param binaryData a list of byte arrays. + * @return MessageBody instance wrapping around the binary data. + */ + public static MessageBody fromBinaryData(List binaryData) + { + if(binaryData == null || binaryData.size() == 0 || binaryData.size() > 1) + { + throw new IllegalArgumentException("Binary data is null or has more than one byte array in it."); + } + + MessageBody body = new MessageBody(); + body.bodyType = MessageBodyType.BINARY; + body.valueData = null; + body.sequenceData = null; + body.binaryData = binaryData; + return body; + } + + /** + * Returns the content of message body. + * @return value of message body only if the MessageBody is of Value type. Returns null otherwise. + */ + public Object getValueData() { + return valueData; + } + + /** + * Returns the content of message body. + * @return a list of AMQPSequence sections only if the MessageBody is of Sequence type. Returns null otherwise. Each AMQPSequence section is in turn a list of objects. + */ + public List> getSequenceData() { + return sequenceData; + } + + /** + * Returns the content of message body. + * @return message body as list of byte arrays only if the MessageBody is of Binary type. Returns null otherwise. + */ + public List getBinaryData() { + return binaryData; + } + + /** + * Return the type of content in this message body. + * @return type of message content + */ + public MessageBodyType getBodyType() { + return bodyType; + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java new file mode 100644 index 0000000000000..486edaa4805de --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java @@ -0,0 +1,20 @@ +package com.microsoft.azure.servicebus; + +/** + * Enumeration to represent body type of a message. + * + */ +public enum MessageBodyType { + /** + * Message content is byte array, equivalent to AMQP Data. + */ + BINARY, + /** + * Message content is a list of objects, equivalent to AMQP Sequence. Each object must be of a type supported by AMQP. + */ + SEQUENCE, + /** + * Message content is a single object, equivalent to AMQP Value. The object must be of a type supported by AMQP. + */ + VALUE +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java index 9a02fa82c43ed..326ca4f600d8b 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java @@ -6,6 +6,7 @@ import java.time.Duration; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -13,6 +14,7 @@ import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.*; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; import com.microsoft.azure.servicebus.primitives.ClientConstants; import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag; @@ -25,9 +27,21 @@ class MessageConverter public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmqpMessage(Message brokeredMessage) { org.apache.qpid.proton.message.Message amqpMessage = Proton.message(); - if(brokeredMessage.getBody() != null) + MessageBody body = brokeredMessage.getMessageBody(); + if( body != null) { - amqpMessage.setBody(new Data(new Binary(brokeredMessage.getBody()))); + if (body.getBodyType() == MessageBodyType.VALUE) + { + amqpMessage.setBody(new AmqpValue(body.getValueData())); + } + else if (body.getBodyType() == MessageBodyType.SEQUENCE) + { + amqpMessage.setBody(new AmqpSequence(Utils.getSequenceFromMessageBody(body))); + } + else + { + amqpMessage.setBody(new Data(new Binary(Utils.getDataFromMessageBody(body)))); + } } if(brokeredMessage.getProperties() != null) @@ -98,12 +112,22 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton if(body instanceof Data) { Binary messageData = ((Data)body).getValue(); - brokeredMessage = new Message(messageData.getArray()); + brokeredMessage = new Message(Utils.fromBinay(messageData.getArray())); + } + else if (body instanceof AmqpValue) + { + Object messageData = ((AmqpValue)body).getValue(); + brokeredMessage = new Message(MessageBody.fromValueData(messageData)); + } + else if (body instanceof AmqpSequence) + { + List messageData = ((AmqpSequence)body).getValue(); + brokeredMessage = new Message(Utils.fromSequence(messageData)); } else { - // TODO: handle other types of message body - brokeredMessage = new Message(); + // Should never happen + brokeredMessage = new Message(); } } else @@ -198,5 +222,5 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton brokeredMessage.setDeliveryTag(deliveryTag); return brokeredMessage; - } + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 663bcdadbad16..2f8421e70aef6 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -651,6 +651,11 @@ public CompletableFuture renewMessageLockAsync(IMessage message) { public CompletableFuture> renewMessageLockBatchAsync(Collection messages) { this.ensurePeekLockReceiveMode(); + if(messages == null || messages.size() == 0) + { + throw new UnsupportedOperationException("Message collection is null or empty. Locks cannot be renewed."); + } + UUID[] lockTokens = new UUID[messages.size()]; int messageIndex = 0; for (IMessage message : messages) { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java index 553ec9beeaabc..0ce0840cc9cb2 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java @@ -3,6 +3,8 @@ package com.microsoft.azure.servicebus; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -41,4 +43,44 @@ static void assertNonNull(String argumentName, Object argument) { if (argument == null) throw new IllegalArgumentException("Argument '" + argumentName + "' is null."); } + + static MessageBody fromSequence(List sequence) + { + List> sequenceData = new ArrayList<>(); + sequenceData.add(sequence); + return MessageBody.fromSequenceData(sequenceData); + } + + static MessageBody fromBinay(byte[] binary) + { + List binaryData = new ArrayList<>(); + binaryData.add(binary); + return MessageBody.fromBinaryData(binaryData); + } + + static byte[] getDataFromMessageBody(MessageBody messageBody) + { + List binaryData = messageBody.getBinaryData(); + if(binaryData == null || binaryData.size() == 0) + { + return null; + } + else + { + return binaryData.get(0); + } + } + + static List getSequenceFromMessageBody(MessageBody messageBody) + { + List> sequenceData = messageBody.getSequenceData(); + if(sequenceData == null || sequenceData.size() == 0) + { + return null; + } + else + { + return sequenceData.get(0); + } + } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java index 86079527fc28b..c0ce9cc09addc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java @@ -25,7 +25,7 @@ public final class AmqpErrorCode // connection errors public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced"); - + public static final Symbol FramingError = Symbol.getSymbol("amqp:connection:framing-error"); // proton library IOExceptions while performing operations on SocketChannel (in IOHandler.java) public static final Symbol ProtonIOError = Symbol.getSymbol("proton:io"); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java index 439471c7c2130..2d6535058c4bc 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java @@ -4,9 +4,12 @@ */ package com.microsoft.azure.servicebus.amqp; +import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; +import javax.net.ssl.SSLContext; + import com.microsoft.azure.servicebus.primitives.MessagingFactory; import com.microsoft.azure.servicebus.primitives.TransportType; import org.apache.qpid.proton.Proton; @@ -18,6 +21,7 @@ import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.SslDomain; +import org.apache.qpid.proton.engine.SslPeerDetails; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.TransportInternal; import org.apache.qpid.proton.reactor.Handshaker; @@ -44,7 +48,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect { switch(transportType) { case AMQP_WEB_SOCKETS: - if (ProxyConnectionHandler.shouldUseProxy( ((MessagingFactory)messagingFactory).getHostName() )) { + if (ProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) { return new ProxyConnectionHandler(messagingFactory); } else { return new WebSocketConnectionHandler(messagingFactory); @@ -59,7 +63,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect public void onConnectionInit(Event event) { final Connection connection = event.getConnection(); - final String hostName = new StringBuilder(((MessagingFactory)messagingFactory).getHostName()) + final String hostName = new StringBuilder(messagingFactory.getHostName()) .append(":") .append(String.valueOf(this.getProtocolPort())) .toString(); @@ -83,13 +87,28 @@ protected IAmqpConnection getMessagingFactory() public void addTransportLayers(final Event event, final TransportInternal transport) { - final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT); - transport.ssl(domain); + SslDomain domain = Proton.sslDomain(); + domain.init(SslDomain.Mode.CLIENT); + + try { + // Default SSL context will have the root certificate from azure in truststore anyway + SSLContext defaultContext = SSLContext.getDefault(); + StrictTLSContextSpi strictTlsContextSpi = new StrictTLSContextSpi(defaultContext); + SSLContext strictTlsContext = new StrictTLSContext(strictTlsContextSpi, defaultContext.getProvider(), defaultContext.getProtocol()); + domain.setSslContext(strictTlsContext); + domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME); + SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort()); + transport.ssl(domain, peerDetails); + } catch (NoSuchAlgorithmException e) { + // Should never happen + TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e); +// this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage())); + } } protected void notifyTransportErrors(final Event event) { /* no-op */ } - public String getOutboundSocketHostName() { return ((MessagingFactory)messagingFactory).getHostName(); } + public String getOutboundSocketHostName() { return messagingFactory.getHostName(); } public int getOutboundSocketPort() { return this.getProtocolPort(); } @@ -106,11 +125,10 @@ public int getMaxFrameSize() @Override public void onConnectionBound(Event event) { - TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname()); + TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname()); Transport transport = event.getTransport(); - + this.addTransportLayers(event, (TransportInternal) transport); - Sasl sasl = transport.sasl(); sasl.setMechanisms("ANONYMOUS"); } @@ -180,14 +198,4 @@ public void onConnectionLocalClose(Event event) { connection.free(); } } - - private static SslDomain makeDomain(SslDomain.Mode mode) - { - SslDomain domain = Proton.sslDomain(); - domain.init(mode); - - // TODO: VERIFY_PEER_NAME support - domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER); - return domain; - } } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java index 89d1e8c27de3b..b4853d3d1f4e1 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java @@ -10,6 +10,8 @@ public interface IAmqpConnection { + String getHostName(); + void onConnectionOpen(); void onConnectionError(ErrorCondition error); diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java index bed203bd2cf82..41ab910197e68 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java @@ -91,7 +91,7 @@ protected void notifyTransportErrors(final Event event) { final IOException ioException = reconstructIOException(errorCondition); proxySelector.connectFailed( - createURIFromHostNamePort(((MessagingFactory)this.getMessagingFactory()).getHostName(), this.getProtocolPort()), + createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()), new InetSocketAddress(hostNameParts[0], port), ioException ); @@ -144,7 +144,7 @@ public int getOutboundSocketPort() { private InetSocketAddress getProxyAddress() { final URI serviceUri = createURIFromHostNamePort( - ((MessagingFactory)this.getMessagingFactory()).getHostName(), + this.getMessagingFactory().getHostName(), this.getProtocolPort()); final ProxySelector proxySelector = ProxySelector.getDefault(); if (proxySelector == null) { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java new file mode 100644 index 0000000000000..86eaea9ffa8bf --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java @@ -0,0 +1,14 @@ +package com.microsoft.azure.servicebus.amqp; + +import java.security.Provider; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLContextSpi; + +// Customer SSLContext that wraps around an SSLContext and removes SSLv2Hello protocol from every SSLEngine created. +public class StrictTLSContext extends SSLContext{ + + protected StrictTLSContext(SSLContextSpi contextSpi, Provider provider, String protocol) { + super(contextSpi, provider, protocol); + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java new file mode 100644 index 0000000000000..9940298bffb53 --- /dev/null +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java @@ -0,0 +1,94 @@ +package com.microsoft.azure.servicebus.amqp; + +import java.security.KeyManagementException; +import java.security.SecureRandom; +import java.util.ArrayList; + +import javax.net.ssl.KeyManager; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLContextSpi; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSessionContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; + +// Wraps over a standard SSL context and disables the SSLv2Hello protocol. +public class StrictTLSContextSpi extends SSLContextSpi{ + + private static final String SSLv2Hello = "SSLv2Hello"; + + SSLContext innerContext; + public StrictTLSContextSpi(SSLContext innerContext) { + this.innerContext = innerContext; + } + + @Override + protected SSLEngine engineCreateSSLEngine() { + SSLEngine engine = this.innerContext.createSSLEngine(); + this.removeSSLv2Hello(engine); + return engine; + } + + @Override + protected SSLEngine engineCreateSSLEngine(String arg0, int arg1) { + SSLEngine engine = this.innerContext.createSSLEngine(arg0, arg1); + this.removeSSLv2Hello(engine); + return engine; + } + + @Override + protected SSLSessionContext engineGetClientSessionContext() { + return innerContext.getClientSessionContext(); + } + + @Override + protected SSLSessionContext engineGetServerSessionContext() { + return this.innerContext.getServerSessionContext(); + } + + @Override + protected SSLServerSocketFactory engineGetServerSocketFactory() { + return this.innerContext.getServerSocketFactory(); + } + + @Override + protected SSLSocketFactory engineGetSocketFactory() { + return this.innerContext.getSocketFactory(); + } + + @Override + protected void engineInit(KeyManager[] km, TrustManager[] tm, SecureRandom sr) throws KeyManagementException { + this.innerContext.init(km, tm, sr); + } + + private void removeSSLv2Hello(SSLEngine engine) + { + String[] enabledProtocols = engine.getEnabledProtocols(); + boolean sslv2HelloFound = false; + for(String protocol : enabledProtocols) + { + if(protocol.equalsIgnoreCase(SSLv2Hello)) + { + sslv2HelloFound = true; + break; + } + } + + if(sslv2HelloFound) + { + ArrayList modifiedProtocols = new ArrayList(); + for(String protocol : enabledProtocols) + { + if(!protocol.equalsIgnoreCase(SSLv2Hello)) + { + modifiedProtocols.add(protocol); + } + } + + engine.setEnabledProtocols(modifiedProtocols.toArray(new String[modifiedProtocols.size()])); + } + + + } +} diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java index 3db7374ca5504..540aced30b3c2 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java @@ -51,7 +51,6 @@ private ClientConstants() { } public final static Symbol ENTITY_DISABLED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-disabled"); public final static Symbol PARTITION_NOT_OWNED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":partition-not-owned"); public final static Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); - public final static Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); public final static Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); public final static Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); public final static Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":transfer-destination-address"); @@ -184,6 +183,7 @@ private ClientConstants() { } static final int DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS = 5; static final String SAS_TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s"; + static final Duration SAS_TOKEN_SEND_TIMEOUT = Duration.ofSeconds(10); private static String getClientVersion() { String clientVersion; diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index 819155aaa1d0c..233a5596b9c8b 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -95,7 +95,6 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, private Receiver receiveLink; private RequestResponseLink requestResponseLink; private WorkItem linkOpen; - private Duration factoryRceiveTimeout; private Exception lastKnownLinkError; private Instant lastKnownErrorReportedAt; @@ -134,7 +133,6 @@ private CoreMessageReceiver(final MessagingFactory factory, this.prefetchedMessages = new ConcurrentLinkedQueue(); this.linkClose = new CompletableFuture(); this.lastKnownLinkError = null; - this.factoryRceiveTimeout = factory.getOperationTimeout(); this.prefetchCountSync = new Object(); this.retryPolicy = factory.getRetryPolicy(); this.pendingReceives = new ConcurrentLinkedQueue(); @@ -420,6 +418,7 @@ private void createReceiveLink() BaseHandler.setHandler(receiver, handler); receiver.open(); this.receiveLink = receiver; + this.underlyingFactory.registerForConnectionError(this.receiveLink); } CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) @@ -431,7 +430,7 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) else { CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); - return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;TRACE_LOGGER.debug("Sent SAS Token and set renew timer");}); + return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;}); } } @@ -575,7 +574,7 @@ public void run() } }, timeout, - TimerType.OneTimeRun); + TimerType.OneTimeRun); this.ensureLinkIsOpen().thenRun(() -> {this.addCredit(receiveWorkItem);}); return onReceive; @@ -616,8 +615,6 @@ public void onOpenComplete(Exception exception) if (exception == null) { - this.underlyingFactory.registerForConnectionError(this.receiveLink); - if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { AsyncUtil.completeFuture(this.linkOpen.getWork(), this); @@ -652,13 +649,6 @@ public void onOpenComplete(Exception exception) { TRACE_LOGGER.warn("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception); AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception); - if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) - { - // No point in retrying to establish a link.. SessionLock is lost - TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId); - this.isSessionLockLost = true; - this.closeAsync(); - } } this.lastKnownLinkError = exception; @@ -806,12 +796,6 @@ else if (remoteOutcome instanceof Released) } } - public void onError(ErrorCondition error) - { - Exception completionException = ExceptionUtil.toException(error); - this.onError(completionException); - } - @Override public void onError(Exception exception) { @@ -832,6 +816,7 @@ public void onError(Exception exception) } else { + this.underlyingFactory.deregisterForConnectionError(this.receiveLink); TRACE_LOGGER.warn("Receive link '{}' to '{}', sessionId '{}' closed with error.", this.receiveLink.getName(), this.receivePath, this.sessionId, exception); this.lastKnownLinkError = exception; if ((this.linkOpen != null && !this.linkOpen.getWork().isDone()) || @@ -839,39 +824,35 @@ public void onError(Exception exception) { this.onOpenComplete(exception); } - else - { - this.underlyingFactory.deregisterForConnectionError(this.receiveLink); - - if (exception != null && - (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) - { - this.clearAllPendingWorkItems(exception); - - if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) - { - // No point in retrying to establish a link.. SessionLock is lost - TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId); - this.isSessionLockLost = true; - this.closeAsync(); - } - } - else - { - // TODO Why recreating link needs to wait for retry interval of pending receive? - ReceiveWorkItem workItem = this.pendingReceives.peek(); - if (workItem != null && workItem.getTimeoutTracker() != null) - { - Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy() - .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining()); - if (nextRetryInterval != null) - { - TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval); - Timer.schedule(() -> {CoreMessageReceiver.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun); - } - } - } - } + + if (exception != null && + (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) + { + this.clearAllPendingWorkItems(exception); + + if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) + { + // No point in retrying to establish a link.. SessionLock is lost + TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId); + this.isSessionLockLost = true; + this.closeAsync(); + } + } + else + { + // TODO Why recreating link needs to wait for retry interval of pending receive? + ReceiveWorkItem workItem = this.pendingReceives.peek(); + if (workItem != null && workItem.getTimeoutTracker() != null) + { + Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy() + .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining()); + if (nextRetryInterval != null) + { + TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval); + Timer.schedule(() -> {CoreMessageReceiver.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun); + } + } + } } } @@ -983,7 +964,8 @@ public void onClose(ErrorCondition condition) } else { - this.onError(condition); + Exception completionException = ExceptionUtil.toException(condition); + this.onError(completionException); } } @@ -1189,7 +1171,7 @@ private CompletableFuture updateMessageStateAsync(byte[] deliveryTag, Outc state = (DeliveryState) outcome; } - final UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, CoreMessageReceiver.this.factoryRceiveTimeout); + final UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, CoreMessageReceiver.this.operationTimeout); CoreMessageReceiver.this.pendingUpdateStateRequests.put(deliveryTagAsString, workItem); CoreMessageReceiver.this.ensureLinkIsOpen().thenRun(() -> { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java index d488b99302722..9aa54d538d1ae 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java @@ -419,7 +419,6 @@ public void onOpenComplete(Exception completionException) { if (completionException == null) { - this.underlyingFactory.registerForConnectionError(this.sendLink); this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); this.lastKnownLinkError = null; this.retryPolicy.resetRetryCount(this.getClientId()); @@ -680,6 +679,7 @@ private void createSendLink(SenderLinkSettings linkSettings) BaseHandler.setHandler(sender, handler); sender.open(); this.sendLink = sender; + this.underlyingFactory.registerForConnectionError(this.sendLink); } CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) @@ -689,9 +689,9 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) return CompletableFuture.completedFuture(null); } else - { + { CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); - CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Sent SAS Token and set renew timer");}); + CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;}); if (this.transferDestinationPath!= null && !this.transferDestinationPath.isEmpty()) { @@ -706,7 +706,7 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) private void cancelSASTokenRenewTimer() { if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) - { + { this.sasTokenRenewTimerFuture.cancel(true); TRACE_LOGGER.debug("Cancelled SAS Token renew timer"); } diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java index 49be50bfde83b..a4ad1ec973732 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java @@ -83,7 +83,15 @@ else if (errorCondition.getCondition() == ClientConstants.STORE_LOCK_LOST_ERROR) } else if (errorCondition.getCondition() == AmqpErrorCode.AmqpLinkDetachForced) { - return new ServiceBusException(false, new AmqpException(errorCondition)); + return new ServiceBusException(true, new AmqpException(errorCondition)); + } + else if (errorCondition.getCondition() == AmqpErrorCode.ConnectionForced) + { + return new ServiceBusException(true, new AmqpException(errorCondition)); + } + else if (errorCondition.getCondition() == AmqpErrorCode.FramingError) + { + return new ServiceBusException(true, new AmqpException(errorCondition)); } else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded) { diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java index 2e783afd1a710..05e218d9e5b76 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java @@ -70,16 +70,15 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection private RequestResponseLink cbsLink; private int cbsLinkCreationAttempts = 0; private Throwable lastCBSLinkCreationException = null; + private URI namespaceEndpointUri; private final ClientSettings clientSettings; - private final URI namespaceEndpointUri; private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings) { super("MessagingFactory".concat(StringUtil.getShortRandomString())); - this.namespaceEndpointUri = namespaceEndpointUri; this.clientSettings = clientSettings; - + this.namespaceEndpointUri = namespaceEndpointUri; this.hostName = namespaceEndpointUri.getHost(); this.registeredLinks = new LinkedList(); this.connetionCloseFuture = new CompletableFuture(); @@ -181,6 +180,7 @@ private synchronized CompletableFuture createController() { }); } + @Override public String getHostName() { return this.hostName; @@ -222,7 +222,7 @@ Connection getConnection() { if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { - TRACE_LOGGER.info("Creating connection to host '{}:{}'", hostName, ClientConstants.AMQPS_PORT); + TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.connectionHandler.getOutboundSocketHostName(), this.connectionHandler.getOutboundSocketPort()); this.connection = this.getReactor().connectionToHost( this.connectionHandler.getOutboundSocketHostName(), this.connectionHandler.getOutboundSocketPort(), @@ -655,42 +655,74 @@ CompletableFuture sendSecurityToken(String sasTokenAudienceUri) CompletableFuture> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer) { + CompletableFuture> result = new CompletableFuture>(); TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceURI); - CompletableFuture tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI); - return tokenFuture.thenComposeAsync((t) -> - { - SecurityToken generatedSecurityToken = t; - CompletableFuture sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> { - return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), generatedSecurityToken); - }, MessagingFactory.INTERNAL_THREAD_POOL); - - if(retryOnFailure) - { - return sendTokenFuture.handleAsync((v, sendTokenEx) -> { - if(sendTokenEx == null) - { - TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceURI); - return MessagingFactory.scheduleRenewTimer(generatedSecurityToken.getValidUntil(), validityRenewer); - } - else - { - TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendTokenEx); - TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS); - return Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun); - } - }, MessagingFactory.INTERNAL_THREAD_POOL); - } - else - { - // Let the exception of the sendToken state pass up to caller - return sendTokenFuture.thenApply((v) -> { - TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceURI); - return MessagingFactory.scheduleRenewTimer(generatedSecurityToken.getValidUntil(), validityRenewer); - }); - } - }, MessagingFactory.INTERNAL_THREAD_POOL); + CompletableFuture sendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI); + sendTokenFuture.handleAsync((validUntil, sendTokenEx) -> { + if(sendTokenEx == null) + { + TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI); + ScheduledFuture renewalFuture = MessagingFactory.scheduleRenewTimer(validUntil, validityRenewer); + result.complete(renewalFuture); + } + else + { + Throwable sendFailureCause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx); + TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendFailureCause); + if(retryOnFailure) + { + // Just schedule another attempt + TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS); + ScheduledFuture renewalFuture = Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun); + result.complete(renewalFuture); + } + else + { + if(sendFailureCause instanceof TimeoutException) + { + // Retry immediately on timeout. This is a special case as CBSLink may be disconnected right after the token is sent, but before it reaches the service + TRACE_LOGGER.debug("Resending token for {}", sasTokenAudienceURI); + CompletableFuture resendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI); + resendTokenFuture.handleAsync((resendValidUntil, resendTokenEx) -> { + if(resendTokenEx == null) + { + TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI); + ScheduledFuture renewalFuture = MessagingFactory.scheduleRenewTimer(resendValidUntil, validityRenewer); + result.complete(renewalFuture); + } + else + { + Throwable resendFailureCause = ExceptionUtil.extractAsyncCompletionCause(resendTokenEx); + TRACE_LOGGER.warn("Resending CBS Token for {} failed.", sasTokenAudienceURI, resendFailureCause); + result.completeExceptionally(resendFailureCause); + } + return null; + }, MessagingFactory.INTERNAL_THREAD_POOL); + } + else + { + result.completeExceptionally(sendFailureCause); + } + } + } + return null; + }, MessagingFactory.INTERNAL_THREAD_POOL); + + return result; } + private CompletableFuture generateAndSendSecurityToken(String sasTokenAudienceURI) + { + CompletableFuture tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI); + return tokenFuture.thenComposeAsync((t) -> + { + SecurityToken generatedSecurityToken = t; + return this.cbsLinkCreationFuture.thenComposeAsync((v) -> { + return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, ClientConstants.SAS_TOKEN_SEND_TIMEOUT, generatedSecurityToken).thenApply((u) -> generatedSecurityToken.getValidUntil()); + }, MessagingFactory.INTERNAL_THREAD_POOL); + }, MessagingFactory.INTERNAL_THREAD_POOL); + } + private static ScheduledFuture scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer) { // It will eventually expire. Renew it diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java index 7f61d6bacc451..af9acea1a59bd 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java @@ -60,6 +60,8 @@ class RequestResponseLink extends ClientEntity{ private boolean isRecreateLinksInProgress; private Map additionalProperties; private MessagingEntityType entityType; + private boolean isInnerLinksCloseHandled; + private int internalLinkGeneration; public static CompletableFuture createAsync( MessagingFactory messagingFactory, @@ -170,6 +172,7 @@ private RequestResponseLink( { super(linkName); + this.internalLinkGeneration = 1; this.recreateLinksLock = new Object(); this.isRecreateLinksInProgress = false; this.underlyingFactory = messagingFactory; @@ -184,6 +187,7 @@ private RequestResponseLink( this.replyTo = UUID.randomUUID().toString(); this.createFuture = new CompletableFuture(); this.entityType = entityType; + this.isInnerLinksCloseHandled = false; } public String getLinkPath() @@ -200,7 +204,7 @@ private CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure else { CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); - CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Set SAS Token renew timer");}); + CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;}); if (additionalAudienceURI != null) { CompletableFuture transferSendTokenFuture = this.underlyingFactory.sendSecurityToken(this.additionalAudienceURI); @@ -208,6 +212,41 @@ private CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure } return sasTokenFuture; + } + } + + private void onInnerLinksClosed(int linkGeneration, Exception exception) + { + // Ignore exceptions from last generation links + if(this.internalLinkGeneration == linkGeneration) + { + // This method is called twice once when inner receive link is closed and again when inner send link is closed. + // Both of them happen in succession anyway, not concurrently as all these happen on the reactor thread. + if(!this.isInnerLinksCloseHandled) + { + // Set it here and Reset the flag only before recreating inner links + this.isInnerLinksCloseHandled = true; + this.cancelSASTokenRenewTimer(); + if(this.pendingRequests.size() > 0) + { + if(exception != null && exception instanceof ServiceBusException && ((ServiceBusException) exception).getIsTransient()) + { + Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), exception, this.underlyingFactory.getOperationTimeout()); + if (nextRetryInterval != null) + { + Timer.schedule(() -> {RequestResponseLink.this.ensureUniqueLinkRecreation();}, nextRetryInterval, TimerType.OneTimeRun); + } + else + { + this.completeAllPendingRequestsWithException(exception); + } + } + else + { + this.completeAllPendingRequestsWithException(exception); + } + } + } } } @@ -222,6 +261,7 @@ private void cancelSASTokenRenewTimer() private void createInternalLinks() { + this.isInnerLinksCloseHandled = false; Map commonLinkProperties = new HashMap<>(); // ServiceBus expects timeout to be of type unsignedint commonLinkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis())); @@ -257,9 +297,6 @@ private void createInternalLinks() sender.setProperties(commonLinkProperties); SendLinkHandler sendLinkHandler = new SendLinkHandler(this.amqpSender); BaseHandler.setHandler(sender, sendLinkHandler); - this.amqpSender.setSendLink(sender); - TRACE_LOGGER.debug("RequestReponseLink - opening send link to {}", this.linkPath); - sender.open(); // Create receive link session = connection.session(); @@ -286,9 +323,40 @@ private void createInternalLinks() final ReceiveLinkHandler receiveLinkHandler = new ReceiveLinkHandler(this.amqpReceiver); BaseHandler.setHandler(receiver, receiveLinkHandler); - this.amqpReceiver.setReceiveLink(receiver); + + this.amqpSender.setLinks(sender, receiver); + this.amqpReceiver.setLinks(sender, receiver); + + TRACE_LOGGER.debug("RequestReponseLink - opening send link to {}", this.linkPath); + sender.open(); + this.underlyingFactory.registerForConnectionError(sender); TRACE_LOGGER.debug("RequestReponseLink - opening receive link to {}", this.linkPath); receiver.open(); + this.underlyingFactory.registerForConnectionError(receiver); + } + + private void ensureUniqueLinkRecreation() + { + synchronized (this.recreateLinksLock) { + if(!this.isRecreateLinksInProgress) + { + this.isRecreateLinksInProgress = true; + this.recreateInternalLinks().handleAsync((v, recreationEx) -> + { + if(recreationEx != null) + { + TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", this.linkPath, ExceptionUtil.extractAsyncCompletionCause(recreationEx)); + } + + synchronized (this.recreateLinksLock) + { + this.isRecreateLinksInProgress = false; + } + + return null; + }, MessagingFactory.INTERNAL_THREAD_POOL); + } + } } private CompletableFuture recreateInternalLinks() @@ -299,6 +367,7 @@ private CompletableFuture recreateInternalLinks() this.cancelSASTokenRenewTimer(); // Create new internal sender and receiver objects, as old ones are closed + this.internalLinkGeneration++; this.amqpSender = new InternalSender(this.getClientId() + ":internalSender", this, this.amqpSender); this.amqpReceiver = new InternalReceiver(this.getClientId() + ":interalReceiver", this); CompletableFuture recreateInternalLinksFuture = new CompletableFuture(); @@ -337,6 +406,7 @@ public void onEvent() if(ex == null) { TRACE_LOGGER.info("Recreated internal links to {}", this.linkPath); + this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); recreateInternalLinksFuture.complete(null); } else @@ -384,32 +454,7 @@ private void completeAllPendingRequestsWithException(Exception exception) public CompletableFuture requestAysnc(Message requestMessage, TransactionContext transaction, Duration timeout) { this.throwIfClosed(null); - // Check and recreate links if necessary - if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE) - && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE))) - { - synchronized (this.recreateLinksLock) { - if(!this.isRecreateLinksInProgress) - { - this.isRecreateLinksInProgress = true; - this.recreateInternalLinks().handleAsync((v, recreationEx) -> - { - if(recreationEx != null) - { - TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", this.linkPath, ExceptionUtil.extractAsyncCompletionCause(recreationEx)); - } - - synchronized (this.recreateLinksLock) - { - this.isRecreateLinksInProgress = false; - } - - return null; - }, MessagingFactory.INTERNAL_THREAD_POOL); - } - } - } - + CompletableFuture responseFuture = new CompletableFuture(); RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, transaction, responseFuture, timeout); String requestId = "request:" + this.requestCounter.incrementAndGet(); @@ -419,6 +464,14 @@ public CompletableFuture requestAysnc(Message requestMessage, Transacti workItem.setTimeoutTask(this.scheduleRequestTimeout(requestId, timeout)); TRACE_LOGGER.debug("Sending request with id:{}", requestId); this.amqpSender.sendRequest(requestId, false); + + // Check and recreate links if necessary + if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE) + && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE))) + { + this.ensureUniqueLinkRecreation(); + } + return responseFuture; } @@ -544,12 +597,15 @@ private class InternalReceiver extends ClientEntity implements IAmqpReceiver { private RequestResponseLink parent; private Receiver receiveLink; + private Sender matchingSendLink; private CompletableFuture openFuture; private CompletableFuture closeFuture; + private int linkGeneration; protected InternalReceiver(String clientId, RequestResponseLink parent) { super(clientId); this.parent = parent; + this.linkGeneration = parent.internalLinkGeneration;// Read it in the constructor as it may change later this.openFuture = new CompletableFuture(); this.closeFuture = new CompletableFuture(); } @@ -603,7 +659,6 @@ public void onOpenComplete(Exception completionException) { if(completionException == null) { TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", parent.linkPath); - this.parent.underlyingFactory.registerForConnectionError(this.receiveLink); AsyncUtil.completeFuture(this.openFuture, null); // Send unlimited credit @@ -633,59 +688,30 @@ public void onError(Exception exception) { AsyncUtil.completeFutureExceptionally(this.closeFuture, exception); } } - - TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' encountered error.", this.receiveLink.getName(), this.parent.linkPath, exception); - this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); - if(this.parent.amqpSender.sendLink != null) + else { - this.parent.amqpSender.sendLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink); + TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' encountered error.", this.receiveLink.getName(), this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); + this.matchingSendLink.close(); + this.parent.underlyingFactory.deregisterForConnectionError(this.matchingSendLink); + this.parent.onInnerLinksClosed(this.linkGeneration, exception); } - this.parent.cancelSASTokenRenewTimer(); - this.parent.completeAllPendingRequestsWithException(exception); } @Override public void onClose(ErrorCondition condition) { - if(this.getIsClosingOrClosed()) + if(condition == null || condition.getCondition() == null) { - if(!this.closeFuture.isDone()) + if(this.getIsClosingOrClosed() && !this.closeFuture.isDone()) { - if(condition == null || condition.getCondition() == null) - { - TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", parent.linkPath); - AsyncUtil.completeFuture(this.closeFuture, null); - } - else - { - Exception exception = ExceptionUtil.toException(condition); - TRACE_LOGGER.error("Closing internal receive link '{}' of requestresponselink to {} failed.", this.receiveLink.getName(), this.parent.linkPath, exception); - AsyncUtil.completeFutureExceptionally(this.closeFuture, exception); - } + TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", parent.linkPath); + AsyncUtil.completeFuture(this.closeFuture, null); } } else { - if(condition != null) - { - Exception exception = ExceptionUtil.toException(condition); - if(!this.openFuture.isDone()) - { - this.onOpenComplete(exception); - } - else - { - TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' closed with error.", this.receiveLink.getName(), this.parent.linkPath, exception); - this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink); - if(this.parent.amqpSender.sendLink != null) - { - this.parent.amqpSender.sendLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink); - } - this.parent.cancelSASTokenRenewTimer(); - this.parent.completeAllPendingRequestsWithException(exception); - } - } + Exception exception = ExceptionUtil.toException(condition); + this.onError(exception); } } @@ -724,15 +750,17 @@ public void onReceiveComplete(Delivery delivery) } }); } - - public void setReceiveLink(Receiver receiveLink) { + + public void setLinks(Sender sendLink, Receiver receiveLink) { this.receiveLink = receiveLink; + this.matchingSendLink = sendLink; } } private class InternalSender extends ClientEntity implements IAmqpSender { private Sender sendLink; + private Receiver matchingReceiveLink; private RequestResponseLink parent; private CompletableFuture openFuture; private CompletableFuture closeFuture; @@ -742,10 +770,12 @@ private class InternalSender extends ClientEntity implements IAmqpSender private Object pendingSendsSyncLock; private boolean isSendLoopRunning; private int maxMessageSize; + private int linkGeneration; protected InternalSender(String clientId, RequestResponseLink parent, InternalSender senderToBeCopied) { super(clientId); this.parent = parent; + this.linkGeneration = parent.internalLinkGeneration;// Read it in the constructor as it may change later this.availableCredit = new AtomicInteger(0); this.pendingSendsSyncLock = new Object(); this.isSendLoopRunning = false; @@ -813,7 +843,6 @@ public void onOpenComplete(Exception completionException) { if(completionException == null) { TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", parent.linkPath); - this.parent.underlyingFactory.registerForConnectionError(this.sendLink); this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink); AsyncUtil.completeFuture(this.openFuture, null); this.runSendLoop(); @@ -842,61 +871,30 @@ public void onError(Exception exception) { AsyncUtil.completeFutureExceptionally(this.closeFuture, exception); } } - - TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' encountered error.", this.sendLink.getName(), this.parent.linkPath, exception); - this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); - if(this.parent.amqpReceiver.receiveLink != null) + else { - this.parent.amqpReceiver.receiveLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink); + TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' encountered error.", this.sendLink.getName(), this.parent.linkPath, exception); + this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); + this.matchingReceiveLink.close(); + this.parent.underlyingFactory.deregisterForConnectionError(this.matchingReceiveLink); + this.parent.onInnerLinksClosed(this.linkGeneration, exception); } - this.parent.cancelSASTokenRenewTimer(); - this.parent.completeAllPendingRequestsWithException(exception); } @Override public void onClose(ErrorCondition condition) { - if(this.getIsClosingOrClosed()) + if(condition == null || condition.getCondition() == null) { - if(!this.closeFuture.isDone()) + if(!this.closeFuture.isDone() && !this.closeFuture.isDone()) { - if(condition == null || condition.getCondition() == null) - { - TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", this.parent.linkPath); - AsyncUtil.completeFuture(this.closeFuture, null); - } - else - { - Exception exception = ExceptionUtil.toException(condition); - TRACE_LOGGER.error("Closing internal send link '{}' of requestresponselink to {} failed.", this.sendLink.getName(), this.parent.linkPath, exception); - AsyncUtil.completeFutureExceptionally(this.closeFuture, exception); - } + TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", this.parent.linkPath); + AsyncUtil.completeFuture(this.closeFuture, null); } } else { - if(condition != null) - { - Exception exception = ExceptionUtil.toException(condition); - if(!this.openFuture.isDone()) - { - this.onOpenComplete(exception); - } - else - { - TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' closed with error.", this.sendLink.getName(), this.parent.linkPath, exception); - this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink); - - if(this.parent.amqpReceiver.receiveLink != null) - { - this.parent.amqpReceiver.receiveLink.close(); - this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink); - } - - this.parent.cancelSASTokenRenewTimer(); - this.parent.completeAllPendingRequestsWithException(exception); - } - } + Exception exception = ExceptionUtil.toException(condition); + this.onError(exception); } } @@ -960,9 +958,10 @@ public void onFlow(int creditIssued) { public void onSendComplete(Delivery delivery) { // Doesn't happen as sends are settled on send } - - public void setSendLink(Sender sendLink) { + + public void setLinks(Sender sendLink, Receiver receiveLink) { this.sendLink = sendLink; + this.matchingReceiveLink = receiveLink; this.availableCredit = new AtomicInteger(0); } @@ -1043,7 +1042,7 @@ private void runSendLoop() else { TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", requestIdToBeSent); - } + } } } finally diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java index 3511e170b5e44..d9f451685a48e 100644 --- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java +++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java @@ -397,7 +397,7 @@ static int encodeMessageToCustomArray(Message message, byte[] encodedBytes, int } catch(BufferOverflowException exception) { - throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", length / 1024), exception); + throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s KB", length / 1024), exception); } } @@ -461,7 +461,7 @@ else if(tokenValidityInSeconds >= 60) return tokenValidityInSeconds - 10; } else - { + { return (tokenValidityInSeconds - 1) > 0 ? tokenValidityInSeconds - 1 : 0; } } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java index 96749fe99c31e..cc54cd462260a 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java @@ -185,7 +185,7 @@ public CompletableFuture OnCloseSessionAsync(IMessageSession session) { } }, MessageAndSessionPumpTests.EXECUTOR_SERVICE); - Thread.sleep(1000); // Sleep for a second for the exception + Thread.sleep(2000); // Sleep for two seconds for the exception Assert.assertTrue("QueueClient created to a subscription which shouldn't be allowed.", unsupportedExceptionOccured.get()); } finally { qc.close(); @@ -223,7 +223,7 @@ public CompletableFuture OnCloseSessionAsync(IMessageSession session) { } }, MessageAndSessionPumpTests.EXECUTOR_SERVICE); - Thread.sleep(1000); // Sleep for a second for the exception + Thread.sleep(2000); // Sleep for two seconds for the exception Assert.assertTrue("SubscriptionClient created to a queue which shouldn't be allowed.", unsupportedExceptionOccured.get()); } finally { sc.close(); diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java index 185764579f969..1259666c0b2c7 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java @@ -90,7 +90,7 @@ public static void testMessagePumpRenewLock(IMessageSender sender, IMessageAndSe sender.send(new Message("AMQPMessage")); } boolean autoComplete = true; - int sleepMinutes = 1; // This should be less than message lock duration of the queue or subscription + int sleepMinutes = 1; // This should be more than message lock duration of the queue or subscription CountingMessageHandler messageHandler = new CountingMessageHandler(messagePump, !autoComplete, numMessages, false, Duration.ofMinutes(sleepMinutes)); messagePump.registerMessageHandler(messageHandler, new MessageHandlerOptions(numMessages, autoComplete, Duration.ofMinutes(10)), EXECUTOR_SERVICE); int waitMinutes = 2 * sleepMinutes; @@ -309,7 +309,8 @@ private static class CountingMessageHandler extends TestMessageHandler CountingMessageHandler(IMessageAndSessionPump messagePump, boolean completeMessages, int messageCount, boolean firstThrowException) { - this(messagePump, completeMessages, messageCount, firstThrowException, Duration.ZERO); + // Let's make every onMessage sleep for sometime so we can reliably count max concurrent threads + this(messagePump, completeMessages, messageCount, firstThrowException, Duration.ofSeconds(2)); } CountingMessageHandler(IMessageAndSessionPump messagePump, boolean completeMessages, int messageCount, boolean firstThrowException, Duration sleepDuration) @@ -391,7 +392,8 @@ private static class CountingSessionHandler extends TestSessionHandler CountingSessionHandler(IMessageAndSessionPump sessionPump, boolean completeMessages, int totalMessageCount, boolean firstThrowException) { - this(sessionPump, completeMessages, totalMessageCount, firstThrowException, Duration.ZERO); + // Let's make every onMessage sleep for sometime so we can reliably count max concurrent threads + this(sessionPump, completeMessages, totalMessageCount, firstThrowException, Duration.ofSeconds(2)); } CountingSessionHandler(IMessageAndSessionPump sessionPump, boolean completeMessages, int totalMessageCount, boolean firstThrowException, Duration sleepDuration) diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java new file mode 100644 index 0000000000000..b52647c1e1e63 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java @@ -0,0 +1,171 @@ +package com.microsoft.azure.servicebus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +public class MessageBodyTests { + + @Test + public void NullBinaryDataTest() + { + try + { + MessageBody.fromBinaryData(null); + Assert.fail("MessageBody created with null binary data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void NullSequenceTest() + { + try + { + MessageBody.fromSequenceData(null); + Assert.fail("MessageBody created with null sequence data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void NullValueDataTest() + { + try + { + MessageBody.fromValueData(null); + Assert.fail("MessageBody created with null value data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void MultipleDataSectionsTest() + { + try + { + ArrayList dataList = new ArrayList<>(); + dataList.add(new byte[0]); + dataList.add(new byte[0]); + MessageBody.fromBinaryData(dataList); + Assert.fail("MessageBody created with null binary data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void ZeroDataSectionsTest() + { + try + { + ArrayList dataList = new ArrayList<>(); + MessageBody.fromBinaryData(dataList); + Assert.fail("MessageBody created with null binary data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void MultipleSequenceSectionsTest() + { + try + { + ArrayList> sequenceList = new ArrayList<>(); + ArrayList sequence1 = new ArrayList<>(); + sequence1.add("hello"); + ArrayList sequence2 = new ArrayList<>(); + sequence2.add("howdy"); + sequenceList.add(sequence1); + sequenceList.add(sequence2); + MessageBody.fromSequenceData(sequenceList); + Assert.fail("MessageBody created with null binary data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void ZeroSequenceSectionsTest() + { + try + { + ArrayList> sequenceList = new ArrayList<>(); + MessageBody.fromSequenceData(sequenceList); + Assert.fail("MessageBody created with null binary data."); + } + catch(IllegalArgumentException e) + { + // passed + } + } + + @Test + public void ValueMessageBodyTest() + { + String value = "ValueBody"; + MessageBody body = MessageBody.fromValueData(value); + Assert.assertEquals("Message body type didn't match.", MessageBodyType.VALUE, body.getBodyType()); + Assert.assertNull("MessageBody of value type has binary data.", body.getBinaryData()); + Assert.assertNull("MessageBody of value type has sequence data.", body.getSequenceData()); + Assert.assertEquals("Message body value didn't match", value, body.getValueData()); + } + + @Test + public void SequenceMessageBodyTest() + { + String str1 = "hello"; + String str2 = "howdy"; + ArrayList> sequenceList = new ArrayList<>(); + ArrayList sequence1 = new ArrayList<>(); + sequence1.add(str1); + sequence1.add(str2); + sequenceList.add(sequence1); + MessageBody body = MessageBody.fromSequenceData(sequenceList); + Assert.assertEquals("Message body type didn't match.", MessageBodyType.SEQUENCE, body.getBodyType()); + Assert.assertNull("MessageBody of sequence type has binary data.", body.getBinaryData()); + Assert.assertNull("MessageBody of sequence type has value data.", body.getValueData()); + List> outputSequenceList = body.getSequenceData(); + Assert.assertEquals("Message body sequence didn't match", 1, outputSequenceList.size()); + List outputInnerSequence = outputSequenceList.get(0); + Assert.assertEquals("Message body sequence didn't match", 2, outputInnerSequence.size()); + Assert.assertEquals("Message body sequence didn't match", str1, outputInnerSequence.get(0)); + Assert.assertEquals("Message body sequence didn't match", str2, outputInnerSequence.get(1)); + } + + @Test + public void BinaryMessageBodyTest() + { + byte[] binaryData = new byte[1024]; + Arrays.fill(binaryData, (byte)32); + ArrayList binaryDataList = new ArrayList<>(); + binaryDataList.add(binaryData); + MessageBody body = MessageBody.fromBinaryData(binaryDataList); + Assert.assertEquals("Message body type didn't match.", MessageBodyType.BINARY, body.getBodyType()); + Assert.assertNull("MessageBody of binary type has value data.", body.getValueData()); + Assert.assertNull("MessageBody of binary type has sequence data.", body.getSequenceData()); + List outputataList = body.getBinaryData(); + Assert.assertEquals("Message body binary data didn't match", 1, outputataList.size()); + byte[] outputBinaryData = outputataList.get(0); + Assert.assertEquals("Message body sequence didn't match", binaryData, outputBinaryData); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java new file mode 100644 index 0000000000000..b2ffc6088fa2f --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java @@ -0,0 +1,18 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedQueueClientSessionTests extends QueueClientSessionTests{ + @Override + public String getEntityNamePrefix() { + return "PartitionedQueueClientSessionTests"; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } + + @Override + public boolean isEntityPartitioned() { + return true; + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java new file mode 100644 index 0000000000000..6665afaf19b7d --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java @@ -0,0 +1,18 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedQueueClientTests extends QueueClientTests{ + @Override + public String getEntityNamePrefix() { + return "PartitionedQueueClientTests"; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } + + @Override + public boolean isEntityPartitioned() { + return true; + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java new file mode 100644 index 0000000000000..73e9a92a4225e --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java @@ -0,0 +1,18 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedQueueSendReceiveTests extends QueueSendReceiveTests{ + @Override + public String getEntityNamePrefix() { + return "PartitionedQueueSendReceiveTests"; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } + + @Override + public boolean isEntityPartitioned() { + return true; + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java new file mode 100644 index 0000000000000..77902f0ec103b --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java @@ -0,0 +1,19 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedQueueSessionTests extends QueueSessionTests +{ + @Override + public String getEntityNamePrefix() { + return "PartitionedQueueSessionTests"; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } + + @Override + public boolean isEntityPartitioned() { + return true; + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java new file mode 100644 index 0000000000000..a01c9248b80f7 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java @@ -0,0 +1,18 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedSubscriptionClientSessionTests extends SubscriptionClientSessionTests{ + @Override + public String getEntityNamePrefix() { + return "PartitionedSubscriptionClientSessionTests"; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } + + @Override + public boolean isEntityPartitioned() { + return true; + } +} \ No newline at end of file diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java new file mode 100644 index 0000000000000..0a0ba298c29b4 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java @@ -0,0 +1,18 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedSubscriptionClientTests extends SubscriptionClientTests{ + @Override + public String getEntityNamePrefix() { + return "PartitionedSubscriptionClientTests"; + } + + @Override + public boolean isEntityPartitioned() { + return true; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java new file mode 100644 index 0000000000000..9d3542300cd41 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java @@ -0,0 +1,19 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedTopicSendReceiveTests extends TopicSendReceiveTests { + + @Override + public String getEntityNamePrefix() { + return "PartitionedTopicSendReceiveTests"; + } + + @Override + public boolean isEntityPartitioned() { + return true; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java new file mode 100644 index 0000000000000..86e4da2084290 --- /dev/null +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java @@ -0,0 +1,19 @@ +package com.microsoft.azure.servicebus; + +public class PartitionedTopicSessionTests extends TopicSessionTests { + + @Override + public String getEntityNamePrefix() { + return "PartitionedTopicSessionTests"; + } + + @Override + public boolean isEntityPartitioned() { + return true; + } + + @Override + public boolean shouldCreateEntityForEveryTest() { + return TestUtils.shouldCreateEntityForEveryTest(); + } +} diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java index 0de4c0a5d7b91..a7781446cd4bc 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java @@ -106,17 +106,38 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE } @Test - public void testBasicReceiveAndDelete() throws InterruptedException, ServiceBusException, ExecutionException - { - this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); - TestCommons.testBasicReceiveAndDelete(this.sender, this.sessionId, this.receiver); - } + public void testBasicReceiveAndDeleteWithValueData() throws InterruptedException, ServiceBusException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithValueData(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndDeleteWithBinaryData() throws InterruptedException, ServiceBusException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithBinaryData(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndDeleteWithLargeBinaryData() throws InterruptedException, ServiceBusException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithLargeBinaryData(this.sender, this.sessionId, this.receiver); + } + + @Test + public void testBasicReceiveAndDeleteWithSequenceData() throws InterruptedException, ServiceBusException, ExecutionException + { + this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithSequenceData(this.sender, this.sessionId, this.receiver); + } @Test public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, ExecutionException { this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); - TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver); + TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned()); } @Test @@ -151,14 +172,14 @@ public void testBasicReceiveAndRenewLock() throws InterruptedException, ServiceB public void testBasicReceiveAndRenewLockBatch() throws InterruptedException, ServiceBusException, ExecutionException { this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK); - TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver); + TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned()); } @Test public void testBasicReceiveBatchAndComplete() throws InterruptedException, ServiceBusException, ExecutionException { this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK); - TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver); + TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned()); } @Test @@ -186,7 +207,7 @@ public void testPeekMessage() throws InterruptedException, ServiceBusException public void testPeekMessageBatch() throws InterruptedException, ServiceBusException { this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK); - TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver); + TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned()); } @Test diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java index 438289c65106f..c8917e775e102 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java @@ -4,8 +4,8 @@ import java.net.URI; import java.time.Duration; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import com.microsoft.azure.servicebus.management.*; import org.junit.After; import org.junit.AfterClass; @@ -112,21 +112,45 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE managementClient.close(); } - + @Test - public void testBasicReceiveAndDelete() throws InterruptedException, ServiceBusException, ExecutionException - { + public void testBasicReceiveAndDeleteWithValueData() throws InterruptedException, ServiceBusException, ExecutionException + { String sessionId = TestUtils.getRandomString(); - this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE); - TestCommons.testBasicReceiveAndDelete(this.sender, sessionId, this.session); - } + this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithValueData(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndDeleteWithBinaryData() throws InterruptedException, ServiceBusException, ExecutionException + { + String sessionId = TestUtils.getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithBinaryData(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndDeleteWithLargeBinaryData() throws InterruptedException, ServiceBusException, ExecutionException + { + String sessionId = TestUtils.getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithLargeBinaryData(this.sender, sessionId, this.session); + } + + @Test + public void testBasicReceiveAndDeleteWithSequenceData() throws InterruptedException, ServiceBusException, ExecutionException + { + String sessionId = TestUtils.getRandomString(); + this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE); + TestCommons.testBasicReceiveAndDeleteWithSequenceData(this.sender, sessionId, this.session); + } @Test public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, ExecutionException { String sessionId = TestUtils.getRandomString(); this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE); - TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session); + TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session, this.isEntityPartitioned()); } @Test @@ -158,7 +182,7 @@ public void testBasicReceiveBatchAndComplete() throws InterruptedException, Serv { String sessionId = TestUtils.getRandomString(); this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.PEEKLOCK); - TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session); + TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session, this.isEntityPartitioned()); } @Test @@ -190,7 +214,7 @@ public void testPeekMessageBatch() throws InterruptedException, ServiceBusExcept { String sessionId = TestUtils.getRandomString(); this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.PEEKLOCK); - TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session); + TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session, this.isEntityPartitioned()); } @Test @@ -306,16 +330,20 @@ public void testAcceptSessionTimeoutShouldNotLockSession() throws InterruptedExc } @Test - public void testRequestResponseLinkRequestLimit() throws InterruptedException, ServiceBusException + public void testRequestResponseLinkRequestLimit() throws InterruptedException, ServiceBusException, ExecutionException { int limitToTest = 5000; String sessionId = TestUtils.getRandomString(); this.session = ClientFactory.acceptSessionFromEntityPath(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, sessionId, TestUtils.getClientSettings(), ReceiveMode.PEEKLOCK); + CompletableFuture[] futures = new CompletableFuture[limitToTest]; for(int i=0; i future = this.session.renewSessionLockAsync(); + futures[i] = future; } + CompletableFuture.allOf(futures).get(); + this.session.renewSessionLock(); } diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java index 5c3cd73b51adc..fc98508df677f 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java @@ -7,7 +7,7 @@ public String getEntityNamePrefix() { @Override public boolean isEntityQueue() { - return true; + return false; } @Override diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java index 85058292d4d26..79c4b7ac54635 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java @@ -40,38 +40,120 @@ public static void testBasicSendBatch(IMessageSender sender) throws InterruptedE sender.sendBatch(messages); } - public static void testBasicReceiveAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException - { - String messageId = UUID.randomUUID().toString(); - Message message = new Message("AMQP message"); - message.setMessageId(messageId); - if(sessionId != null) - { - message.setSessionId(sessionId); - } - sender.send(message); - - IMessage receivedMessage = receiver.receive(); - Assert.assertNotNull("Message not received", receivedMessage); - Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); - receivedMessage = receiver.receive(SHORT_WAIT_TIME); - Assert.assertNull("Message received again", receivedMessage); + public static void testBasicReceiveAndDeleteWithValueData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + { + String messageData = "testBasicReceiveAndDeleteWithValueData"; + String messageId = UUID.randomUUID().toString(); + Message message = new Message(MessageBody.fromValueData(messageData)); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + + sender.send(message); + + IMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + Assert.assertEquals("Message Body Type did not match", MessageBodyType.VALUE, receivedMessage.getMessageBody().getBodyType()); + Assert.assertEquals("Message content did not match", messageData, receivedMessage.getMessageBody().getValueData()); + receivedMessage = receiver.receive(SHORT_WAIT_TIME); + Assert.assertNull("Message received again", receivedMessage); + } + + public static void testBasicReceiveAndDeleteWithBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + { + testBasicReceiveAndDeleteWithBinaryData(sender, sessionId, receiver, 64); } - public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + public static void testBasicReceiveAndDeleteWithLargeBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + { + testBasicReceiveAndDeleteWithBinaryData(sender, sessionId, receiver, 64 * 1024); + } + + private static void testBasicReceiveAndDeleteWithBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver, int messageSize) throws InterruptedException, ServiceBusException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + byte[] binaryData = new byte[messageSize]; + for(int i=0; i< binaryData.length; i++) + { + binaryData[i] = (byte)i; + } + Message message = new Message(Utils.fromBinay(binaryData)); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + + sender.send(message); + + IMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + Assert.assertEquals("Message Body Type did not match", MessageBodyType.BINARY, receivedMessage.getMessageBody().getBodyType()); + Assert.assertArrayEquals("Message content did not match", binaryData, Utils.getDataFromMessageBody(receivedMessage.getMessageBody())); + receivedMessage = receiver.receive(SHORT_WAIT_TIME); + Assert.assertNull("Message received again", receivedMessage); + } + + public static void testBasicReceiveAndDeleteWithSequenceData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + { + String messageId = UUID.randomUUID().toString(); + List sequence = new ArrayList(); + sequence.add("azure"); + sequence.add("servicebus"); + sequence.add("messaging"); + Message message = new Message(Utils.fromSequence(sequence)); + message.setMessageId(messageId); + if(sessionId != null) + { + message.setSessionId(sessionId); + } + sender.send(message); + + IMessage receivedMessage = receiver.receive(); + Assert.assertNotNull("Message not received", receivedMessage); + Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId()); + Assert.assertEquals("Message Body Type did not match", MessageBodyType.SEQUENCE, receivedMessage.getMessageBody().getBodyType()); + Assert.assertArrayEquals("Message content did not match", sequence.toArray(new String[] {}), Utils.getSequenceFromMessageBody(receivedMessage.getMessageBody()).toArray(new String[] {})); + receivedMessage = receiver.receive(SHORT_WAIT_TIME); + Assert.assertNull("Message received again", receivedMessage); + } + + public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException { int numMessages = 10; - List messages = new ArrayList(); - for(int i=0; i messages = new ArrayList(); + for(int i=0; i receivedMessages = receiver.receiveBatch(numMessages); @@ -169,30 +251,55 @@ public static void testBasicReceiveAndRenewLock(IMessageSender sender, String se receiver.complete(receivedMessage.getLockToken()); } - public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException { - int numMessages = 10; - List messages = new ArrayList(); - for(int i=0; i messages = new ArrayList(); + for(int i=0; i totalReceivedMessages = new ArrayList<>(); + ArrayList totalReceivedMessages = new ArrayList<>(); - Collection receivedMessages = receiver.receiveBatch(numMessages); - while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages) + if(isEntityPartitioned && sessionId == null) { - totalReceivedMessages.addAll(receivedMessages); - receivedMessages = receiver.receiveBatch(numMessages); + Collection receivedMessages = receiver.receiveBatch(1); + Assert.assertTrue("Messages not received", receivedMessages != null && receivedMessages.size() > 0); + totalReceivedMessages.addAll(receivedMessages); + } + else + { + Collection receivedMessages = receiver.receiveBatch(numMessages); + while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages) + { + totalReceivedMessages.addAll(receivedMessages); + receivedMessages = receiver.receiveBatch(numMessages); + } + Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size()); } - Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size()); ArrayList oldLockTimes = new ArrayList(); for(IMessage message : totalReceivedMessages) @@ -215,20 +322,36 @@ public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, Stri } } - public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException + public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException { int numMessages = 10; - List messages = new ArrayList(); - for(int i=0; i messages = new ArrayList(); + for(int i=0; i receivedMessages = receiver.receiveBatch(numMessages); @@ -265,7 +388,16 @@ public static void testSendSceduledMessageAndReceive(IMessageSender sender, Stri sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); - Thread.sleep(secondsToWaitBeforeScheduling * 1000); + if(sessionId == null) + { + Thread.sleep(secondsToWaitBeforeScheduling * 1000 * 2); + } + else + { + Thread.sleep(secondsToWaitBeforeScheduling * 1000); + ((IMessageSession)receiver).renewSessionLock(); + Thread.sleep(secondsToWaitBeforeScheduling * 1000); + } Collection allReceivedMessages = new LinkedList(); Collection receivedMessages = receiver.receiveBatch(10); @@ -307,7 +439,16 @@ public static void testSendSceduledMessageAndCancel(IMessageSender sender, Strin sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); long sequnceNumberMsg2 = sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling)); sender.cancelScheduledMessage(sequnceNumberMsg2); - Thread.sleep(secondsToWaitBeforeScheduling * 1000); + if(sessionId == null) + { + Thread.sleep(secondsToWaitBeforeScheduling * 1000 * 2); + } + else + { + Thread.sleep(secondsToWaitBeforeScheduling * 1000); + ((IMessageSession)receiver).renewSessionLock(); + Thread.sleep(secondsToWaitBeforeScheduling * 1000); + } Collection allReceivedMessages = new LinkedList(); Collection receivedMessages = receiver.receiveBatch(10); @@ -346,12 +487,20 @@ public static void testPeekMessage(IMessageSender sender, String sessionId, IMes Assert.assertEquals("Peek with sequence number failed.", firstMessageSequenceNumber, peekedMessage5.getSequenceNumber()); } - public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser) throws InterruptedException, ServiceBusException - { + public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException + { + String partitionKey = "pkey1"; Message message = new Message("AMQP Scheduled message"); if(sessionId != null) { - message.setSessionId(sessionId); + message.setSessionId(sessionId); + } + else + { + if(isEntityPartitioned) + { + message.setPartitionKey(partitionKey); + } } sender.send(message); message = new Message("AMQP Scheduled message2"); @@ -359,6 +508,13 @@ public static void testPeekMessageBatch(IMessageSender sender, String sessionId, { message.setSessionId(sessionId); } + else + { + if(isEntityPartitioned) + { + message.setPartitionKey(partitionKey); + } + } sender.send(message); Thread.sleep(5000); Collection peekedMessages = browser.peekBatch(10); diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java index 77c9c3ca3868f..be45178c24f72 100644 --- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java +++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java @@ -7,8 +7,8 @@ public class ConnectionStringBuilderTests { @Test - public void test() { - String connectionString = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessSignature=SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic"; + public void ConnectionStringBuilderTest() { + String connectionString = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessSignatureToken=SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic"; ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString); assertEquals("SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic", builder.getSharedAccessSignatureToken()); diff --git a/pom.xml b/pom.xml index 837f01de17d3c..83a5985fae78f 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ https://github.com/Azure/azure-service-bus-java - 0.29.0 + 0.31.0 4.12 1.7.0 2.0.0-PREVIEW-8-SNAPSHOT