Implement RequestResponseChannelCache, following the ReactorConnectionCache design as a reference.
RequestResponseChannelCache "recovery-support" terminates when: 1. the terminated flag is true (as a result of closing the cache), or 2. the hosting connection is disposed.
RequestResponseChannelClosedException seems to be the right exception to emit if the cache is accessed after "recovery-support" termination.
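The two termination conditions and the post-termination access guard can be sketched as below. This is a simplified, hypothetical sketch: RecoverySupport is an illustrative stand-in, and it throws IllegalStateException to stay self-contained where the real code would emit RequestResponseChannelClosedException.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Illustrative stand-in for the cache's "recovery-support" state; not an actual library type.
final class RecoverySupport {
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private final Supplier<Boolean> connectionDisposed;

    RecoverySupport(Supplier<Boolean> connectionDisposed) {
        this.connectionDisposed = connectionDisposed;
    }

    // Recovery terminates once the cache is closed or the hosting connection is disposed.
    boolean isTerminated() {
        return terminated.get() || connectionDisposed.get();
    }

    // Set when the cache is closed.
    void markTerminated() {
        terminated.set(true);
    }

    // Guard for cache access after termination (real code would emit
    // RequestResponseChannelClosedException instead).
    void checkAccess() {
        if (isTerminated()) {
            throw new IllegalStateException(
                "Cache access after recovery-support termination.");
        }
    }
}
```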
Await the activation before loading into the cache (note: the activation await is a prerequisite for every sendWithAck).
Delete our legacy AmqpChannelProcessor type (with 33224 complete and the cache implemented, it is no longer used).
Remove the withRetry applied to request-link and response-link activation in sendWithAck; use a timeout instead. Why? Suppose there is a race between sendWithAck and RequestResponseChannel disposal due to an error in one of the links. In that case, resubscribing to the endpoint of the same link object after a backoff (from withRetry) will get the same error (via replay) for every retry in the retry cycle, delaying/blocking consumer/producer recovery for the full duration of the retry cycle.
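The timeout-instead-of-retry idea can be sketched as follows. This is a hypothetical sketch using CompletableFuture in place of Reactor's Mono (the real code would use Mono.timeout); the method and parameter names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: 'activation' stands in for the replayed link-activation
// endpoint; a failed activation surfaces immediately instead of being retried
// with backoff against the same errored (replaying) link.
final class ActivationAwait {
    static String awaitActive(CompletableFuture<String> activation, long timeoutSeconds) {
        // Bound the wait with a timeout; no resubscribe-with-backoff loop.
        return activation.orTimeout(timeoutSeconds, TimeUnit.SECONDS).join();
    }
}
```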
(Nice to have) sendWithAck has an entry check to see whether the RequestResponseChannel is disposed. We set this atomic flag in closeAsync. closeAsync is invoked after terminating the unconfirmed sends, and each such termination is a call into a downstream subscriber. An invocation of sendWithAck (a new one, or one resulting from a retry of a just-terminated unconfirmed send) while the unconfirmed sends are still being iterated won't yet see the disposed flag as set. We can improve the entry check to (isDisposed || hasError || pendingLinkTerminations <= 1).
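A minimal sketch of the improved entry check is below. The field and method names are illustrative stand-ins, not the actual library members, and the initial pendingLinkTerminations value of 2 is an assumption (one per request/response link).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the widened sendWithAck entry check.
final class SendEntryCheck {
    final AtomicBoolean isDisposed = new AtomicBoolean(false);
    final AtomicBoolean hasError = new AtomicBoolean(false);
    // Assumed: starts at 2 (request and response links) and counts down as
    // close drains the pending link terminations.
    final AtomicInteger pendingLinkTerminations = new AtomicInteger(2);

    // Reject the send if the channel is disposed or errored, or if close is
    // already draining terminations (the disposed flag may not be set yet).
    boolean shouldRejectSend() {
        return isDisposed.get() || hasError.get() || pendingLinkTerminations.get() <= 1;
    }
}
```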
Review the threading (thread hopping; thread types: parallel, Qpid event loop) associated with all the routes that lead to a cache refresh, and compare it with how those routes refreshed the legacy AmqpChannelProcessor's channel.
Sketch any relevant diagrams for library-developer reference.
... [identify and add]
RequestResponseChannelCache constructor to take ReactorConnection.
The source (provider) chain for the cached resource (inside RRChannelCache) flows as:
Mono.defer: check that "recovery-support" is not terminated, then call connection.createSession (awaiting the connection becoming active, then the session becoming active).
New up a RequestResponseChannel.
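The provider chain above can be sketched as below. This is an illustrative sketch only: ChannelProvider is a stand-in name, and a plain Supplier stands in for the deferred, per-subscription behavior of Mono.defer.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical sketch of the cache's channel provider chain.
final class ChannelProvider {
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    // Stands in for connection.createSession(...), which itself awaits
    // connection-active then session-active.
    private final Supplier<String> createSession;

    ChannelProvider(Supplier<String> createSession) {
        this.createSession = createSession;
    }

    // Deferred per access: check termination, create the session, then
    // new up a channel on it.
    String getChannel() {
        if (terminated.get()) {
            throw new IllegalStateException("recovery-support terminated");
        }
        String session = createSession.get();
        return "RequestResponseChannel(" + session + ")";
    }

    void terminate() {
        terminated.set(true);
    }
}
```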
RequestResponseChannelCache to implement dispose(), which marks the terminated flag and delegates to the RequestResponseChannel's closeAsync.
ReactorConnection::getOrCreateCBSNode to create the CBS RequestResponseChannelCache and pass it to ClaimsBasedSecurityChannel.
ReactorConnection::getOrCreateCBSNode is synchronized; the API is on a hot path, so reduce the current synchronization there via double-checked locking (DCL). [Changes legacy design choice]
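The DCL shape for the hot path can be sketched as below; CbsNodeHolder and createCbsNode are illustrative stand-ins, not the actual members of ReactorConnection.

```java
// Sketch of reducing synchronization in getOrCreateCBSNode via
// double-checked locking: the common (already-created) case takes no lock.
final class CbsNodeHolder {
    private volatile Object cbsNode; // volatile is required for DCL to be safe

    Object getOrCreateCbsNode() {
        Object node = cbsNode;          // first, lock-free check on the hot path
        if (node == null) {
            synchronized (this) {
                node = cbsNode;         // second check, under the lock
                if (node == null) {
                    node = createCbsNode();
                    cbsNode = node;
                }
            }
        }
        return node;
    }

    private Object createCbsNode() {
        // Real code would wire up the CBS RequestResponseChannelCache here.
        return new Object();
    }
}
```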
When the connection closes ClaimsBasedSecurityChannel (CBSC), CBSC closes the CBS RequestResponseChannelCache it owns.
ReactorConnection owns only the ClaimsBasedSecurityChannel, not its backing CBS RequestResponseChannelCache. ReactorConnection.close -> ClaimsBasedSecurityChannel.close -> CBS RequestResponseChannelCache.close. [Changes legacy design choice]
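The ownership and close cascade can be sketched as below; the class shapes and names are illustrative stand-ins for ReactorConnection, ClaimsBasedSecurityChannel, and the CBS RequestResponseChannelCache.

```java
// Stand-in for the CBS RequestResponseChannelCache.
final class CacheSketch {
    boolean closed;
    void close() { closed = true; }
}

// Stand-in for ClaimsBasedSecurityChannel: it owns the cache and closes it.
final class CbsChannelSketch {
    final CacheSketch ownedCache = new CacheSketch();
    boolean closed;
    void close() {
        closed = true;
        ownedCache.close(); // CBSC closes the cache it owns
    }
}

// Stand-in for ReactorConnection: it owns only the CBSC, not the cache,
// so closing cascades connection -> CBSC -> cache.
final class ConnectionSketch {
    final CbsChannelSketch cbsChannel = new CbsChannelSketch();
    void close() {
        cbsChannel.close();
    }
}
```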
Today, ReactorConnection's closeAsync does the following: cbsCloseOperation = cbsChannelProcessor.flatMap(channel -> channel.closeAsync()); which may lead to an unnecessary subscription-chain execution attempting to create a CBS session only to find that the connection endpoint is in an errored state. With the new design, this logic no longer exists, i.e., no such chain is triggered.