Skip to content

Commit

Permalink
Fix IO pipe stuck issue due to aggressive timer scheduling (#402)
Browse files Browse the repository at this point in the history
There is an issue in the logic where operation timeout timer is scheduled as part of a receive call and due to the bug operation timer can be scheduled multiple times although it is supposed to be set only once when there is no pending receive call. Over time as the operation timer keeps getting scheduled during receive API call and scheduled again by the reactor thread while the event is fired and being handled, the number of pending IO operations on the pipe gets incremented significantly and it can cause IO pipe stuck and blocks a write operation on the pipe, which in turn, blocks a receive API. There are two changes to address the issue: a) ensure that operation timer is scheduled at most two times to avoid excessive IO operations on the pipe and b) read all bytes from the channel when signaled so that there are no remaining bytes in the channel.
  • Loading branch information
sjkwak authored Dec 4, 2018
1 parent b95c07b commit 9a5a39d
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.security.InvalidKeyException;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/***
* The main class of event processor host.
Expand Down Expand Up @@ -277,7 +278,9 @@ public EventProcessorHost(
this.executorService = executorService;
} else {
this.weOwnExecutor = true;
this.executorService = Executors.newScheduledThreadPool(this.executorServicePoolSize);
this.executorService = Executors.newScheduledThreadPool(
this.executorServicePoolSize,
new EventProcessorHostThreadPoolFactory(hostName, effectiveEventHubPath, consumerGroupName));
}

this.hostContext = new HostContext(this.executorService,
Expand Down Expand Up @@ -490,35 +493,78 @@ public CompletableFuture<Void> registerEventProcessorFactory(IEventProcessorFact
*/
public CompletableFuture<Void> unregisterEventProcessor() {
TRACE_LOGGER.info(this.hostContext.withHost("Stopping event processing"));

if (this.unregistered == null) {
// PartitionManager is created in constructor. If this object exists, then
// this.partitionManager is not null.
this.unregistered = this.partitionManager.stopPartitions();
// If we own the executor, stop it also.
// Owned executor is also created in constructor.
if (this.weOwnExecutor) {
this.unregistered = this.unregistered.thenRunAsync(() ->
{
// IMPORTANT: run this last stage in the default threadpool!
// If a task running in a threadpool waits for that threadpool to terminate, it's going to wait a long time...
// It is OK to call shutdown() here even if threads are still running.
// Shutdown() causes the executor to stop accepting new tasks, but existing tasks will
// run to completion. The pool will terminate when all existing tasks finish.
// By this point all new tasks generated by the shutdown have been submitted.
this.executorService.shutdown();
try {
this.executorService.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new CompletionException(e);
}
}, ForkJoinPool.commonPool());
}
// PartitionManager is created in constructor. If this object exists, then
// this.partitionManager is not null.
this.unregistered = this.partitionManager.stopPartitions();

// If we own the executor, stop it also.
// Owned executor is also created in constructor.
if (this.weOwnExecutor) {
this.unregistered = this.unregistered.thenRunAsync(() ->
{
// IMPORTANT: run this last stage in the default threadpool!
// If a task running in a threadpool waits for that threadpool to terminate, it's going to wait a long time...

// It is OK to call shutdown() here even if threads are still running.
// Shutdown() causes the executor to stop accepting new tasks, but existing tasks will
// run to completion. The pool will terminate when all existing tasks finish.
// By this point all new tasks generated by the shutdown have been submitted.
this.executorService.shutdown();

try {
this.executorService.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new CompletionException(e);
}
}, ForkJoinPool.commonPool());
}
}

return this.unregistered;
}

static class EventProcessorHostThreadPoolFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String namePrefix;
private final String hostName;
private final String entityName;
private final String consumerGroupName;

public EventProcessorHostThreadPoolFactory(
String hostName,
String entityName,
String consumerGroupName) {
this.hostName = hostName;
this.entityName = entityName;
this.consumerGroupName = consumerGroupName;
this.namePrefix = this.getNamePrefix();
SecurityManager s = System.getSecurityManager();
this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(this.group, r, this.namePrefix + this.threadNumber.getAndIncrement(), 0);
t.setDaemon(false);
t.setPriority(Thread.NORM_PRIORITY);
t.setUncaughtExceptionHandler(new ThreadUncaughtExceptionHandler());
return t;
}

private String getNamePrefix() {
return String.format("[%s|%s|%s]-%s-",
this.entityName, this.consumerGroupName, this.hostName, poolNumber.getAndIncrement());
}

static class ThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
TRACE_LOGGER.warn("Uncaught exception occurred. Thread " + t.getName(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ private CompletableFuture<?> buildRetries(CompletableFuture<?> buildOnto, Callab
// Return Void so it can be called from a lambda.
// throwOnFailure is true
private Void scan(boolean isFirst) {
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
TRACE_LOGGER.info(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();

(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void scheduleLeaseRenewer() {
if (!getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getLeaseRenewIntervalInSeconds();
this.leaseRenewerFuture = this.hostContext.getExecutor().schedule(() -> leaseRenewer(), seconds, TimeUnit.SECONDS);
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
TRACE_LOGGER.info(this.hostContext.withHostAndPartition(this.lease, "scheduling leaseRenewer in " + seconds));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,6 @@
import java.util.function.Consumer;
import java.util.function.Function;

import com.microsoft.azure.eventhubs.BatchOptions;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventDataBatch;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.eventhubs.RetryPolicy;

public final class EventHubClientImpl extends ClientEntity implements EventHubClient {

/**
Expand Down Expand Up @@ -399,8 +385,7 @@ public void run() {
new OperationCancelledException(
"OperationCancelled as the underlying client instance was closed.",
lastException));
}
else {
} else {
final Duration waitTime = ManagementRetry.this.mf.getRetryPolicy().getNextRetryInterval(
ManagementRetry.this.mf.getClientId(), lastException, this.timeoutTracker.remaining());
if (waitTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -40,7 +42,10 @@
public final class MessageReceiver extends ClientEntity implements AmqpReceiver, ErrorContextProvider {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessageReceiver.class);
private static final int MIN_TIMEOUT_DURATION_MILLIS = 20;

private static final int MAX_OPERATION_TIMEOUT_SCHEDULED = 2;
// TestHooks for code injection
private static volatile Consumer<MessageReceiver> onOpenRetry = null;
private final AtomicInteger operationTimeoutScheduled = new AtomicInteger(0);
private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
private final MessagingFactory underlyingFactory;
private final String receivePath;
Expand All @@ -57,21 +62,16 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
private final CreateAndReceive createAndReceive;
private final Object errorConditionLock;
private final Timer timer;

private volatile int nextCreditToFlow;
private volatile Receiver receiveLink;
private volatile Duration receiveTimeout;
private volatile Message lastReceivedMessage;
private volatile boolean creatingLink;
private volatile CompletableFuture<?> openTimer;
private volatile CompletableFuture<?> closeTimer;

private int prefetchCount;
private Exception lastKnownLinkError;

// TestHooks for code injection
private static volatile Consumer<MessageReceiver> onOpenRetry = null;

private MessageReceiver(final MessagingFactory factory,
final String name,
final String recvPath,
Expand All @@ -98,16 +98,34 @@ private MessageReceiver(final MessagingFactory factory,
// onOperationTimeout delegate - per receive call
this.onOperationTimedout = new Runnable() {
public void run() {
MessageReceiver.this.operationTimeoutTimerFired();

WorkItem<Collection<Message>> topWorkItem = null;
while ((topWorkItem = MessageReceiver.this.pendingReceives.peek()) != null) {
if (topWorkItem.getTimeoutTracker().remaining().toMillis() <= MessageReceiver.MIN_TIMEOUT_DURATION_MILLIS) {
WorkItem<Collection<Message>> dequedWorkItem = MessageReceiver.this.pendingReceives.poll();
if (dequedWorkItem != null && dequedWorkItem.getWork() != null && !dequedWorkItem.getWork().isDone()) {
dequedWorkItem.getWork().complete(null);
} else
} else {
break;
}
} else {
MessageReceiver.this.scheduleOperationTimer(topWorkItem.getTimeoutTracker());
if (MessageReceiver.this.shouldScheduleOperationTimeoutTimer()) {
TimeoutTracker timeoutTracker = topWorkItem.getTimeoutTracker();

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs",
receivePath,
receiveLink.getName(),
Instant.now(),
timeoutTracker.remaining().getSeconds()));
}

MessageReceiver.this.scheduleOperationTimer(timeoutTracker);
}

break;
}
}
Expand Down Expand Up @@ -261,7 +279,17 @@ public CompletableFuture<Collection<Message>> receive(final int maxMessageCount)
return onReceive;
}

if (this.pendingReceives.isEmpty()) {
if (this.shouldScheduleOperationTimeoutTimer()) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(
String.format(Locale.US,
"path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs",
receivePath,
receiveLink.getName(),
Instant.now(),
this.receiveTimeout.getSeconds()));
}

timer.schedule(this.onOperationTimedout, this.receiveTimeout);
}

Expand Down Expand Up @@ -421,7 +449,7 @@ public void onEvent() {
}
}

if (nextRetryInterval == null || !recreateScheduled){
if (nextRetryInterval == null || !recreateScheduled) {
this.drainPendingReceives(completionException);
}
}
Expand Down Expand Up @@ -653,6 +681,19 @@ public void run() {
}, this.executor);
}

private boolean shouldScheduleOperationTimeoutTimer() {
boolean scheduleTimer = this.operationTimeoutScheduled.getAndIncrement() < MAX_OPERATION_TIMEOUT_SCHEDULED;
if (!scheduleTimer) {
this.operationTimeoutScheduled.decrementAndGet();
}

return scheduleTimer;
}

private void operationTimeoutTimerFired() {
MessageReceiver.this.operationTimeoutScheduled.decrementAndGet();
}

@Override
public void onClose(ErrorCondition condition) {
final Exception completionException = (condition != null && condition.getCondition() != null) ? ExceptionUtil.toException(condition) : null;
Expand Down Expand Up @@ -732,9 +773,7 @@ public void onEvent() {

ReceiveWorkItem pendingReceive;
while (!prefetchedMessages.isEmpty() && (pendingReceive = pendingReceives.poll()) != null) {

if (pendingReceive.getWork() != null && !pendingReceive.getWork().isDone()) {

Collection<Message> receivedMessages = receiveCore(pendingReceive.maxMessageCount);
pendingReceive.getWork().complete(receivedMessages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selectable.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -28,6 +30,7 @@
* Each {@link ReactorDispatcher} should be initialized Synchronously - as it calls API in {@link Reactor} which is not thread-safe.
*/
public final class ReactorDispatcher {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReactorDispatcher.class);
private final Reactor reactor;
private final Pipe ioSignal;
private final ConcurrentLinkedQueue<BaseHandler> workQueue;
Expand Down Expand Up @@ -87,6 +90,7 @@ private void signalWorkQueue() throws IOException {
while (this.ioSignal.sink().write(ByteBuffer.allocate(1)) == 0) {
}
} catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
TRACE_LOGGER.info("signalWorkQueue failed with an error", ignorePipeClosedDuringReactorShutdown);
}
}

Expand All @@ -111,9 +115,13 @@ private final class ScheduleHandler implements Callback {
@Override
public void run(Selectable selectable) {
try {
ioSignal.source().read(ByteBuffer.allocate(1024));
while (ioSignal.source().read(ByteBuffer.allocate(1024)) > 0) {
// read until the end of the stream
}
} catch (ClosedChannelException ignorePipeClosedDuringReactorShutdown) {
TRACE_LOGGER.info("ScheduleHandler.run() failed with an error", ignorePipeClosedDuringReactorShutdown);
} catch (IOException ioException) {
TRACE_LOGGER.info("ScheduleHandler.run() failed with an error", ioException);
throw new RuntimeException(ioException);
}

Expand All @@ -129,21 +137,24 @@ private final class CloseHandler implements Callback {
public void run(Selectable selectable) {
try {
selectable.getChannel().close();
} catch (IOException ignore) {
} catch (IOException ioException) {
TRACE_LOGGER.info("CloseHandler.run() failed with an error", ioException);
}

try {
if (ioSignal.sink().isOpen())
ioSignal.sink().close();
} catch (IOException ignore) {
} catch (IOException ioException) {
TRACE_LOGGER.info("CloseHandler.run() failed with an error", ioException);
}

workScheduler.run(null);

try {
if (ioSignal.source().isOpen())
ioSignal.source().close();
} catch (IOException ignore) {
} catch (IOException ioException) {
TRACE_LOGGER.info("CloseHandler.run() failed with an error", ioException);
}
}
}
Expand Down

0 comments on commit 9a5a39d

Please sign in to comment.