Skip to content

Commit

Permalink
[Event Hubs] Check connection status of each partition periodically (A…
Browse files Browse the repository at this point in the history
…zure#15852)

* Check connection state periodically
  • Loading branch information
srnagar authored Oct 9, 2020
1 parent 97c9e9b commit 90b5173
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="com.azure.core.amqp.implementation.RetryUtil.java"/>

<!-- AMQP logger name set in lambda function to specify the function name -->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="com.azure.core.amqp.implementation.ReactorConnection.java"/>

<!-- The Identity Azure Platform End to End Test.-->
<suppress checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck"
files="com.azure.endtoend.identity.IdentityTest.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ private void close(T channel) {
}
}

/**
 * Reports whether this processor currently has no usable channel.
 *
 * <p>The check is performed while holding {@code lock}. The result is {@code true} when no current channel is set
 * or when this processor has been disposed.</p>
 *
 * @return {@code true} if the current channel in the processor is {@code null} or if the processor is disposed;
 *     {@code false} otherwise.
 */
public boolean isChannelClosed() {
    synchronized (lock) {
        // Same evaluation order as before: the null check comes first, disposal is only consulted afterwards.
        final boolean hasNoChannel = currentChannel == null;
        return hasNoChannel || isDisposed();
    }
}

/**
* Represents a subscriber, waiting for an AMQP connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ protected Mono<RequestResponseChannel> createRequestResponseChannel(String sessi

return createChannel.subscribeWith(new AmqpChannelProcessor<>(connectionId, entityPath,
channel -> channel.getEndpointStates(), retryPolicy,
new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, sessionName))));
new ClientLogger(RequestResponseChannel.class)));
}

private boolean removeSession(String sessionName, ErrorCondition errorCondition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class RequestResponseChannel implements Disposable {
ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
private final FluxSink<AmqpEndpointState> endpointStatesSink =
endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
private final ClientLogger logger;
private final ClientLogger logger = new ClientLogger(RequestResponseChannel.class);

private final Sender sendLink;
private final Receiver receiveLink;
Expand All @@ -71,6 +71,7 @@ public class RequestResponseChannel implements Disposable {
private final Disposable.Composite subscriptions;
private final AmqpRetryPolicy retryPolicy;
private final SenderSettleMode senderSettleMode;
private final String linkName;

/**
* Creates a new instance of {@link RequestResponseChannel} to send and receive responses from the {@code
Expand All @@ -91,7 +92,7 @@ protected RequestResponseChannel(String connectionId, String fullyQualifiedNames
String entityPath, Session session, AmqpRetryOptions retryOptions, ReactorHandlerProvider handlerProvider,
ReactorProvider provider, MessageSerializer messageSerializer,
SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
this.logger = new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, linkName));
this.linkName = linkName;
this.provider = provider;
this.operationTimeout = retryOptions.getTryTimeout();
this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
Expand Down Expand Up @@ -131,7 +132,7 @@ protected RequestResponseChannel(String connectionId, String fullyQualifiedNames
receiveLinkHandler.getDeliveredMessages()
.map(this::decodeDelivery)
.subscribe(message -> {
logger.verbose("Settling message: {}", message.getCorrelationId());
logger.verbose("{} - Settling message: {}", this.linkName, message.getCorrelationId());
settleMessage(message);
}),

Expand Down Expand Up @@ -229,7 +230,7 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
.then(
Mono.create(sink -> {
try {
logger.verbose("Scheduling on dispatcher. Message Id {}", messageId);
logger.verbose("{} - Scheduling on dispatcher. Message Id {}", linkName, messageId);
unconfirmedSends.putIfAbsent(messageId, sink);

// If we try to do proton-j API calls such as sending on AMQP links, it may encounter a race
Expand All @@ -239,7 +240,7 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
.replace("-", "").getBytes(UTF_8));

if (deliveryState != null) {
logger.verbose("Setting delivery state as [{}].", deliveryState);
logger.verbose("{} - Setting delivery state as [{}].", linkName, deliveryState);
delivery.setMessageFormat(DeliveryImpl.DEFAULT_MESSAGE_FORMAT);
delivery.disposition(deliveryState);
}
Expand Down Expand Up @@ -291,7 +292,7 @@ private void settleMessage(Message message) {

if (sink == null) {
int size = unconfirmedSends.size();
logger.warning("Received delivery without pending messageId[{}]. Size[{}]", id, size);
logger.warning("{} - Received delivery without pending messageId[{}]. Size[{}]", linkName, id, size);
return;
}

Expand All @@ -304,7 +305,8 @@ private void handleError(Throwable error) {
}

endpointStatesSink.error(error);
logger.error("Exception in RequestResponse links. Disposing and clearing unconfirmed sends.", error);
logger.error("{} - Exception in RequestResponse links. Disposing and clearing unconfirmed sends.", linkName,
error);
dispose();

unconfirmedSends.forEach((key, value) -> value.error(error));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,8 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,
getEventHubName(), consumerGroup, partitionId, initialPosition,
receiveOptions.getTrackLastEnqueuedEventProperties(), scheduler);
}

/**
 * Gets whether the connection backing this consumer is considered closed.
 *
 * <p>Delegates to {@code connectionProcessor.isChannelClosed()}.</p>
 *
 * @return {@code true} if the connection processor reports that its channel is closed; {@code false} otherwise.
 */
boolean isConnectionClosed() {
    final boolean channelClosed = connectionProcessor.isChannelClosed();
    return channelClosed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private void renewOwnership(Map<String, PartitionOwnership> partitionOwnershipMa
.getOwnerId().equals(this.ownerId))
.map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
.collect(Collectors.toList()))
.subscribe(ignored -> { },
.subscribe(partitionPumpManager::verifyPartitionConnection,
ex -> {
logger.error("Error renewing partition ownership", ex);
isLoadBalancerRunning.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
Expand Down Expand Up @@ -120,6 +121,23 @@ void stopAllPartitionPumps() {
});
}

/**
 * Checks the state of the connection for the given partition. If the connection is closed, then this method will
 * remove the partition from the list of partition pumps. If no partition pump is registered for the partition,
 * this method does nothing.
 *
 * @param ownership The partition ownership information for which the connection state will be verified.
 */
void verifyPartitionConnection(PartitionOwnership ownership) {
    final String partitionId = ownership.getPartitionId();

    // Look the consumer up once instead of containsKey() followed by get(). That pair is not atomic: if the
    // entry is removed in between, get() returns null and the call below would throw a NullPointerException.
    final EventHubConsumerAsyncClient consumerClient = partitionPumps.get(partitionId);
    if (consumerClient == null) {
        return;
    }

    if (consumerClient.isConnectionClosed()) {
        logger.info("Connection closed for {}, partition {}. Removing the consumer.",
            ownership.getEventHubName(), partitionId);
        partitionPumps.remove(partitionId);
    }
}

/**
* Starts a new partition pump for the newly claimed partition. If the partition already has an active partition
* pump, this will not create a new consumer.
Expand Down Expand Up @@ -167,17 +185,25 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
partitionPumps.put(claimedOwnership.getPartitionId(), eventHubConsumer);
//@formatter:off
Flux<Flux<PartitionEvent>> partitionEventFlux;
Flux<PartitionEvent> receiver = eventHubConsumer
.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions)
.doOnNext(partitionEvent -> {
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
logger.verbose("On next {}, {}, {}",
partitionContext.getEventHubName(), partitionContext.getPartitionId(),
partitionEvent.getData().getSequenceNumber());
}
});

if (maxWaitTime != null) {
partitionEventFlux = eventHubConsumer
.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions)
partitionEventFlux = receiver
.windowTimeout(maxBatchSize, maxWaitTime);
} else {
partitionEventFlux = eventHubConsumer
.receiveFromPartition(claimedOwnership.getPartitionId(), startFromEventPosition, receiveOptions)
partitionEventFlux = receiver
.window(maxBatchSize);
}
partitionEventFlux
.flatMap(Flux::collectList)
.concatMap(Flux::collectList)
.publishOn(Schedulers.boundedElastic())
.subscribe(partitionEventBatch -> {
processEvents(partitionContext, partitionProcessor,
Expand Down Expand Up @@ -215,8 +241,16 @@ private void processEvent(PartitionContext partitionContext, PartitionProcessor
}
}
try {
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
logger.verbose("Processing event {}, {}", partitionContext.getEventHubName(),
partitionContext.getPartitionId());
}
partitionProcessor.processEvent(new EventContext(partitionContext, eventData, checkpointStore,
eventContext.getLastEnqueuedEventProperties()));
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
logger.verbose("Completed processing event {}, {}", partitionContext.getEventHubName(),
partitionContext.getPartitionId());
}
endProcessTracingSpan(processSpanContext, Signal.complete());
} catch (Throwable throwable) {
/* user code for event processing threw an exception - log and bubble up */
Expand All @@ -238,9 +272,16 @@ private void processEvents(PartitionContext partitionContext, PartitionProcessor
})
.collect(Collectors.toList());
EventBatchContext eventBatchContext = new EventBatchContext(partitionContext, eventDataList,
checkpointStore,
lastEnqueuedEventProperties[0]);
checkpointStore, lastEnqueuedEventProperties[0]);
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
logger.verbose("Processing event batch {}, {}", partitionContext.getEventHubName(),
partitionContext.getPartitionId());
}
partitionProcessor.processEventBatch(eventBatchContext);
if (logger.canLogAtLevel(LogLevel.VERBOSE)) {
logger.verbose("Completed processing event batch{}, {}", partitionContext.getEventHubName(),
partitionContext.getPartitionId());
}
} else {
EventData eventData = (partitionEventBatch.size() == 1
? partitionEventBatch.get(0).getData() : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ private boolean checkAndSetTerminated() {
if (!isTerminated()) {
return false;
}

final CoreSubscriber<? super Message> subscriber = downstream.get();
final Throwable error = lastError;
if (error != null) {
Expand Down

0 comments on commit 90b5173

Please sign in to comment.