From f077f2d19856717419191bd939d91ced44406201 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Mon, 19 Dec 2022 17:10:20 +0100 Subject: [PATCH] Adds stream receiver (#685) - Adds StreamReceiverInterface, a transport-agnostic API for how the application can receive streams. - Adds StreamReceiverCan, a CAN-bus specific implementation that works in conjunction with IfCan. Misc fixes: - Adds support to LimitedPool for specifying what the underlying buffer pool should be. - Adds unique_ptr typedefs to byte buffer.hxx for RawBufferPtr and ByteBufferPtr. - Fixes bugs and uninitialized values in StreamSender. - Adds missing include guard to StreamDefs.hxx - Adds support for synchronous allocation from a pool directly into a BufferPtr. - Adds unit tests for ByteBuffer functionality. - Expands ByteChunk with some utility functions to support data sources. === * Adds stream receiver class skeleton and implementation for stream initiate handler. * Adds constant for stream window size default. * removes obsolete todo. * Fixes compilation for stream receiver. * Adds append API to bytebuffer chunk. * Adds data append code. Adds unit tests for bytebuffer. * Fix compilation error. * continue implementation of stream receiver. * Adds a synchronous allocation call directly into a BufferPtr. * Adds support for LimitedPool to take memory from custom pools instead of the mainBufferPool. * Adds typedefs with raw buffer ptr and byte chunk ptr. Adds support for determining how many free bytes are there in a raw buffer that we are filling right now. * Adds logic for receiving bytes in stream. Starts adding logic for stream complete handling. * Adds a transport-agnostic interface for stream receives. * Adds missing include guard * Completes rewrite for CallableFlow based interface. * Fixes some bugs. * Adds unit tests for stream receiver. * Fixes comments and more bugs. * Fixes an initialization bug in stream sender. * Adds a test which sends two streams at once. * Fix whitespace * Removes unnecessary logging. * removes completed todo --- doc/byte_stream.md | 2 +- include/nmranet_config.h | 4 + src/openlcb/CanDefs.hxx | 3 +- src/openlcb/StreamDefs.hxx | 67 ++++- src/openlcb/StreamReceiver.cxx | 375 ++++++++++++++++++++++++ src/openlcb/StreamReceiver.cxxtest | 312 ++++++++++++++++++++ src/openlcb/StreamReceiver.hxx | 169 +++++++++++ src/openlcb/StreamReceiverInterface.hxx | 117 ++++++++ src/openlcb/StreamSender.cxxtest | 14 +- src/openlcb/StreamSender.hxx | 28 +- src/openlcb/nmranet_constants.cxx | 4 + src/openlcb/sources | 1 + src/utils/Buffer.hxx | 10 + src/utils/ByteBuffer.cxxtest | 69 +++++ src/utils/ByteBuffer.hxx | 63 +++- src/utils/LimitedPool.hxx | 21 +- 16 files changed, 1226 insertions(+), 33 deletions(-) create mode 100644 src/openlcb/StreamReceiver.cxx create mode 100644 src/openlcb/StreamReceiver.cxxtest create mode 100644 src/openlcb/StreamReceiver.hxx create mode 100644 src/openlcb/StreamReceiverInterface.hxx create mode 100644 src/utils/ByteBuffer.cxxtest diff --git a/doc/byte_stream.md b/doc/byte_stream.md index 5b2f4d394..dd9f34ddc 100644 --- a/doc/byte_stream.md +++ b/doc/byte_stream.md @@ -93,7 +93,7 @@ pool shall be set to `rawBufferPool`. ### Memory ownership / deallocation -`ByteChunk` contains an `BufferPtr`, which is a unique_ptr that +`ByteChunk` contains a `BufferPtr`, which is a unique_ptr that unref's the buffer upon the destructor. This represents the ownership of a reference to a `RawBuffer`. The destructor of `ByteChunk` will automatically release this reference. diff --git a/include/nmranet_config.h b/include/nmranet_config.h index 1e44f39e3..f5d0f9b1f 100644 --- a/include/nmranet_config.h +++ b/include/nmranet_config.h @@ -154,6 +154,10 @@ DECLARE_CONST(node_init_identify); * time. */ DECLARE_CONST(bulk_alias_num_can_frames); +/** Default number of bytes in maximum stream window size for { @ref + * StreamReceiver }. */ +DECLARE_CONST(stream_receiver_default_window_size); + /** Stack size for @ref SocketListener threads. */ DECLARE_CONST(socket_listener_stack_size); diff --git a/src/openlcb/CanDefs.hxx b/src/openlcb/CanDefs.hxx index 9f4fdd189..e8245a631 100644 --- a/src/openlcb/CanDefs.hxx +++ b/src/openlcb/CanDefs.hxx @@ -66,6 +66,7 @@ struct CanDefs { FRAME_TYPE_MASK = 0x08000000, /**< mask for frame type field of CAN ID */ PRIORITY_MASK = 0x10000000, /**< mask for priority field of CAN ID */ PADDING_MASK = 0xe0000000, /**< mask for padding field of CAN ID */ + STREAM_DG_RECV_MASK = 0x0fffffff, /**< mask for receiving datagram and stream frames. */ SRC_SHIFT = 0, /**< shift for source field of CAN ID */ MTI_SHIFT = 12, /**< shift for MTI field of CAN ID */ @@ -90,8 +91,6 @@ struct CanDefs { CONTROL_PADDING_SHIFT = 29 /**< pad out to a full 32-bit word */ }; - - // @TODO(balazs.racz) do we need this? typedef uint16_t CanMTI; diff --git a/src/openlcb/StreamDefs.hxx b/src/openlcb/StreamDefs.hxx index f5a6d11ca..95d1e7f02 100644 --- a/src/openlcb/StreamDefs.hxx +++ b/src/openlcb/StreamDefs.hxx @@ -31,7 +31,10 @@ * @date 14 December 2014 */ -#include "openlcb/If.hxx" +#ifndef _OPENLCB_STREAMDEFS_HXX_ +#define _OPENLCB_STREAMDEFS_HXX_ + +#include "openlcb/Defs.hxx" namespace openlcb { @@ -65,6 +68,16 @@ struct StreamDefs REJECT_TEMPORARY_OUT_OF_ORDER = 0x40, }; + /// This code is sent back in the error code field in the stream initiate + /// reply if the stream is accepted. + static constexpr uint16_t STREAM_ACCEPT = ((uint16_t)FLAG_ACCEPT) << 8; + + /// This code is sent back in the error code field in the stream initiate + /// reply if the stream is rejected with invalid arguments. + static constexpr uint16_t STREAM_ERROR_INVALID_ARGS = + (((uint16_t)FLAG_PERMANENT_ERROR) << 8) | + REJECT_PERMANENT_INVALID_REQUEST; + /// Creates a Stream Initiate Request message payload. /// /// @param max_buffer_size value to propose as stream window size. @@ -74,15 +87,57 @@ struct StreamDefs /// /// @return a Payload object for a GenMessage. /// - static Payload create_initiate_request( - uint16_t max_buffer_size, bool has_ident, uint8_t src_stream_id) + static Payload create_initiate_request(uint16_t max_buffer_size, + bool has_ident, uint8_t src_stream_id, + uint8_t dst_stream_id = INVALID_STREAM_ID) { - Payload p(5, 0); + Payload p(6, 0); p[0] = max_buffer_size >> 8; p[1] = max_buffer_size & 0xff; p[2] = has_ident ? FLAG_CARRIES_ID : 0; - p[3] = 0; + p[3] = 0; // flags + p[4] = src_stream_id; + p[5] = dst_stream_id; + return p; + } + + /// Creates a Stream Initiate Reply message payload. + /// + /// @param max_buffer_size the definite window size of the stream + /// @param src_stream_id stream ID on the source side. + /// @param dst_stream_id stream ID on the dst side. + /// @param error_code error code if the stream is rejected, otherwise + /// STREAM_ACCEPT if it is accepted. + /// + /// @return a Payload object for a GenMessage. + /// + static Payload create_initiate_response(uint16_t max_buffer_size, + uint8_t src_stream_id, uint8_t dst_stream_id, + uint16_t error_code = STREAM_ACCEPT) + { + Payload p(6, 0); + p[0] = max_buffer_size >> 8; + p[1] = max_buffer_size & 0xff; + p[2] = error_code >> 8; + p[3] = error_code & 0xff; p[4] = src_stream_id; + p[5] = dst_stream_id; + return p; + } + + /// Creates a Stream Data Proceed message payload. + /// + /// @param src_stream_id stream ID on the source side + /// @param dst_stream_id stream ID on the destination side + /// + /// @return Payload object for GenMessage + /// + static Payload create_data_proceed( + uint8_t src_stream_id, uint8_t dst_stream_id) + { + Payload p(2, 0); + p[0] = src_stream_id; + p[1] = dst_stream_id; return p; } @@ -113,3 +168,5 @@ struct StreamDefs }; } // namespace openlcb + +#endif // _OPENLCB_STREAMDEFS_HXX_ diff --git a/src/openlcb/StreamReceiver.cxx b/src/openlcb/StreamReceiver.cxx new file mode 100644 index 000000000..fea37fb9f --- /dev/null +++ b/src/openlcb/StreamReceiver.cxx @@ -0,0 +1,375 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file StreamReceiver.cxx + * + * Implementation flow for the Stream Service that receives data to a remote + * source using the stream protocol. + * + * @author Balazs Racz + * @date 3 May 2022 + */ + +#include "openlcb/StreamReceiver.hxx" + +#include + +#include "nmranet_config.h" +#include "openlcb/CanDefs.hxx" +#include "openlcb/Defs.hxx" +#include "utils/format_utils.hxx" + +namespace openlcb +{ + +void StreamReceiverCan::announced_stream() +{ + if (!request()->streamWindowSize_) + { + request()->streamWindowSize_ = + config_stream_receiver_default_window_size(); + } + streamWindowRemaining_ = 0; + node()->iface()->dispatcher()->register_handler(&streamInitiateHandler_, + Defs::MTI_STREAM_INITIATE_REQUEST, Defs::MTI_EXACT); +} + +void StreamReceiverCan::send(Buffer *msg, unsigned prio) +{ + reset_message(msg, prio); + + if (request()->localStreamId_ == StreamDefs::INVALID_STREAM_ID) + { + request()->localStreamId_ = assignedStreamId_; + } + + if (!request()->target_) + { + // asking for stream ID. + request()->localStreamId_ = assignedStreamId_; + return_buffer(); + return; + } + + announced_stream(); + wait_for_wakeup(); +} + +void StreamReceiverCan::handle_stream_initiate(Buffer *message) +{ + auto rb = get_buffer_deleter(message); + + if (message->data()->dstNode != node() || + !node()->iface()->matching_node(request()->src_, message->data()->src)) + { + LOG(INFO, "stream init not for me"); + // Not for me. + return; + } + // Saves alias as well. + request()->src_ = message->data()->src; + const auto &payload = message->data()->payload; + uint16_t proposed_window; + uint8_t incoming_src_id = StreamDefs::INVALID_STREAM_ID; + if (payload.size() >= 5) + { + incoming_src_id = payload[4]; + if (request()->srcStreamId_ != StreamDefs::INVALID_STREAM_ID && + request()->srcStreamId_ != incoming_src_id) + { + LOG(INFO, "stream init ID not for me"); + // Not for me. + return; + } + request()->srcStreamId_ = incoming_src_id; + } + if (payload.size() < 5 || + incoming_src_id == StreamDefs::INVALID_STREAM_ID || + ((proposed_window = data_to_error(&payload[0])) == 0)) + { + LOG(INFO, "Incoming stream: invalid arguments."); + // Invalid arguments. This will synchronously allocate a buffer and + // send the message to the interface. + send_message(node(), Defs::MTI_STREAM_INITIATE_REPLY, + message->data()->src, + StreamDefs::create_initiate_response(0, incoming_src_id, + request()->localStreamId_, + StreamDefs::STREAM_ERROR_INVALID_ARGS)); + node()->iface()->dispatcher()->unregister_handler( + &streamInitiateHandler_, Defs::MTI_STREAM_INITIATE_REQUEST, + Defs::MTI_EXACT); + return; + } + if (proposed_window < request()->streamWindowSize_) + { + request()->streamWindowSize_ = proposed_window; + } + + streamWindowRemaining_ = request()->streamWindowSize_; + totalByteCount_ = 0; + + node()->iface()->dispatcher()->register_handler( + &streamCompleteHandler_, Defs::MTI_STREAM_COMPLETE, Defs::MTI_EXACT); + + node()->iface()->dispatcher()->unregister_handler(&streamInitiateHandler_, + Defs::MTI_STREAM_INITIATE_REQUEST, Defs::MTI_EXACT); + + pendingInit_ = 1; + notify(); +} + +void StreamReceiverCan::handle_bytes_received(const uint8_t *data, size_t len) +{ + while (len > 0) + { + if (!currentBuffer_) + { + // Need to allocate a new chunk first. + mainBufferPool->alloc(¤tBuffer_); + // Add an empty raw buffer to it. + RawBufferPtr rb; + if (streamWindowRemaining_ <= RawData::MAX_SIZE) + { + // We need to use the last raw buffer. + rb = std::move(lastBuffer_); + } + else + { + // We need a new (middle) raw buffer. + rawBufferPool->alloc(&rb); + } + currentBuffer_->data()->set_from(std::move(rb), 0); + } + size_t copied = currentBuffer_->data()->append(data, len); + data += copied; + len -= copied; + totalByteCount_ += copied; + if (copied <= streamWindowRemaining_) + { + streamWindowRemaining_ -= copied; + } + else + { + LOG(WARNING, "Unexpected stream bytes, window is negative."); + streamWindowRemaining_ = 0; + } + if (!currentBuffer_->data()->free_space() || !streamWindowRemaining_) + { + // Sends off the buffer and clears currentBuffer_. + request()->target_->send(currentBuffer_.release()); + } + } // while len > 0 + if (!streamWindowRemaining_) + { + // wake up state flow to send ack to the stream + notify(); + } +} + +void StreamReceiverCan::handle_stream_complete(Buffer *message) +{ + auto rb = get_buffer_deleter(message); + + if (message->data()->dstNode != node() || + !node()->iface()->matching_node(request()->src_, message->data()->src)) + { + LOG(INFO, "stream complete not for me"); + // Not for me. + return; + } + + if (message->data()->payload.size() < 2) + { + // Invalid arguments. Ignore. + return; + } + + if (((uint8_t)message->data()->payload[0]) != request()->srcStreamId_ || + ((uint8_t)message->data()->payload[1]) != request()->localStreamId_) + { + // Different stream. + LOG(INFO, "stream complete different stream"); + return; + } + + uint32_t total_size = StreamDefs::INVALID_TOTAL_BYTE_COUNT; + + if (message->data()->payload.size() >= 6) + { + memcpy(&total_size, message->data()->payload.data() + 2, 4); + total_size = be32toh(total_size); + } + + streamClosed_ = true; + + if (total_size != StreamDefs::INVALID_TOTAL_BYTE_COUNT) + { + // We have to wait for the remaining bytes to show up. + streamWindowRemaining_ = total_size - totalByteCount_; + } + else + { + streamWindowRemaining_ = 0; + } + + if (!streamWindowRemaining_) + { + // wake up the flow. + notify(); + } + + node()->iface()->dispatcher()->unregister_handler( + &streamCompleteHandler_, Defs::MTI_STREAM_COMPLETE, Defs::MTI_EXACT); +} + +class StreamReceiverCan::StreamDataHandler : public IncomingFrameHandler +{ +public: + StreamDataHandler(StreamReceiverCan *parent) + : parent_(parent) + { + } + + /// Starts registration for receiving stream data with the given aliases. + void start(NodeAlias remote_alias, NodeAlias local_alias) + { + HASSERT(remote_alias); + HASSERT(local_alias); + uint32_t frame_id = 0; + CanDefs::set_datagram_fields( + &frame_id, remote_alias, local_alias, CanDefs::STREAM_DATA); + parent_->if_can()->frame_dispatcher()->register_handler( + this, frame_id, CanDefs::STREAM_DG_RECV_MASK); + } + + /// Stops receiving stream data. + void stop() + { + parent_->if_can()->frame_dispatcher()->unregister_handler_all(this); + } + + /// Handler callback for incoming messages. + void send(Buffer *message, unsigned priority) override + { + auto rb = get_buffer_deleter(message); + + if (message->data()->can_dlc <= 0) + { + return; // no payload + } + if (message->data()->data[0] != parent_->request()->localStreamId_) + { + return; // different stream + } + parent_->handle_bytes_received( + message->data()->data + 1, message->data()->can_dlc - 1); + } + +private: + /// Owning stream receiver object. + StreamReceiverCan *parent_; +}; + +StreamReceiverCan::StreamReceiverCan(IfCan *interface, uint8_t local_stream_id) + : CallableFlow(interface) + , dataHandler_(new StreamDataHandler(this)) + , assignedStreamId_(local_stream_id) + , streamClosed_(0) + , pendingInit_(0) +{ +} + +StreamReceiverCan::~StreamReceiverCan() +{ +} + +StateFlowBase::Action StreamReceiverCan::wakeup() +{ + // Check reason for wakeup. + if (pendingInit_) + { + pendingInit_ = 0; + return call_immediately(STATE(init_reply)); + } + if (!streamWindowRemaining_) + { + if (streamClosed_) + { + streamClosed_ = 0; + dataHandler_->stop(); + if (currentBuffer_) + { + // Sends off the buffer and clears currentBuffer_. + request()->target_->send(currentBuffer_.release()); + } + return return_ok(); + } + // Need to send an ack. + return call_immediately(STATE(window_reached)); + } + return wait(); +} + +StateFlowBase::Action StreamReceiverCan::init_reply() +{ + // Initialize the last buffer for the first window. + return allocate_and_call( + nullptr, STATE(init_buffer_ready), &lastBufferPool_); +} + +StateFlowBase::Action StreamReceiverCan::init_buffer_ready() +{ + lastBuffer_.reset(get_allocation_result(nullptr)); + + node()->iface()->canonicalize_handle(&request()->src_); + NodeHandle local(node()->node_id()); + node()->iface()->canonicalize_handle(&local); + dataHandler_->start(request()->src_.alias, local.alias); + + send_message(node(), Defs::MTI_STREAM_INITIATE_REPLY, request()->src_, + StreamDefs::create_initiate_response(request()->streamWindowSize_, + request()->srcStreamId_, request()->localStreamId_)); + + return wait_for_wakeup(); +} + +StateFlowBase::Action StreamReceiverCan::window_reached() +{ + return allocate_and_call( + nullptr, STATE(have_raw_buffer), &lastBufferPool_); +} + +StateFlowBase::Action StreamReceiverCan::have_raw_buffer() +{ + lastBuffer_.reset(get_allocation_result(nullptr)); + streamWindowRemaining_ = request()->streamWindowSize_; + send_message(node(), Defs::MTI_STREAM_PROCEED, request()->src_, + StreamDefs::create_data_proceed( + request()->srcStreamId_, request()->localStreamId_)); + return wait_for_wakeup(); +} + +} // namespace openlcb diff --git a/src/openlcb/StreamReceiver.cxxtest b/src/openlcb/StreamReceiver.cxxtest new file mode 100644 index 000000000..d81d36030 --- /dev/null +++ b/src/openlcb/StreamReceiver.cxxtest @@ -0,0 +1,312 @@ +#include "openlcb/StreamReceiver.hxx" + +#include "openlcb/StreamSender.hxx" +#include "utils/async_datagram_test_helper.hxx" + +namespace openlcb +{ + +static constexpr uint8_t LOCAL_STREAM_ID = 0x3a; +static constexpr uint8_t SRC_STREAM_ID = 0xa7; + +string get_payload_data(size_t length) +{ + string r(length, 0); + for (size_t i = 0; i < length; ++i) + { + r[i] = i & 0xff; + } + return r; +} + +struct CollectData : public ByteSink +{ + /// Bytes that arrived so far. + string data; + /// Holds buffers. + Q q; + /// if true, the buffers are added to the queue instead of unref'ed. + bool keepBuffers_ {false}; + + void send(ByteBuffer *msg, unsigned prio) override + { + auto rb = get_buffer_deleter(msg); + data.append((char *)msg->data()->data_, msg->data()->size()); + if (keepBuffers_) + { + q.insert(msg->ref()); + } + } + + /// Takes a single element from the queue, and releases it. + string qtake() + { + ByteBuffer *b = (ByteBuffer *)q.next(0); + HASSERT(b != nullptr); + string ret((char *)b->data()->data_, b->data()->size()); + b->unref(); + return ret; + } +}; + +class StreamReceiverTestBase : public TwoNodeDatagramTest +{ +protected: + StreamReceiverTestBase() + { + setup_other_node(true); + wait(); + clear_expect(false); + run_x([this]() { ifCan_->send_global_alias_enquiry(node_); }); + wait(); + } +}; + +class StreamReceiverTest : public StreamReceiverTestBase +{ +protected: + StreamReceiverTest() + { + mainBufferPool->alloc(&recvRequest_); + } + + ~StreamReceiverTest() + { + do + { + wait(); + } while (sender_.shutdown()); + wait(); + } + + void invoke_receiver(uint8_t src_stream_id = StreamDefs::INVALID_STREAM_ID) + { + recvRequest_->data()->reset( + &sink_, node_, NodeHandle(otherNode_->node_id()), src_stream_id); + recvRequest_->data()->done.reset(&sn_); + run_x([this]() { receiver_.send(recvRequest_->ref()); }); + } + + void invoke_sender() + { + sender_.start_stream(NodeHandle(node_->node_id()), SRC_STREAM_ID); + } + + void send_data(size_t bytes) + { + dataSent_ = get_payload_data(bytes); + auto *b = sender_.alloc(); + b->data()->set_from(&dataSent_); + SyncNotifiable sn; + BarrierNotifiable bn(&sn); + b->set_done(&bn); + sender_.send(b); + sn.wait_for_notification(); + } + + void e2e_test(size_t bytes, int window_size = -1) + { + invoke_receiver(); + invoke_sender(); + if (window_size > 0) + { + sender_.set_proposed_window_size(window_size); + } + send_data(bytes); + sender_.close_stream(); + wait(); + EXPECT_EQ(dataSent_, sink_.data); + } + + StreamReceiverCan receiver_ {ifCan_.get(), LOCAL_STREAM_ID}; + SyncNotifiable sn_; + CollectData sink_; + BufferPtr recvRequest_; + + StreamSenderCan sender_ {&g_service, otherIfCan_.get(), otherNode_.get()}; + string dataSent_; +}; + +TEST_F(StreamReceiverTest, create) +{ +} + +TEST_F(StreamReceiverTest, get_id) +{ + auto rb = invoke_flow(&receiver_); + EXPECT_TRUE(rb->data()->done.is_done()); + EXPECT_EQ(LOCAL_STREAM_ID, rb->data()->localStreamId_); +} + +TEST_F(StreamReceiverTest, test_e2e_small) +{ + print_all_packets(); + clear_expect(false); + e2e_test(100); +} + +TEST_F(StreamReceiverTest, test_e2e_onewindow) +{ + e2e_test(2048); +} + +TEST_F(StreamReceiverTest, test_e2e_smallwindow) +{ + e2e_test(35, 35); +} + +TEST_F(StreamReceiverTest, test_e2e_smallwindow_frac) +{ + print_all_packets(); + e2e_test(45, 35); +} + +TEST_F(StreamReceiverTest, test_e2e_onewindow_plus_small) +{ + e2e_test(2048 + 5); +} + +TEST_F(StreamReceiverTest, test_e2e_multiwindow) +{ + e2e_test(3 * 2048); +} + +TEST_F(StreamReceiverTest, test_e2e_multiwindow_frac) +{ + e2e_test(3 * 2048 + 577); +} + +/// Tests when the stream receiver data sink is not consuming the data fast +/// enough. +TEST_F(StreamReceiverTest, blocked_sink) +{ + sink_.keepBuffers_ = true; + invoke_receiver(); + invoke_sender(); + sender_.set_proposed_window_size(2); // very short window + + dataSent_ = "abcdefghijk"; + auto *b = sender_.alloc(); + b->data()->set_from(&dataSent_); + BarrierNotifiable bn(EmptyNotifiable::DefaultInstance()); + b->set_done(&bn); + sender_.send(b); + + wait(); + + // First stop: after two window lengths. + EXPECT_EQ("abcd", sink_.data); + EXPECT_FALSE(bn.is_done()); + EXPECT_EQ(2u, sink_.q.pending()); + + // Takes one buffer entry. + string r = sink_.qtake(); + EXPECT_EQ("ab", r); + EXPECT_EQ(1u, sink_.q.pending()); + + wait(); // stream will backfill the buffer. + EXPECT_EQ("abcdef", sink_.data); // 6 total bytes now + EXPECT_FALSE(bn.is_done()); + EXPECT_EQ(2u, sink_.q.pending()); + + r = sink_.qtake(); + EXPECT_EQ("cd", r); + EXPECT_EQ(1u, sink_.q.pending()); + + wait(); // stream will backfill the buffer. + EXPECT_EQ("abcdefgh", sink_.data); // 8 total bytes now + EXPECT_FALSE(bn.is_done()); + EXPECT_EQ(2u, sink_.q.pending()); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + r = sink_.qtake(); + EXPECT_EQ("ef", r); + EXPECT_EQ(1u, sink_.q.pending()); + + wait(); // stream will backfill the buffer. + EXPECT_EQ("abcdefghij", sink_.data); // 8 total bytes now + EXPECT_FALSE(bn.is_done()); + EXPECT_EQ(2u, sink_.q.pending()); + EXPECT_EQ(StreamSender::FULL, sender_.get_state()); + + r = sink_.qtake(); + EXPECT_EQ("gh", r); + EXPECT_EQ(1u, sink_.q.pending()); + + wait(); // Now the last byte was transmitted but close was not. + EXPECT_EQ("abcdefghij", sink_.data); + // The last buffer is not handed over to the queue yet. + EXPECT_EQ(1u, sink_.q.pending()); + + EXPECT_TRUE(bn.is_done()); + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); + + // Flushes the data in the buffer. + sender_.close_stream(); + wait(); + EXPECT_EQ(StreamSender::CLOSING, sender_.get_state()); + + EXPECT_EQ(2u, sink_.q.pending()); + EXPECT_EQ("abcdefghijk", sink_.data); + + EXPECT_EQ("ij", sink_.qtake()); + EXPECT_EQ("k", sink_.qtake()); + EXPECT_EQ(0u, sink_.q.pending()); +} + +/// Runs two streams at the same time. +TEST_F(StreamReceiverTest, two_streams) +{ + StreamSenderCan sender2 {&g_service, otherIfCan_.get(), otherNode_.get()}; + StreamReceiverCan receiver2 {ifCan_.get(), LOCAL_STREAM_ID + 1}; + CollectData sink2; + sink2.keepBuffers_ = true; + wait(); + + // Starts receiver 2. + auto rreq = invoke_flow_nowait(&receiver2, &sink2, node_, + NodeHandle(otherNode_->node_id()), SRC_STREAM_ID + 1); + + // Starts receiver 1. + invoke_receiver(SRC_STREAM_ID); + + wait(); + + // Starts sender 2. + sender2.start_stream(NodeHandle(node_->node_id()), SRC_STREAM_ID + 1) + .set_proposed_window_size(2); + + wait(); + + // Starts sender 1. + invoke_sender(); + + // Pumps data through sender 2. + string p = "abcdefghijk"; + auto *b = sender_.alloc(); + b->data()->set_from(&p); + sender2.send(b); + wait(); + + // Sender 2 is blocked. + EXPECT_EQ(2u, sink2.q.pending()); + + // Runs the entire sender 1. + send_data(3576); + sender_.close_stream(); + wait(); + + // Finishes sender 2. + sender2.close_stream(); + while (sink2.q.pending()) + { + sink2.qtake(); + wait(); + } + + // Verifies that the right data arrived. + EXPECT_EQ(dataSent_, sink_.data); + EXPECT_EQ(p, sink2.data); +} + +} // namespace openlcb diff --git a/src/openlcb/StreamReceiver.hxx b/src/openlcb/StreamReceiver.hxx new file mode 100644 index 000000000..6b303b4ea --- /dev/null +++ b/src/openlcb/StreamReceiver.hxx @@ -0,0 +1,169 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file StreamReceiver.hxx + * + * Implementation flow for the Stream Service that receives data to a remote + * source using the stream protocol. + * + * @author Balazs Racz + * @date 3 May 2022 + */ + +#ifndef _OPENLCB_STREAMRECEIVER_HXX_ +#define _OPENLCB_STREAMRECEIVER_HXX_ + +#include "openlcb/StreamReceiverInterface.hxx" + +#include "openlcb/IfCan.hxx" +#include "openlcb/StreamDefs.hxx" +#include "utils/ByteBuffer.hxx" +#include "utils/LimitedPool.hxx" + +namespace openlcb +{ + +class StreamReceiverCan : public CallableFlow +{ +public: + /// Constructor. + /// + /// @param interface the CAN interface that owns this stream receiver. + /// @param local_stream_id what should be the local stream ID for the + /// streams used for this receiver. + StreamReceiverCan(IfCan *interface, uint8_t local_stream_id); + + ~StreamReceiverCan(); + + /// Implements the flow interface for the request API. This is not based on + /// entry() because the registration has to be synchrnous with the calling + /// of send(). + void send(Buffer *msg, unsigned prio = 0) override; + +private: + /// Helper function for send() when a stream has to start synchronously. + void announced_stream(); + + /// This state is not used, but it's virtual abstract. + Action entry() override + { + return return_ok(); + } + + Action wait_for_wakeup() + { + return wait_and_call(STATE(wakeup)); + } + + /// Root of the flow when something happens in the handlers. + Action wakeup(); + + /// Invoked when we get the stream initiate request. Initializes receive + /// buffers and sends stream init response. + Action init_reply(); + Action init_buffer_ready(); + + /// Invoked when the stream window runs out. Maybe waits for the data to be + /// consumed below the low-watermark. + Action window_reached(); + /// Called when the allocation of the raw buffer is successful. Sends off + /// the stream proceed message. + Action have_raw_buffer(); + + /// Invoked by the GenericHandler when a stream initiate message arrives. + /// + /// @param message buffer with stream initiate message. + /// + void handle_stream_initiate(Buffer *message); + + /// Handles data arriving from the network. + inline void handle_bytes_received(const uint8_t *data, size_t len); + + /// Invoked by the GenericHandler when a stream complete message arrives. + /// + /// @param message buffer with stream complete message. + /// + void handle_stream_complete(Buffer *message); + + /// @return the local CAN interface. + IfCan *if_can() + { + return static_cast(service()); + } + + /// @return the local node pointer. + Node *node() + { + return request()->dst_; + } + + /// Helper class for incoming message for stream initiate. + MessageHandler::GenericHandler streamInitiateHandler_ { + this, &StreamReceiverCan::handle_stream_initiate}; + + class StreamDataHandler; + friend class StreamDataHandler; + + /// Helper class for incoming message for stream complete. + MessageHandler::GenericHandler streamCompleteHandler_ { + this, &StreamReceiverCan::handle_stream_complete}; + + /// This pool is used to allocate one raw buffer per stream window + /// size. This pool therefore functions as a throttling for the data + /// producer. We have a fixed size of 2, meaning that we are allowing + /// ourselves to load 2x the stream window size into our RAM. + LimitedPool lastBufferPool_ {sizeof(RawBuffer), 2, rawBufferPool}; + + /// The buffer that we are currently filling with incoming data. + ByteBufferPtr currentBuffer_; + + /// The buffer that will be the last one in this stream window. This buffer + /// comes from the lastBufferPool_ to function as throttling signal. + RawBufferPtr lastBuffer_; + + /// Helper object that receives the actual stream CAN frames. + std::unique_ptr dataHandler_; + + /// How many bytes we have transmitted in this stream so far. + size_t totalByteCount_; + + /// Remaining stream window size. + uint16_t streamWindowRemaining_; + + /// Unique stream ID at the destination (local) node, assigned at + /// construction time. + const uint8_t assignedStreamId_; + + /// 1 if we received the stream complete message. + uint8_t streamClosed_ : 1; + /// 1 if we received the stream init request message. + uint8_t pendingInit_ : 1; + +}; // class StreamReceiver + +} // namespace openlcb + +#endif // _OPENLCB_STREAMRECEIVER_HXX_ diff --git a/src/openlcb/StreamReceiverInterface.hxx b/src/openlcb/StreamReceiverInterface.hxx new file mode 100644 index 000000000..c6c96eba3 --- /dev/null +++ b/src/openlcb/StreamReceiverInterface.hxx @@ -0,0 +1,117 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file StreamReceiverInterface.hxx + * + * Transport-agnostic interface for receiving a stream from OpenLCB remote. + * + * @author Balazs Racz + * @date 18 Dec 2022 + */ + +#ifndef _OPENLCB_STREAMRECEIVERINTERFACE_HXX_ +#define _OPENLCB_STREAMRECEIVERINTERFACE_HXX_ + +#include "executor/CallableFlow.hxx" +#include "openlcb/Defs.hxx" +#include "openlcb/StreamDefs.hxx" + +template class FlowInterface; +template class Buffer; +class ByteChunk; +using ByteBuffer = Buffer; +using ByteSink = FlowInterface; + +namespace openlcb +{ + +class Node; + +struct StreamReceiveRequest : public CallableFlowRequestBase +{ + /// Gets a local stream ID. This will be returning the assigned local + /// stream ID from the stream receiver object. + void reset() + { + reset_base(); + target_ = nullptr; + localStreamId_ = StreamDefs::INVALID_STREAM_ID; + } + + /// Starts the stream receiver and prepares for an announced stream. This + /// is generally invoked by a handler of a higher level protocol where the + /// stream connection is arranged, such as the Memory Config Protocol. + /// + /// This call is processed synchronously. It is expected that shortly after + /// this call a stream init message will arrive to the local interface, + /// originating from the stream source node. + /// + /// @param src node handle of the source node that announced the stream. + /// @param src_stream_id stream ID on the source node side. It is possible + /// that this is not yet known at the time of this call, in which case + /// INVALID_STREAM_ID may be passed in. + /// @param dst_stream_id allocated stream ID at the local node. If it is + /// INVALID_STREAM_ID, then the assigned local ID is used by the stream + /// receiver. + /// @param max_window if non-zero, limits the maximum window size by the + /// local side. If zero, the default max window size will be taken from a + /// linker-time constant. + void reset(ByteSink *target, Node *dst, NodeHandle src, + uint8_t src_stream_id = StreamDefs::INVALID_STREAM_ID, + uint8_t dst_stream_id = StreamDefs::INVALID_STREAM_ID, + uint16_t max_window = 0) + { + reset_base(); + HASSERT(target); + target_ = target; + src_ = src; + dst_ = dst; + srcStreamId_ = src_stream_id; + localStreamId_ = dst_stream_id; + streamWindowSize_ = max_window; + } + + /// Where to send the incoming stream data. + ByteSink *target_ {nullptr}; + /// Remote node that will send us the stream. + NodeHandle src_ {0, 0}; + /// Local node for receiving the stream. + Node *dst_ {nullptr}; + /// Source (remote) stream ID. May be INVALID_STREAM_ID. + uint8_t srcStreamId_ {StreamDefs::INVALID_STREAM_ID}; + /// Local (target) stream ID. Must be valid. + uint8_t localStreamId_ {StreamDefs::INVALID_STREAM_ID}; + /// if non-zero, limits the maximum window size by the + /// local side. If zero, the default max window size will be taken from a + /// linker-time constant. + uint16_t streamWindowSize_ {0}; +}; + +using StreamReceiverInterface = FlowInterface>; + +} // namespace openlcb + +#endif // _OPENLCB_STREAMRECEIVERINTERFACE_HXX_ diff --git a/src/openlcb/StreamSender.cxxtest b/src/openlcb/StreamSender.cxxtest index 19f508470..16d0db5f1 100644 --- a/src/openlcb/StreamSender.cxxtest +++ b/src/openlcb/StreamSender.cxxtest @@ -166,7 +166,7 @@ TEST_F(StreamSenderTest, initiate_accepted_send_data) TEST_F(StreamSenderTest, buffer_full) { setup_helper(12); - + EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); expect_packet(":X1F22522AN55414243;"); @@ -175,10 +175,11 @@ TEST_F(StreamSenderTest, buffer_full) clear_expect(true); EXPECT_EQ(StreamSender::RUNNING, sender_.get_state()); - + expect_packet(":X1F22522AN554445464748494A;"); expect_packet(":X1F22522AN554B4C;"); - send_bytes("DEFGHIJKL" "MNOPQ"); + send_bytes("DEFGHIJKL" + "MNOPQ"); wait(); clear_expect(true); @@ -195,7 +196,7 @@ TEST_F(StreamSenderTest, buffer_full) TEST_F(StreamSenderTest, queueing) { setup_helper(8); - + expect_packet(":X1F22522AN5530313233343536;"); expect_packet(":X1F22522AN5537;"); send_bytes("012345678"); @@ -303,7 +304,7 @@ TEST_F(StreamSenderTest, full_close) expect_packet(":X1F22522AN553031;"); send_bytes("0123"); sender_.close_stream(); - + wait(); clear_expect(true); EXPECT_EQ(StreamSender::FULL, sender_.get_state()); @@ -331,7 +332,7 @@ TEST_F(StreamSenderTest, queued_close) expect_packet(":X1F22522AN553031;"); send_bytes("012"); sender_.close_stream(); - + wait(); clear_expect(true); EXPECT_EQ(StreamSender::FULL, sender_.get_state()); @@ -345,5 +346,4 @@ TEST_F(StreamSenderTest, queued_close) EXPECT_EQ(StreamSender::CLOSING, sender_.get_state()); } - } // namespace openlcb diff --git a/src/openlcb/StreamSender.hxx b/src/openlcb/StreamSender.hxx index 0f7ad4532..9b7e5620f 100644 --- a/src/openlcb/StreamSender.hxx +++ b/src/openlcb/StreamSender.hxx @@ -43,14 +43,19 @@ #include "openlcb/StreamDefs.hxx" #include "utils/ByteBuffer.hxx" #include "utils/LimitedPool.hxx" +#include "utils/format_utils.hxx" namespace openlcb { -class StreamSender : public StateFlow> { +class StreamSender : public StateFlow> +{ public: - StreamSender(Service* s) : StateFlow>(s) {} - + StreamSender(Service *s) + : StateFlow>(s) + { + } + /// Describes the different states in the stream sender. enum StreamSenderState : uint8_t { @@ -69,7 +74,6 @@ public: /// An error occurred. STATE_ERROR }; - }; /// Helper class for sending stream data to a CAN interface. @@ -81,7 +85,11 @@ public: : StreamSender(service) , ifCan_(iface) , node_(node) - { } + , sleeping_(false) + , requestClose_(false) + , requestInit_(false) + { + } /// Initiates using the stream sender. May be called only on idle stream /// senders. @@ -181,7 +189,7 @@ public: { return errorCode_; } - + /// Start of state machine, called when a buffer of data to send arrives /// from the application layer. Action entry() @@ -250,14 +258,17 @@ private: node_->iface()->addressed_message_write_flow()->send(b); sleeping_ = true; state_ = INITIATING; + LOG(VERBOSE, "wait for stream init reply"); return sleep_and_call(&timer_, SEC_TO_NSEC(STREAM_INIT_TIMEOUT_SEC), STATE(received_init_stream)); } - + /// Callback from GenHandler when a stream initiate reply message arrives /// at the local interface. void stream_initiate_replied(Buffer *message) { + LOG(VERBOSE, "stream init reply: %s", + string_to_hex(message->data()->payload).c_str()); auto rb = get_buffer_deleter(message); if (message->data()->dstNode != node_ || !node_->iface()->matching_node(dst_, message->data()->src)) @@ -291,6 +302,7 @@ private: /// handler. Action received_init_stream() { + LOG(VERBOSE, "stream init reply wait done"); node_->iface()->dispatcher()->unregister_handler( &streamInitiateReplyHandler_, Defs::MTI_STREAM_INITIATE_REPLY, Defs::MTI_EXACT); @@ -343,7 +355,7 @@ private: state_ = CLOSING; return entry(); } - + /// Allocates a buffer for a CAN frame (for payload send). Action allocate_can_buffer() { diff --git a/src/openlcb/nmranet_constants.cxx b/src/openlcb/nmranet_constants.cxx index 8476bf031..d4147a723 100644 --- a/src/openlcb/nmranet_constants.cxx +++ b/src/openlcb/nmranet_constants.cxx @@ -70,3 +70,7 @@ DEFAULT_CONST_TRUE(node_init_identify); /** How many CAN frames should the bulk alias allocator be sending at the same * time. */ DEFAULT_CONST(bulk_alias_num_can_frames, 20); + +/** Default number of bytes in maximum stream window size for { @ref + * StreamReceiver }. */ +DEFAULT_CONST(stream_receiver_default_window_size, 2 * 1024); diff --git a/src/openlcb/sources b/src/openlcb/sources index 6b385b444..77276a12e 100644 --- a/src/openlcb/sources +++ b/src/openlcb/sources @@ -44,6 +44,7 @@ CXXSRCS += \ SimpleNodeInfoResponse.cxx \ SimpleNodeInfoMockUserFile.cxx \ SimpleStack.cxx \ + StreamReceiver.cxx \ TractionTestTrain.cxx \ TractionProxy.cxx \ TcpDefs.cxx \ diff --git a/src/utils/Buffer.hxx b/src/utils/Buffer.hxx index a9eba72b8..766479844 100644 --- a/src/utils/Buffer.hxx +++ b/src/utils/Buffer.hxx @@ -299,6 +299,16 @@ public: } } + /** Get a free item out of the pool. This is a synchronous call. + * @param result Buffer pointer that will hold the result + */ + template void alloc(BufferPtr *result) + { + Buffer *p; + alloc(&p); + result->reset(p); + } + /** Get a free item out of the pool. * @param flow Executable to notify upon allocation */ diff --git a/src/utils/ByteBuffer.cxxtest b/src/utils/ByteBuffer.cxxtest new file mode 100644 index 000000000..130aec063 --- /dev/null +++ b/src/utils/ByteBuffer.cxxtest @@ -0,0 +1,69 @@ +#include "utils/ByteBuffer.hxx" + +#include "utils/test_main.hxx" + +/// Allocates a new 1-kbyte sized raw buffer. +BufferPtr alloc_raw() +{ + Buffer *b; + rawBufferPool->alloc(&b); + return get_buffer_deleter(b); +} + +TEST(ByteChunkTest, create) +{ + ByteChunk ch; +} + +TEST(ByteChunkTest, assign_advance) +{ + ByteChunk ch; + ch.set_from(alloc_raw(), 0); + EXPECT_EQ(1024u, ch.free_space()); + EXPECT_EQ(6u, ch.append("abcdef", 6)); + EXPECT_EQ(6u, ch.size()); + EXPECT_EQ(1018u, ch.free_space()); + EXPECT_EQ((uint8_t)'a', ch.data_[0]); + EXPECT_EQ((uint8_t)'b', ch.data_[1]); + EXPECT_EQ((uint8_t)'f', ch.data_[5]); + + ch.advance(1); + + EXPECT_EQ(5u, ch.size()); + EXPECT_EQ((uint8_t)'b', ch.data_[0]); + EXPECT_EQ((uint8_t)'f', ch.data_[4]); +} + +TEST(ByteChunkTest, assign_external) +{ + ByteChunk ch; + static const char TEST_DATA[] = "abcdef"; + ch.set_from(TEST_DATA, 6); + EXPECT_EQ(6u, ch.size()); + EXPECT_EQ((uint8_t)'a', ch.data_[0]); + EXPECT_EQ((uint8_t)'b', ch.data_[1]); + EXPECT_EQ((uint8_t)'f', ch.data_[5]); + + ch.advance(1); + + EXPECT_EQ(5u, ch.size()); + EXPECT_EQ((uint8_t)'b', ch.data_[0]); + EXPECT_EQ((uint8_t)'f', ch.data_[4]); +} + +TEST(ByteChunkTest, assign_string) +{ + ByteChunk ch; + string testdata = "abcdef"; + ch.set_from(&testdata); + EXPECT_EQ(6u, ch.size()); + EXPECT_EQ((uint8_t)'a', ch.data_[0]); + EXPECT_EQ((uint8_t)'b', ch.data_[1]); + EXPECT_EQ((uint8_t)'f', ch.data_[5]); + + ch.advance(1); + + EXPECT_EQ(5u, ch.size()); + EXPECT_EQ((uint8_t)'b', ch.data_[0]); + EXPECT_EQ((uint8_t)'f', ch.data_[4]); +} diff --git a/src/utils/ByteBuffer.hxx b/src/utils/ByteBuffer.hxx index c23af5879..7ae6caa2b 100644 --- a/src/utils/ByteBuffer.hxx +++ b/src/utils/ByteBuffer.hxx @@ -57,12 +57,15 @@ struct RawData /// or QList objects. using RawBuffer = Buffer; +/// Holds a raw buffer. +using RawBufferPtr = BufferPtr; + /// Holds a reference to a raw buffer, with the start and size information. struct ByteChunk { /// Owns a ref for a RawData buffer. If this is nullptr, then the data /// references by this chunk is externally owned. - BufferPtr ownedData_; + RawBufferPtr ownedData_; /// Points to the first byte of the useful data. uint8_t *data_ {nullptr}; @@ -93,7 +96,7 @@ struct ByteChunk /// @param ofs From which offset we should take these bytes (default 0, may /// be omitted). /// - void set_from(BufferPtr buf, size_t len, size_t ofs = 0) + void set_from(RawBufferPtr buf, size_t len, size_t ofs = 0) { ownedData_ = std::move(buf); size_ = len; @@ -103,17 +106,67 @@ struct ByteChunk /// Overwrites this chunk from a string. WARNING: the ownership of the /// string is not transferred; the caller must make sure the string remains /// alive as long as this Chunk is ever in use (including all copies). - void set_from(const string* data) + void set_from(const string *data) { ownedData_.reset(); // no need for this anymore size_ = data->size(); - data_ = (uint8_t*)data->data(); + data_ = (uint8_t *)data->data(); + } + + /// Overwrites this chunk from an externally owned memory area. The caller + /// must make sure the memory area remains alive as long as this Chunk is + /// ever in use (including all copies). + /// @param data payload to set into this buffer. Must stay alive. + /// @param len number of bytes to use from that source. + void set_from(const void *data, size_t len) + { + ownedData_.reset(); // no need for this anymore + data_ = (uint8_t *)data; + size_ = len; + } + + /// Adds more data to the end of the buffer. Requirement: this chunk must + /// be a data source, and there has to be an ownedData_ set. + /// @param data payload to copy + /// @param len how many bytes to add + /// @return number of bytes added; this is typically less than len when the + /// RawData buffer gets full. Can be zero. + size_t append(const void *data, size_t len) + { + HASSERT(ownedData_.get()); + uint8_t *end = data_ + size_; + uint8_t *max_end = ownedData_->data()->payload + RawData::MAX_SIZE; + size_t max_len = max_end - end; + if (max_len < len) + { + len = max_len; + } + memcpy(end, data, len); + size_ += len; + return len; + } + + /// @return how many free bytes are there in the underlying raw + /// buffer. This shall only be used by the source (who set ownedData_ to a + /// real raw buffer), and assumes that all bytes beyond the end are + /// free. Always zero if data is owned externally. + size_t free_space() + { + if (!ownedData_) + { + return 0; + } + uint8_t *end = data_ + size_; + uint8_t *max_end = ownedData_->data()->payload + RawData::MAX_SIZE; + size_t max_len = max_end - end; + return max_len; } - }; /// Buffer type of references. These are enqueued for byte sinks. using ByteBuffer = Buffer; +/// Buffer pointer type for references. +using ByteBufferPtr = BufferPtr; template class FlowInterface; diff --git a/src/utils/LimitedPool.hxx b/src/utils/LimitedPool.hxx index 0d4f0095a..8ea8c6bdc 100644 --- a/src/utils/LimitedPool.hxx +++ b/src/utils/LimitedPool.hxx @@ -39,10 +39,11 @@ #include "utils/Buffer.hxx" -/// Implementation of a Pool interface that takes memory from mainBufferPool but -/// limits the number of buffers allocatable. Later async allocations will be -/// blocked until an earlier buffer gets freed. Freed buffers go back to the -/// main buffer pool, so no memory is captive in this object. +/// Implementation of a Pool interface that takes memory from mainBufferPool +/// (configurable) but limits the number of buffers allocatable. Later async +/// allocations will be blocked until an earlier buffer gets freed. Freed +/// buffers go back to the underlying pool, so no memory is captive in this +/// object. class LimitedPool : public Pool, private Atomic { public: @@ -50,9 +51,13 @@ public: /// allocated. Usually sizeof(Buffer). /// @param entry_count max number of buffer that can be allocated via this /// pool. - LimitedPool(unsigned entry_size, unsigned entry_count) + /// @param base_pool where to allocate the memory for our objects. If + /// nullptr, uses the mainBufferPool. + LimitedPool( + unsigned entry_size, unsigned entry_count, Pool *base_pool = nullptr) : itemSize_(entry_size) , freeCount_(entry_count) + , basePool_(base_pool) { } @@ -129,6 +134,10 @@ private: /// @return the pool from which we should get the actual memory we have. Pool *base_pool() { + if (basePool_) + { + return basePool_; + } return mainBufferPool; } @@ -136,6 +145,8 @@ private: uint16_t itemSize_; /// How many entries can still be allocated. uint16_t freeCount_; + /// Where to allocate memory from. + Pool *basePool_; /// Async allocators waiting for free buffers. Q waitingQueue_; };