diff --git a/include/nmranet_config.h b/include/nmranet_config.h index 85dc8da42..287e9a481 100644 --- a/include/nmranet_config.h +++ b/include/nmranet_config.h @@ -146,6 +146,10 @@ DECLARE_CONST(enable_all_memory_space); * standard. */ DECLARE_CONST(node_init_identify); +/** How many CAN frames should the bulk alias allocator be sending at the same + * time. */ +DECLARE_CONST(bulk_alias_num_can_frames); + /** Stack size for @ref SocketListener threads. */ DECLARE_CONST(socket_listener_stack_size); diff --git a/src/openlcb/AliasAllocator.cxx b/src/openlcb/AliasAllocator.cxx index 526e5c345..343fa1ea7 100644 --- a/src/openlcb/AliasAllocator.cxx +++ b/src/openlcb/AliasAllocator.cxx @@ -111,9 +111,7 @@ StateFlowBase::Action AliasAllocator::entry() HASSERT(pending_alias()->state == AliasInfo::STATE_EMPTY); while (!pending_alias()->alias) { - pending_alias()->alias = seed_; - next_seed(); - // TODO(balazs.racz): check if the alias is already known about. + pending_alias()->alias = get_new_seed(); } // Registers ourselves as a handler for incoming CAN frames to detect // conflicts. @@ -124,6 +122,24 @@ StateFlowBase::Action AliasAllocator::entry() return call_immediately(STATE(handle_allocate_for_cid_frame)); } +NodeAlias AliasAllocator::get_new_seed() +{ + while (true) + { + NodeAlias ret = seed_; + next_seed(); + if (if_can()->local_aliases()->lookup(ret)) + { + continue; + } + if (if_can()->remote_aliases()->lookup(ret)) + { + continue; + } + return ret; + } +} + void AliasAllocator::next_seed() { uint16_t offset; diff --git a/src/openlcb/AliasAllocator.cxxtest b/src/openlcb/AliasAllocator.cxxtest index 67a284dcd..2ff91b437 100644 --- a/src/openlcb/AliasAllocator.cxxtest +++ b/src/openlcb/AliasAllocator.cxxtest @@ -1,9 +1,10 @@ #include -#include "utils/async_if_test_helper.hxx" #include "openlcb/AliasAllocator.hxx" #include "openlcb/AliasCache.hxx" +#include "openlcb/BulkAliasAllocator.hxx" #include "openlcb/CanDefs.hxx" +#include "utils/async_if_test_helper.hxx" namespace openlcb { @@ -13,6 +14,7 @@ protected: AsyncAliasAllocatorTest() : b_(nullptr) , alias_allocator_(TEST_NODE_ID, ifCan_.get()) + , bulkAllocator_(create_bulk_alias_allocator(ifCan_.get())) { } @@ -48,8 +50,55 @@ protected: } } + /// Pre-generates some aliases into a vector. + void generate_aliases(AliasAllocator *alloc, unsigned count) + { + set_seed(0x555, alloc); + run_x([this, count, alloc]() { + for (unsigned i = 0; i < count; i++) + { + auto a = alloc->get_new_seed(); + LOG(INFO, "alias %03X", a); + aliases_.push_back(a); + } + }); + set_seed(0x555, alloc); + } + + /// Expects that CID frames are sent to the bus. + /// @param begin iterator into alias array + /// @param end iterator (end) into alias array + template void expect_cid(It begin, It end) + { + for (auto it = begin; it != end; ++it) + { + NodeAlias a = *it; + string msg = StringPrintf("cid %03X", a); + LOG(INFO, "cid %03X", a); + expect_packet(StringPrintf(":X17020%03XN;", a)); + expect_packet(StringPrintf(":X1610D%03XN;", a)); + expect_packet(StringPrintf(":X15000%03XN;", a)); + expect_packet(StringPrintf(":X14003%03XN;", a)); + } + } + + /// Expects that RID frames are sent to the bus. + /// @param begin iterator into alias array + /// @param end iterator (end) into alias array + template void expect_rid(It begin, It end) + { + for (auto it = begin; it != end; ++it) + { + NodeAlias a = *it; + LOG(INFO, "rid %03X", a); + expect_packet(StringPrintf(":X10700%03XN;", a)); + } + } + Buffer *b_; AliasAllocator alias_allocator_; + std::unique_ptr bulkAllocator_; + std::vector aliases_; }; TEST_F(AsyncAliasAllocatorTest, SetupTeardown) @@ -218,4 +267,62 @@ TEST_F(AsyncAliasAllocatorTest, DifferentGenerated) // Makes sure 'other' disappears from the executor before destructing it. wait(); } + +TEST_F(AsyncAliasAllocatorTest, BulkFew) +{ + generate_aliases(ifCan_->alias_allocator(), 5); + expect_cid(aliases_.begin(), aliases_.end()); + LOG(INFO, "invoke"); + auto start_time = os_get_time_monotonic(); + auto invocation = invoke_flow_nowait(bulkAllocator_.get(), 5); + wait(); + LOG(INFO, "expect RIDs"); + clear_expect(true); + expect_rid(aliases_.begin(), aliases_.end()); + LOG(INFO, "wait for complete"); + invocation->wait(); + clear_expect(true); + auto end_time = os_get_time_monotonic(); + EXPECT_LT(MSEC_TO_NSEC(200), end_time - start_time); +} + +TEST_F(AsyncAliasAllocatorTest, BulkConflict) +{ + generate_aliases(ifCan_->alias_allocator(), 7); + clear_expect(true); + expect_cid(aliases_.begin(), aliases_.begin() + 5); + LOG(INFO, "invoke"); + auto invocation = invoke_flow_nowait(bulkAllocator_.get(), 5); + wait(); + LOG(INFO, "send conflicts"); + clear_expect(true); + expect_cid(aliases_.begin() + 5, aliases_.end()); + send_packet(StringPrintf(":X10700%03XN;", aliases_[0])); + send_packet(StringPrintf(":X10700%03XN;", aliases_[1])); + wait(); + usleep(10000); + wait(); + LOG(INFO, "expect RIDs"); + clear_expect(true); + expect_rid(aliases_.begin() + 2, aliases_.end()); + LOG(INFO, "wait for complete"); + invocation->wait(); + clear_expect(true); +} + +TEST_F(AsyncAliasAllocatorTest, BulkMany) +{ + generate_aliases(ifCan_->alias_allocator(), 150); + expect_cid(aliases_.begin(), aliases_.end()); + LOG(INFO, "invoke"); + auto invocation = invoke_flow_nowait(bulkAllocator_.get(), 150); + wait(); + LOG(INFO, "expect RIDs"); + clear_expect(true); + expect_rid(aliases_.begin(), aliases_.end()); + LOG(INFO, "wait for complete"); + invocation->wait(); + clear_expect(true); +} + } // namespace openlcb diff --git a/src/openlcb/AliasAllocator.hxx b/src/openlcb/AliasAllocator.hxx index c39606a6b..4abf9d5ff 100644 --- a/src/openlcb/AliasAllocator.hxx +++ b/src/openlcb/AliasAllocator.hxx @@ -108,13 +108,24 @@ public: */ AliasAllocator(NodeID if_id, IfCan *if_can); + /** Destructor */ virtual ~AliasAllocator(); + /** @return the Node ID for the interface. */ + NodeID if_node_id() + { + return if_id_; + } + /** Resets the alias allocator to the state it was at construction. useful * after connection restart in order to ensure it will try to allocate the * same alias. */ void reinit_seed(); + /** Returns a new alias to check from the random sequence. Checks that it + * is not in the alias cache yet.*/ + NodeAlias get_new_seed(); + /** "Allocate" a buffer from this pool (but without initialization) in * order to get a reserved alias. */ QAsync *reserved_aliases() diff --git a/src/openlcb/BulkAliasAllocator.cxx b/src/openlcb/BulkAliasAllocator.cxx new file mode 100644 index 000000000..d4767f418 --- /dev/null +++ b/src/openlcb/BulkAliasAllocator.cxx @@ -0,0 +1,285 @@ +/** \copyright + * Copyright (c) 2020 Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file BulkAliasAllocator.cxx + * + * State flow for allocating many aliases at the same time. + * + * @author Balazs Racz + * @date 14 Nov 2020 + */ + +#include "openlcb/BulkAliasAllocator.hxx" +#include "openlcb/CanDefs.hxx" + +namespace openlcb +{ + +/// Implementation of the BulkAliasAllocatorInterface to allocate many aliases +/// at the same time. +class BulkAliasAllocator : public CallableFlow +{ +public: + /// Constructor + /// @param iface the openlcb CAN interface + BulkAliasAllocator(IfCan *iface) + : CallableFlow(iface) + { + } + + /// Start of flow when a request arrives to allocate many aliases. Resets + /// the internal state and goes on to start the allocation process. + Action entry() override + { + startTime_ = os_get_time_monotonic(); + pendingAliasesByTime_.clear(); + pendingAliasesByKey_.clear(); + nextToStampTime_ = 0; + nextToClaim_ = 0; + if_can()->frame_dispatcher()->register_handler(&conflictHandler_, 0, 0); + return call_immediately(STATE(send_cid_frames)); + } + + /// Picks a bunch of random aliases, sends CID frames for them to the bus. + Action send_cid_frames() + { + unsigned needed = std::min(request()->numAliases_, + (unsigned)(config_bulk_alias_num_can_frames() + 3) / 4); + if (!needed) + { + return call_immediately(STATE(wait_for_results)); + } + bn_.reset(this); + for (unsigned i = 0; i < needed; ++i) + { + NodeAlias next_alias = if_can()->alias_allocator()->get_new_seed(); + auto if_id = if_can()->alias_allocator()->if_node_id(); + send_can_frame(next_alias, (if_id >> 36) & 0xfff, 7); + send_can_frame(next_alias, (if_id >> 24) & 0xfff, 6); + send_can_frame(next_alias, (if_id >> 12) & 0xfff, 5); + send_can_frame(next_alias, (if_id >> 0) & 0xfff, 4); + --request()->numAliases_; + pendingAliasesByTime_.push_back({next_alias}); + pendingAliasesByKey_.insert({next_alias}); + } + bn_.notify(); + return wait_and_call(STATE(stamp_time)); + } + + /// Adds the timestamps when the CID requests were sent out. + Action stamp_time() + { + auto ctime = relative_time(); + for (unsigned i = nextToStampTime_; i < pendingAliasesByTime_.size(); + ++i) + { + pendingAliasesByTime_[i].cidTime_ = ctime; + } + nextToStampTime_ = pendingAliasesByTime_.size(); + // Go back to sending more CID frames as needed. + return call_immediately(STATE(send_cid_frames)); + } + + /// Sends out the RID frames for any alias that the 200 msec has already + /// elapsed, then waits a bit and tries again. + Action wait_for_results() + { + if (nextToClaim_ == pendingAliasesByTime_.size()) + { + return complete(); + } + if (request()->numAliases_) + { + // Some conflicts were identified, go and allocate more. + return call_immediately(STATE(send_cid_frames)); + } + auto ctime = relative_time(); + unsigned num_sent = 0; + bn_.reset(this); + while ((nextToClaim_ < pendingAliasesByTime_.size()) && + (num_sent < (unsigned)(config_bulk_alias_num_can_frames())) && + (pendingAliasesByTime_[nextToClaim_].cidTime_ + ALLOCATE_DELAY < + ctime)) + { + NodeAlias a = + (NodeAlias)(pendingAliasesByTime_[nextToClaim_].alias_); + ++nextToClaim_; + auto it = pendingAliasesByKey_.find(a); + if (it->hasConflict_) + { + // we skip this alias because there was a conflict. + continue; + } + /// @todo add alias to the cache as reserved alias. + ++num_sent; + send_can_frame(a, CanDefs::RID_FRAME, 0); + } + if (bn_.abort_if_almost_done()) + { + // no frame sent + return sleep_and_call( + &timer_, MSEC_TO_NSEC(10), STATE(wait_for_results)); + } + else + { + bn_.notify(); + // Wait for outgoing frames to be gone and call this again. + return wait(); + } + } + + /// Called when all RID frames are sent out. + Action complete() + { + if_can()->frame_dispatcher()->unregister_handler_all(&conflictHandler_); + pendingAliasesByTime_.clear(); + pendingAliasesByKey_.clear(); + return return_ok(); + } + +private: + /// Callback from the stack for all incoming frames while we are + /// operating. We sniff the alias uot of it and record any conflicts we + /// see. + /// @param message an incoming CAN frame. + void handle_conflict(Buffer *message) + { + auto rb = get_buffer_deleter(message); + auto alias = CanDefs::get_src(GET_CAN_FRAME_ID_EFF(*message->data())); + auto it = pendingAliasesByKey_.find(alias); + if (it != pendingAliasesByKey_.end() && !it->hasConflict_) + { + it->hasConflict_ = 1; + ++request()->numAliases_; + } + } + + /// Listens to incoming CAN frames and handles alias conflicts. + IncomingFrameHandler::GenericHandler conflictHandler_ { + this, &BulkAliasAllocator::handle_conflict}; + + /// How many count to wait before sending out the RID frames. One count is + /// 10 msec (see { \link relative_time } ). + static constexpr unsigned ALLOCATE_DELAY = 20; + + /// Sends a CAN control frame to the bus. Take a share of the barrier bn_ + /// to send with the frame. + /// @param src source alias to use on the frame. + /// @param control_field 16-bit control value (e.g. RID_FRAME, or 0 top + /// nibble and a chunk of the unique node ID in the middle). + /// @param sequence used for CID messages. + void send_can_frame(NodeAlias src, uint16_t control_field, int sequence) + { + auto *b = if_can()->frame_write_flow()->alloc(); + b->set_done(bn_.new_child()); + CanDefs::control_init(*b->data(), src, control_field, sequence); + if_can()->frame_write_flow()->send(b, 0); + } + + /// @return the openlcb CAN interface + IfCan *if_can() + { + return static_cast(service()); + } + + /// @return the time elapsed from start time in 10 msec units. + unsigned relative_time() + { + return (os_get_time_monotonic() - startTime_) / MSEC_TO_NSEC(10); + } + + /// We store this type in the time-ordered aliases structure. + struct PendingAliasInfo + { + /// Constructor + /// @param alias the openlcb alias that is being represented here. + PendingAliasInfo(NodeAlias alias) + : alias_(alias) + , cidTime_(0) + { + } + + /// The value of the alias + unsigned alias_ : 12; + /// The time when the CID requests were sent. Counter in + /// relative_time(), i.e. 10 msec per increment. + unsigned cidTime_ : 8; + }; + static_assert(sizeof(PendingAliasInfo) == 4, "memory bloat"); + + /// We store this type in the sorted map lookup structure. + struct AliasLookupInfo + { + /// Constructor + /// @param alias the openlcb alias that is being represented here. + AliasLookupInfo(NodeAlias alias) + : alias_(alias) + , hasConflict_(0) + { + } + + /// The value of the alias + uint16_t alias_ : 12; + /// 1 if we have seen a conflict + uint16_t hasConflict_ : 1; + }; + static_assert(sizeof(AliasLookupInfo) == 2, "memory bloat"); + /// Comparator function on AliasLookupInfo objects. + struct LookupCompare + { + bool operator()(AliasLookupInfo a, AliasLookupInfo b) + { + return a.alias_ < b.alias_; + } + }; + + /// Helper object for sleeping. + StateFlowTimer timer_ {this}; + /// Helper object to determine when the CAN frames have flushed from the + /// system. + BarrierNotifiable bn_; + /// We measure time elapsed relative to this point. + long long startTime_; + /// Stores the aliases we are trying to allocate in time order of picking + /// them. + std::vector pendingAliasesByTime_; + /// Stores the aliases we are trying to allocate in the alias order. + SortedListSet pendingAliasesByKey_; + /// Index into the pendingAliasesByTime_ vector where we need to stmap + /// time. + uint16_t nextToStampTime_; + /// Index into the pendingAliasesByTime_ vector where we need to send out + /// the reserve frame. + uint16_t nextToClaim_; +}; + +std::unique_ptr create_bulk_alias_allocator( + IfCan *can_if) +{ + return std::make_unique(can_if); +} + +} // namespace openlcb diff --git a/src/openlcb/BulkAliasAllocator.hxx b/src/openlcb/BulkAliasAllocator.hxx new file mode 100644 index 000000000..2fee889a0 --- /dev/null +++ b/src/openlcb/BulkAliasAllocator.hxx @@ -0,0 +1,64 @@ +/** \copyright + * Copyright (c) 2020 Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file BulkAliasAllocator.hxx + * + * State flow for allocating many aliases at the same time. + * + * @author Balazs Racz + * @date 14 Nov 2020 + */ + +#include "executor/CallableFlow.hxx" +#include "openlcb/AliasAllocator.hxx" +#include "openlcb/AliasCache.hxx" +#include "openlcb/CanDefs.hxx" + +namespace openlcb +{ + +/// Message type to request allocating many aliases for an interface. +struct BulkAliasRequest : CallableFlowRequestBase +{ + /// @param count how many aliases to allocate. + void reset(unsigned count) + { + reset_base(); + numAliases_ = count; + } + + /// How many aliases to allocate. + unsigned numAliases_; +}; + +using BulkAliasAllocatorInterface = FlowInterface>; + +/// Creates a bulk alias allocator. +/// @param can_if the interface to bind it to. +std::unique_ptr create_bulk_alias_allocator( + IfCan *can_if); + +} // namespace openlcb diff --git a/src/openlcb/nmranet_constants.cxx b/src/openlcb/nmranet_constants.cxx index 87c9f3465..c246c047b 100644 --- a/src/openlcb/nmranet_constants.cxx +++ b/src/openlcb/nmranet_constants.cxx @@ -63,3 +63,7 @@ DEFAULT_CONST_FALSE(enable_all_memory_space); * identified messages at boot time. This is required by the OpenLCB * standard. */ DEFAULT_CONST_TRUE(node_init_identify); + +/** How many CAN frames should the bulk alias allocator be sending at the same + * time. */ +DEFAULT_CONST(bulk_alias_num_can_frames, 20); diff --git a/src/openlcb/sources b/src/openlcb/sources index 9f849e954..5e8e4a01a 100644 --- a/src/openlcb/sources +++ b/src/openlcb/sources @@ -8,6 +8,7 @@ CXXSRCS += \ BroadcastTime.cxx \ BroadcastTimeClient.cxx \ BroadcastTimeServer.cxx \ + BulkAliasAllocator.cxx \ CanDefs.cxx \ ConfigEntry.cxx \ ConfigUpdateFlow.cxx \ diff --git a/src/utils/SortedListMap.hxx b/src/utils/SortedListMap.hxx index 598e934d5..7a65bc45d 100644 --- a/src/utils/SortedListMap.hxx +++ b/src/utils/SortedListMap.hxx @@ -152,7 +152,8 @@ public: } /// Removes all entries. - void clear() { + void clear() + { container_.clear(); sortedCount_ = 0; } diff --git a/src/utils/test_main.hxx b/src/utils/test_main.hxx index be1f1a19c..e85348124 100644 --- a/src/utils/test_main.hxx +++ b/src/utils/test_main.hxx @@ -49,6 +49,7 @@ #include "gmock/gmock.h" #include "can_frame.h" +#include "executor/CallableFlow.hxx" #include "executor/Executor.hxx" #include "executor/Service.hxx" #include "os/TempFile.hxx" @@ -177,6 +178,49 @@ void run_x(std::function fn) g_executor.sync_run(std::move(fn)); } +/// Structure holding returned objects for an invoke_flow_nowait command. +template struct PendingInvocation +{ + /// Buffer sent to the flow. + BufferPtr b; + /// Notifiable to wait for. + SyncNotifiable notifiable; + /// Barrier notifiable given to the buffer. + BarrierNotifiable barrier {¬ifiable}; + /// True if wait has been invoked. + bool isWaited {false}; + + ~PendingInvocation() + { + wait(); + } + + void wait() + { + if (isWaited) + { + return; + } + notifiable.wait_for_notification(); + isWaited = true; + } +}; + +/// Executes a callable flow similar to invoke_flow(...) but does not wait for +/// the result to come back. Instead, returns a PendingInvocation object, where +/// there is a wait() method to be called. +template +std::unique_ptr> invoke_flow_nowait( + FlowInterface> *flow, Args &&...args) +{ + auto ret = std::make_unique>(); + ret->b.reset(flow->alloc()); + ret->b->data()->reset(std::forward(args)...); + ret->b->data()->done.reset(&ret->barrier); + flow->send(ret->b->ref()); + return ret; +} + /** Utility class to block an executor for a while. * * Usage: add an instance of BlockExecutor to the executor you want to block,