diff --git a/src/openlcb/HubLatency.cxxtest b/src/openlcb/HubLatency.cxxtest new file mode 100644 index 000000000..84f654a08 --- /dev/null +++ b/src/openlcb/HubLatency.cxxtest @@ -0,0 +1,275 @@ +/** \copyright + * Copyright (c) 2022, Balazs Racz + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * \file HubLatency.cxxtest + * + * Unit tests for how much network latency do we introduce to datagram + * send/receive operations. + * + * @author Balazs Racz + * @date 13 Oct 2022 + */ + +#define NO_GC_OPTIMIZE + +#include "openlcb/DatagramHandlerDefault.hxx" +#include "openlcb/SimpleNodeInfoMockUserFile.hxx" +#include "openlcb/SimpleStack.hxx" +#include "utils/GridConnectHub.hxx" +#include "utils/test_main.hxx" +#include "os/sleep.h" + +openlcb::MockSNIPUserFile snip_user_file( + "Default user name", "Default user description"); +const char *const openlcb::SNIP_DYNAMIC_FILENAME = + openlcb::MockSNIPUserFile::snip_user_file_path; + +const char *const openlcb::CONFIG_FILENAME = + openlcb::MockSNIPUserFile::snip_user_file_path; + +namespace openlcb +{ + +/** Ping-pong is a fake datagram-based service. When it receives a datagram + * from a particular node, it sends back the datagram to the originating node + * with a slight difference: a TTL being decremented. Two ping-pong datagram + * handlers can therefore converse with each other after the injection of a + * single message. + * + * Datagram format: id=0x7A, second byte = TTL, then some payload which is + * unchanged. + * + * The response will be in the same format, with TTL decreased by one. A + * datagram with TTL=0 will not receive a response. + */ +class PingPongHandler : public DefaultDatagramHandler +{ +public: + enum + { + DATAGRAM_ID = 0x7A, + }; + + /// Constructor + /// + /// @param if_dg datagram service + /// @param node node pointer + /// @param done_cb invoked when a message has TTL=0 and it is dropped. + PingPongHandler(DatagramService *if_dg, Node *node, + std::function done_cb) + : DefaultDatagramHandler(if_dg) + , processCount_(0) + , doneCb_(std::move(done_cb)) + { + dg_service()->registry()->insert(node, DATAGRAM_ID, this); + } + + ~PingPongHandler() + { + /** @TODO(balazs.racz) Remove handler entry from the registry. It would + * be important to remember the node for that, and need a remove API + * on the NodeHandlerMap. */ + } + + void set_done(std::function done_cb) + { + doneCb_ = std::move(done_cb); + } + + /// Returns how many datagrams this handler has seen so far. + int process_count() + { + return processCount_; + } + + Action entry() OVERRIDE + { + processCount_++; + const uint8_t *bytes = reinterpret_cast( + message()->data()->payload.data()); + size_t len = message()->data()->payload.size(); + HASSERT(len >= 1); + HASSERT(bytes[0] == DATAGRAM_ID); + if (len <= 1) + { + return respond_reject(DatagramClient::PERMANENT_ERROR); + } + if (bytes[1] > 0) + { + return respond_ok(DatagramClient::REPLY_PENDING); + } + else + { + return respond_ok(0); + } + } + + virtual Action ok_response_sent() + { + const uint8_t *bytes = reinterpret_cast( + message()->data()->payload.data()); + if (!bytes[1]) + { + if (doneCb_) + { + doneCb_(message()->data()->payload); + } + // No response. + return release_and_exit(); + } + + // We take over the buffer ownership. + responsePayload_.swap(message()->data()->payload); + --responsePayload_[1]; + + return allocate_and_call( + STATE(client_allocated), dg_service()->client_allocator()); + } + + Action client_allocated() + { + clientFlow_ = full_allocation_result(dg_service()->client_allocator()); + return allocate_and_call( + dg_service()->iface()->dispatcher(), STATE(send_response_datagram)); + } + + Action send_response_datagram() + { + auto *b = get_allocation_result(dg_service()->iface()->dispatcher()); + b->set_done(b_.reset(this)); + b->data()->reset(Defs::MTI_DATAGRAM, message()->data()->dst->node_id(), + message()->data()->src, EMPTY_PAYLOAD); + b->data()->payload.swap(responsePayload_); + release(); + clientFlow_->write_datagram(b); + return wait_and_call(STATE(wait_response_datagram)); + } + + Action wait_response_datagram() + { + if (clientFlow_->result() & DatagramClient::OPERATION_PENDING) + { + DIE("Unexpected notification from the datagram client."); + } + if (!(clientFlow_->result() & DatagramClient::OPERATION_SUCCESS)) + { + LOG(WARNING, "Error sending response datagram for PingPong: %x", + clientFlow_->result()); + } + dg_service()->client_allocator()->typed_insert(clientFlow_); + return release_and_exit(); + } + +private: + int processCount_; //< tracks the number of incoming datagrams + DatagramPayload responsePayload_; + DatagramClient *clientFlow_; + BarrierNotifiable b_; + std::function doneCb_; +}; + +constexpr NodeID NODEONE = 0x050101011801ULL; +constexpr NodeID NODETWO = 0x050101011802ULL; + +SimpleCanStack stack1_(NODEONE); +PingPongHandler ping1_(stack1_.dg_service(), stack1_.node(), nullptr); + +Executor<5> executor2_ {"stack2", 0, 0}; +Service service2_ {&executor2_}; +CanHubFlow canHub2_ {&service2_}; +IfCan ifCan2_ {&executor2_, &canHub2_, config_local_alias_cache_size(), + config_remote_alias_cache_size(), config_local_nodes_count()}; +AddAliasAllocator alloc2_(NODETWO, &ifCan2_); +CanDatagramService datagramService2_ {&ifCan2_, + config_num_datagram_registry_entries(), config_num_datagram_clients()}; +DefaultNode node2_{&ifCan2_, NODETWO}; +PingPongHandler ping2_(&datagramService2_, &node2_, nullptr); + +class DelayTest : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + LOG(INFO, "set up test case"); + stack1_.start_executor_thread("stack1", 0, 0); + stack1_.start_tcp_hub_server(50989); + int fd = ConnectSocket("localhost", 50989); + HASSERT(fd >= 0); + create_gc_port_for_can_hub(&canHub2_, fd); + microsleep(500000); + } + + static void TearDownTestCase() + { + microsleep(250000); + } + + void send_ping(uint8_t count) { + DatagramClient *c = + stack1_.dg_service()->client_allocator()->next_blocking(); + + auto *b = stack1_.iface()->dispatcher()->alloc(); + b->set_done(bn_.reset(&sn_)); + Payload p; + p.push_back(PingPongHandler::DATAGRAM_ID); + p.push_back(count); + b->data()->reset( + Defs::MTI_DATAGRAM, NODEONE, NodeHandle(NODETWO), p); + c->write_datagram(b); + + sn_.wait_for_notification(); + + // Releases client. + stack1_.dg_service()->client_allocator()->insert(c); + } + + SyncNotifiable sn_; + BarrierNotifiable bn_; + + SyncNotifiable sn2_; +}; + +// This override is in hub's main.cxx. +OVERRIDE_CONST(gridconnect_buffer_delay_usec, 2000); + +TEST_F(DelayTest, pingone) +{ + auto cb = [this](const Payload&) { + sn2_.notify(); + }; + ping1_.set_done(cb); + ping2_.set_done(cb); + auto ts1 = os_get_time_monotonic(); + send_ping(101); + auto ts2 = os_get_time_monotonic(); + sn2_.wait_for_notification(); + auto ts3 = os_get_time_monotonic(); + + LOG(INFO, "first ping: %d msec", (int)NSEC_TO_MSEC(ts2-ts1)); + LOG(INFO, "total: %d msec", (int)NSEC_TO_MSEC(ts3-ts1)); +} + +} // namespace openlcb diff --git a/src/utils/BufferPort.hxx b/src/utils/BufferPort.hxx index eaee5367b..b639ebeab 100644 --- a/src/utils/BufferPort.hxx +++ b/src/utils/BufferPort.hxx @@ -93,12 +93,41 @@ private: ? nullptr : &outputPool_); } + // Defines whether we should optimize the traffic and flush right now. + bool opt_flush = false; + // This code is OpenLCB-specific. It looks for a certain pattern in the + // output data stream, and if that pattern is found, inserts an extra + // flush right now, instead of waiting for the timeout to pass. The + // data sent on is never modified, so this is purely a performance + // optimization for OpenLCB. + if (msg().data()[0] == ':' && msg().data()[1] == 'X') { + if (msg().data()[3] == 'A' || msg().data()[3] == 'D') + { + // Found datagram "only" or "end" packet. + opt_flush = true; + } + else if (strncmp(msg().data() + 3, "9A28", 4) == 0) + { + // Found datagram acknowledge packet. + opt_flush = true; + } + } + if (opt_flush && !bufEnd_) + { + // nothing accumulated, send off directly. + downstream_->send(transfer_message(), priority()); + return exit(); + } if (msg().size() < (bufSize_ - bufEnd_)) { // Fits into the buffer. memcpy(sendBuf_ + bufEnd_, msg().data(), msg().size()); bufEnd_ += msg().size(); - if (!timerPending_) + if (opt_flush) + { + flush_buffer(); + } + else if (!timerPending_) { timerPending_ = 1; bufferTimer_.start(delayNsec_); diff --git a/src/utils/test_main.hxx b/src/utils/test_main.hxx index cfbd77840..9b1327ce7 100644 --- a/src/utils/test_main.hxx +++ b/src/utils/test_main.hxx @@ -95,8 +95,10 @@ void os_emscripten_yield() { Executor<1> g_executor("ex_thread", 0, 1024); #endif +#ifndef NO_GC_OPTIMIZE /// Do not buffer gridconnect bytes when we are running in a test. OVERRIDE_CONST(gridconnect_buffer_size, 1); +#endif Service g_service(&g_executor);