Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

README and samples #4114

Merged
merged 25 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* General exception for AMQP related failures.
*
* @see ErrorCondition
* @see <a href="http://go.microsoft.com/fwlink/?LinkId=761101">Azure Messaging Exceptions</a>
* @see <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-messaging-exceptions">Azure Messaging Exceptions</a>
mssfang marked this conversation as resolved.
Show resolved Hide resolved
*/
public class AmqpException extends AzureException {
private static final long serialVersionUID = -3654294093967132325L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-amqp-error">AMQP
* 1.0: Transport Errors</a>
* @see <a href="http://go.microsoft.com/fwlink/?LinkId=761101">Azure Messaging Exceptions</a>
* @see <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-messaging-exceptions">Azure Messaging Exceptions</a>
*/
public enum ErrorCondition {
/**
Expand Down
185 changes: 185 additions & 0 deletions eventhubs/client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Azure Event Hubs client library for Java

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream
them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected
devices and applications. Once Event Hubs has collected the data, you can retrieve, transform, and store it by using any
real-time analytics provider or with batching/storage adapters. If you would like to know more about Azure Event Hubs,
you may wish to review: [What is Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-about)?

The Azure Event Hubs client library allows for publishing and consuming of Azure Event Hubs events and may be used to:

- Emit telemetry about your application for business intelligence and diagnostic purposes.
- Publish facts about the state of your application which interested parties may observe and use as a trigger for taking
action.
- Observe interesting operations and interactions happening within your business or other ecosystem, allowing loosely
coupled systems to interact without the need to bind them together.
- Receive events from one or more publishers, transform them to better meet the needs of your ecosystem, then publish
the transformed events to a new stream for consumers to observe.

[Source code][source_code] | [Package (Maven) (coming soon)][package] | [API reference documentation][api_documentation]
mssfang marked this conversation as resolved.
Show resolved Hide resolved
| [Product documentation][event_hubs_product_docs]

## Getting started

### Prerequisites

- Java Development Kit (JDK) with version 8 or above
- [Maven][maven]
- Microsoft Azure subscription
- You can create a free account at: https://azure.microsoft.com
- Azure Event Hubs instance
- Step-by-step guide for [creating an Event Hub using the Azure Portal][event_hubs_create]

### Adding the package to your product

```xml
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>1.0.0-SNAPSHOT</version>
mssfang marked this conversation as resolved.
Show resolved Hide resolved
</dependency>
```

### Obtain a connection string

For the Event Hubs client library to interact with an Event Hub, it will need to understand how to connect and authorize
with it. The easiest means for doing so is to use a connection string, which is created automatically when creating an
Event Hubs namespace. If you aren't familiar with shared access policies in Azure, you may wish to follow the
step-by-step guide to [get an Event Hubs connection string][event_hubs_connection_string].

### Create an Event Hub client

Once the Event Hub and connection string are available, they can be used to create a client for interacting with Azure
Event Hubs. Create an `EventHubClient` using the `EventHubClientBuilder`:

```java
String connectionString = "<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>";
String eventHubName = "<< NAME OF THE EVENT HUB >>";
EventHubClientBuilder builder = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName);
EventHubClient client = builder.build();
mssfang marked this conversation as resolved.
Show resolved Hide resolved
```

## Key concepts

- An **Event Hub client** is the primary interface for developers interacting with the Event Hubs client library,
allowing for inspection of Event Hub metadata and providing a guided experience towards specific Event Hub operations
such as the creation of producers and consumers.

- An **Event Hub producer** is a source of telemetry data, diagnostics information, usage logs, or other log data, as
part of an embedded device solution, a mobile device application, a game title running on a console or other device,
some client or server based business solution, or a web site.

- An **Event Hub consumer** picks up such information from the Event Hub and processes it. Processing may involve
aggregation, complex computation and filtering. Processing may also involve distribution or storage of the information
mssfang marked this conversation as resolved.
Show resolved Hide resolved
in a raw or transformed fashion. Event Hub consumers are often robust and high-scale platform infrastructure parts
with built-in analytics capabilities, like Azure Stream Analytics, Apache Spark, or Apache Storm.

- A **partition** is an ordered sequence of events that is held in an Event Hub. Partitions are a means of data
organization associated with the parallelism required by event consumers. Azure Event Hubs provides message streaming
mssfang marked this conversation as resolved.
Show resolved Hide resolved
through a partitioned consumer pattern in which each consumer only reads a specific subset, or partition, of the
message stream. As newer events arrive, they are added to the end of this sequence. The number of partitions is
specified at the time an Event Hub is created and cannot be changed.

- A **consumer group** is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each
have a separate view of the event stream, and to read the stream independently at their own pace and from their own
position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that
there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of
the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate
events.

For more concepts and deeper discussion, see: [Event Hubs Features][event_hubs_features]. Also, the concepts for AMQP
are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version 1.0][oasis_amqp_v1].

## Examples

- [Publish an event to an Event Hub][sample_send_event]
conniey marked this conversation as resolved.
Show resolved Hide resolved
- [Consume events from an Event Hub partition][sample_receive_event]


## Troubleshooting

### Common exceptions
mssfang marked this conversation as resolved.
Show resolved Hide resolved

#### Logging in debug
You can use the ClientLogger class to get the debug logs. The class supports asVerbose(), asInfo(), asWarning(), and asError() as logging levels. An example of logging level, asInfo(), shows below
conniey marked this conversation as resolved.
Show resolved Hide resolved

```java
ClientLogger logger = new ClientLogger(Example.class);
try {
upload(resource);
} catch (Throwable ex) {
logger.asError().log("Failed to upload {}", resource.name(), ex);
}
```
conniey marked this conversation as resolved.
Show resolved Hide resolved

#### AMQP exception
This is a general exception for AMQP related failures, which includes the AMQP errors as ErrorCondition and the context
that caused this exception as ErrorContext. 'isTransient' is A boolean indicating if the exception is a transient error
mssfang marked this conversation as resolved.
Show resolved Hide resolved
or not. If true, then the request can be retried; otherwise not.

- ErrorCondition: it contains constants common to the AMQP protocol and constants shared by Azure services. More detail
mssfang marked this conversation as resolved.
Show resolved Hide resolved
can be found in the link: [Event Hubs Messaging Exceptions][event_hubs_messaging_exceptions].
- ErrorContext, it provides context that caused the AmqpException. The error occurs could be from AmqpConnection,
mssfang marked this conversation as resolved.
Show resolved Hide resolved
AmqpSession, or AmqpLink. Such as SessionErrorContext and LinkErrorContext, the context for an error that occurs in an
AMQP session and AMQP link, respectively.

The recommend way to solve the specific exception the AMQP exception represents, please take the recommended action in [Event Hubs Messaging Exceptions][event_hubs_messaging_exceptions].
mssfang marked this conversation as resolved.
Show resolved Hide resolved
#### Operation cancelled exception
mssfang marked this conversation as resolved.
Show resolved Hide resolved
It occurs when the underlying AMQP layer encounters an abnormal link abort or the connection is disconnected in an
unexpected fashion. It is recommended to attempt to verify the current state and retry if necessary.

#### Message size exceeded
Event data, both individual and in batches, have a maximum size allowed. This includes the data of the event, as well as
any associated metadata and system overhead. The best approach for resolving this error is to reduce the number of events
being sent in a batch or the size of data included in the message. Because size limits are subject to change, please
refer to Azure Event Hubs quotas and limits for specifics.


mssfang marked this conversation as resolved.
Show resolved Hide resolved
### Other exceptions
For detailed information about these and other exceptions that may occur, please refer to
[Event Hubs Messaging Exceptions][event_hubs_messaging_exceptions].

## Next steps
Beyond those discussed, the Azure Event Hubs client library offers support for
many additional scenarios to help take advantage of the full feature set of the Azure Event Hubs service. In order to help explore some of the these scenarios, the following set of sample is available:
mssfang marked this conversation as resolved.
Show resolved Hide resolved
- [Inspect Event Hub and partition properties][sample_get_event_hubs_metadata]
- [Publish an event to an Event Hub][sample_send_event]
- [Publish events to a specific Event Hub partition with producer option][sample_send_producer_option]
- [Publish events to a specific Event Hub partition with send option][sample_send_send_option]
- [Publish events with custom metadata][sample_send_custom_event_data]
- [Consume events from an Event Hub partition][sample_receive_event]
- [Consume event batch][sample_receive_batch]
mssfang marked this conversation as resolved.
Show resolved Hide resolved
- [Save the last read event and resume from that point][sample_sequence_number]

## Contributing

If you would like to become an active contributor to this project please follow the instructions provided in [Microsoft
mssfang marked this conversation as resolved.
Show resolved Hide resolved
Azure Projects Contribution Guidelines](http://azure.github.io/guidelines.html).

1. Fork it
mssfang marked this conversation as resolved.
Show resolved Hide resolved
1. Create your feature branch (`git checkout -b my-new-feature`)
1. Commit your changes (`git commit -am 'Add some feature'`)
1. Push to the branch (`git push origin my-new-feature`)
1. Create new Pull Request

<!-- Links -->
[api_documentation]: https://azuresdkartifacts.blob.core.windows.net/azure-sdk-for-java/index.html
[event_hubs_connection_string]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
[event_hubs_create]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create
[event_hubs_product_docs]: https://docs.microsoft.com/en-us/azure/event-hubs/
[event_hubs_features]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features
[maven]: https://maven.apache.org/
[package]: not-valid-link
[source_code]: https://github.com/Azure/azure-sdk-for-java/tree/master/eventhubs/client/
[event_hubs_messaging_exceptions]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-messaging-exceptions
[amqp_transport_error]: https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-amqp-error
[oasis_amqp_v1]: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html
[sample_receive_event]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/ReceiveEvent.java
[sample_send_event]:https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/SendEvent.java
[sample_get_event_hubs_metadata]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/GetEventHubMetadata.java
[sample_send_custom_event_data]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/SendCustomEventDataList.java
[sample_sequence_number]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/ReceiveEventsFromKnownSequenceNumberPosition.java
[sample_send_producer_option]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/SendEventsWithProducerOptions.java
[sample_send_send_option]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/SendEventDataListWIthSendOption.java
[sample_receive_batch]: https://github.com/Azure/azure-sdk-for-java/blob/master/eventhubs/client/azure-eventhubs/src/samples/java/ReceiveEventsByBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ public static void main(String[] args) throws InterruptedException {
// identifier to get information about each partition.
client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId))
.subscribe(properties -> {
System.out.println("The Event Hub has the following properties:");
System.out.println(String.format(
mssfang marked this conversation as resolved.
Show resolved Hide resolved
"Event Hub: %s, Partition Id: %s, Last Enqueued Sequence Number: %s, Last Enqueued Offset: %s",
properties.eventHubPath(), properties.id(), properties.lastEnqueuedSequenceNumber(),
"Event Hub Name: %s; Partition Id: %s; Is partition empty? %s; First Sequence Number: %s; "
+ "Last Enqueued Time: %s; Last Enqueued Sequence Number: %s; Last Enqueued Offset: %s",
properties.eventHubPath(), properties.id(), properties.isEmpty(),
properties.beginningSequenceNumber(),
properties.lastEnqueuedTime(),
properties.lastEnqueuedSequenceNumber(),
properties.lastEnqueuedOffset()));
}, error -> {
System.err.println("Error occurred while fetching partition properties: " + error.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
mssfang marked this conversation as resolved.
Show resolved Hide resolved
// Licensed under the MIT License.

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumer;
import com.azure.messaging.eventhubs.EventHubProducer;
import com.azure.messaging.eventhubs.EventHubProducerOptions;
import com.azure.messaging.eventhubs.EventPosition;
import reactor.core.Disposable;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates on how to receive an event batch
*/
public class ReceiveEventsByBatch {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static final int EVENT_BATCH_SIZE = 10;
private static final int NUMBER_OF_EVENTS = 10;

/**
* Main method to invoke this demo about how to receive event batch from an Azure Event Hub instance.
*
* @param args Unused arguments to the program.
* @throws InterruptedException The countdown latch was interrupted while waiting for this sample to
* complete.
* @throws IOException If we were unable to dispose of the {@link EventHubClient}, {@link EventHubConsumer},
* or the {@link EventHubProducer}
*/
public static void main(String[] args) throws InterruptedException, IOException {
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_EVENTS);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.build();

// To create a consumer, we need to know what partition to connect to. We take the first partition id.
// .blockFirst() here is used to synchronously block until the first partition id is emitted. The maximum wait
// time is set by passing in the OPERATION_TIMEOUT value. If no item is emitted before the timeout elapses, a
// TimeoutException is thrown.
String firstPartition = client.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

// Create a consumer.
// The "$Default" consumer group is created by default. This value can be found by going to the Event Hub
// instance you are connecting to, and selecting the "Consumer groups" page. EventPosition.latest() tells the
// service we only want events that are sent to the partition after we begin listening.
EventHubConsumer consumer = client.createConsumer(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
firstPartition, EventPosition.latest());

// We start receiving any events that come from `firstPartition`, print out the contents, and decrement the
// countDownLatch.
Disposable subscription = consumer.receive().subscribe(event -> {
String contents = UTF_8.decode(event.body()).toString();
System.out.println(String.format("[%s] Sequence Number: %s. Contents: %s", countDownLatch.getCount(),
event.sequenceNumber(), contents));

countDownLatch.countDown();
});

// Because the consumer is only listening to new events, we need to send some events to `firstPartition`.
// This creates a producer that only sends events to `firstPartition`.
EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(firstPartition);
EventHubProducer producer = client.createProducer(producerOptions);

// Crate 10 events
ArrayList<EventData> events = new ArrayList<>(EVENT_BATCH_SIZE);
for (int i = 0; i < EVENT_BATCH_SIZE; i++) {
events.add(new EventData(UTF_8.encode("I am Event " + i)));
}

// We create 10 events to send to the service and block until the send has completed.
producer.send(events).block(OPERATION_TIMEOUT);
// We wait for all the events to be received before continuing.
countDownLatch.await(OPERATION_TIMEOUT.getSeconds(), TimeUnit.SECONDS);

// Dispose and close of all the resources we've created.
subscription.dispose();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Shouldn't these be in a try/finally block to prevent leaking in the face of exceptions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we don't care about the exception from close() in the sample. It applies to all the samples we have. But need double check with @conniey.

Copy link
Member

@conniey conniey Jun 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible that calling producer.send.block will throw an exception or countdown.await(), and even if those throw an exception,. you want to dispose of the subscription, so use try/finally

Copy link
Member

@jsquire jsquire Jun 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add two cents to that, if samples are to be the model for what we consider best practices, I would think it important to show a structure where there are no leaks and things that need to be closed/disposed are. In the worst case, I'd argue that we have an obligation to at least add a descriptive comment to that effect about how these should be closed and real-world code should expect exceptions and react appropriately.

producer.close();
consumer.close();
client.close();
}
}
Loading