diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index fa420ada2be8..f2a81b0ba5bd 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -298,10 +298,19 @@ Mono send(EventData event, SendOptions options) { * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message * size is the max amount allowed on the link. * + * {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable} + * + *

+ * For more information regarding the maximum event size allowed, see + * Azure Event Hubs Quotas and + * Limits. + *

+ * * @param events Events to send to the service. * @return A {@link Mono} that completes when all events are pushed to the service. + * @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch. */ - Mono send(Iterable events) { + public Mono send(Iterable events) { if (events == null) { return monoError(logger, new NullPointerException("'events' cannot be null.")); } @@ -314,11 +323,20 @@ Mono send(Iterable events) { * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message * size is the max amount allowed on the link. * + * {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable-SendOptions} + * + *

+ * For more information regarding the maximum event size allowed, see + * Azure Event Hubs Quotas and + * Limits. + *

+ * * @param events Events to send to the service. * @param options The set of options to consider when sending this batch. * @return A {@link Mono} that completes when all events are pushed to the service. + * @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch. */ - Mono send(Iterable events, SendOptions options) { + public Mono send(Iterable events, SendOptions options) { if (events == null) { return monoError(logger, new NullPointerException("'events' cannot be null.")); } else if (options == null) { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java index 6f52ec205350..700fd8d9c648 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java @@ -3,6 +3,7 @@ package com.azure.messaging.eventhubs; +import com.azure.core.amqp.exception.AmqpException; import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceClient; import com.azure.core.annotation.ServiceMethod; @@ -176,6 +177,8 @@ void send(EventData event, SendOptions options) { * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message * size is the max amount allowed on the link. * + * {@codesnippet com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable} + * *

* For more information regarding the maximum event size allowed, see * Azure Event Hubs Quotas and @@ -183,8 +186,9 @@ void send(EventData event, SendOptions options) { *

* * @param events Events to send to the service. + * @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch. */ - void send(Iterable events) { + public void send(Iterable events) { producer.send(events).block(); } @@ -193,6 +197,8 @@ void send(Iterable events) { * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message * size is the max amount allowed on the link. * + * {@codesnippet com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions} + * *

* For more information regarding the maximum event size allowed, see * Azure Event Hubs Quotas and @@ -201,8 +207,9 @@ void send(Iterable events) { * * @param events Events to send to the service. * @param options The set of options to consider when sending this batch. + * @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch. */ - void send(Iterable events, SendOptions options) { + public void send(Iterable events, SendOptions options) { producer.send(events, options).block(); } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientJavaDocCodeSamples.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientJavaDocCodeSamples.java index ad5b804b1345..859c98119570 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientJavaDocCodeSamples.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientJavaDocCodeSamples.java @@ -4,6 +4,9 @@ package com.azure.messaging.eventhubs; import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import com.azure.messaging.eventhubs.models.SendOptions; +import java.util.Arrays; +import java.util.List; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -154,4 +157,39 @@ public void batchSizeLimited() { }); // END: com.azure.messaging.eventhubs.eventhubasyncproducerclient.createBatch#CreateBatchOptions-int } + + /** + * Code snippet to demonstrate how to send a list of events using + * {@link EventHubProducerAsyncClient#send(Iterable)}. + */ + public void sendIterableSample() { + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); + // BEGIN: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable + List events = Arrays.asList(new EventData("maple"), new EventData("aspen"), + new EventData("oak")); + producer + .send(events) + .subscribe(unused -> { }, + error -> System.err.println("Error occurred while sending events:" + error), + () -> System.out.println("Send complete.")); + // END: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable + } + + /** + * Code snippet to demonstrate how to send a list of events using + * {@link EventHubProducerAsyncClient#send(Iterable, SendOptions)}. + */ + public void sendIterableWithPartitionKeySample() { + final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient(); + // BEGIN: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable-SendOptions + List events = Arrays.asList(new EventData("Melbourne"), new EventData("London"), + new EventData("New York")); + SendOptions sendOptions = new SendOptions().setPartitionKey("cities"); + producer + .send(events, sendOptions) + .subscribe(unused -> { }, + error -> System.err.println("Error occurred while sending events:" + error), + () -> System.out.println("Send complete.")); + // END: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable-SendOptions + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerClientJavaDocCodeSamples.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerClientJavaDocCodeSamples.java index 7c2250ae77cb..ff39b7f88947 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerClientJavaDocCodeSamples.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerClientJavaDocCodeSamples.java @@ -5,6 +5,7 @@ import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import com.azure.messaging.eventhubs.models.SendOptions; import java.util.Arrays; import java.util.List; @@ -145,4 +146,31 @@ public void batchSizeLimited() { } // END: com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-int } + + /** + * Code snippet to demonstrate how to send a list of events using {@link EventHubProducerClient#send(Iterable)}. + */ + public void sendIterableSample() { + final EventHubProducerClient producer = builder.buildProducerClient(); + // BEGIN: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable + List events = Arrays.asList(new EventData("maple"), new EventData("aspen"), + new EventData("oak")); + producer.send(events); + // END: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable + } + + /** + * Code snippet to demonstrate how to send a list of events using + * {@link EventHubProducerClient#send(Iterable, SendOptions)}. + */ + public void sendIterableWithPartitionKeySample() { + final EventHubProducerClient producer = builder.buildProducerClient(); + // BEGIN: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions + List events = Arrays.asList(new EventData("Melbourne"), new EventData("London"), + new EventData("New York")); + SendOptions sendOptions = new SendOptions().setPartitionKey("cities"); + producer.send(events, sendOptions); + // END: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions + } + } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PublishIterableEvents.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PublishIterableEvents.java new file mode 100644 index 000000000000..756fc28df1e8 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PublishIterableEvents.java @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.azure.core.amqp.AmqpRetryMode; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.messaging.eventhubs.models.SendOptions; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import reactor.core.publisher.Flux; + +/** + * Sample demonstrates how to send an iterable of events to specific event hub partition by defining partition id using + * {@link SendOptions#setPartitionId(String)}. + */ +public class PublishIterableEvents { + + /** + * Main method to invoke this demo about how to send an iterable of events with partition id configured. + * + * @param args Unused arguments to the program. + * @throws InterruptedException If the program was interrupted before completion. + */ + public static void main(String[] args) throws InterruptedException { + // The connection string value can be obtained by: + // 1. Going to your Event Hubs namespace in Azure Portal. + // 2. Creating an Event Hub instance. + // 3. Creating a "Shared access policy" for your Event Hub instance. + // 4. Copying the connection string from the policy's properties. + String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}"; + + // Set some custom retry options other than the default set. + AmqpRetryOptions retryOptions = new AmqpRetryOptions() + .setDelay(Duration.ofSeconds(30)) + .setMaxRetries(2) + .setMode(AmqpRetryMode.EXPONENTIAL); + + // Instantiate a client that will be used to call the service. + EventHubProducerAsyncClient producer = new EventHubClientBuilder() + .connectionString(connectionString) + .retry(retryOptions) + .buildAsyncProducerClient(); + + // Create an iterable of events to send. Note that the events in iterable should + // fit in a single batch. If the events exceed the size of the batch, then send operation will fail. + final Iterable events = Flux.range(0, 100).map(number -> { + final String contents = "event-data-" + number; + return new EventData(contents.getBytes(UTF_8)); + }).toIterable(); + + // To send our events, we need to know what partition to send it to. For the sake of this example, we take the + // first partition id. + CountDownLatch countDownLatch = new CountDownLatch(1); + producer.getPartitionIds() + .take(1) // take the first partition id + .flatMap(partitionId -> producer.send(events, new SendOptions().setPartitionId(partitionId))) + .subscribe(unused -> { }, + ex -> System.out.println("Failed to send events: " + ex.getMessage()), + () -> { + countDownLatch.countDown(); + System.out.println("Sending events completed successfully"); + }); + + // Wait for async operation to complete or timeout after 10 seconds. + try { + countDownLatch.await(10, TimeUnit.SECONDS); + } finally { + producer.close(); + } + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java index 5d0086fa3f3b..9e0455d89957 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerClientTest.java @@ -255,6 +255,32 @@ public void sendMessageRetrySpanTest() { verifyZeroInteractions(onClientClosed); } + /** + * Verifies that sending an iterable of events that exceeds batch size throws exception. + */ + @Test + public void sendEventsExceedsBatchSize() { + //Arrange + // EC is the prefix they use when creating a link that sends to the service round-robin. + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any())) + .thenReturn(Mono.just(sendLink)); + when(sendLink.getLinkSize()).thenReturn(Mono.just(1024)); + TracerProvider tracerProvider = new TracerProvider(Collections.emptyList()); + final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, + connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed); + final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout()); + + //Act & Assert + final Iterable tooManyEvents = Flux.range(0, 1024).map(number -> { + final String contents = "event-data-" + number; + return new EventData(contents.getBytes(UTF_8)); + }).toIterable(); + + AmqpException amqpException = Assertions.assertThrows(AmqpException.class, () -> producer.send(tooManyEvents)); + Assertions.assertTrue(amqpException.getMessage().startsWith("EventData does not fit into maximum number of " + + "batches. '1'")); + } + /** * Verifies we can send multiple messages. */