Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Hubs: Synchronous APIs Part 2 #4970

Merged
merged 33 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
30ac167
Formatting changes in EventHubAsyncProducer.
conniey Aug 12, 2019
5c041aa
Adding EventHubClient, EventHubConsumer, and EventHubProducer.
conniey Aug 12, 2019
4f3bab3
Exposing EventHubClient creation in EventHubClientBuilder.
conniey Aug 12, 2019
5e10349
EventHubClient, Consumer and Producer implements Closeable.
conniey Aug 12, 2019
4727715
Fixing sample by removing event hub instance from namespace connectio…
conniey Aug 12, 2019
4c94da5
Remove unneeded sample in EventHubClientBuilder.
conniey Aug 12, 2019
42c099a
Add EventHubClient to builder annotation.
conniey Aug 12, 2019
d1ab53c
Update EventHubClientBuilder samples. Remove unneeded ones.
conniey Aug 12, 2019
eb02600
Update samples in EventHubClientBuilder.
conniey Aug 12, 2019
e473b67
Add documentation to EventHubConsumer.
conniey Aug 12, 2019
beb0090
Fixing links to EventHubAsyncProducer samples.
conniey Aug 13, 2019
17edced
Adding EventHubProducer code samples.
conniey Aug 13, 2019
d4a7871
Fixing spotbug issues.
conniey Aug 13, 2019
e4de89d
Update from Iterable to IterableResponse.
conniey Aug 13, 2019
1f48bf7
Make test contents package-private.
conniey Aug 13, 2019
49c2aaa
Update sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azur…
conniey Aug 13, 2019
3b5a012
Update sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azur…
conniey Aug 13, 2019
fd83955
Merge branch 'synchronous-client' of https://github.com/conniey/azure…
conniey Aug 13, 2019
0d40de4
Fix typo in connection strings.
conniey Aug 13, 2019
bbb1eb5
Fix javadoc errors.
conniey Aug 13, 2019
d4c89b2
Add Null checks for options.
conniey Aug 13, 2019
104fe4b
Fix null checks.
conniey Aug 13, 2019
d49d7c8
Remove unneeded log message.
conniey Aug 13, 2019
e942750
Adding tests for EventHubProducer.
conniey Aug 13, 2019
5f51b4e
Simplifying creation of EventHubAsyncProducer
conniey Aug 13, 2019
1018908
Remove redundant null check.
conniey Aug 13, 2019
470f4ef
Select correct retryDuration when constructing EventHubProducer.
conniey Aug 13, 2019
c648c2e
Adding EventHubProducer tests.
conniey Aug 13, 2019
cfb1d11
Add correct header file.
conniey Aug 13, 2019
f7cf4e4
Rename EventHubClientIntegrationTest -> EventHubAsyncClientIntegratio…
conniey Aug 13, 2019
48b960a
Add integration tests for EventHubClient.
conniey Aug 13, 2019
4f26053
Make EventHubConsumer methods public
conniey Aug 14, 2019
dbe3d82
Fix spotbug issues
conniey Aug 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@
* the Event Hubs namespace and offers operations for sending event data, receiving events, and inspecting the connected
* Event Hub.
*
* <p><strong>Creating an {@link EventHubAsyncClient} using Event Hubs namespace connection string</strong></p>
* <p><strong>Creating an {@link EventHubAsyncClient} using an Event Hubs namespace connection string</strong></p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.connectionString#string-string}
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.instantiation#string-string}
*
* <p><strong>Creating an {@link EventHubAsyncClient} using Event Hub instance connection string</strong></p>
* <p><strong>Creating an {@link EventHubAsyncClient} using an Event Hub instance connection string</strong></p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.connectionstring#string}
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.instantiation#string}
*
* @see EventHubClientBuilder
* @see <a href="https://docs.microsoft.com/Azure/event-hubs/event-hubs-about">About Azure Event Hubs</a>
Expand Down Expand Up @@ -236,20 +236,22 @@ public EventHubAsyncConsumer createConsumer(String consumerGroup, String partiti
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubAsyncConsumer} that receives events from the partition with all configured {@link
* EventHubConsumerOptions}.
* @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
* empty string.
* @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
* options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubAsyncConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
Objects.requireNonNull(eventPosition);
Objects.requireNonNull(options);
Objects.requireNonNull(consumerGroup);
Objects.requireNonNull(partitionId);

if (ImplUtils.isNullOrEmpty(consumerGroup)) {
throw new IllegalArgumentException("'consumerGroup' cannot be null or empty.");
throw new IllegalArgumentException("'consumerGroup' cannot be an empty string.");
}
if (ImplUtils.isNullOrEmpty(partitionId)) {
throw new IllegalArgumentException("'partitionId' cannot be null or empty.");
throw new IllegalArgumentException("'partitionId' cannot be an empty string.");
}

final EventHubConsumerOptions clonedOptions = options.clone();
Expand All @@ -268,8 +270,6 @@ public EventHubAsyncConsumer createConsumer(String consumerGroup, String partiti
return connection.createSession(entityPath).cast(EventHubSession.class);
}).flatMap(session -> {
logger.verbose("Creating consumer for path: {}", entityPath);

logger.verbose("Creating producer for {}", entityPath);
final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(clonedOptions.retry());

return session.createConsumer(linkName, entityPath, getExpression(eventPosition),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
* same partition because they all share the same {@link BatchOptions#partitionKey()}.
* <p>
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducer.send#eventDataBatch}
*
* @see EventHubAsyncClient#createProducer()
*/
@Immutable
Expand Down Expand Up @@ -166,10 +167,13 @@ public Mono<EventDataBatch> createBatch(BatchOptions options) {
/**
* Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size
* allowed, an exception will be triggered and the send will fail.
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* </p>
*
* @param event Event to send to the service.
*
* @return A {@link Mono} that completes when the event is pushed to the service.
Expand All @@ -183,11 +187,13 @@ public Mono<Void> send(EventData event) {
/**
* Sends a single event to the associated Event Hub with the send options. If the size of the single event exceeds
* the maximum size allowed, an exception will be triggered and the send will fail.
*
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* @param event Event to send to the service.
* </p>
* @param event Event to send to the service.
* @param options The set of options to consider when sending this event.
*
* @return A {@link Mono} that completes when the event is pushed to the service.
Expand Down Expand Up @@ -217,7 +223,7 @@ public Mono<Void> send(Iterable<EventData> events) {
* Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
* @param events Events to send to the service.
* @param events Events to send to the service.
* @param options The set of options to consider when sending this batch.
*
* @return A {@link Mono} that completes when all events are pushed to the service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.RetryOptions;
import com.azure.core.http.rest.IterableResponse;
import com.azure.core.implementation.annotation.ReturnType;
import com.azure.core.implementation.annotation.ServiceClient;
import com.azure.core.implementation.annotation.ServiceMethod;
import com.azure.messaging.eventhubs.implementation.ConnectionOptions;
import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;

/**
* The main point of interaction with Azure Event Hubs, the client offers a connection to a specific Event Hub within
* the Event Hubs namespace and offers operations for sending event data, receiving events, and inspecting the connected
* Event Hub.
*
* <p>
* <strong>Creating a synchronous {@link EventHubClient} using an Event Hub instance connection string</strong>
* </p>
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubclient.instantiation}
*
* @see EventHubClientBuilder
* @see EventHubAsyncClient To communicate with Event Hub using an asynchronous client.
* @see <a href="https://docs.microsoft.com/Azure/event-hubs/event-hubs-about">About Azure Event Hubs</a>
*/
@ServiceClient(builder = EventHubClientBuilder.class)
public class EventHubClient implements Closeable {
private final EventHubAsyncClient client;
private final RetryOptions retry;
private final EventHubProducerOptions defaultProducerOptions;
private final EventHubConsumerOptions defaultConsumerOptions;

EventHubClient(EventHubAsyncClient client, ConnectionOptions connectionOptions) {
Objects.requireNonNull(connectionOptions);

this.client = Objects.requireNonNull(client);
this.retry = connectionOptions.retry();
this.defaultProducerOptions = new EventHubProducerOptions()
.retry(connectionOptions.retry());
this.defaultConsumerOptions = new EventHubConsumerOptions()
.retry(connectionOptions.retry())
.scheduler(connectionOptions.scheduler());
}

/**
* Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
*
* @return The set of information for the Event Hub that this client is associated with.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventHubProperties getProperties() {
return client.getProperties().block(retry.tryTimeout());
}

/**
* Retrieves the identifiers for all the partitions of an Event Hub.
*
* @return The identifiers for all partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableResponse<String> getPartitionIds() {
conniey marked this conversation as resolved.
Show resolved Hide resolved
return new IterableResponse<>(client.getPartitionIds());
}

/**
* Retrieves information about a specific partition for an Event Hub, including elements that describe the available
* events in the partition event stream.
*
* @param partitionId The unique identifier of a partition associated with the Event Hub.
* @return The information for the requested partition under the Event Hub this client is associated with.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public PartitionProperties getPartitionProperties(String partitionId) {
return client.getPartitionProperties(partitionId).block(retry.tryTimeout());
}

/**
* Creates an Event Hub producer responsible for transmitting {@link EventData} to the Event Hub, grouped together
* in batches. Event data is automatically routed to an available partition.
*
* @return A new {@link EventHubProducer}.
*/
public EventHubProducer createProducer() {
return createProducer(defaultProducerOptions);
}

/**
* Creates an Event Hub producer responsible for transmitting {@link EventData} to the Event Hub, grouped together
* in batches. If {@link EventHubProducerOptions#partitionId() options.partitionId()} is not {@code null}, the
* events are routed to that specific partition. Otherwise, events are automatically routed to an available
* partition.
*
* @param options The set of options to apply when creating the producer.
* @return A new {@link EventHubProducer}.
* @throws NullPointerException if {@code options} is {@code null}.
*/
public EventHubProducer createProducer(EventHubProducerOptions options) {
Objects.requireNonNull(options);

final EventHubAsyncProducer producer = client.createProducer(options);

final Duration tryTimeout = options.retry() != null && options.retry().tryTimeout() != null
? options.retry().tryTimeout()
: defaultProducerOptions.retry().tryTimeout();

return new EventHubProducer(producer, tryTimeout);
}

/**
* Creates an Event Hub consumer responsible for reading {@link EventData} from a specific Event Hub partition, as a
* member of the specified consumer group, and begins reading events from the {@code eventPosition}.
*
* The consumer created is non-exclusive, allowing multiple consumers from the same consumer group to be actively
* reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
* Consumers".
*
* @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the
* context of this group. The name of the consumer group that is created by default is {@link
* EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
* @param partitionId The identifier of the Event Hub partition.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @return A new {@link EventHubConsumer} that receives events from the partition at the given position.
* @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
* options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition);
return new EventHubConsumer(consumer, defaultConsumerOptions);
}

/**
* Creates an Event Hub consumer responsible for reading {@link EventData} from a specific Event Hub partition, as a
* member of the configured consumer group, and begins reading events from the specified {@code eventPosition}.
*
* <p>
* A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that
* only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred
* to as "Epoch Consumers."
*
* A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively
* reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
* Consumers."
*
* Designating a consumer as exclusive may be specified in the {@code options}, by setting {@link
* EventHubConsumerOptions#ownerLevel(Long)} to a non-null value. By default, consumers are created as
* non-exclusive.
* </p>
*
* @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the
* context of this group. The name of the consumer group that is created by default is {@link
* EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
* @param partitionId The identifier of the Event Hub partition from which events will be received.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubConsumer} that receives events from the partition with all configured {@link
* EventHubConsumerOptions}.
* @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
* options} is {@code null}.
* @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition, options);
return new EventHubConsumer(consumer, options);
}

/**
* {@inheritDoc}
*/
@Override
public void close() {
client.close();
}
}
Loading