Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds a version of the Queue where the scheduling algorithm can be tuned by weights. #497

Merged
merged 3 commits into from
Dec 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
382 changes: 382 additions & 0 deletions src/utils/ScheduledQueue.cxxtest
Original file line number Diff line number Diff line change
@@ -0,0 +1,382 @@
#include "utils/ScheduledQueue.hxx"

#include "utils/test_main.hxx"

class ScheduledQueueTest : public ::testing::Test
{
protected:
ScheduledQueueTest()
{
entries_.resize(100);
for (unsigned i = 0; i < entries_.size(); ++i)
{
entries_[i].idx = i;
}
}

~ScheduledQueueTest()
{
if (!q_)
return;
while (!q_->empty())
{
take();
}
for (unsigned i = 0; i < nextEntry_; ++i)
{
EXPECT_TRUE(entries_[i].queued);
EXPECT_TRUE(entries_[i].done);
}
}

struct Entry : QMember
{
/// Which entry this is in the vector.
unsigned idx;
/// True if this entry was added to the queue.
bool queued {false};
/// True if this entry was returned from the queue.
bool done {false};
};

/// Adds a new entry to the queue.
/// @param prio which priority band ot add to
/// @return the index of the new member.
unsigned add_empty(unsigned prio)
{
HASSERT(nextEntry_ < entries_.size());
Entry *e = &entries_[nextEntry_++];
e->queued = true;
q_->insert(e, prio);
return e->idx;
}

/// Takes the next entry from the queue. Fills in lastIdx_ and lastPrio_.
void take()
{
auto ret = q_->next();
if (!ret.item)
{
lastIdx_ = EMPTY;
lastPrio_ = EMPTY;
}
else
{
lastPrio_ = ret.index;
if (frequency_.size() <= lastPrio_)
{
frequency_.resize(lastPrio_ + 1);
}
frequency_[lastPrio_]++;
auto *e = (Entry *)ret.item;
lastIdx_ = e->idx;
ASSERT_TRUE(e->queued);
ASSERT_FALSE(e->done);
e->done = true;
}
}

/// Takes an entry from the queue and returns its index.
unsigned take_idx()
{
take();
return lastIdx_;
}

/// Takes an entry from the queue and returns its priority.
unsigned take_prio()
{
take();
return lastPrio_;
}

/// Runs a statistical test with always full priority bands and returns the
/// percentage bandwidth allocated.
/// @param count how many iterations.
std::vector<double> run_stat_test(unsigned count)
{
// Fills each priority band with 10 entries.
for (unsigned p = 0; p < q_->num_prio(); ++p)
{
for (unsigned i = 0; i < 2; i++)
{
add_empty(p);
}
}

EXPECT_EQ(2 * q_->num_prio(), q_->pending());

for (unsigned i = 0; i < count; i++)
{
take();
// re-add the same entry.
entries_[lastIdx_].done = false;
q_->insert(&entries_[lastIdx_], lastPrio_);
}

std::vector<double> v(frequency_.begin(), frequency_.end());
for (unsigned p = 0; p < q_->num_prio(); ++p)
{
v[p] /= count;
}
return v;
}

/// Pre-allocated entries.
vector<Entry> entries_;
/// Index where to take next entry from.
unsigned nextEntry_ {0};
/// The queue object under test.
std::unique_ptr<ScheduledQueue> q_;
/// Frequency of removals. index: priority. value: count.
vector<unsigned> frequency_;

/// Index of the last returned entry.
unsigned lastIdx_ {0};
/// Priority of the last returned entry.
unsigned lastPrio_ {0};

static constexpr unsigned EMPTY = 0xffffu;
};

TEST_F(ScheduledQueueTest, create)
{
}

TEST_F(ScheduledQueueTest, empty)
{
constexpr Fixed16 ps[] = {{1, 0}};
q_.reset(new ScheduledQueue(1, ps));

EXPECT_TRUE(q_->empty());
}

TEST_F(ScheduledQueueTest, fifo)
{
constexpr Fixed16 ps[] = {{1, 0}};
q_.reset(new ScheduledQueue(1, ps));
add_empty(0);
add_empty(0);
add_empty(0);
EXPECT_EQ(0u, take_idx());
EXPECT_EQ(1u, take_idx());
add_empty(0);
EXPECT_EQ(2u, take_idx());
add_empty(0);
add_empty(0);
EXPECT_EQ(3u, take_idx());
EXPECT_EQ(4u, take_idx());

EXPECT_FALSE(q_->empty());
}

TEST_F(ScheduledQueueTest, strict_prio)
{
constexpr Fixed16 ps[] = {{1, 0}, {1, 0}};
q_.reset(new ScheduledQueue(2, ps));
add_empty(0);
add_empty(1);
add_empty(0);
add_empty(1);
add_empty(0);
add_empty(1);
// We get back first the entries from the zero priority.
EXPECT_EQ(0u, take_idx());
EXPECT_EQ(0u, lastPrio_);
EXPECT_EQ(2u, take_idx());
EXPECT_EQ(0u, lastPrio_);
EXPECT_EQ(4u, take_idx());
EXPECT_EQ(0u, lastPrio_);

// Then the entries from the zero priority.
EXPECT_EQ(1u, take_idx());
EXPECT_EQ(1u, lastPrio_);
EXPECT_EQ(3u, take_idx());
EXPECT_EQ(1u, lastPrio_);
}

TEST_F(ScheduledQueueTest, prio_pending_empty)
{
constexpr Fixed16 ps[] = {{1, 0}, {1, 0}, {1, 0}, {1, 0}};
q_.reset(new ScheduledQueue(4, ps));
add_empty(0);
add_empty(1);
add_empty(2);
add_empty(2);

// Checks empty and num pending by band
EXPECT_FALSE(q_->empty());
EXPECT_EQ(4u, q_->pending());
EXPECT_EQ(1u, q_->pending(0));
EXPECT_EQ(1u, q_->pending(1));
EXPECT_EQ(2u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));

take();

EXPECT_EQ(3u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(1u, q_->pending(1));
EXPECT_EQ(2u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_FALSE(q_->empty());

take();

EXPECT_EQ(2u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(0u, q_->pending(1));
EXPECT_EQ(2u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_FALSE(q_->empty());

take();

EXPECT_EQ(1u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(0u, q_->pending(1));
EXPECT_EQ(1u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_FALSE(q_->empty());

take();

EXPECT_EQ(0u, q_->pending());
EXPECT_EQ(0u, q_->pending(0));
EXPECT_EQ(0u, q_->pending(1));
EXPECT_EQ(0u, q_->pending(2));
EXPECT_EQ(0u, q_->pending(3));
EXPECT_TRUE(q_->empty());
}

TEST_F(ScheduledQueueTest, schedule_full)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.5},
{Fixed16::FROM_DOUBLE, 0.5}, {Fixed16::FROM_DOUBLE, 0.5},
{Fixed16::FROM_DOUBLE, 0.5}};
q_.reset(new ScheduledQueue(4, ps));
// Fills each priority band with 10 entries.
for (unsigned p = 0; p < q_->num_prio(); ++p)
{
for (unsigned i = 0; i < 10; i++)
{
add_empty(p);
}
}

EXPECT_EQ(40u, q_->pending());

// Every second comes from p0
EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());
// every fourth from p2
EXPECT_EQ(2u, take_prio());

EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());

// every eight from p3
EXPECT_EQ(3u, take_prio());

EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());
EXPECT_EQ(2u, take_prio());

EXPECT_EQ(0u, take_prio());
EXPECT_EQ(1u, take_prio());
EXPECT_EQ(0u, take_prio());

// There is no p4, so p3 will repeat here.
EXPECT_EQ(3u, take_prio());
}

TEST_F(ScheduledQueueTest, statistical)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.2},
{Fixed16::FROM_DOUBLE, 0.2}, {Fixed16::FROM_DOUBLE, 0.5},
{Fixed16::FROM_DOUBLE, 0.5}};
q_.reset(new ScheduledQueue(4, ps));

std::vector<double> bw_frac = run_stat_test(10000);

// 20% of bandwidth to p0
EXPECT_NEAR(0.2, bw_frac[0], 0.01);
// 80% * 20% = 16% of bandwidth to p1
EXPECT_NEAR(0.16, bw_frac[1], 0.01);
// 80% * 80% * 50% = 32% of bandwidth to p2
EXPECT_NEAR(0.32, bw_frac[2], 0.01);
// same to p3
EXPECT_NEAR(0.32, bw_frac[3], 0.01);
}

TEST_F(ScheduledQueueTest, statistical_skewed)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.8},
{Fixed16::FROM_DOUBLE, 0.8}, {Fixed16::FROM_DOUBLE, 0.8},
{Fixed16::FROM_DOUBLE, 0.5}};
q_.reset(new ScheduledQueue(4, ps));

std::vector<double> bw_frac = run_stat_test(10000);

// 80% of bandwidth to p0
EXPECT_NEAR(0.8, bw_frac[0], 0.01);
// 20% * 80% = 16% of bandwidth to p1
EXPECT_NEAR(0.16, bw_frac[1], 0.01);
// 20% * 20% * 80% = 3.2% of bandwidth to p2
EXPECT_NEAR(0.032, bw_frac[2], 0.001);
// 20% * 20% * 20% = 0.8% of bandwidth to p2
EXPECT_NEAR(0.008, bw_frac[3], 0.001);
}

TEST_F(ScheduledQueueTest, schedule_with_empties)
{
constexpr Fixed16 ps[] = {{Fixed16::FROM_DOUBLE, 0.4},
{Fixed16::FROM_DOUBLE, 0.22}, {Fixed16::FROM_DOUBLE, 0.1},
{Fixed16::FROM_DOUBLE, 1}};
q_.reset(new ScheduledQueue(4, ps));

// We leave p0 empty for now
add_empty(1);
add_empty(1);
add_empty(1);
add_empty(2);
add_empty(2);

// First nonempty is found on p1.
EXPECT_EQ(1u, take_prio());
// The next will be sent down, finds the first nonempty on p2.
EXPECT_EQ(2u, take_prio());
// The next token will skip p2, but find nothing more so traces back to
// take p2 again.
EXPECT_EQ(2u, take_prio());
// Now p1 is still not scheduled to arrive but that's the only nonempty.
EXPECT_EQ(1u, take_prio());

// Now stocking up lower priorities will cause p1 to skip quite a few.
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);
add_empty(2);

// p1 is not scheduled to send
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
// now p1 exceeded the token threshold
EXPECT_EQ(1u, take_prio());

// remaining entries
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());
EXPECT_EQ(2u, take_prio());

// Empty
EXPECT_EQ((unsigned)EMPTY, take_prio());
}
Loading