-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closing reactor connection and children. #19924
Conversation
08eb9ab
to
ce4a583
Compare
/azp run java - eventhubs - tests |
Azure Pipelines successfully started running 1 pipeline(s). |
998f964
to
8d6ef0e
Compare
Changing double hasStarted check.
… for each interval (for a Flux).
… longer "connected".
…into conniey/messaging-factory
...ore/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AsyncAutoCloseable.java
Show resolved
Hide resolved
...core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
Outdated
Show resolved
Hide resolved
...core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
Outdated
Show resolved
Hide resolved
...core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
Show resolved
Hide resolved
...core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
Outdated
Show resolved
Hide resolved
...core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
Show resolved
Hide resolved
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java
Show resolved
Hide resolved
952c1f0
to
500ad8c
Compare
500ad8c
to
45d9f8e
Compare
* Adding lock around creation of reactor. * Align with Track 1 by scheduling pending sends after operation timeout. Changing hasStarted check. * ReactorSession has reference to AmqpConnection (parent) to dispose. * SessionHandler passes along its remote state instead of Active. * Only sets errorCondition when it exists. * Emitting a connection when it is active. Also, adding timeout messages. * Connect AmqpConnection to dispose of child when shutdown signal received. * When endpointStates or shutdownSignals are received, the sender is no longer "connected". * ReactorSender: Rename to isClosed. Adding Mono. * ReactorReceiver: Adding async closing. * ReactorExecutor: Close scheduler when there is no more work. * ReactorSender: Change logging to verbose. * ReactorSession: Close session when it has completed all tasks. * Update connection to close when sessions are disposed. * Update Connection to only propagate more shutdown signals if it has not been signaled already. * Adding retry time to close ReactorExecutor. * Moves decodeMessage and remote credit into dispatcher.invoke method. * Do not use .distinct() for endpoint handlers. * Use boundedElastic in EventHubClient instead of deprecated elastic(). * Dispose sender and receiver when it is no longer authorized. * Using distinctUntilChanged() rather than keeping track of endpoints myself. * Add PartitionPump to release the scheduler when a partition is closed. * Use try/catch around dispose in case an exception occurs. * Adding shutdown signals to ReactorDispatcher * Adding AsyncAutoCloseable and using it to dispose Reactor classes. * Emitting exception when IOException thrown before reactor is closed. * Catching RejectedExecutionException in the case that the IO signal is closed, so we can close resources manually. * Removing unused scheduler. * Removing duplicated logging. * Fixing possible NPE between checking key and getting value. * Only take while AzureCredentialTokenManager is not disposed. * Checking for NPE when closing links. * Removing comment about elastic scheduler since it is not configurable.
/** | ||
* Asynchronous class to close resources. | ||
*/ | ||
public interface AsyncAutoCloseable extends AutoCloseable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we want to extend AutoCloseable? Any class that wants to support both sync and async close
method can implement both interfaces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jonathan had mentioned that it would be nice if our AsyncAutoCloseable works nicely with try-with-resources syntactic sugar.
if (isDisposed.getAndSet(true)) { | ||
logger.verbose("connectionId[{}] was already disposed. {}", connectionId, message); | ||
} else { | ||
dispose(new AmqpShutdownSignal(false, false, message)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When an error occurs, we should inspect the AMQP error to ensure that the error was meant for this connection by comparing the connection id in the error with the current connection id. Otherwise, we'll dispose of a good connection when the error was for an older connection that is no longer active. We should do the same for session and link errors as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ConnectionHandler is bound to a single connection, so it will always be the same one. When we create a proton-j Connection, we bind an instance of the ConnectionHandler to it. So each connection has its own connection handler instance.
// When we encounter an error refreshing authorization results, close the receive link. | ||
final Mono<Void> operation = | ||
closeAsync(String.format( | ||
"connectionId[%s] linkName[%s] Token renewal failure. Disposing receive link.", | ||
amqpConnection.getId(), getLinkName()), | ||
new ErrorCondition(Symbol.getSymbol(AmqpErrorCondition.NOT_ALLOWED.getErrorCondition()), | ||
error.getMessage())); | ||
|
||
return operation.then(Mono.empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be done in the subscribe
's error handler instead of using onErrorResume
? It is odd to have an empty error handler below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had that before but on an error, we want to output another async operation (ie. close the receiver) and the handlers only support synchronous callbacks, I moved it back to be a part of the chain.
} catch (IOException e) { | ||
logger.warning("Unable to schedule work to add more credits.", e); | ||
sink.error(new RuntimeException(String.format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use UncheckedIOException
instead when mapping IOException to a runtime exception.
https://docs.oracle.com/javase/8/docs/api/java/io/UncheckedIOException.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sweet. I've never heard of this.
})); | ||
next.receive() | ||
.onBackpressureBuffer(maxQueueSize, BufferOverflowStrategy.ERROR) | ||
.subscribe(message -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this subscribe have an error handler to dispose the link if there's an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because the underlying deliverySink this fetches from never outputs an onError, it will do that via the endpointStates
* Adding lock around creation of reactor. * Align with Track 1 by scheduling pending sends after operation timeout. Changing hasStarted check. * ReactorSession has reference to AmqpConnection (parent) to dispose. * SessionHandler passes along its remote state instead of Active. * Only sets errorCondition when it exists. * Emitting a connection when it is active. Also, adding timeout messages. * Connect AmqpConnection to dispose of child when shutdown signal received. * When endpointStates or shutdownSignals are received, the sender is no longer "connected". * ReactorSender: Rename to isClosed. Adding Mono. * ReactorReceiver: Adding async closing. * ReactorExecutor: Close scheduler when there is no more work. * ReactorSender: Change logging to verbose. * ReactorSession: Close session when it has completed all tasks. * Update connection to close when sessions are disposed. * Update Connection to only propagate more shutdown signals if it has not been signaled already. * Adding retry time to close ReactorExecutor. * Moves decodeMessage and remote credit into dispatcher.invoke method. * Do not use .distinct() for endpoint handlers. * Use boundedElastic in EventHubClient instead of deprecated elastic(). * Dispose sender and receiver when it is no longer authorized. * Using distinctUntilChanged() rather than keeping track of endpoints myself. * Add PartitionPump to release the scheduler when a partition is closed. * Use try/catch around dispose in case an exception occurs. * Adding shutdown signals to ReactorDispatcher * Adding AsyncAutoCloseable and using it to dispose Reactor classes. * Emitting exception when IOException thrown before reactor is closed. * Catching RejectedExecutionException in the case that the IO signal is closed, so we can close resources manually. * Removing unused scheduler. * Removing duplicated logging. * Fixing possible NPE between checking key and getting value. * Only take while AzureCredentialTokenManager is not disposed. * Checking for NPE when closing links. * Removing comment about elastic scheduler since it is not configurable.
* Adding lock around creation of reactor. * Align with Track 1 by scheduling pending sends after operation timeout. Changing hasStarted check. * ReactorSession has reference to AmqpConnection (parent) to dispose. * SessionHandler passes along its remote state instead of Active. * Only sets errorCondition when it exists. * Emitting a connection when it is active. Also, adding timeout messages. * Connect AmqpConnection to dispose of child when shutdown signal received. * When endpointStates or shutdownSignals are received, the sender is no longer "connected". * ReactorSender: Rename to isClosed. Adding Mono. * ReactorReceiver: Adding async closing. * ReactorExecutor: Close scheduler when there is no more work. * ReactorSender: Change logging to verbose. * ReactorSession: Close session when it has completed all tasks. * Update connection to close when sessions are disposed. * Update Connection to only propagate more shutdown signals if it has not been signaled already. * Adding retry time to close ReactorExecutor. * Moves decodeMessage and remote credit into dispatcher.invoke method. * Do not use .distinct() for endpoint handlers. * Use boundedElastic in EventHubClient instead of deprecated elastic(). * Dispose sender and receiver when it is no longer authorized. * Using distinctUntilChanged() rather than keeping track of endpoints myself. * Add PartitionPump to release the scheduler when a partition is closed. * Use try/catch around dispose in case an exception occurs. * Adding shutdown signals to ReactorDispatcher * Adding AsyncAutoCloseable and using it to dispose Reactor classes. * Emitting exception when IOException thrown before reactor is closed. * Catching RejectedExecutionException in the case that the IO signal is closed, so we can close resources manually. * Removing unused scheduler. * Removing duplicated logging. * Fixing possible NPE between checking key and getting value. * Only take while AzureCredentialTokenManager is not disposed. * Checking for NPE when closing links. * Removing comment about elastic scheduler since it is not configurable.
*Processor
usage to new reactorSinks.
.Related: #18070