Skip to content

Commit

Permalink
Event Hubs: Synchronous APIs Part 2 (#4970)
Browse files Browse the repository at this point in the history
* Formatting changes in EventHubAsyncProducer.

* Adding EventHubClient, EventHubConsumer, and EventHubProducer.

* Exposing EventHubClient creation in EventHubClientBuilder.

* EventHubClient, Consumer and Producer implements Closeable.

* Fixing sample by removing event hub instance from namespace connection string.

* Remove unneeded sample in EventHubClientBuilder.

* Add EventHubClient to builder annotation.

* Update EventHubClientBuilder samples. Remove unneeded ones.

* Update samples in EventHubClientBuilder.

* Fixing links to EventHubAsyncProducer samples.

* Adding EventHubProducer code samples.

* Update from Iterable to IterableResponse.

* Make test contents package-private.

* Adding tests for EventHubProducer.

* Simplifying creation of EventHubAsyncProducer

* Select correct retryDuration when constructing EventHubProducer.

* Adding EventHubProducer tests.

* Rename EventHubClientIntegrationTest -> EventHubAsyncClientIntegrationTests

* Add integration tests for EventHubClient.

* Make EventHubConsumer methods public
  • Loading branch information
conniey authored Aug 14, 2019
1 parent 1b40f3a commit 7d24f9a
Show file tree
Hide file tree
Showing 16 changed files with 1,350 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@
* the Event Hubs namespace and offers operations for sending event data, receiving events, and inspecting the connected
* Event Hub.
*
* <p><strong>Creating an {@link EventHubAsyncClient} using Event Hubs namespace connection string</strong></p>
* <p><strong>Creating an {@link EventHubAsyncClient} using an Event Hubs namespace connection string</strong></p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.connectionString#string-string}
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.instantiation#string-string}
*
* <p><strong>Creating an {@link EventHubAsyncClient} using Event Hub instance connection string</strong></p>
* <p><strong>Creating an {@link EventHubAsyncClient} using an Event Hub instance connection string</strong></p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.connectionstring#string}
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.instantiation#string}
*
* @see EventHubClientBuilder
* @see <a href="https://docs.microsoft.com/Azure/event-hubs/event-hubs-about">About Azure Event Hubs</a>
Expand Down Expand Up @@ -236,20 +236,22 @@ public EventHubAsyncConsumer createConsumer(String consumerGroup, String partiti
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubAsyncConsumer} that receives events from the partition with all configured {@link
* EventHubConsumerOptions}.
* @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
* empty string.
* @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
* options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubAsyncConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
Objects.requireNonNull(eventPosition);
Objects.requireNonNull(options);
Objects.requireNonNull(consumerGroup);
Objects.requireNonNull(partitionId);

if (ImplUtils.isNullOrEmpty(consumerGroup)) {
throw new IllegalArgumentException("'consumerGroup' cannot be null or empty.");
throw new IllegalArgumentException("'consumerGroup' cannot be an empty string.");
}
if (ImplUtils.isNullOrEmpty(partitionId)) {
throw new IllegalArgumentException("'partitionId' cannot be null or empty.");
throw new IllegalArgumentException("'partitionId' cannot be an empty string.");
}

final EventHubConsumerOptions clonedOptions = options.clone();
Expand All @@ -268,8 +270,6 @@ public EventHubAsyncConsumer createConsumer(String consumerGroup, String partiti
return connection.createSession(entityPath).cast(EventHubSession.class);
}).flatMap(session -> {
logger.verbose("Creating consumer for path: {}", entityPath);

logger.verbose("Creating producer for {}", entityPath);
final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(clonedOptions.retry());

return session.createConsumer(linkName, entityPath, getExpression(eventPosition),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
* same partition because they all share the same {@link BatchOptions#partitionKey()}.
* <p>
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducer.send#eventDataBatch}
*
* @see EventHubAsyncClient#createProducer()
*/
@Immutable
Expand Down Expand Up @@ -166,10 +167,13 @@ public Mono<EventDataBatch> createBatch(BatchOptions options) {
/**
* Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size
* allowed, an exception will be triggered and the send will fail.
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* </p>
*
* @param event Event to send to the service.
*
* @return A {@link Mono} that completes when the event is pushed to the service.
Expand All @@ -183,11 +187,13 @@ public Mono<Void> send(EventData event) {
/**
* Sends a single event to the associated Event Hub with the send options. If the size of the single event exceeds
* the maximum size allowed, an exception will be triggered and the send will fail.
*
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* @param event Event to send to the service.
* </p>
* @param event Event to send to the service.
* @param options The set of options to consider when sending this event.
*
* @return A {@link Mono} that completes when the event is pushed to the service.
Expand Down Expand Up @@ -217,7 +223,7 @@ public Mono<Void> send(Iterable<EventData> events) {
* Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
* @param events Events to send to the service.
* @param events Events to send to the service.
* @param options The set of options to consider when sending this batch.
*
* @return A {@link Mono} that completes when all events are pushed to the service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.RetryOptions;
import com.azure.core.http.rest.IterableResponse;
import com.azure.core.implementation.annotation.ReturnType;
import com.azure.core.implementation.annotation.ServiceClient;
import com.azure.core.implementation.annotation.ServiceMethod;
import com.azure.messaging.eventhubs.implementation.ConnectionOptions;
import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;

/**
* The main point of interaction with Azure Event Hubs, the client offers a connection to a specific Event Hub within
* the Event Hubs namespace and offers operations for sending event data, receiving events, and inspecting the connected
* Event Hub.
*
* <p>
* <strong>Creating a synchronous {@link EventHubClient} using an Event Hub instance connection string</strong>
* </p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubclient.instantiation}
*
* @see EventHubClientBuilder
* @see EventHubAsyncClient To communicate with Event Hub using an asynchronous client.
* @see <a href="https://docs.microsoft.com/Azure/event-hubs/event-hubs-about">About Azure Event Hubs</a>
*/
@ServiceClient(builder = EventHubClientBuilder.class)
public class EventHubClient implements Closeable {
private final EventHubAsyncClient client;
private final RetryOptions retry;
private final EventHubProducerOptions defaultProducerOptions;
private final EventHubConsumerOptions defaultConsumerOptions;

EventHubClient(EventHubAsyncClient client, ConnectionOptions connectionOptions) {
Objects.requireNonNull(connectionOptions);

this.client = Objects.requireNonNull(client);
this.retry = connectionOptions.retry();
this.defaultProducerOptions = new EventHubProducerOptions()
.retry(connectionOptions.retry());
this.defaultConsumerOptions = new EventHubConsumerOptions()
.retry(connectionOptions.retry())
.scheduler(connectionOptions.scheduler());
}

/**
* Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
*
* @return The set of information for the Event Hub that this client is associated with.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventHubProperties getProperties() {
return client.getProperties().block(retry.tryTimeout());
}

/**
* Retrieves the identifiers for all the partitions of an Event Hub.
*
* @return The identifiers for all partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableResponse<String> getPartitionIds() {
return new IterableResponse<>(client.getPartitionIds());
}

/**
* Retrieves information about a specific partition for an Event Hub, including elements that describe the available
* events in the partition event stream.
*
* @param partitionId The unique identifier of a partition associated with the Event Hub.
* @return The information for the requested partition under the Event Hub this client is associated with.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public PartitionProperties getPartitionProperties(String partitionId) {
return client.getPartitionProperties(partitionId).block(retry.tryTimeout());
}

/**
* Creates an Event Hub producer responsible for transmitting {@link EventData} to the Event Hub, grouped together
* in batches. Event data is automatically routed to an available partition.
*
* @return A new {@link EventHubProducer}.
*/
public EventHubProducer createProducer() {
return createProducer(defaultProducerOptions);
}

/**
* Creates an Event Hub producer responsible for transmitting {@link EventData} to the Event Hub, grouped together
* in batches. If {@link EventHubProducerOptions#partitionId() options.partitionId()} is not {@code null}, the
* events are routed to that specific partition. Otherwise, events are automatically routed to an available
* partition.
*
* @param options The set of options to apply when creating the producer.
* @return A new {@link EventHubProducer}.
* @throws NullPointerException if {@code options} is {@code null}.
*/
public EventHubProducer createProducer(EventHubProducerOptions options) {
Objects.requireNonNull(options);

final EventHubAsyncProducer producer = client.createProducer(options);

final Duration tryTimeout = options.retry() != null && options.retry().tryTimeout() != null
? options.retry().tryTimeout()
: defaultProducerOptions.retry().tryTimeout();

return new EventHubProducer(producer, tryTimeout);
}

/**
* Creates an Event Hub consumer responsible for reading {@link EventData} from a specific Event Hub partition, as a
* member of the specified consumer group, and begins reading events from the {@code eventPosition}.
*
* The consumer created is non-exclusive, allowing multiple consumers from the same consumer group to be actively
* reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
* Consumers".
*
* @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the
* context of this group. The name of the consumer group that is created by default is {@link
* EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
* @param partitionId The identifier of the Event Hub partition.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @return A new {@link EventHubConsumer} that receives events from the partition at the given position.
* @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
* options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition);
return new EventHubConsumer(consumer, defaultConsumerOptions);
}

/**
* Creates an Event Hub consumer responsible for reading {@link EventData} from a specific Event Hub partition, as a
* member of the configured consumer group, and begins reading events from the specified {@code eventPosition}.
*
* <p>
* A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that
* only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred
* to as "Epoch Consumers."
*
* A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively
* reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
* Consumers."
*
* Designating a consumer as exclusive may be specified in the {@code options}, by setting {@link
* EventHubConsumerOptions#ownerLevel(Long)} to a non-null value. By default, consumers are created as
* non-exclusive.
* </p>
*
* @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the
* context of this group. The name of the consumer group that is created by default is {@link
* EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
* @param partitionId The identifier of the Event Hub partition from which events will be received.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubConsumer} that receives events from the partition with all configured {@link
* EventHubConsumerOptions}.
* @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
* options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition, options);
return new EventHubConsumer(consumer, options);
}

/**
* {@inheritDoc}
*/
@Override
public void close() {
client.close();
}
}
Loading

0 comments on commit 7d24f9a

Please sign in to comment.