Skip to content

Commit

Permalink
Expose way to get EventHubConnection (#6138)
Browse files Browse the repository at this point in the history
* Expose way to create EventHubConnection.

* Rename property to FullyQualifiedDomainName

* Only checking for connectionString if shared connection is not used.
  • Loading branch information
conniey authored Nov 1, 2019
1 parent 8cee5a3 commit ca30bd3
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,12 @@ class EventHubAsyncClient implements Closeable {
}

/**
* Returns the fully qualified namespace of this Event Hub.
* Returns the fully qualified domain name (FQDN) of this Event Hub.
*
* @return The fully qualified namespace of this Event Hub.
* @return The fully qualified domain name (FQDN) of this Event Hub.
*/
public String getFullyQualifiedNamespace() {
// to be implemented
return null;
String getFullyQualifiedDomainName() {
return connection.getFullyQualifiedDomainName();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.RetryOptions;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.IterableStream;
Expand All @@ -27,11 +26,9 @@ class EventHubClient implements Closeable {
private final EventHubAsyncClient client;
private final RetryOptions retry;

EventHubClient(EventHubAsyncClient client, ConnectionOptions connectionOptions) {
Objects.requireNonNull(connectionOptions, "'connectionOptions' cannot be null.");

EventHubClient(EventHubAsyncClient client, RetryOptions retry) {
this.client = Objects.requireNonNull(client, "'client' cannot be null.");
this.retry = connectionOptions.getRetry();
this.retry = retry;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,17 +345,45 @@ public EventHubClientBuilder consumerOptions(EventHubConsumerOptions consumerOpt
return this;
}

/**
* Creates a new {@link EventHubConnection} based on the options set on this builder. Every time
* {@code buildConnection()} is invoked, a new instance of {@link EventHubConnection} is created.
*
* @return A new {@link EventHubConnection} with the configured options.
*
* @throws IllegalArgumentException if the credentials have not been set using either {@link
* #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is specified
* but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubConnection buildConnection() {
final MessageSerializer messageSerializer = new EventHubMessageSerializer();
return buildConnection(messageSerializer);
}

/**
* Creates a new {@link EventHubConsumerAsyncClient} based on the options set on this builder. Every time
* {@code buildAsyncConsumer()} is invoked, a new instance of {@link EventHubConsumerAsyncClient} is created.
*
* @return A new {@link EventHubConsumerAsyncClient} with the configured options.
*
* @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using
* either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. If
* {@link #startingPosition(EventPosition)} or {@link #consumerGroup(String)} have not been set.
* Or, if a proxy is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubConsumerAsyncClient buildAsyncConsumer() {
final EventHubConsumerOptions options = consumerOptions != null
? consumerOptions
: new EventHubConsumerOptions();

if (ImplUtils.isNullOrEmpty(consumerGroup)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'consumerGroup' cannot be null or an empty "
+ "string. using EventHubClientBuilder.consumerGroup(String)"));
} else if (startingPosition == null) {
throw logger.logExceptionAsError(new NullPointerException("'startingPosition' has not been set. Set it "
+ "using EventHubClientBuilder.consumerGroup(String)"));
}

return buildAsyncClient().createConsumer(consumerGroup, startingPosition, options);
}

Expand All @@ -364,6 +392,11 @@ public EventHubConsumerAsyncClient buildAsyncConsumer() {
* {@code buildConsumer()} is invoked, a new instance of {@link EventHubConsumerClient} is created.
*
* @return A new {@link EventHubConsumerClient} with the configured options.
*
* @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using
* either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. If
* {@link #startingPosition(EventPosition)} or {@link #consumerGroup(String)} have not been set.
* Or, if a proxy is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubConsumerClient buildConsumer() {
final EventHubConsumerOptions options = consumerOptions != null
Expand All @@ -379,9 +412,9 @@ public EventHubConsumerClient buildConsumer() {
*
* @return A new {@link EventHubProducerAsyncClient} instance with all the configured options.
*
* @throws IllegalArgumentException if the credentials have not been set using either {@link
* #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is specified
* but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
* @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using
* either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy
* is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubProducerAsyncClient buildAsyncProducer() {
return buildAsyncClient().createProducer();
Expand All @@ -393,9 +426,9 @@ public EventHubProducerAsyncClient buildAsyncProducer() {
*
* @return A new {@link EventHubProducerClient} instance with all the configured options.
*
* @throws IllegalArgumentException if the credentials have not been set using either {@link
* #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is specified
* but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
* @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using
* either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy
* is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubProducerClient buildProducer() {
return buildClient().createProducer();
Expand Down Expand Up @@ -427,8 +460,16 @@ public EventHubProducerClient buildProducer() {
* but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
EventHubAsyncClient buildAsyncClient() {
final ConnectionOptions connectionOptions = getConnectionOptions();
return buildAsyncClient(connectionOptions);
final MessageSerializer messageSerializer = new EventHubMessageSerializer();

final boolean isSharedConnection = eventHubConnection != null;
final EventHubConnection connection = isSharedConnection
? eventHubConnection
: buildConnection(messageSerializer);

final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));

return new EventHubAsyncClient(connection, tracerProvider, messageSerializer, isSharedConnection);
}

/**
Expand Down Expand Up @@ -457,34 +498,27 @@ EventHubAsyncClient buildAsyncClient() {
* but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
EventHubClient buildClient() {
final ConnectionOptions connectionOptions = getConnectionOptions();
final EventHubAsyncClient client = buildAsyncClient(connectionOptions);
final EventHubAsyncClient client = buildAsyncClient();

return new EventHubClient(client, connectionOptions);
return new EventHubClient(client, retryOptions);
}

private EventHubAsyncClient buildAsyncClient(ConnectionOptions connectionOptions) {
private EventHubConnection buildConnection(MessageSerializer messageSerializer) {
final ConnectionOptions connectionOptions = getConnectionOptions();
final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(
connectionOptions.getAuthorizationType(), connectionOptions.getHostname(),
ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE);
final ReactorProvider provider = new ReactorProvider();
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);
final MessageSerializer messageSerializer = new EventHubMessageSerializer();

final Mono<EventHubAmqpConnection> connectionMono = Mono.fromCallable(() -> {
final String connectionId = StringUtil.getRandomString("MF");

return new EventHubReactorAmqpConnection(connectionId, connectionOptions, provider, handlerProvider,
tokenManagerProvider, messageSerializer);
});

final boolean isSharedConnection = eventHubConnection != null;
final EventHubConnection connection = isSharedConnection
? eventHubConnection
: new EventHubConnection(connectionMono, connectionOptions);

final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));

return new EventHubAsyncClient(connection, tracerProvider, messageSerializer, isSharedConnection);
return new EventHubConnection(connectionMono, connectionOptions);
}

private ConnectionOptions getConnectionOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class EventHubPartitionAsyncConsumer implements Closeable {
.doOnSubscribe(subscription -> {
AmqpReceiveLink existingLink = RECEIVE_LINK_FIELD_UPDATER.get(this);
if (existingLink == null) {
logger.warning("AmqpReceiveLink not set yet.");
logger.info("AmqpReceiveLink not set yet.");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class EventProcessor {
EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient();
this.partitionBasedLoadBalancer =
new PartitionBasedLoadBalancer(this.eventProcessorStore, eventHubAsyncClient,
eventHubAsyncClient.getFullyQualifiedNamespace(), eventHubAsyncClient.getEventHubName(),
eventHubAsyncClient.getFullyQualifiedDomainName(), eventHubAsyncClient.getEventHubName(),
consumerGroup, identifier, TimeUnit.MINUTES.toSeconds(1), partitionPumpManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ protected String getTestName() {
protected void beforeTest() {
builder = createBuilder()
.transportType(transportType);
client = builder.buildAsyncClient();
EventHubConnection connection = builder.buildConnection();
client = new EventHubClientBuilder()
.connection(connection)
.buildAsyncClient();

if (HAS_PUSHED_EVENTS.getAndSet(true)) {
logger.warning("Already pushed events to partition. Skipping.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,19 @@ protected String getTestName() {

@Override
protected void beforeTest() {
client = createBuilder().buildAsyncClient();
EventHubConnection connection = createBuilder().buildConnection();
client = createBuilder()
.connection(connection)
.buildAsyncClient();

if (!HAS_PUSHED_EVENTS.getAndSet(true)) {
final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID);
testData = setupEventTestData(client.createProducer(), NUMBER_OF_EVENTS, options);

// Receiving back those events we sent so we have something to compare to.
logger.info("Receiving the events we sent.");
final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME,
EventPosition.fromEnqueuedTime(testData.getEnqueuedTime()));
final EventHubConsumerAsyncClient consumer = client
.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, EventPosition.fromEnqueuedTime(testData.getEnqueuedTime()));
final List<EventData> receivedEvents;
try {
receivedEvents = consumer.receive(PARTITION_ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testWithSimplePartitionProcessor() throws Exception {
TracerProvider tracerProvider = new TracerProvider(tracers);

when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient);
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1"));
when(eventHubAsyncClient
Expand Down Expand Up @@ -180,7 +180,7 @@ public void testWithFaultyPartitionProcessor() throws Exception {
final List<Tracer> tracers = Collections.singletonList(tracer1);
TracerProvider tracerProvider = new TracerProvider(tracers);
when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient);
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1"));
when(eventHubAsyncClient
Expand Down Expand Up @@ -231,7 +231,7 @@ public void testErrorProcessSpans() throws Exception {
final List<Tracer> tracers = Collections.singletonList(tracer1);
TracerProvider tracerProvider = new TracerProvider(tracers);
when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient);
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1"));
when(eventHubAsyncClient
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testProcessSpans() throws Exception {
final List<Tracer> tracers = Collections.singletonList(tracer1);
TracerProvider tracerProvider = new TracerProvider(tracers);
when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient);
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1"));
when(eventHubAsyncClient
Expand Down Expand Up @@ -356,7 +356,7 @@ public void testWithMultiplePartitions() throws Exception {

when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient);
when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1", "2", "3"));
when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns");
when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns");
when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh");
when(eventHubAsyncClient
.createConsumer(anyString(), any(EventPosition.class), any(EventHubConsumerOptions.class)))
Expand Down

0 comments on commit ca30bd3

Please sign in to comment.