From 2f73be916b93cf15574539f1942e168b57a96fe1 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 6 Jul 2024 22:48:21 +0200 Subject: [PATCH 01/13] Adds more convenient read API to databuffer. --- src/utils/DataBuffer.hxx | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index d7b940715..03956238b 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -324,6 +324,29 @@ public: size_ += len; } + /// Retrieves a pointer where data can be read out of the buffer. + /// @param len will be filled in with the number of available bytes to read + /// at this point. + /// @return the read pointer, or nullptr if there is no data in this + /// buffer. + const uint8_t* data_read_pointer(size_t* len) + { + if (!head_ || !size_) + { + *len = 0; + return nullptr; + } + unsigned avail = 0; + uint8_t *p; + head_->get_read_pointer(skip_, &p, &avail); + if (avail > size_) + { + avail = size_; + } + *len = avail; + return p; + } + /// Advances the head pointer. Typically used after a successful read /// happened. /// @param len how many bytes to advance the read pointer. @@ -334,6 +357,7 @@ public: { uint8_t *p; unsigned available; + HASSERT(skip_ < head_->size()); DataBuffer *next_head = head_->get_read_pointer(skip_, &p, &available); if ((len > available) || (len == available && len < size_)) @@ -346,6 +370,7 @@ public: } else { + // now: len < available || len == available == size_ skip_ += len; size_ -= len; len = 0; From 2c14d301877ba133f77f5a428fbe2ba733f46171 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Tue, 9 Jul 2024 09:06:01 +0200 Subject: [PATCH 02/13] Adds fuzz test to data buffer. Rewrites conditions in move read pointer forward, because it was incorrect. --- src/utils/DataBuffer.cxxtest | 137 +++++++++++++++++++++++++++++++++++ src/utils/DataBuffer.hxx | 32 +++++--- 2 files changed, 158 insertions(+), 11 deletions(-) diff --git a/src/utils/DataBuffer.cxxtest b/src/utils/DataBuffer.cxxtest index ab52ca84b..a85a00a3a 100644 --- a/src/utils/DataBuffer.cxxtest +++ b/src/utils/DataBuffer.cxxtest @@ -3,6 +3,7 @@ #include "utils/test_main.hxx" DataBufferPool g_pool(64); +DataBufferPool g_pool10(10); class DataBufferTest : public ::testing::Test { @@ -390,3 +391,139 @@ TEST_F(DataBufferTest, lnk_multi) // The barriers will verify upon destruction time that they were correctly // notified. } + +class DataBufferFuzzTest : public ::testing::Test +{ +protected: + DataBufferFuzzTest() + { + for (int i = 0; i < NUM_OP; ++i) { + freq_[i] = 1; + } + freq_[NUM_OP] = 0; + } + + enum Op { + OP_APPEND, + OP_READ, + NUM_OP + }; + + int freq_[NUM_OP + 1]; + + /// @return a pseudorandom number uniformly distributed between 0 and max - + /// 1. + /// @param max distribution parameter. + unsigned get_random_uni(unsigned max) + { + return rand_r(&randSeed_) % max; + } + + void prep_fuzz() { + int sum = 0; + for (int i = 0; i <= NUM_OP; ++i) { + sum += freq_[i]; + freq_[i] = sum; + } + } + + void run_fuzz(unsigned iter) { + prep_fuzz(); + + while (--iter && !HasFatalFailure()) + { + int oper = get_random_uni(freq_[NUM_OP]); + for (int i = 0; i < NUM_OP; ++i) { + if (freq_[i] > oper) { + run_op((Op)i); + break; + } + } + } + } + + void run_op(Op op) { + switch(op) { + case OP_APPEND: { + int len = get_random_uni(22); + while(len) { + int free = lnk_.free(); + if (!free) { + DataBuffer *c; + g_pool10.alloc(&c); + lnk_.append_empty_buffer(c); + continue; + } + auto* p = lnk_.data_write_pointer(); + int count = 0; + while (free > 0 && len > 0) { + *p++ = nextByte_++; + --free; + --len; + ++count; + } + lnk_.data_write_advance(count); + } + break; + } + case OP_READ: { + int len = get_random_uni(22); + while(len > 0 && lnk_.size() > 0) { + size_t avail; + const uint8_t* ptr = lnk_.data_read_pointer(&avail); + if ((int)avail > len) { + avail = len; + } + int count = 0; + while (avail) { + EXPECT_EQ(nextByteRead_, *ptr); + ++nextByteRead_; + ++ptr; + ++count; + --avail; + --len; + } + lnk_.data_read_advance(count); + } + break; + } + default: + return; + } + } + + std::string flatten(const LinkedDataBufferPtr &p) + { + std::string ret; + p.append_to(&ret); + return ret; + } + + DataBuffer *b_; + unsigned lastFree_; + unsigned int randSeed_{83012475}; + uint8_t nextByte_{0}; + uint8_t nextByteRead_{0}; + + BarrierNotifiable bn_; + BarrierNotifiable bn2_; + LinkedDataBufferPtr lnk_; + std::vector > bns_; +}; + +TEST_F(DataBufferFuzzTest, small_fuzz) { + run_fuzz(10); +} + +TEST_F(DataBufferFuzzTest, medium_fuzz) { + run_fuzz(1000); +} + +TEST_F(DataBufferFuzzTest, large_fuzz) { + run_fuzz(100000); +} + +TEST_F(DataBufferFuzzTest, xl_fuzz) { + run_fuzz(1000000); +} + diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index 03956238b..4a125cf77 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -278,7 +278,7 @@ public: reset(buf); return; } - HASSERT(free_ >= 0); + HASSERT(free_ >= 0); // appendable HASSERT(tail_); // Note: if free_ was > 0, there were some unused bytes in the tail // buffer. However, as part of the append operation, we lose these @@ -357,24 +357,34 @@ public: { uint8_t *p; unsigned available; - HASSERT(skip_ < head_->size()); - DataBuffer *next_head = - head_->get_read_pointer(skip_, &p, &available); - if ((len > available) || (len == available && len < size_)) + while (head_ && skip_ >= head_->size()) { + skip_ -= head_->size(); + auto *next_head = head_->next(); head_->unref(); head_ = next_head; - skip_ = 0; - size_ -= available; - len -= available; + if (!head_) + tail_ = nullptr; } - else - { - // now: len < available || len == available == size_ + HASSERT(skip_ < head_->size()); + DataBuffer *next_head = + head_->get_read_pointer(skip_, &p, &available); + if (len < available || + (len == available && head_ == tail_ && free() > 0)) { + // now: len < available || len == available && there is free + // space in that buffer still skip_ += len; size_ -= len; len = 0; + // keeps the head buffer because we can still write into it. break; + } else { + // now: len >= available. + head_->unref(); + head_ = next_head; + skip_ = 0; + size_ -= available; + len -= available; } } } From 7017a30b18c2f82602584d7218a5f2a50147d4fd Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Fri, 12 Jul 2024 22:41:32 +0200 Subject: [PATCH 03/13] Ensures that try_append_to will always complete for an empty data buffer. --- src/utils/DataBuffer.hxx | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index 4a125cf77..01cc04dba 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -504,6 +504,11 @@ public: return false; } HASSERT(o.head()); + if (!size_) { + // We are empty, so anything can be appended. + reset(o); + return true; + } if (o.head() != tail_) { // Buffer does not start in the same chain where we end. From e97484e7deb3ca9f0c5955e70046a02f52ef151f Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 15:24:31 +0200 Subject: [PATCH 04/13] Adds more fuzz testing capability to the data buffer test. --- src/utils/DataBuffer.cxxtest | 169 +++++++++++++++++++++++++++-------- 1 file changed, 131 insertions(+), 38 deletions(-) diff --git a/src/utils/DataBuffer.cxxtest b/src/utils/DataBuffer.cxxtest index a85a00a3a..84a079486 100644 --- a/src/utils/DataBuffer.cxxtest +++ b/src/utils/DataBuffer.cxxtest @@ -398,14 +398,19 @@ protected: DataBufferFuzzTest() { for (int i = 0; i < NUM_OP; ++i) { - freq_[i] = 1; + freq_[i] = 0; } + freq_[0] = 1; freq_[NUM_OP] = 0; } enum Op { OP_APPEND, OP_READ, + OP_XFERMID, + OP_READMID, + OP_XFEREND, + OP_READEND, NUM_OP }; @@ -419,6 +424,21 @@ protected: return rand_r(&randSeed_) % max; } + /// Setup a fuzz test scenario where we append a given LinkedDataBufferPtr + /// and then read from the same one. + void setup_basic_readwrite() { + freq_[OP_APPEND] = 1; + freq_[OP_READ] = 1; + } + + /// Setup a fuzz test scenario where we append one LinkedDataBufferPtr, + /// then move data to a middle one, then read that middle one. + void setup_write_transfer_read() { + freq_[OP_APPEND] = 1; + freq_[OP_XFERMID] = 1; + freq_[OP_READMID] = 1; + } + void prep_fuzz() { int sum = 0; for (int i = 0; i <= NUM_OP; ++i) { @@ -446,45 +466,32 @@ protected: switch(op) { case OP_APPEND: { int len = get_random_uni(22); - while(len) { - int free = lnk_.free(); - if (!free) { - DataBuffer *c; - g_pool10.alloc(&c); - lnk_.append_empty_buffer(c); - continue; - } - auto* p = lnk_.data_write_pointer(); - int count = 0; - while (free > 0 && len > 0) { - *p++ = nextByte_++; - --free; - --len; - ++count; - } - lnk_.data_write_advance(count); - } + append_helper(&lnk_, len); break; } case OP_READ: { int len = get_random_uni(22); - while(len > 0 && lnk_.size() > 0) { - size_t avail; - const uint8_t* ptr = lnk_.data_read_pointer(&avail); - if ((int)avail > len) { - avail = len; - } - int count = 0; - while (avail) { - EXPECT_EQ(nextByteRead_, *ptr); - ++nextByteRead_; - ++ptr; - ++count; - --avail; - --len; - } - lnk_.data_read_advance(count); - } + consume_helper(&lnk_, len); + break; + } + case OP_XFERMID: { + int len = get_random_uni(22); + xfer_helper(&lnk_, &mid_, len); + break; + } + case OP_READMID: { + int len = get_random_uni(22); + consume_helper(&mid_, len); + break; + } + case OP_XFEREND: { + int len = get_random_uni(22); + xfer_helper(&mid_, &end_, len); + break; + } + case OP_READEND: { + int len = get_random_uni(22); + consume_helper(&end_, len); break; } default: @@ -499,6 +506,81 @@ protected: return ret; } + /// Appends a certain number of characters to a ptr. Characters are always + /// taken in the input sequence. + void append_helper(LinkedDataBufferPtr *p, size_t len) + { + while (len) + { + int free = p->free(); + if (!free) + { + DataBuffer *c; + g_pool10.alloc(&c); + p->append_empty_buffer(c); + continue; + } + auto *rp = p->data_write_pointer(); + int count = 0; + while (free > 0 && len > 0) + { + *rp++ = generate(); + --free; + --len; + ++count; + } + p->data_write_advance(count); + } + } + + /// Appends a certain number of characters to a ptr. Characters are always + /// taken in the input sequence. + void xfer_helper( + LinkedDataBufferPtr *from, LinkedDataBufferPtr *to, size_t len) + { + LinkedDataBufferPtr tmp; + len = std::min(len, (size_t)from->size()); + tmp.reset(*from, len); + from->data_read_advance(len); + ASSERT_TRUE(to->try_append_from(tmp)); + } + + /// Consumes (reads) a certain number of characters from a ptr. Characters + /// are compared to the expected output sequence. + void consume_helper(LinkedDataBufferPtr *p, size_t len) + { + while (len > 0 && p->size() > 0) + { + size_t avail; + const uint8_t *ptr = p->data_read_pointer(&avail); + if (avail > len) + { + avail = len; + } + int count = 0; + while (avail) + { + consume(*(ptr++)); + ++count; + --avail; + --len; + } + p->data_read_advance(count); + } + } + + /// @return the next byte of the generated sequence. + uint8_t generate() { + return nextByte_++; + } + + /// Take in the next byte that came out at the end. Verifies that it is the + /// correct byte value. + void consume(uint8_t next_byte) { + EXPECT_EQ(nextByteRead_, next_byte); + ++nextByteRead_; + } + DataBuffer *b_; unsigned lastFree_; unsigned int randSeed_{83012475}; @@ -508,22 +590,33 @@ protected: BarrierNotifiable bn_; BarrierNotifiable bn2_; LinkedDataBufferPtr lnk_; + LinkedDataBufferPtr mid_; + LinkedDataBufferPtr end_; std::vector > bns_; }; TEST_F(DataBufferFuzzTest, small_fuzz) { + setup_basic_readwrite(); run_fuzz(10); } TEST_F(DataBufferFuzzTest, medium_fuzz) { + setup_basic_readwrite(); run_fuzz(1000); } TEST_F(DataBufferFuzzTest, large_fuzz) { + setup_basic_readwrite(); run_fuzz(100000); } -TEST_F(DataBufferFuzzTest, xl_fuzz) { - run_fuzz(1000000); +TEST_F(DataBufferFuzzTest, small_duo) { + setup_write_transfer_read(); + run_fuzz(10); +} + +TEST_F(DataBufferFuzzTest, medium_duo) { + setup_write_transfer_read(); + run_fuzz(1000); } From 2009efacedb5a1ae45864265b108ee2059da4a53 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 15:24:42 +0200 Subject: [PATCH 05/13] Fix some crashing cornercases. --- src/utils/DataBuffer.hxx | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index 01cc04dba..ce3dc10a5 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -231,6 +231,15 @@ public: { size = o.size_; } + if ((size_t)size > o.size_) + { + size = o.size_; + } + if (!size) + { + // Nothing to copy, this will be an empty buffer. + return; + } skip_ = o.skip_; size_ = size; // Takes references, keeping the tail and tail size. From 3d433db670bf0d6474767697da08ceaafc7c9958 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 17:22:13 +0200 Subject: [PATCH 06/13] Adds more fuzz test debugging. --- src/utils/DataBuffer.cxxtest | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/utils/DataBuffer.cxxtest b/src/utils/DataBuffer.cxxtest index 84a079486..b680dd50c 100644 --- a/src/utils/DataBuffer.cxxtest +++ b/src/utils/DataBuffer.cxxtest @@ -449,13 +449,15 @@ protected: void run_fuzz(unsigned iter) { prep_fuzz(); - + size_t idx = 0; while (--iter && !HasFatalFailure()) { int oper = get_random_uni(freq_[NUM_OP]); for (int i = 0; i < NUM_OP; ++i) { if (freq_[i] > oper) { + SCOPED_TRACE(idx); run_op((Op)i); + ++idx; break; } } @@ -542,7 +544,7 @@ protected: len = std::min(len, (size_t)from->size()); tmp.reset(*from, len); from->data_read_advance(len); - ASSERT_TRUE(to->try_append_from(tmp)); + ASSERT_TRUE(to->try_append_from(tmp, true)); } /// Consumes (reads) a certain number of characters from a ptr. Characters From e3df236533103e433ab0ab3d1950d5ef6c215d7a Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 17:24:11 +0200 Subject: [PATCH 07/13] Expands the cases where try_append_from succeeds. - Handles the case when the src or dst buffer is empty. - Detects when the append happens exactly at a buffer boundary and the buffers are already linked buffer. - Adds a parameter that makes this work even when the link is not there yet --- src/utils/DataBuffer.hxx | 52 +++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index ce3dc10a5..db8fc480e 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -500,42 +500,66 @@ public: /// this. This tries to do `*this += o`. It will succeed if o.head() == /// this->tail() and the bytes in these buffers are back to back. /// @param o a LinkedDataBuffer with data payload. + /// @param add_link creates a tail-to-head link if none exist yet between + /// *this and o.head_. This is fundamentally dangerous, do it only if there + /// is no shared ownership of this->tail_. /// @return true if append succeeded. If false, nothing was changed. - bool try_append_from(const LinkedDataBufferPtr &o) + bool try_append_from(const LinkedDataBufferPtr &o, bool add_link = false) { if (!o.size()) { return true; // zero bytes, nothing to do. } - if (free_ >= 0) - { - // writeable buffer, cannot append. - return false; - } - HASSERT(o.head()); if (!size_) { // We are empty, so anything can be appended. reset(o); return true; } - if (o.head() != tail_) + if (free_ >= 0) { - // Buffer does not start in the same chain where we end. + // writeable buffer, cannot append. return false; } - if (-free_ != (int)o.skip()) + HASSERT(o.head()); + if (o.head() != tail_) // Buffer does not start in the same chain where + // we end. + { + HASSERT(tail_); // else we went into the !size_ branch above + + // Checks if the end of the tail buffer is already reached. This + // means that we don't depend on the value of free_ anymore for + // correctness. We also check that o starts at the beginning of the + // head buffer. + if (tail_->size() == (size_t)-free_ && o.skip() == 0) { + if (tail_->next() == o.head()) { + // link already exists + } else if (add_link && tail_->next() == nullptr) { + tail_->set_next(o.head()); + } else { + return false; + } + } + else + { + return false; + } + } + else if (-free_ != (int)o.skip()) { // Not back-to-back. return false; } // Now we're good, so take over the extra buffers. + // Acquire extra references + o.head_->ref_all(o.skip() + o.size()); + if (tail_ == o.head()) { + HASSERT(o.head_->references() > 1); + // Release duplicate reference between the two chains. + o.head_->unref(); + } tail_ = o.tail_; free_ = o.free_; size_ += o.size_; - // Acquire extra references - o.head_->ref_all(o.skip() + o.size()); - // Release duplicate reference between the two chains. - o.head_->unref(); return true; } From cff032aa61949f1eee3fde3a90bc29ff212ac81b Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 17:26:28 +0200 Subject: [PATCH 08/13] Adds a debugging facility that verifies that data buffer unrefs are correct. --- src/utils/DataBuffer.hxx | 57 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index db8fc480e..50f57802e 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -37,9 +37,18 @@ #define _UTILS_DATABUFFER_HXX_ #include "utils/Buffer.hxx" +#include "utils/LinkedObject.hxx" + +#define DEBUG_DATA_BUFFER_FREE class DataBufferPool; + +#ifdef DEBUG_DATA_BUFFER_FREE +class DataBuffer; +static void check_db_ownership(DataBuffer* p); +#endif + /// Specialization of the Buffer class that is designed for storing untyped /// data arrays. Adds the ability to treat the next pointers as links to /// consecutive data bytes, ref'ing and unref'ing a sequence of buffers in one @@ -165,6 +174,15 @@ public: return curr->next(); } +#ifdef DEBUG_DATA_BUFFER_FREE + void unref() { + if (references() == 1) { + check_db_ownership(this); + } + Buffer::unref(); + } +#endif + private: friend class DataBufferPool; @@ -176,8 +194,13 @@ private: using DataBufferPtr = std::unique_ptr>; +class LinkedDataBufferPtr; + /// A class that keeps ownership of a chain of linked DataBuffer references. class LinkedDataBufferPtr +#ifdef DEBUG_DATA_BUFFER_FREE + : public LinkedObject +#endif { public: LinkedDataBufferPtr() @@ -306,9 +329,16 @@ public: { if (head_) { - head_->unref_all(size_ + skip_); + auto *h = head_; + size_t len = size_ + skip_; + clear(); + h->unref_all(len); + return; + } + else + { + clear(); } - clear(); } /// @return the pointer where data can be appended into the tail of this @@ -369,11 +399,13 @@ public: while (head_ && skip_ >= head_->size()) { skip_ -= head_->size(); + auto *b = head_; auto *next_head = head_->next(); - head_->unref(); head_ = next_head; - if (!head_) + if (!head_) { tail_ = nullptr; + } + b->unref(); } HASSERT(skip_ < head_->size()); DataBuffer *next_head = @@ -389,11 +421,12 @@ public: break; } else { // now: len >= available. - head_->unref(); + auto *b = head_; head_ = next_head; skip_ = 0; size_ -= available; len -= available; + b->unref(); } } } @@ -589,6 +622,20 @@ private: int16_t free_ {0}; }; +#ifdef DEBUG_DATA_BUFFER_FREE +void check_db_ownership(DataBuffer* b) { + AtomicHolder h(LinkedDataBufferPtr::head_mu()); + for (LinkedDataBufferPtr* l = LinkedDataBufferPtr::link_head(); l; l = l->link_next()) { + ssize_t total = l->skip() + l->size(); + for (DataBuffer* curr = l->head(); total > 0;) { + HASSERT(curr != b); + total -= curr->size(); + curr = curr->next(); + } + } +} +#endif + /// Proxy Pool that can allocate DataBuffer objects of a certain size. All /// memory comes from the mainBufferPool. class DataBufferPool : public Pool From 9c9ae3e7fc85e66d319d0a64ca40ae30a6c1972d Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 17:39:02 +0200 Subject: [PATCH 09/13] Finishes fuzz tests. --- src/utils/DataBuffer.cxxtest | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/utils/DataBuffer.cxxtest b/src/utils/DataBuffer.cxxtest index b680dd50c..48b242d36 100644 --- a/src/utils/DataBuffer.cxxtest +++ b/src/utils/DataBuffer.cxxtest @@ -438,7 +438,17 @@ protected: freq_[OP_XFERMID] = 1; freq_[OP_READMID] = 1; } - + + /// Setup a fuzz test scenario where we append one LinkedDataBufferPtr, + /// then move data to a middle one, then move data to a third one, then + /// read that last. + void setup_write_transfer_read_transfer_read() { + freq_[OP_APPEND] = 1; + freq_[OP_XFERMID] = 1; + freq_[OP_XFEREND] = 1; + freq_[OP_READEND] = 1; + } + void prep_fuzz() { int sum = 0; for (int i = 0; i <= NUM_OP; ++i) { @@ -622,3 +632,22 @@ TEST_F(DataBufferFuzzTest, medium_duo) { run_fuzz(1000); } +TEST_F(DataBufferFuzzTest, large_duo) { + setup_write_transfer_read(); + run_fuzz(100000); +} + +TEST_F(DataBufferFuzzTest, small_tri) { + setup_write_transfer_read_transfer_read(); + run_fuzz(10); +} + +TEST_F(DataBufferFuzzTest, medium_tri) { + setup_write_transfer_read_transfer_read(); + run_fuzz(1000); +} + +TEST_F(DataBufferFuzzTest, large_tri) { + setup_write_transfer_read_transfer_read(); + run_fuzz(100000); +} From 18c88d07ec4957b9d3fd140a9d03f1732f317f0d Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 17:39:45 +0200 Subject: [PATCH 10/13] Rewrite data read advance to be much simpler. This fixes a buffer refcount bug when a buffer ended up with a zero length and too few refs. --- src/utils/DataBuffer.hxx | 62 +++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index 50f57802e..b0b2e5ae5 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -39,7 +39,9 @@ #include "utils/Buffer.hxx" #include "utils/LinkedObject.hxx" -#define DEBUG_DATA_BUFFER_FREE +#ifdef GTEST +//#define DEBUG_DATA_BUFFER_FREE +#endif class DataBufferPool; @@ -127,7 +129,8 @@ public: } /// Releases one reference to all blocks of this buffer. This includes one - /// reference to the last block which may be a partially filled buffer. + /// reference to the last block which may be a partially filled + /// buffer. Calling with zero length will call release on the head block. /// @param total_size the number of bytes starting from the beginning of /// *this. void unref_all(unsigned total_size) @@ -392,42 +395,35 @@ public: void data_read_advance(size_t len) { HASSERT(len <= size()); - while (len > 0) + skip_ += len; + size_ -= len; + while (head_ && skip_ >= head_->size()) { - uint8_t *p; - unsigned available; - while (head_ && skip_ >= head_->size()) + if (head_ == tail_) { - skip_ -= head_->size(); - auto *b = head_; - auto *next_head = head_->next(); - head_ = next_head; - if (!head_) { - tail_ = nullptr; + if (free() > 0) + { + // We can still write into this buffer, do not unref it. + break; + } + else + { + // We're ending up with an empty linkedbuffer. + auto *b = head_; + clear(); + b->unref(); + return; } - b->unref(); } - HASSERT(skip_ < head_->size()); - DataBuffer *next_head = - head_->get_read_pointer(skip_, &p, &available); - if (len < available || - (len == available && head_ == tail_ && free() > 0)) { - // now: len < available || len == available && there is free - // space in that buffer still - skip_ += len; - size_ -= len; - len = 0; - // keeps the head buffer because we can still write into it. - break; - } else { - // now: len >= available. - auto *b = head_; - head_ = next_head; - skip_ = 0; - size_ -= available; - len -= available; - b->unref(); + skip_ -= head_->size(); + auto *b = head_; + auto *next_head = head_->next(); + head_ = next_head; + if (!head_) + { + tail_ = nullptr; } + b->unref(); } } From bc8997dc932ffc2e375480cacd8ee1ad684b0785 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Sat, 13 Jul 2024 22:27:57 +0200 Subject: [PATCH 11/13] Fix bug when we take bytes from an appendable data buffer. We took over o.free_ but this makes the current buffer appendable, which is wrong. --- src/utils/DataBuffer.hxx | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index b0b2e5ae5..1e2695bff 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -38,6 +38,8 @@ #include "utils/Buffer.hxx" #include "utils/LinkedObject.hxx" +#include "utils/macros.h" + #ifdef GTEST //#define DEBUG_DATA_BUFFER_FREE @@ -587,7 +589,11 @@ public: o.head_->unref(); } tail_ = o.tail_; - free_ = o.free_; + if (o.free_ < 0) { + free_ = o.free_; + } else { + free_ = -tail_->size(); + } size_ += o.size_; return true; } From 754c065e780646cc367e5be2b423d55cce48cff4 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Tue, 20 Aug 2024 15:48:12 +0200 Subject: [PATCH 12/13] Removes unnecessary forward declaration. --- src/utils/DataBuffer.hxx | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index 1e2695bff..0141b18e8 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -199,8 +199,6 @@ private: using DataBufferPtr = std::unique_ptr>; -class LinkedDataBufferPtr; - /// A class that keeps ownership of a chain of linked DataBuffer references. class LinkedDataBufferPtr #ifdef DEBUG_DATA_BUFFER_FREE From 4444eafc9c3516c03c30a65f173c5861c6d0433f Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Tue, 20 Aug 2024 15:49:12 +0200 Subject: [PATCH 13/13] Fix indentation and style. --- src/utils/DataBuffer.cxxtest | 103 ++++++++++++++++++++++------------- src/utils/DataBuffer.hxx | 57 ++++++++++++------- 2 files changed, 102 insertions(+), 58 deletions(-) diff --git a/src/utils/DataBuffer.cxxtest b/src/utils/DataBuffer.cxxtest index 48b242d36..b5d88682b 100644 --- a/src/utils/DataBuffer.cxxtest +++ b/src/utils/DataBuffer.cxxtest @@ -397,14 +397,16 @@ class DataBufferFuzzTest : public ::testing::Test protected: DataBufferFuzzTest() { - for (int i = 0; i < NUM_OP; ++i) { + for (int i = 0; i < NUM_OP; ++i) + { freq_[i] = 0; } freq_[0] = 1; freq_[NUM_OP] = 0; } - enum Op { + enum Op + { OP_APPEND, OP_READ, OP_XFERMID, @@ -415,7 +417,7 @@ protected: }; int freq_[NUM_OP + 1]; - + /// @return a pseudorandom number uniformly distributed between 0 and max - /// 1. /// @param max distribution parameter. @@ -426,14 +428,16 @@ protected: /// Setup a fuzz test scenario where we append a given LinkedDataBufferPtr /// and then read from the same one. - void setup_basic_readwrite() { + void setup_basic_readwrite() + { freq_[OP_APPEND] = 1; freq_[OP_READ] = 1; } /// Setup a fuzz test scenario where we append one LinkedDataBufferPtr, /// then move data to a middle one, then read that middle one. - void setup_write_transfer_read() { + void setup_write_transfer_read() + { freq_[OP_APPEND] = 1; freq_[OP_XFERMID] = 1; freq_[OP_READMID] = 1; @@ -442,29 +446,35 @@ protected: /// Setup a fuzz test scenario where we append one LinkedDataBufferPtr, /// then move data to a middle one, then move data to a third one, then /// read that last. - void setup_write_transfer_read_transfer_read() { + void setup_write_transfer_read_transfer_read() + { freq_[OP_APPEND] = 1; freq_[OP_XFERMID] = 1; freq_[OP_XFEREND] = 1; freq_[OP_READEND] = 1; } - void prep_fuzz() { + void prep_fuzz() + { int sum = 0; - for (int i = 0; i <= NUM_OP; ++i) { + for (int i = 0; i <= NUM_OP; ++i) + { sum += freq_[i]; freq_[i] = sum; } } - void run_fuzz(unsigned iter) { + void run_fuzz(unsigned iter) + { prep_fuzz(); size_t idx = 0; while (--iter && !HasFatalFailure()) { int oper = get_random_uni(freq_[NUM_OP]); - for (int i = 0; i < NUM_OP; ++i) { - if (freq_[i] > oper) { + for (int i = 0; i < NUM_OP; ++i) + { + if (freq_[i] > oper) + { SCOPED_TRACE(idx); run_op((Op)i); ++idx; @@ -474,34 +484,42 @@ protected: } } - void run_op(Op op) { - switch(op) { - case OP_APPEND: { + void run_op(Op op) + { + switch (op) + { + case OP_APPEND: + { int len = get_random_uni(22); append_helper(&lnk_, len); break; } - case OP_READ: { + case OP_READ: + { int len = get_random_uni(22); consume_helper(&lnk_, len); break; } - case OP_XFERMID: { + case OP_XFERMID: + { int len = get_random_uni(22); xfer_helper(&lnk_, &mid_, len); break; } - case OP_READMID: { + case OP_READMID: + { int len = get_random_uni(22); consume_helper(&mid_, len); break; } - case OP_XFEREND: { + case OP_XFEREND: + { int len = get_random_uni(22); xfer_helper(&mid_, &end_, len); break; } - case OP_READEND: { + case OP_READEND: + { int len = get_random_uni(22); consume_helper(&end_, len); break; @@ -510,7 +528,7 @@ protected: return; } } - + std::string flatten(const LinkedDataBufferPtr &p) { std::string ret; @@ -556,7 +574,7 @@ protected: from->data_read_advance(len); ASSERT_TRUE(to->try_append_from(tmp, true)); } - + /// Consumes (reads) a certain number of characters from a ptr. Characters /// are compared to the expected output sequence. void consume_helper(LinkedDataBufferPtr *p, size_t len) @@ -582,72 +600,83 @@ protected: } /// @return the next byte of the generated sequence. - uint8_t generate() { + uint8_t generate() + { return nextByte_++; } /// Take in the next byte that came out at the end. Verifies that it is the /// correct byte value. - void consume(uint8_t next_byte) { + void consume(uint8_t next_byte) + { EXPECT_EQ(nextByteRead_, next_byte); ++nextByteRead_; } - + DataBuffer *b_; unsigned lastFree_; - unsigned int randSeed_{83012475}; - uint8_t nextByte_{0}; - uint8_t nextByteRead_{0}; + unsigned int randSeed_ {83012475}; + uint8_t nextByte_ {0}; + uint8_t nextByteRead_ {0}; BarrierNotifiable bn_; BarrierNotifiable bn2_; LinkedDataBufferPtr lnk_; LinkedDataBufferPtr mid_; LinkedDataBufferPtr end_; - std::vector > bns_; + std::vector> bns_; }; -TEST_F(DataBufferFuzzTest, small_fuzz) { +TEST_F(DataBufferFuzzTest, small_fuzz) +{ setup_basic_readwrite(); run_fuzz(10); } -TEST_F(DataBufferFuzzTest, medium_fuzz) { +TEST_F(DataBufferFuzzTest, medium_fuzz) +{ setup_basic_readwrite(); run_fuzz(1000); } -TEST_F(DataBufferFuzzTest, large_fuzz) { +TEST_F(DataBufferFuzzTest, large_fuzz) +{ setup_basic_readwrite(); run_fuzz(100000); } -TEST_F(DataBufferFuzzTest, small_duo) { +TEST_F(DataBufferFuzzTest, small_duo) +{ setup_write_transfer_read(); run_fuzz(10); } -TEST_F(DataBufferFuzzTest, medium_duo) { +TEST_F(DataBufferFuzzTest, medium_duo) +{ setup_write_transfer_read(); run_fuzz(1000); } -TEST_F(DataBufferFuzzTest, large_duo) { +TEST_F(DataBufferFuzzTest, large_duo) +{ setup_write_transfer_read(); run_fuzz(100000); } -TEST_F(DataBufferFuzzTest, small_tri) { +TEST_F(DataBufferFuzzTest, small_tri) +{ setup_write_transfer_read_transfer_read(); run_fuzz(10); } -TEST_F(DataBufferFuzzTest, medium_tri) { +TEST_F(DataBufferFuzzTest, medium_tri) +{ setup_write_transfer_read_transfer_read(); run_fuzz(1000); } -TEST_F(DataBufferFuzzTest, large_tri) { +TEST_F(DataBufferFuzzTest, large_tri) +{ setup_write_transfer_read_transfer_read(); run_fuzz(100000); } diff --git a/src/utils/DataBuffer.hxx b/src/utils/DataBuffer.hxx index 0141b18e8..eb2ec8b6d 100644 --- a/src/utils/DataBuffer.hxx +++ b/src/utils/DataBuffer.hxx @@ -40,17 +40,15 @@ #include "utils/LinkedObject.hxx" #include "utils/macros.h" - #ifdef GTEST -//#define DEBUG_DATA_BUFFER_FREE +// #define DEBUG_DATA_BUFFER_FREE #endif class DataBufferPool; - #ifdef DEBUG_DATA_BUFFER_FREE class DataBuffer; -static void check_db_ownership(DataBuffer* p); +static void check_db_ownership(DataBuffer *p); #endif /// Specialization of the Buffer class that is designed for storing untyped @@ -179,15 +177,17 @@ public: return curr->next(); } -#ifdef DEBUG_DATA_BUFFER_FREE - void unref() { - if (references() == 1) { +#ifdef DEBUG_DATA_BUFFER_FREE + void unref() + { + if (references() == 1) + { check_db_ownership(this); } Buffer::unref(); } -#endif - +#endif + private: friend class DataBufferPool; @@ -371,7 +371,7 @@ public: /// at this point. /// @return the read pointer, or nullptr if there is no data in this /// buffer. - const uint8_t* data_read_pointer(size_t* len) + const uint8_t *data_read_pointer(size_t *len) { if (!head_ || !size_) { @@ -539,7 +539,8 @@ public: { return true; // zero bytes, nothing to do. } - if (!size_) { + if (!size_) + { // We are empty, so anything can be appended. reset(o); return true; @@ -559,12 +560,18 @@ public: // means that we don't depend on the value of free_ anymore for // correctness. We also check that o starts at the beginning of the // head buffer. - if (tail_->size() == (size_t)-free_ && o.skip() == 0) { - if (tail_->next() == o.head()) { + if (tail_->size() == (size_t)-free_ && o.skip() == 0) + { + if (tail_->next() == o.head()) + { // link already exists - } else if (add_link && tail_->next() == nullptr) { + } + else if (add_link && tail_->next() == nullptr) + { tail_->set_next(o.head()); - } else { + } + else + { return false; } } @@ -581,15 +588,19 @@ public: // Now we're good, so take over the extra buffers. // Acquire extra references o.head_->ref_all(o.skip() + o.size()); - if (tail_ == o.head()) { + if (tail_ == o.head()) + { HASSERT(o.head_->references() > 1); // Release duplicate reference between the two chains. o.head_->unref(); } tail_ = o.tail_; - if (o.free_ < 0) { + if (o.free_ < 0) + { free_ = o.free_; - } else { + } + else + { free_ = -tail_->size(); } size_ += o.size_; @@ -623,11 +634,15 @@ private: }; #ifdef DEBUG_DATA_BUFFER_FREE -void check_db_ownership(DataBuffer* b) { +void check_db_ownership(DataBuffer *b) +{ AtomicHolder h(LinkedDataBufferPtr::head_mu()); - for (LinkedDataBufferPtr* l = LinkedDataBufferPtr::link_head(); l; l = l->link_next()) { + for (LinkedDataBufferPtr *l = LinkedDataBufferPtr::link_head(); l; + l = l->link_next()) + { ssize_t total = l->skip() + l->size(); - for (DataBuffer* curr = l->head(); total > 0;) { + for (DataBuffer *curr = l->head(); total > 0;) + { HASSERT(curr != b); total -= curr->size(); curr = curr->next();