Skip to content

Commit

Permalink
Adds a version of the Queue where the scheduling algorithm can be tun…
Browse files Browse the repository at this point in the history
…ed by weights. (#497)

* Adds a version of the Queue where the scheduling algorithm can be tuned by weights.

* Switches to using mutexes instead of atomic for locking.
The stride algorithm could run a bit too long for an atomic
holder.

* Adds comments about the API.
  • Loading branch information
balazsracz authored Dec 31, 2020
1 parent 03b94d6 commit 1b32851
Show file tree
Hide file tree
Showing 2 changed files with 605 additions and 0 deletions.
382 changes: 382 additions & 0 deletions src/utils/ScheduledQueue.cxxtest
Original file line number Diff line number Diff line change
@@ -0,0 +1,382 @@
#include "utils/ScheduledQueue.hxx"

#include "utils/test_main.hxx"

class ScheduledQueueTest : public ::testing::Test
{
protected:
ScheduledQueueTest()
{
entries_.resize(100);
for (unsigned i = 0; i < entries_.size(); ++i)
{
entries_[i].idx = i;
}
}

~ScheduledQueueTest()
{
if (!q_)
return;
while (!q_->empty())
{
take();
}
for (unsigned i = 0; i < nextEntry_; ++i)
{
EXPECT_TRUE(entries_[i].queued);
EXPECT_TRUE(entries_[i].done);
}
}

struct Entry : QMember
{
/// Which entry this is in the vector.
unsigned idx;
/// True if this entry was added to the queue.
bool queued {false};
/// True if this entry was returned from the queue.
bool done {false};
};

/// Adds a new entry to the queue.
/// @param prio which priority band ot add to
/// @return the index of the new member.
unsigned add_empty(unsigned prio)
{
HASSERT(nextEntry_ < entries_.size());
Entry *e = &entries_[nextEntry_++];
e->queued = true;
q_->insert(e, prio);
return e->idx;
}

/// Takes the next entry from the queue. Fills in lastIdx_ and lastPrio_.
void take()
{
auto ret = q_->next();
if (!ret.item)
{
lastIdx_ = EMPTY;
lastPrio_ = EMPTY;
}
else
{
lastPrio_ = ret.index;
if (frequency_.size() <= lastPrio_)
{
frequency_.resize(lastPrio_ + 1);
}
frequency_[lastPrio_]++;
auto *e = (Entry *)ret.item;
lastIdx_ = e->idx;
ASSERT_TRUE(e->queued);
ASSERT_FALSE(e->done);
e->done = true;
}
}

/// Takes an entry from the queue and returns its index.
unsigned take_idx()
{
take();
return lastIdx_;
}

/// Takes an entry from the queue and returns its priority.
unsigned take_prio()
{
take();
return lastPrio_;
}

/// Runs a statistical test with always full priority bands and returns the
/// percentage bandwidth allocated.
/// @param count how many iterations.
std::vector<double> run_stat_test(unsigned count)
{
// Fills each priority band with 10 entries.
for (unsigned p = 0; p < q_->num_prio(); ++p)
{
for (unsigned i = 0; i < 2; i++)
{
add_empty(p);
}
}

EXPECT_EQ(2 * q_->num_prio(), q_->pending());

for (unsigned i = 0; i < count; i++)
{
take();
// re-add the same entry.
entries_[lastIdx_].done = false;
q_->insert(&entries_[lastIdx_], lastPrio_);
}

std::vector<double> v(frequency_.begin(), frequency_.end());
for (unsigned p = 0; p < q_->num_prio(); ++p)
{
v[p] /= count;
}
return v;
}

/// Pre-allocated entries.
vector<Entry> entries_;
/// Index where to take next entry from.
unsigned nextEntry_ {0};
/// The queue object under test.
std::unique_ptr<ScheduledQueue> q_;
/// Frequency of removals. index: priority. value: count.
vector<unsigned> frequency_;

/// Index of the last returned entry.
unsigned lastIdx_ {0};
/// Priority of the last returned entry.
unsigned lastPrio_ {0};

static constexpr unsigned EMPTY = 0xffffu;
};

TEST_F(ScheduledQueueTest, create)
{
}

TEST_F(ScheduledQueueTest, empty)
{
constexpr Fixed16 ps[] = {{1, 0}};
q_.reset(new ScheduledQueue(1, ps));

EXPECT_TRUE(q_->empty());
}

TEST_F(ScheduledQueueTest, fifo)
{
constexpr Fixed16 ps[] = {{1, 0}};
q_.reset(new ScheduledQueue(1, ps));
add_empty(0);
add_empty(0);
add_empty(0);
EXPECT_EQ(0u, take_idx());
EXPECT_EQ(1u, take_idx());
add_empty(0);
EXPECT_EQ(2u, take_idx());
add_empty(0);
add_empty(0);
EXPECT_EQ(3u, take_idx());
EXPECT_EQ(4u, take_idx());

EXPECT_FALSE(q_->empty());
}

TEST_F(ScheduledQueueTest, strict_prio)
{
constexpr Fixed16 ps[] = {{1, 0}, {1, 0}};
q_.reset(new ScheduledQueue(2, ps));
add_empty(0);
add_empty(1);
add_empty(0);
add_empty(1);
add_empty(0);
add_empty(1);
// We get back first the entries from the zero priority.
EXPECT_EQ(0u, take_idx());
EXPECT_EQ(0u, lastPrio_);
EXPECT_EQ(2u, take_idx());
EXPECT_EQ(0u, lastPrio_);
EXPECT_EQ(4u, take_idx());
EXPECT_EQ(0u, lastPrio_);

// Then the entries from the zero priority.
EXPECT_EQ(1u, take_idx());
EXPECT_EQ(1u, lastPrio_);
EXPECT_EQ(3u, take_idx());
EXPECT_EQ(1u, lastPrio_);
}

TEST_F(ScheduledQueueTest, prio_pending_empty)
{
constexpr Fixed16 ps[] = {{1, 0}, {1, 0}, {1, 0}, {1, 0}};
q_.reset(new ScheduledQueue(4, ps));
add_empty(0);
add_empty(1);
add_empty(2);
add_empty(2);

// Checks empty and num pending by band
EXPECT_FALSE(q_->empty());
EXPECT_EQ(4u, q_->pending());
EXPECT_EQ(1u, q_->pending(0));
EXPECT_EQ(1u, q_->pending(1));
EXPECT_EQ(2u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));

take();

EXPECT_EQ(3u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(1u, q_->pending(1));
EXPECT_EQ(2u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_FALSE(q_->empty());

take();

EXPECT_EQ(2u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(0u, q_->pending(1));
EXPECT_EQ(2u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_FALSE(q_->empty());

take();

EXPECT_EQ(1u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(0u, q_->pending(1));
EXPECT_EQ(1u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_FALSE(q_->empty());

take();

EXPECT_EQ(0u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(0u, q_->pending(1));
EXPECT_EQ(0u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_TRUE(q_->empty());
}

TEST_F(ScheduledQueueTest, schedule_full)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.5},
{Fixed16::FROM_DOUBLE, 0.5}, {Fixed16::FROM_DOUBLE, 0.5},
{Fixed16::FROM_DOUBLE, 0.5}};
q_.reset(new ScheduledQueue(4, ps));
// Fills each priority band with 10 entries.
for (unsigned p = 0; p < q_->num_prio(); ++p)
{
for (unsigned i = 0; i < 10; i++)
{
add_empty(p);
}
}

EXPECT_EQ(40u, q_->pending());

// Every second comes from p0
EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());
// every fourth from p2
EXPECT_EQ(2u, take_prio());

EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());

// every eight from p3
EXPECT_EQ(3u, take_prio());

EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());
EXPECT_EQ(2u, take_prio());

EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());

// There is no p4, so p3 will repeat here.
EXPECT_EQ(3u, take_prio());
}

TEST_F(ScheduledQueueTest, statistical)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.2},
{Fixed16::FROM_DOUBLE, 0.2}, {Fixed16::FROM_DOUBLE, 0.5},
{Fixed16::FROM_DOUBLE, 0.5}};
q_.reset(new ScheduledQueue(4, ps));

std::vector<double> bw_frac = run_stat_test(10000);

// 20% of bandwidth to p0
EXPECT_NEAR(0.2, bw_frac[0], 0.01);
// 80% * 20% = 16% of bandwidth to p1
EXPECT_NEAR(0.16, bw_frac[1], 0.01);
// 80% * 80% * 50% = 32% of bandwidth to p2
EXPECT_NEAR(0.32, bw_frac[2], 0.01);
// same to p3
EXPECT_NEAR(0.32, bw_frac[3], 0.01);
}

TEST_F(ScheduledQueueTest, statistical_skewed)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.8},
{Fixed16::FROM_DOUBLE, 0.8}, {Fixed16::FROM_DOUBLE, 0.8},
{Fixed16::FROM_DOUBLE, 0.5}};
q_.reset(new ScheduledQueue(4, ps));

std::vector<double> bw_frac = run_stat_test(10000);

// 80% of bandwidth to p0
EXPECT_NEAR(0.8, bw_frac[0], 0.01);
// 20% * 80% = 16% of bandwidth to p1
EXPECT_NEAR(0.16, bw_frac[1], 0.01);
// 20% * 20% * 80% = 3.2% of bandwidth to p2
EXPECT_NEAR(0.032, bw_frac[2], 0.001);
// 20% * 20% * 20% = 0.8% of bandwidth to p2
EXPECT_NEAR(0.008, bw_frac[3], 0.001);
}

TEST_F(ScheduledQueueTest, schedule_with_empties)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.4},
{Fixed16::FROM_DOUBLE, 0.22}, {Fixed16::FROM_DOUBLE, 0.1},
{Fixed16::FROM_DOUBLE, 1}};
q_.reset(new ScheduledQueue(4, ps));

// We leave p0 empty for now
add_empty(1);
add_empty(1);
add_empty(1);
add_empty(2);
add_empty(2);

// First nonempty is found on p1.
EXPECT_EQ(1u, take_prio());
// The next will be sent down, finds the first nonempty on p2.
EXPECT_EQ(2u, take_prio());
// The next token will skip p2, but find nothing more so traces back to
// take p2 again.
EXPECT_EQ(2u, take_prio());
// Now p1 is still not scheduled to arrive but that's the only nonempty.
EXPECT_EQ(1u, take_prio());

// Now stocking up lower priorities will cause p1 to skip quite a few.
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);

// p1 is not scheduled to send
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
// now p1 exceeded the token threshold
EXPECT_EQ(1u, take_prio());

// remaining entries
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());

// Empty
EXPECT_EQ((unsigned)EMPTY, take_prio());
}
Loading

2 comments on commit 1b32851

@JohnFFlanagan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unimportant, other than the fact that everything else is so beautiful.
Line 43 of the test file has "ot" where I think you want "to"

@balazsracz
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unimportant, other than the fact that everything else is so beautiful.
Line 43 of the test file has "ot" where I think you want "to"

thanks, fixed in c3e992c.

Please sign in to comment.