From 85f1de6d7a00a27ae385abac766bebf766ecef25 Mon Sep 17 00:00:00 2001
From: Balazs Racz
Date: Wed, 30 Dec 2020 18:45:40 +0100
Subject: [PATCH 1/3] Adds a version of the Queue where the scheduling
 algorithm can be tuned by weights.

---
 src/utils/ScheduledQueue.cxxtest | 382 +++++++++++++++++++++++++++++++
 src/utils/ScheduledQueue.hxx     | 214 +++++++++++++++++
 2 files changed, 596 insertions(+)
 create mode 100644 src/utils/ScheduledQueue.cxxtest
 create mode 100644 src/utils/ScheduledQueue.hxx

diff --git a/src/utils/ScheduledQueue.cxxtest b/src/utils/ScheduledQueue.cxxtest
new file mode 100644
index 000000000..e4dfb358b
--- /dev/null
+++ b/src/utils/ScheduledQueue.cxxtest
@@ -0,0 +1,382 @@
+#include "utils/ScheduledQueue.hxx"
+
+#include "utils/test_main.hxx"
+
+class ScheduledQueueTest : public ::testing::Test
+{
+protected:
+    ScheduledQueueTest()
+    {
+        entries_.resize(100);
+        for (unsigned i = 0; i < entries_.size(); ++i)
+        {
+            entries_[i].idx = i;
+        }
+    }
+
+    ~ScheduledQueueTest()
+    {
+        if (!q_)
+            return;
+        while (!q_->empty())
+        {
+            take();
+        }
+        for (unsigned i = 0; i < nextEntry_; ++i)
+        {
+            EXPECT_TRUE(entries_[i].queued);
+            EXPECT_TRUE(entries_[i].done);
+        }
+    }
+
+    struct Entry : QMember
+    {
+        /// Which entry this is in the vector.
+        unsigned idx;
+        /// True if this entry was added to the queue.
+        bool queued {false};
+        /// True if this entry was returned from the queue.
+        bool done {false};
+    };
+
+    /// Adds a new entry to the queue.
+    /// @param prio which priority band to add to.
+    /// @return the index of the new member.
+    unsigned add_empty(unsigned prio)
+    {
+        HASSERT(nextEntry_ < entries_.size());
+        Entry *e = &entries_[nextEntry_++];
+        e->queued = true;
+        q_->insert(e, prio);
+        return e->idx;
+    }
+
+    /// Takes the next entry from the queue. Fills in lastIdx_ and lastPrio_.
+    void take()
+    {
+        auto ret = q_->next();
+        if (!ret.item)
+        {
+            lastIdx_ = EMPTY;
+            lastPrio_ = EMPTY;
+        }
+        else
+        {
+            lastPrio_ = ret.index;
+            if (frequency_.size() <= lastPrio_)
+            {
+                frequency_.resize(lastPrio_ + 1);
+            }
+            frequency_[lastPrio_]++;
+            auto *e = (Entry *)ret.item;
+            lastIdx_ = e->idx;
+            ASSERT_TRUE(e->queued);
+            ASSERT_FALSE(e->done);
+            e->done = true;
+        }
+    }
+
+    /// Takes an entry from the queue and returns its index.
+    unsigned take_idx()
+    {
+        take();
+        return lastIdx_;
+    }
+
+    /// Takes an entry from the queue and returns its priority.
+    unsigned take_prio()
+    {
+        take();
+        return lastPrio_;
+    }
+
+    /// Runs a statistical test with always-full priority bands and returns
+    /// the fraction of bandwidth allocated to each band.
+    /// @param count how many iterations to run.
+    std::vector<double> run_stat_test(unsigned count)
+    {
+        // Fills each priority band with two entries.
+        for (unsigned p = 0; p < q_->num_prio(); ++p)
+        {
+            for (unsigned i = 0; i < 2; i++)
+            {
+                add_empty(p);
+            }
+        }
+
+        EXPECT_EQ(2 * q_->num_prio(), q_->pending());
+
+        for (unsigned i = 0; i < count; i++)
+        {
+            take();
+            // Re-adds the same entry.
+            entries_[lastIdx_].done = false;
+            q_->insert(&entries_[lastIdx_], lastPrio_);
+        }
+
+        std::vector<double> v(frequency_.begin(), frequency_.end());
+        for (unsigned p = 0; p < q_->num_prio(); ++p)
+        {
+            v[p] /= count;
+        }
+        return v;
+    }
+
+    /// Pre-allocated entries.
+    vector<Entry> entries_;
+    /// Index where to take the next entry from.
+    unsigned nextEntry_ {0};
+    /// The queue object under test.
+    std::unique_ptr<ScheduledQueue> q_;
+    /// Frequency of removals. index: priority. value: count.
+    vector<unsigned> frequency_;
+
+    /// Index of the last returned entry.
+    unsigned lastIdx_ {0};
+    /// Priority of the last returned entry.
+    unsigned lastPrio_ {0};
+
+    static constexpr unsigned EMPTY = 0xffffu;
+};
+
+TEST_F(ScheduledQueueTest, create)
+{
+}
+
+TEST_F(ScheduledQueueTest, empty)
+{
+    constexpr Fixed16 ps[] = {{1, 0}};
+    q_.reset(new ScheduledQueue(1, ps));
+
+    EXPECT_TRUE(q_->empty());
+}
+
+TEST_F(ScheduledQueueTest, fifo)
+{
+    constexpr Fixed16 ps[] = {{1, 0}};
+    q_.reset(new ScheduledQueue(1, ps));
+    add_empty(0);
+    add_empty(0);
+    add_empty(0);
+    EXPECT_EQ(0u, take_idx());
+    EXPECT_EQ(1u, take_idx());
+    add_empty(0);
+    EXPECT_EQ(2u, take_idx());
+    add_empty(0);
+    add_empty(0);
+    EXPECT_EQ(3u, take_idx());
+    EXPECT_EQ(4u, take_idx());
+
+    EXPECT_FALSE(q_->empty());
+}
+
+TEST_F(ScheduledQueueTest, strict_prio)
+{
+    constexpr Fixed16 ps[] = {{1, 0}, {1, 0}};
+    q_.reset(new ScheduledQueue(2, ps));
+    add_empty(0);
+    add_empty(1);
+    add_empty(0);
+    add_empty(1);
+    add_empty(0);
+    add_empty(1);
+    // First we get back the entries from priority zero.
+    EXPECT_EQ(0u, take_idx());
+    EXPECT_EQ(0u, lastPrio_);
+    EXPECT_EQ(2u, take_idx());
+    EXPECT_EQ(0u, lastPrio_);
+    EXPECT_EQ(4u, take_idx());
+    EXPECT_EQ(0u, lastPrio_);
+
+    // Then the entries from priority one.
+    EXPECT_EQ(1u, take_idx());
+    EXPECT_EQ(1u, lastPrio_);
+    EXPECT_EQ(3u, take_idx());
+    EXPECT_EQ(1u, lastPrio_);
+}
+
+TEST_F(ScheduledQueueTest, prio_pending_empty)
+{
+    constexpr Fixed16 ps[] = {{1, 0}, {1, 0}, {1, 0}, {1, 0}};
+    q_.reset(new ScheduledQueue(4, ps));
+    add_empty(0);
+    add_empty(1);
+    add_empty(2);
+    add_empty(2);
+
+    // Checks empty() and the pending counts by band.
+    EXPECT_FALSE(q_->empty());
+    EXPECT_EQ(4u, q_->pending());
+    EXPECT_EQ(1u, q_->pending(0));
+    EXPECT_EQ(1u, q_->pending(1));
+    EXPECT_EQ(2u, q_->pending(2));
+    EXPECT_EQ(0u, q_->pending(3));
+
+    take();
+
+    EXPECT_EQ(3u, q_->pending());
+    EXPECT_EQ(0u, q_->pending(0));
+    EXPECT_EQ(1u, q_->pending(1));
+    EXPECT_EQ(2u, q_->pending(2));
+    EXPECT_EQ(0u, q_->pending(3));
+    EXPECT_FALSE(q_->empty());
+
+    take();
+
+    EXPECT_EQ(2u, q_->pending());
+    EXPECT_EQ(0u, q_->pending(0));
+    EXPECT_EQ(0u, q_->pending(1));
+    EXPECT_EQ(2u, q_->pending(2));
+    EXPECT_EQ(0u, q_->pending(3));
+    EXPECT_FALSE(q_->empty());
+
+    take();
+
+    EXPECT_EQ(1u, q_->pending());
+    EXPECT_EQ(0u, q_->pending(0));
+    EXPECT_EQ(0u, q_->pending(1));
+    EXPECT_EQ(1u, q_->pending(2));
+    EXPECT_EQ(0u, q_->pending(3));
+    EXPECT_FALSE(q_->empty());
+
+    take();
+
+    EXPECT_EQ(0u, q_->pending());
+    EXPECT_EQ(0u, q_->pending(0));
+    EXPECT_EQ(0u, q_->pending(1));
+    EXPECT_EQ(0u, q_->pending(2));
+    EXPECT_EQ(0u, q_->pending(3));
+    EXPECT_TRUE(q_->empty());
+}
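+
+// With a stride of 0.5 on every band, the token counters behave like a
+// binary counter, so a saturated queue is served in the classic "ruler"
+// pattern: p0 takes every second slot, p1 every fourth, p2 every eighth, and
+// the slots that would belong to bands below p3 fall back to p3. The test
+// below walks through one full period of this pattern.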
+TEST_F(ScheduledQueueTest, schedule_full)
+{
+    constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.5},
+        {Fixed16::FROM_DOUBLE, 0.5}, {Fixed16::FROM_DOUBLE, 0.5},
+        {Fixed16::FROM_DOUBLE, 0.5}};
+    q_.reset(new ScheduledQueue(4, ps));
+    // Fills each priority band with 10 entries.
+    for (unsigned p = 0; p < q_->num_prio(); ++p)
+    {
+        for (unsigned i = 0; i < 10; i++)
+        {
+            add_empty(p);
+        }
+    }
+
+    EXPECT_EQ(40u, q_->pending());
+
+    // Every second take comes from p0.
+    EXPECT_EQ(0u, take_prio());
+    EXPECT_EQ(1u, take_prio());
+    EXPECT_EQ(0u, take_prio());
+    // The fourth comes from p2.
+    EXPECT_EQ(2u, take_prio());
+
+    EXPECT_EQ(0u, take_prio());
+    EXPECT_EQ(1u, take_prio());
+    EXPECT_EQ(0u, take_prio());
+
+    // The eighth comes from p3.
+    EXPECT_EQ(3u, take_prio());
+
+    EXPECT_EQ(0u, take_prio());
+    EXPECT_EQ(1u, take_prio());
+    EXPECT_EQ(0u, take_prio());
+    EXPECT_EQ(2u, take_prio());
+
+    EXPECT_EQ(0u, take_prio());
+    EXPECT_EQ(1u, take_prio());
+    EXPECT_EQ(0u, take_prio());
+
+    // There is no p4, so p3 repeats here.
+    EXPECT_EQ(3u, take_prio());
+}
+
+TEST_F(ScheduledQueueTest, statistical)
+{
+    constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.2},
+        {Fixed16::FROM_DOUBLE, 0.2}, {Fixed16::FROM_DOUBLE, 0.5},
+        {Fixed16::FROM_DOUBLE, 0.5}};
+    q_.reset(new ScheduledQueue(4, ps));
+
+    std::vector<double> bw_frac = run_stat_test(10000);
+
+    // 20% of bandwidth goes to p0.
+    EXPECT_NEAR(0.2, bw_frac[0], 0.01);
+    // 80% * 20% = 16% of bandwidth goes to p1.
+    EXPECT_NEAR(0.16, bw_frac[1], 0.01);
+    // 80% * 80% * 50% = 32% of bandwidth goes to p2.
+    EXPECT_NEAR(0.32, bw_frac[2], 0.01);
+    // The same goes to p3.
+    EXPECT_NEAR(0.32, bw_frac[3], 0.01);
+}
+
+TEST_F(ScheduledQueueTest, statistical_skewed)
+{
+    constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.8},
+        {Fixed16::FROM_DOUBLE, 0.8}, {Fixed16::FROM_DOUBLE, 0.8},
+        {Fixed16::FROM_DOUBLE, 0.5}};
+    q_.reset(new ScheduledQueue(4, ps));
+
+    std::vector<double> bw_frac = run_stat_test(10000);
+
+    // 80% of bandwidth goes to p0.
+    EXPECT_NEAR(0.8, bw_frac[0], 0.01);
+    // 20% * 80% = 16% of bandwidth goes to p1.
+    EXPECT_NEAR(0.16, bw_frac[1], 0.01);
+    // 20% * 20% * 80% = 3.2% of bandwidth goes to p2.
+    EXPECT_NEAR(0.032, bw_frac[2], 0.001);
+    // 20% * 20% * 20% = 0.8% of bandwidth goes to p3.
+    EXPECT_NEAR(0.008, bw_frac[3], 0.001);
+}
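+
+// In general, when every band stays nonempty, band p is expected to receive
+// a stride[p] * prod_{q < p} (1 - stride[q]) share of the takes, with any
+// leftover recycled to the lowest nonempty band. The following extra case is
+// illustrative: with strides {0.5, 1} the two bands split the bandwidth
+// evenly, because the trailing stride of 1 absorbs every token passed down.
+TEST_F(ScheduledQueueTest, statistical_even_split)
+{
+    constexpr Fixed16 ps[] = {
+        {Fixed16::FROM_DOUBLE, 0.5}, {Fixed16::FROM_DOUBLE, 1}};
+    q_.reset(new ScheduledQueue(2, ps));
+
+    std::vector<double> bw_frac = run_stat_test(10000);
+
+    // Half of the bandwidth goes to p0...
+    EXPECT_NEAR(0.5, bw_frac[0], 0.01);
+    // ...and the other half to p1.
+    EXPECT_NEAR(0.5, bw_frac[1], 0.01);
+}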
+TEST_F(ScheduledQueueTest, schedule_with_empties)
+{
+    constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.4},
+        {Fixed16::FROM_DOUBLE, 0.22}, {Fixed16::FROM_DOUBLE, 0.1},
+        {Fixed16::FROM_DOUBLE, 1}};
+    q_.reset(new ScheduledQueue(4, ps));
+
+    // We leave p0 empty for now.
+    add_empty(1);
+    add_empty(1);
+    add_empty(1);
+    add_empty(2);
+    add_empty(2);
+
+    // The first nonempty band is found at p1.
+    EXPECT_EQ(1u, take_prio());
+    // The next token is passed down and finds the first nonempty band at p2.
+    EXPECT_EQ(2u, take_prio());
+    // The next token would skip p2, but finds nothing below it, so it traces
+    // back and takes p2 again.
+    EXPECT_EQ(2u, take_prio());
+    // p1 is still not scheduled to arrive, but it is the only nonempty band.
+    EXPECT_EQ(1u, take_prio());
+
+    // Stocking up the lower priorities now will cause p1 to skip quite a few
+    // turns.
+    add_empty(2);
+    add_empty(2);
+    add_empty(2);
+    add_empty(2);
+    add_empty(2);
+    add_empty(2);
+    add_empty(2);
+
+    // p1 is not scheduled to send.
+    EXPECT_EQ(2u, take_prio());
+    EXPECT_EQ(2u, take_prio());
+    EXPECT_EQ(2u, take_prio());
+    EXPECT_EQ(2u, take_prio());
+    // Now p1 has exceeded the token threshold.
+    EXPECT_EQ(1u, take_prio());
+
+    // The remaining entries drain from p2.
+    EXPECT_EQ(2u, take_prio());
+    EXPECT_EQ(2u, take_prio());
+    EXPECT_EQ(2u, take_prio());
+
+    // Empty.
+    EXPECT_EQ((unsigned)EMPTY, take_prio());
+}
diff --git a/src/utils/ScheduledQueue.hxx b/src/utils/ScheduledQueue.hxx
new file mode 100644
index 000000000..f6e1d412b
--- /dev/null
+++ b/src/utils/ScheduledQueue.hxx
@@ -0,0 +1,214 @@
+/** \copyright
+ * Copyright (c) 2020, Balazs Racz
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * \file ScheduledQueue.hxx
+ *
+ * A Queue implementation that has a priority-based scheduler on the
+ * different bands.
+ *
+ * @author Balazs Racz
+ * @date 30 Dec 2020
+ */
+
+#ifndef _UTILS_SCHEDULEDQUEUE_HXX_
+#define _UTILS_SCHEDULEDQUEUE_HXX_
+
+#include "utils/Fixed16.hxx"
+#include "utils/Queue.hxx"
+#include "utils/logging.h"
+
+/// ScheduledQueue is a queue with multiple priorities, where each priority is
+/// a FIFO list. The different priorities are polled according to a weighted
+/// stride scheduler instead of in strict numerical priority order.
+class ScheduledQueue : private Atomic
+{
+public:
+    /// Constructor.
+    /// @param num_bands how many priority bands there should be.
+    /// @param strides is an array of size num_bands. It contains the stride
+    /// coefficients (they should all be numbers <= 1; a value of 1 degrades
+    /// to strict priority order). This array is not used anymore after the
+    /// constructor returns. A stride of 0.2 means that 20% of all tokens at
+    /// this level will be allocated to this band, and 80% of the tokens are
+    /// passed down to lower priorities (bands with higher index).
+    ScheduledQueue(unsigned num_bands, const Fixed16 *strides)
+        : numBands_(num_bands)
+        , bands_(new Band[num_bands])
+    {
+        for (unsigned i = 0; i < numBands_; i++)
+        {
+            bands_[i].stride_ = strides[i];
+            // Starts each band one stride short of a full token.
+            bands_[i].currentToken_ -= strides[i];
+        }
+    }
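+
+    // Illustrative usage sketch (kStrides and member are placeholders):
+    //
+    //     constexpr Fixed16 kStrides[] = {{Fixed16::FROM_DOUBLE, 0.5},
+    //         {Fixed16::FROM_DOUBLE, 0.5}, {Fixed16::FROM_DOUBLE, 1}};
+    //     ScheduledQueue q(3, kStrides);
+    //     q.insert(member, 1); // member is a QMember*; band 1 out of 0..2.
+    //     Result r = q.next(); // r.item is the member, r.index is its band.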
+
+    /// Destructor.
+    ~ScheduledQueue()
+    {
+        delete[] bands_;
+    }
+
+    /// Get an item from the queue. The returned item will be according to the
+    /// priority scheduler.
+    /// @return the member and the priority from which it came.
+    Result next()
+    {
+        AtomicHolder h(lock());
+        return next_locked();
+    }
+
+    /// Get an item from the queue. The returned item will be according to the
+    /// priority scheduler. The caller must acquire the lock first.
+    /// @return the member and the priority from which it came.
+    Result next_locked()
+    {
+        if (!numPending_)
+        {
+            // Empty queue.
+            return Result(0, 0);
+        }
+        // Execute the priority based scheduling algorithm.
+        for (unsigned i = 0; i < numBands_; ++i)
+        {
+            bands_[i].currentToken_ += bands_[i].stride_;
+            if (bands_[i].currentToken_.trunc() >= 1)
+            {
+                Result ret = bands_[i].queue_.next_locked();
+                if (ret.item)
+                {
+                    ret.index = i;
+                    --numPending_;
+                    bands_[i].currentToken_ -= 1;
+                    return ret;
+                }
+                else
+                {
+                    // This band has a token but its queue is empty. We cap
+                    // the counter at one full token (dropping the fractional
+                    // excess) and keep searching onwards in the priorities.
+                    bands_[i].currentToken_ = 1;
+                }
+            }
+        }
+        // Fell off the end. We go backwards to find any queue with nonempty
+        // members.
+        for (int i = numBands_ - 1; i >= 0; --i)
+        {
+            if (!bands_[i].queue_.empty())
+            {
+                Result ret = bands_[i].queue_.next_locked();
+                bands_[i].currentToken_ = 0;
+                ret.index = i;
+                --numPending_;
+                return ret;
+            }
+        }
+        DIE("Unexpected nonempty queue");
+        return Result(0, 0);
+    }
+
+    /// @return the lock to use for the _locked() functions.
+    Atomic *lock()
+    {
+        return this;
+    }
+
+    /// Adds an entry to the queue. It will be added to the end of the given
+    /// priority band.
+    /// @param item the entry to be added to the queue.
+    /// @param prio which priority band to add to. 0 = highest priority. Must
+    /// be between 0 and numBands_ - 1.
+    void insert(QMember *item, unsigned prio)
+    {
+        AtomicHolder h(lock());
+        return insert_locked(item, prio);
+    }
+
+    /// Adds an entry to the queue. It will be added to the end of the given
+    /// priority band. The caller must hold lock().
+    /// @param item the entry to be added to the queue.
+    /// @param prio which priority band to add to. 0 = highest priority. Must
+    /// be between 0 and numBands_ - 1.
+    void insert_locked(QMember *item, unsigned prio)
+    {
+        HASSERT(prio < numBands_);
+        ++numPending_;
+        bands_[prio].queue_.insert_locked(item);
+    }
+
+    /// Get the number of pending items in the queue.
+    /// @param prio the priority band to query.
+    /// @return number of pending items in that priority band in the queue.
+    size_t pending(unsigned prio)
+    {
+        HASSERT(prio < numBands_);
+        return bands_[prio].queue_.pending();
+    }
+
+    /// Get the number of pending items in the queue (all bands total)
+    /// @return number of pending items
+    size_t pending() const 
+    {
+        return numPending_;
+    }
+
+    /// @return true if the queue is empty (on all priority bands).
+    bool empty() const 
+    {
+        return numPending_ == 0;
+    }
+
+    /// @return the number of available priority bands.
+    unsigned num_prio() const 
+    {
+        return numBands_;
+    }
+
+private:
+    /// This structure contains information about one priority band.
+    struct Band
+    {
+        /// Holds the queue for this priority band.
+        Q queue_;
+        /// How many tokens we add on each call.
+        Fixed16 stride_ {0};
+        /// How many tokens we have right now. If this is >= 1 then we will
+        /// emit the front of this queue; if it is < 1 then we move on to the
+        /// next priority band.
+ Fixed16 currentToken_ {1, 0}; + }; + + /// How many priority bands we have. + unsigned numBands_; + + /// How many queue entries are pending. + unsigned numPending_ {0}; + + /// The actual priority bands. + Band *bands_; +}; + +#endif // _UTILS_SCHEDULEDQUEUE_HXX_ From 62f95fc72f89dae9080acab7750315124c94a818 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Wed, 30 Dec 2020 18:48:31 +0100 Subject: [PATCH 2/3] Switches to using mutexes instead of atomic for locking. The stride algorithm could run a bit too long for an atomic holder. --- src/utils/ScheduledQueue.hxx | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/utils/ScheduledQueue.hxx b/src/utils/ScheduledQueue.hxx index f6e1d412b..f71be997d 100644 --- a/src/utils/ScheduledQueue.hxx +++ b/src/utils/ScheduledQueue.hxx @@ -36,6 +36,7 @@ #ifndef _UTILS_SCHEDULEDQUEUE_HXX_ #define _UTILS_SCHEDULEDQUEUE_HXX_ +#include "os/OS.hxx" #include "utils/Fixed16.hxx" #include "utils/Queue.hxx" #include "utils/logging.h" @@ -43,7 +44,7 @@ /// ScheduledQueue is a queue with multiple priorities, where each priority is /// a FIFO list. The different priorities are polled according to a weighted /// stride scheduler instead of in strict numerical priority order. -class ScheduledQueue : private Atomic +class ScheduledQueue { public: /// Constructor. @@ -76,7 +77,7 @@ public: /// @return the member and the priority from which it came. Result next() { - AtomicHolder h(lock()); + OSMutexLock h(lock()); return next_locked(); } @@ -131,9 +132,9 @@ public: } /// @return the lock to use for the _locked() functions. - Atomic *lock() + OSMutex *lock() { - return this; + return &lock_; } /// Adds an entry to the queue. It will be added to the end of the given @@ -143,7 +144,7 @@ public: /// be within 0 and numBands_ - 1. void insert(QMember *item, unsigned prio) { - AtomicHolder h(lock()); + OSMutexLock h(lock()); return insert_locked(item, prio); } @@ -170,19 +171,19 @@ public: /// Get the number of pending items in the queue (all bands total) /// @return number of pending items - size_t pending() const + size_t pending() const { return numPending_; } /// @return true if the queue is empty (on all priority bands). - bool empty() const + bool empty() const { return numPending_ == 0; } /// @return the number of available priority bands. - unsigned num_prio() const + unsigned num_prio() const { return numBands_; } @@ -201,6 +202,9 @@ private: Fixed16 currentToken_ {1, 0}; }; + /// Protects insert and next operations. + OSMutex lock_; + /// How many priority bands we have. unsigned numBands_; From 8a18dc6e3676b442423c3f4caa3180853b6a3e89 Mon Sep 17 00:00:00 2001 From: Balazs Racz Date: Thu, 31 Dec 2020 12:33:38 +0100 Subject: [PATCH 3/3] Adds comments about the API. --- src/utils/ScheduledQueue.hxx | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/utils/ScheduledQueue.hxx b/src/utils/ScheduledQueue.hxx index f71be997d..723d4b37a 100644 --- a/src/utils/ScheduledQueue.hxx +++ b/src/utils/ScheduledQueue.hxx @@ -82,7 +82,7 @@ public: } /// Get an item from the queue. The returned item will be according to the - /// priority scheduler. The caller must acquire the lock first. + /// priority scheduler. The caller must acquire the lock() first. /// @return the member and the priority from which it came. Result next_locked() { @@ -131,6 +131,11 @@ public: return Result(0, 0); } + /// The caller must acquire this lock before using any of the _locked() + /// functions. 
If the caller needs to do many operations in quick + /// succession, it might be faster to do them under a single lock, + /// i.e. acquire lock() first, then call xxx_locked() repeatedly, then + /// unlock. /// @return the lock to use for the _locked() functions. OSMutex *lock() {