diff --git a/azure-servicebus/pom.xml b/azure-servicebus/pom.xml
index 265ec670924b4..d25ed1a356188 100644
--- a/azure-servicebus/pom.xml
+++ b/azure-servicebus/pom.xml
@@ -41,6 +41,10 @@
org.apache.maven.plugins
maven-surefire-plugin
2.20
+
+ 0
+ true
+
@@ -51,17 +55,12 @@
proton-j
${proton-j-version}
-
+
com.microsoft.azure
qpid-proton-j-extensions
1.1.0
- org.bouncycastle
- bcpkix-jdk15on
- 1.53
-
-
com.microsoft.azure
adal4j
1.3.0
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java
index 0a7639bc1216a..4aed5611195c6 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessage.java
@@ -75,7 +75,7 @@ public interface IMessage {
* Gets the content type of this message.
*
* Optionally describes the payload of the message, with a descriptor following the format of
- * RFC2045, Section 5, for example "application/json".
+ * RFC2045, Section 5, for example "application/json". Note that content type is not same as message body type.
*
* @return content type of this message
*/
@@ -213,7 +213,9 @@ public interface IMessage {
*
* @return body of this message
* @see Messages, payloads, and serialization
+ * @deprecated Message body need not just a byte array. Replaced by {@link #getMessageBody()}
*/
+ @Deprecated
public byte[] getBody();
/**
@@ -221,9 +223,28 @@ public interface IMessage {
*
* @param body body of this message
* @see #getBody()
+ * @deprecated Message body need not just a byte array. Replaced by {@link #setMessageBody(MessageBody)}
*/
+ @Deprecated
public void setBody(byte[] body);
+ /**
+ * Gets the body of this message. Client applications should extract message content based on body type.
+ *
+ * @return body of this message
+ * @see Messages, payloads, and serialization
+ */
+ public MessageBody getMessageBody();
+
+ /**
+ * Sets the body of this message.
+ *
+ * @param body body of this message
+ * @see #getMessageBody()
+ */
+ public void setMessageBody(MessageBody body);
+
+
/**
* Gets the map of user application properties of this message. Client
* applications can set user properties (headers) on the message using this map.
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java
index f1d0bee507572..64197d5d8bb28 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageAndSessionPump.java
@@ -34,7 +34,7 @@ interface IMessageAndSessionPump {
* IMessageHandler methods are executed on the passed executor service.
*
* @param handler The {@link IMessageHandler} instance
- * @param executorService ExecutorService which is used to execute {@link IMessageHandler} methods. If there are
+ * @param executorService ExecutorService which is used to execute {@link IMessageHandler} methods.
* @throws InterruptedException if the current thread was interrupted while waiting
* @throws ServiceBusException if register failed
*/
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java
index ddc07605b62ca..2daabef60d931 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java
@@ -11,19 +11,12 @@
import java.util.Map;
import java.util.UUID;
-
-/**
- *
- *
- */
final public class Message implements Serializable, IMessage {
private static final long serialVersionUID = 7849508139219590863L;
-
private static final Charset DEFAULT_CHAR_SET = Charset.forName("UTF-8");
-
private static final String DEFAULT_CONTENT_TYPE = null;
- private static final byte[] DEFAULT_CONTENT = new byte[0];
+ private static final MessageBody DEFAULT_CONTENT = Utils.fromBinay(new byte[0]);
private long deliveryCount;
@@ -31,7 +24,7 @@ final public class Message implements Serializable, IMessage {
private Duration timeToLive;
- private byte[] content;
+ private MessageBody messageBody;
private String contentType;
@@ -67,40 +60,103 @@ final public class Message implements Serializable, IMessage {
private byte[] deliveryTag;
+ /**
+ * Creates an empty message with an empty byte array as body.
+ */
public Message()
{
this(DEFAULT_CONTENT);
}
+ /**
+ * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary.
+ * @param content content of the message.
+ */
public Message(String content)
{
this(content.getBytes(DEFAULT_CHAR_SET));
}
+ /**
+ * Creates a message from a byte array. Message body type is set to binary.
+ * @param content content of the message
+ */
public Message(byte[] content)
{
- this(content, DEFAULT_CONTENT_TYPE);
+ this(Utils.fromBinay(content));
+ }
+
+ /**
+ * Creates a message from message body.
+ * @param body message body
+ */
+ public Message(MessageBody body)
+ {
+ this(body, DEFAULT_CONTENT_TYPE);
}
+ /**
+ * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary.
+ * @param content content of the message
+ * @param contentType content type of the message
+ */
public Message(String content, String contentType)
{
this(content.getBytes(DEFAULT_CHAR_SET), contentType);
}
+ /**
+ * Creates a message from a byte array. Message body type is set to binary.
+ * @param content content of the message
+ * @param contentType content type of the message
+ */
public Message(byte[] content, String contentType)
{
- this(UUID.randomUUID().toString(), content, contentType);
+ this(Utils.fromBinay(content), contentType);
}
+ /**
+ * Creates a message from message body.
+ * @param body message body
+ * @param contentType content type of the message
+ */
+ public Message(MessageBody body, String contentType)
+ {
+ this(UUID.randomUUID().toString(), body, contentType);
+ }
+
+ /**
+ * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary.
+ * @param messageId id of the message
+ * @param content content of the message
+ * @param contentType content type of the message
+ */
public Message(String messageId, String content, String contentType)
{
this(messageId, content.getBytes(DEFAULT_CHAR_SET), contentType);
}
-
+
+ /**
+ * Creates a message from a byte array. Message body type is set to binary.
+ * @param messageId id of the message
+ * @param content content of the message
+ * @param contentType content type of the message
+ */
public Message(String messageId, byte[] content, String contentType)
+ {
+ this(messageId, Utils.fromBinay(content), contentType);
+ }
+
+ /**
+ * Creates a message from message body.
+ * @param messageId id of the message
+ * @param body message body
+ * @param contentType content type of the message
+ */
+ public Message(String messageId, MessageBody body, String contentType)
{
this.messageId = messageId;
- this.content = content;
+ this.messageBody = body;
this.contentType = contentType;
this.properties = new HashMap<>();
}
@@ -186,16 +242,6 @@ public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
- @Override
- public byte[] getBody() {
- return this.content;
- }
-
- @Override
- public void setBody(byte[] content) {
- this.content = content;
- }
-
@Override
public Map getProperties() {
return this.properties;
@@ -325,4 +371,30 @@ void setDeliveryTag(byte[] deliveryTag)
{
this.deliveryTag = deliveryTag;
}
+
+ @Override
+ @Deprecated
+ public byte[] getBody()
+ {
+ return Utils.getDataFromMessageBody(this.messageBody);
+ }
+
+ @Override
+ @Deprecated
+ public void setBody(byte[] body)
+ {
+ this.messageBody = Utils.fromBinay(body);
+ }
+
+ @Override
+ public MessageBody getMessageBody()
+ {
+ return this.messageBody;
+ }
+
+ @Override
+ public void setMessageBody(MessageBody body)
+ {
+ this.messageBody = body;
+ }
}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java
new file mode 100644
index 0000000000000..fb697f3b79732
--- /dev/null
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java
@@ -0,0 +1,112 @@
+package com.microsoft.azure.servicebus;
+
+import java.util.List;
+
+/**
+ * This class encapsulates the body of a message. Body types map to AMQP message body types.
+ * It has getters and setters for multiple body types.
+ * Client should test for body type before calling corresponding get method.
+ * Get methods not corresponding to the type of the body return null.
+ */
+public class MessageBody {
+ private MessageBodyType bodyType;
+ private Object valueData;
+ private List> sequenceData;
+ private List binaryData;
+
+ private MessageBody() {}
+
+ /**
+ * Creates message body of AMQPValue type.
+ * @param value AMQPValue content of the message. It must be of a type supported by AMQP.
+ * @return MessageBody instance wrapping around the value data.
+ */
+ public static MessageBody fromValueData(Object value)
+ {
+ if(value == null)
+ {
+ throw new IllegalArgumentException("Value data is null.");
+ }
+
+ MessageBody body = new MessageBody();
+ body.bodyType = MessageBodyType.VALUE;
+ body.valueData = value;
+ body.sequenceData = null;
+ body.binaryData = null;
+ return body;
+ }
+
+ /**
+ * Creates a message body from a list of AMQPSequence sections.Each AMQPSequence section is in turn a list of objects.
+ * Please note that this version of the SDK supports only one AMQPSequence section in a message. It means only a list of exactly one sequence in it is accepted as message body.
+ * @param sequenceData a list of AMQPSequence sections. Each AMQPSequence section is in turn a list of objects. Every object in each list must of a type supported by AMQP.
+ * @return MessageBody instance wrapping around the sequence data.
+ */
+ public static MessageBody fromSequenceData(List> sequenceData)
+ {
+ if(sequenceData == null || sequenceData.size() == 0 || sequenceData.size() > 1)
+ {
+ throw new IllegalArgumentException("Sequence data is null or has more than one collection in it.");
+ }
+
+ MessageBody body = new MessageBody();
+ body.bodyType = MessageBodyType.SEQUENCE;
+ body.valueData = null;
+ body.sequenceData = sequenceData;
+ body.binaryData = null;
+ return body;
+ }
+
+ /**
+ * Creates a message body from a list of Data sections.Each Data section is a byte array.
+ * Please note that this version of the SDK supports only one Data section in a message. It means only a list of exactly one byte array in it is accepted as message body.
+ * @param binaryData a list of byte arrays.
+ * @return MessageBody instance wrapping around the binary data.
+ */
+ public static MessageBody fromBinaryData(List binaryData)
+ {
+ if(binaryData == null || binaryData.size() == 0 || binaryData.size() > 1)
+ {
+ throw new IllegalArgumentException("Binary data is null or has more than one byte array in it.");
+ }
+
+ MessageBody body = new MessageBody();
+ body.bodyType = MessageBodyType.BINARY;
+ body.valueData = null;
+ body.sequenceData = null;
+ body.binaryData = binaryData;
+ return body;
+ }
+
+ /**
+ * Returns the content of message body.
+ * @return value of message body only if the MessageBody is of Value type. Returns null otherwise.
+ */
+ public Object getValueData() {
+ return valueData;
+ }
+
+ /**
+ * Returns the content of message body.
+ * @return a list of AMQPSequence sections only if the MessageBody is of Sequence type. Returns null otherwise. Each AMQPSequence section is in turn a list of objects.
+ */
+ public List> getSequenceData() {
+ return sequenceData;
+ }
+
+ /**
+ * Returns the content of message body.
+ * @return message body as list of byte arrays only if the MessageBody is of Binary type. Returns null otherwise.
+ */
+ public List getBinaryData() {
+ return binaryData;
+ }
+
+ /**
+ * Return the type of content in this message body.
+ * @return type of message content
+ */
+ public MessageBodyType getBodyType() {
+ return bodyType;
+ }
+}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java
new file mode 100644
index 0000000000000..486edaa4805de
--- /dev/null
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBodyType.java
@@ -0,0 +1,20 @@
+package com.microsoft.azure.servicebus;
+
+/**
+ * Enumeration to represent body type of a message.
+ *
+ */
+public enum MessageBodyType {
+ /**
+ * Message content is byte array, equivalent to AMQP Data.
+ */
+ BINARY,
+ /**
+ * Message content is a list of objects, equivalent to AMQP Sequence. Each object must be of a type supported by AMQP.
+ */
+ SEQUENCE,
+ /**
+ * Message content is a single object, equivalent to AMQP Value. The object must be of a type supported by AMQP.
+ */
+ VALUE
+}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java
index 9a02fa82c43ed..326ca4f600d8b 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java
@@ -6,6 +6,7 @@
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -13,6 +14,7 @@
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.*;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag;
@@ -25,9 +27,21 @@ class MessageConverter
public static org.apache.qpid.proton.message.Message convertBrokeredMessageToAmqpMessage(Message brokeredMessage)
{
org.apache.qpid.proton.message.Message amqpMessage = Proton.message();
- if(brokeredMessage.getBody() != null)
+ MessageBody body = brokeredMessage.getMessageBody();
+ if( body != null)
{
- amqpMessage.setBody(new Data(new Binary(brokeredMessage.getBody())));
+ if (body.getBodyType() == MessageBodyType.VALUE)
+ {
+ amqpMessage.setBody(new AmqpValue(body.getValueData()));
+ }
+ else if (body.getBodyType() == MessageBodyType.SEQUENCE)
+ {
+ amqpMessage.setBody(new AmqpSequence(Utils.getSequenceFromMessageBody(body)));
+ }
+ else
+ {
+ amqpMessage.setBody(new Data(new Binary(Utils.getDataFromMessageBody(body))));
+ }
}
if(brokeredMessage.getProperties() != null)
@@ -98,12 +112,22 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton
if(body instanceof Data)
{
Binary messageData = ((Data)body).getValue();
- brokeredMessage = new Message(messageData.getArray());
+ brokeredMessage = new Message(Utils.fromBinay(messageData.getArray()));
+ }
+ else if (body instanceof AmqpValue)
+ {
+ Object messageData = ((AmqpValue)body).getValue();
+ brokeredMessage = new Message(MessageBody.fromValueData(messageData));
+ }
+ else if (body instanceof AmqpSequence)
+ {
+ List messageData = ((AmqpSequence)body).getValue();
+ brokeredMessage = new Message(Utils.fromSequence(messageData));
}
else
{
- // TODO: handle other types of message body
- brokeredMessage = new Message();
+ // Should never happen
+ brokeredMessage = new Message();
}
}
else
@@ -198,5 +222,5 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton
brokeredMessage.setDeliveryTag(deliveryTag);
return brokeredMessage;
- }
+ }
}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java
index 663bcdadbad16..2f8421e70aef6 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java
@@ -651,6 +651,11 @@ public CompletableFuture renewMessageLockAsync(IMessage message) {
public CompletableFuture> renewMessageLockBatchAsync(Collection extends IMessage> messages) {
this.ensurePeekLockReceiveMode();
+ if(messages == null || messages.size() == 0)
+ {
+ throw new UnsupportedOperationException("Message collection is null or empty. Locks cannot be renewed.");
+ }
+
UUID[] lockTokens = new UUID[messages.size()];
int messageIndex = 0;
for (IMessage message : messages) {
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java
index 553ec9beeaabc..0ce0840cc9cb2 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Utils.java
@@ -3,6 +3,8 @@
package com.microsoft.azure.servicebus;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -41,4 +43,44 @@ static void assertNonNull(String argumentName, Object argument) {
if (argument == null)
throw new IllegalArgumentException("Argument '" + argumentName + "' is null.");
}
+
+ static MessageBody fromSequence(List sequence)
+ {
+ List> sequenceData = new ArrayList<>();
+ sequenceData.add(sequence);
+ return MessageBody.fromSequenceData(sequenceData);
+ }
+
+ static MessageBody fromBinay(byte[] binary)
+ {
+ List binaryData = new ArrayList<>();
+ binaryData.add(binary);
+ return MessageBody.fromBinaryData(binaryData);
+ }
+
+ static byte[] getDataFromMessageBody(MessageBody messageBody)
+ {
+ List binaryData = messageBody.getBinaryData();
+ if(binaryData == null || binaryData.size() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return binaryData.get(0);
+ }
+ }
+
+ static List getSequenceFromMessageBody(MessageBody messageBody)
+ {
+ List> sequenceData = messageBody.getSequenceData();
+ if(sequenceData == null || sequenceData.size() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return sequenceData.get(0);
+ }
+ }
}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java
index 86079527fc28b..c0ce9cc09addc 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java
@@ -25,7 +25,7 @@ public final class AmqpErrorCode
// connection errors
public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced");
-
+ public static final Symbol FramingError = Symbol.getSymbol("amqp:connection:framing-error");
// proton library IOExceptions while performing operations on SocketChannel (in IOHandler.java)
public static final Symbol ProtonIOError = Symbol.getSymbol("proton:io");
}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java
index 439471c7c2130..2d6535058c4bc 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java
@@ -4,9 +4,12 @@
*/
package com.microsoft.azure.servicebus.amqp;
+import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
+import javax.net.ssl.SSLContext;
+
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.TransportType;
import org.apache.qpid.proton.Proton;
@@ -18,6 +21,7 @@
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.SslDomain;
+import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.reactor.Handshaker;
@@ -44,7 +48,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect
{
switch(transportType) {
case AMQP_WEB_SOCKETS:
- if (ProxyConnectionHandler.shouldUseProxy( ((MessagingFactory)messagingFactory).getHostName() )) {
+ if (ProxyConnectionHandler.shouldUseProxy(messagingFactory.getHostName())) {
return new ProxyConnectionHandler(messagingFactory);
} else {
return new WebSocketConnectionHandler(messagingFactory);
@@ -59,7 +63,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect
public void onConnectionInit(Event event)
{
final Connection connection = event.getConnection();
- final String hostName = new StringBuilder(((MessagingFactory)messagingFactory).getHostName())
+ final String hostName = new StringBuilder(messagingFactory.getHostName())
.append(":")
.append(String.valueOf(this.getProtocolPort()))
.toString();
@@ -83,13 +87,28 @@ protected IAmqpConnection getMessagingFactory()
public void addTransportLayers(final Event event, final TransportInternal transport)
{
- final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
- transport.ssl(domain);
+ SslDomain domain = Proton.sslDomain();
+ domain.init(SslDomain.Mode.CLIENT);
+
+ try {
+ // Default SSL context will have the root certificate from azure in truststore anyway
+ SSLContext defaultContext = SSLContext.getDefault();
+ StrictTLSContextSpi strictTlsContextSpi = new StrictTLSContextSpi(defaultContext);
+ SSLContext strictTlsContext = new StrictTLSContext(strictTlsContextSpi, defaultContext.getProvider(), defaultContext.getProtocol());
+ domain.setSslContext(strictTlsContext);
+ domain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER_NAME);
+ SslPeerDetails peerDetails = Proton.sslPeerDetails(this.getOutboundSocketHostName(), this.getOutboundSocketPort());
+ transport.ssl(domain, peerDetails);
+ } catch (NoSuchAlgorithmException e) {
+ // Should never happen
+ TRACE_LOGGER.error("Default SSL algorithm not found in JRE. Please check your JRE setup.", e);
+// this.messagingFactory.onConnectionError(new ErrorCondition(AmqpErrorCode.InternalError, e.getMessage()));
+ }
}
protected void notifyTransportErrors(final Event event) { /* no-op */ }
- public String getOutboundSocketHostName() { return ((MessagingFactory)messagingFactory).getHostName(); }
+ public String getOutboundSocketHostName() { return messagingFactory.getHostName(); }
public int getOutboundSocketPort() { return this.getProtocolPort(); }
@@ -106,11 +125,10 @@ public int getMaxFrameSize()
@Override
public void onConnectionBound(Event event)
{
- TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname());
+ TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname());
Transport transport = event.getTransport();
-
+
this.addTransportLayers(event, (TransportInternal) transport);
-
Sasl sasl = transport.sasl();
sasl.setMechanisms("ANONYMOUS");
}
@@ -180,14 +198,4 @@ public void onConnectionLocalClose(Event event) {
connection.free();
}
}
-
- private static SslDomain makeDomain(SslDomain.Mode mode)
- {
- SslDomain domain = Proton.sslDomain();
- domain.init(mode);
-
- // TODO: VERIFY_PEER_NAME support
- domain.setPeerAuthentication(SslDomain.VerifyMode.ANONYMOUS_PEER);
- return domain;
- }
}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java
index 89d1e8c27de3b..b4853d3d1f4e1 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java
@@ -10,6 +10,8 @@
public interface IAmqpConnection
{
+ String getHostName();
+
void onConnectionOpen();
void onConnectionError(ErrorCondition error);
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java
index bed203bd2cf82..41ab910197e68 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java
@@ -91,7 +91,7 @@ protected void notifyTransportErrors(final Event event) {
final IOException ioException = reconstructIOException(errorCondition);
proxySelector.connectFailed(
- createURIFromHostNamePort(((MessagingFactory)this.getMessagingFactory()).getHostName(), this.getProtocolPort()),
+ createURIFromHostNamePort(this.getMessagingFactory().getHostName(), this.getProtocolPort()),
new InetSocketAddress(hostNameParts[0], port),
ioException
);
@@ -144,7 +144,7 @@ public int getOutboundSocketPort() {
private InetSocketAddress getProxyAddress() {
final URI serviceUri = createURIFromHostNamePort(
- ((MessagingFactory)this.getMessagingFactory()).getHostName(),
+ this.getMessagingFactory().getHostName(),
this.getProtocolPort());
final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java
new file mode 100644
index 0000000000000..86eaea9ffa8bf
--- /dev/null
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContext.java
@@ -0,0 +1,14 @@
+package com.microsoft.azure.servicebus.amqp;
+
+import java.security.Provider;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLContextSpi;
+
+// Customer SSLContext that wraps around an SSLContext and removes SSLv2Hello protocol from every SSLEngine created.
+public class StrictTLSContext extends SSLContext{
+
+ protected StrictTLSContext(SSLContextSpi contextSpi, Provider provider, String protocol) {
+ super(contextSpi, provider, protocol);
+ }
+}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java
new file mode 100644
index 0000000000000..9940298bffb53
--- /dev/null
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/StrictTLSContextSpi.java
@@ -0,0 +1,94 @@
+package com.microsoft.azure.servicebus.amqp;
+
+import java.security.KeyManagementException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLContextSpi;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSessionContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+// Wraps over a standard SSL context and disables the SSLv2Hello protocol.
+public class StrictTLSContextSpi extends SSLContextSpi{
+
+ private static final String SSLv2Hello = "SSLv2Hello";
+
+ SSLContext innerContext;
+ public StrictTLSContextSpi(SSLContext innerContext) {
+ this.innerContext = innerContext;
+ }
+
+ @Override
+ protected SSLEngine engineCreateSSLEngine() {
+ SSLEngine engine = this.innerContext.createSSLEngine();
+ this.removeSSLv2Hello(engine);
+ return engine;
+ }
+
+ @Override
+ protected SSLEngine engineCreateSSLEngine(String arg0, int arg1) {
+ SSLEngine engine = this.innerContext.createSSLEngine(arg0, arg1);
+ this.removeSSLv2Hello(engine);
+ return engine;
+ }
+
+ @Override
+ protected SSLSessionContext engineGetClientSessionContext() {
+ return innerContext.getClientSessionContext();
+ }
+
+ @Override
+ protected SSLSessionContext engineGetServerSessionContext() {
+ return this.innerContext.getServerSessionContext();
+ }
+
+ @Override
+ protected SSLServerSocketFactory engineGetServerSocketFactory() {
+ return this.innerContext.getServerSocketFactory();
+ }
+
+ @Override
+ protected SSLSocketFactory engineGetSocketFactory() {
+ return this.innerContext.getSocketFactory();
+ }
+
+ @Override
+ protected void engineInit(KeyManager[] km, TrustManager[] tm, SecureRandom sr) throws KeyManagementException {
+ this.innerContext.init(km, tm, sr);
+ }
+
+ private void removeSSLv2Hello(SSLEngine engine)
+ {
+ String[] enabledProtocols = engine.getEnabledProtocols();
+ boolean sslv2HelloFound = false;
+ for(String protocol : enabledProtocols)
+ {
+ if(protocol.equalsIgnoreCase(SSLv2Hello))
+ {
+ sslv2HelloFound = true;
+ break;
+ }
+ }
+
+ if(sslv2HelloFound)
+ {
+ ArrayList modifiedProtocols = new ArrayList();
+ for(String protocol : enabledProtocols)
+ {
+ if(!protocol.equalsIgnoreCase(SSLv2Hello))
+ {
+ modifiedProtocols.add(protocol);
+ }
+ }
+
+ engine.setEnabledProtocols(modifiedProtocols.toArray(new String[modifiedProtocols.size()]));
+ }
+
+
+ }
+}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java
index 3db7374ca5504..540aced30b3c2 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java
@@ -51,7 +51,6 @@ private ClientConstants() { }
public final static Symbol ENTITY_DISABLED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-disabled");
public final static Symbol PARTITION_NOT_OWNED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":partition-not-owned");
public final static Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost");
- public final static Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked");
public final static Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
public final static Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
public final static Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":transfer-destination-address");
@@ -184,6 +183,7 @@ private ClientConstants() { }
static final int DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS = 5;
static final String SAS_TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
+ static final Duration SAS_TOKEN_SEND_TIMEOUT = Duration.ofSeconds(10);
private static String getClientVersion() {
String clientVersion;
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java
index 819155aaa1d0c..233a5596b9c8b 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java
@@ -95,7 +95,6 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver,
private Receiver receiveLink;
private RequestResponseLink requestResponseLink;
private WorkItem linkOpen;
- private Duration factoryRceiveTimeout;
private Exception lastKnownLinkError;
private Instant lastKnownErrorReportedAt;
@@ -134,7 +133,6 @@ private CoreMessageReceiver(final MessagingFactory factory,
this.prefetchedMessages = new ConcurrentLinkedQueue();
this.linkClose = new CompletableFuture();
this.lastKnownLinkError = null;
- this.factoryRceiveTimeout = factory.getOperationTimeout();
this.prefetchCountSync = new Object();
this.retryPolicy = factory.getRetryPolicy();
this.pendingReceives = new ConcurrentLinkedQueue();
@@ -420,6 +418,7 @@ private void createReceiveLink()
BaseHandler.setHandler(receiver, handler);
receiver.open();
this.receiveLink = receiver;
+ this.underlyingFactory.registerForConnectionError(this.receiveLink);
}
CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure)
@@ -431,7 +430,7 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure)
else
{
CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
- return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;TRACE_LOGGER.debug("Sent SAS Token and set renew timer");});
+ return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;});
}
}
@@ -575,7 +574,7 @@ public void run()
}
},
timeout,
- TimerType.OneTimeRun);
+ TimerType.OneTimeRun);
this.ensureLinkIsOpen().thenRun(() -> {this.addCredit(receiveWorkItem);});
return onReceive;
@@ -616,8 +615,6 @@ public void onOpenComplete(Exception exception)
if (exception == null)
{
- this.underlyingFactory.registerForConnectionError(this.receiveLink);
-
if (this.linkOpen != null && !this.linkOpen.getWork().isDone())
{
AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
@@ -652,13 +649,6 @@ public void onOpenComplete(Exception exception)
{
TRACE_LOGGER.warn("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception);
AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception);
- if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException))
- {
- // No point in retrying to establish a link.. SessionLock is lost
- TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId);
- this.isSessionLockLost = true;
- this.closeAsync();
- }
}
this.lastKnownLinkError = exception;
@@ -806,12 +796,6 @@ else if (remoteOutcome instanceof Released)
}
}
- public void onError(ErrorCondition error)
- {
- Exception completionException = ExceptionUtil.toException(error);
- this.onError(completionException);
- }
-
@Override
public void onError(Exception exception)
{
@@ -832,6 +816,7 @@ public void onError(Exception exception)
}
else
{
+ this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
TRACE_LOGGER.warn("Receive link '{}' to '{}', sessionId '{}' closed with error.", this.receiveLink.getName(), this.receivePath, this.sessionId, exception);
this.lastKnownLinkError = exception;
if ((this.linkOpen != null && !this.linkOpen.getWork().isDone()) ||
@@ -839,39 +824,35 @@ public void onError(Exception exception)
{
this.onOpenComplete(exception);
}
- else
- {
- this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
-
- if (exception != null &&
- (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient()))
- {
- this.clearAllPendingWorkItems(exception);
-
- if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException))
- {
- // No point in retrying to establish a link.. SessionLock is lost
- TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId);
- this.isSessionLockLost = true;
- this.closeAsync();
- }
- }
- else
- {
- // TODO Why recreating link needs to wait for retry interval of pending receive?
- ReceiveWorkItem workItem = this.pendingReceives.peek();
- if (workItem != null && workItem.getTimeoutTracker() != null)
- {
- Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
- .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining());
- if (nextRetryInterval != null)
- {
- TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval);
- Timer.schedule(() -> {CoreMessageReceiver.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun);
- }
- }
- }
- }
+
+ if (exception != null &&
+ (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient()))
+ {
+ this.clearAllPendingWorkItems(exception);
+
+ if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException))
+ {
+ // No point in retrying to establish a link.. SessionLock is lost
+ TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId);
+ this.isSessionLockLost = true;
+ this.closeAsync();
+ }
+ }
+ else
+ {
+ // TODO Why recreating link needs to wait for retry interval of pending receive?
+ ReceiveWorkItem workItem = this.pendingReceives.peek();
+ if (workItem != null && workItem.getTimeoutTracker() != null)
+ {
+ Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy()
+ .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining());
+ if (nextRetryInterval != null)
+ {
+ TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval);
+ Timer.schedule(() -> {CoreMessageReceiver.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun);
+ }
+ }
+ }
}
}
@@ -983,7 +964,8 @@ public void onClose(ErrorCondition condition)
}
else
{
- this.onError(condition);
+ Exception completionException = ExceptionUtil.toException(condition);
+ this.onError(completionException);
}
}
@@ -1189,7 +1171,7 @@ private CompletableFuture updateMessageStateAsync(byte[] deliveryTag, Outc
state = (DeliveryState) outcome;
}
- final UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, CoreMessageReceiver.this.factoryRceiveTimeout);
+ final UpdateStateWorkItem workItem = new UpdateStateWorkItem(completeMessageFuture, state, CoreMessageReceiver.this.operationTimeout);
CoreMessageReceiver.this.pendingUpdateStateRequests.put(deliveryTagAsString, workItem);
CoreMessageReceiver.this.ensureLinkIsOpen().thenRun(() -> {
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
index d488b99302722..9aa54d538d1ae 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java
@@ -419,7 +419,6 @@ public void onOpenComplete(Exception completionException)
{
if (completionException == null)
{
- this.underlyingFactory.registerForConnectionError(this.sendLink);
this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
this.lastKnownLinkError = null;
this.retryPolicy.resetRetryCount(this.getClientId());
@@ -680,6 +679,7 @@ private void createSendLink(SenderLinkSettings linkSettings)
BaseHandler.setHandler(sender, handler);
sender.open();
this.sendLink = sender;
+ this.underlyingFactory.registerForConnectionError(this.sendLink);
}
CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure)
@@ -689,9 +689,9 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure)
return CompletableFuture.completedFuture(null);
}
else
- {
+ {
CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
- CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Sent SAS Token and set renew timer");});
+ CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;});
if (this.transferDestinationPath!= null && !this.transferDestinationPath.isEmpty())
{
@@ -706,7 +706,7 @@ CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure)
private void cancelSASTokenRenewTimer()
{
if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone())
- {
+ {
this.sasTokenRenewTimerFuture.cancel(true);
TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
}
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java
index 49be50bfde83b..a4ad1ec973732 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ExceptionUtil.java
@@ -83,7 +83,15 @@ else if (errorCondition.getCondition() == ClientConstants.STORE_LOCK_LOST_ERROR)
}
else if (errorCondition.getCondition() == AmqpErrorCode.AmqpLinkDetachForced)
{
- return new ServiceBusException(false, new AmqpException(errorCondition));
+ return new ServiceBusException(true, new AmqpException(errorCondition));
+ }
+ else if (errorCondition.getCondition() == AmqpErrorCode.ConnectionForced)
+ {
+ return new ServiceBusException(true, new AmqpException(errorCondition));
+ }
+ else if (errorCondition.getCondition() == AmqpErrorCode.FramingError)
+ {
+ return new ServiceBusException(true, new AmqpException(errorCondition));
}
else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded)
{
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
index 2e783afd1a710..05e218d9e5b76 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
@@ -70,16 +70,15 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection
private RequestResponseLink cbsLink;
private int cbsLinkCreationAttempts = 0;
private Throwable lastCBSLinkCreationException = null;
+ private URI namespaceEndpointUri;
private final ClientSettings clientSettings;
- private final URI namespaceEndpointUri;
private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings)
{
super("MessagingFactory".concat(StringUtil.getShortRandomString()));
- this.namespaceEndpointUri = namespaceEndpointUri;
this.clientSettings = clientSettings;
-
+ this.namespaceEndpointUri = namespaceEndpointUri;
this.hostName = namespaceEndpointUri.getHost();
this.registeredLinks = new LinkedList ();
this.connetionCloseFuture = new CompletableFuture();
@@ -181,6 +180,7 @@ private synchronized CompletableFuture createController() {
});
}
+ @Override
public String getHostName()
{
return this.hostName;
@@ -222,7 +222,7 @@ Connection getConnection()
{
if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED)
{
- TRACE_LOGGER.info("Creating connection to host '{}:{}'", hostName, ClientConstants.AMQPS_PORT);
+ TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.connectionHandler.getOutboundSocketHostName(), this.connectionHandler.getOutboundSocketPort());
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getOutboundSocketHostName(),
this.connectionHandler.getOutboundSocketPort(),
@@ -655,42 +655,74 @@ CompletableFuture sendSecurityToken(String sasTokenAudienceUri)
CompletableFuture> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer)
{
+ CompletableFuture> result = new CompletableFuture>();
TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceURI);
- CompletableFuture tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI);
- return tokenFuture.thenComposeAsync((t) ->
- {
- SecurityToken generatedSecurityToken = t;
- CompletableFuture sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> {
- return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), generatedSecurityToken);
- }, MessagingFactory.INTERNAL_THREAD_POOL);
-
- if(retryOnFailure)
- {
- return sendTokenFuture.handleAsync((v, sendTokenEx) -> {
- if(sendTokenEx == null)
- {
- TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceURI);
- return MessagingFactory.scheduleRenewTimer(generatedSecurityToken.getValidUntil(), validityRenewer);
- }
- else
- {
- TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendTokenEx);
- TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS);
- return Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun);
- }
- }, MessagingFactory.INTERNAL_THREAD_POOL);
- }
- else
- {
- // Let the exception of the sendToken state pass up to caller
- return sendTokenFuture.thenApply((v) -> {
- TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceURI);
- return MessagingFactory.scheduleRenewTimer(generatedSecurityToken.getValidUntil(), validityRenewer);
- });
- }
- }, MessagingFactory.INTERNAL_THREAD_POOL);
+ CompletableFuture sendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
+ sendTokenFuture.handleAsync((validUntil, sendTokenEx) -> {
+ if(sendTokenEx == null)
+ {
+ TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
+ ScheduledFuture> renewalFuture = MessagingFactory.scheduleRenewTimer(validUntil, validityRenewer);
+ result.complete(renewalFuture);
+ }
+ else
+ {
+ Throwable sendFailureCause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
+ TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendFailureCause);
+ if(retryOnFailure)
+ {
+ // Just schedule another attempt
+ TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS);
+ ScheduledFuture> renewalFuture = Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun);
+ result.complete(renewalFuture);
+ }
+ else
+ {
+ if(sendFailureCause instanceof TimeoutException)
+ {
+ // Retry immediately on timeout. This is a special case as CBSLink may be disconnected right after the token is sent, but before it reaches the service
+ TRACE_LOGGER.debug("Resending token for {}", sasTokenAudienceURI);
+ CompletableFuture resendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
+ resendTokenFuture.handleAsync((resendValidUntil, resendTokenEx) -> {
+ if(resendTokenEx == null)
+ {
+ TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
+ ScheduledFuture> renewalFuture = MessagingFactory.scheduleRenewTimer(resendValidUntil, validityRenewer);
+ result.complete(renewalFuture);
+ }
+ else
+ {
+ Throwable resendFailureCause = ExceptionUtil.extractAsyncCompletionCause(resendTokenEx);
+ TRACE_LOGGER.warn("Resending CBS Token for {} failed.", sasTokenAudienceURI, resendFailureCause);
+ result.completeExceptionally(resendFailureCause);
+ }
+ return null;
+ }, MessagingFactory.INTERNAL_THREAD_POOL);
+ }
+ else
+ {
+ result.completeExceptionally(sendFailureCause);
+ }
+ }
+ }
+ return null;
+ }, MessagingFactory.INTERNAL_THREAD_POOL);
+
+ return result;
}
+ private CompletableFuture generateAndSendSecurityToken(String sasTokenAudienceURI)
+ {
+ CompletableFuture tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI);
+ return tokenFuture.thenComposeAsync((t) ->
+ {
+ SecurityToken generatedSecurityToken = t;
+ return this.cbsLinkCreationFuture.thenComposeAsync((v) -> {
+ return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, ClientConstants.SAS_TOKEN_SEND_TIMEOUT, generatedSecurityToken).thenApply((u) -> generatedSecurityToken.getValidUntil());
+ }, MessagingFactory.INTERNAL_THREAD_POOL);
+ }, MessagingFactory.INTERNAL_THREAD_POOL);
+ }
+
private static ScheduledFuture> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer)
{
// It will eventually expire. Renew it
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
index 7f61d6bacc451..af9acea1a59bd 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/RequestResponseLink.java
@@ -60,6 +60,8 @@ class RequestResponseLink extends ClientEntity{
private boolean isRecreateLinksInProgress;
private Map additionalProperties;
private MessagingEntityType entityType;
+ private boolean isInnerLinksCloseHandled;
+ private int internalLinkGeneration;
public static CompletableFuture createAsync(
MessagingFactory messagingFactory,
@@ -170,6 +172,7 @@ private RequestResponseLink(
{
super(linkName);
+ this.internalLinkGeneration = 1;
this.recreateLinksLock = new Object();
this.isRecreateLinksInProgress = false;
this.underlyingFactory = messagingFactory;
@@ -184,6 +187,7 @@ private RequestResponseLink(
this.replyTo = UUID.randomUUID().toString();
this.createFuture = new CompletableFuture();
this.entityType = entityType;
+ this.isInnerLinksCloseHandled = false;
}
public String getLinkPath()
@@ -200,7 +204,7 @@ private CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure
else
{
CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true));
- CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f; TRACE_LOGGER.debug("Set SAS Token renew timer");});
+ CompletableFuture sasTokenFuture = sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;});
if (additionalAudienceURI != null) {
CompletableFuture transferSendTokenFuture = this.underlyingFactory.sendSecurityToken(this.additionalAudienceURI);
@@ -208,6 +212,41 @@ private CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure
}
return sasTokenFuture;
+ }
+ }
+
+ private void onInnerLinksClosed(int linkGeneration, Exception exception)
+ {
+ // Ignore exceptions from last generation links
+ if(this.internalLinkGeneration == linkGeneration)
+ {
+ // This method is called twice once when inner receive link is closed and again when inner send link is closed.
+ // Both of them happen in succession anyway, not concurrently as all these happen on the reactor thread.
+ if(!this.isInnerLinksCloseHandled)
+ {
+ // Set it here and Reset the flag only before recreating inner links
+ this.isInnerLinksCloseHandled = true;
+ this.cancelSASTokenRenewTimer();
+ if(this.pendingRequests.size() > 0)
+ {
+ if(exception != null && exception instanceof ServiceBusException && ((ServiceBusException) exception).getIsTransient())
+ {
+ Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(this.getClientId(), exception, this.underlyingFactory.getOperationTimeout());
+ if (nextRetryInterval != null)
+ {
+ Timer.schedule(() -> {RequestResponseLink.this.ensureUniqueLinkRecreation();}, nextRetryInterval, TimerType.OneTimeRun);
+ }
+ else
+ {
+ this.completeAllPendingRequestsWithException(exception);
+ }
+ }
+ else
+ {
+ this.completeAllPendingRequestsWithException(exception);
+ }
+ }
+ }
}
}
@@ -222,6 +261,7 @@ private void cancelSASTokenRenewTimer()
private void createInternalLinks()
{
+ this.isInnerLinksCloseHandled = false;
Map commonLinkProperties = new HashMap<>();
// ServiceBus expects timeout to be of type unsignedint
commonLinkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()));
@@ -257,9 +297,6 @@ private void createInternalLinks()
sender.setProperties(commonLinkProperties);
SendLinkHandler sendLinkHandler = new SendLinkHandler(this.amqpSender);
BaseHandler.setHandler(sender, sendLinkHandler);
- this.amqpSender.setSendLink(sender);
- TRACE_LOGGER.debug("RequestReponseLink - opening send link to {}", this.linkPath);
- sender.open();
// Create receive link
session = connection.session();
@@ -286,9 +323,40 @@ private void createInternalLinks()
final ReceiveLinkHandler receiveLinkHandler = new ReceiveLinkHandler(this.amqpReceiver);
BaseHandler.setHandler(receiver, receiveLinkHandler);
- this.amqpReceiver.setReceiveLink(receiver);
+
+ this.amqpSender.setLinks(sender, receiver);
+ this.amqpReceiver.setLinks(sender, receiver);
+
+ TRACE_LOGGER.debug("RequestReponseLink - opening send link to {}", this.linkPath);
+ sender.open();
+ this.underlyingFactory.registerForConnectionError(sender);
TRACE_LOGGER.debug("RequestReponseLink - opening receive link to {}", this.linkPath);
receiver.open();
+ this.underlyingFactory.registerForConnectionError(receiver);
+ }
+
+ private void ensureUniqueLinkRecreation()
+ {
+ synchronized (this.recreateLinksLock) {
+ if(!this.isRecreateLinksInProgress)
+ {
+ this.isRecreateLinksInProgress = true;
+ this.recreateInternalLinks().handleAsync((v, recreationEx) ->
+ {
+ if(recreationEx != null)
+ {
+ TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", this.linkPath, ExceptionUtil.extractAsyncCompletionCause(recreationEx));
+ }
+
+ synchronized (this.recreateLinksLock)
+ {
+ this.isRecreateLinksInProgress = false;
+ }
+
+ return null;
+ }, MessagingFactory.INTERNAL_THREAD_POOL);
+ }
+ }
}
private CompletableFuture recreateInternalLinks()
@@ -299,6 +367,7 @@ private CompletableFuture recreateInternalLinks()
this.cancelSASTokenRenewTimer();
// Create new internal sender and receiver objects, as old ones are closed
+ this.internalLinkGeneration++;
this.amqpSender = new InternalSender(this.getClientId() + ":internalSender", this, this.amqpSender);
this.amqpReceiver = new InternalReceiver(this.getClientId() + ":interalReceiver", this);
CompletableFuture recreateInternalLinksFuture = new CompletableFuture();
@@ -337,6 +406,7 @@ public void onEvent()
if(ex == null)
{
TRACE_LOGGER.info("Recreated internal links to {}", this.linkPath);
+ this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());
recreateInternalLinksFuture.complete(null);
}
else
@@ -384,32 +454,7 @@ private void completeAllPendingRequestsWithException(Exception exception)
public CompletableFuture requestAysnc(Message requestMessage, TransactionContext transaction, Duration timeout)
{
this.throwIfClosed(null);
- // Check and recreate links if necessary
- if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE)
- && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE)))
- {
- synchronized (this.recreateLinksLock) {
- if(!this.isRecreateLinksInProgress)
- {
- this.isRecreateLinksInProgress = true;
- this.recreateInternalLinks().handleAsync((v, recreationEx) ->
- {
- if(recreationEx != null)
- {
- TRACE_LOGGER.warn("Recreating internal links of reqestresponselink '{}' failed.", this.linkPath, ExceptionUtil.extractAsyncCompletionCause(recreationEx));
- }
-
- synchronized (this.recreateLinksLock)
- {
- this.isRecreateLinksInProgress = false;
- }
-
- return null;
- }, MessagingFactory.INTERNAL_THREAD_POOL);
- }
- }
- }
-
+
CompletableFuture responseFuture = new CompletableFuture();
RequestResponseWorkItem workItem = new RequestResponseWorkItem(requestMessage, transaction, responseFuture, timeout);
String requestId = "request:" + this.requestCounter.incrementAndGet();
@@ -419,6 +464,14 @@ public CompletableFuture requestAysnc(Message requestMessage, Transacti
workItem.setTimeoutTask(this.scheduleRequestTimeout(requestId, timeout));
TRACE_LOGGER.debug("Sending request with id:{}", requestId);
this.amqpSender.sendRequest(requestId, false);
+
+ // Check and recreate links if necessary
+ if(!((this.amqpSender.sendLink.getLocalState() == EndpointState.ACTIVE && this.amqpSender.sendLink.getRemoteState() == EndpointState.ACTIVE)
+ && (this.amqpReceiver.receiveLink.getLocalState() == EndpointState.ACTIVE && this.amqpReceiver.receiveLink.getRemoteState() == EndpointState.ACTIVE)))
+ {
+ this.ensureUniqueLinkRecreation();
+ }
+
return responseFuture;
}
@@ -544,12 +597,15 @@ private class InternalReceiver extends ClientEntity implements IAmqpReceiver
{
private RequestResponseLink parent;
private Receiver receiveLink;
+ private Sender matchingSendLink;
private CompletableFuture openFuture;
private CompletableFuture closeFuture;
+ private int linkGeneration;
protected InternalReceiver(String clientId, RequestResponseLink parent) {
super(clientId);
this.parent = parent;
+ this.linkGeneration = parent.internalLinkGeneration;// Read it in the constructor as it may change later
this.openFuture = new CompletableFuture();
this.closeFuture = new CompletableFuture();
}
@@ -603,7 +659,6 @@ public void onOpenComplete(Exception completionException) {
if(completionException == null)
{
TRACE_LOGGER.debug("Opened internal receive link of requestresponselink to {}", parent.linkPath);
- this.parent.underlyingFactory.registerForConnectionError(this.receiveLink);
AsyncUtil.completeFuture(this.openFuture, null);
// Send unlimited credit
@@ -633,59 +688,30 @@ public void onError(Exception exception) {
AsyncUtil.completeFutureExceptionally(this.closeFuture, exception);
}
}
-
- TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' encountered error.", this.receiveLink.getName(), this.parent.linkPath, exception);
- this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink);
- if(this.parent.amqpSender.sendLink != null)
+ else
{
- this.parent.amqpSender.sendLink.close();
- this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink);
+ TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' encountered error.", this.receiveLink.getName(), this.parent.linkPath, exception);
+ this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink);
+ this.matchingSendLink.close();
+ this.parent.underlyingFactory.deregisterForConnectionError(this.matchingSendLink);
+ this.parent.onInnerLinksClosed(this.linkGeneration, exception);
}
- this.parent.cancelSASTokenRenewTimer();
- this.parent.completeAllPendingRequestsWithException(exception);
}
@Override
public void onClose(ErrorCondition condition) {
- if(this.getIsClosingOrClosed())
+ if(condition == null || condition.getCondition() == null)
{
- if(!this.closeFuture.isDone())
+ if(this.getIsClosingOrClosed() && !this.closeFuture.isDone())
{
- if(condition == null || condition.getCondition() == null)
- {
- TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", parent.linkPath);
- AsyncUtil.completeFuture(this.closeFuture, null);
- }
- else
- {
- Exception exception = ExceptionUtil.toException(condition);
- TRACE_LOGGER.error("Closing internal receive link '{}' of requestresponselink to {} failed.", this.receiveLink.getName(), this.parent.linkPath, exception);
- AsyncUtil.completeFutureExceptionally(this.closeFuture, exception);
- }
+ TRACE_LOGGER.info("Closed internal receive link of requestresponselink to {}", parent.linkPath);
+ AsyncUtil.completeFuture(this.closeFuture, null);
}
}
else
{
- if(condition != null)
- {
- Exception exception = ExceptionUtil.toException(condition);
- if(!this.openFuture.isDone())
- {
- this.onOpenComplete(exception);
- }
- else
- {
- TRACE_LOGGER.warn("Internal receive link '{}' of requestresponselink to '{}' closed with error.", this.receiveLink.getName(), this.parent.linkPath, exception);
- this.parent.underlyingFactory.deregisterForConnectionError(this.receiveLink);
- if(this.parent.amqpSender.sendLink != null)
- {
- this.parent.amqpSender.sendLink.close();
- this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpSender.sendLink);
- }
- this.parent.cancelSASTokenRenewTimer();
- this.parent.completeAllPendingRequestsWithException(exception);
- }
- }
+ Exception exception = ExceptionUtil.toException(condition);
+ this.onError(exception);
}
}
@@ -724,15 +750,17 @@ public void onReceiveComplete(Delivery delivery)
}
});
}
-
- public void setReceiveLink(Receiver receiveLink) {
+
+ public void setLinks(Sender sendLink, Receiver receiveLink) {
this.receiveLink = receiveLink;
+ this.matchingSendLink = sendLink;
}
}
private class InternalSender extends ClientEntity implements IAmqpSender
{
private Sender sendLink;
+ private Receiver matchingReceiveLink;
private RequestResponseLink parent;
private CompletableFuture openFuture;
private CompletableFuture closeFuture;
@@ -742,10 +770,12 @@ private class InternalSender extends ClientEntity implements IAmqpSender
private Object pendingSendsSyncLock;
private boolean isSendLoopRunning;
private int maxMessageSize;
+ private int linkGeneration;
protected InternalSender(String clientId, RequestResponseLink parent, InternalSender senderToBeCopied) {
super(clientId);
this.parent = parent;
+ this.linkGeneration = parent.internalLinkGeneration;// Read it in the constructor as it may change later
this.availableCredit = new AtomicInteger(0);
this.pendingSendsSyncLock = new Object();
this.isSendLoopRunning = false;
@@ -813,7 +843,6 @@ public void onOpenComplete(Exception completionException) {
if(completionException == null)
{
TRACE_LOGGER.debug("Opened internal send link of requestresponselink to {}", parent.linkPath);
- this.parent.underlyingFactory.registerForConnectionError(this.sendLink);
this.maxMessageSize = Util.getMaxMessageSizeFromLink(this.sendLink);
AsyncUtil.completeFuture(this.openFuture, null);
this.runSendLoop();
@@ -842,61 +871,30 @@ public void onError(Exception exception) {
AsyncUtil.completeFutureExceptionally(this.closeFuture, exception);
}
}
-
- TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' encountered error.", this.sendLink.getName(), this.parent.linkPath, exception);
- this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink);
- if(this.parent.amqpReceiver.receiveLink != null)
+ else
{
- this.parent.amqpReceiver.receiveLink.close();
- this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink);
+ TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' encountered error.", this.sendLink.getName(), this.parent.linkPath, exception);
+ this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink);
+ this.matchingReceiveLink.close();
+ this.parent.underlyingFactory.deregisterForConnectionError(this.matchingReceiveLink);
+ this.parent.onInnerLinksClosed(this.linkGeneration, exception);
}
- this.parent.cancelSASTokenRenewTimer();
- this.parent.completeAllPendingRequestsWithException(exception);
}
@Override
public void onClose(ErrorCondition condition) {
- if(this.getIsClosingOrClosed())
+ if(condition == null || condition.getCondition() == null)
{
- if(!this.closeFuture.isDone())
+ if(!this.closeFuture.isDone() && !this.closeFuture.isDone())
{
- if(condition == null || condition.getCondition() == null)
- {
- TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", this.parent.linkPath);
- AsyncUtil.completeFuture(this.closeFuture, null);
- }
- else
- {
- Exception exception = ExceptionUtil.toException(condition);
- TRACE_LOGGER.error("Closing internal send link '{}' of requestresponselink to {} failed.", this.sendLink.getName(), this.parent.linkPath, exception);
- AsyncUtil.completeFutureExceptionally(this.closeFuture, exception);
- }
+ TRACE_LOGGER.info("Closed internal send link of requestresponselink to {}", this.parent.linkPath);
+ AsyncUtil.completeFuture(this.closeFuture, null);
}
}
else
{
- if(condition != null)
- {
- Exception exception = ExceptionUtil.toException(condition);
- if(!this.openFuture.isDone())
- {
- this.onOpenComplete(exception);
- }
- else
- {
- TRACE_LOGGER.warn("Internal send link '{}' of requestresponselink to '{}' closed with error.", this.sendLink.getName(), this.parent.linkPath, exception);
- this.parent.underlyingFactory.deregisterForConnectionError(this.sendLink);
-
- if(this.parent.amqpReceiver.receiveLink != null)
- {
- this.parent.amqpReceiver.receiveLink.close();
- this.parent.underlyingFactory.deregisterForConnectionError(this.parent.amqpReceiver.receiveLink);
- }
-
- this.parent.cancelSASTokenRenewTimer();
- this.parent.completeAllPendingRequestsWithException(exception);
- }
- }
+ Exception exception = ExceptionUtil.toException(condition);
+ this.onError(exception);
}
}
@@ -960,9 +958,10 @@ public void onFlow(int creditIssued) {
public void onSendComplete(Delivery delivery) {
// Doesn't happen as sends are settled on send
}
-
- public void setSendLink(Sender sendLink) {
+
+ public void setLinks(Sender sendLink, Receiver receiveLink) {
this.sendLink = sendLink;
+ this.matchingReceiveLink = receiveLink;
this.availableCredit = new AtomicInteger(0);
}
@@ -1043,7 +1042,7 @@ private void runSendLoop()
else
{
TRACE_LOGGER.warn("Request with id:{} not found in the requestresponse link.", requestIdToBeSent);
- }
+ }
}
}
finally
diff --git a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java
index 3511e170b5e44..d9f451685a48e 100644
--- a/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java
+++ b/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java
@@ -397,7 +397,7 @@ static int encodeMessageToCustomArray(Message message, byte[] encodedBytes, int
}
catch(BufferOverflowException exception)
{
- throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", length / 1024), exception);
+ throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s KB", length / 1024), exception);
}
}
@@ -461,7 +461,7 @@ else if(tokenValidityInSeconds >= 60)
return tokenValidityInSeconds - 10;
}
else
- {
+ {
return (tokenValidityInSeconds - 1) > 0 ? tokenValidityInSeconds - 1 : 0;
}
}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java
index 96749fe99c31e..cc54cd462260a 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java
@@ -185,7 +185,7 @@ public CompletableFuture OnCloseSessionAsync(IMessageSession session) {
}
}, MessageAndSessionPumpTests.EXECUTOR_SERVICE);
- Thread.sleep(1000); // Sleep for a second for the exception
+ Thread.sleep(2000); // Sleep for two seconds for the exception
Assert.assertTrue("QueueClient created to a subscription which shouldn't be allowed.", unsupportedExceptionOccured.get());
} finally {
qc.close();
@@ -223,7 +223,7 @@ public CompletableFuture OnCloseSessionAsync(IMessageSession session) {
}
}, MessageAndSessionPumpTests.EXECUTOR_SERVICE);
- Thread.sleep(1000); // Sleep for a second for the exception
+ Thread.sleep(2000); // Sleep for two seconds for the exception
Assert.assertTrue("SubscriptionClient created to a queue which shouldn't be allowed.", unsupportedExceptionOccured.get());
} finally {
sc.close();
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java
index 185764579f969..1259666c0b2c7 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageAndSessionPumpTests.java
@@ -90,7 +90,7 @@ public static void testMessagePumpRenewLock(IMessageSender sender, IMessageAndSe
sender.send(new Message("AMQPMessage"));
}
boolean autoComplete = true;
- int sleepMinutes = 1; // This should be less than message lock duration of the queue or subscription
+ int sleepMinutes = 1; // This should be more than message lock duration of the queue or subscription
CountingMessageHandler messageHandler = new CountingMessageHandler(messagePump, !autoComplete, numMessages, false, Duration.ofMinutes(sleepMinutes));
messagePump.registerMessageHandler(messageHandler, new MessageHandlerOptions(numMessages, autoComplete, Duration.ofMinutes(10)), EXECUTOR_SERVICE);
int waitMinutes = 2 * sleepMinutes;
@@ -309,7 +309,8 @@ private static class CountingMessageHandler extends TestMessageHandler
CountingMessageHandler(IMessageAndSessionPump messagePump, boolean completeMessages, int messageCount, boolean firstThrowException)
{
- this(messagePump, completeMessages, messageCount, firstThrowException, Duration.ZERO);
+ // Let's make every onMessage sleep for sometime so we can reliably count max concurrent threads
+ this(messagePump, completeMessages, messageCount, firstThrowException, Duration.ofSeconds(2));
}
CountingMessageHandler(IMessageAndSessionPump messagePump, boolean completeMessages, int messageCount, boolean firstThrowException, Duration sleepDuration)
@@ -391,7 +392,8 @@ private static class CountingSessionHandler extends TestSessionHandler
CountingSessionHandler(IMessageAndSessionPump sessionPump, boolean completeMessages, int totalMessageCount, boolean firstThrowException)
{
- this(sessionPump, completeMessages, totalMessageCount, firstThrowException, Duration.ZERO);
+ // Let's make every onMessage sleep for sometime so we can reliably count max concurrent threads
+ this(sessionPump, completeMessages, totalMessageCount, firstThrowException, Duration.ofSeconds(2));
}
CountingSessionHandler(IMessageAndSessionPump sessionPump, boolean completeMessages, int totalMessageCount, boolean firstThrowException, Duration sleepDuration)
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java
new file mode 100644
index 0000000000000..b52647c1e1e63
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/MessageBodyTests.java
@@ -0,0 +1,171 @@
+package com.microsoft.azure.servicebus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageBodyTests {
+
+ @Test
+ public void NullBinaryDataTest()
+ {
+ try
+ {
+ MessageBody.fromBinaryData(null);
+ Assert.fail("MessageBody created with null binary data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void NullSequenceTest()
+ {
+ try
+ {
+ MessageBody.fromSequenceData(null);
+ Assert.fail("MessageBody created with null sequence data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void NullValueDataTest()
+ {
+ try
+ {
+ MessageBody.fromValueData(null);
+ Assert.fail("MessageBody created with null value data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void MultipleDataSectionsTest()
+ {
+ try
+ {
+ ArrayList dataList = new ArrayList<>();
+ dataList.add(new byte[0]);
+ dataList.add(new byte[0]);
+ MessageBody.fromBinaryData(dataList);
+ Assert.fail("MessageBody created with null binary data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void ZeroDataSectionsTest()
+ {
+ try
+ {
+ ArrayList dataList = new ArrayList<>();
+ MessageBody.fromBinaryData(dataList);
+ Assert.fail("MessageBody created with null binary data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void MultipleSequenceSectionsTest()
+ {
+ try
+ {
+ ArrayList> sequenceList = new ArrayList<>();
+ ArrayList sequence1 = new ArrayList<>();
+ sequence1.add("hello");
+ ArrayList sequence2 = new ArrayList<>();
+ sequence2.add("howdy");
+ sequenceList.add(sequence1);
+ sequenceList.add(sequence2);
+ MessageBody.fromSequenceData(sequenceList);
+ Assert.fail("MessageBody created with null binary data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void ZeroSequenceSectionsTest()
+ {
+ try
+ {
+ ArrayList> sequenceList = new ArrayList<>();
+ MessageBody.fromSequenceData(sequenceList);
+ Assert.fail("MessageBody created with null binary data.");
+ }
+ catch(IllegalArgumentException e)
+ {
+ // passed
+ }
+ }
+
+ @Test
+ public void ValueMessageBodyTest()
+ {
+ String value = "ValueBody";
+ MessageBody body = MessageBody.fromValueData(value);
+ Assert.assertEquals("Message body type didn't match.", MessageBodyType.VALUE, body.getBodyType());
+ Assert.assertNull("MessageBody of value type has binary data.", body.getBinaryData());
+ Assert.assertNull("MessageBody of value type has sequence data.", body.getSequenceData());
+ Assert.assertEquals("Message body value didn't match", value, body.getValueData());
+ }
+
+ @Test
+ public void SequenceMessageBodyTest()
+ {
+ String str1 = "hello";
+ String str2 = "howdy";
+ ArrayList> sequenceList = new ArrayList<>();
+ ArrayList sequence1 = new ArrayList<>();
+ sequence1.add(str1);
+ sequence1.add(str2);
+ sequenceList.add(sequence1);
+ MessageBody body = MessageBody.fromSequenceData(sequenceList);
+ Assert.assertEquals("Message body type didn't match.", MessageBodyType.SEQUENCE, body.getBodyType());
+ Assert.assertNull("MessageBody of sequence type has binary data.", body.getBinaryData());
+ Assert.assertNull("MessageBody of sequence type has value data.", body.getValueData());
+ List> outputSequenceList = body.getSequenceData();
+ Assert.assertEquals("Message body sequence didn't match", 1, outputSequenceList.size());
+ List outputInnerSequence = outputSequenceList.get(0);
+ Assert.assertEquals("Message body sequence didn't match", 2, outputInnerSequence.size());
+ Assert.assertEquals("Message body sequence didn't match", str1, outputInnerSequence.get(0));
+ Assert.assertEquals("Message body sequence didn't match", str2, outputInnerSequence.get(1));
+ }
+
+ @Test
+ public void BinaryMessageBodyTest()
+ {
+ byte[] binaryData = new byte[1024];
+ Arrays.fill(binaryData, (byte)32);
+ ArrayList binaryDataList = new ArrayList<>();
+ binaryDataList.add(binaryData);
+ MessageBody body = MessageBody.fromBinaryData(binaryDataList);
+ Assert.assertEquals("Message body type didn't match.", MessageBodyType.BINARY, body.getBodyType());
+ Assert.assertNull("MessageBody of binary type has value data.", body.getValueData());
+ Assert.assertNull("MessageBody of binary type has sequence data.", body.getSequenceData());
+ List outputataList = body.getBinaryData();
+ Assert.assertEquals("Message body binary data didn't match", 1, outputataList.size());
+ byte[] outputBinaryData = outputataList.get(0);
+ Assert.assertEquals("Message body sequence didn't match", binaryData, outputBinaryData);
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java
new file mode 100644
index 0000000000000..b2ffc6088fa2f
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientSessionTests.java
@@ -0,0 +1,18 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedQueueClientSessionTests extends QueueClientSessionTests{
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedQueueClientSessionTests";
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java
new file mode 100644
index 0000000000000..6665afaf19b7d
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueClientTests.java
@@ -0,0 +1,18 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedQueueClientTests extends QueueClientTests{
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedQueueClientTests";
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java
new file mode 100644
index 0000000000000..73e9a92a4225e
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSendReceiveTests.java
@@ -0,0 +1,18 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedQueueSendReceiveTests extends QueueSendReceiveTests{
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedQueueSendReceiveTests";
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java
new file mode 100644
index 0000000000000..77902f0ec103b
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedQueueSessionTests.java
@@ -0,0 +1,19 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedQueueSessionTests extends QueueSessionTests
+{
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedQueueSessionTests";
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java
new file mode 100644
index 0000000000000..a01c9248b80f7
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientSessionTests.java
@@ -0,0 +1,18 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedSubscriptionClientSessionTests extends SubscriptionClientSessionTests{
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedSubscriptionClientSessionTests";
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java
new file mode 100644
index 0000000000000..0a0ba298c29b4
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedSubscriptionClientTests.java
@@ -0,0 +1,18 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedSubscriptionClientTests extends SubscriptionClientTests{
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedSubscriptionClientTests";
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java
new file mode 100644
index 0000000000000..9d3542300cd41
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSendReceiveTests.java
@@ -0,0 +1,19 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedTopicSendReceiveTests extends TopicSendReceiveTests {
+
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedTopicSendReceiveTests";
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java
new file mode 100644
index 0000000000000..86e4da2084290
--- /dev/null
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/PartitionedTopicSessionTests.java
@@ -0,0 +1,19 @@
+package com.microsoft.azure.servicebus;
+
+public class PartitionedTopicSessionTests extends TopicSessionTests {
+
+ @Override
+ public String getEntityNamePrefix() {
+ return "PartitionedTopicSessionTests";
+ }
+
+ @Override
+ public boolean isEntityPartitioned() {
+ return true;
+ }
+
+ @Override
+ public boolean shouldCreateEntityForEveryTest() {
+ return TestUtils.shouldCreateEntityForEveryTest();
+ }
+}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java
index 0de4c0a5d7b91..a7781446cd4bc 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java
@@ -106,17 +106,38 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE
}
@Test
- public void testBasicReceiveAndDelete() throws InterruptedException, ServiceBusException, ExecutionException
- {
- this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
- TestCommons.testBasicReceiveAndDelete(this.sender, this.sessionId, this.receiver);
- }
+ public void testBasicReceiveAndDeleteWithValueData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithValueData(this.sender, this.sessionId, this.receiver);
+ }
+
+ @Test
+ public void testBasicReceiveAndDeleteWithBinaryData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithBinaryData(this.sender, this.sessionId, this.receiver);
+ }
+
+ @Test
+ public void testBasicReceiveAndDeleteWithLargeBinaryData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithLargeBinaryData(this.sender, this.sessionId, this.receiver);
+ }
+
+ @Test
+ public void testBasicReceiveAndDeleteWithSequenceData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithSequenceData(this.sender, this.sessionId, this.receiver);
+ }
@Test
public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE);
- TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver);
+ TestCommons.testBasicReceiveBatchAndDelete(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
@@ -151,14 +172,14 @@ public void testBasicReceiveAndRenewLock() throws InterruptedException, ServiceB
public void testBasicReceiveAndRenewLockBatch() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK);
- TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver);
+ TestCommons.testBasicReceiveAndRenewLockBatch(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
public void testBasicReceiveBatchAndComplete() throws InterruptedException, ServiceBusException, ExecutionException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK);
- TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver);
+ TestCommons.testBasicReceiveBatchAndComplete(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
@@ -186,7 +207,7 @@ public void testPeekMessage() throws InterruptedException, ServiceBusException
public void testPeekMessageBatch() throws InterruptedException, ServiceBusException
{
this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.PEEKLOCK);
- TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver);
+ TestCommons.testPeekMessageBatch(this.sender, this.sessionId, this.receiver, this.isEntityPartitioned());
}
@Test
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java
index 438289c65106f..c8917e775e102 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java
@@ -4,8 +4,8 @@
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-
import com.microsoft.azure.servicebus.management.*;
import org.junit.After;
import org.junit.AfterClass;
@@ -112,21 +112,45 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE
managementClient.close();
}
-
+
@Test
- public void testBasicReceiveAndDelete() throws InterruptedException, ServiceBusException, ExecutionException
- {
+ public void testBasicReceiveAndDeleteWithValueData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
String sessionId = TestUtils.getRandomString();
- this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
- TestCommons.testBasicReceiveAndDelete(this.sender, sessionId, this.session);
- }
+ this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithValueData(this.sender, sessionId, this.session);
+ }
+
+ @Test
+ public void testBasicReceiveAndDeleteWithBinaryData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ String sessionId = TestUtils.getRandomString();
+ this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithBinaryData(this.sender, sessionId, this.session);
+ }
+
+ @Test
+ public void testBasicReceiveAndDeleteWithLargeBinaryData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ String sessionId = TestUtils.getRandomString();
+ this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithLargeBinaryData(this.sender, sessionId, this.session);
+ }
+
+ @Test
+ public void testBasicReceiveAndDeleteWithSequenceData() throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ String sessionId = TestUtils.getRandomString();
+ this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
+ TestCommons.testBasicReceiveAndDeleteWithSequenceData(this.sender, sessionId, this.session);
+ }
@Test
public void testBasicReceiveBatchAndDelete() throws InterruptedException, ServiceBusException, ExecutionException
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.RECEIVEANDDELETE);
- TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session);
+ TestCommons.testBasicReceiveBatchAndDelete(this.sender, sessionId, this.session, this.isEntityPartitioned());
}
@Test
@@ -158,7 +182,7 @@ public void testBasicReceiveBatchAndComplete() throws InterruptedException, Serv
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.PEEKLOCK);
- TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session);
+ TestCommons.testBasicReceiveBatchAndComplete(this.sender, sessionId, this.session, this.isEntityPartitioned());
}
@Test
@@ -190,7 +214,7 @@ public void testPeekMessageBatch() throws InterruptedException, ServiceBusExcept
{
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(this.factory, this.receiveEntityPath, sessionId, ReceiveMode.PEEKLOCK);
- TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session);
+ TestCommons.testPeekMessageBatch(this.sender, sessionId, this.session, this.isEntityPartitioned());
}
@Test
@@ -306,16 +330,20 @@ public void testAcceptSessionTimeoutShouldNotLockSession() throws InterruptedExc
}
@Test
- public void testRequestResponseLinkRequestLimit() throws InterruptedException, ServiceBusException
+ public void testRequestResponseLinkRequestLimit() throws InterruptedException, ServiceBusException, ExecutionException
{
int limitToTest = 5000;
String sessionId = TestUtils.getRandomString();
this.session = ClientFactory.acceptSessionFromEntityPath(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, sessionId, TestUtils.getClientSettings(), ReceiveMode.PEEKLOCK);
+ CompletableFuture[] futures = new CompletableFuture[limitToTest];
for(int i=0; i future = this.session.renewSessionLockAsync();
+ futures[i] = future;
}
+ CompletableFuture.allOf(futures).get();
+
this.session.renewSessionLock();
}
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java
index 5c3cd73b51adc..fc98508df677f 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SubscriptionClientSessionTests.java
@@ -7,7 +7,7 @@ public String getEntityNamePrefix() {
@Override
public boolean isEntityQueue() {
- return true;
+ return false;
}
@Override
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java
index 85058292d4d26..79c4b7ac54635 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestCommons.java
@@ -40,38 +40,120 @@ public static void testBasicSendBatch(IMessageSender sender) throws InterruptedE
sender.sendBatch(messages);
}
- public static void testBasicReceiveAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
- {
- String messageId = UUID.randomUUID().toString();
- Message message = new Message("AMQP message");
- message.setMessageId(messageId);
- if(sessionId != null)
- {
- message.setSessionId(sessionId);
- }
- sender.send(message);
-
- IMessage receivedMessage = receiver.receive();
- Assert.assertNotNull("Message not received", receivedMessage);
- Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId());
- receivedMessage = receiver.receive(SHORT_WAIT_TIME);
- Assert.assertNull("Message received again", receivedMessage);
+ public static void testBasicReceiveAndDeleteWithValueData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ String messageData = "testBasicReceiveAndDeleteWithValueData";
+ String messageId = UUID.randomUUID().toString();
+ Message message = new Message(MessageBody.fromValueData(messageData));
+ message.setMessageId(messageId);
+ if(sessionId != null)
+ {
+ message.setSessionId(sessionId);
+ }
+
+ sender.send(message);
+
+ IMessage receivedMessage = receiver.receive();
+ Assert.assertNotNull("Message not received", receivedMessage);
+ Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId());
+ Assert.assertEquals("Message Body Type did not match", MessageBodyType.VALUE, receivedMessage.getMessageBody().getBodyType());
+ Assert.assertEquals("Message content did not match", messageData, receivedMessage.getMessageBody().getValueData());
+ receivedMessage = receiver.receive(SHORT_WAIT_TIME);
+ Assert.assertNull("Message received again", receivedMessage);
+ }
+
+ public static void testBasicReceiveAndDeleteWithBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ testBasicReceiveAndDeleteWithBinaryData(sender, sessionId, receiver, 64);
}
- public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ public static void testBasicReceiveAndDeleteWithLargeBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ testBasicReceiveAndDeleteWithBinaryData(sender, sessionId, receiver, 64 * 1024);
+ }
+
+ private static void testBasicReceiveAndDeleteWithBinaryData(IMessageSender sender, String sessionId, IMessageReceiver receiver, int messageSize) throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ String messageId = UUID.randomUUID().toString();
+ byte[] binaryData = new byte[messageSize];
+ for(int i=0; i< binaryData.length; i++)
+ {
+ binaryData[i] = (byte)i;
+ }
+ Message message = new Message(Utils.fromBinay(binaryData));
+ message.setMessageId(messageId);
+ if(sessionId != null)
+ {
+ message.setSessionId(sessionId);
+ }
+
+ sender.send(message);
+
+ IMessage receivedMessage = receiver.receive();
+ Assert.assertNotNull("Message not received", receivedMessage);
+ Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId());
+ Assert.assertEquals("Message Body Type did not match", MessageBodyType.BINARY, receivedMessage.getMessageBody().getBodyType());
+ Assert.assertArrayEquals("Message content did not match", binaryData, Utils.getDataFromMessageBody(receivedMessage.getMessageBody()));
+ receivedMessage = receiver.receive(SHORT_WAIT_TIME);
+ Assert.assertNull("Message received again", receivedMessage);
+ }
+
+ public static void testBasicReceiveAndDeleteWithSequenceData(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ {
+ String messageId = UUID.randomUUID().toString();
+ List sequence = new ArrayList();
+ sequence.add("azure");
+ sequence.add("servicebus");
+ sequence.add("messaging");
+ Message message = new Message(Utils.fromSequence(sequence));
+ message.setMessageId(messageId);
+ if(sessionId != null)
+ {
+ message.setSessionId(sessionId);
+ }
+ sender.send(message);
+
+ IMessage receivedMessage = receiver.receive();
+ Assert.assertNotNull("Message not received", receivedMessage);
+ Assert.assertEquals("Message Id did not match", messageId, receivedMessage.getMessageId());
+ Assert.assertEquals("Message Body Type did not match", MessageBodyType.SEQUENCE, receivedMessage.getMessageBody().getBodyType());
+ Assert.assertArrayEquals("Message content did not match", sequence.toArray(new String[] {}), Utils.getSequenceFromMessageBody(receivedMessage.getMessageBody()).toArray(new String[] {}));
+ receivedMessage = receiver.receive(SHORT_WAIT_TIME);
+ Assert.assertNull("Message received again", receivedMessage);
+ }
+
+ public static void testBasicReceiveBatchAndDelete(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException
{
int numMessages = 10;
- List messages = new ArrayList();
- for(int i=0; i messages = new ArrayList();
+ for(int i=0; i receivedMessages = receiver.receiveBatch(numMessages);
@@ -169,30 +251,55 @@ public static void testBasicReceiveAndRenewLock(IMessageSender sender, String se
receiver.complete(receivedMessage.getLockToken());
}
- public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException
{
- int numMessages = 10;
- List messages = new ArrayList();
- for(int i=0; i messages = new ArrayList();
+ for(int i=0; i totalReceivedMessages = new ArrayList<>();
+ ArrayList totalReceivedMessages = new ArrayList<>();
- Collection receivedMessages = receiver.receiveBatch(numMessages);
- while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages)
+ if(isEntityPartitioned && sessionId == null)
{
- totalReceivedMessages.addAll(receivedMessages);
- receivedMessages = receiver.receiveBatch(numMessages);
+ Collection receivedMessages = receiver.receiveBatch(1);
+ Assert.assertTrue("Messages not received", receivedMessages != null && receivedMessages.size() > 0);
+ totalReceivedMessages.addAll(receivedMessages);
+ }
+ else
+ {
+ Collection receivedMessages = receiver.receiveBatch(numMessages);
+ while(receivedMessages != null && receivedMessages.size() > 0 && totalReceivedMessages.size() < numMessages)
+ {
+ totalReceivedMessages.addAll(receivedMessages);
+ receivedMessages = receiver.receiveBatch(numMessages);
+ }
+ Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size());
}
- Assert.assertEquals("All messages not received", numMessages, totalReceivedMessages.size());
ArrayList oldLockTimes = new ArrayList();
for(IMessage message : totalReceivedMessages)
@@ -215,20 +322,36 @@ public static void testBasicReceiveAndRenewLockBatch(IMessageSender sender, Stri
}
}
- public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver) throws InterruptedException, ServiceBusException, ExecutionException
+ public static void testBasicReceiveBatchAndComplete(IMessageSender sender, String sessionId, IMessageReceiver receiver, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException, ExecutionException
{
int numMessages = 10;
- List messages = new ArrayList();
- for(int i=0; i messages = new ArrayList();
+ for(int i=0; i receivedMessages = receiver.receiveBatch(numMessages);
@@ -265,7 +388,16 @@ public static void testSendSceduledMessageAndReceive(IMessageSender sender, Stri
sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
- Thread.sleep(secondsToWaitBeforeScheduling * 1000);
+ if(sessionId == null)
+ {
+ Thread.sleep(secondsToWaitBeforeScheduling * 1000 * 2);
+ }
+ else
+ {
+ Thread.sleep(secondsToWaitBeforeScheduling * 1000);
+ ((IMessageSession)receiver).renewSessionLock();
+ Thread.sleep(secondsToWaitBeforeScheduling * 1000);
+ }
Collection allReceivedMessages = new LinkedList();
Collection receivedMessages = receiver.receiveBatch(10);
@@ -307,7 +439,16 @@ public static void testSendSceduledMessageAndCancel(IMessageSender sender, Strin
sender.scheduleMessage(message1, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
long sequnceNumberMsg2 = sender.scheduleMessage(message2, Instant.now().plusSeconds(secondsToWaitBeforeScheduling));
sender.cancelScheduledMessage(sequnceNumberMsg2);
- Thread.sleep(secondsToWaitBeforeScheduling * 1000);
+ if(sessionId == null)
+ {
+ Thread.sleep(secondsToWaitBeforeScheduling * 1000 * 2);
+ }
+ else
+ {
+ Thread.sleep(secondsToWaitBeforeScheduling * 1000);
+ ((IMessageSession)receiver).renewSessionLock();
+ Thread.sleep(secondsToWaitBeforeScheduling * 1000);
+ }
Collection allReceivedMessages = new LinkedList();
Collection receivedMessages = receiver.receiveBatch(10);
@@ -346,12 +487,20 @@ public static void testPeekMessage(IMessageSender sender, String sessionId, IMes
Assert.assertEquals("Peek with sequence number failed.", firstMessageSequenceNumber, peekedMessage5.getSequenceNumber());
}
- public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser) throws InterruptedException, ServiceBusException
- {
+ public static void testPeekMessageBatch(IMessageSender sender, String sessionId, IMessageBrowser browser, boolean isEntityPartitioned) throws InterruptedException, ServiceBusException
+ {
+ String partitionKey = "pkey1";
Message message = new Message("AMQP Scheduled message");
if(sessionId != null)
{
- message.setSessionId(sessionId);
+ message.setSessionId(sessionId);
+ }
+ else
+ {
+ if(isEntityPartitioned)
+ {
+ message.setPartitionKey(partitionKey);
+ }
}
sender.send(message);
message = new Message("AMQP Scheduled message2");
@@ -359,6 +508,13 @@ public static void testPeekMessageBatch(IMessageSender sender, String sessionId,
{
message.setSessionId(sessionId);
}
+ else
+ {
+ if(isEntityPartitioned)
+ {
+ message.setPartitionKey(partitionKey);
+ }
+ }
sender.send(message);
Thread.sleep(5000);
Collection peekedMessages = browser.peekBatch(10);
diff --git a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java
index 77c9c3ca3868f..be45178c24f72 100644
--- a/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java
+++ b/azure-servicebus/src/test/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilderTests.java
@@ -7,8 +7,8 @@
public class ConnectionStringBuilderTests {
@Test
- public void test() {
- String connectionString = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessSignature=SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic";
+ public void ConnectionStringBuilderTest() {
+ String connectionString = "Endpoint=sb://test.servicebus.windows.net/;SharedAccessSignatureToken=SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic";
ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
assertEquals("SharedAccessSignature sr=amqp%3A%2F%2test.servicebus.windows.net%2topic", builder.getSharedAccessSignatureToken());
diff --git a/pom.xml b/pom.xml
index 837f01de17d3c..83a5985fae78f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
https://github.com/Azure/azure-service-bus-java
- 0.29.0
+ 0.31.0
4.12
1.7.0
2.0.0-PREVIEW-8-SNAPSHOT