Skip to content

Commit

Permalink
Adding architecture docs. (#22371)
Browse files Browse the repository at this point in the history
* Add architecture diagram for prefetch.

* Adding a receive flow and architecture UML.

* Adding documentation to reactor dispatcher.

* Add documentation.

* Update diagram

* Update sdk/core/azure-core-amqp/docs/architecture.md
  • Loading branch information
conniey authored Jul 12, 2021
1 parent 9144494 commit 0747a41
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 6 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
62 changes: 62 additions & 0 deletions sdk/core/azure-core-amqp/docs/architecture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Architecture Docs

## Qpid Proton-J Integration

Qpid Proton-J publishes events and messages via its event-driven process called [`Reactor`][Reactor]. `azure-core-amqp`
hooks into Qpid Proton-J Reactor via `Handlers`. [`BaseHandler`][BaseHandler] contains all of the events that can be
listened to. `Handlers` can be associated with classes implementing the `Extendable` interface via
`BaseHandler.setHandler(Extendable, Handler)`. Proton-J `Connection`, `Session`, and `Link` (`Sender` and `Receiver`
are specialised instances of `Link`) are all `Extendable`. In `azure-core-amqp`, we create and own an instance of
`Reactor`. When an AMQP connection, session, or link is created from that `Reactor`, we instantiate and correlate the
corresponding [`Handler`][Handlers]. This way, we have granular control over what events are listened to for each
handler, rather than listening to all events available on [`BaseHandler`][BaseHandler].

The UML diagram below shows this relationship; the interfaces shown in green are Proton-J classes. Each Proton-J
instance (e.g., `Connection`, `Session`, `Sender`, `Receiver`) is associated with one corresponding `*Handler`. Each
`azure-core-amqp` `AmqpConnection` is associated with one [`Reactor`][Reactor]. When that instance closes, the AMQP
connection is also closed.

Each [ReactorConnection][ReactorConnection] has one Proton-J [`Reactor`][Reactor] instance. Each [`Reactor`][Reactor]
has one [`ReactorDispatcher`][ReactorDispatcher] and one [`ReactorExecutor`][ReactorExecutor]. When a `Reactor` is
created, a [`SelectableChannel`][SelectableChannel] is associated with this `Reactor`.
[`ReactorDispatcher`][ReactorDispatcher] is responsible for queueing work on this channel and the work is run in FIFO
basis. [`ReactorExecutor`][ReactorExecutor] holds onto the `Reactor` instance and continuously processes to work
SelectableChannel.

![azure-core-amqp integration with Proton-J][AzureCoreAmpqArchitecture]

## Prefetch and AMQP Link Credits

In Project Reactor, prefetch is the initial number of items to request upstream. Afterwards, 75% of the initial prefetch
is used for subsequent `request(long)`.

In Event Hubs, prefetch is the number of AMQP link credits to put on the link when it is first created. After those
initial link credits have been consumed, we have different ways of calculating how many credits are added to the link.

The diagram below illustrates how it happens. Things to note:

* Large `EventData` use multiple AMQP link credits because
* There is no backpressure for [`EventHubConsumerAsyncClient.receiveFromPartition()`][EventHubConsumerAsyncClient].
* [`EventHubConsumerAsyncClient.receiveFromPartition()`] returns `EventData` on `Scheduler.single("<name>")`.
* Since events are not published on another Scheduler, they flow downstream using the Scheduler that
[`ReceiveLinkHandler.onDelivery`][ReceiveLinkHandler] executed on.
* All Proton-J events run on the single scheduler because it is not thread-safe.
* [`EventProcessorClient`][EventProcessorClient] uses back-pressure due to `concatMap` and `publishOn` within its
[PartitionPumpManager.startPartitionPump][PartitionPumpManager].

![Flow of credits when receiving deliveries][ReceiveFlowDiagram]

<!-- Links -->
[BaseHandler]: https://github.com/apache/qpid-proton-j/blob/main/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java
[EventHubConsumerAsyncClient]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java#L334
[EventProcessorClient]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java
[AzureCoreAmpqArchitecture]: ./architecture-uml.jpeg
[Handlers]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler
[ReceiveFlowDiagram]: ./receive-flow.jpeg
[PartitionPumpManager]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java#L228
[Reactor]: https://github.com/apache/qpid-proton-j/blob/main/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
[SelectableChannel]: https://qpid.apache.org/releases/qpid-proton-j-0.33.5/api/org/apache/qpid/proton/reactor/Selectable.html#setChannel-java.nio.channels.SelectableChannel-
[ReceiveLinkHandler]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java#L97
[ReactorConnection]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
[ReactorDispatcher]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorDispatcher.java
[ReactorExecutor]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java
Binary file added sdk/core/azure-core-amqp/docs/receive-flow.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,19 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* The following utility class is used to generate an event to hook into {@link Reactor}'s event delegation pattern. It
* uses a {@link Pipe} as the IO on which Reactor listens to.
*
* <p>
* {@link Reactor} is not thread-safe - all calls to {@link Proton} APIs should be on the Reactor Thread. {@link
* Reactor} works out-of-box for all event driven API - ex: onReceive - which could raise upon onSocketRead. {@link
* Reactor} doesn't support APIs like send() out-of-box - which could potentially run on different thread to that of the
* Reactor thread.
*
* <p>
* The following utility class is used to generate an Event to hook into {@link Reactor}'s event delegation pattern. It
* uses a {@link Pipe} as the IO on which Reactor listens to.
* </p>
*
* <p>
* Cardinality: Multiple {@link ReactorDispatcher}'s could be attached to 1 {@link Reactor}. Each {@link
* ReactorDispatcher} should be initialized synchronously - as it calls API in {@link Reactor} which is not
* thread-safe.
* ReactorDispatcher} should be initialized synchronously - as it calls API in {@link Reactor} which is not thread-safe.
* </p>
*/
public final class ReactorDispatcher {
Expand All @@ -52,6 +51,13 @@ public final class ReactorDispatcher {
private final AtomicBoolean isClosed = new AtomicBoolean();
private final Sinks.One<AmqpShutdownSignal> shutdownSignal = Sinks.one();

/**
* Creates an instance. The {@code ioSignal} is associated with {@code reactor} as a child {@link Selectable}.
*
* @param connectionId The connection id.
* @param reactor The reactor instance.
* @param ioSignal IO pipe to signal work on the {@code reactor}.
*/
public ReactorDispatcher(final String connectionId, final Reactor reactor, final Pipe ioSignal) {
this.connectionId = connectionId;
this.reactor = reactor;
Expand All @@ -60,6 +66,13 @@ public ReactorDispatcher(final String connectionId, final Reactor reactor, final
this.onClose = new CloseHandler();
this.workScheduler = new WorkScheduler();

// The Proton-J reactor goes quiescent when there is no work to do, and it only wakes up when a Selectable (by
// default, the network connection) signals that data is available.
//
// That's a problem in the send-only scenario, which is a common scenario, or any scenario where activity is
// sparse. If the reactor has gone quiescent, the SDK can put a pending send in the work queue, but it will just
// sit there until a Selectable wakes the reactor up. The pipe gives the SDK code a guaranteed way to ensure the
// reactor is awake.
final Selectable schedulerSelectable = this.reactor.selectable();

schedulerSelectable.setChannel(this.ioSignal.source());
Expand Down

0 comments on commit 0747a41

Please sign in to comment.