Skip to content

Commit

Permalink
Add send API for sending an iterable of events (#10528)
Browse files Browse the repository at this point in the history
* Add send API for sending an iterable of events

* Fix checkstyles

* Add code samples and update javadoc
  • Loading branch information
srnagar authored Apr 30, 2020
1 parent bf28c28 commit a6dd372
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,19 @@ Mono<Void> send(EventData event, SendOptions options) {
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable}
*
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* </p>
*
* @param events Events to send to the service.
* @return A {@link Mono} that completes when all events are pushed to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
Mono<Void> send(Iterable<EventData> events) {
public Mono<Void> send(Iterable<EventData> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
}
Expand All @@ -314,11 +323,20 @@ Mono<Void> send(Iterable<EventData> events) {
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable-SendOptions}
*
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* </p>
*
* @param events Events to send to the service.
* @param options The set of options to consider when sending this batch.
* @return A {@link Mono} that completes when all events are pushed to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
Mono<Void> send(Iterable<EventData> events, SendOptions options) {
public Mono<Void> send(Iterable<EventData> events, SendOptions options) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
} else if (options == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
Expand Down Expand Up @@ -176,15 +177,18 @@ void send(EventData event, SendOptions options) {
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable}
*
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
* Limits</a>.
* </p>
*
* @param events Events to send to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
void send(Iterable<EventData> events) {
public void send(Iterable<EventData> events) {
producer.send(events).block();
}

Expand All @@ -193,6 +197,8 @@ void send(Iterable<EventData> events) {
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions}
*
* <p>
* For more information regarding the maximum event size allowed, see
* <a href="https://docs.microsoft.com/azure/event-hubs/event-hubs-quotas">Azure Event Hubs Quotas and
Expand All @@ -201,8 +207,9 @@ void send(Iterable<EventData> events) {
*
* @param events Events to send to the service.
* @param options The set of options to consider when sending this batch.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
void send(Iterable<EventData> events, SendOptions options) {
public void send(Iterable<EventData> events, SendOptions options) {
producer.send(events, options).block();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
package com.azure.messaging.eventhubs;

import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.util.Arrays;
import java.util.List;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -154,4 +157,39 @@ public void batchSizeLimited() {
});
// END: com.azure.messaging.eventhubs.eventhubasyncproducerclient.createBatch#CreateBatchOptions-int
}

/**
* Code snippet to demonstrate how to send a list of events using
* {@link EventHubProducerAsyncClient#send(Iterable)}.
*/
public void sendIterableSample() {
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
// BEGIN: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable
List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
new EventData("oak"));
producer
.send(events)
.subscribe(unused -> { },
error -> System.err.println("Error occurred while sending events:" + error),
() -> System.out.println("Send complete."));
// END: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable
}

/**
* Code snippet to demonstrate how to send a list of events using
* {@link EventHubProducerAsyncClient#send(Iterable, SendOptions)}.
*/
public void sendIterableWithPartitionKeySample() {
final EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
// BEGIN: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable-SendOptions
List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
new EventData("New York"));
SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer
.send(events, sendOptions)
.subscribe(unused -> { },
error -> System.err.println("Error occurred while sending events:" + error),
() -> System.out.println("Send complete."));
// END: com.azure.messaging.eventhubs.eventhubasyncproducerclient.send#Iterable-SendOptions
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.messaging.eventhubs.models.CreateBatchOptions;

import com.azure.messaging.eventhubs.models.SendOptions;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -145,4 +146,31 @@ public void batchSizeLimited() {
}
// END: com.azure.messaging.eventhubs.eventhubproducerclient.createBatch#CreateBatchOptions-int
}

/**
* Code snippet to demonstrate how to send a list of events using {@link EventHubProducerClient#send(Iterable)}.
*/
public void sendIterableSample() {
final EventHubProducerClient producer = builder.buildProducerClient();
// BEGIN: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable
List<EventData> events = Arrays.asList(new EventData("maple"), new EventData("aspen"),
new EventData("oak"));
producer.send(events);
// END: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable
}

/**
* Code snippet to demonstrate how to send a list of events using
* {@link EventHubProducerClient#send(Iterable, SendOptions)}.
*/
public void sendIterableWithPartitionKeySample() {
final EventHubProducerClient producer = builder.buildProducerClient();
// BEGIN: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions
List<EventData> events = Arrays.asList(new EventData("Melbourne"), new EventData("London"),
new EventData("New York"));
SendOptions sendOptions = new SendOptions().setPartitionKey("cities");
producer.send(events, sendOptions);
// END: com.azure.messaging.eventhubs.eventhubproducerclient.send#Iterable-SendOptions
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.azure.core.amqp.AmqpRetryMode;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;

/**
* Sample demonstrates how to send an iterable of events to specific event hub partition by defining partition id using
* {@link SendOptions#setPartitionId(String)}.
*/
public class PublishIterableEvents {

/**
* Main method to invoke this demo about how to send an iterable of events with partition id configured.
*
* @param args Unused arguments to the program.
* @throws InterruptedException If the program was interrupted before completion.
*/
public static void main(String[] args) throws InterruptedException {
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";

// Set some custom retry options other than the default set.
AmqpRetryOptions retryOptions = new AmqpRetryOptions()
.setDelay(Duration.ofSeconds(30))
.setMaxRetries(2)
.setMode(AmqpRetryMode.EXPONENTIAL);

// Instantiate a client that will be used to call the service.
EventHubProducerAsyncClient producer = new EventHubClientBuilder()
.connectionString(connectionString)
.retry(retryOptions)
.buildAsyncProducerClient();

// Create an iterable of events to send. Note that the events in iterable should
// fit in a single batch. If the events exceed the size of the batch, then send operation will fail.
final Iterable<EventData> events = Flux.range(0, 100).map(number -> {
final String contents = "event-data-" + number;
return new EventData(contents.getBytes(UTF_8));
}).toIterable();

// To send our events, we need to know what partition to send it to. For the sake of this example, we take the
// first partition id.
CountDownLatch countDownLatch = new CountDownLatch(1);
producer.getPartitionIds()
.take(1) // take the first partition id
.flatMap(partitionId -> producer.send(events, new SendOptions().setPartitionId(partitionId)))
.subscribe(unused -> { },
ex -> System.out.println("Failed to send events: " + ex.getMessage()),
() -> {
countDownLatch.countDown();
System.out.println("Sending events completed successfully");
});

// Wait for async operation to complete or timeout after 10 seconds.
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} finally {
producer.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,32 @@ public void sendMessageRetrySpanTest() {
verifyZeroInteractions(onClientClosed);
}

/**
* Verifies that sending an iterable of events that exceeds batch size throws exception.
*/
@Test
public void sendEventsExceedsBatchSize() {
//Arrange
// EC is the prefix they use when creating a link that sends to the service round-robin.
when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));
when(sendLink.getLinkSize()).thenReturn(Mono.just(1024));
TracerProvider tracerProvider = new TracerProvider(Collections.emptyList());
final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
connectionProcessor, retryOptions, tracerProvider, messageSerializer, Schedulers.parallel(), false, onClientClosed);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());

//Act & Assert
final Iterable<EventData> tooManyEvents = Flux.range(0, 1024).map(number -> {
final String contents = "event-data-" + number;
return new EventData(contents.getBytes(UTF_8));
}).toIterable();

AmqpException amqpException = Assertions.assertThrows(AmqpException.class, () -> producer.send(tooManyEvents));
Assertions.assertTrue(amqpException.getMessage().startsWith("EventData does not fit into maximum number of "
+ "batches. '1'"));
}

/**
* Verifies we can send multiple messages.
*/
Expand Down

0 comments on commit a6dd372

Please sign in to comment.