diff --git a/src/freertos_drivers/net_cc32xx/CC32xxWiFi.cxx b/src/freertos_drivers/net_cc32xx/CC32xxWiFi.cxx index 03863388c..97c966caf 100644 --- a/src/freertos_drivers/net_cc32xx/CC32xxWiFi.cxx +++ b/src/freertos_drivers/net_cc32xx/CC32xxWiFi.cxx @@ -805,18 +805,18 @@ void CC32xxWiFi::wlan_wps_pbc_initiate() void CC32xxWiFi::wlan_setup_ap(const char *ssid, const char *security_key, SecurityType security_type) { - HASSERT(strlen(ssid) <= 32); - HASSERT(strlen(security_key) <= 64); - - uint8_t sec_type = security_type_to_simplelink(security_type); - - sl_WlanSet(SL_WLAN_CFG_AP_ID, SL_WLAN_AP_OPT_SSID, strlen(ssid), - (uint8_t*)ssid); - if (wlanRole == WlanRole::AP) + if (ssid) { - str_populate(this->ssid, ssid); + HASSERT(strlen(ssid) <= 32); + sl_WlanSet(SL_WLAN_CFG_AP_ID, SL_WLAN_AP_OPT_SSID, strlen(ssid), + (uint8_t *)ssid); + if (wlanRole == WlanRole::AP) + { + str_populate(this->ssid, ssid); + } } - + + uint8_t sec_type = security_type_to_simplelink(security_type); sl_WlanSet(SL_WLAN_CFG_AP_ID, SL_WLAN_AP_OPT_SECURITY_TYPE, 1, (uint8_t*)&sec_type); @@ -826,6 +826,7 @@ void CC32xxWiFi::wlan_setup_ap(const char *ssid, const char *security_key, return; } + HASSERT(strlen(security_key) <= 64); sl_WlanSet(SL_WLAN_CFG_AP_ID, SL_WLAN_AP_OPT_PASSWORD, strlen(security_key), (uint8_t*)security_key); } @@ -849,7 +850,9 @@ void CC32xxWiFi::wlan_get_ap_config(string *ssid, SecurityType *security_type) { uint16_t len = sizeof(*security_type); uint16_t config_opt = SL_WLAN_AP_OPT_SECURITY_TYPE; - sl_WlanGet(SL_WLAN_CFG_AP_ID, &config_opt, &len, (_u8*) security_type); + uint8_t sl_sec_type = 0; + sl_WlanGet(SL_WLAN_CFG_AP_ID, &config_opt, &len, &sl_sec_type); + *security_type = security_type_from_simplelink(sl_sec_type); } } diff --git a/src/freertos_drivers/net_cc32xx/CC32xxWiFi.hxx b/src/freertos_drivers/net_cc32xx/CC32xxWiFi.hxx index a58fd2635..6712cfa76 100644 --- a/src/freertos_drivers/net_cc32xx/CC32xxWiFi.hxx +++ b/src/freertos_drivers/net_cc32xx/CC32xxWiFi.hxx @@ -67,10 +67,12 @@ protected: { } - /** Setup access point role credentials. - * @param ssid access point ssid - * @param security_key access point security key - * @param security_type specifies security type + /** Setup access point role credentials. It is OK to leave ssid as nullptr + * or password as nullptr, in which case those properties will not be + * changed. + * @param ssid access point ssid (name) + * @param security_key access point security key (password) + * @param security_type specifies security type. Required. */ virtual void wlan_setup_ap(const char *ssid, const char *security_key, SecurityType security_type) = 0; diff --git a/src/openlcb/IfCanStress.cxxtest b/src/openlcb/IfCanStress.cxxtest index 072eabf56..e15627579 100644 --- a/src/openlcb/IfCanStress.cxxtest +++ b/src/openlcb/IfCanStress.cxxtest @@ -174,8 +174,10 @@ protected: ~AsyncIfStressTest() { - while (!(g1_executor.empty() && g2_executor.empty() && - g3_executor.empty() && g4_executor.empty() && + while (!(g1_executor.empty() && g1_executor.active_timers()->empty() && + g2_executor.empty() && g2_executor.active_timers()->empty() && + g3_executor.empty() && g3_executor.active_timers()->empty() && + g4_executor.empty() && g4_executor.active_timers()->empty() && g_executor.empty())) { usleep(100); diff --git a/src/utils/DataBuffer.cxxtest b/src/utils/DataBuffer.cxxtest index ab52ca84b..b5d88682b 100644 --- a/src/utils/DataBuffer.cxxtest +++ b/src/utils/DataBuffer.cxxtest @@ -3,6 +3,7 @@ #include "utils/test_main.hxx" DataBufferPool g_pool(64); +DataBufferPool g_pool10(10); class DataBufferTest : public ::testing::Test { @@ -390,3 +391,292 @@ TEST_F(DataBufferTest, lnk_multi) // The barriers will verify upon destruction time that they were correctly // notified. } + +class DataBufferFuzzTest : public ::testing::Test +{ +protected: + DataBufferFuzzTest() + { + for (int i = 0; i < NUM_OP; ++i) + { + freq_[i] = 0; + } + freq_[0] = 1; + freq_[NUM_OP] = 0; + } + + enum Op + { + OP_APPEND, + OP_READ, + OP_XFERMID, + OP_READMID, + OP_XFEREND, + OP_READEND, + NUM_OP + }; + + int freq_[NUM_OP + 1]; + + /// @return a pseudorandom number uniformly distributed between 0 and max - + /// 1. + /// @param max distribution parameter. + unsigned get_random_uni(unsigned max) + { + return rand_r(&randSeed_) % max; + } + + /// Setup a fuzz test scenario where we append a given LinkedDataBufferPtr + /// and then read from the same one. + void setup_basic_readwrite() + { + freq_[OP_APPEND] = 1; + freq_[OP_READ] = 1; + } + + /// Setup a fuzz test scenario where we append one LinkedDataBufferPtr, + /// then move data to a middle one, then read that middle one. + void setup_write_transfer_read() + { + freq_[OP_APPEND] = 1; + freq_[OP_XFERMID] = 1; + freq_[OP_READMID] = 1; + } + + /// Setup a fuzz test scenario where we append one LinkedDataBufferPtr, + /// then move data to a middle one, then move data to a third one, then + /// read that last. + void setup_write_transfer_read_transfer_read() + { + freq_[OP_APPEND] = 1; + freq_[OP_XFERMID] = 1; + freq_[OP_XFEREND] = 1; + freq_[OP_READEND] = 1; + } + + void prep_fuzz() + { + int sum = 0; + for (int i = 0; i <= NUM_OP; ++i) + { + sum += freq_[i]; + freq_[i] = sum; + } + } + + void run_fuzz(unsigned iter) + { + prep_fuzz(); + size_t idx = 0; + while (--iter && !HasFatalFailure()) + { + int oper = get_random_uni(freq_[NUM_OP]); + for (int i = 0; i < NUM_OP; ++i) + { + if (freq_[i] > oper) + { + SCOPED_TRACE(idx); + run_op((Op)i); + ++idx; + break; + } + } + } + } + + void run_op(Op op) + { + switch (op) + { + case OP_APPEND: + { + int len = get_random_uni(22); + append_helper(&lnk_, len); + break; + } + case OP_READ: + { + int len = get_random_uni(22); + consume_helper(&lnk_, len); + break; + } + case OP_XFERMID: + { + int len = get_random_uni(22); + xfer_helper(&lnk_, &mid_, len); + break; + } + case OP_READMID: + { + int len = get_random_uni(22); + consume_helper(&mid_, len); + break; + } + case OP_XFEREND: + { + int len = get_random_uni(22); + xfer_helper(&mid_, &end_, len); + break; + } + case OP_READEND: + { + int len = get_random_uni(22); + consume_helper(&end_, len); + break; + } + default: + return; + } + } + + std::string flatten(const LinkedDataBufferPtr &p) + { + std::string ret; + p.append_to(&ret); + return ret; + } + + /// Appends a certain number of characters to a ptr. Characters are always + /// taken in the input sequence. + void append_helper(LinkedDataBufferPtr *p, size_t len) + { + while (len) + { + int free = p->free(); + if (!free) + { + DataBuffer *c; + g_pool10.alloc(&c); + p->append_empty_buffer(c); + continue; + } + auto *rp = p->data_write_pointer(); + int count = 0; + while (free > 0 && len > 0) + { + *rp++ = generate(); + --free; + --len; + ++count; + } + p->data_write_advance(count); + } + } + + /// Appends a certain number of characters to a ptr. Characters are always + /// taken in the input sequence. + void xfer_helper( + LinkedDataBufferPtr *from, LinkedDataBufferPtr *to, size_t len) + { + LinkedDataBufferPtr tmp; + len = std::min(len, (size_t)from->size()); + tmp.reset(*from, len); + from->data_read_advance(len); + ASSERT_TRUE(to->try_append_from(tmp, true)); + } + + /// Consumes (reads) a certain number of characters from a ptr. Characters + /// are compared to the expected output sequence. + void consume_helper(LinkedDataBufferPtr *p, size_t len) + { + while (len > 0 && p->size() > 0) + { + size_t avail; + const uint8_t *ptr = p->data_read_pointer(&avail); + if (avail > len) + { + avail = len; + } + int count = 0; + while (avail) + { + consume(*(ptr++)); + ++count; + --avail; + --len; + } + p->data_read_advance(count); + } + } + + /// @return the next byte of the generated sequence. + uint8_t generate() + { + return nextByte_++; + } + + /// Take in the next byte that came out at the end. Verifies that it is the + /// correct byte value. + void consume(uint8_t next_byte) + { + EXPECT_EQ(nextByteRead_, next_byte); + ++nextByteRead_; + } + + DataBuffer *b_; + unsigned lastFree_; + unsigned int randSeed_ {83012475}; + uint8_t nextByte_ {0}; + uint8_t nextByteRead_ {0}; + + BarrierNotifiable bn_; + BarrierNotifiable bn2_; + LinkedDataBufferPtr lnk_; + LinkedDataBufferPtr mid_; + LinkedDataBufferPtr end_; + std::vector> bns_; +}; + +TEST_F(DataBufferFuzzTest, small_fuzz) +{ + setup_basic_readwrite(); + run_fuzz(10); +} + +TEST_F(DataBufferFuzzTest, medium_fuzz) +{ + setup_basic_readwrite(); + run_fuzz(1000); +} + +TEST_F(DataBufferFuzzTest, large_fuzz) +{ + setup_basic_readwrite(); + run_fuzz(100000); +} + +TEST_F(DataBufferFuzzTest, small_duo) +{ + setup_write_transfer_read(); + run_fuzz(10); +} + +TEST_F(DataBufferFuzzTest, medium_duo) +{ + setup_write_transfer_read(); + run_fuzz(1000); +} + +TEST_F(DataBufferFuzzTest, large_duo) +{ + setup_write_transfer_read(); + run_fuzz(100000); +} + +TEST_F(DataBufferFuzzTest, small_tri) +{ + setup_write_transfer_read_transfer_read(); + run_fuzz(10); +} + +TEST_F(DataBufferFuzzTest, medium_tri) +{ + setup_write_transfer_read_transfer_read(); + run_fuzz(1000); +} + +TEST_F(DataBufferFuzzTest, large_tri) +{ + setup_write_transfer_read_transfer_read(); + run_fuzz(100000); +} diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index d7b940715..eb2ec8b6d 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -37,9 +37,20 @@ #define _UTILS_DATABUFFER_HXX_ #include "utils/Buffer.hxx" +#include "utils/LinkedObject.hxx" +#include "utils/macros.h" + +#ifdef GTEST +// #define DEBUG_DATA_BUFFER_FREE +#endif class DataBufferPool; +#ifdef DEBUG_DATA_BUFFER_FREE +class DataBuffer; +static void check_db_ownership(DataBuffer *p); +#endif + /// Specialization of the Buffer class that is designed for storing untyped /// data arrays. Adds the ability to treat the next pointers as links to /// consecutive data bytes, ref'ing and unref'ing a sequence of buffers in one @@ -118,7 +129,8 @@ public: } /// Releases one reference to all blocks of this buffer. This includes one - /// reference to the last block which may be a partially filled buffer. + /// reference to the last block which may be a partially filled + /// buffer. Calling with zero length will call release on the head block. /// @param total_size the number of bytes starting from the beginning of /// *this. void unref_all(unsigned total_size) @@ -165,6 +177,17 @@ public: return curr->next(); } +#ifdef DEBUG_DATA_BUFFER_FREE + void unref() + { + if (references() == 1) + { + check_db_ownership(this); + } + Buffer::unref(); + } +#endif + private: friend class DataBufferPool; @@ -178,6 +201,9 @@ using DataBufferPtr = std::unique_ptr>; /// A class that keeps ownership of a chain of linked DataBuffer references. class LinkedDataBufferPtr +#ifdef DEBUG_DATA_BUFFER_FREE + : public LinkedObject +#endif { public: LinkedDataBufferPtr() @@ -231,6 +257,15 @@ public: { size = o.size_; } + if ((size_t)size > o.size_) + { + size = o.size_; + } + if (!size) + { + // Nothing to copy, this will be an empty buffer. + return; + } skip_ = o.skip_; size_ = size; // Takes references, keeping the tail and tail size. @@ -278,7 +313,7 @@ public: reset(buf); return; } - HASSERT(free_ >= 0); + HASSERT(free_ >= 0); // appendable HASSERT(tail_); // Note: if free_ was > 0, there were some unused bytes in the tail // buffer. However, as part of the append operation, we lose these @@ -297,9 +332,16 @@ public: { if (head_) { - head_->unref_all(size_ + skip_); + auto *h = head_; + size_t len = size_ + skip_; + clear(); + h->unref_all(len); + return; + } + else + { + clear(); } - clear(); } /// @return the pointer where data can be appended into the tail of this @@ -324,33 +366,64 @@ public: size_ += len; } + /// Retrieves a pointer where data can be read out of the buffer. + /// @param len will be filled in with the number of available bytes to read + /// at this point. + /// @return the read pointer, or nullptr if there is no data in this + /// buffer. + const uint8_t *data_read_pointer(size_t *len) + { + if (!head_ || !size_) + { + *len = 0; + return nullptr; + } + unsigned avail = 0; + uint8_t *p; + head_->get_read_pointer(skip_, &p, &avail); + if (avail > size_) + { + avail = size_; + } + *len = avail; + return p; + } + /// Advances the head pointer. Typically used after a successful read /// happened. /// @param len how many bytes to advance the read pointer. void data_read_advance(size_t len) { HASSERT(len <= size()); - while (len > 0) + skip_ += len; + size_ -= len; + while (head_ && skip_ >= head_->size()) { - uint8_t *p; - unsigned available; - DataBuffer *next_head = - head_->get_read_pointer(skip_, &p, &available); - if ((len > available) || (len == available && len < size_)) + if (head_ == tail_) { - head_->unref(); - head_ = next_head; - skip_ = 0; - size_ -= available; - len -= available; + if (free() > 0) + { + // We can still write into this buffer, do not unref it. + break; + } + else + { + // We're ending up with an empty linkedbuffer. + auto *b = head_; + clear(); + b->unref(); + return; + } } - else + skip_ -= head_->size(); + auto *b = head_; + auto *next_head = head_->next(); + head_ = next_head; + if (!head_) { - skip_ += len; - size_ -= len; - len = 0; - break; + tail_ = nullptr; } + b->unref(); } } @@ -456,37 +529,81 @@ public: /// this. This tries to do `*this += o`. It will succeed if o.head() == /// this->tail() and the bytes in these buffers are back to back. /// @param o a LinkedDataBuffer with data payload. + /// @param add_link creates a tail-to-head link if none exist yet between + /// *this and o.head_. This is fundamentally dangerous, do it only if there + /// is no shared ownership of this->tail_. /// @return true if append succeeded. If false, nothing was changed. - bool try_append_from(const LinkedDataBufferPtr &o) + bool try_append_from(const LinkedDataBufferPtr &o, bool add_link = false) { if (!o.size()) { return true; // zero bytes, nothing to do. } + if (!size_) + { + // We are empty, so anything can be appended. + reset(o); + return true; + } if (free_ >= 0) { // writeable buffer, cannot append. return false; } HASSERT(o.head()); - if (o.head() != tail_) + if (o.head() != tail_) // Buffer does not start in the same chain where + // we end. { - // Buffer does not start in the same chain where we end. - return false; + HASSERT(tail_); // else we went into the !size_ branch above + + // Checks if the end of the tail buffer is already reached. This + // means that we don't depend on the value of free_ anymore for + // correctness. We also check that o starts at the beginning of the + // head buffer. + if (tail_->size() == (size_t)-free_ && o.skip() == 0) + { + if (tail_->next() == o.head()) + { + // link already exists + } + else if (add_link && tail_->next() == nullptr) + { + tail_->set_next(o.head()); + } + else + { + return false; + } + } + else + { + return false; + } } - if (-free_ != (int)o.skip()) + else if (-free_ != (int)o.skip()) { // Not back-to-back. return false; } // Now we're good, so take over the extra buffers. - tail_ = o.tail_; - free_ = o.free_; - size_ += o.size_; // Acquire extra references o.head_->ref_all(o.skip() + o.size()); - // Release duplicate reference between the two chains. - o.head_->unref(); + if (tail_ == o.head()) + { + HASSERT(o.head_->references() > 1); + // Release duplicate reference between the two chains. + o.head_->unref(); + } + tail_ = o.tail_; + if (o.free_ < 0) + { + free_ = o.free_; + } + else + { + free_ = -tail_->size(); + } + size_ += o.size_; return true; } @@ -516,6 +633,24 @@ private: int16_t free_ {0}; }; +#ifdef DEBUG_DATA_BUFFER_FREE +void check_db_ownership(DataBuffer *b) +{ + AtomicHolder h(LinkedDataBufferPtr::head_mu()); + for (LinkedDataBufferPtr *l = LinkedDataBufferPtr::link_head(); l; + l = l->link_next()) + { + ssize_t total = l->skip() + l->size(); + for (DataBuffer *curr = l->head(); total > 0;) + { + HASSERT(curr != b); + total -= curr->size(); + curr = curr->next(); + } + } +} +#endif + /// Proxy Pool that can allocate DataBuffer objects of a certain size. All /// memory comes from the mainBufferPool. class DataBufferPool : public Pool diff --git a/src/utils/HubDeviceSelect.hxx b/src/utils/HubDeviceSelect.hxx index 9080297c8..fbe40634a 100644 --- a/src/utils/HubDeviceSelect.hxx +++ b/src/utils/HubDeviceSelect.hxx @@ -173,8 +173,8 @@ public: , dst_(dst) , skipMember_(skip_member) { - this->start_flow(STATE(allocate_buffer)); set_limit_input(shouldThrottle_); + this->start_flow(STATE(allocate_buffer)); } void set_limit_input(bool should_throttle) @@ -243,9 +243,9 @@ public: { /// Error reading the socket. b_->unref(); - notify_barrier(); set_terminated(); device()->report_read_error(); + notify_barrier(); return exit(); } SelectBufferInfo::check_target_size( @@ -313,6 +313,7 @@ public: on_error ? on_error : EmptyNotifiable::DefaultInstance()); barrier_.new_child(); hub_->register_port(write_port()); + isRegistered_ = true; } #endif @@ -340,6 +341,7 @@ public: ::fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK); #endif hub_->register_port(write_port()); + isRegistered_ = true; } /// If the barrier has not been called yet, will notify it inline. @@ -347,22 +349,21 @@ public: { if (fd_ >= 0) { unregister_write_port(); - int fd = -1; - executor()->sync_run([this, &fd]() - { - fd = fd_; - fd_ = -1; - readFlow_.shutdown(); - writeFlow_.shutdown(); - }); - ::close(fd); - bool completed = false; - while (!completed) { - executor()->sync_run([this, &completed]() - { - if (barrier_.is_done()) completed = true; - }); - } + close_fd(); + executor()->sync_run([this]() { + readFlow_.shutdown(); + writeFlow_.shutdown(); + }); + } + bool completed = false; + while (!completed) + { + executor()->sync_run([this, &completed]() { + if (barrier_.is_done()) + { + completed = true; + } + }); } } @@ -383,6 +384,14 @@ public: { LOG(VERBOSE, "HubDeviceSelect::unregister write port %p %p", write_port(), &writeFlow_); + { + AtomicHolder h(this); + if (!isRegistered_) + { + return; + } + isRegistered_ = false; + } hub_->unregister_port(&writeFlow_); /* We put an empty message at the end of the queue. This will cause * wait until all pending messages are dealt with, and then ping the @@ -473,10 +482,7 @@ protected: { readFlow_.shutdown(); unregister_write_port(); - if (fd_ >= 0) { - ::close(fd_); - fd_ = -1; - } + close_fd(); } /** Callback from the ReadFlow when the read call has seen an error. The @@ -485,10 +491,29 @@ protected: void report_read_error() override { unregister_write_port(); - if (fd_ >= 0) { - ::close(fd_); + close_fd(); + } + + void close_fd() + { + int fd = -1; + { + AtomicHolder h(this); + fd = fd_; + if (fd < 0) + { + return; + } fd_ = -1; } + // This is a workaround that sometimes my linux kernel gets stuck in + // ::read when I closed the fd like this, even though the fd is + // O_NONBLOCK. + executor()->add(new CallbackExecutable([this, fd]() { + ::close(fd); + readFlow_.shutdown(); + writeFlow_.shutdown(); + })); } /// Hub whose data we are trying to send. @@ -498,6 +523,9 @@ protected: /// StateFlow for writing data to the fd. Woken by data to send or the fd /// being writeable. WriteFlow writeFlow_; + /// True when the write flow is registered in the hub. Used to synchronize + /// different and concurrent shutdown paths. Protected by Atomic this. + bool isRegistered_; }; #endif // FEATURE_EXECUTOR_SELECT diff --git a/src/utils/test_main.hxx b/src/utils/test_main.hxx index ac23c6aa9..5dc306a4f 100644 --- a/src/utils/test_main.hxx +++ b/src/utils/test_main.hxx @@ -112,8 +112,8 @@ Service g_service(&g_executor); * the last command in a TEST_F. */ void wait_for_main_executor() { - ExecutorGuard guard(&g_executor); - guard.wait_for_notification(); + std::unique_ptr guard(new ExecutorGuard(&g_executor)); + guard->wait_for_notification(); }