Skip to content

Commit

Permalink
Adding additional logging to ReactorDispatcher and ReactorExecutor. A…
Browse files Browse the repository at this point in the history
…dding closing logic (#24457)

* Closing ReactorExecutor if it has never been run.

* Adding documentation to ReactorDispatcher.

* Updating ReactorExecutor to schedule close work when reactor has not started or scheduler is closed.

* Adding tests.

* In method invocations, adding catch for RejectedExecutionException in the case that the scheduler is disposed.

* Adding assertion for ReactorExecutorTest that an onError is also called.

* Adding documentation to reactor connection and timeout to closing executor.

* Splitting try/catch conditions.

* Add documentation to RequestResponseChannel.

* Using testPublisher for AmqpChannelProcessorTest. Using Flux.never().
  • Loading branch information
conniey authored Oct 1, 2021
1 parent 8ed75c9 commit da926ad
Show file tree
Hide file tree
Showing 11 changed files with 499 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,16 @@ Mono<Void> closeAsync(AmqpShutdownSignal shutdownSignal) {
if (dispatcher != null) {
try {
dispatcher.invoke(() -> closeConnectionWork());
} catch (IOException | RejectedExecutionException e) {
logger.warning("connectionId[{}] Error while scheduling closeConnection work. Manually disposing.",
connectionId, e);
} catch (IOException e) {
logger.warning("connectionId[{}] IOException while scheduling closeConnection work. Manually "
+ "disposing.", connectionId, e);

closeConnectionWork();
} catch (RejectedExecutionException e) {
// Not logging the exception again here because it was already logged when it was thrown.
logger.info("connectionId[{}] Could not schedule closeConnection work. Manually disposing.",
connectionId);

closeConnectionWork();
}
} else {
Expand Down Expand Up @@ -479,18 +486,20 @@ private synchronized void closeConnectionWork() {
final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

// We shouldn't need to add a timeout to this operation because executorCloseMono schedules its last
// remaining work after OperationTimeout has elapsed and closes afterwards.
final Mono<Void> closedExecutor = executor != null ? Mono.defer(() -> {
synchronized (this) {
logger.info("connectionId[{}] Closing executor.", connectionId);
return executor.closeAsync();
}
}) : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
// Close all the children and the ReactorExecutor.
final Mono<Void> closeSessionAndExecutorMono = Mono.when(closingSessions)
.timeout(operationTimeout)
.onErrorResume(error -> {
logger.warning("connectionId[{}]: Timed out waiting for all sessions to close.", connectionId);
logger.info("connectionId[{}]: Timed out waiting for all sessions to close.", connectionId);
return Mono.empty();
})
.then(closedExecutor)
Expand All @@ -504,7 +513,7 @@ private synchronized void closeConnectionWork() {
subscriptions.dispose();
}));

subscriptions.add(closeSessionsMono.subscribe());
subscriptions.add(closeSessionAndExecutorMono.subscribe());
}

private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
Expand Down Expand Up @@ -551,6 +560,9 @@ private synchronized Connection getOrCreateConnection() throws IOException {
return executor.closeAsync();
}
});

// We shouldn't need to add a timeout to this operation because executorCloseMono schedules its last
// remaining work after OperationTimeout has elapsed and closes afterwards.
reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,32 @@ public Mono<AmqpShutdownSignal> getShutdownSignal() {
return shutdownSignal.asMono();
}

/**
* Schedules work to be performed on the underlying proton-j reactor.
*
* <p>The work is first checked against the scheduler state, then placed on the work queue, and finally the
* work queue is signalled.</p>
*
* @param work Work to be run on the underlying proton-j reactor.
*
* @throws IOException If the underlying IO pipe cannot be signalled saying that there is more work to be done.
* @throws RejectedExecutionException if the reactor instance has already been closed or the underlying IO Pipe
* has been closed.
*/
public void invoke(final Runnable work) throws IOException {
// Fail fast before queueing anything: throws RejectedExecutionException when the reactor has a recorded
// rejection or the IO signal sink is no longer open.
this.throwIfSchedulerError();

// Enqueue the work, then signal the work queue that there is more work to be done.
this.workQueue.offer(new Work(work));
this.signalWorkQueue();
}

/**
* Schedules work to be performed on the proton-j reactor after {@code delay}.
*
* @param work Work to be run on the underlying proton-j reactor.
* @param delay Delay before work should be scheduled for execution.
*
* @throws IOException If the underlying IO pipe cannot be signalled saying that there is more work to be done.
* @throws RejectedExecutionException if the reactor instance has already been closed or the underlying IO Pipe
* has been closed.
*/
public void invoke(final Runnable work, final Duration delay) throws IOException {
this.throwIfSchedulerError();

Expand All @@ -106,14 +125,16 @@ private void throwIfSchedulerError() {
final RejectedExecutionException rejectedException = this.reactor.attachments()
.get(RejectedExecutionException.class, RejectedExecutionException.class);
if (rejectedException != null) {
throw logger.logExceptionAsError(new RejectedExecutionException(rejectedException.getMessage(),
rejectedException));
throw logger.logExceptionAsWarning(new RejectedExecutionException(
"Underlying Reactor was already disposed. Should not continue dispatching work to this. "
+ rejectedException.getMessage(), rejectedException));
}

// throw when the pipe is in closed state - in which case,
// signalling the new event-dispatch will fail
if (!this.ioSignal.sink().isOpen()) {
throw logger.logExceptionAsError(new RejectedExecutionException("ReactorDispatcher instance is closed."));
throw logger.logExceptionAsWarning(new RejectedExecutionException(
"ReactorDispatcher instance is closed. Should not continue dispatching work to this reactor."));
}
}

Expand Down Expand Up @@ -203,7 +224,7 @@ public void run(Selectable selectable) {
logger.info("connectionId[{}] Reactor selectable is being disposed.", connectionId);

shutdownSignal.emitValue(new AmqpShutdownSignal(false, false,
String.format("connectionId[%s] Reactor selectable is disposed.", connectionId)),
String.format("connectionId[%s] Reactor selectable is disposed.", connectionId)),
Sinks.EmitFailureHandler.FAIL_FAST);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class ReactorExecutor implements AsyncCloseable {
* #closeAsync()} is called.
*/
void start() {
if (isDisposed.get()) {
logger.warning("Cannot start reactor when executor has been disposed.");
return;
}

if (hasStarted.getAndSet(true)) {
logger.warning("ReactorExecutor has already started.");
return;
Expand Down Expand Up @@ -139,7 +144,7 @@ private void run() {
+ "process.";

logger.info(LOG_MESSAGE, connectionId, reason);
close(reason);
close(reason, true);
}
}
}
Expand All @@ -149,7 +154,7 @@ private void run() {
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
private void scheduleCompletePendingTasks() {
this.scheduler.schedule(() -> {
final Runnable work = () -> {
logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor.");
try {
if (reactor.process()) {
Expand All @@ -169,20 +174,27 @@ private void scheduleCompletePendingTasks() {
// session before we were able to schedule this work.
}

close("Finished processing pending tasks.");
close("Finished processing pending tasks.", false);
}
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
};

try {
this.scheduler.schedule(work, timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.warning("Scheduler was already closed. Manually releasing reactor.");
work.run();
}
}

private void close(String reason) {
private void close(String reason, boolean initiatedByClient) {
logger.verbose("Completing close and disposing scheduler. {}", reason);
scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, initiatedByClient, reason));
}

@Override
Expand All @@ -191,8 +203,12 @@ public Mono<Void> closeAsync() {
return isClosedMono.asMono();
}

// Pending tasks are scheduled to be invoked after the timeout period, which would complete this Mono.
if (hasStarted.get()) {
scheduleCompletePendingTasks();
} else {
// Reactor never started, so just complete this Mono.
close("Closing based on user-invoked close operation.", true);
}

return isClosedMono.asMono();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece

sink.success(message);
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
});
Expand Down Expand Up @@ -186,6 +186,8 @@ public Mono<Void> addCredits(int credits) {
sink.error(new UncheckedIOException(String.format(
"connectionId[%s] linkName[%s] Unable to schedule work to add more credits.",
handler.getConnectionId(), getLinkName()), e));
} catch (RejectedExecutionException e) {
sink.error(e);
}
});
}
Expand Down Expand Up @@ -278,10 +280,17 @@ Mono<Void> closeAsync(String message, ErrorCondition errorCondition) {
return Mono.fromRunnable(() -> {
try {
dispatcher.invoke(closeReceiver);
} catch (IOException | RejectedExecutionException e) {
logger.info("connectionId[{}] linkName[{}] Could not schedule disposing of receiver on "
} catch (IOException e) {
logger.warning("connectionId[{}] linkName[{}] IO sink was closed when scheduling work. Manually "
+ "invoking and completing close.", handler.getConnectionId(), getLinkName(), e);

closeReceiver.run();
completeClose();
} catch (RejectedExecutionException e) {
// Not logging the exception again here because it was already logged when it was thrown.
logger.info("connectionId[{}] linkName[{}] RejectedExecutionException when scheduling on "
+ "ReactorDispatcher. Manually invoking and completing close.", handler.getConnectionId(),
getLinkName(), e);
getLinkName());

closeReceiver.run();
completeClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,16 @@ Mono<Void> closeAsync(String message, ErrorCondition errorCondition) {
return Mono.fromRunnable(() -> {
try {
reactorProvider.getReactorDispatcher().invoke(closeWork);
} catch (IOException | RejectedExecutionException e) {
logger.info("connectionId[{}] entityPath[{}] linkName[{}]: Could not schedule close work. Running"
} catch (IOException e) {
logger.warning("connectionId[{}] entityPath[{}] linkName[{}]: Could not schedule close work. Running"
+ " manually. And completing close.", handler.getConnectionId(), entityPath, getLinkName(), e);

closeWork.run();
handleClose();
} catch (RejectedExecutionException e) {
logger.info("connectionId[{}] entityPath[{}] linkName[{}]: RejectedExecutionException scheduling close"
+ " work. And completing close.", handler.getConnectionId(), entityPath, getLinkName());

closeWork.run();
handleClose();
}
Expand Down Expand Up @@ -624,7 +630,11 @@ private void scheduleWorkOnDispatcher() {
try {
reactorProvider.getReactorDispatcher().invoke(this::processSendWork);
} catch (IOException e) {
logger.error("Error scheduling work on reactor.", e);
logger.warning("connectionId[{}] linkName[{}]: Error scheduling work on reactor.",
handler.getConnectionId(), getLinkName(), e);
} catch (RejectedExecutionException e) {
logger.info("connectionId[{}] linkName[{}]: Error scheduling work on reactor because of"
+ " RejectedExecutionException.", handler.getConnectionId(), getLinkName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,15 @@ Mono<Void> closeAsync(String message, ErrorCondition errorCondition, boolean dis
return Mono.fromRunnable(() -> {
try {
provider.getReactorDispatcher().invoke(() -> disposeWork(errorCondition, disposeLinks));
} catch (IOException | RejectedExecutionException e) {
} catch (IOException e) {
logger.info("connectionId[{}] sessionName[{}] Error while scheduling work. Manually disposing.",
sessionHandler.getConnectionId(), sessionName, e);

disposeWork(errorCondition, disposeLinks);
} catch (RejectedExecutionException e) {
logger.info("connectionId[{}] sessionName[{}] RejectedExecutionException when scheduling work.",
sessionHandler.getConnectionId(), sessionName);

disposeWork(errorCondition, disposeLinks);
}
}).then(isClosedMono.asMono());
Expand Down Expand Up @@ -366,7 +372,7 @@ protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPat

sink.success(computed.getLink());
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
}));
Expand Down Expand Up @@ -463,7 +469,7 @@ private Mono<AmqpSendLink> createProducer(String linkName, String entityPath,

sink.success(computed.getLink());
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public class RequestResponseChannel implements AsyncCloseable {
* @param provider The reactor provider that the request will be sent with.
* @param senderSettleMode to set as {@link SenderSettleMode} on sender.
* @param receiverSettleMode to set as {@link ReceiverSettleMode} on receiver.
*
* @throws RuntimeException if the send/receive links could not be locally scheduled to open.
*/
protected RequestResponseChannel(AmqpConnection amqpConnection, String connectionId,
String fullyQualifiedNamespace, String linkName, String entityPath, Session session,
Expand Down Expand Up @@ -210,7 +212,7 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
this.sendLink.open();
this.receiveLink.open();
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
throw logger.logExceptionAsError(new RuntimeException(String.format(
"connectionId[%s], linkName[%s]: Unable to open send and receive link.", connectionId, linkName), e));
}
Expand Down Expand Up @@ -342,7 +344,7 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
delivery.settle();
sendLink.advance();
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

public class SessionHandler extends Handler {
private final String entityName;
Expand Down Expand Up @@ -48,8 +49,8 @@ public void onSessionLocalOpen(Event e) {

try {
reactorDispatcher.invoke(this::onSessionTimeout, this.openTimeout);
} catch (IOException ioException) {
logger.warning("onSessionLocalOpen connectionId[{}], entityName[{}], reactorDispatcherError[{}]",
} catch (IOException | RejectedExecutionException ioException) {
logger.info("onSessionLocalOpen connectionId[{}], entityName[{}], reactorDispatcherError[{}]",
getConnectionId(), this.entityName,
ioException.getMessage());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ public static Stream<Throwable> newConnectionOnRetriableError() {
@ParameterizedTest
void newConnectionOnRetriableError(Throwable exception) {
// Arrange
final Flux<TestObject> publisher = createSink(connection1, connection2);
final AmqpChannelProcessor<TestObject> processor = publisher.subscribeWith(channelProcessor);
final TestPublisher<TestObject> publisher = TestPublisher.createCold();
publisher.next(connection1);
publisher.next(connection2);
final AmqpChannelProcessor<TestObject> processor = publisher.flux().subscribeWith(channelProcessor);

when(retryPolicy.calculateRetryDelay(exception, 1)).thenReturn(Duration.ofSeconds(1));
when(retryPolicy.getMaxRetries()).thenReturn(3);
Expand Down
Loading

0 comments on commit da926ad

Please sign in to comment.