From 9dea4d44f696ebd9f055862c3247aaf6ce07f178 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sun, 17 Apr 2022 18:00:33 +0200 Subject: [PATCH 01/10] Adds byte_stream.md, a design documentation for a proposed feature. --- doc/byte_stream.md | 137 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 doc/byte_stream.md diff --git a/doc/byte_stream.md b/doc/byte_stream.md new file mode 100644 index 000000000..5b2f4d394 --- /dev/null +++ b/doc/byte_stream.md @@ -0,0 +1,137 @@ +# Byte Streams + +***WARNING*** this is currently a proposal that is not implemented yet. + +This document describes a mechanism by which OpenMRN components can exchange a +unidirectional untyped data stream. The API is laid out for compatibility with +the StateFlow concept and provides asynchronous implementations to limit memory +usage and flow control. + +## Use-cases + +- OpenLCB Stream sending. The client component sends data to the stream + transmitter using Byte Streams. + +- OpenLCB Stream receiving. The client component is receiving data from the + Stream service using Byte Streams. + +- A fast implementation of a TCP Hub should use Byte Streams to represent the + TCP messages for proxying between ports. + +## Non-goals + +- It is not a goal to use the Byte Stream API to write from a State Flow to an + fd. (Neither sockets, pipes, nor physical files.) Writing to an fd should be + done by the native StateFlow features like `write_repeated`. + +## Requirements + +- Memory allocation + + - Allocation has to avoid memory fragmentation. While the expectation is that + larger blocks of memory are allocated in one go, these blocks should be + reused between different users of Byte Streams and at different times, + instead of returning to malloc/free. + + - Block allocation size should be 1 kbyte. This is small enough that even + 32kbyte MCUs can use the codebase, but large enough to capture meaningful + TCP packet sizes. + +- Use and copy + + - The implementation should be zero-copy. Once some data is in a byte buffer, + that data should not need to be copied in order to forward it to other byte + buffers or byte streams. + + - Reference counting should be available when the payload needs to be used + in multiple places. + + - It should be possible to take a subset of a buffer and send it to a + different byte stream. (Routers will do this.) + + - A special zero-copy mechanism should be available when the source data is + already in memory or in flash. Sending these bytes into the flow should not + need block allocation and data copy. + +- Flow control + + - The data source has to be blocked in memory allocation when the sink has + not yet consumed the data. + + - The amount of read-ahead should be configurable (i.e., how much memory does + the source fill in before it gets blocked on waiting for the sink). + + +## Implementation + +We define two buffer types. + +- `using RawBuffer = Buffer;` + + This buffer holds the raw data. This buffer is reference counted, shared + between all sinks, and never entered into a `Q`. All sinks consider this + buffer as read-only. + +- `using ByteBuffer = Buffer;` + + This buffer holds a reference to a RawBuffer, and start/end pointers within + that describe the exact range of bytes to use. This buffer is not shareable, + it can be entered into a queue, i.e., sent to a `StateFlowWithQueue` / + `FlowInterface`. + +### Memory allocation + +The `RawBuffer` data blocks should come from `rawBufferPool`, which is a +`DynamicPool` that is not the same as mainBufferPool, but instantiated with a +single bucket of ~1 kbyte size (technically, `sizeof(RawBuffer)`). This ensures +that the memory fragmentation requirements are met. When a RawBuffer is +released, it goes back to the freelist of the `rawBufferPool`. + +To limit the amount of memory allocated, a `LimitedPool` can be instantiated by +the data source which specifies a fixed number of 1kbyte blocks. The backing +pool shall be set to `rawBufferPool`. + +### Memory ownership / deallocation + +`ByteChunk` contains an `BufferPtr`, which is a unique_ptr that +unref's the buffer upon the destructor. This represents the ownership of a +reference to a `RawBuffer`. The destructor of `ByteChunk` will automatically +release this reference. + +It is optional for a ByteChunk to own a RawBuffer reference. A ByteChunk can +also be created from externally-owned memory, such as flash. In this case the +`unique_ptr` remains as nullptr, and no unref happens in the destructor. + +### Zero-copy + +To make a copy of a piece of data, a new ByteBuffer is allocated, and +initialized with a new reference of the same RawBuffer. The copy can have the +same data, or a contiguous substring of the data. Using a substring is helpful +for example when we take the payload of an incoming TCP stream message and +forward it to the stream reader client. It can also be helpful when taking an +incoming message, taking off the header, prepending a new header, and sending +it out on a different port. + +## Definition + +``` +struct RawData +{ + uint8_t payload[1024]; +}; + +using RawBuffer = Buffer; + +struct ByteChunk +{ + BufferPtr ownedData_; + + uint8_t* data_ {nullptr}; + + size_t size_ {0}; +}; + +using ByteBuffer = Buffer; + +using ByteSink = FlowInterface>; +``` From 5147a9776056e61279b89f5cce6886a2f44a6515 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 18 Apr 2022 22:30:46 +0200 Subject: [PATCH 02/10] Initializes the raw buffer pool together with the main buffer pool. --- src/utils/Buffer.cxx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/utils/Buffer.cxx b/src/utils/Buffer.cxx index 8682ee647..436ecc7df 100644 --- a/src/utils/Buffer.cxx +++ b/src/utils/Buffer.cxx @@ -32,11 +32,17 @@ */ #include "utils/Buffer.hxx" +#include "utils/ByteBuffer.hxx" DynamicPool *mainBufferPool = nullptr; +Pool *rawBufferPool = nullptr; Pool* init_main_buffer_pool() { + if (!rawBufferPool) + { + rawBufferPool = new DynamicPool(Bucket::init(sizeof(RawBuffer), 0)); + } if (!mainBufferPool) { mainBufferPool = From 90a72d26ebfc92875c7bee8627e08552858cd665 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 18 Apr 2022 22:31:49 +0200 Subject: [PATCH 03/10] Adds ByteBuffer header, matching the design document. --- src/utils/ByteBuffer.hxx | 96 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 src/utils/ByteBuffer.hxx diff --git a/src/utils/ByteBuffer.hxx b/src/utils/ByteBuffer.hxx new file mode 100644 index 000000000..71602c1dc --- /dev/null +++ b/src/utils/ByteBuffer.hxx @@ -0,0 +1,96 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file ByteBuffer.hxx + * + * Specialization of the Buffer / Pool infrastructure for untyped data + * stream. See { \link doc/byte_buffer.md } + * + * @author Balazs Racz + * @date 17 Apr 2022 + */ + +#ifndef _UTILS_BYTEBUFFER_HXX_ +#define _UTILS_BYTEBUFFER_HXX_ + +#include "utils/Buffer.hxx" + +/// This is how many bytes we have in each raw buffer allocation. +static constexpr unsigned RAWBUFFER_SIZE = 1024; + +/// Use this BufferPool to allocate raw buffers. +extern Pool* rawBufferPool; + +/// Container for holding an arbitrary untyped data stream. +struct RawData +{ + uint8_t payload[RAWBUFFER_SIZE]; +}; + +/// Buffers of this type will be allocated from the rawBufferPool to hold the +/// payloads of untyped data streams. These buffers are never enqueued into Q +/// or QList objects. +using RawBuffer = Buffer; + +/// Holds a reference to a raw buffer, with the start and size information. +struct ByteChunk +{ + /// Owns a ref for a RawData buffer. If this is nullptr, then the data + /// references by this chunk is externally owned. + BufferPtr ownedData_; + + /// Points to the first byte of the useful data. + uint8_t* data_ {nullptr}; + + /// How many bytes from data_ does this chunk represent. + size_t size_ {0}; + + /// @return number of bytes pointed to by this chunk. + size_t size() const + { + return size_; + } + + /// Moves forward the data beginning pointer by a given number of + /// bytes. Represents that some number of bytes were consumed by the sink. + /// @param num_bytes how much data was consumed. Must be <= size(). + void advance(size_t num_bytes) + { + HASSERT(num_bytes <= size_); + data_ += num_bytes; + size_ -= num_bytes; + } +}; + +/// Buffer type of references. These are enqueued for byte sinks. +using ByteBuffer = Buffer; + +template class FlowInterface; + +/// Interface for sending a stream of data from a source to a sink. +using ByteSink = FlowInterface; + +#endif // _UTILS_BYTEBUFFER_HXX_ From 12ca30158d472421b868f3d8ccfec785e8775628 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 18 Apr 2022 22:33:43 +0200 Subject: [PATCH 04/10] Adds code for sending stream data to a CAN network. --- src/openlcb/StreamSender.cxxtest | 50 +++++ src/openlcb/StreamSender.hxx | 362 +++++++++++++++++++++++++++++++ 2 files changed, 412 insertions(+) create mode 100644 src/openlcb/StreamSender.cxxtest create mode 100644 src/openlcb/StreamSender.hxx diff --git a/src/openlcb/StreamSender.cxxtest b/src/openlcb/StreamSender.cxxtest new file mode 100644 index 000000000..4611e605f --- /dev/null +++ b/src/openlcb/StreamSender.cxxtest @@ -0,0 +1,50 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file StreamSender.cxxtest + * + * Unit tests for the StreamSender CAN module. + * + * @author Balazs Racz + * @date 18 Apr 2022 + */ + +#include "openlcb/StreamSender.hxx" + +#include "utils/async_if_test_helper.hxx" + +namespace openlcb +{ + +class StreamSenderTest : public AsyncNodeTest +{ +protected: +}; + +TEST_F(StreamSenderTest, create) +{ } + +} // namespace openlcb diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx new file mode 100644 index 000000000..d126f9ee4 --- /dev/null +++ b/src/openlcb/StreamSender.hxx @@ -0,0 +1,362 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file StreamSender.hxx + * + * Implementation flow for the Stream Service that sends data to a remote + * destination using the stream protocol. + * + * @author Balazs Racz + * @date 17 Apr 2022 + */ + +#ifndef _OPENLCB_STREAMSENDER_HXX_ +#define _OPENLCB_STREAMSENDER_HXX_ + +#include "executor/StateFlow.hxx" +#include "openlcb/IfCan.hxx" +#include "openlcb/CanDefs.hxx" +#include "openlcb/StreamDefs.hxx" +#include "openlcb/DatagramDefs.hxx" +#include "utils/ByteBuffer.hxx" +#include "utils/LimitedPool.hxx" + +namespace openlcb +{ + +/// Helper class for sending stream data to a CAN interface. +/// @todo add progress report API. +class StreamSenderCan : public StateFlow > +{ +public: + Action entry() + { + if (requestInit_) { + /// @todo get the destination address somehow. + requestInit_ = 0; + return call_immediately(STATE(initiate_stream)); + } + if (!streamWindowRemaining_) + { + // We ran out of the current stream window size. + return wait_and_call(STATE(wait_for_stream_proceed)); + } + if (!remaining()) + { + // We ran out of the current chunk of stream payload from the + // source. + return release_and_exit(); + } + return call_immediately(STATE(allocate_can_buffer)); + } + +private: + Action initiate_stream() + { + return allocate_and_call( + node_->iface()->addressed_message_write_flow(), + STATE(send_init_stream)); + } + + Action send_init_stream() + { + auto *b = get_allocation_result( + node_->iface()->addressed_message_write_flow()); + b->data()->reset(Defs::MTI_STREAM_INITIATE_REQUEST, node_->node_id(), + dst_, + StreamDefs::create_initiate_request( + StreamDefs::MAX_PAYLOAD, false, localStreamId_)); + + node_->iface()->dispatcher()->register_handler( + &streamInitiateReplyHandler_, Defs::MTI_STREAM_INITIATE_REPLY, + Defs::MTI_EXACT); + + node_->iface()->addressed_message_write_flow()->send(b); + sleeping_ = true; + return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_INIT_TIMEOUT_SEC), + STATE(received_init_stream)); + } + + void stream_initiate_replied(Buffer *message) + { + auto rb = get_buffer_deleter(message); + if (message->data()->dstNode != node_ || + !node_->iface()->matching_node(dst_, message->data()->src)) + { + // Not for me. + return; + } + const auto &payload = message->data()->payload; + if (payload.size() < 6 || payload[4] != localStreamId_) + { + // Talking about another stream or incorrect data. + return; + } + dstStreamId_ = payload[5]; + streamFlags_ = payload[2]; + streamAdditionalFlags_ = payload[3]; + streamWindowSize_ = (payload[0] << 8) | payload[1]; + + // We save the remote alias here if we haven't got any yet. + if (message->data()->src.alias) + { + dst_.alias = message->data()->src.alias; + } + timer_.trigger(); + } + + Action received_init_stream() + { + sleeping_ = false; + node_->iface()->dispatcher()->unregister_handler( + &streamInitiateReplyHandler_, Defs::MTI_STREAM_INITIATE_REPLY, + Defs::MTI_EXACT); + if (!(streamFlags_ & StreamDefs::FLAG_ACCEPT)) + { + if (streamFlags_ & StreamDefs::FLAG_PERMANENT_ERROR) + { + return return_error( + DatagramDefs::PERMANENT_ERROR | streamAdditionalFlags_, + "Stream initiate request was denied (permanent error)."); + } + else + { + return return_error( + Defs::ERROR_TEMPORARY | streamAdditionalFlags_, + "Stream initiate request was denied (temporary error)."); + } + } + if (!streamWindowSize_) + { + return return_error(DatagramDefs::PERMANENT_ERROR, + "Inconsistency: zero buffer length but " + "accepted stream request."); + } + streamWindowRemaining_ = streamWindowSize_; + node_->iface()->dispatcher()->register_handler( + &streamProceedHandler_, Defs::MTI_STREAM_PROCEED, Defs::MTI_EXACT); + return entry(); + } + + /// Allocates a buffer for a CAN frame. + Action allocate_can_buffer() + { + return allocate_and_call( + ifCan_->frame_write_flow(), STATE(got_frame), &canFramePool_); + } + + /// Got a buffer for an output frame. + Action got_frame() + { + auto *b = get_allocation_result(ifCan_->frame_write_flow()); + + uint32_t can_id; + NodeAlias local_alias = + ifCan_->local_aliases()->lookup(node_->node_id()); + NodeAlias remote_alias = dst_.alias; + CanDefs::set_datagram_fields( + &can_id, local_alias, remote_alias, CanDefs::STREAM_DATA); + auto *frame = b->data()->mutable_frame(); + SET_CAN_FRAME_ID_EFF(*frame, can_id); + + size_t len = compute_next_can_length(); + + frame->can_dlc = len + 1; + frame->data[0] = dstStreamId_; + memcpy(&frame->data[1], payload(), len); + advance(len); + + ifCan_->frame_write_flow()->send(b); + return entry(); + } + + Action wait_for_stream_proceed() + { + if (streamWindowRemaining_) + { + // received early stream_proceed response + return call_immediately(STATE(stream_proceed_timeout)); + } + sleeping_ = true; + return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_PROCEED_TIMEOUT_SEC), + STATE(stream_proceed_timeout)); + } + + /// Callback from the handler flow. + void stream_proceed_received(Buffer *message) + { + auto rb = get_buffer_deleter(message); + if (message->data()->dstNode != node_ || + !node_->iface()->matching_node(dst_, message->data()->src)) + { + // Not for me. + return; + } + + const auto &payload = message->data()->payload; + if (payload.size() < 2 || payload[0] != localStreamId_) + { + // Talking about another stream or incorrect data. + return; + } + + /// @todo add progress callback API + + streamWindowRemaining_ += streamWindowSize_; + if (sleeping_) + { + timer_.trigger(); + } + } + + Action stream_proceed_timeout() + { + sleeping_ = false; + if (!streamWindowRemaining_) // no proceed arrived + { + /// @todo (balazs.racz) somehow merge these two actions: remember + /// that we timed out and close the stream. + return return_error(Defs::ERROR_TEMPORARY, + "Times out waiting for stream proceed message."); + //return call_immediately(STATE(close_stream)); + } + return entry(); + } + +private: + size_t compute_next_can_length() + { + size_t ret = remaining(); + // Cannot exceed CAN frame max payload. + if (ret > MAX_BYTES_PAYLOAD_PER_CAN_FRAME) + { + ret = MAX_BYTES_PAYLOAD_PER_CAN_FRAME; + } + // Cannot exceed remaining bytes in stream window. + if (ret > streamWindowRemaining_) + { + ret = streamWindowRemaining_; + } + return ret; + } + + /// @return the number of bytes available in the current chunk. + size_t remaining() + { + return message()->data()->size_; + } + + /// @return pointer to the beginning of the data to send. + uint8_t *payload() + { + return message()->data()->data_; + } + + /// Consumes a certain number of bytes from the beginning of the data to + /// send. + /// @param num_bytes how much data to consume. + void advance(size_t num_bytes) + { + message()->data()->advance(num_bytes); + totalByteCount_ += num_bytes; + streamWindowRemaining_ -= num_bytes; + } + + Action return_error(uint32_t code, string message) { + errorCode_ = code; + /// @todo mark that we are in an error state. + return release_and_exit(); + } + + /// How many seconds for waiting for a stream proceed before we give up + /// with a timeout. + static constexpr size_t STREAM_PROCEED_TIMEOUT_SEC = 20; + + /// How many seconds for waiting for a stream init before we give up + /// with a timeout. + static constexpr size_t STREAM_INIT_TIMEOUT_SEC = 20; + + /// How many bytes payload we can copy into a single CAN frame. + static constexpr size_t MAX_BYTES_PAYLOAD_PER_CAN_FRAME = 7; + + /// How many CAN frames should we allocate at a given time. + static constexpr size_t MAX_FRAMES_IN_FLIGHT = 4; + + /// How many bytes the allocation of a single CAN frame should be. + static constexpr size_t CAN_FRAME_ALLOC_SIZE = + sizeof(CanFrameWriteFlow::message_type); + + /// Handles incoming stream proceed messages. + MessageHandler::GenericHandler streamProceedHandler_ { + this, &StreamSenderCan::stream_proceed_received}; + /// Handles incoming stream initiate reply messages. + MessageHandler::GenericHandler streamInitiateReplyHandler_ { + this, &StreamSenderCan::stream_initiate_replied}; + + /// CAN-bus interface. + IfCan *ifCan_; + /// Which node are we sending the outgoing data from. This is a local + /// virtual node. + Node *node_; + /// Destination node that we are sending to. It is important that the alias + /// is filled in here. + NodeHandle dst_; + /// How many bytes we have transmitted in this stream so far. + size_t totalByteCount_; + /// Stream ID at the source node. @todo fill in + uint8_t localStreamId_; + /// Stream ID at the destination node. @todo fill in + uint8_t dstStreamId_; + /// True if we are waiting for the timer. + uint8_t sleeping_ : 1; + /// 1 if there is a pending close request. + uint8_t requestClose_ : 1; + /// 1 if there is a pending initialize request. + uint8_t requestInit_ : 1; + /// Flags from the remote node that we got in stream initiate reply + uint8_t streamFlags_; + /// More flags from the remote node that we got in stream initiate reply + uint8_t streamAdditionalFlags_; + /// Total stream window size. @todo fill in + uint16_t streamWindowSize_; + /// Remaining stream window size. @todo fill in + uint16_t streamWindowRemaining_; + /// When the stream process fails, this variable contains an error code. + uint32_t errorCode_; + /// Source of buffers for outgoing CAN frames. Limtedpool is allocating and + /// releasing to the mainBufferPool, but blocks when we exceed a certain + /// number of allocations until some buffers get freed. + LimitedPool canFramePool_ {CAN_FRAME_ALLOC_SIZE, MAX_FRAMES_IN_FLIGHT}; + /// Helper object for timeouts. + StateFlowTimer timer_ {this}; +}; + +class StreamRendererCan : public StateFlow> +{ }; + +} // namespace openlcb + +#endif // _OPENLCB_STREAMSENDER_HXX_ From 1ca71388f8b130f182f99c5d1d3c470d37d54cb9 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sun, 1 May 2022 11:15:20 +0200 Subject: [PATCH 05/10] Adds documentation. Fixes constants to be constexpr. --- src/openlcb/StreamDefs.hxx | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/openlcb/StreamDefs.hxx b/src/openlcb/StreamDefs.hxx index e7b781d52..4ed6241f6 100644 --- a/src/openlcb/StreamDefs.hxx +++ b/src/openlcb/StreamDefs.hxx @@ -39,7 +39,10 @@ namespace openlcb /// Static constants and helper functions for the OpenLCB streaming protocol. struct StreamDefs { - static const uint16_t MAX_PAYLOAD = 0xffff; + /// Maximum window size for stream send. + static constexpr uint16_t MAX_PAYLOAD = 0xffff; + /// This value is invalid as a source or destination stream ID. + static constexpr uint8_t INVALID_STREAM_ID = 0xff; enum Flags { @@ -59,9 +62,17 @@ struct StreamDefs REJECT_TEMPORARY_OUT_OF_ORDER = 0x40, }; - static Payload create_initiate_request(uint16_t max_buffer_size, - bool has_ident, - uint8_t src_stream_id) + /// Creates a Stream Initiate Request message payload. + /// + /// @param max_buffer_size value to propose as stream window size. + /// @param has_ident if true, sets the flag for carrying a source stream + /// ID. + /// @param src_stream_id source stream ID value. + /// + /// @return a Payload object for a GenMessage. + /// + static Payload create_initiate_request( + uint16_t max_buffer_size, bool has_ident, uint8_t src_stream_id) { Payload p(5, 0); p[0] = max_buffer_size >> 8; From 274a454d3a282fd6d335ec521c118b2f0264bf59 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sun, 1 May 2022 11:15:41 +0200 Subject: [PATCH 06/10] Adds a variable that tracks the state of the sender. Adds comments. --- src/openlcb/StreamSender.hxx | 153 ++++++++++++++++++++++++++++++----- 1 file changed, 135 insertions(+), 18 deletions(-) diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx index d126f9ee4..9090318bd 100644 --- a/src/openlcb/StreamSender.hxx +++ b/src/openlcb/StreamSender.hxx @@ -37,10 +37,10 @@ #define _OPENLCB_STREAMSENDER_HXX_ #include "executor/StateFlow.hxx" -#include "openlcb/IfCan.hxx" #include "openlcb/CanDefs.hxx" -#include "openlcb/StreamDefs.hxx" #include "openlcb/DatagramDefs.hxx" +#include "openlcb/IfCan.hxx" +#include "openlcb/StreamDefs.hxx" #include "utils/ByteBuffer.hxx" #include "utils/LimitedPool.hxx" @@ -49,12 +49,86 @@ namespace openlcb /// Helper class for sending stream data to a CAN interface. /// @todo add progress report API. -class StreamSenderCan : public StateFlow > +class StreamSenderCan : public StateFlow> { public: + StreamSenderCan(Service *service, IfCan *iface, Node *node) + : StateFlow>(service) + , ifCan_(iface) + , node_(node) + { } + + /// Initiates using the stream sender. May be called only on idle stream + /// senders. + /// + /// @param dst Destination node ID to send the stream to. + /// @param source_stream_id 8-bit stream ID to use on the this (the source) + /// side. + /// + /// @return *this for calling optional settings API commands. + /// + StreamSenderCan &start_stream(NodeHandle dst, uint8_t source_stream_id) + { + DASSERT(state_ == IDLE); + state_ = STARTED; + dst_ = dst; + totalByteCount_ = 0; + localStreamId_ = source_stream_id; + dstStreamId_ = 0xFF; + HASSERT(sleeping_ == false); + HASSERT(requestClose_ == 0); + requestInit_ = true; + trigger(); + streamFlags_ = 0; + streamAdditionalFlags_ = 0; + streamWindowSize_ = StreamDefs::MAX_PAYLOAD; + streamWindowRemaining_ = 0; + errorCode_ = 0; + return *this; + } + + /// Specifies what the source should propose as window size to the + /// destination. May be called only after start_stream. + /// + /// @param window_size in bytes, what should we propose in the stream + /// initiate call + /// + StreamSenderCan &set_proposed_window_size(uint16_t window_size) + { + HASSERT(state_ == STARTED); + streamWindowRemaining_ = window_size; + return *this; + } + + /// Specifies the Stream UID to send in the stream initiate request. May be + /// called only after start_stream. This function must be used if opening + /// an unannounced stream to a destination. + /// + /// @param stream_uid a valid 6-byte stream identifier. + /// + StreamSenderCan &set_stream_uid(NodeID stream_uid) + { + HASSERT(state_ == STARTED); + /// @todo implement opening unannounced streams. + return *this; + } + + /// Sets the stream sender to be available for reuse after a stream has + /// been closed or reached error. + void clear() + { + if (state_ == STATE_ERROR || state_ == CLOSING) + { + state_ = IDLE; + } + } + + /// Start of state machine, called when a buffer of data to send arrives + /// from the application layer. Action entry() { - if (requestInit_) { + if (requestInit_) + { /// @todo get the destination address somehow. requestInit_ = 0; return call_immediately(STATE(initiate_stream)); @@ -74,13 +148,24 @@ public: } private: + /// Sends an empty message to *this, thereby waking up the state machine. + void trigger() + { + auto *b = alloc(); + this->send(b); + } + + /// Allocates a GenMessage buffer and sends out the stream initiate message + /// to the destination. Action initiate_stream() { - return allocate_and_call( - node_->iface()->addressed_message_write_flow(), + // Grabs alias / node ID from the cache. + node_->iface()->canonicalize_handle(&dst_); + return allocate_and_call(node_->iface()->addressed_message_write_flow(), STATE(send_init_stream)); } + /// Sends the stream initiate message. Action send_init_stream() { auto *b = get_allocation_result( @@ -93,13 +178,16 @@ private: node_->iface()->dispatcher()->register_handler( &streamInitiateReplyHandler_, Defs::MTI_STREAM_INITIATE_REPLY, Defs::MTI_EXACT); - + node_->iface()->addressed_message_write_flow()->send(b); sleeping_ = true; + state_ = INITIATING; return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_INIT_TIMEOUT_SEC), STATE(received_init_stream)); } + /// Callback from GenHandler when a stream initiate reply message arrives + /// at the local interface. void stream_initiate_replied(Buffer *message) { auto rb = get_buffer_deleter(message); @@ -125,12 +213,14 @@ private: { dst_.alias = message->data()->src.alias; } + sleeping_ = false; timer_.trigger(); } + /// State executed after wakeup from the stream initiate reply received + /// handler. Action received_init_stream() { - sleeping_ = false; node_->iface()->dispatcher()->unregister_handler( &streamInitiateReplyHandler_, Defs::MTI_STREAM_INITIATE_REPLY, Defs::MTI_EXACT); @@ -158,17 +248,18 @@ private: streamWindowRemaining_ = streamWindowSize_; node_->iface()->dispatcher()->register_handler( &streamProceedHandler_, Defs::MTI_STREAM_PROCEED, Defs::MTI_EXACT); + state_ = RUNNING; return entry(); } - - /// Allocates a buffer for a CAN frame. + + /// Allocates a buffer for a CAN frame (for payload send). Action allocate_can_buffer() { return allocate_and_call( ifCan_->frame_write_flow(), STATE(got_frame), &canFramePool_); } - /// Got a buffer for an output frame. + /// Got a buffer for an output frame (payload send). Action got_frame() { auto *b = get_allocation_result(ifCan_->frame_write_flow()); @@ -193,6 +284,8 @@ private: return entry(); } + /// Starts sleeping until a proceed message arrives. Run this state when + /// streamWindowRemaining_ == 0. Action wait_for_stream_proceed() { if (streamWindowRemaining_) @@ -201,6 +294,7 @@ private: return call_immediately(STATE(stream_proceed_timeout)); } sleeping_ = true; + state_ = FULL; return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_PROCEED_TIMEOUT_SEC), STATE(stream_proceed_timeout)); } @@ -228,21 +322,22 @@ private: streamWindowRemaining_ += streamWindowSize_; if (sleeping_) { + sleeping_ = false; timer_.trigger(); } } Action stream_proceed_timeout() { - sleeping_ = false; if (!streamWindowRemaining_) // no proceed arrived { /// @todo (balazs.racz) somehow merge these two actions: remember /// that we timed out and close the stream. return return_error(Defs::ERROR_TEMPORARY, - "Times out waiting for stream proceed message."); - //return call_immediately(STATE(close_stream)); + "Timed out waiting for stream proceed message."); + // return call_immediately(STATE(close_stream)); } + state_ = RUNNING; return entry(); } @@ -285,12 +380,13 @@ private: streamWindowRemaining_ -= num_bytes; } - Action return_error(uint32_t code, string message) { + Action return_error(uint32_t code, string message) + { errorCode_ = code; - /// @todo mark that we are in an error state. + state_ = STATE_ERROR; return release_and_exit(); } - + /// How many seconds for waiting for a stream proceed before we give up /// with a timeout. static constexpr size_t STREAM_PROCEED_TIMEOUT_SEC = 20; @@ -298,7 +394,7 @@ private: /// How many seconds for waiting for a stream init before we give up /// with a timeout. static constexpr size_t STREAM_INIT_TIMEOUT_SEC = 20; - + /// How many bytes payload we can copy into a single CAN frame. static constexpr size_t MAX_BYTES_PAYLOAD_PER_CAN_FRAME = 7; @@ -309,6 +405,25 @@ private: static constexpr size_t CAN_FRAME_ALLOC_SIZE = sizeof(CanFrameWriteFlow::message_type); + /// Describes the different states in the stream sender. + enum StreamSenderState : uint8_t + { + /// This stream sender is not in use now + IDLE, + /// The local client has started using the stream sender (via API). + STARTED, + /// The stream initiate message was sent. + INITIATING, + /// Stream is open and data can be transferred. + RUNNING, + /// Stream buffer is full, waiting for proceed message. + FULL, + /// Stream close message was sent. + CLOSING, + /// An error occurred. + STATE_ERROR + }; + /// Handles incoming stream proceed messages. MessageHandler::GenericHandler streamProceedHandler_ { this, &StreamSenderCan::stream_proceed_received}; @@ -326,6 +441,8 @@ private: NodeHandle dst_; /// How many bytes we have transmitted in this stream so far. size_t totalByteCount_; + /// What state the current class is in. + StreamSenderState state_ {IDLE}; /// Stream ID at the source node. @todo fill in uint8_t localStreamId_; /// Stream ID at the destination node. @todo fill in From f08f90f3b1b72e3a72bc6ae12f88347a86c9460b Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sun, 1 May 2022 19:20:26 +0200 Subject: [PATCH 07/10] Adds tests to the stream sender. Fixes some bugs. Adds helper functions for RawBuffer. --- src/openlcb/StreamSender.cxxtest | 148 ++++++++++++++++++++++- src/openlcb/StreamSender.hxx | 87 +++++++++---- src/utils/ByteBuffer.hxx | 35 +++++- src/utils/async_datagram_test_helper.hxx | 9 ++ 4 files changed, 246 insertions(+), 33 deletions(-) diff --git a/src/openlcb/StreamSender.cxxtest b/src/openlcb/StreamSender.cxxtest index 4611e605f..82e1e19fc 100644 --- a/src/openlcb/StreamSender.cxxtest +++ b/src/openlcb/StreamSender.cxxtest @@ -34,17 +34,159 @@ #include "openlcb/StreamSender.hxx" -#include "utils/async_if_test_helper.hxx" +#include "utils/async_datagram_test_helper.hxx" namespace openlcb { -class StreamSenderTest : public AsyncNodeTest +class StreamSenderTest : public TwoNodeDatagramTest { protected: + ~StreamSenderTest() + { + shutdown(); + } + + void shutdown() + { + do + { + wait(); + } while (sender_.shutdown()); + } + + /// Starts a stream, sets expectations and acceptsthe stream on the + /// destination side. At the end, sets strict expectations for the rest of + /// the test. + /// @param max_buffer the number of bytes to accept as window size. + void setup_helper(uint16_t max_buffer) + { + clear_expect(true); + expect_packet(":X19CC822AN0225FFFF0000AA;"); + sender_.start_stream(other_handle(), 0xaa); + wait(); + EXPECT_EQ(StreamSender::INITIATING, sender_.get_state()); + // Accepts with buffer size of 8 bytes. + send_packet(StringPrintf(":X19868225N022A%04x8000AA55;", max_buffer)); + wait(); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + // No error. + EXPECT_EQ(0, sender_.get_error()); + clear_expect(true); + } + + /// Sends a single buffer of data to the stream sender. + /// + /// @param data bytes payload to send. + /// @param use_buf if true, we allocate a 1k buffer, if false, we use + /// externally owned bytes by a string. + /// + void send_bytes(string data, bool use_buf = false) + { + auto *chunk = sender_.alloc(); + if (use_buf) + { + RawBuffer *buf; + rawBufferPool->alloc(&buf); + HASSERT(buf); + HASSERT(data.size() <= buf->data()->MAX_SIZE); + memcpy(buf->data()->payload, data.data(), data.size()); + chunk->data()->set_from(get_buffer_deleter(buf), data.size()); + } + else + { + ownedPayload_.emplace_back(new string(data)); + chunk->data()->set_from(ownedPayload_.back().get()); + } + + sender_.send(chunk); + } + + StreamSenderCan sender_ {&g_service, ifCan_.get(), node_}; + /// Temporary storage of externally owned payload. + vector> ownedPayload_; }; TEST_F(StreamSenderTest, create) -{ } +{ + EXPECT_EQ(StreamSender::IDLE, sender_.get_state()); +} + +TEST_F(StreamSenderTest, initiate) +{ + clear_expect(true); + EXPECT_EQ(StreamSender::IDLE, sender_.get_state()); + expect_packet(":X19CC822AN0225FFFF0000AA;"); + sender_.start_stream(other_handle(), 0xaa); + wait(); + EXPECT_EQ(StreamSender::INITIATING, sender_.get_state()); +} + +TEST_F(StreamSenderTest, initiate_with_bufsize) +{ + clear_expect(true); + EXPECT_EQ(StreamSender::IDLE, sender_.get_state()); + expect_packet(":X19CC822AN0225EF320000AA;"); + sender_.start_stream(other_handle(), 0xaa).set_proposed_window_size(0xef32); + wait(); + EXPECT_EQ(StreamSender::INITIATING, sender_.get_state()); +} + +TEST_F(StreamSenderTest, initiate_rejected) +{ + clear_expect(true); + expect_packet(":X19CC822AN0225FFFF0000AA;"); + sender_.start_stream(other_handle(), 0xaa); + wait(); + EXPECT_EQ(StreamSender::INITIATING, sender_.get_state()); + // Rejects with a weird error code. + send_packet(":X19868225N022A00004220AA55;"); + wait(); + EXPECT_EQ(StreamSender::STATE_ERROR, sender_.get_state()); + // Translated into a permanent error. + EXPECT_EQ(0x1020, sender_.get_error()); +} + +TEST_F(StreamSenderTest, initiate_accepted) +{ + setup_helper(8); +} + +TEST_F(StreamSenderTest, initiate_accepted_send_data) +{ + setup_helper(8); + + expect_packet(":X1F22522AN55414243;"); + send_bytes("ABC"); + wait(); + clear_expect(true); +} + +TEST_F(StreamSenderTest, buffer_full) +{ + setup_helper(12); + + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X1F22522AN55414243;"); + send_bytes("ABC"); + wait(); + clear_expect(true); + + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X1F22522AN554445464748494A;"); + expect_packet(":X1F22522AN554B4C;"); + send_bytes("DEFGHIJKL" "MNOPQ"); + wait(); + clear_expect(true); + + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X1F22522AN554D4E4F5051;"); + send_packet(":X19888225N022AAA55;"); + wait(); + clear_expect(true); +} } // namespace openlcb diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx index 9090318bd..90742614f 100644 --- a/src/openlcb/StreamSender.hxx +++ b/src/openlcb/StreamSender.hxx @@ -47,13 +47,38 @@ namespace openlcb { +class StreamSender : public StateFlow> { +public: + StreamSender(Service* s) : StateFlow>(s) {} + + /// Describes the different states in the stream sender. + enum StreamSenderState : uint8_t + { + /// This stream sender is not in use now + IDLE, + /// The local client has started using the stream sender (via API). + STARTED, + /// The stream initiate message was sent. + INITIATING, + /// Stream is open and data can be transferred. + RUNNING, + /// Stream buffer is full, waiting for proceed message. + FULL, + /// Stream close message was sent. + CLOSING, + /// An error occurred. + STATE_ERROR + }; + +}; + /// Helper class for sending stream data to a CAN interface. /// @todo add progress report API. -class StreamSenderCan : public StateFlow> +class StreamSenderCan : public StreamSender { public: StreamSenderCan(Service *service, IfCan *iface, Node *node) - : StateFlow>(service) + : StreamSender(service) , ifCan_(iface) , node_(node) { } @@ -96,7 +121,7 @@ public: StreamSenderCan &set_proposed_window_size(uint16_t window_size) { HASSERT(state_ == STARTED); - streamWindowRemaining_ = window_size; + streamWindowSize_ = window_size; return *this; } @@ -123,6 +148,33 @@ public: } } +#ifdef GTEST + /// Requests to exit any timed operation. + /// @return true if a timer was woken up. + bool shutdown() + { + if (sleeping_) + { + timer_.trigger(); + sleeping_ = false; + return true; + } + return false; + } +#endif + + /// @return the state of this stream sender. + StreamSenderState get_state() + { + return state_; + } + + /// @return the error code if we got a rejection from the remote node. + uint16_t get_error() + { + return errorCode_; + } + /// Start of state machine, called when a buffer of data to send arrives /// from the application layer. Action entry() @@ -136,7 +188,7 @@ public: if (!streamWindowRemaining_) { // We ran out of the current stream window size. - return wait_and_call(STATE(wait_for_stream_proceed)); + return call_immediately(STATE(wait_for_stream_proceed)); } if (!remaining()) { @@ -173,7 +225,7 @@ private: b->data()->reset(Defs::MTI_STREAM_INITIATE_REQUEST, node_->node_id(), dst_, StreamDefs::create_initiate_request( - StreamDefs::MAX_PAYLOAD, false, localStreamId_)); + streamWindowSize_, false, localStreamId_)); node_->iface()->dispatcher()->register_handler( &streamInitiateReplyHandler_, Defs::MTI_STREAM_INITIATE_REPLY, @@ -194,12 +246,14 @@ private: if (message->data()->dstNode != node_ || !node_->iface()->matching_node(dst_, message->data()->src)) { + LOG(INFO, "stream reply not for me"); // Not for me. return; } const auto &payload = message->data()->payload; - if (payload.size() < 6 || payload[4] != localStreamId_) + if (payload.size() < 6 || (uint8_t)payload[4] != localStreamId_) { + LOG(INFO, "wrong stream ID %x %x", payload[4], localStreamId_); // Talking about another stream or incorrect data. return; } @@ -311,7 +365,7 @@ private: } const auto &payload = message->data()->payload; - if (payload.size() < 2 || payload[0] != localStreamId_) + if (payload.size() < 2 || (uint8_t)payload[0] != localStreamId_) { // Talking about another stream or incorrect data. return; @@ -405,25 +459,6 @@ private: static constexpr size_t CAN_FRAME_ALLOC_SIZE = sizeof(CanFrameWriteFlow::message_type); - /// Describes the different states in the stream sender. - enum StreamSenderState : uint8_t - { - /// This stream sender is not in use now - IDLE, - /// The local client has started using the stream sender (via API). - STARTED, - /// The stream initiate message was sent. - INITIATING, - /// Stream is open and data can be transferred. - RUNNING, - /// Stream buffer is full, waiting for proceed message. - FULL, - /// Stream close message was sent. - CLOSING, - /// An error occurred. - STATE_ERROR - }; - /// Handles incoming stream proceed messages. MessageHandler::GenericHandler streamProceedHandler_ { this, &StreamSenderCan::stream_proceed_received}; diff --git a/src/utils/ByteBuffer.hxx b/src/utils/ByteBuffer.hxx index 71602c1dc..c23af5879 100644 --- a/src/utils/ByteBuffer.hxx +++ b/src/utils/ByteBuffer.hxx @@ -42,12 +42,14 @@ static constexpr unsigned RAWBUFFER_SIZE = 1024; /// Use this BufferPool to allocate raw buffers. -extern Pool* rawBufferPool; +extern Pool *rawBufferPool; /// Container for holding an arbitrary untyped data stream. struct RawData { uint8_t payload[RAWBUFFER_SIZE]; + /// Maximum length that can be stored in a single RawBuffer. + static constexpr size_t MAX_SIZE = RAWBUFFER_SIZE; }; /// Buffers of this type will be allocated from the rawBufferPool to hold the @@ -61,9 +63,9 @@ struct ByteChunk /// Owns a ref for a RawData buffer. If this is nullptr, then the data /// references by this chunk is externally owned. BufferPtr ownedData_; - + /// Points to the first byte of the useful data. - uint8_t* data_ {nullptr}; + uint8_t *data_ {nullptr}; /// How many bytes from data_ does this chunk represent. size_t size_ {0}; @@ -83,12 +85,37 @@ struct ByteChunk data_ += num_bytes; size_ -= num_bytes; } + + /// Overwrites this chunk from a raw buffer. + /// + /// @param buf An owned share of a RawBuffer. + /// @param len How many bytes to take from this buffer. + /// @param ofs From which offset we should take these bytes (default 0, may + /// be omitted). + /// + void set_from(BufferPtr buf, size_t len, size_t ofs = 0) + { + ownedData_ = std::move(buf); + size_ = len; + data_ = ownedData_->data()->payload + ofs; + } + + /// Overwrites this chunk from a string. WARNING: the ownership of the + /// string is not transferred; the caller must make sure the string remains + /// alive as long as this Chunk is ever in use (including all copies). + void set_from(const string* data) + { + ownedData_.reset(); // no need for this anymore + size_ = data->size(); + data_ = (uint8_t*)data->data(); + } + }; /// Buffer type of references. These are enqueued for byte sinks. using ByteBuffer = Buffer; -template class FlowInterface; +template class FlowInterface; /// Interface for sending a stream of data from a source to a sink. using ByteSink = FlowInterface; diff --git a/src/utils/async_datagram_test_helper.hxx b/src/utils/async_datagram_test_helper.hxx index 5b0369d39..e7ba8a30f 100644 --- a/src/utils/async_datagram_test_helper.hxx +++ b/src/utils/async_datagram_test_helper.hxx @@ -82,6 +82,15 @@ protected: OTHER_NODE_ALIAS = 0x225, }; + /// @return NodeHandle for the other node. + NodeHandle other_handle() + { + NodeHandle ret; + ret.id = OTHER_NODE_ID; + ret.alias = OTHER_NODE_ALIAS; + return ret; + } + /// @param separate_if defines which mode the test base should operate /// in. false = mode 1 (one interface, two virtual nodes); true = mode 2 /// (two interfaces). From b2fad4aaaa13bcaedf73cebfbca25174304d1588 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 2 May 2022 11:59:38 +0200 Subject: [PATCH 08/10] Adds more complex test cases. Verifies the stream state when getting data. Drops all data after an error. --- src/openlcb/StreamSender.cxxtest | 70 ++++++++++++++++++++++++++++++++ src/openlcb/StreamSender.hxx | 8 ++++ 2 files changed, 78 insertions(+) diff --git a/src/openlcb/StreamSender.cxxtest b/src/openlcb/StreamSender.cxxtest index 82e1e19fc..e53e9aefa 100644 --- a/src/openlcb/StreamSender.cxxtest +++ b/src/openlcb/StreamSender.cxxtest @@ -49,6 +49,7 @@ protected: void shutdown() { + LOG(INFO, "shutdown."); do { wait(); @@ -189,4 +190,73 @@ TEST_F(StreamSenderTest, buffer_full) clear_expect(true); } +/// This test sends multiple chunks ahead of time to the queue, then simulates +/// the remote end to trickle out the data. +TEST_F(StreamSenderTest, queueing) +{ + setup_helper(8); + + expect_packet(":X1F22522AN5530313233343536;"); + expect_packet(":X1F22522AN5537;"); + send_bytes("012345678"); + wait(); + clear_expect(true); + + string p; + for (unsigned i = 0; i < 40; i++) + { + p.push_back(i); + } + + send_bytes(p.substr(0, 10)); + send_bytes(p.substr(10, 10)); + send_bytes(p.substr(20, 5)); + + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + // The sender never puts data into a single CAN frame from multiple inbound + // messages. Thus we have a partial frame first here. + expect_packet(":X1F22522AN5538;"); + expect_packet(":X1F22522AN5500010203040506;"); + + send_packet(":X19888225N022AAA55;"); + wait(); + + clear_expect(true); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X1F22522AN55070809;"); + expect_packet(":X1F22522AN550a0b0c0d0e;"); + + send_packet(":X19888225N022AAA55;"); + wait(); + + clear_expect(true); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X1F22522AN550f10111213;"); + expect_packet(":X1F22522AN55141516;"); + + send_packet(":X19888225N022AAA55;"); + wait(); + + clear_expect(true); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X1F22522AN551718;"); + + send_packet(":X19888225N022AAA55;"); + wait(); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + // If we send a chunk now, it will be output immediately. + expect_packet(":X1F22522AN55191a;"); + send_bytes(p.substr(25, 2)); + wait(); + + clear_expect(true); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); +} + + } // namespace openlcb diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx index 90742614f..5828c6fef 100644 --- a/src/openlcb/StreamSender.hxx +++ b/src/openlcb/StreamSender.hxx @@ -185,6 +185,12 @@ public: requestInit_ = 0; return call_immediately(STATE(initiate_stream)); } + if (state_ == STATE_ERROR) + { + LOG(INFO, "dropping data due to error."); + return release_and_exit(); + } + DASSERT(state_ == RUNNING); if (!streamWindowRemaining_) { // We ran out of the current stream window size. @@ -396,6 +402,7 @@ private: } private: + /// @return how many bytes of data we can put into the next CAN frame. size_t compute_next_can_length() { size_t ret = remaining(); @@ -436,6 +443,7 @@ private: Action return_error(uint32_t code, string message) { + LOG(INFO, "error %x: %s", code, message.c_str()); errorCode_ = code; state_ = STATE_ERROR; return release_and_exit(); From 8d762f14300082d0d2db90fe5bf88602227be63a Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 2 May 2022 12:59:49 +0200 Subject: [PATCH 09/10] Adds implementation of total byte count send. Adds implementation of close stream. Adds tests for close stream. --- src/openlcb/StreamDefs.hxx | 24 ++++++++- src/openlcb/StreamSender.cxxtest | 91 +++++++++++++++++++++++++++++++- src/openlcb/StreamSender.hxx | 39 ++++++++++++-- 3 files changed, 147 insertions(+), 7 deletions(-) diff --git a/src/openlcb/StreamDefs.hxx b/src/openlcb/StreamDefs.hxx index 4ed6241f6..f5a6d11ca 100644 --- a/src/openlcb/StreamDefs.hxx +++ b/src/openlcb/StreamDefs.hxx @@ -43,6 +43,9 @@ struct StreamDefs static constexpr uint16_t MAX_PAYLOAD = 0xffff; /// This value is invalid as a source or destination stream ID. static constexpr uint8_t INVALID_STREAM_ID = 0xff; + /// Supply this value to the total byte count in stream close to mark it as + /// invalid. + static constexpr uint32_t INVALID_TOTAL_BYTE_COUNT = 0xffffffff; enum Flags { @@ -83,11 +86,28 @@ struct StreamDefs return p; } - static Payload create_close_request(uint8_t src_stream_id, uint8_t dst_stream_id) + /// Creates the payload for a stream close message. + /// + /// @param src_stream_id 1-byte SID stream identifier at the source side + /// @param dst_stream_id 1-byte SID stream identifier at the dst side + /// @param total_bytes if nonzero, specifies how many bytes were + /// transferred in the stream in total. + /// + /// @return a Payload object for GenMessage. + /// + static Payload create_close_request(uint8_t src_stream_id, + uint8_t dst_stream_id, uint32_t total_bytes = INVALID_TOTAL_BYTE_COUNT) { - Payload p(2, 0); + Payload p(total_bytes != INVALID_TOTAL_BYTE_COUNT ? 6 : 2, 0); p[0] = src_stream_id; p[1] = dst_stream_id; + if (total_bytes != INVALID_TOTAL_BYTE_COUNT) + { + p[2] = (total_bytes >> 24) & 0xff; + p[3] = (total_bytes >> 16) & 0xff; + p[4] = (total_bytes >> 8) & 0xff; + p[5] = (total_bytes >> 0) & 0xff; + } return p; } }; diff --git a/src/openlcb/StreamSender.cxxtest b/src/openlcb/StreamSender.cxxtest index e53e9aefa..19f508470 100644 --- a/src/openlcb/StreamSender.cxxtest +++ b/src/openlcb/StreamSender.cxxtest @@ -190,8 +190,8 @@ TEST_F(StreamSenderTest, buffer_full) clear_expect(true); } -/// This test sends multiple chunks ahead of time to the queue, then simulates -/// the remote end to trickle out the data. +// This test sends multiple chunks ahead of time to the queue, then simulates +// the remote end to trickle out the data. TEST_F(StreamSenderTest, queueing) { setup_helper(8); @@ -258,5 +258,92 @@ TEST_F(StreamSenderTest, queueing) EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); } +// Empty stream. +TEST_F(StreamSenderTest, close_empty) +{ + setup_helper(8); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X198A822AN0225AA5500000000;"); + sender_.close_stream(); + wait(); + clear_expect(true); + + EXPECT_EQ(StreamSender::CLOSING, sender_.get_state()); +} + +// Sends some data then closes the stream. +TEST_F(StreamSenderTest, send_close) +{ + setup_helper(8); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X1F22522AN5530313233;"); + send_bytes("0123"); + wait(); + clear_expect(true); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X198A822AN0225AA5500000004;"); + sender_.close_stream(); + wait(); + clear_expect(true); + + EXPECT_EQ(StreamSender::CLOSING, sender_.get_state()); +} + +// We have bytes in the queue that need to be sent off before the close +// message, but exactly filling one buffer. So the repsonse to a stream proceed +// will be complete. +TEST_F(StreamSenderTest, full_close) +{ + setup_helper(2); // very short window + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X1F22522AN553031;"); + send_bytes("0123"); + sender_.close_stream(); + + wait(); + clear_expect(true); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X1F22522AN553233;"); + send_packet(":X19888225N022AAA55;"); + wait(); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X198A822AN0225AA5500000004;"); + send_packet(":X19888225N022AAA55;"); + wait(); + clear_expect(true); + + EXPECT_EQ(StreamSender::CLOSING, sender_.get_state()); +} + +// We have bytes in the queue that need to be sent off before the close message. +TEST_F(StreamSenderTest, queued_close) +{ + ::testing::InSequence seq; + setup_helper(2); // very short window + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + expect_packet(":X1F22522AN553031;"); + send_bytes("012"); + sender_.close_stream(); + + wait(); + clear_expect(true); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + expect_packet(":X1F22522AN5532;"); + expect_packet(":X198A822AN0225AA5500000003;"); + send_packet(":X19888225N022AAA55;"); + wait(); + clear_expect(true); + + EXPECT_EQ(StreamSender::CLOSING, sender_.get_state()); +} + } // namespace openlcb diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx index 5828c6fef..d98cf6c4b 100644 --- a/src/openlcb/StreamSender.hxx +++ b/src/openlcb/StreamSender.hxx @@ -112,6 +112,13 @@ public: return *this; } + /// Closes the stream when all the bytes are transferred. + void close_stream() + { + requestClose_ = true; + trigger(); + } + /// Specifies what the source should propose as window size to the /// destination. May be called only after start_stream. /// @@ -185,9 +192,8 @@ public: requestInit_ = 0; return call_immediately(STATE(initiate_stream)); } - if (state_ == STATE_ERROR) + if (state_ == STATE_ERROR || state_ == CLOSING) { - LOG(INFO, "dropping data due to error."); return release_and_exit(); } DASSERT(state_ == RUNNING); @@ -200,6 +206,11 @@ public: { // We ran out of the current chunk of stream payload from the // source. + if (requestClose_ && queue_empty()) + { + requestClose_ = false; + return call_immediately(STATE(do_close_stream)); + } return release_and_exit(); } return call_immediately(STATE(allocate_can_buffer)); @@ -243,7 +254,7 @@ private: return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_INIT_TIMEOUT_SEC), STATE(received_init_stream)); } - + /// Callback from GenHandler when a stream initiate reply message arrives /// at the local interface. void stream_initiate_replied(Buffer *message) @@ -312,6 +323,28 @@ private: return entry(); } + /// Allocates a GenMessage buffer and sends out the stream close message + /// to the destination. + Action do_close_stream() + { + return allocate_and_call(node_->iface()->addressed_message_write_flow(), + STATE(send_close_stream)); + } + + /// Sends the stream close message. + Action send_close_stream() + { + auto *b = get_allocation_result( + node_->iface()->addressed_message_write_flow()); + b->data()->reset(Defs::MTI_STREAM_COMPLETE, node_->node_id(), dst_, + StreamDefs::create_close_request( + localStreamId_, dstStreamId_, totalByteCount_)); + + node_->iface()->addressed_message_write_flow()->send(b); + state_ = CLOSING; + return entry(); + } + /// Allocates a buffer for a CAN frame (for payload send). Action allocate_can_buffer() { From e448cfa92f1fffd6965862f7c0a00d5fdc1ac360 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 16 Dec 2022 17:27:15 +0100 Subject: [PATCH 10/10] removes obsolete todo. --- src/openlcb/StreamSender.hxx | 1 - 1 file changed, 1 deletion(-) diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx index d98cf6c4b..0f7ad4532 100644 --- a/src/openlcb/StreamSender.hxx +++ b/src/openlcb/StreamSender.hxx @@ -188,7 +188,6 @@ public: { if (requestInit_) { - /// @todo get the destination address somehow. requestInit_ = 0; return call_immediately(STATE(initiate_stream)); }