# High-performance hub component for dealing with many sockets and high throughput #760

## Conversation
```c++
AsyncNotifiableBlock::~AsyncNotifiableBlock()
{
    unsigned max = 10;
```
Might be good to rename this from `max` to something else to prevent confusion.
removed
```c++
@@ -0,0 +1,71 @@
/** \copyright
 * Copyright (c) 2013, Balazs Racz
```
check date
fixed
```c++
{
    if (fd_ < 0)
    {
        // Port already closed. Ignore data to send.
```
return?
Good catch, thanks! I was seeing some crashes on disconnect, and this might have been the cause of them.
```c++
{
    if (selectHelper_.hasError_)
    {
        LOG(INFO, "%p: Error writing to fd %d: (%d) %s", this, fd_, errno,
```
LOG_ERROR ?
I don't think so. A socket error is a normal event when a remote client disconnects.
```c++
{
    uint32_t rcvbuf;
    socklen_t len = sizeof(rcvbuf);
    int ret = getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len);
```
this call appears to be for debug / diagnostics; perhaps it should be wrapped / excluded from production code? Or perhaps the value should be used by the port instance to limit data?
done
Please also address Mike's comments.
```c++
AsyncNotifiableBlock::~AsyncNotifiableBlock()
{
    unsigned max = 10;
```
How is this constant 10 chosen? Should it somehow be related to `count_`? Some commentary would help.
The constant is removed. Originally I set this code to run as a timeout. Now there is no timeout; the code runs forever (until the condition is reached).
```c++
@@ -1735,6 +1752,12 @@ void SimpleLinkFatalErrorEventHandler(SlDeviceFatal_t *slFatalErrorEvent)
        static_cast<CC32xxWiFi::FatalErrorEvent*>(slFatalErrorEvent));
}

void SimpleLinkSocketTriggerEventHandler(SlSockTriggerEvent_t *event)
```
Can you add a comment header to this to describe what it does? Maybe copy/paste the comment from the CC32xx API documentation?
Done.
```c++
/*
 * CC32xxWiFi::trigger_event_handler()
 */
void CC32xxWiFi::trigger_event_handler(SockTriggerEvent *event)
```
What is the point of this callback? It doesn't look like it does anything other than log a message. Is this instrumentation for debug? If yes, do we want the LOG level to be "ALWAYS"?
Reduced log level to INFO.
@balazsracz I observed one odd crash with this hub implementation while using a UWT-50, any ideas?
The flow was initiated by the UWT-50 sending a request for loco 4014, and it seems to have lost connection or otherwise failed to reply to the SNIP/FDI portion of the assignment. The UWT-50 became unresponsive, so I restarted it, and then it lost connection (weak signal from the AP I suspect; the AP is a Comcast/Xfinity device). When it disconnected, the cleanup of the DirectHubPortSelect seems to have triggered one too many notifications. IDF is v5.2.1, OpenMRNIDF is the "directhub" branch, which is based on the 5.1.0 branch with this PR integrated for testing.
This PR adds DirectHub, which is an alternative implementation to the Hub/CanHub/HubDeviceSelect and GridConnect bridge infrastructure we have today.
The following documentation (and some more) is in `utils/DirectHub.md` in the PR as well.

# DirectHub design
DirectHub is a high performance router component that is suited to do the
forwarding of packets to multiple receivers with minimal CPU and latency
overhead.
It specifically addresses three performance issues with the traditional CanHub
/ Dispatcher infrastructure:

- Data copies: there is a single buffer which is filled with a `::read` on the
  source socket, and then the DirectHub passes around a reference to this
  buffer all the way to the output object, which then `::write`'s it to the
  output socket.
- GridConnect overhead: the legacy components perform parsing and formatting
  for every port. When data passes from one port in to three others out, there
  would be one parsing and three separate GridConnect rendering calls.
  DirectHub uses a single GridConnect representation and passes around that
  representation. Only one parsing is done, when a CanIf needs a
  `struct can_frame`.
- Queueing: DirectHub routes each message to all output ports in a single
  inline iteration, while CanHub/GcTcpHub allocates a new copy of the buffer,
  which then gets enqueued separately for each port's queue, on separate
  StateFlows. This means that the Executor is spinning a lot less for
  DirectHub, therefore the context switching overhead is much smaller. (note 1)
As future expansion, DirectHub by design will allow routing packets across
multiple interface types (e.g. CAN, GridConnect and native-TCP), applying
packet filtering, and performing admission control / fair queueing for
multiple traffic sources.
(note 1): There is a conceptual problem in `Buffer<T>*` in that it conflates
two different but equally important characteristics of data flow. A
`Buffer<T>` is reference-counted, and it can be queued. However, while
different owners may hold separate references (to the same memory), only one
owner is allowed to enqueue a `Buffer<T>` into a `Q`, `QList`, or
`StateFlowWithQueue`. This is because there is only one `QMember` pointer in
the `BufferBase`. The result of this conflation is that when a `Dispatcher` or
a `Hub`/`CanHub` sends the same data to multiple different ports or flows, it
needs to actually create a separate copy for each one of them, and taking a
reference is not sufficient.
## Theory of operation

### Entry flow and threading model
In order to make the router have as little overhead as possible, almost
everything about the router should be happening inline instead of
asynchronously / via queueing. Virtual function calls are okay, but
StateFlowWithQueue operations should be avoided.
Inline calls mean that there is a difference in threading concept: most of the
time we use the thread of the caller. When concurrent calls are performed, we
have to hold one of those calls until the other is complete.
Upon an entry call (after the admission controller, see later) we want to first
check if the router is idle. If yes, we should grab a lock and start processing
the message inline. If the router is busy, we should queue the incoming
caller. To allow for both of these, the entry call doesn't actually give us a
message, we get a callback instead that we'll invoke. The sender renders the
actual message in that callback.
After processing the message, the router goes back to idle if the queue of held
callers is found to be empty.
If the queue is non-empty, that means that a different thread called the
router while we were sending a message on the current thread. We notice this
in the `on_done()` method of the service. In this case the router remains
busy, and the queue front is taken out for processing. The queue front is
always an Executable, and it will be scheduled on the Service's executor
(effectively yielding), while the inline caller's thread gets released.

A consequence is that the caller's send callback may be called either on the
caller's thread inline, or on the Service's thread, sometime later, after the
caller signaled the intention of sending something to the DirectHub.

A special case of this threading model is that when the caller runs on the
same executor as the DirectHub, the actual send callback is guaranteed to
happen on that executor. This is the typical case in a single-processor
OpenMRN application.
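As a minimal sketch of this busy/idle decision (hypothetical class and member names, using plain standard-library primitives instead of the actual DirectHub/Executor machinery):

```c++
#include <deque>
#include <functional>
#include <mutex>

// Simplified sketch of the entry concurrency scheme described above; not
// the actual DirectHub code.
class HubGate
{
public:
    // 'caller' is the callback that renders the message once it is
    // granted the hub.
    void enqueue_send(std::function<void()> caller)
    {
        {
            std::lock_guard<std::mutex> l(mu_);
            if (busy_)
            {
                // Another thread holds the hub: park the caller.
                waiting_.push_back(std::move(caller));
                return;
            }
            busy_ = true;
        }
        caller(); // hub was idle: run inline on the caller's thread
    }

    // Invoked after each message has been fully routed.
    void on_done()
    {
        std::function<void()> next;
        {
            std::lock_guard<std::mutex> l(mu_);
            if (waiting_.empty())
            {
                busy_ = false; // queue drained: hub goes idle
                return;
            }
            next = std::move(waiting_.front());
            waiting_.pop_front();
        }
        // DirectHub instead schedules this on the Service's executor
        // (effectively yielding); the sketch just runs it.
        next();
    }

private:
    std::mutex mu_;
    bool busy_ = false;
    std::deque<std::function<void()>> waiting_;
};
```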
### Entry API

The Entry API defines how to send traffic to the DirectHub. It is defined by
`DirectHubInterface<T>` and `MessageAccessor<T>` in `DirectHub.hxx`.

This is an integrated API that will internally consult the admission
controller (not implemented, see later). There are three possible outcomes of
an entry call: the caller is granted the hub and runs inline; the caller is
queued and woken later; or the caller is blocked by the admission controller.
When we queue or block the caller, a requirement is to not block the caller's
thread. This is necessary to allow Executors and StateFlows sending traffic to
the DirectHub.
When blocked, the best solution is to queue Executables (these are
queueable). So we put them into a queue, and we put the next one onto the
executor (yield) whenever we're ready, which is typically when the current
packet's processing and routing is done.
If we're idle (available) to process the packet upon the entry call, we want
to run it inline by calling `run()` on the Executable from the caller's
thread. In other words, assuming the caller is a StateFlow, the inline
execution just means that we `run()` the executable instead of `notify()`'ing
it.

The syntax to prepare for both of these from a calling StateFlow (any
`StateFlowBase`) is sketched below.

There is a slightly more complicated sequence of states to use if the yield at
the end is undesired. The actual implementation of the gridconnect / socket
read flows uses this more complicated mechanism to process multiple
gridconnect frames that might have come in a single TCP packet.
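A minimal sketch of such a calling flow, assuming hypothetical member names (`enqueue_send()`, `mutable_message()`, `do_send()`) that follow the prose above; the authoritative signatures are in `DirectHub.hxx`:

```c++
// Fragment of a sending StateFlow (member functions of a StateFlowBase
// subclass holding hub_, a pointer to the hub). Names are illustrative,
// not the verbatim API.
Action have_data()
{
    // Ask the hub for a send slot. If the hub is idle, it run()s us
    // inline on this thread; if it is busy, we are queued and later
    // notify()'ed from the hub's executor. Either way we continue in
    // fill_message().
    hub_->enqueue_send(this);
    return wait_and_call(STATE(fill_message));
}

Action fill_message()
{
    // We are now the designated sender: fill in the hub-owned accessor
    // with the payload buffer and message parameters...
    auto *msg = hub_->mutable_message();
    // ... (set payload / source port on msg; fields omitted here) ...
    (void)msg;
    // ...then hand it over; the hub iterates the output ports inline.
    hub_->do_send();
    // Yield before preparing the next message.
    return yield_and_call(STATE(have_data));
}
```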
### Exit API

The Exit API defines how the DirectHub sends traffic to the ports. It is
defined by `DirectHubPort<T>` and the same `MessageAccessor<T>` in
`DirectHub.hxx`.

Since we are trying to make as much of the DirectHub processing happen inline,
the exit API is synchronous. The exit target is responsible for any queueing
that needs to happen. This is very much like the current FlowInterface<>.
The exit API does not by definition get a ref of the payload. If the port
needs one, it should take one inline. However, unlike `FlowInterface<T>`, this
means that the port can not use the Buffer pointer it received by putting it
into a queue. If it needs to queue, it has to allocate a new QMember
somewhere. See (note 1) in the introduction on the significant difference that
this makes.
It is guaranteed that at least one ref is held during the time of the call,
and the caller (the hub) will release that ref sometime after the exit call
has returned.
The exit call gets an accessor instead of a sequence of parameters. The memory
for the accessor is owned by the hub, and the target can query it for the
necessary parameters. The accessor is available only during the exit call;
after the exit call has returned, the accessor will be reused for other
messages. This is harmonized with the entry API, where we are not queueing
data but data sources, which then fill in the data when we are ready for them
to do so.
The API, sketched below from the description above (see `DirectHub.hxx` for the authoritative definition):
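```c++
// Illustrative shape only; the real definitions are DirectHubPort<T> and
// MessageAccessor<T> in DirectHub.hxx.
template <class T> class MessageAccessor;

template <class T> class DirectHubPort
{
public:
    /// Synchronous exit call from the hub. The accessor is owned by the
    /// hub and is valid only for the duration of this call; the port must
    /// take its own ref of the payload Buffer if it needs it afterwards.
    virtual void send(MessageAccessor<T> *message) = 0;
};
```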
An important aspect is that the MessageAccessor is a constant sized object. The
real payload is always kept as a reference to a Buffer that was allocated by
the sender object. Output ports are allowed / encouraged to hold on to
references to this Buffer, which allows the zero-copy operation.
### Runner
The hub has at most one current message at any point in time (zero if the hub
is idle, one if the hub is busy). This is the message that is being sent by the
port that was last executed. The MessageAccessor is owned by the runner,
and accessed by the port during the entry API to fill in payload and message
parameters, and passed on to the ports as part of the exit API. There is no
queue of messages.
The runner is not a classic StateFlow, because of the lack of this queue. The
runner only manages the concurrency and queueing of the senders. After the
designated sender fills in the message in the MessageAccessor, the runner is
informed that it shall process the packet. This happens without yielding, by
an inline call to `do_send()` on the `DirectHubInterface`.

Internally, `do_send()` performs the iteration over output ports, calling all
the exit APIs synchronously. Once this is complete, the message gets cleared,
which releases the leftover reference owned by the DirectHub. Then the service
is informed that it may now look for additional callers
(`DirectHubService::on_done()`) that may have been enqueued. If there are
none, the hub goes idle. For an inline caller, control returns to the caller,
and it may attempt to send another packet. This allows a single caller to send
a sequence of messages without needing to yield or spin on an executor.

When we give the packet to an output port, that operation should never block
the router. We should rather block the incoming port than the router. It's the
job of the incoming admission controller to hold back; in the absence of that,
the limit on the number and byte length of the buffers makes the data source
hold back.
### Output buffering
For TCP-based output ports (both gridconnect-CAN-TCP and native TCP, but not
gridconnect-USB) we want to ensure that the number of kernel calls is much
less than the number of GridConnect packets being sent. This is essential in
keeping the costs low, especially on the CC32xx platform, where each kernel
call effectively turns into a packet to be sent to the network.
The DirectHub gets one call and one iteration for each GridConnect packet.
The mechanism that the legacy HubDevice infrastructure used is to create a
BufferPort, which internally reassembles these packets into larger buffers
whenever they come within a certain period of time. This results in data copies
unfortunately.
The DirectHub<uint8_t[]> infrastructure approaches this differently. Instead
of copying the input data into a separate buffer, it attempts to recognize
when the input data came from the same source and used consecutive bytes of
the same buffer. This is accomplished by comparing the Buffer references and
offset/size values of consecutive calls (see
`LinkedDataBufferPtr::try_append_from()` in `DataBuffer.hxx`). When two
packets came from consecutive bytes of a single input buffer, the two
references are united into a single reference with a longer size. So long as
the calls of the DirectHub happen without yield, this works until the entire
input buffer is reassembled into a single output buffer, which will then be
written with a single `::write()` call to the socket.

While this mechanism is rather limited, it solves the high-throughput
problem, when an input client is sending a datagram or stream with a large
number of CAN frames, a single 1460-byte read succeeds from the input socket,
then a sequence of sends happen through the directhub without yielding. On the
output there will be one write for almost all of the data, except a partial
GridConnect packet which had to be held until the next read.
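To illustrate, a sketch of the merge condition, using assumed names (the real logic is `LinkedDataBufferPtr::try_append_from()` in `utils/DataBuffer.hxx`):

```c++
// Two in-transit messages can be united when the second one's bytes start
// exactly where the first one's bytes end within the same underlying
// (refcounted) read buffer. Hypothetical types for illustration.
struct View
{
    const void *buffer; // identity of the shared input buffer
    unsigned skip;      // offset of the first payload byte
    unsigned size;      // number of payload bytes
};

bool can_merge(const View &a, const View &b)
{
    return a.buffer == b.buffer       // same underlying buffer...
        && a.skip + a.size == b.skip; // ...and b continues where a ended
}
// On success the output port keeps one reference of size a.size + b.size
// instead of two separate references, and ultimately issues one ::write().
```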
Since the output object keeps the reference to the input buffer, the input
port's read flow will not observe the memory released until the output write
has completed. Since the input port has a limited number of such buffers, this
creates effective back-pressure on the input port not reading too much data
into memory.
## Message representation for untyped data in transit
See (note 1) in the introduction for background about the difference between
reference counted objects and queueable objects (QMembers). Specifically, it is
important to separate the queuing concept from the shared ownership of the data
payload. This is because the same data payload might be owned by multiple
output ports, but if the queue next pointer is tied to the buffer, then we
cannot effectively share.
Generally, all data during transit is represented in BufferPtr. This is a
reference to an input buffer, but is not queueable. DirectHub does not use data
queues internally, so that's OK.
For untyped / string data, we need to ensure that we keep the length of the
contents as well. However, we don't generally know the length of the contents
until the read has happened and we have processed the incoming data on the
source port.
To avoid having to copy data, we perform a single longer read into a large
buffer (typically 1460 bytes, the size of a TCP frame), then we segment this
into individual messages. Each such message will have a reference to the longer
buffer, and an offset and a length attribute (called skip_ and size_).
A particular case to be handled is when one message reaches beyond the end of
one such buffer and into the beginning of the next buffer. It could also
happen that a message is longer than 1460 bytes.
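As an illustration of this representation (hypothetical struct; the real implementation is `LinkedDataBufferPtr` in `utils/DataBuffer.hxx`):

```c++
class BufferBase; // OpenMRN's refcounted buffer (utils/Buffer.hxx)

// Hypothetical layout illustrating the skip_/size_ scheme described above.
struct InTransitMessage
{
    BufferBase *head_; // ref to the shared read buffer (e.g. 1460 bytes)
    unsigned skip_;    // offset of this message's first byte in head_
    unsigned size_;    // total payload length; when it runs past the end
                       // of head_, the bytes continue in head_->next_
};
```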
For this purpose we keep `BufferBase` objects linked to each other using the
`next_` pointers. The queue created by the `next_` pointers means that the
data payload continues in the next buffer. This is different from the
`StateFlowWithQueue` infrastructure, and generally the `Q` and `QList`
classes, where the `next_` pointer means that there is another data item (a
different message) waiting to be processed by the same StateFlow later.

The implementation of this mechanism is in `LinkedDataBufferPtr` in
`utils/DataBuffer.hxx`.
.Some earlier notes:
BufferBase has the ability to do refcounting and shared ownership. It is
possible to have a BufferBase that has untyped payload (i.e., just
bytes). However, the BufferBase needs to know the number of bytes as input; we
cannot trim down the actual bytes read from the BufferBase's size field, or
else we lose memory, because after freeing, the buffer would not be returned
to the right size. An alternative possibility is to have a buffer pool that
generates buffers of a single size, so everything is returned to the same
queue. Then the size can be adjusted to the exact number of bytes read. This
might utilize a proxy buffer pool that takes buffers of a given size from the
main buffer pool and then returns them there upon freeing, resetting the size
to have them land in the appropriate bucket.
As an alternative, `shared_ptr<string>` is a standard template library
solution to the same problem. However, `shared_ptr<string>` causes two memory
allocations for payloads that are longer than 16 bytes, and it has a minimum
of 36 bytes length (+payload length, +4 if more than 16 bytes).
Note that the input message could be split between two shared buffer
ownerships. This means that the queue entry needs two buffer pointers, an
offset, and a length. We could use the buffer base next pointers to string up
buffers that have data from the same message, even if it's more than two. This
way we only need one buffer pointer. We have to assume that the respective
bytes always go together.
It might make sense to support appending another message to the end of the
buffers. This would be especially true if the last buffer pointer is just
partially used up, and thus the bufferptr at the end of the string of buffers
is the same as the incoming next buffer.
### Input segmentation
When data arrives from the socket to be read, we will allocate a shareable
buffer, then execute the asynchronous read. As the read completes, the input
data will be passed on to the segmenter. The goal of the segmenter is to find
the boundary of the message: for gridconnect the `: ... ;` delimiter, and on
native OpenLCB-TCP the binary length of the message. Then the message can be
passed on to routing.
It is possible that during segmentation we start with one ref of a buffer, and
output two independent refs of the same buffer. This happens if a single kernel
read ends up with more than one message, which is rather typical in
GridConnect-TCP, but possibly even in GridConnect-USB.
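A hypothetical interface capturing this segmenter contract (names are illustrative; see the actual segmenter classes in the source tree):

```c++
#include <cstddef>
#include <sys/types.h>

class MessageSegmenter
{
public:
    virtual ~MessageSegmenter() = default;

    /// Looks at [data, data+size), the bytes appended to the message
    /// currently being assembled.
    /// @return the total length of the first complete message once its
    /// boundary has been found, or 0 if more bytes must be read first.
    virtual ssize_t segment_message(const void *data, size_t size) = 0;

    /// Resets the segmenter state, e.g. after a disconnect.
    virtual void clear() = 0;
};
```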
It is also possible that the segmenter will retain a ref of the last read
buffer, waiting for the completion of the message that is present therein.
We must keep reading bytes from the hardware until the segmenter is happy to
send at least one packet onwards. Only thereafter should we send the packet
(or consult the admission controller). It is essential that a partial packet
must never be sent to the hub, because it is not guaranteed that we get the
completion of that packet before another port tries to send a different
packet. We can not interleave data from different packets; that would produce
unparseable output.
Implementation note:
There are several things that have to happen on the ingress port, and the
order in which we do them matters:

1. Allocate a buffer for the next `::read` call.
2. Execute the `::read`.
3. Run the segmenter over the bytes that were read (which may require
   additional `::read` calls to be made).
4. Consult the admission controller on whether we are allowed to send.
The above list is the current order. There is one suboptimal part, which is
that we allocate a buffer earlier than when we know that there is data to read
from the fd or socket. We could theoretically wait until the fd is selectable
for read, and only then perform the buffer allocation. With the admission
controller this will get even more complicated.
## Legacy connection
We have two reasons to interact with a legacy `CanHub`:

- The `IfCan` and a `Node` require this object to communicate with the
  external world.
- Interacting with native CAN controller drivers can be done via
  `struct can_frame` today, and the implementation is in
  `HubDeviceSelect<struct can_frame>`.
To support these use-cases, there is a legacy bridge, which connects a
GridConnect-typed `DirectHub` to a `CanHub`. It takes care of parsing the
GridConnect messages in one direction, formatting them in the other direction,
and bridges the differences between the APIs.
When many CAN frames are generated consecutively, they typically get rendered
into a single text buffer. However, they don't typically get sent off without
a yield in between.