diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java index bd914729b3303..c6eceb1da8757 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java @@ -42,13 +42,12 @@ class EventHubAsyncClient implements Closeable { } /** - * Returns the fully qualified namespace of this Event Hub. + * Returns the fully qualified domain name (FQDN) of this Event Hub. * - * @return The fully qualified namespace of this Event Hub. + * @return The fully qualified domain name (FQDN) of this Event Hub. */ - public String getFullyQualifiedNamespace() { - // to be implemented - return null; + String getFullyQualifiedDomainName() { + return connection.getFullyQualifiedDomainName(); } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java index 5a0abe584c2aa..c000f605d47e2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java @@ -4,7 +4,6 @@ package com.azure.messaging.eventhubs; import com.azure.core.amqp.RetryOptions; -import com.azure.core.amqp.implementation.ConnectionOptions; import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceMethod; import com.azure.core.util.IterableStream; @@ -27,11 +26,9 @@ class EventHubClient implements Closeable { private final EventHubAsyncClient client; private final RetryOptions retry; - EventHubClient(EventHubAsyncClient client, ConnectionOptions connectionOptions) { - Objects.requireNonNull(connectionOptions, "'connectionOptions' cannot be null."); - + EventHubClient(EventHubAsyncClient client, RetryOptions retry) { this.client = Objects.requireNonNull(client, "'client' cannot be null."); - this.retry = connectionOptions.getRetry(); + this.retry = retry; } /** diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 74ac0e46d5fda..e90b8f3b2283b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -345,17 +345,45 @@ public EventHubClientBuilder consumerOptions(EventHubConsumerOptions consumerOpt return this; } + /** + * Creates a new {@link EventHubConnection} based on the options set on this builder. Every time + * {@code buildConnection()} is invoked, a new instance of {@link EventHubConnection} is created. + * + * @return A new {@link EventHubConnection} with the configured options. + * + * @throws IllegalArgumentException if the credentials have not been set using either {@link + * #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is specified + * but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. + */ + public EventHubConnection buildConnection() { + final MessageSerializer messageSerializer = new EventHubMessageSerializer(); + return buildConnection(messageSerializer); + } + /** * Creates a new {@link EventHubConsumerAsyncClient} based on the options set on this builder. Every time * {@code buildAsyncConsumer()} is invoked, a new instance of {@link EventHubConsumerAsyncClient} is created. * * @return A new {@link EventHubConsumerAsyncClient} with the configured options. + * + * @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using + * either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. If + * {@link #startingPosition(EventPosition)} or {@link #consumerGroup(String)} have not been set. + * Or, if a proxy is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. */ public EventHubConsumerAsyncClient buildAsyncConsumer() { final EventHubConsumerOptions options = consumerOptions != null ? consumerOptions : new EventHubConsumerOptions(); + if (ImplUtils.isNullOrEmpty(consumerGroup)) { + throw logger.logExceptionAsError(new IllegalArgumentException("'consumerGroup' cannot be null or an empty " + + "string. using EventHubClientBuilder.consumerGroup(String)")); + } else if (startingPosition == null) { + throw logger.logExceptionAsError(new NullPointerException("'startingPosition' has not been set. Set it " + + "using EventHubClientBuilder.consumerGroup(String)")); + } + return buildAsyncClient().createConsumer(consumerGroup, startingPosition, options); } @@ -364,6 +392,11 @@ public EventHubConsumerAsyncClient buildAsyncConsumer() { * {@code buildConsumer()} is invoked, a new instance of {@link EventHubConsumerClient} is created. * * @return A new {@link EventHubConsumerClient} with the configured options. + * + * @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using + * either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. If + * {@link #startingPosition(EventPosition)} or {@link #consumerGroup(String)} have not been set. + * Or, if a proxy is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. */ public EventHubConsumerClient buildConsumer() { final EventHubConsumerOptions options = consumerOptions != null @@ -379,9 +412,9 @@ public EventHubConsumerClient buildConsumer() { * * @return A new {@link EventHubProducerAsyncClient} instance with all the configured options. * - * @throws IllegalArgumentException if the credentials have not been set using either {@link - * #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is specified - * but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. + * @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using + * either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy + * is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. */ public EventHubProducerAsyncClient buildAsyncProducer() { return buildAsyncClient().createProducer(); @@ -393,9 +426,9 @@ public EventHubProducerAsyncClient buildAsyncProducer() { * * @return A new {@link EventHubProducerClient} instance with all the configured options. * - * @throws IllegalArgumentException if the credentials have not been set using either {@link - * #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is specified - * but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. + * @throws IllegalArgumentException If shared connection is not used and the credentials have not been set using + * either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy + * is specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. */ public EventHubProducerClient buildProducer() { return buildClient().createProducer(); @@ -427,8 +460,16 @@ public EventHubProducerClient buildProducer() { * but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. */ EventHubAsyncClient buildAsyncClient() { - final ConnectionOptions connectionOptions = getConnectionOptions(); - return buildAsyncClient(connectionOptions); + final MessageSerializer messageSerializer = new EventHubMessageSerializer(); + + final boolean isSharedConnection = eventHubConnection != null; + final EventHubConnection connection = isSharedConnection + ? eventHubConnection + : buildConnection(messageSerializer); + + final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); + + return new EventHubAsyncClient(connection, tracerProvider, messageSerializer, isSharedConnection); } /** @@ -457,19 +498,19 @@ EventHubAsyncClient buildAsyncClient() { * but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}. */ EventHubClient buildClient() { - final ConnectionOptions connectionOptions = getConnectionOptions(); - final EventHubAsyncClient client = buildAsyncClient(connectionOptions); + final EventHubAsyncClient client = buildAsyncClient(); - return new EventHubClient(client, connectionOptions); + return new EventHubClient(client, retryOptions); } - private EventHubAsyncClient buildAsyncClient(ConnectionOptions connectionOptions) { + private EventHubConnection buildConnection(MessageSerializer messageSerializer) { + final ConnectionOptions connectionOptions = getConnectionOptions(); final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider( connectionOptions.getAuthorizationType(), connectionOptions.getHostname(), ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE); final ReactorProvider provider = new ReactorProvider(); final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider); - final MessageSerializer messageSerializer = new EventHubMessageSerializer(); + final Mono connectionMono = Mono.fromCallable(() -> { final String connectionId = StringUtil.getRandomString("MF"); @@ -477,14 +518,7 @@ private EventHubAsyncClient buildAsyncClient(ConnectionOptions connectionOptions tokenManagerProvider, messageSerializer); }); - final boolean isSharedConnection = eventHubConnection != null; - final EventHubConnection connection = isSharedConnection - ? eventHubConnection - : new EventHubConnection(connectionMono, connectionOptions); - - final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); - - return new EventHubAsyncClient(connection, tracerProvider, messageSerializer, isSharedConnection); + return new EventHubConnection(connectionMono, connectionOptions); } private ConnectionOptions getConnectionOptions() { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java index 7be3dd82600d5..9e45e1daf1b8a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumer.java @@ -100,7 +100,7 @@ class EventHubPartitionAsyncConsumer implements Closeable { .doOnSubscribe(subscription -> { AmqpReceiveLink existingLink = RECEIVE_LINK_FIELD_UPDATER.get(this); if (existingLink == null) { - logger.warning("AmqpReceiveLink not set yet."); + logger.info("AmqpReceiveLink not set yet."); return; } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessor.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessor.java index 93a0f937d4090..cf0d858b1f90e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessor.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessor.java @@ -71,7 +71,7 @@ public class EventProcessor { EventHubAsyncClient eventHubAsyncClient = eventHubClientBuilder.buildAsyncClient(); this.partitionBasedLoadBalancer = new PartitionBasedLoadBalancer(this.eventProcessorStore, eventHubAsyncClient, - eventHubAsyncClient.getFullyQualifiedNamespace(), eventHubAsyncClient.getEventHubName(), + eventHubAsyncClient.getFullyQualifiedDomainName(), eventHubAsyncClient.getEventHubName(), consumerGroup, identifier, TimeUnit.MINUTES.toSeconds(1), partitionPumpManager); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java index a8a6dac207d03..935ba9b82d3c1 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java @@ -70,7 +70,10 @@ protected String getTestName() { protected void beforeTest() { builder = createBuilder() .transportType(transportType); - client = builder.buildAsyncClient(); + EventHubConnection connection = builder.buildConnection(); + client = new EventHubClientBuilder() + .connection(connection) + .buildAsyncClient(); if (HAS_PUSHED_EVENTS.getAndSet(true)) { logger.warning("Already pushed events to partition. Skipping."); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java index 84c9c94278967..7f1591bf7ede4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventPositionIntegrationTest.java @@ -62,7 +62,10 @@ protected String getTestName() { @Override protected void beforeTest() { - client = createBuilder().buildAsyncClient(); + EventHubConnection connection = createBuilder().buildConnection(); + client = createBuilder() + .connection(connection) + .buildAsyncClient(); if (!HAS_PUSHED_EVENTS.getAndSet(true)) { final SendOptions options = new SendOptions().setPartitionId(PARTITION_ID); @@ -70,8 +73,8 @@ protected void beforeTest() { // Receiving back those events we sent so we have something to compare to. logger.info("Receiving the events we sent."); - final EventHubConsumerAsyncClient consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, - EventPosition.fromEnqueuedTime(testData.getEnqueuedTime())); + final EventHubConsumerAsyncClient consumer = client + .createConsumer(DEFAULT_CONSUMER_GROUP_NAME, EventPosition.fromEnqueuedTime(testData.getEnqueuedTime())); final List receivedEvents; try { receivedEvents = consumer.receive(PARTITION_ID) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorTest.java index f8deed641e5e4..06d59a2391868 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventProcessorTest.java @@ -102,7 +102,7 @@ public void testWithSimplePartitionProcessor() throws Exception { TracerProvider tracerProvider = new TracerProvider(tracers); when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns"); when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); when(eventHubAsyncClient @@ -180,7 +180,7 @@ public void testWithFaultyPartitionProcessor() throws Exception { final List tracers = Collections.singletonList(tracer1); TracerProvider tracerProvider = new TracerProvider(tracers); when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns"); when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); when(eventHubAsyncClient @@ -231,7 +231,7 @@ public void testErrorProcessSpans() throws Exception { final List tracers = Collections.singletonList(tracer1); TracerProvider tracerProvider = new TracerProvider(tracers); when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns"); when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); when(eventHubAsyncClient @@ -291,7 +291,7 @@ public void testProcessSpans() throws Exception { final List tracers = Collections.singletonList(tracer1); TracerProvider tracerProvider = new TracerProvider(tracers); when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns"); when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1")); when(eventHubAsyncClient @@ -356,7 +356,7 @@ public void testWithMultiplePartitions() throws Exception { when(eventHubClientBuilder.buildAsyncClient()).thenReturn(eventHubAsyncClient); when(eventHubAsyncClient.getPartitionIds()).thenReturn(Flux.just("1", "2", "3")); - when(eventHubAsyncClient.getFullyQualifiedNamespace()).thenReturn("test-ns"); + when(eventHubAsyncClient.getFullyQualifiedDomainName()).thenReturn("test-ns"); when(eventHubAsyncClient.getEventHubName()).thenReturn("test-eh"); when(eventHubAsyncClient .createConsumer(anyString(), any(EventPosition.class), any(EventHubConsumerOptions.class)))