Skip to content

Commit

Permalink
Retry transient send errors that occur on Event Hubs Producer (#7306)
Browse files Browse the repository at this point in the history
* Retry producer operation on transient failure.

* Update integration test.

* Adding checkstyle suppression for RetryUtil
  • Loading branch information
conniey authored Jan 10, 2020
1 parent 95d3927 commit f6a240a
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@

<!-- ClientLogger class suppression -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="ClientLogger.java"/>

<!-- Use the logger in Utility static method. -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="com.azure.storage.common.Utility.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="com.azure.storage.common.implementation.StorageImplUtils.java"/>
Expand All @@ -197,6 +198,9 @@
<!-- Requires static access to the logger to report errors while loading i18n messages (from within a static initializer). -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="Messages.java"/>

<!-- Report AMQP retry attempts in static withRetry method. -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" files="com.azure.core.amqp.implementation.RetryUtil.java"/>

<!-- Event Hubs uses AMQP, which does not contain an HTTP response. Returning PagedResponse and Response does not apply. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubAsyncClient.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.core.amqp.FixedAmqpRetryPolicy;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -19,6 +21,8 @@
* Helper class to help with retry policies.
*/
public class RetryUtil {
private static final ClientLogger LOGGER = new ClientLogger(RetryUtil.class);

// So this class can't be instantiated.
private RetryUtil() {
}
Expand Down Expand Up @@ -69,12 +73,22 @@ public static <T> Mono<T> withRetry(Mono<T> source, Duration operationTimeout, A
private static Flux<Long> retry(Flux<Throwable> source, AmqpRetryPolicy retryPolicy) {
return source.zipWith(Flux.range(1, retryPolicy.getMaxRetries() + 1),
(error, attempt) -> {
if (!(error instanceof TimeoutException) || attempt > retryPolicy.getMaxRetries()) {
if (attempt > retryPolicy.getMaxRetries()) {
LOGGER.warning("Retry attempts are exhausted. Current: {}. Max: {}.", retryPolicy.getMaxRetries(),
attempt);

throw Exceptions.propagate(error);
}

if (!(error instanceof TimeoutException)
&& (error instanceof AmqpException && !(((AmqpException) error).isTransient()))) {
LOGGER.warning("Error is not a TimeoutException nor is it a retryable AMQP exception.", error);

throw Exceptions.propagate(error);
}

//TODO (conniey): is it possible to add a logger here even though it is static? :/
return retryPolicy.calculateRetryDelay((TimeoutException) error, attempt);
LOGGER.info("Retryable error occurred. Retrying operation. Attempt: {}", attempt, error);
return retryPolicy.calculateRetryDelay(error, attempt);
})
.flatMap(Mono::delay);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void setup() throws IOException {

final ReactorHandlerProvider reactorHandlerProvider = new MockReactorHandlerProvider(reactorProvider, connectionHandler, sessionHandler, null, null);

final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setTryTimeout(TEST_DURATION);
final AmqpRetryOptions retryOptions = new AmqpRetryOptions().setMaxRetries(0).setTryTimeout(TEST_DURATION);
final ConnectionOptions connectionOptions = new ConnectionOptions(CREDENTIAL_INFO.getEndpoint().getHost(),
CREDENTIAL_INFO.getEntityPath(), tokenProvider, CbsAuthorizationType.SHARED_ACCESS_SIGNATURE,
AmqpTransportType.AMQP, retryOptions, ProxyOptions.SYSTEM_DEFAULTS, SCHEDULER);
Expand Down Expand Up @@ -282,7 +282,7 @@ public void createCBSNodeTimeoutException() {

Duration timeout = Duration.ofSeconds(2);
AmqpRetryOptions retryOptions = new AmqpRetryOptions()
.setMaxRetries(2)
.setMaxRetries(1)
.setDelay(Duration.ofMillis(200))
.setMode(AmqpRetryMode.FIXED)
.setTryTimeout(timeout);
Expand Down Expand Up @@ -325,7 +325,7 @@ public void cannotCreateResourcesOnFailure() {
AmqpException amqpException = (AmqpException) e;
Assertions.assertEquals(condition, amqpException.getErrorCondition());
})
.verify(Duration.ofSeconds(10));
.verify(Duration.ofSeconds(30));

verify(transport, times(1)).unbind();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpConstants;
Expand Down Expand Up @@ -44,6 +45,8 @@
import java.util.function.Supplier;
import java.util.stream.Collector;

import static com.azure.core.amqp.implementation.RetryUtil.getRetryPolicy;
import static com.azure.core.amqp.implementation.RetryUtil.withRetry;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.tracing.Tracer.ENTITY_PATH_KEY;
import static com.azure.core.util.tracing.Tracer.HOST_NAME_KEY;
Expand Down Expand Up @@ -101,6 +104,7 @@ public class EventHubProducerAsyncClient implements Closeable {
private final String eventHubName;
private final EventHubConnectionProcessor connectionProcessor;
private final AmqpRetryOptions retryOptions;
private final AmqpRetryPolicy retryPolicy;
private final TracerProvider tracerProvider;
private final MessageSerializer messageSerializer;
private final boolean isSharedConnection;
Expand All @@ -121,6 +125,8 @@ public class EventHubProducerAsyncClient implements Closeable {
this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null.");
this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");

this.retryPolicy = getRetryPolicy(retryOptions);
this.isSharedConnection = isSharedConnection;
}

Expand Down Expand Up @@ -386,7 +392,7 @@ public Mono<Void> send(EventDataBatch batch) {
: null;

Context sharedContext = null;
List<Message> messages = new ArrayList<>();
final List<Message> messages = new ArrayList<>();

for (int i = 0; i < batch.getEvents().size(); i++) {
final EventData event = batch.getEvents().get(i);
Expand All @@ -409,24 +415,26 @@ public Mono<Void> send(EventDataBatch batch) {
messages.add(message);
}

Context finalSharedContext = sharedContext;
return getSendLink(batch.getPartitionId())
.flatMap(link -> {
final Context finalSharedContext = sharedContext != null ? sharedContext : Context.NONE;

return withRetry(
getSendLink(batch.getPartitionId()).flatMap(link -> {
if (isTracingEnabled) {
Context entityContext = finalSharedContext.addData(ENTITY_PATH_KEY, link.getEntityPath());
// start send span and store updated context
// Start send span and store updated context
parentContext.set(tracerProvider.startSpan(
entityContext.addData(HOST_NAME_KEY, link.getHostname()), ProcessKind.SEND));
}
return messages.size() == 1
? link.send(messages.get(0))
: link.send(messages);

}).doOnEach(signal -> {
})
.doOnEach(signal -> {
if (isTracingEnabled) {
tracerProvider.endSpan(parentContext.get(), signal);
}
});
}), retryOptions.getTryTimeout(), retryPolicy);
}

private Mono<Void> sendInternal(Flux<EventData> events, SendOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void worksAfterReconnection() throws InterruptedException {
}))
.onErrorContinue(error -> error instanceof AmqpException && ((AmqpException) error).isTransient(),
(error, value) -> {
System.out.println("Exception dropped: " + error.getMessage());
System.out.println("Retries were exhausted. No logger retrying operation. " + error.getMessage());
})
.subscribe(instant -> {
System.out.println("Sent batch at: " + instant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import com.azure.messaging.eventhubs.models.SendOptions;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -103,6 +105,16 @@ class EventHubProducerAsyncClientTest {
private TracerProvider tracerProvider;
private ConnectionOptions connectionOptions;

/**
 * Sets the default {@link StepVerifier} timeout to 30 seconds once, before the tests in this class run.
 */
@BeforeAll
static void beforeAll() {
    final Duration defaultTimeout = Duration.ofSeconds(30);
    StepVerifier.setDefaultTimeout(defaultTimeout);
}

/**
 * Resets the default {@link StepVerifier} timeout once the tests in this class have finished, undoing the
 * override applied in {@code beforeAll()}.
 */
@AfterAll
static void afterAll() {
    StepVerifier.resetDefaultTimeout();
}

@BeforeEach
void setup() {
MockitoAnnotations.initMocks(this);
Expand Down Expand Up @@ -907,6 +919,81 @@ void closesOnNonTransientFailure() {
verifyZeroInteractions(sendLink3);
}

/**
* Verifies that we can resend a message when a transient error occurs.
*
* The first send link is stubbed to fail with an AmqpException for a message carrying a marker property, while
* a second connection and send link are stubbed to succeed. The test passes when the marked message completes
* successfully via the second send link.
*/
@Test
void resendMessageOnTransientLinkFailure() {
// Arrange
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
endpointSink.next(AmqpEndpointState.ACTIVE);

// Connection source that hands out `connection` and `connection2` alternately, one per downstream request.
EventHubAmqpConnection[] connections = new EventHubAmqpConnection[]{connection, connection2 };
connectionProcessor = Flux.<EventHubAmqpConnection>create(sink -> {
final AtomicInteger count = new AtomicInteger();
sink.onRequest(request -> {
for (int i = 0; i < request; i++) {
final int current = count.getAndIncrement();
// Wrap around the array so the connections are emitted round-robin.
final int index = current % connections.length;
sink.next(connections[index]);
}
});
}).subscribeWith(
new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getEntityPath(), connectionOptions.getRetry()));
producer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
tracerProvider, messageSerializer, false);

// First payload: `count` events that all carry the same contents and no failure marker.
final int count = 4;
final byte[] contents = TEST_CONTENTS.getBytes(UTF_8);
final Flux<EventData> testData = Flux.range(0, count).flatMap(number -> {
final EventData data = new EventData(contents);
return Flux.just(data);
});

// Second payload: a single event tagged with the property that triggers the stubbed failure below.
final String failureKey = "fail";
final EventData testData2 = new EventData("test");
testData2.getProperties().put(failureKey, "true");

// The first connection only creates links whose name starts with "EC"; presumably this is the prefix used for
// links that are not bound to a specific partition -- TODO confirm against the producer's link naming.
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), eq(retryOptions)))
.thenReturn(Mono.just(sendLink));
// Sending a list of messages on the first link succeeds.
when(sendLink.send(anyList())).thenReturn(Mono.empty());

// Send a transient error, and close the original link, if we get a message that contains the "failureKey".
// This simulates when a link is closed.
when(sendLink.send(argThat((Message message) -> {
return message.getApplicationProperties().getValue().containsKey(failureKey);
}))).thenAnswer(mock -> {
// NOTE(review): the leading `true` argument is presumably the "isTransient" flag -- confirm against AmqpException.
final Throwable error = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
new AmqpErrorContext("test-namespace"));

// Fail the first connection's endpoint sink as well as the send itself.
endpointSink.error(error);
return Mono.error(error);
});

// Second connection: its own endpoint state stream and a send link that accepts a single message.
final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
when(connection2.getEndpointStates()).thenReturn(connectionState2);
when(connection2.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), eq(retryOptions)))
.thenReturn(Mono.just(sendLink2));
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());

// Act
// The unmarked events are sent first and must complete without error.
StepVerifier.create(producer.send(testData))
.verifyComplete();

// The marked event hits the stubbed failure on the first link; the send must still complete.
StepVerifier.create(producer.send(testData2))
.verifyComplete();

// Assert
// The first link received the unmarked events as one list of `count` messages.
verify(sendLink).send(messagesCaptor.capture());
final List<Message> messagesSent = messagesCaptor.getValue();
Assertions.assertEquals(count, messagesSent.size());

// The second link received exactly one single-message send, and the third link was never used.
verify(sendLink2, times(1)).send(any(Message.class));
verifyZeroInteractions(sendLink3);
}

private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \n"
+ "Ut sodales efficitur sapien ut posuere. Morbi sed tellus est. Proin eu erat purus. Proin massa nunc, condimentum id iaculis dignissim, consectetur et odio. Cras suscipit sem eu libero aliquam tincidunt. Nullam ut arcu suscipit, eleifend velit in, cursus libero. Ut eleifend facilisis odio sit amet feugiat. Phasellus at nunc sit amet elit sagittis commodo ac in nisi. Fusce vitae aliquam quam. Integer vel nibh euismod, tempus elit vitae, pharetra est. Duis vulputate enim a elementum dignissim. Morbi dictum enim id elit scelerisque, in elementum nulla pharetra. \n"
+ "Aenean aliquet aliquet condimentum. Proin dapibus dui id libero tempus feugiat. Sed commodo ligula a lectus mattis, vitae tincidunt velit auctor. Fusce quis semper dui. Phasellus eu efficitur sem. Ut non sem sit amet enim condimentum venenatis id dictum massa. Nullam sagittis lacus a neque sodales, et ultrices arcu mattis. Aliquam erat volutpat. \n"
Expand Down

0 comments on commit f6a240a

Please sign in to comment.