High-performance hub component for dealing with many sockets and high throughput (#760)

This PR adds DirectHub, an alternative to the existing Hub/CanHub/HubDeviceSelect and GridConnect bridge infrastructure.

The following documentation (and some more) is in `utils/DirectHub.md` in the PR as well.

# DirectHub design

DirectHub is a high-performance router component that forwards packets to
multiple receivers with minimal CPU and latency overhead.

It specifically addresses three performance issues with the traditional CanHub
/ Dispatcher infrastructure:

- DirectHub is zero-copy when forwarding packets between sockets. There is a
  buffer which is filled with a ::read on the source socket, and then the
  DirectHub passes around a reference to this buffer all the way to the output
  object, which then ::write's it to the output socket.
- CanHub and GcTcpHub, as they operate together, perform separate GridConnect
  formatting for every port. When data passes in from one port and out to three
  others, there is one parsing call and three separate GridConnect rendering
  calls. DirectHub uses a single GridConnect representation and passes that
  representation around. Parsing happens only once, when a CanIf needs a
  struct can_frame.
- DirectHub performs inline calls to the ports when forwarding the packet,
  while CanHub/GcTcpHub allocates a new copy of the buffer which then gets
  enqueued separately for each port's queue, on separate StateFlows. This means
  that the Executor is spinning a lot less for DirectHub, therefore the context
  switching overhead is much smaller. (note 1)

As future expansion, DirectHub is designed to allow routing packets across
multiple interface types (e.g. CAN, GridConnect, and native TCP), packet
filtering, and admission control / fair queueing for multiple traffic sources.

_(note 1):_ There is a conceptual problem with `Buffer<T>*` in that it
conflates two different but equally important characteristics of data flow. A
`Buffer<T>` is reference-counted, and it can be queued. However, while
different owners may hold separate references (to the same memory), only one
owner is allowed to enqueue a `Buffer<T>` into a `Q`, `QList`, or
`StateFlowWithQueue`, because there is only one `QMember` pointer in the
`BufferBase`. The result of this conflation is that when a `Dispatcher` or a
`Hub` / `CanHub` sends the same data to multiple different ports or flows, it
needs to create a separate copy for each one of them; taking a reference is
not sufficient.
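
A minimal sketch of the conflict described in (note 1), with the classes
simplified for illustration (the real ones live in `utils/Buffer.hxx` and
`utils/Queue.hxx`):

```
// A QMember carries exactly one queue-next pointer, so an object can sit
// in at most one queue at a time.
class QMember
{
    QMember *next_;
};

// BufferBase is (roughly) both reference-counted and queueable.
class BufferBase : public QMember
{
    unsigned refCount_; // many owners may hold refs to the same memory
    // ... payload follows
};
```

If two flows each hold a ref to the same `BufferBase` and both enqueue it, the
second enqueue overwrites `next_` and corrupts the first queue. This is why the
legacy hubs must allocate one copy per recipient.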


## Theory of operation


### Entry flow and threading model

In order to make the router have as little overhead as possible, almost
everything about the router should be happening inline instead of
asynchronously / via queueing. Virtual function calls are okay, but
StateFlowWithQueue operations should be avoided.

Inline calls imply a different threading concept: most of the time we use the
thread of the caller. When concurrent calls are performed, we have to hold one
of those calls until the other is complete.

Upon an entry call (after the admission controller, see later) we want to first
check if the router is idle. If yes, we should grab a lock and start processing
the message inline. If the router is busy, we should queue the incoming
caller. To allow for both of these, the entry call doesn't actually hand us a
message; instead, we get a callback that we will invoke. The sender renders the
actual message in that callback.

After processing the message, the router goes back to idle if the queue of held
callers is found to be empty.

If the queue is non-empty, that means that a different thread called the router
while we were sending a message on the current thread. We notice this in the
`on_done()` method of the service. In this case the router remains busy and the
queue front is taken out for processing. The queue front is always an
Executable and it will be scheduled on the Service's executor (effectively
yielding), while the inline caller's thread gets released.

A consequence is that the caller's send callback may be called either on the
caller's thread inline, or on the Service's thread, sometime later after the
caller signaled the intention of sending something to the DirectHub.

A special case of this threading model is that when the caller runs on the same
executor as the DirectHub, then the actual send callback is guaranteed to
happen on that executor. This is the typical case on a single-processor OpenMRN
application.
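
The idle/busy handoff described above can be sketched as follows. This is
illustrative only: the member names, the locking details, and the placement of
both methods on one class are assumptions, not the actual `DirectHub` code (in
the real implementation `on_done()` lives on the service):

```
void DirectHub::enqueue_send(Executable *caller)
{
    {
        AtomicHolder h(this);
        if (busy_)
        {
            pendingSenders_.push(caller); // hold the caller; never block
            return;
        }
        busy_ = true;
    }
    caller->run(); // idle: run the send callback on the caller's thread
}

void DirectHub::on_done()
{
    Executable *next = nullptr;
    {
        AtomicHolder h(this);
        next = pendingSenders_.pop(); // nullptr if the queue is empty
        if (!next)
        {
            busy_ = false; // no held callers: the hub goes idle
            return;
        }
    }
    executor()->add(next); // effectively yields to the Service's executor
}
```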

### Entry API

The Entry API defines how to send traffic to the DirectHub. It is defined by
`DirectHubInterface<T>` and `MessageAccessor<T>` in `DirectHub.hxx`.

This is an integrated API that will internally consult the admission controller
(not implemented, see later). There are three possible outcomes of an entry
call:
1. admitted and execute inline
2. admitted but queued
3. not admitted, blocked asynchronously. (this doesn't happen today)

When we queue or block the caller, a requirement is not to block the caller's
thread. This is necessary to allow Executors and StateFlows to send traffic to
the DirectHub.

When blocked, the best solution is to queue Executables (these are
queueable). So we put them into a queue, and we put the next one onto the
executor (yield) whenever we're ready, which is typically when the current
packet's processing and routing is done.

If we're idle (available) to process the packet upon the entry call, we want to
run it inline by calling run() on the Executable from the caller's thread.

In other words, assuming the caller is a StateFlow, the inline execution just
means that we `run()` the executable instead of `notify()`'ing it.

The syntax to prepare for both of these from a calling StateFlow (any
`StateFlowBase`):

```
Action have_message() {
    // Makes the next run() go to fill_request(), but does not put *this onto
    // the executor.
    wait_and_call(STATE(fill_request));
    // Will cause run() to be called now or later.
    target->enqueue_send(this);
    // Ensures we do not disturb state_ or the notification.
    return wait();
}

Action fill_request() {
    target->mutable_message()->set_...; // fills message buffer
    target->do_send();
    // should not be call_immediately() because threading is not certain at this
    // point.
    return yield_and_call(STATE(something_next)); 
}
```

There is a slightly more complicated sequence of states to use if the yield at
the end is undesired. The actual implementation of the gridconnect / socket
read flows uses this more complicated mechanism to process multiple
gridconnect frames that might have arrived in a single TCP packet.

### Exit API

The Exit API defines how the DirectHub sends traffic to the ports. It is
defined by `DirectHubPort<T>` and the same `MessageAccessor<T>` in
`DirectHub.hxx`.

Since we are trying to make as much of the DirectHub processing happen inline,
the exit API is synchronous. The exit target is responsible for any queueing
that needs to happen. This is very much like the existing `FlowInterface<T>`.

By definition, the exit API does not hand a ref of the payload to the target.
If the target needs one, it should take one inline. However, unlike
`FlowInterface<T>`, this means that the target cannot use the Buffer pointer
it receives by putting it into a queue. If it needs to queue, it has to
allocate a new QMember somewhere. See (note 1) in the introduction for a
significant difference that this makes.

It is guaranteed that at least one ref is held for the duration of the call,
and the caller (the hub) will release that ref sometime after the exit call
has returned.

The exit call gets an accessor instead of a sequence of parameters. The memory
for the accessor is owned by the hub, and it allows the target to query the
necessary parameters. The accessor is assumed to be available only during the
exit call; after the exit call has returned, the accessor will be reused for
other messages. This is harmonized with the entry API, where we are not
queueing _data_ but _data sources_, which then fill in the data when we are
ready for them to do so.

API:

```
class DirectHubPort
{
    void send(MessageAccessor *message);
};

class MessageAccessor
{
    HubSource *source_;
    HubSource *dst_;
    BarrierNotifiable *done_;
    
    bool isFlush_;

    // For string typed hubs we have a BufferPtr<> data_ with a skip_ and size_
    // encapsulated in a class:
    LinkedDataPtr<uint8_t[]> payload_;
    
    // For arbitrary hubs we have a reference to a buffer:
    BufferPtr<T> payload_;
};
```

An important aspect is that the MessageAccessor is a constant-sized object. The
real payload is always kept as a reference to a Buffer that was allocated by
the sender object. Output ports are allowed / encouraged to hold on to
references to this Buffer, which allows the zero-copy operation.
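
As an illustration, an output port that needs to queue its writes could look
like the following sketch. The class and member names are hypothetical; only
the ref-taking rule is from the design above:

```
template <class T> class QueuingOutputPort : public DirectHubPort<T>
{
public:
    void send(MessageAccessor<T> *message) override
    {
        // The accessor is valid only for the duration of this call, and
        // the hub releases its own ref sometime after we return. Take our
        // own ref inline: copying the BufferPtr bumps the refcount.
        pendingPayload_ = message->payload_;
        // To queue the write, a separately allocated QMember is needed;
        // the Buffer itself cannot serve as the queue element (note 1).
    }

private:
    BufferPtr<T> pendingPayload_; // keeps the payload alive until written
};
```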

### Runner

The hub has at most one current message at any point in time (zero if the hub
is idle, one if the hub is busy). This is the message being sent by the sender
that currently holds the hub. The MessageAccessor is owned by the runner; it
is accessed by the sender during the entry API to fill in the payload and
message parameters, and passed on to the ports as part of the exit API. There
is no queue of messages.

The runner is not a classic StateFlow, because of the lack of this queue. The
runner only manages the concurrency and queueing of the senders. After the
designated sender fills in the message in the MessageAccessor, the runner is
informed that it shall process the packet. This happens without yielding, by an
inline call to `do_send()` on the `DirectHubInterface`.

Internally, `do_send()` performs the iteration over output ports, calling all
the exit APIs synchronously. Once this is complete, the message gets cleared,
which releases the leftover reference owned by the DirectHub. Then the service
is informed that it may now look for additional callers
(`DirectHubService::on_done()`) that may have been enqueued. If there are
none, the hub goes idle. For an inline caller, control returns to the caller,
which may attempt to send another packet. This allows a single caller to send
a sequence of messages without needing to yield or spin on an executor.
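
In sketch form (the member names and the skip-the-source behavior are
assumptions here, not taken from the actual implementation):

```
void DirectHub::do_send()
{
    // Iterate over the output ports synchronously, without yielding.
    for (DirectHubPort *port : ports_)
    {
        if (port == msg_.source_)
        {
            continue; // assumption: traffic is not echoed back to its source
        }
        port->send(&msg_); // synchronous exit call
    }
    msg_.clear();        // releases the leftover ref owned by the DirectHub
    service_->on_done(); // look for queued senders; if none, go idle
}
```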

When we give the packet to an output port, that operation should never block
the router; we should rather block the incoming port than the router. It is
the job of the incoming admission controller to hold traffic back; in its
absence, the limit on the number and byte length of the buffers makes the data
source hold back.

### Output buffering

For TCP based output ports (both gridconnect-CAN-TCP and native TCP, but not
gridconnect-USB) we want to ensure that the number of kernel calls is much less
than the number of GridConnect packets that are being sent. This is essential
in keeping the costs low, especially on the CC32xx platform, where each kernel
call effectively turns into a packet sent to the network.

The DirectHub gets one call and one iteration for each GridConnect packet.

The mechanism that the legacy HubDevice infrastructure uses is to create a
BufferPort, which internally reassembles these packets into larger buffers
whenever they arrive within a certain period of time. Unfortunately, this
results in data copies.

The `DirectHub<uint8_t[]>` infrastructure approaches this differently. Instead
of copying the input data into a separate buffer, it attempts to recognize when
the input data came from the same source and used consecutive bytes of the same
buffer. This is accomplished by comparing the Buffer references and offset/size
values of consecutive calls (see `LinkedDataBufferPtr::try_append_from()` in
`DataBuffer.hxx`). When two packets came from consecutive bytes of a single
input buffer, then the two references are united into a single reference with a
longer size. So long as the calls into the DirectHub happen without yielding,
this works until the entire input buffer is reassembled into a single output
buffer, which will then be written to the socket with a single `::write()`
call.
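
The core of the check can be sketched as follows; the field names here are
assumed for illustration, while the real logic is
`LinkedDataBufferPtr::try_append_from()` in `DataBuffer.hxx`:

```
// A view into a shared, reference-counted data buffer.
struct MessageView
{
    DataBuffer *buf; // backing buffer, shared via refcounting
    unsigned skip;   // offset of this message within buf
    unsigned size;   // length of this message
};

// Returns true if `next` continues exactly where `dst` ends within the
// same buffer; if so, the two refs are united into one longer reference.
bool try_merge(MessageView *dst, const MessageView &next)
{
    if (dst->buf == next.buf && dst->skip + dst->size == next.skip)
    {
        dst->size += next.size; // still zero-copy; one write instead of two
        return true;
    }
    return false; // not consecutive: needs a separate output write
}
```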

While this mechanism is rather limited, it solves the high-throughput problem:
when an input client sends a datagram or stream with a large number of CAN
frames, a single 1460-byte read succeeds from the input socket, and then a
sequence of sends happens through the DirectHub without yielding. On the
output there will be one write for almost all of the data, except for a
partial GridConnect packet that has to be held until the next read.

Since the output object keeps a reference to the input buffer, the input
port's read flow will not see the memory released until the output write has
completed. Since the input port has a limited number of such buffers, this
creates effective back-pressure, preventing the input port from reading too
much data into memory.

### Message representation for untyped data in transit

See (note 1) in the introduction for background about the difference between
reference counted objects and queueable objects (QMembers). Specifically, it is
important to separate the queuing concept from the shared ownership of the data
payload. This is because the same data payload might be owned by multiple
output ports, but if the queue next pointer is tied to the buffer, then we
cannot effectively share.

Generally, all data in transit is represented as `BufferPtr<T>`. This is a
reference to an input buffer, but it is not queueable. DirectHub does not use
data queues internally, so that's OK.

For untyped / string data, we need to ensure that we keep the length of the
contents as well. However, we don't generally know the length of the contents
until the read has happened and we have processed the incoming data on the
source port.

To avoid having to copy data, we perform a single longer read into a large
buffer (typically 1460 bytes, the size of a TCP frame), then we segment this
into individual messages. Each such message will have a reference to the longer
buffer, and an offset and a length attribute (called skip_ and size_).
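
Reusing the `MessageView` sketch from the output buffering section above, a
single read segmented into messages could look like this (the byte offsets are
made up for illustration):

```
// One ::read() fills a 1460-byte buffer with several GridConnect frames:
//   ":X195B4111N0102030405060708;:X195B4111N;:X19..."
// Segmentation produces one view per frame, all sharing the same buffer:
//   {buf, skip =  0, size = 28} // first frame
//   {buf, skip = 28, size = 12} // second frame
//   {buf, skip = 40, size = ?}  // partial frame, held until the next read
// No payload bytes are copied; each message stores only buf/skip_/size_.
```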

A particular case to be handled is when one message reaches beyond the end of
one such buffer and into the beginning of the next buffer. It could also
happen that a message is longer than 1460 bytes.

For this purpose we keep `BufferBase` objects linked to each other using the
`next_` pointers. The queue created by the `next_` pointers means that the data
payload continues in the next buffer. This is different from the
`StateFlowWithQueue` infrastructure, and generally the `Q` and `QList` classes,
where the `next_` pointer means that there is another data item (a different
message) waiting to be processed by the same StateFlow later.

The implementation of this mechanism is in `LinkedDataBufferPtr` in
`utils/DataBuffer.hxx`.
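
For illustration, reassembling such a chained payload into a flat string might
look like the sketch below. The accessor names (`data()`, `size()`, `next()`)
are assumptions; the actual helper lives on `LinkedDataBufferPtr`:

```
#include <algorithm>
#include <string>

std::string flatten(DataBuffer *head, unsigned skip, unsigned size)
{
    std::string out;
    out.reserve(size);
    for (DataBuffer *b = head; b && size > 0; b = b->next())
    {
        // Only the first buffer has a skip offset; the payload continues
        // at offset zero in each subsequent chained buffer.
        unsigned avail = std::min(size, b->size() - skip);
        out.append((const char *)b->data() + skip, avail);
        size -= avail;
        skip = 0;
    }
    return out;
}
```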

Some earlier notes:

BufferBase has the ability to do refcounting and shared ownership. It is
possible to have a BufferBase that has untyped payload (i.e., just
bytes). However, the BufferBase needs to know the number of bytes as input; we
cannot trim the BufferBase's size field down to the actual bytes read, or else
we leak memory, because after freeing, the buffer would not be returned to the
right-sized bucket. An alternative possibility is to have a buffer pool that
hands out buffers of a single size, so everything is returned to the same
queue. Then the size can be adjusted to the exact number of bytes read. This
might utilize a proxy buffer pool that takes buffers of a given size from the
main buffer pool and returns them there upon freeing, resetting the size so
that they land in the appropriate bucket.

As an alternative, `shared_ptr<string>` is a standard template library
solution to the same problem. However, `shared_ptr<string>` causes two memory
allocations for payloads that are longer than 16 bytes, and it has a minimum
overhead of 36 bytes (plus payload length + 4 if longer than 16 bytes).

Note that the input message could be split between two shared buffer
ownerships. This means that the queue entry needs two buffer pointers, an
offset, and a length. We could use the buffer base next pointers to string up
buffers that have data from the same message, even if it's more than two. This
way we only need one buffer pointer. We have to assume that the respective
bytes always go together.

It might make sense to support appending another message to the end of the
buffers. This would be especially true when the last buffer is only partially
used up, and thus the buffer pointer at the end of the chain of buffers is the
same as the incoming next buffer.

### Input segmentation

When data arrives from the socket to be read, we will allocate a shareable
buffer, then execute the asynchronous read. As the read completes, the input
data will be passed on to the segmenter. The goal of the segmenter is to find
the boundary of the message, for gridconnect the `: ... ;` delimiter, and on
native OpenLCB-TCP the binary length of the message. Then the message can be
passed on to routing.

It is possible that during segmentation we start with one ref of a buffer, and
output two independent refs of the same buffer. This happens if a single kernel
read ends up with more than one message, which is rather typical in
GridConnect-TCP, but possibly even in GridConnect-USB.

It is also possible that the segmenter will retain a ref of the last read
buffer, waiting for the completion of the message that is present therein.

We must keep reading bytes from the hardware until the segmenter is happy to
send at least one packet onwards. Only thereafter should we send the packet (or
consult the admission controller). It is essential that a partial packet is
never sent to the hub, because it is not guaranteed that we get the completion
of that packet before another port might try to send a different packet. We
cannot interleave data from different packets; that would produce unparseable
output.
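
The contract of the segmenter can be sketched as follows for GridConnect. The
actual API added in this PR is the message segmenter with a GridConnect
implementation; the signature below is a simplified assumption:

```
#include <cstddef>
#include <cstdint>

// Returns the number of bytes from `data` that complete one GridConnect
// packet, or 0 if the packet is still incomplete and another read is
// needed before anything may be sent onwards.
size_t gc_segment(const uint8_t *data, size_t len)
{
    for (size_t i = 0; i < len; ++i)
    {
        if (data[i] == ';')
        {
            return i + 1; // bytes [0, i] form one complete ": ... ;" frame
        }
    }
    return 0; // no terminating ';' yet: keep the ref and keep reading
}
```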

Implementation note:

There are several things that have to happen on the ingress port, and the order
in which we do these matters:

- allocate a BarrierNotifiable* for accompanying the buffer.
- allocate a (1460-byte) buffer for the `::read` call.
- perform the `::read`
- call the segmenter (which might result in additional buffers needed and
  additional `::read` calls to be made)
- (not implemented yet) consult the admission controller on whether we are
  allowed to send.
- send the message to the hub.

The above list is the current order. There is one suboptimal part, which is
that we allocate a buffer earlier than when we know that there is data to read
from the fd or socket. We could theoretically wait until the fd is selectable
for read, and only then perform the buffer allocation. With the admission
controller this will get even more complicated.

### Legacy connection

We have two reasons to interact with a legacy `CanHub`:
- Running an OpenMRN `IfCan` and a `Node` requires this object to communicate
  with the external world.
- Interacting with a hardware CAN controller via SocketCan or via OpenMRN
  native CAN controller drivers can be done via `struct can_frame` today, and
  the implementation is in `HubDeviceSelect<struct can_frame>`.

To support these use cases, there is a legacy bridge, which connects a
GridConnect-typed `DirectHub` to a `CanHub`. It takes care of parsing the
GridConnect messages in one direction, formatting them in the other direction,
and bridging the differences between the two APIs.

When many CAN frames are generated consecutively, they typically get rendered
into a single text buffer. However, they don't typically get sent off without
a yield in between.

====

* Adds support for compiling simplelink with select async callback.

* Adds variety of code and documentation about routing.

* more comments on design.

* More documentation.

* Moves documentation to a separate .md file.

* more docs.

* Starts adding the outline of directhub.

* Adds implementation of input and output.

* Adds the class owning the message payloads.

* Adds DataBuffer with a respective pool to allocate buffers holding untyped data.

* Adds basic write flow to DirectHub.

* Adds code to RingBuffer that allows direct writes and reads into the buffer contents.

* Switches DirectHub to use DataBuffer.

* Merges LinkedDataBuffer and DataBuffer.
Adds missing unittests to DataBuffer.

* Adds implementation of the read flow.

* Adds end to end test for direct hub.

* Adds more test cases to the end to end test.

* Adds a TCP listener hub to the DirectHub classes.

* Adds a block notifiable class that can be used as a semaphore of BarrierNotifiables but is compatible with StateFlows.

* Adds limiting the memory buffer in the hub.

* Fixes bugs in the direct hub implementation.

* Part of the code for shutdown is copied from HubDeviceSelect.

* Implements clean shutdown in the hub.

* Adds a unit test that tries to make the socket blocked.
Reduces the buffer size in the sockets.

* Test total bytes transmitted via blocked test.

* Adds allocation of the limited pool barrier notifiable into the read flow.

* Adds a bunch more unittests including for race conditions etc.

* reformat

* Adds test for skipping and chaining.

* fix compilation error

* updates chaining test to also check for a buffer with limited size.

* Adds segmenter API and gridconnect segmenter implementation.

* Adds trivial message segmenter.
Plumbs the message segmenter through the API.

* Adds LinkedDataBufferPtr to hold the buf_ head tail skip and free elements.

* Rewrites DirectHub to use LinkedDataBufferPtr.

* Fix compilation errors.

* Fix broken tests due to leaked reference.

* Fixes some bugs around segmentation.
Adds tests for correct segmentation calls.

* Adds support for calling tests with TESTARGS.

* Adds more expectations on correct segmenting:
- expects that clear commands are not sent superfluously.
- expects that the bytes passed on to the hub are the correct ones.

* Fixes a bug on calling the segmenter clear too many times.

* Adds (one direction) of a legacy conversion routine.

* Moves the code to reassemble a LinkedDataBuffer into a string as a helper function on LinkedDataBuffer itself.

* Implements the reversed conversion routine.

* starts adding a unit test for the legacy CAN converter.

* gc_format fixes:
- Adds support for parsing an entire gridconnect packet including leading : and trailing ;.
- Fixes constant override for generate newlines.

* Adds tests for the new gridconnect hub to old can-hub connection.

* Ensures that notifiable blocks are reclaimed when being deleted.

* Makes DirectHub delete-able.

* Fixes race conditions on exit and memory leaks.

* Fix whitespace.

* Reorder includes.

* Fixes destructor and unit tests on async notifiable block.

* Adds support for SO_RCVBUF.
Adds logging for the TX failed socket async event.

* Adds better debug/logging.

* Fixes bugs in leaking buffers when the remote socket was unexpectedly closed.

* Adds support for in-place append to the next data buffer.

* Adds support for requesting next() while having the lock acquired externally.

* Adds support for appending to the tail buffer instead of enqueueing a different write.

* Ensures that inbound CAN frames are sent at the topmost priority.

* Fixes a template instantiation problem.

* Adds a facility to override the directhub buffer pool allocation size from a test.
Overrides it in the blocked test down to 64 bytes. These tests fail with the
increased 1460-byte buffer size.

* Updates documentation about incoming direct byte buffering, as the packet size
was increased to 1460 bytes. Reduces max incoming packets to 2 by default.

* Avoid too deep recursion in state flow.

* Fix broken test.

* Fix comment

* Refactors blocked socket tests.
Adds extra debugging printout.

* Makes main buffer pool contain only specifically allocated buffers by removing all buckets.

* Adds an extra blocked socket test with the prod buffer sizes.
This fails currently.

* Fixes incorrectly set done notifiable.

* Removes expectation on total number of bytes transferred, because it really depends on what the kernel is happy to accept.

* Improves readability in data buffer, adds additional tests on an invariant.

* Adds a facility to change constant values in unit tests.

* Adds some verbose log messages.

* Adds more comments for the data and code flow.

* Uses the constant override facility to restore the blocked tests to their intended state.

Adds stricter expectation on memory use in the default blocked scenarios.

* Makes the directhub buffer size configurable with a link option.

* Fix whitespace.

* Fix spacing and duplicate lines in sources file after merge conflict.

* Fix compile error on FdUtils under freertos.

* Ensures that we keep the barrier pointer in a child when a CAN packet gets parsed.

This makes sure that when we send a legacy packet to the internal flows,
we push back on further traffic entering from the sockets.

Adds unit test for traffic being blocked through the legacy port.

* Writes a new overview / introduction to the directhub page.

* Makes the documentation up to date.

Separates unimplemented parts to a future work section, such that it
is clear what is just ideas on paper, and what is the actual
implementation.

* Minor updates to the docs.

* Removes limitation on wait time.

* Fix comments.

* Fix comments and reduce unnecessary log level.

* Remove unnecessary log.

* Fixes a comment and adds a todo.

* Fix test build.

---------

Co-authored-by: Balazs Racz <[email protected]>
balazsracz and Balazs Racz authored Jun 21, 2024
1 parent 7a7a2fc commit e60f3f1
Showing 26 changed files with 4,955 additions and 38 deletions.
8 changes: 8 additions & 0 deletions include/nmranet_config.h
@@ -128,6 +128,14 @@ DECLARE_CONST(gridconnect_buffer_delay_usec);
* two threads per client (multi-threaded) execution model. */
DECLARE_CONST(gridconnect_tcp_use_select);

/// Maximum number of packets to parse from a single DirectHubPort before we
/// wait for data to drain from the system.
DECLARE_CONST(directhub_port_max_incoming_packets);

/// Number of bytes that we will be reading in one go from an incoming port. We
/// will allocate at least this many bytes dedicated for each input port.
DECLARE_CONST(directhub_port_incoming_buffer_size);

/** Number of entries in the remote alias cache */
DECLARE_CONST(remote_alias_cache_size);

68 changes: 68 additions & 0 deletions src/executor/AsyncNotifiableBlock.cxx
@@ -0,0 +1,68 @@
/** \copyright
* Copyright (c) 2020, Balazs Racz
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* \file AsyncNotifiableBlock.cxx
*
* An advanced notifiable construct that acts as a fixed pool of
* BarrierNotifiables. A stateflow can pend on acquiring one of them, use that
* barrier, with it automatically returning to the next caller when the Barrier
* goes out of counts.
*
* @author Balazs Racz
* @date 18 Feb 2020
*/

#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#endif

#include "AsyncNotifiableBlock.hxx"

#include "os/sleep.h"

AsyncNotifiableBlock::~AsyncNotifiableBlock()
{
    // Recollects all notifiable instances, including waiting as long as needed
    // if there are some that have not finished yet.
    for (unsigned i = 0; i < count_; ++i)
    {
        while (true)
        {
            QMember *m = next().item;
            if (!m)
            {
                LOG(VERBOSE,
                    "shutdown async notifiable block: waiting for returns");
                microsleep(100);
            }
            else
            {
                HASSERT(initialize(m)->abort_if_almost_done());
                break;
            }
        }
    }
}
110 changes: 110 additions & 0 deletions src/executor/AsyncNotifiableBlock.cxxtest
@@ -0,0 +1,110 @@
#include "executor/AsyncNotifiableBlock.hxx"

#include "utils/test_main.hxx"

class AsyncNotifiableBlockTest : public ::testing::Test
{
protected:
    AsyncNotifiableBlock b_ {2};
};

TEST_F(AsyncNotifiableBlockTest, create)
{
}

TEST_F(AsyncNotifiableBlockTest, count_request_release)
{
    EXPECT_EQ(2u, b_.pending());
    QMember *e = b_.next(0);
    EXPECT_NE(nullptr, e);
    EXPECT_EQ(1u, b_.pending());

    QMember *f = b_.next(0);
    EXPECT_NE(nullptr, f);
    EXPECT_EQ(0u, b_.pending());

    QMember *g = b_.next(0);
    EXPECT_EQ(nullptr, g);
    EXPECT_EQ(0u, b_.pending());

    b_.initialize(e)->notify();
    EXPECT_EQ(1u, b_.pending());

    QMember *h = b_.next(0);
    EXPECT_EQ(e, h);

    EXPECT_EQ(0u, b_.pending());

    b_.initialize(f)->notify();
    b_.initialize(h)->notify();
}

TEST_F(AsyncNotifiableBlockTest, barrier_semantics)
{
    EXPECT_EQ(2u, b_.pending());
    QMember *e = b_.next(0);
    BarrierNotifiable *bn = b_.initialize(e);
    EXPECT_EQ(1u, b_.pending());

    bn->new_child();
    bn->notify();
    EXPECT_EQ(1u, b_.pending());
    bn->notify();
    EXPECT_EQ(2u, b_.pending());
}

class FakeExecutable : public Executable
{
public:
    void run() override
    {
        DIE("unexpected.");
    }

    void alloc_result(QMember *m) override
    {
        ASSERT_TRUE(m);
        m_ = m;
    }

    QMember *m_ {nullptr};
};

TEST_F(AsyncNotifiableBlockTest, async_allocation)
{
    EXPECT_EQ(2u, b_.pending());
    QMember *e = b_.next(0);
    EXPECT_NE(nullptr, e);
    EXPECT_EQ(1u, b_.pending());

    FakeExecutable cli1, cli2, cli3;
    EXPECT_EQ(nullptr, cli1.m_);
    EXPECT_EQ(nullptr, cli2.m_);
    EXPECT_EQ(nullptr, cli3.m_);

    b_.next_async(&cli1);
    EXPECT_EQ(0u, b_.pending());
    EXPECT_NE(nullptr, cli1.m_);
    EXPECT_NE(e, cli1.m_);

    b_.next_async(&cli2);
    b_.next_async(&cli3);
    EXPECT_EQ(nullptr, cli2.m_);
    EXPECT_EQ(nullptr, cli3.m_);
    EXPECT_EQ(0u, b_.pending());

    b_.initialize(e)->notify(); // will be handed out to cli2

    EXPECT_EQ(0u, b_.pending());
    EXPECT_EQ(e, cli2.m_);

    b_.initialize(cli1.m_)->notify(); // will be handed out to cli3
    EXPECT_EQ(cli1.m_, cli3.m_);
    EXPECT_EQ(0u, b_.pending());

    b_.initialize(cli3.m_)->notify(); // will be handed back
    EXPECT_EQ(1u, b_.pending());

    b_.initialize(cli2.m_)->notify(); // will be handed back
    EXPECT_EQ(2u, b_.pending());
}
139 changes: 139 additions & 0 deletions src/executor/AsyncNotifiableBlock.hxx
@@ -0,0 +1,139 @@
/** \copyright
* Copyright (c) 2013, Balazs Racz
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* \file AsyncNotifiableBlock.hxx
*
* An advanced notifiable construct that acts as a fixed pool of
* BarrierNotifiables. A stateflow can pend on acquiring one of them, use that
* barrier, with it automatically returning to the next caller when the Barrier
* goes out of counts.
*
* @author Balazs Racz
* @date 18 Feb 2020
*/

#ifndef _EXECUTOR_ASYNCNOTIFIABLEBLOCK_HXX_
#define _EXECUTOR_ASYNCNOTIFIABLEBLOCK_HXX_

#include <memory>

#include "executor/Notifiable.hxx"
#include "utils/Queue.hxx"
#include "utils/logging.h"

#include "utils/Buffer.hxx"

/// A block of BarrierNotifiable objects, with an asynchronous allocation
/// call. Caller StateFlows can block on allocating a new entry, and then get
/// back a fresh BarrierNotifiable, which, upon being released will
/// automatically be reallocated to a waiting flow, if any.
class AsyncNotifiableBlock : private Notifiable, public QAsync
{
private:
    /// Notifiable class that can act as a BarrierNotifiable but also be
    /// enlisted in a queue.
    class QueuedBarrier : public BarrierNotifiable, public QMember
    {
    public:
        /// Notification implementation.
        ///
        /// Theory of operation: If this was the last notification (count goes
        /// from 1 to 0), we take the done_ pointer, cast it to the owning
        /// AsyncNotifiableBlock, and release ourselves into the queue
        /// there. We keep the count at 1 at all times, which ensures that the
        /// done_ pointer remains pointing to the owner AsyncNotifiableBlock.
        void notify() override
        {
            AtomicHolder h(this);
            if (count_ == 1)
            {
                LOG(VERBOSE, "block notifiable %p returned pool size %u",
                    (BarrierNotifiable *)this,
                    (unsigned)mainBufferPool->total_size());
                auto *tgt = static_cast<AsyncNotifiableBlock *>(done_);
                tgt->insert(this);
            }
            else
            {
                --count_;
            }
        }

        /// Checks that there is exactly one count in here.
        void check_one_count()
        {
            HASSERT(count_ == 1);
        }
    };

public:
    /// Constructor. @param num_parallelism tells how many BarrierNotifiables
    /// we should have and hand out to callers requesting them.
    AsyncNotifiableBlock(unsigned num_parallelism)
        : count_(num_parallelism)
        , barriers_(new QueuedBarrier[num_parallelism])
    {
        for (unsigned i = 0; i < num_parallelism; ++i)
        {
            barriers_[i].reset(this);
            this->insert(&barriers_[i]);
        }
    }

    /// Destructor.
    ~AsyncNotifiableBlock();

    /// Turns an allocated entry from the QAsync into a usable
    /// BarrierNotifiable.
    /// @param entry a QMember that was allocated from *this.
    /// @return an initialized BarrierNotifiable with exactly one count, and
    /// done_ set up to be returned for further use.
    BarrierNotifiable *initialize(QMember *entry)
    {
        QueuedBarrier *b = static_cast<QueuedBarrier *>(entry);
        // We must be owning this entry.
        HASSERT(barriers_.get() <= b);
        HASSERT(b <= (barriers_.get() + count_));
        b->check_one_count();
        return b;
    }

    /// Notification implementation -- should never be called.
    void notify() override
    {
        DIE("Should not receive this notification");
    }

private:
    /// How many barriers do we have.
    unsigned count_;
    /// The pointer to the block of barriernotifiables.
    std::unique_ptr<QueuedBarrier[]> barriers_;

    DISALLOW_COPY_AND_ASSIGN(AsyncNotifiableBlock);
};

#endif // _EXECUTOR_ASYNCNOTIFIABLEBLOCK_HXX_
4 changes: 2 additions & 2 deletions src/executor/Notifiable.hxx
@@ -171,7 +171,7 @@ private:
/// A BarrierNotifiable allows to create a number of child Notifiable and wait
/// for all of them to finish. When the last one is finished, the parent done
/// callback is called.
-class BarrierNotifiable : public Notifiable, private Atomic
+class BarrierNotifiable : public Notifiable, protected Atomic
{
public:
/** Constructs a barrier notifiable that is done. Users should call reset()
@@ -240,7 +240,7 @@
}
}

-private:
+protected:
/// How many outstanding notifications we are still waiting for. When 0,
/// the barrier is not live; when reaches zero, done_ will be called.
unsigned count_;
1 change: 1 addition & 0 deletions src/executor/sources
@@ -3,6 +3,7 @@ VPATH := $(SRCDIR)
CSRCS +=

CXXSRCS += \
AsyncNotifiableBlock.cxx \
Executor.cxx \
Notifiable.cxx \
Service.cxx \