Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding additional logging to ReactorDispatcher and ReactorExecutor. Adding closing logic #24457

Merged
merged 10 commits into from
Oct 1, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,32 @@ public Mono<AmqpShutdownSignal> getShutdownSignal() {
return shutdownSignal.asMono();
}

/**
* Schedules work to be performed on the underlying proton-j reactor.
*
* @param work Work to be run on the underlying proton-j reactor.
*
* @throws IOException If the underlying IO pipe cannot be signalled saying that there is more work to be done.
* @throws RejectedExecutionException if the reactor instance has already been closed or the underlying IO Pipe
* has been closed.
*/
public void invoke(final Runnable work) throws IOException {
this.throwIfSchedulerError();

this.workQueue.offer(new Work(work));
this.signalWorkQueue();
}

/**
* Schedules work to be performed on the proton-j reactor after {@code delay}.
*
* @param work Work to be run on the underlying proton-j reactor.
* @param delay Delay before work should scheduled for execution.
*
* @throws IOException If the underlying IO pipe cannot be signalled saying that there is more work to be done.
* @throws RejectedExecutionException if the reactor instance has already been closed or the underlying IO Pipe
* has been closed.
*/
public void invoke(final Runnable work, final Duration delay) throws IOException {
this.throwIfSchedulerError();

Expand All @@ -106,14 +125,16 @@ private void throwIfSchedulerError() {
final RejectedExecutionException rejectedException = this.reactor.attachments()
.get(RejectedExecutionException.class, RejectedExecutionException.class);
if (rejectedException != null) {
throw logger.logExceptionAsError(new RejectedExecutionException(rejectedException.getMessage(),
rejectedException));
throw new RejectedExecutionException(
"Underlying Reactor was already disposed. Should not continue dispatching work to this. "
+ rejectedException.getMessage(), rejectedException);
}

// throw when the pipe is in closed state - in which case,
// signalling the new event-dispatch will fail
if (!this.ioSignal.sink().isOpen()) {
throw logger.logExceptionAsError(new RejectedExecutionException("ReactorDispatcher instance is closed."));
throw new RejectedExecutionException("ReactorDispatcher instance is closed. Should not continue "
+ "dispatching work to this reactor.");
}
}

Expand Down Expand Up @@ -203,7 +224,7 @@ public void run(Selectable selectable) {
logger.info("connectionId[{}] Reactor selectable is being disposed.", connectionId);

shutdownSignal.emitValue(new AmqpShutdownSignal(false, false,
String.format("connectionId[%s] Reactor selectable is disposed.", connectionId)),
String.format("connectionId[%s] Reactor selectable is disposed.", connectionId)),
Sinks.EmitFailureHandler.FAIL_FAST);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class ReactorExecutor implements AsyncCloseable {
* #closeAsync()} is called.
*/
void start() {
if (isDisposed.get()) {
logger.warning("Cannot start reactor when executor has been disposed.");
return;
}

if (hasStarted.getAndSet(true)) {
logger.warning("ReactorExecutor has already started.");
return;
Expand Down Expand Up @@ -139,7 +144,7 @@ private void run() {
+ "process.";

logger.info(LOG_MESSAGE, connectionId, reason);
close(reason);
close(reason, true);
}
}
}
Expand All @@ -149,7 +154,7 @@ private void run() {
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
private void scheduleCompletePendingTasks() {
this.scheduler.schedule(() -> {
final Runnable work = () -> {
logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor.");
try {
if (reactor.process()) {
Expand All @@ -169,20 +174,27 @@ private void scheduleCompletePendingTasks() {
// session before we were able to schedule this work.
}

close("Finished processing pending tasks.");
close("Finished processing pending tasks.", false);
}
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
};

try {
this.scheduler.schedule(work, timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
logger.warning("Scheduler was already closed. Manually releasing reactor.");
work.run();
}
}

private void close(String reason) {
private void close(String reason, boolean initiatedByClient) {
logger.verbose("Completing close and disposing scheduler. {}", reason);
scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, initiatedByClient, reason));
}

@Override
Expand All @@ -191,8 +203,12 @@ public Mono<Void> closeAsync() {
return isClosedMono.asMono();
}

// Pending tasks are scheduled to be invoked after the timeout period, which would complete this Mono.
if (hasStarted.get()) {
scheduleCompletePendingTasks();
} else {
// Rector never started, so just complete this Mono.
close("Closing based on user-invoked close operation.", true);
}

return isClosedMono.asMono();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected ReactorReceiver(AmqpConnection amqpConnection, String entityPath, Rece

sink.success(message);
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
});
Expand Down Expand Up @@ -186,6 +186,8 @@ public Mono<Void> addCredits(int credits) {
sink.error(new UncheckedIOException(String.format(
"connectionId[%s] linkName[%s] Unable to schedule work to add more credits.",
handler.getConnectionId(), getLinkName()), e));
} catch (RejectedExecutionException e) {
sink.error(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ private void processDeliveredMessage(Delivery delivery) {
private void scheduleWorkOnDispatcher() {
try {
reactorProvider.getReactorDispatcher().invoke(this::processSendWork);
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
logger.error("Error scheduling work on reactor.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPat

sink.success(computed.getLink());
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
}));
Expand Down Expand Up @@ -463,7 +463,7 @@ private Mono<AmqpSendLink> createProducer(String linkName, String entityPath,

sink.success(computed.getLink());
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ protected RequestResponseChannel(AmqpConnection amqpConnection, String connectio
this.sendLink.open();
this.receiveLink.open();
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
throw logger.logExceptionAsError(new RuntimeException(String.format(
"connectionId[%s], linkName[%s]: Unable to open send and receive link.", connectionId, linkName), e));
}
Expand Down Expand Up @@ -342,7 +342,7 @@ public Mono<Message> sendWithAck(final Message message, DeliveryState deliverySt
delivery.settle();
sendLink.advance();
});
} catch (IOException e) {
} catch (IOException | RejectedExecutionException e) {
sink.error(e);
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;

public class SessionHandler extends Handler {
private final String entityName;
Expand Down Expand Up @@ -48,7 +49,7 @@ public void onSessionLocalOpen(Event e) {

try {
reactorDispatcher.invoke(this::onSessionTimeout, this.openTimeout);
} catch (IOException ioException) {
} catch (IOException | RejectedExecutionException ioException) {
logger.warning("onSessionLocalOpen connectionId[{}], entityName[{}], reactorDispatcherError[{}]",
getConnectionId(), this.entityName,
ioException.getMessage());
Expand Down
Loading