Skip to content

Commit

Permalink
WebSocket implementation no proxy support (Azure#275)
Browse files Browse the repository at this point in the history
* WebSocket implementation no proxy support (Azure#275)
  • Loading branch information
apraovjr authored and binzywu committed Sep 12, 2018
1 parent fbfcd13 commit 0d14a39
Show file tree
Hide file tree
Showing 21 changed files with 1,278 additions and 1,106 deletions.
7 changes: 6 additions & 1 deletion azure-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<artifactId>proton-j</artifactId>
<version>${proton-j-version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>qpid-proton-j-extensions</artifactId>
<version>${qpid-proton-j-extensions-version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
Expand Down Expand Up @@ -92,5 +97,5 @@
<artifactId>async-http-client</artifactId>
<version>2.5.2</version>
</dependency>
</dependencies>
</dependencies>
</project>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.RetryPolicy;
import com.microsoft.azure.servicebus.primitives.TransportType;
import com.microsoft.azure.servicebus.security.TokenProvider;

/**
Expand All @@ -16,6 +17,7 @@ public class ClientSettings {
private TokenProvider tokenProvider;
private RetryPolicy retryPolicy;
private Duration operationTimeout;
private TransportType transportType;

/**
* Creates a new instance with the given token provider, default retry policy and default operation timeout.
Expand All @@ -25,7 +27,7 @@ public class ClientSettings {
*/
public ClientSettings(TokenProvider tokenProvider)
{
this(tokenProvider, RetryPolicy.getDefault(), Duration.ofSeconds(ClientConstants.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS));
this(tokenProvider, RetryPolicy.getDefault(), Duration.ofSeconds(ClientConstants.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS), TransportType.AMQP);
}

/**
Expand All @@ -34,11 +36,12 @@ public ClientSettings(TokenProvider tokenProvider)
* @param retryPolicy {@link RetryPolicy} instance
* @param operationTimeout default operation timeout to be used for all client operations. Client can override this value by explicitly specifying a timeout in the operation.
*/
public ClientSettings(TokenProvider tokenProvider, RetryPolicy retryPolicy, Duration operationTimeout)
public ClientSettings(TokenProvider tokenProvider, RetryPolicy retryPolicy, Duration operationTimeout, TransportType transportType)
{
this.tokenProvider = tokenProvider;
this.retryPolicy = retryPolicy;
this.operationTimeout = operationTimeout;
this.transportType = transportType;
}

/**
Expand Down Expand Up @@ -67,4 +70,6 @@ public Duration getOperationTimeout()
{
return operationTimeout;
}

public TransportType getTransportType() { return transportType; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,7 @@ else if (c.isEmpty())
return null;
else
return convertAmqpMessagesWithDeliveryTagsToBrokeredMessages(c);
});

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ final class MessageSender extends InitializableEntity implements IMessageSender
private CoreMessageSender internalSender = null;
private boolean isInitialized = false;
private URI namespaceEndpointURI;
private ClientSettings clientSettings;
private ClientSettings clientSettings;

private MessageSender() {
super(StringUtil.getShortRandomString());
Expand Down Expand Up @@ -229,4 +229,4 @@ public void cancelScheduledMessage(long sequenceNumber) throws InterruptedExcept
MessagingFactory getMessagingFactory() {
return this.messagingFactory;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ private AmqpConstants() { }
public static final int AMQP_BATCH_MESSAGE_FORMAT = 0x80013700; // 2147563264L;

public static final int MAX_FRAME_SIZE = 65536;
public static final int WEBSOCKET_MAX_FRAME_SIZE = 4096;

public static final String MANAGEMENT_NODE_ADDRESS_SEGMENT = "$management";
public static final String CBS_NODE_ADDRESS_SEGMENT = "$cbs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.apache.qpid.proton.reactor.Handshaker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,7 +27,7 @@

// ServiceBus <-> ProtonReactor interaction handles all
// amqp_connection/transport related events from reactor
public final class ConnectionHandler extends BaseHandler
public class ConnectionHandler extends BaseHandler
{
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
private final IAmqpConnection messagingFactory;
Expand Down Expand Up @@ -55,12 +56,27 @@ public void onConnectionInit(Event event)
connection.open();
}

public void addTransportLayers(final Event event, final TransportInternal transport)
{
}
public int getPort()
{
return ClientConstants.AMQPS_PORT;
}
public int getMaxFrameSize()
{

return AmqpConstants.MAX_FRAME_SIZE;
}

@Override
public void onConnectionBound(Event event)
{
TRACE_LOGGER.debug("onConnectionBound: hostname:{}", event.getConnection().getHostname());
Transport transport = event.getTransport();

this.addTransportLayers(event, (TransportInternal) transport);

SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
transport.ssl(domain);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void onConnectionLocalOpen(Event event)
}

Transport transport = Proton.transport();
transport.setMaxFrameSize(AmqpConstants.MAX_FRAME_SIZE);
transport.setMaxFrameSize(AmqpConstants.WEBSOCKET_MAX_FRAME_SIZE);
transport.sasl();
transport.setEmitFlowEventOnSend(false);
transport.bind(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
import java.io.IOException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.ReactorOptions;

public final class ProtonUtil
{
private ProtonUtil()
{
}

public static Reactor reactor(ReactorHandler reactorHandler) throws IOException
public static Reactor reactor(ReactorHandler reactorHandler, final int maxFrameSize) throws IOException
{
Reactor reactor = Proton.reactor(reactorHandler);
final ReactorOptions reactorOptions = new ReactorOptions();
reactorOptions.setMaxFrameSize(maxFrameSize);

Reactor reactor = Proton.reactor(reactorOptions, reactorHandler);
reactor.setGlobalHandler(new CustomIOHandler());
reactor.getGlobalHandler().add(new LoggingHandler());
return reactor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.microsoft.azure.servicebus.amqp;

import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.impl.TransportInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketConnectionHandler extends ConnectionHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);

public WebSocketConnectionHandler(IAmqpConnection messagingFactory)
{
super(messagingFactory);
}

@Override
public void addTransportLayers(final Event event, final TransportInternal transport)
{
final WebSocketImpl webSocket = new WebSocketImpl();
webSocket.configure(
event.getConnection().getHostname(),
"/$servicebus/websocket",
null,
0,
"AMQPWSB10",
null,
null);

transport.addTransportLayer(webSocket);

if (TRACE_LOGGER.isInfoEnabled())
{
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]");
}
}

@Override
public int getPort()
{
return ClientConstants.HTTPS_PORT;
}

@Override
public int getMaxFrameSize()
{
// This is the current limitation of https://github.com/Azure/qpid-proton-j-extensions
// once, this library enables larger frames - this property can be removed.
return 4 * 1024;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ private ClientConstants() { }
public static final UUID ZEROLOCKTOKEN = new UUID(0l, 0l);

public final static int AMQPS_PORT = 5671;
public final static int HTTPS_PORT = 443;
public final static int MAX_PARTITION_KEY_LENGTH = 128;

public final static Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ConnectionStringBuilder
private final static String SHARED_ACCESS_KEY_CONFIG_NAME = "SharedAccessKey";
private final static String ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignature";
private final static String SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignatureToken";
private final static String TRANSPORT_TYPE_CONFIG_NAME = "TransportType";
private final static String ENTITY_PATH_CONFIG_NAME = "EntityPath";
private final static String OPERATION_TIMEOUT_CONFIG_NAME = "OperationTimeout";
private final static String RETRY_POLICY_CONFIG_NAME = "RetryPolicy";
Expand All @@ -56,7 +57,7 @@ public class ConnectionStringBuilder

private static final String ALL_KEY_ENUMERATE_REGEX = "(" + HOSTNAME_CONFIG_NAME + "|" + ENDPOINT_CONFIG_NAME + "|" + SHARED_ACCESS_KEY_NAME_CONFIG_NAME
+ "|" + SHARED_ACCESS_KEY_CONFIG_NAME + "|" + SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + ENTITY_PATH_CONFIG_NAME + "|" + OPERATION_TIMEOUT_CONFIG_NAME
+ "|" + RETRY_POLICY_CONFIG_NAME + "|" + ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + ")";
+ "|" + RETRY_POLICY_CONFIG_NAME + "|" + ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + TRANSPORT_TYPE_CONFIG_NAME + "|" +")";

private static final String KEYS_WITH_DELIMITERS_REGEX = KEY_VALUE_PAIR_DELIMITER + ALL_KEY_ENUMERATE_REGEX + KEY_VALUE_SEPARATOR;

Expand All @@ -69,6 +70,7 @@ public class ConnectionStringBuilder
private String entityPath;
private Duration operationTimeout;
private RetryPolicy retryPolicy;
private TransportType transportType;

/**
* Default operation timeout if timeout is not specified in the connection string. 30 seconds.
Expand Down Expand Up @@ -304,6 +306,30 @@ public void setRetryPolicy(final RetryPolicy retryPolicy)
this.retryPolicy = retryPolicy;
}


/**
* TransportType on which all the communication for the Service Bus created using this ConnectionString.
* Default value is {@link TransportType#AMQP}.
*
* @return transportType
*/
public TransportType getTransportType()
{
return (this.transportType == null ? TransportType.AMQP : transportType);
}

/**
* Set the TransportType value in the Connection String. If no TransportType is set, this defaults to {@link TransportType#AMQP}.
*
* @param transportType Transport Type
* @return the {@link ConnectionStringBuilder} instance being set.
*/
public ConnectionStringBuilder setTransportType(final TransportType transportType)
{
this.transportType = transportType;
return this;
}

/**
* Returns an inter-operable connection string that can be used to connect to ServiceBus Namespace
* @return connection string
Expand Down Expand Up @@ -356,6 +382,12 @@ public String toString()
KEY_VALUE_SEPARATOR, this.retryPolicy.toString()));
}

if (this.transportType != null)
{
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", KEY_VALUE_PAIR_DELIMITER, TRANSPORT_TYPE_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.transportType.toString()));
}

this.connectionString = connectionStringBuilder.toString();
}

Expand Down Expand Up @@ -485,6 +517,18 @@ else if (key.equalsIgnoreCase(RETRY_POLICY_CONFIG_NAME))
String.format(Locale.US, "Connection string parameter '%s'='%s' is not recognized",
RETRY_POLICY_CONFIG_NAME, values[valueIndex]));
}
else if (key.equalsIgnoreCase(TRANSPORT_TYPE_CONFIG_NAME))
{
try
{
this.transportType = TransportType.fromString(values[valueIndex]);
} catch (IllegalArgumentException exception)
{
throw new IllegalConnectionStringFormatException(
String.format("Invalid value specified for property '%s' in the ConnectionString.", TRANSPORT_TYPE_CONFIG_NAME),
exception);
}
}
else
{
throw new IllegalConnectionStringFormatException(
Expand Down
Loading

0 comments on commit 0d14a39

Please sign in to comment.