Commit

Update Javadocs for Event Hubs (#35284)

* Remove overlapping AAD sample.

* Begin updating package-info and builder.

* Clean up EHCB docs.

* Add documentation and samples for models classes.

* Add Null check for checkpointing sample.

* Cleaning up signature samples.

* Adding more consumer async docs.

* Update producer snippets.

* Add more documentation to buffered producer. Consolidate README samples.

* Add samples for buffered producer.

* Adding more snippets.

* Updating generation of docs

* Fix variable names.

* Update documentation based on feedback.
conniey authored Jun 20, 2023
1 parent faa5819 commit 335b6ed
Showing 22 changed files with 1,748 additions and 760 deletions.
158 changes: 84 additions & 74 deletions sdk/eventhubs/azure-messaging-eventhubs/README.md
@@ -116,12 +116,8 @@ Both the asynchronous and synchronous Event Hub producer and consumer clients ca

The snippet below creates a synchronous Event Hub producer.

```java readme-sample-createSynchronousEventHubProducer
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventhubproducerclient.construct
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
@@ -156,16 +152,12 @@ Authorization is easiest using [DefaultAzureCredential][wiki_identity]. It finds
running environment. For more information about using Azure Active Directory authorization with Event Hubs, please refer
to [the associated documentation][aad_authorization].

```java readme-sample-useAadAuthorization
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventhubproducerclient.construct
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubProducerClient client = new EventHubClientBuilder()
EventHubProducerClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildProducerClient();
@@ -217,12 +209,8 @@ Hubs service to hash the events and send them to the same partition.
The snippet below creates a synchronous producer and sends events to any partition, allowing Event Hubs service to route
the event to an available partition.

```java readme-sample-publishEvents
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventhubproducerclient.createBatch
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
@@ -244,10 +232,15 @@ for (EventData eventData : allEvents) {
}
}
}

// send the last batch of remaining events
if (eventDataBatch.getCount() > 0) {
producer.send(eventDataBatch);
}

// Clients are expected to be long-lived objects.
// Dispose of the producer to close any underlying resources when we are finished with it.
producer.close();
```
Note that `EventDataBatch.tryAdd(EventData)` is not thread-safe. Please make sure to synchronize the method access
when using multiple threads to add events.
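
One way to honor that note is to funnel every `tryAdd` call through a single lock. The helper class below (the name `SynchronizedBatchSender` is made up for illustration) is only a sketch and is not part of this commit or of the library; it assumes one shared batch per producer and treats an event that cannot fit in an empty batch as an error.

```java
public class SynchronizedBatchSender {
    private final EventHubProducerClient producer;
    private EventDataBatch currentBatch;

    public SynchronizedBatchSender(EventHubProducerClient producer) {
        this.producer = producer;
        this.currentBatch = producer.createBatch();
    }

    // Every addition goes through this synchronized method, so tryAdd is never called concurrently.
    public synchronized void add(EventData event) {
        if (!currentBatch.tryAdd(event)) {
            // The current batch is full: send it, start a new one, and retry the event.
            producer.send(currentBatch);
            currentBatch = producer.createBatch();
            if (!currentBatch.tryAdd(event)) {
                throw new IllegalArgumentException("Event is too large to fit in an empty batch.");
            }
        }
    }

    // Send whatever remains once all threads have finished adding events.
    public synchronized void flush() {
        if (currentBatch.getCount() > 0) {
            producer.send(currentBatch);
            currentBatch = producer.createBatch();
        }
    }
}
```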
@@ -258,18 +251,15 @@ Many Event Hub operations take place within the scope of a specific partition. A
`getPartitionIds()` or `getEventHubProperties()` to get the partition ids and metadata about in their Event Hub
instance.
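
As a rough illustration (not taken from this commit's diff), listing the partitions and their metadata might look like the sketch below; the namespace and Event Hub name are placeholders. The snippet that follows it then routes a whole batch to one of those partition ids.

```java
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>", credential)
    .buildProducerClient();

// Metadata about the Event Hub instance itself.
EventHubProperties properties = producer.getEventHubProperties();
System.out.printf("Event Hub '%s' was created at %s%n", properties.getName(), properties.getCreatedAt());

// Each partition id can be passed to partition-scoped operations such as
// createBatch(CreateBatchOptions) or receiveFromPartition(String, EventPosition).
for (String partitionId : producer.getPartitionIds()) {
    PartitionProperties partition = producer.getPartitionProperties(partitionId);
    System.out.printf("Partition '%s', last enqueued sequence number: %d%n",
        partitionId, partition.getLastEnqueuedSequenceNumber());
}

producer.close();
```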

```java readme-sample-publishEventsToPartition
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-partitionId
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildProducerClient();

// Creating a batch with partitionId set will route all events in that batch to partition `0`.
CreateBatchOptions options = new CreateBatchOptions().setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);

@@ -281,25 +271,21 @@ producer.send(batch);

When a set of events are not associated with any specific partition, it may be desirable to request that the Event
Hubs service keep different events or batches of events together on the same partition. This can be accomplished by
setting a `partition key` when publishing the events.
setting a `partition key` when publishing the events. In the scenario below, all the events are related to cities, so they are sent with the partition key set to "cities".

```java readme-sample-publishEventsWithPartitionKey
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.buildProducerClient();

CreateBatchOptions batchOptions = new CreateBatchOptions().setPartitionKey("grouping-key");
EventDataBatch eventDataBatch = producer.createBatch(batchOptions);
List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
new EventData("New York"));

// Add events to batch and when you want to send the batch, send it using the producer.
producer.send(eventDataBatch);
SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);
```

### Consume events from an Event Hub partition
@@ -315,46 +301,79 @@ to the newest events that get pushed to the partition. Developers can begin rece
the same `EventHubConsumerAsyncClient` by calling `receiveFromPartition(String, EventPosition)` with another partition
id.

```java readme-sample-consumeEventsFromPartition
```java com.azure.messaging.eventhubs.eventhubconsumerasyncclient.receive#string-eventposition
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
new DefaultAzureCredentialBuilder().build())
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();

// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
// Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.
// Take a specific partitionId from EventHubConsumerAsyncClient.getPartitionIds()
String partitionId = "0";
EventPosition startingPosition = EventPosition.latest();

// Keep a reference to `subscription`. When the program is finished receiving events, call
// subscription.dispose(). This will stop fetching events from the Event Hub.
//
// NOTE: This is a non-blocking call and will move to the next line of code after setting up the async
// operation. If the program ends after this, or the class is immediately disposed, no events will be
// received.
Disposable subscription = consumer.receiveFromPartition(partitionId, startingPosition)
.subscribe(partitionEvent -> {
PartitionContext partitionContext = partitionEvent.getPartitionContext();
EventData event = partitionEvent.getData();

System.out.printf("Received event from partition '%s'%n", partitionContext.getPartitionId());
System.out.printf("Contents of event as string: '%s'%n", event.getBodyAsString());
}, error -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.err.print("An error occurred:" + error);
}, () -> {
// This is a terminal signal. No more events will be received from the same Flux object.
System.out.print("Stream has ended.");
});
```

#### Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an `EventHubConsumerClient`. In the
snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

```java readme-sample-consumeEventsFromPartitionUsingSyncClient
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventhubconsumerclient.receive#string-int-eventposition-duration
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildConsumerClient();

String partitionId = "<< EVENT HUB PARTITION ID >>";
Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
System.out.println("Event: " + event.getData().getBodyAsString());
// Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
// For each event, perform some sort of processing.
System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

// Gets the next set of events from partition '0' to consume and process.
IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
nextPosition, Duration.ofSeconds(30));
}
```

@@ -372,36 +391,27 @@ In our example, we will focus on building the [`EventProcessorClient`][EventProc
received from the Event Hub and writes to console. For production applications, it's recommended to use a durable
store like [Checkpoint Store with Azure Storage Blobs][BlobCheckpointStore].

```java readme-sample-consumeEventsUsingEventProcessor
// The credential used is DefaultAzureCredential because it combines commonly used credentials
// in deployment and development and chooses the credential to used based on its running environment.
// More information can be found at: https://learn.microsoft.com/java/api/overview/azure/identity-readme
TokenCredential credential = new DefaultAzureCredentialBuilder()
.build();
```java com.azure.messaging.eventhubs.eventprocessorclientbuilder.construct
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out
.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();

// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();

// (for demo purposes only - adding sleep to wait for receiving events)
TimeUnit.SECONDS.sleep(2);

// This will stop processing events.
eventProcessorClient.stop();
```
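
For the durable Azure Storage Blobs store recommended above, a hedged sketch is shown below. It assumes the separate `azure-messaging-eventhubs-checkpointstore-blob` dependency and an existing blob container; the storage endpoint and container name are placeholders.

```java
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// Container that stores the processor's ownership and checkpoint blobs; it must already exist.
BlobContainerAsyncClient blobContainer = new BlobContainerClientBuilder()
    .endpoint("https://<<storage-account-name>>.blob.core.windows.net")
    .containerName("<<container-name>>")
    .credential(credential)
    .buildAsyncClient();

EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .consumerGroup("<< CONSUMER GROUP NAME >>")
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .checkpointStore(new BlobCheckpointStore(blobContainer))
    .processEvent(eventContext -> {
        System.out.printf("Event received from partition %s%n",
            eventContext.getPartitionContext().getPartitionId());
        // Persist progress so another processor instance can resume from this point.
        eventContext.updateCheckpoint();
    })
    .processError(errorContext -> {
        System.out.printf("Error in partition processor for partition %s: %s%n",
            errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
    })
    .buildEventProcessorClient();

// Starts reading from all partitions and checkpointing to the blob container.
eventProcessorClient.start();
```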

## Troubleshooting
sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventData.java
@@ -31,26 +31,14 @@
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* The data structure encapsulating the event being sent-to and received-from Event Hubs. Each Event Hub partition can
* be visualized as a stream of {@link EventData}. This class is not thread-safe.
*
* <p>
* Here's how AMQP message sections map to {@link EventData}. For reference, the specification can be found here:
* <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf">AMQP 1.0 specification</a>
*
* <ol>
* <li>{@link #getProperties()} - AMQPMessage.ApplicationProperties section</li>
* <li>{@link #getBody()} - if AMQPMessage.Body has Data section</li>
* </ol>
*
* <p>
* Serializing a received {@link EventData} with AMQP sections other than ApplicationProperties (with primitive Java
* types) and Data section is not supported.
* </p>
* <p>The data structure encapsulating the event being sent-to and received-from Event Hubs. Each Event Hub partition
* can be visualized as a stream of {@link EventData}. This class is not thread-safe.</p>
*
* @see EventDataBatch
* @see EventHubProducerClient
* @see EventHubProducerAsyncClient
*
* @see <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf">AMQP 1.0 specification</a>
*/
public class EventData extends MessageContent {
/*
@@ -353,7 +341,8 @@ public AmqpAnnotatedMessage getRawAmqpMessage() {
}

/**
* Gets the content type.
* Gets the MIME type describing the data contained in the {@link #getBody()}, intended to allow consumers to make
* informed decisions for inspecting and processing the event.
*
* @return The content type.
*/
@@ -362,7 +351,8 @@ public String getContentType() {
}

/**
* Sets the content type.
* Sets the MIME type describing the data contained in the {@link #getBody()}, intended to allow consumers to make
* informed decisions for inspecting and processing the event.
*
* @param contentType The content type.
*
@@ -374,7 +364,9 @@ public EventData setContentType(String contentType) {
}

/**
* Gets the correlation id.
* Gets an application-defined value that represents the context to use for correlation across one or more
* operations. The identifier is a free-form value and may reflect a unique identity or a shared data element with
* significance to the application.
*
* @return The correlation id. {@code null} if there is none set.
*/
@@ -384,7 +376,9 @@ public String getCorrelationId() {
}

/**
* Sets the correlation id.
* Sets an application-defined value that represents the context to use for correlation across one or more
* operations. The identifier is a free-form value and may reflect a unique identity or a shared data element with
* significance to the application.
*
* @param correlationId The correlation id.
*
@@ -398,7 +392,8 @@ public EventData setCorrelationId(String correlationId) {
}

/**
* Gets the message id.
* Gets an application-defined value that uniquely identifies the event. The identifier is a free-form value and
* can reflect a GUID or an identifier derived from the application context.
*
* @return The message id. {@code null} if there is none set.
*/
@@ -408,7 +403,8 @@ public String getMessageId() {
}

/**
* Sets the message id.
* Sets an application-defined value that uniquely identifies the event. The identifier is a free-form value and
* can reflect a GUID or an identifier derived from the application context.
*
* @param messageId The message id.
*
@@ -422,7 +418,7 @@ public EventData setMessageId(String messageId) {
}

/**
* {@inheritDoc}
* True if the object is an {@link EventData} and the binary contents of {@link #getBody()} are equal.
*/
@Override
public boolean equals(Object o) {
@@ -439,7 +435,7 @@ public boolean equals(Object o) {
}

/**
* {@inheritDoc}
* Gets a hash of the binary contents in {@link #getBody()}.
*/
@Override
public int hashCode() {
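
The refreshed Javadocs above describe the application-level metadata that travels with an `EventData`: a content type, a correlation id, and a message id. A minimal, illustrative sketch of a publisher populating those properties follows; the values are placeholders and are not part of this commit.

```java
EventData event = new EventData("{\"city\":\"Melbourne\",\"temperatureC\":21}");

// MIME type of the body, so consumers know how to inspect and deserialize it.
event.setContentType("application/json");

// Free-form, application-defined identifiers used to de-duplicate and correlate events.
event.setMessageId(UUID.randomUUID().toString());
event.setCorrelationId("weather-refresh-melbourne");

// Application properties travel in the AMQP ApplicationProperties section of the message.
event.getProperties().put("source", "station-7");
```

Consumers can read the same values back through the corresponding getters on a received `EventData`.
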
sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java
@@ -24,7 +24,8 @@

/**
* A class for aggregating {@link EventData} into a single, size-limited, batch. It is treated as a single message when
* sent to the Azure Event Hubs service.
* sent to the Azure Event Hubs service. {@link EventDataBatch} is recommended in scenarios requiring high throughput
* for publishing events.
*
* @see EventHubProducerClient#createBatch()
* @see EventHubProducerClient#createBatch(CreateBatchOptions)
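
Because the batch is size-limited, a publisher can also cap it explicitly. The sketch below is illustrative only and not part of this commit; it assumes small string events and a 64 KB cap set through `CreateBatchOptions`.

```java
TokenCredential credential = new DefaultAzureCredentialBuilder().build();

EventHubProducerClient producer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>", credential)
    .buildProducerClient();

List<EventData> events = Arrays.asList(new EventData("first"), new EventData("second"), new EventData("third"));

// Cap the batch at 64 KB rather than the Event Hub's maximum message size.
CreateBatchOptions options = new CreateBatchOptions().setMaximumSizeInBytes(64 * 1024);
EventDataBatch batch = producer.createBatch(options);

for (EventData event : events) {
    if (!batch.tryAdd(event)) {
        // The batch reached the configured limit: send it as one message and continue with a new batch.
        producer.send(batch);
        batch = producer.createBatch(options);
        if (!batch.tryAdd(event)) {
            throw new IllegalArgumentException("Event is larger than the configured batch size.");
        }
    }
}

// Send the final, partially filled batch.
if (batch.getCount() > 0) {
    producer.send(batch);
}

producer.close();
```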