Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fiber executor #679

Closed
wants to merge 16 commits into from
Prev Previous commit
Next Next commit
introduce priorities
felixguendling committed Dec 2, 2024
commit c246309e069d1c96fd4c8aec23237d07507c44ac
7 changes: 4 additions & 3 deletions include/motis/scheduler/scheduler_algo.h
Original file line number Diff line number Diff line change
@@ -20,9 +20,9 @@ struct fiber_props : public boost::fibers::fiber_properties {
// has to be prioritized over new requests. Otherwise, the server only starts
// new requests and never finishes anything.
enum class type : std::uint8_t {
kWork, // follow-up work scheduled by work or I/O
kIo // initial work scheduled by I/O (web request / batch query)
} type_{type::kIo};
kHighPrio, // follow-up work scheduled by work
kLowPrio // initial work scheduled by I/O (web request / batch query)
} type_{type::kHighPrio};
};

struct work_stealing
@@ -33,6 +33,7 @@ struct work_stealing
std::uint32_t id_;
std::uint32_t thread_count_;
boost::fibers::detail::context_spinlock_queue rqueue_{};
boost::fibers::detail::context_spinlock_queue high_prio_rqueue_{};
std::mutex mtx_{};
std::condition_variable cnd_{};
bool flag_{false};
20 changes: 15 additions & 5 deletions src/scheduler/scheduler_algo.cc
Original file line number Diff line number Diff line change
@@ -35,21 +35,31 @@ work_stealing::work_stealing(std::uint32_t thread_count, bool suspend)
b.wait();
}

void work_stealing::awakened(bf::context* ctx, fiber_props&) noexcept {
void work_stealing::awakened(bf::context* ctx, fiber_props& props) noexcept {
if (!ctx->is_context(bf::type::pinned_context)) {
ctx->detach();
}
rqueue_.push(ctx);
if (props.type_ == fiber_props::type::kHighPrio) {
props.type_ = fiber_props::type::kLowPrio;
high_prio_rqueue_.push(ctx);
} else {
rqueue_.push(ctx);
}
}

bf::context* work_stealing::pick_next() noexcept {
bf::context* victim = rqueue_.pop();
if (nullptr != victim) {
bf::context* victim = nullptr;
if (victim = high_prio_rqueue_.pop(); nullptr != victim) {
boost::context::detail::prefetch_range(victim, sizeof(bf::context));
if (!victim->is_context(bf::type::pinned_context)) {
bf::context::active()->attach(victim);
}
} else {
} else if (victim = rqueue_.pop(); nullptr != victim) {
boost::context::detail::prefetch_range(victim, sizeof(bf::context));
if (!victim->is_context(bf::type::pinned_context)) {
bf::context::active()->attach(victim);
}
} else if (thread_count_ > 1U) {
std::uint32_t id = 0;
std::size_t count = 0, size = schedulers_.size();
static thread_local std::minstd_rand generator{std::random_device{}()};
2 changes: 1 addition & 1 deletion src/server.cc
Original file line number Diff line number Diff line change
@@ -129,6 +129,7 @@ int server(data d, config const& c) {

auto const stop = net::stop_handler(ioc, [&]() {
fmt::println("shutdown");
r.ch_.close();
s.stop();
ioc.stop();

@@ -144,7 +145,6 @@ int server(data d, config const& c) {
server_config.host_, server_config.port_, server_config.port_);
net::run(ioc)();

r.ch_.close();
for (auto& t : threads) {
t.join();
}