Skip to content

Commit

Permalink
README and samples (#4114)
Browse files Browse the repository at this point in the history
* Add README
* Update Key concepts, Troubleshooting, Next steps
* Add samples
  • Loading branch information
mssfang authored and conniey committed Jun 27, 2019
1 parent 57f1296 commit 9e97d3b
Show file tree
Hide file tree
Showing 10 changed files with 700 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* General exception for AMQP related failures.
*
* @see ErrorCondition
* @see <a href="http://go.microsoft.com/fwlink/?LinkId=761101">Azure Messaging Exceptions</a>
* @see <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-messaging-exceptions">Azure Messaging Exceptions</a>
*/
public class AmqpException extends AzureException {
private static final long serialVersionUID = -3654294093967132325L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
* @see <a href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-amqp-error">AMQP
* 1.0: Transport Errors</a>
* @see <a href="http://go.microsoft.com/fwlink/?LinkId=761101">Azure Messaging Exceptions</a>
* @see <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-messaging-exceptions">Azure Messaging Exceptions</a>
*/
public enum ErrorCondition {
/**
Expand Down
285 changes: 285 additions & 0 deletions eventhubs/client/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* Sample demonstrates how to receive events from an Azure Event Hub instance.
*/
public class ReceiveEvent {
public class ConsumeEvent {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static final int NUMBER_OF_EVENTS = 10;

Expand Down Expand Up @@ -67,7 +67,7 @@ public static void main(String[] args) throws InterruptedException, IOException
Disposable subscription = consumer.receive().subscribe(event -> {
String contents = UTF_8.decode(event.body()).toString();
System.out.println(String.format("[%s] Sequence Number: %s. Contents: %s", countDownLatch.getCount(),
event.offset(), contents));
event.sequenceNumber(), contents));

countDownLatch.countDown();
});
Expand All @@ -83,17 +83,18 @@ public static void main(String[] args) throws InterruptedException, IOException
return producer.send(new EventData(body.getBytes(UTF_8)));
}).blockLast(OPERATION_TIMEOUT);

// We wait for all the events to be received before continuing.
boolean isSuccessful = countDownLatch.await(OPERATION_TIMEOUT.getSeconds(), TimeUnit.SECONDS);

if (!isSuccessful) {
System.err.printf("Did not complete successfully. There are: %s events left.", countDownLatch.getCount());
try {
// We wait for all the events to be received before continuing.
boolean isSuccessful = countDownLatch.await(OPERATION_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
if (!isSuccessful) {
System.err.printf("Did not complete successfully. There are: %s events left.", countDownLatch.getCount());
}
} finally {
// Dispose and close of all the resources we've created.
subscription.dispose();
producer.close();
consumer.close();
client.close();
}

// Dispose and close of all the resources we've created.
subscription.dispose();
producer.close();
consumer.close();
client.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumer;
import com.azure.messaging.eventhubs.EventHubProducer;
import com.azure.messaging.eventhubs.EventHubProducerOptions;
import com.azure.messaging.eventhubs.EventPosition;
import reactor.core.Disposable;

import java.io.IOException;
import java.time.Duration;

import java.util.concurrent.Semaphore;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to receive events starting from the specific sequence number position in an Event Hub instance.
*/
public class ConsumeEventsFromKnownSequenceNumberPosition {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static long lastEnqueuedSequenceNumber = -1;
private static String lastEnqueuedSequencePartitionID = null;

/**
* Main method to invoke this demo about how to receive event from a known sequence number position in an Azure Event Hub instance.
*
* @param args Unused arguments to the program.
* @throws InterruptedException The countdown latch was interrupted while waiting for this sample to
* complete.
* @throws IOException If we were unable to dispose of the {@link EventHubClient}, {@link EventHubConsumer},
* or the {@link EventHubProducer}
*/
public static void main(String[] args) throws InterruptedException, IOException {
Semaphore semaphore = new Semaphore(0);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncClient();

client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId))
.subscribe(
properties -> {
if (!properties.isEmpty()) {
lastEnqueuedSequenceNumber = properties.lastEnqueuedSequenceNumber();
lastEnqueuedSequencePartitionID = properties.id();
}
},
error -> System.err.println("Error occurred while fetching partition properties: " + error.toString()),
() -> {
// Releasing the semaphore now that we've finished querying for partition properties.
semaphore.release();
});

System.out.println("Waiting for partition properties to complete...");
// Acquiring the semaphore so that this sample does not end before all the partition properties are fetched.
semaphore.acquire();
System.out.printf("Last enqueued sequence number: %s\n", lastEnqueuedSequenceNumber);

// Make sure to have at least one non-empty event hub in order to continue the sample execution
// if you don't have an non-empty event hub, try with another example 'SendEvent' in the same directory.
if (lastEnqueuedSequenceNumber == -1 || lastEnqueuedSequencePartitionID == null) {
System.err.println("All event hubs are empty");
System.exit(0);
}

// Create a consumer.
// The "$Default" consumer group is created by default. This value can be found by going to the Event Hub
// instance you are connecting to, and selecting the "Consumer groups" page. EventPosition.latest() tells the
// service we only want events that are sent to the partition after we begin listening.
EventHubConsumer consumer = client.createConsumer(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
lastEnqueuedSequencePartitionID, EventPosition.fromSequenceNumber(lastEnqueuedSequenceNumber, false));

// We start receiving any events that come from `firstPartition`, print out the contents, and decrement the
// countDownLatch.
Disposable subscription = consumer.receive().subscribe(event -> {
String contents = UTF_8.decode(event.body()).toString();
// ex. The last enqueued sequence number is 99. If isInclusive is true, the received event starting from the same
// event with sequence number of '99'. Otherwise, the event with sequence number of '100' will be the first
// event received.
System.out.println(String.format("Receiving an event starting from the sequence number: %s. Contents: %s",
event.sequenceNumber(), contents));

semaphore.release();
});

// Because the consumer is only listening to new events, we need to send some events to `firstPartition`.
// This creates a producer that only sends events to `lastEnqueuedSequencePartitionID`.
EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(lastEnqueuedSequencePartitionID);
EventHubProducer producer = client.createProducer(producerOptions);

producer.send(new EventData("Hello world!".getBytes(UTF_8))).block(OPERATION_TIMEOUT);
// Acquiring the semaphore so that this sample does not end before all events are fetched.
semaphore.acquire();

// Dispose and close of all the resources we've created.
subscription.dispose();
producer.close();
consumer.close();
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ public static void main(String[] args) throws InterruptedException {
// identifier to get information about each partition.
client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId))
.subscribe(properties -> {
System.out.println(String.format(
"Event Hub: %s, Partition Id: %s, Last Enqueued Sequence Number: %s, Last Enqueued Offset: %s",
properties.eventHubPath(), properties.id(), properties.lastEnqueuedSequenceNumber(),
properties.lastEnqueuedOffset()));
System.out.println("The Event Hub has the following properties:");
System.out.printf(
"Event Hub Name: %s; Partition Id: %s; Is partition empty? %s; First Sequence Number: %s; "
+ "Last Enqueued Time: %s; Last Enqueued Sequence Number: %s; Last Enqueued Offset: %s \n",
properties.eventHubPath(), properties.id(), properties.isEmpty(),
properties.beginningSequenceNumber(),
properties.lastEnqueuedTime(),
properties.lastEnqueuedSequenceNumber(),
properties.lastEnqueuedOffset());
}, error -> {
System.err.println("Error occurred while fetching partition properties: " + error.toString());
}, () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* Sample demonstrates how to send a message to an Azure Event Hub.
*/
public class SendEvent {
public class PublishEvent {
/**
* Main method to invoke this demo on how to send a message to an Azure Event Hub.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducer;
import com.azure.messaging.eventhubs.EventHubProducerOptions;
import reactor.core.publisher.Flux;

import java.io.IOException;
import java.time.Duration;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to sent events to specific event hub by define partition ID in producer option only.
*/
public class PublishEventsToSpecificPartition {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);

/**
* Main method to invoke this demo about how to send a list of events with partition ID configured in producer option
* to an Azure Event Hub instance.
*
* @param args Unused arguments to the program.
*/
public static void main(String[] args) {
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}";

// Instantiate a client that will be used to call the service.
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncClient();

// To create a consumer, we need to know what partition to connect to. We take the first partition id.
// .blockFirst() here is used to synchronously block until the first partition id is emitted. The maximum wait
// time is set by passing in the OPERATION_TIMEOUT value. If no item is emitted before the timeout elapses, a
// TimeoutException is thrown.
String firstPartition = client.getPartitionIds().blockFirst(OPERATION_TIMEOUT);

// When an Event Hub producer is associated with any specific partition, it can publish events only to that partition.
// The producer has no ability to ask for the service to route events, including by using a partition key.
//
// If you attempt to use a partition key with an Event Hub producer that is associated with a partition, an exception
// will occur. Otherwise, publishing to a specific partition is exactly the same as other publishing scenarios.
EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(firstPartition);

// Create a producer. Consequently, events sent from this producer will deliver to the specific partition ID Event Hub instance.
EventHubProducer producer = client.createProducer(producerOptions);

// We will publish three events based on simple sentences.
Flux<EventData> data = Flux.just(
new EventData("EventData Sample 1".getBytes(UTF_8)),
new EventData("EventData Sample 2".getBytes(UTF_8)),
new EventData("EventData Sample 3".getBytes(UTF_8)));

// Send that event. This call returns a Mono<Void>, which we subscribe to. It completes successfully when the
// event has been delivered to the Event Hub. It completes with an error if an exception occurred while sending
// the event.
producer.send(data).subscribe(
(ignored) -> System.out.println("Events sent."),
error -> {
System.err.println("There was an error sending the event: " + error.toString());

if (error instanceof AmqpException) {
AmqpException amqpException = (AmqpException) error;
System.err.println(String.format("Is send operation retriable? %s. Error condition: %s",
amqpException.isTransient(), amqpException.getErrorCondition()));
}
}, () -> {
// Disposing of our producer and client.
try {
producer.close();
} catch (IOException e) {
System.err.println("Error encountered while closing producer: " + e.toString());
}

client.close();
});
}
}
Loading

0 comments on commit 9e97d3b

Please sign in to comment.