Skip to content

Commit

Permalink
Updates Event Hubs samples (#6986)
Browse files Browse the repository at this point in the history
* Update ConsumeEvents sample.

* Add producer samples for web sockets and proxy.

* Add sample for publishing streams of events.

* Rename EventProcessor -> EventProcessorClient.

* Adding EPH sample.

* Update READMEs. Remove dead links.
  • Loading branch information
conniey authored Dec 20, 2019
1 parent 40d92b8 commit 6f3e5d9
Show file tree
Hide file tree
Showing 12 changed files with 917 additions and 81 deletions.
10 changes: 0 additions & 10 deletions sdk/eventhubs/azure-messaging-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ Beyond those discussed, the Azure Event Hubs client library offers support for m
advantage of the full feature set of the Azure Event Hubs service. In order to help explore some of the these scenarios,
the following set of sample is available [here][samples_readme].


## Contributing

If you would like to become an active contributor to this project please refer to our [Contribution
Expand All @@ -439,15 +438,6 @@ Guidelines](./CONTRIBUTING.md) for more information.
[qpid_proton_j_apache]: http://qpid.apache.org/proton/
[samples_readme]: ./src/samples/README.md
[sample_examples]: ./src/samples/java/com/azure/messaging/eventhubs/
[sample_consume_event]: ./src/samples/java/com/azure/messaging/eventhubs/ConsumeEvents.java
[sample_consume_sequence_number]: ./src/samples/java/com/azure/messaging/eventhubs/ConsumeEventsFromKnownSequenceNumberPosition.java
[sample_event_processor]: ./src/samples/java/com/azure/messaging/eventhubs/EventProcessorSample.java
[sample_get_event_hubs_metadata]: ./src/samples/java/com/azure/messaging/eventhubs/GetEventHubMetadata.java
[sample_publish_custom_metadata]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java
[sample_publish_identity]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithAzureIdentity.java
[sample_publish_partitionId]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsToSpecificPartition.java
[sample_publish_partitionKey]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithPartitionKey.java
[sample_publish_size_limited]: ./src/samples/java/com/azure/messaging/eventhubs/PublishEventsWithSizeLimitedBatches.java
[source_code]: ./
[AmqpException]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpException.java
[AmqpErrorCondition]: ../../core/azure-core-amqp/src/main/java/com/azure/core/amqp/exception/AmqpErrorCondition.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static EventPosition fromEnqueuedTime(Instant enqueuedDateTime) {
* @return An {@link EventPosition} object.
*/
public static EventPosition fromOffset(long offset) {
return fromOffset(offset, true);
return fromOffset(offset, false);
}

/**
Expand Down
23 changes: 21 additions & 2 deletions sdk/eventhubs/azure-messaging-eventhubs/src/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,32 @@ Key concepts are explained in detail [here][sdk_readme_key_concepts].
## Getting started
Please refer to the [Getting Started][sdk_readme_getting_started] section.

### Obtaining an Event Hub instance connection string

All the samples authorize with an Event Hub using a connection string generated for that Event Hub. The connection
string value can be obtained by:

1. Going to your Event Hubs namespace in Azure Portal.
1. Creating an Event Hub instance.
1. Creating a "Shared access policy" for your Event Hub instance.
1. Copying the connection string from the policy's properties.

## Examples

- [Inspect Event Hub and partition properties][sample_get_event_hubs_metadata]
- [Publish events using Microsoft identity platform][sample_publish_identity]
- [Publish events to a specific Event Hub partition with partition identifier][sample_publish_partitionId]
- [Publish events to a specific Event Hub partition with partition key][sample_publish_partitionKey]
- [Publish events to an Event Hub with a size-limited batch][sample_publish_size_limited]
- [Publish events using web sockets and a proxy][sample_publish_web_sockets_proxy]
- [Publish events with custom metadata][sample_publish_custom_metadata]
- [Publish stream of events][sample_publish_stream_events]
- [Consume events from an Event Hub partition][sample_consume_event]
- [Consume events starting from an event sequence number][sample_consume_sequence_number]
- [Consume events from all partitions using EventProcessorClient][sample_event_processor]
- [Consume events from all partitions and manage state of processed events][sample_event_processor_state_management]
- [Consume events from all partitions and manage state of events using
EventProcessorClient][sample_event_processor_aggregate_state_management]
- [Consume events starting from an event sequence number][sample_consume_sequence_number]

## Troubleshooting
See [Troubleshooting][sdk_readme_troubleshooting].
Expand All @@ -39,12 +54,16 @@ Guidelines](../../CONTRIBUTING.md) for more information.
[sdk_readme_next_steps]: ../../README.md#next-steps
[sample_consume_event]: ./java/com/azure/messaging/eventhubs/ConsumeEvents.java
[sample_consume_sequence_number]: ./java/com/azure/messaging/eventhubs/ConsumeEventsFromKnownSequenceNumberPosition.java
[sample_event_processor]: ./java/com/azure/messaging/eventhubs/EventProcessorSample.java
[sample_event_processor]: ./java/com/azure/messaging/eventhubs/EventProcessorClientSample.java
[sample_event_processor_aggregate_state_management]: ./java/com/azure/messaging/eventhubs/EventProcessorClientAggregateEventsSample.java
[sample_event_processor_state_management]: ./java/com/azure/messaging/eventhubs/EventProcessorClientStateManagement.java
[sample_get_event_hubs_metadata]: ./java/com/azure/messaging/eventhubs/GetEventHubMetadata.java
[sample_publish_custom_metadata]: ./java/com/azure/messaging/eventhubs/PublishEventsWithCustomMetadata.java
[sample_publish_identity]: ./java/com/azure/messaging/eventhubs/PublishEventsWithAzureIdentity.java
[sample_publish_partitionId]: ./java/com/azure/messaging/eventhubs/PublishEventsToSpecificPartition.java
[sample_publish_partitionKey]: ./java/com/azure/messaging/eventhubs/PublishEventsWithPartitionKey.java
[sample_publish_size_limited]: ./java/com/azure/messaging/eventhubs/PublishEventsWithSizeLimitedBatches.java
[sample_publish_stream_events]: ./java/com/azure/messaging/eventhubs/PublishStreamOfEvents.java
[sample_publish_web_sockets_proxy]: ./java/com/azure/messaging/eventhubs/PublishEventsWithWebSocketsAndProxy.java

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-java%2Fsdk%2Feventhubs%2Fazure-messaging-eventhubs%2Fsrc%2Fsamples%2README.png)
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.azure.messaging.eventhubs;

import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.SendOptions;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
Expand All @@ -24,8 +25,8 @@ public class ConsumeEvents {
* Main method to invoke this demo about how to receive events from an Azure Event Hub instance.
*
* @param args Unused arguments to the program.
* @throws InterruptedException The countdown latch was interrupted while waiting for this sample to
* complete.
*
* @throws InterruptedException The countdown latch was interrupted while waiting for this sample to complete.
*/
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_EVENTS);
Expand All @@ -35,8 +36,8 @@ public static void main(String[] args) throws InterruptedException {
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";

String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};"
+ "SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";
// Instantiate a client that will be used to call the service.
// Create a consumer.
// The "$Default" consumer group is created by default. This value can be found by going to the Event Hub
Expand All @@ -63,12 +64,25 @@ public static void main(String[] args) throws InterruptedException {
Disposable subscription = consumer.receiveFromPartition(firstPartition, EventPosition.latest())
.subscribe(partitionEvent -> {
EventData event = partitionEvent.getData();
PartitionContext partitionContext = partitionEvent.getPartitionContext();

String contents = new String(event.getBody(), UTF_8);
System.out.println(String.format("[%s] Sequence Number: %s. Contents: %s", countDownLatch.getCount(),
event.getSequenceNumber(), contents));
System.out.printf("[#%s] Partition id: %s. Sequence Number: %s. Contents: '%s'%n",
countDownLatch.getCount(), partitionContext.getPartitionId(), event.getSequenceNumber(),
contents);

countDownLatch.countDown();
});
},
error -> {
System.err.println("Error occurred while consuming events: " + error);

// Count down until 0, so the main thread does not keep waiting for events.
while (countDownLatch.getCount() > 0) {
countDownLatch.countDown();
}
}, () -> {
System.out.println("Finished reading events.");
});

EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(connectionString)
Expand Down
Loading

0 comments on commit 6f3e5d9

Please sign in to comment.