diff --git a/clients/drcachesim/analyzer_multi.cpp b/clients/drcachesim/analyzer_multi.cpp index 509954f0d9d..90bd8290e86 100644 --- a/clients/drcachesim/analyzer_multi.cpp +++ b/clients/drcachesim/analyzer_multi.cpp @@ -564,6 +564,9 @@ analyzer_multi_tmpl_t::init_dynamic_schedule() sched_ops.blocking_switch_threshold = op_sched_blocking_switch_us.get_value(); sched_ops.block_time_multiplier = op_sched_block_scale.get_value(); sched_ops.block_time_max_us = op_sched_block_max_us.get_value(); + sched_ops.migration_threshold_us = op_sched_migration_threshold_us.get_value(); + sched_ops.rebalance_period_us = op_sched_rebalance_period_us.get_value(); + sched_ops.time_units_per_us = op_sched_time_units_per_us.get_value(); sched_ops.randomize_next_input = op_sched_randomize.get_value(); sched_ops.honor_direct_switches = !op_sched_disable_direct_switches.get_value(); #ifdef HAS_ZIP diff --git a/clients/drcachesim/common/memtrace_stream.h b/clients/drcachesim/common/memtrace_stream.h index b763f616e7b..9adbf95da5b 100644 --- a/clients/drcachesim/common/memtrace_stream.h +++ b/clients/drcachesim/common/memtrace_stream.h @@ -98,6 +98,15 @@ class memtrace_stream_t { * i.e., the number of input migrations to this core. */ SCHED_STAT_MIGRATIONS, + /** + * Counts the number of times this output's runqueue became empty and it took + * work from another output's runqueue. + */ + SCHED_STAT_RUNQUEUE_STEALS, + /** + * Counts the number of output runqueue rebalances triggered by this output. + */ + SCHED_STAT_RUNQUEUE_REBALANCES, /** Count of statistic types. */ SCHED_STAT_TYPE_COUNT, }; diff --git a/clients/drcachesim/common/options.cpp b/clients/drcachesim/common/options.cpp index 1adad4168dd..abf5a2d4674 100644 --- a/clients/drcachesim/common/options.cpp +++ b/clients/drcachesim/common/options.cpp @@ -1002,6 +1002,18 @@ droption_t op_sched_time_units_per_us( "of the -sched_*_us values as it converts wall-clock time into the simulated " "microseconds measured by those options."); +droption_t op_sched_migration_threshold_us( + DROPTION_SCOPE_ALL, "sched_migration_threshold_us", 500, + "Time in simulated microseconds before an input can be migrated across cores", + "The minimum time in simulated microseconds that must have elapsed since an input " + "last ran on a core before it can be migrated to another core."); + +droption_t op_sched_rebalance_period_us( + DROPTION_SCOPE_ALL, "sched_rebalance_period_us", 1500000, + "Period in microseconds at which core run queues are load-balanced", + "The period in simulated microseconds at which per-core run queues are re-balanced " + "to redistribute load."); + // Schedule_stats options. 
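
// A minimal sketch of how the three new knobs above reach the scheduler,
// mirroring the analyzer_multi_tmpl_t::init_dynamic_schedule() wiring in this
// patch.  scheduler_options_t and the field names come from this patch; the
// surrounding helper and the constructor arguments shown are assumptions.
#include "scheduler.h"
using dynamorio::drmemtrace::scheduler_t;
scheduler_t::scheduler_options_t
make_dynamic_sched_ops()
{
    scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
                                               scheduler_t::DEPENDENCY_TIMESTAMPS,
                                               scheduler_t::SCHEDULER_DEFAULTS);
    // Minimum simulated-us an input must sit idle before another core may take it.
    sched_ops.migration_threshold_us = op_sched_migration_threshold_us.get_value();
    // Simulated-us period at which the per-core runqueues are re-balanced.
    sched_ops.rebalance_period_us = op_sched_rebalance_period_us.get_value();
    // Scales the "cur_time" passed to next_record() into simulated microseconds;
    // both thresholds above are multiplied by this internally.
    sched_ops.time_units_per_us = op_sched_time_units_per_us.get_value();
    return sched_ops;
}
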
droption_t op_schedule_stats_print_every(DROPTION_SCOPE_ALL, "schedule_stats_print_every", diff --git a/clients/drcachesim/common/options.h b/clients/drcachesim/common/options.h index 664594e57f0..501707defc9 100644 --- a/clients/drcachesim/common/options.h +++ b/clients/drcachesim/common/options.h @@ -215,6 +215,9 @@ extern dynamorio::droption::droption_t op_cpu_schedule_file; extern dynamorio::droption::droption_t op_sched_switch_file; extern dynamorio::droption::droption_t op_sched_randomize; extern dynamorio::droption::droption_t op_sched_disable_direct_switches; +extern dynamorio::droption::droption_t op_sched_migration_threshold_us; +extern dynamorio::droption::droption_t op_sched_rebalance_period_us; +extern dynamorio::droption::droption_t op_sched_time_units_per_us; extern dynamorio::droption::droption_t op_schedule_stats_print_every; extern dynamorio::droption::droption_t op_syscall_template_file; extern dynamorio::droption::droption_t op_filter_stop_timestamp; diff --git a/clients/drcachesim/scheduler/flexible_queue.h b/clients/drcachesim/scheduler/flexible_queue.h index 897e21ae8dc..43518ea043a 100644 --- a/clients/drcachesim/scheduler/flexible_queue.h +++ b/clients/drcachesim/scheduler/flexible_queue.h @@ -108,6 +108,15 @@ class flexible_queue_t { return entries_[rand_gen_() % size()]; // Undefined if empty. } + // Returns an entry from the back -- or at least not from the front; it's not + // guaranteed to be the lowest priority, just not the highest. + T + back() + { + assert(!empty()); + return entries_.back(); + } + bool empty() const { diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index 331df93b419..bab964080e2 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -586,7 +587,7 @@ scheduler_tmpl_t::stream_t::next_record(RecordType &reco ++cur_ref_count_; if (scheduler_->record_type_is_instr_boundary(record, prev_record_)) ++cur_instr_count_; - VPRINT(scheduler_, 4, + VPRINT(scheduler_, 5, "stream record#=%" PRId64 ", instr#=%" PRId64 " (cur input %" PRId64 " record#=%" PRId64 ", instr#=%" PRId64 ")\n", cur_ref_count_, cur_instr_count_, input->tid, @@ -671,28 +672,38 @@ scheduler_tmpl_t::~scheduler_tmpl_t() { for (unsigned int i = 0; i < outputs_.size(); ++i) { VPRINT(this, 1, "Stats for output #%d\n", i); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Switch input->input", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Switch input->input", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_INPUT]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Switch input->idle", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Switch input->idle", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Switch idle->input", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Switch idle->input", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_SWITCH_IDLE_TO_INPUT]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Switch nop", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Switch nop", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_SWITCH_NOP]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Quantum preempts", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Quantum preempts", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Direct switch attempts", + VPRINT(this, 1, " %-35s: %9" PRId64 
"\n", "Direct switch attempts", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_ATTEMPTS]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Direct switch successes", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Direct switch successes", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]); - VPRINT(this, 1, " %-25s: %9" PRId64 "\n", "Migrations", + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Migrations", outputs_[i].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]); + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue steals", + outputs_[i].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]); + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue rebalances", + outputs_[i].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]); +#ifndef NDEBUG + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue lock acquired", + outputs_[i].ready_queue.lock->get_count_acquired()); + VPRINT(this, 1, " %-35s: %9" PRId64 "\n", "Runqueue lock contended", + outputs_[i].ready_queue.lock->get_count_contended()); +#endif } #ifndef NDEBUG - VPRINT(this, 1, "%-27s: %9" PRId64 "\n", "Schedule lock acquired", - sched_lock_.get_count_acquired()); - VPRINT(this, 1, "%-27s: %9" PRId64 "\n", "Schedule lock contended", - sched_lock_.get_count_contended()); + VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock acquired", + unscheduled_priority_.lock->get_count_acquired()); + VPRINT(this, 1, "%-37s: %9" PRId64 "\n", "Unscheduled queue lock contended", + unscheduled_priority_.lock->get_count_contended()); #endif } @@ -872,6 +883,7 @@ scheduler_tmpl_t::init( ? spec_type_t::USE_NOPS // TODO i#5843: Add more flags for other options. : spec_type_t::LAST_FROM_TRACE, + static_cast(get_time_micros()), create_invalid_record(), verbosity_); if (options_.single_lockstep_output) outputs_.back().stream = global_stream_.get(); @@ -961,8 +973,6 @@ typename scheduler_tmpl_t::scheduler_status_t scheduler_tmpl_t::set_initial_schedule( std::unordered_map> &workload2inputs) { - bool need_lock; - auto scoped_lock = acquire_scoped_sched_lock_if_necessary(need_lock); // Determine whether we need to read ahead in the inputs. There are cases where we // do not want to do that as it would block forever if the inputs are not available // (e.g., online analysis IPC readers); it also complicates ordinals so we avoid it @@ -1085,24 +1095,46 @@ scheduler_tmpl_t::set_initial_schedule( // We'll pick the starting inputs below by sorting by relative time from // each workload's base_timestamp, which our queue does for us. } - // We need to honor output bindings and possibly time ordering, which our queue - // does for us. We want the rest of the inputs in the queue in any case so it is - // simplest to insert all and remove the first N. + // First, put all inputs into a temporary queue to sort by priority and + // time for us. + flexible_queue_t allq; for (int i = 0; i < static_cast(inputs_.size()); ++i) { - add_to_ready_queue(&inputs_[i]); + inputs_[i].queue_counter = i; + allq.push(&inputs_[i]); + } + // Now assign round-robin to the outputs. We have to obey bindings here: we + // just take the first. This isn't guaranteed to be perfect if there are + // many bindings, but we run a rebalancing afterward. 
+ output_ordinal_t output = 0; + while (!allq.empty()) { + input_info_t *input = allq.top(); + allq.pop(); + output_ordinal_t target = output; + if (!input->binding.empty()) + target = *input->binding.begin(); + else + output = (output + 1) % outputs_.size(); + add_to_ready_queue(target, input); + } + sched_type_t::stream_status_t status = rebalance_queues(0, {}); + if (status != STATUS_OK) { + VPRINT(this, 0, "Failed to rebalance with status %d\n", status); + return STATUS_ERROR_INVALID_PARAMETER; } for (int i = 0; i < static_cast(outputs_.size()); ++i) { input_info_t *queue_next; #ifndef NDEBUG - sched_type_t::stream_status_t status = + status = #endif - pop_from_ready_queue(i, queue_next); + pop_from_ready_queue(i, i, queue_next); assert(status == STATUS_OK || status == STATUS_IDLE); if (queue_next == nullptr) set_cur_input(i, INVALID_INPUT_ORDINAL); else set_cur_input(i, queue_next->index); } + VPRINT(this, 2, "Initial queues:\n"); + VDO(this, 2, { print_queue_stats(); }); } return STATUS_SUCCESS; } @@ -1124,8 +1156,8 @@ scheduler_tmpl_t::write_recorded_schedule() { if (options_.schedule_record_ostream == nullptr) return STATUS_ERROR_INVALID_PARAMETER; - std::lock_guard guard(sched_lock_); for (int i = 0; i < static_cast(outputs_.size()); ++i) { + auto lock = acquire_scoped_output_lock_if_necessary(i); sched_type_t::stream_status_t status = record_schedule_segment(i, schedule_record_t::FOOTER, 0, 0, 0); if (status != sched_type_t::STATUS_OK) @@ -2219,9 +2251,10 @@ scheduler_tmpl_t::advance_region_of_interest( VPRINT(this, 2, "at %" PRId64 " instrs: advancing to ROI #%d\n", cur_instr, input.cur_region); if (input.cur_region >= static_cast(input.regions_of_interest.size())) { - if (input.at_eof) - return eof_or_idle(output, /*hold_sched_lock=*/false, input.index); - else { + if (input.at_eof) { + // XXX: We're holding input.lock which is ok during eof_or_idle. + return eof_or_idle(output, input.index); + } else { // We let the user know we're done. if (options_.schedule_record_ostream != nullptr) { sched_type_t::stream_status_t status = @@ -2405,7 +2438,7 @@ template uint64_t scheduler_tmpl_t::get_output_time(output_ordinal_t output) { - return outputs_[output].cur_time; + return outputs_[output].cur_time->load(std::memory_order_acquire); } template @@ -2494,85 +2527,147 @@ scheduler_tmpl_t::close_schedule_segment(output_ordinal_ template bool -scheduler_tmpl_t::ready_queue_empty() +scheduler_tmpl_t::ready_queue_empty(output_ordinal_t output) { - assert(!need_sched_lock() || sched_lock_.owned_by_cur_thread()); - return ready_priority_.empty(); + auto lock = acquire_scoped_output_lock_if_necessary(output); + return outputs_[output].ready_queue.queue.empty(); } template void scheduler_tmpl_t::add_to_unscheduled_queue(input_info_t *input) { - assert(!need_sched_lock() || sched_lock_.owned_by_cur_thread()); + assert(input->lock->owned_by_cur_thread()); + std::lock_guard unsched_lock(*unscheduled_priority_.lock); assert(input->unscheduled && input->blocked_time == 0); // Else should be in regular queue. 
VPRINT(this, 4, "add_to_unscheduled_queue (pre-size %zu): input %d priority %d\n", - unscheduled_priority_.size(), input->index, input->priority); - input->queue_counter = ++unscheduled_counter_; - unscheduled_priority_.push(input); + unscheduled_priority_.queue.size(), input->index, input->priority); + input->queue_counter = ++unscheduled_priority_.fifo_counter; + unscheduled_priority_.queue.push(input); + input->prev_output = input->containing_output; + input->containing_output = INVALID_INPUT_ORDINAL; } +// NOCHECK get all callers to hold input lock template void -scheduler_tmpl_t::add_to_ready_queue(input_info_t *input) +scheduler_tmpl_t::add_to_ready_queue_hold_locks( + output_ordinal_t output, input_info_t *input) { - assert(!need_sched_lock() || sched_lock_.owned_by_cur_thread()); + assert(input->lock->owned_by_cur_thread()); + assert(!need_output_lock() || + outputs_[output].ready_queue.lock->owned_by_cur_thread()); if (input->unscheduled && input->blocked_time == 0) { + // Ensure we get prev_output set for start-unscheduled so they won't + // all resume on output #0 but rather on the initial round-robin assigment. + input->containing_output = output; add_to_unscheduled_queue(input); return; } + assert(input->binding.empty() || input->binding.find(output) != input->binding.end()); VPRINT( this, 4, "add_to_ready_queue (pre-size %zu): input %d priority %d timestamp delta %" PRIu64 " block time %" PRIu64 " start time %" PRIu64 "\n", - ready_priority_.size(), input->index, input->priority, + outputs_[output].ready_queue.queue.size(), input->index, input->priority, input->reader->get_last_timestamp() - input->base_timestamp, input->blocked_time, input->blocked_start_time); if (input->blocked_time > 0) - ++num_blocked_; - input->queue_counter = ++ready_counter_; - ready_priority_.push(input); + ++outputs_[output].ready_queue.num_blocked; + input->queue_counter = ++outputs_[output].ready_queue.fifo_counter; + outputs_[output].ready_queue.queue.push(input); + input->containing_output = output; +} + +template +void +scheduler_tmpl_t::add_to_ready_queue(output_ordinal_t output, + input_info_t *input) +{ + auto scoped_lock = acquire_scoped_output_lock_if_necessary(output); + std::lock_guard input_lock(*input->lock); + add_to_ready_queue_hold_locks(output, input); } template typename scheduler_tmpl_t::stream_status_t -scheduler_tmpl_t::pop_from_ready_queue( - output_ordinal_t for_output, input_info_t *&new_input) +scheduler_tmpl_t::pop_from_ready_queue_hold_locks( + output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input, + bool from_back) { - assert(!need_sched_lock() || sched_lock_.owned_by_cur_thread()); + assert(!need_output_lock() || + (outputs_[from_output].ready_queue.lock->owned_by_cur_thread() && + (from_output == for_output || for_output == INVALID_OUTPUT_ORDINAL || + outputs_[for_output].ready_queue.lock->owned_by_cur_thread()))); std::set skipped; std::set blocked; input_info_t *res = nullptr; sched_type_t::stream_status_t status = STATUS_OK; - uint64_t cur_time = (num_blocked_ > 0) ? 
get_output_time(for_output) : 0; - while (!ready_priority_.empty()) { - if (options_.randomize_next_input) { - res = ready_priority_.get_random_entry(); - ready_priority_.erase(res); + uint64_t cur_time = get_output_time(from_output); + while (!outputs_[from_output].ready_queue.queue.empty()) { + if (from_back) { + res = outputs_[from_output].ready_queue.queue.back(); + outputs_[from_output].ready_queue.queue.erase(res); + } else if (options_.randomize_next_input) { + res = outputs_[from_output].ready_queue.queue.get_random_entry(); + outputs_[from_output].ready_queue.queue.erase(res); } else { - res = ready_priority_.top(); - ready_priority_.pop(); + res = outputs_[from_output].ready_queue.queue.top(); + outputs_[from_output].ready_queue.queue.pop(); } assert(!res->unscheduled || res->blocked_time > 0); // Should be in unscheduled_priority_. - if (res->binding.empty() || res->binding.find(for_output) != res->binding.end()) { + if (res->binding.empty() || for_output == INVALID_OUTPUT_ORDINAL || + res->binding.find(for_output) != res->binding.end()) { // For blocked inputs, as we don't have interrupts or other regular // control points we only check for being unblocked when an input // would be chosen to run. We thus keep blocked inputs in the ready queue. if (res->blocked_time > 0) { assert(cur_time > 0); - --num_blocked_; + --outputs_[from_output].ready_queue.num_blocked; } if (res->blocked_time > 0 && + // XXX i#6966: We have seen wall-clock time go backward, which + // underflows here and then always unblocks the input. cur_time - res->blocked_start_time < res->blocked_time) { VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n", res->index, res->blocked_time - (cur_time - res->blocked_start_time)); // We keep searching for a suitable input. blocked.insert(res); - } else - break; + } else { + // This input is no longer blocked. + res->blocked_time = 0; + // We've found a candidate. One final check if this is a migration. + bool found_candidate = false; + if (from_output == for_output) + found_candidate = true; + else { + assert(cur_time > 0 || res->last_run_time == 0); + VPRINT(this, 5, + "migration check %d to %d: cur=%" PRIu64 " last=%" PRIu64 + " delta=%" PRId64 " vs thresh %" PRIu64 "\n", + from_output, for_output, cur_time, res->last_run_time, + cur_time - res->last_run_time, + options_.migration_threshold_us); + // Guard against time going backward, which happens: i#6966. + if (options_.migration_threshold_us == 0 || res->last_run_time == 0 || + (cur_time > res->last_run_time && + cur_time - res->last_run_time >= + static_cast(options_.migration_threshold_us * + options_.time_units_per_us))) { + VPRINT(this, 2, "migrating %d to %d\n", from_output, for_output); + found_candidate = true; + ++outputs_[from_output] + .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + } + } + if (found_candidate) + break; + else + skipped.insert(res); + } } else { // We keep searching for a suitable input. skipped.insert(res); @@ -2587,34 +2682,75 @@ scheduler_tmpl_t::pop_from_ready_queue( // Re-add the ones we skipped, but without changing their counters so we preserve // the prior FIFO order. for (input_info_t *save : skipped) - ready_priority_.push(save); + outputs_[from_output].ready_queue.queue.push(save); // Re-add the blocked ones to the back. 
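
// The migration check above, extracted as a standalone predicate.  This is a
// sketch of the same logic; the parameter names are ours, and we assume
// time_units_per_us is a double scale factor, per the static_cast in the code
// above.
#include <cstdint>
bool
can_migrate(uint64_t cur_time, uint64_t last_run_time, uint64_t threshold_us,
            double time_units_per_us)
{
    if (threshold_us == 0 || last_run_time == 0)
        return true; // No threshold, or the input has never run anywhere.
    // Compare before subtracting: wall-clock time has been observed going
    // backward (i#6966), and an unsigned underflow would always migrate.
    return cur_time > last_run_time &&
        cur_time - last_run_time >=
            static_cast<uint64_t>(threshold_us * time_units_per_us);
}
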
- for (input_info_t *save : blocked) - add_to_ready_queue(save); + for (input_info_t *save : blocked) { + std::lock_guard input_lock(*save->lock); + add_to_ready_queue_hold_locks(from_output, save); + } VDO(this, 1, { - static int heartbeat; + static int output_heartbeat; // We are ok with races as the cadence is approximate. - if (++heartbeat % 2000 == 0) { + if (++output_heartbeat % 2000 == 0) { + size_t unsched_size = 0; + { + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } VPRINT(this, 1, "heartbeat[%d] %zd in queue; %d blocked; %zd unscheduled => %d %d\n", - for_output, ready_priority_.size(), num_blocked_, - unscheduled_priority_.size(), res == nullptr ? -1 : res->index, - status); + from_output, outputs_[from_output].ready_queue.queue.size(), + outputs_[from_output].ready_queue.num_blocked, unsched_size, + res == nullptr ? -1 : res->index, status); } }); if (res != nullptr) { VPRINT(this, 4, "pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp " "delta %" PRIu64 "\n", - for_output, ready_priority_.size(), res->index, res->priority, - res->reader->get_last_timestamp() - res->base_timestamp); - res->blocked_time = 0; + from_output, outputs_[from_output].ready_queue.queue.size(), res->index, + res->priority, res->reader->get_last_timestamp() - res->base_timestamp); res->unscheduled = false; + res->prev_output = res->containing_output; + res->containing_output = for_output; } new_input = res; return status; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_tmpl_t::pop_from_ready_queue( + output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input) +{ + sched_type_t::stream_status_t status = sched_type_t::STATUS_OK; + { + std::unique_lock from_lock; + std::unique_lock for_lock; + // If we need both locks, acquire in increasing output order to avoid deadlocks if + // two outputs try to steal from each other. + if (from_output == for_output || for_output == INVALID_OUTPUT_ORDINAL) { + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + } else if (from_output < for_output) { + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + for_lock = acquire_scoped_output_lock_if_necessary(for_output); + } else { + for_lock = acquire_scoped_output_lock_if_necessary(for_output); + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + } + status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input); + } + VDO(this, 1, { + static int global_heartbeat; + // We are ok with races as the cadence is approximate. + if (++global_heartbeat % 100000 == 0) { + print_queue_stats(); + } + }); + return status; +} + template uint64_t scheduler_tmpl_t::scale_blocked_time(uint64_t initial_time) const @@ -2668,10 +2804,9 @@ typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::set_cur_input(output_ordinal_t output, input_ordinal_t input) { - assert(!need_sched_lock() || sched_lock_.owned_by_cur_thread()); - // XXX i#5843: Merge tracking of current inputs with ready_priority_ to better manage - // the possible 3 states of each input (a live cur_input for an output stream, in - // the ready_queue_, or at EOF) (4 states once we add i/o wait times). + // XXX i#5843: Merge tracking of current inputs with ready_queue.queue to better + // manage the possible 3 states of each input (a live cur_input for an output stream, + // in the ready_queue_, or at EOF) (4 states once we add i/o wait times). 
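
// The pop_from_ready_queue() wrapper above acquires the two runqueue locks in
// increasing output-ordinal order so that two outputs stealing from each other
// cannot deadlock.  A minimal sketch of the idiom with plain std::mutex (the
// scheduler uses mutex_dbg_owned, but the ordering rule is the same):
#include <mutex>
#include <utility>
#include <vector>
void
lock_two_runqueues(int a, int b, std::vector<std::mutex> &locks)
{
    if (a == b) {
        locks[a].lock();
        return;
    }
    // Lower ordinal first: a thread holding a higher-ordinal lock never
    // blocks on a lower-ordinal one, so no wait cycle can form.
    if (a > b)
        std::swap(a, b);
    locks[a].lock();
    locks[b].lock();
}
// std::scoped_lock(locks[a], locks[b]) would give the same deadlock avoidance
// without a manual order, at the cost of an unspecified acquisition order.
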
assert(output >= 0 && output < static_cast(outputs_.size())); // 'input' might be INVALID_INPUT_ORDINAL. assert(input < static_cast(inputs_.size())); @@ -2679,15 +2814,19 @@ scheduler_tmpl_t::set_cur_input(output_ordinal_t output, if (prev_input >= 0) { if (options_.mapping == MAP_TO_ANY_OUTPUT && prev_input != input && !inputs_[prev_input].at_eof) { - add_to_ready_queue(&inputs_[prev_input]); + add_to_ready_queue(output, &inputs_[prev_input]); } - if (prev_input != input && options_.schedule_record_ostream != nullptr) { + if (prev_input != input) { input_info_t &prev_info = inputs_[prev_input]; std::lock_guard lock(*prev_info.lock); - sched_type_t::stream_status_t status = - close_schedule_segment(output, prev_info); - if (status != sched_type_t::STATUS_OK) - return status; + prev_info.cur_output = INVALID_OUTPUT_ORDINAL; + prev_info.last_run_time = get_output_time(output); + if (options_.schedule_record_ostream != nullptr) { + sched_type_t::stream_status_t status = + close_schedule_segment(output, prev_info); + if (status != sched_type_t::STATUS_OK) + return status; + } } } else if (options_.schedule_record_ostream != nullptr && outputs_[output].record.back().type == schedule_record_t::IDLE) { @@ -2712,13 +2851,8 @@ scheduler_tmpl_t::set_cur_input(output_ordinal_t output, std::lock_guard lock(*inputs_[input].lock); - if (inputs_[input].prev_output != INVALID_OUTPUT_ORDINAL && - inputs_[input].prev_output != output) { - VPRINT(this, 3, "output[%d] migrating input %d from output %d\n", output, input, - inputs_[input].prev_output); - ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; - } - inputs_[input].prev_output = output; + inputs_[input].cur_output = output; + inputs_[input].containing_output = output; if (prev_input < 0 && outputs_[output].stream->version_ == 0) { // Set the version and filetype up front, to let the user query at init time @@ -2766,7 +2900,8 @@ scheduler_tmpl_t::set_cur_input(output_ordinal_t output, } } - inputs_[input].prev_time_in_quantum = outputs_[output].cur_time; + inputs_[input].prev_time_in_quantum = + outputs_[output].cur_time->load(std::memory_order_acquire); if (options_.schedule_record_ostream != nullptr) { uint64_t instr_ord = get_instr_ordinal(inputs_[input]); @@ -2796,14 +2931,13 @@ typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pick_next_input_as_previously( output_ordinal_t output, input_ordinal_t &index) { - assert(!need_sched_lock() || sched_lock_.owned_by_cur_thread()); if (outputs_[output].record_index + 1 >= static_cast(outputs_[output].record.size())) { if (!outputs_[output].at_eof) { outputs_[output].at_eof = true; live_replay_output_count_.fetch_add(-1, std::memory_order_release); } - return eof_or_idle(output, need_sched_lock(), outputs_[output].cur_input); + return eof_or_idle(output, outputs_[output].cur_input); } const schedule_record_t &segment = outputs_[output].record[outputs_[output].record_index + 1]; @@ -2924,19 +3058,19 @@ scheduler_tmpl_t::pick_next_input_as_previously( template bool -scheduler_tmpl_t::need_sched_lock() +scheduler_tmpl_t::need_output_lock() { return options_.mapping == MAP_TO_ANY_OUTPUT || options_.mapping == MAP_AS_PREVIOUSLY; } template std::unique_lock -scheduler_tmpl_t::acquire_scoped_sched_lock_if_necessary( - bool &need_lock) +scheduler_tmpl_t::acquire_scoped_output_lock_if_necessary( + output_ordinal_t output) { - need_lock = need_sched_lock(); - auto scoped_lock = need_lock ? std::unique_lock(sched_lock_) - : std::unique_lock(); + auto scoped_lock = need_output_lock() + ? 
std::unique_lock(*outputs_[output].ready_queue.lock) + : std::unique_lock(); return scoped_lock; } @@ -2945,10 +3079,9 @@ typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pick_next_input(output_ordinal_t output, uint64_t blocked_time) { + sched_type_t::stream_status_t res = sched_type_t::STATUS_OK; - bool need_lock; - auto scoped_lock = acquire_scoped_sched_lock_if_necessary(need_lock); - input_ordinal_t prev_index = outputs_[output].cur_input; + const input_ordinal_t prev_index = outputs_[output].cur_input; input_ordinal_t index = INVALID_INPUT_ORDINAL; int iters = 0; while (true) { @@ -2983,6 +3116,29 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu if (res != sched_type_t::STATUS_OK) return res; } else if (options_.mapping == MAP_TO_ANY_OUTPUT) { + uint64_t cur_time = get_output_time(output); + uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); + if (last_time == 0) { + // Initialize. + last_rebalance_time_.store(cur_time, std::memory_order_release); + } else { + // Guard against time going backward, which happens: i#6966. + if (cur_time > last_time && + cur_time - last_time >= + static_cast(options_.rebalance_period_us * + options_.time_units_per_us) && + rebalancer_.load(std::memory_order_acquire) == + std::thread::id()) { + VPRINT(this, 2, + "Output %d hit rebalance period @%" PRIu64 + " (last rebalance @%" PRIu64 ")\n", + output, cur_time, last_time); + sched_type_t::stream_status_t status = + rebalance_queues(output, {}); + if (status != STATUS_OK) + return status; + } + } if (blocked_time > 0 && prev_index != INVALID_INPUT_ORDINAL) { std::lock_guard lock(*inputs_[prev_index].lock); if (inputs_[prev_index].blocked_time == 0) { @@ -2996,37 +3152,56 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu inputs_[prev_index].switch_to_input != INVALID_INPUT_ORDINAL) { input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input]; inputs_[prev_index].switch_to_input = INVALID_INPUT_ORDINAL; - std::lock_guard lock(*target->lock); + std::unique_lock target_input_lock(*target->lock); // XXX i#5843: Add an invariant check that the next timestamp of the // target is later than the pre-switch-syscall timestamp? - if (ready_priority_.find(target)) { - VPRINT(this, 2, - "next_record[%d]: direct switch from input %d to input %d " - "@%" PRIu64 "\n", - output, prev_index, target->index, - inputs_[prev_index].reader->get_last_timestamp()); - ready_priority_.erase(target); - index = target->index; - // Erase any remaining wait time for the target. - if (target->blocked_time > 0) { - VPRINT(this, 3, - "next_record[%d]: direct switch erasing blocked time " - "for input %d\n", - output, target->index); - --num_blocked_; - target->blocked_time = 0; - target->unscheduled = false; - } - if (target->prev_output != INVALID_OUTPUT_ORDINAL && - target->prev_output != output) { - ++outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + if (target->containing_output != INVALID_OUTPUT_ORDINAL) { + output_ordinal_t target_output = target->containing_output; + output_info_t &out = outputs_[target->containing_output]; + // We cannot hold an input lock when we acquire an output lock. 
+ target_input_lock.unlock(); + { + auto target_output_lock = + acquire_scoped_output_lock_if_necessary(target_output); + target_input_lock.lock(); + if (out.ready_queue.queue.find(target)) { + VPRINT(this, 2, + "next_record[%d]: direct switch from input %d to " + "input %d " + "@%" PRIu64 "\n", + output, prev_index, target->index, + inputs_[prev_index].reader->get_last_timestamp()); + out.ready_queue.queue.erase(target); + index = target->index; + // Erase any remaining wait time for the target. + if (target->blocked_time > 0) { + VPRINT(this, 3, + "next_record[%d]: direct switch erasing " + "blocked time " + "for input %d\n", + output, target->index); + --out.ready_queue.num_blocked; + target->blocked_time = 0; + target->unscheduled = false; + } + if (target->containing_output != output) { + ++outputs_[output].stats + [memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + } + ++outputs_[output] + .stats[memtrace_stream_t:: + SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; + } // Else, actively running. + target_input_lock.unlock(); } - ++outputs_[output].stats - [memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; - } else if (unscheduled_priority_.find(target)) { + target_input_lock.lock(); + } + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + if (index == INVALID_INPUT_ORDINAL && + unscheduled_priority_.queue.find(target)) { target->unscheduled = false; - unscheduled_priority_.erase(target); + unscheduled_priority_.queue.erase(target); index = target->index; VPRINT(this, 2, "next_record[%d]: direct switch from input %d to " @@ -3041,7 +3216,8 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu } ++outputs_[output].stats [memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; - } else { + } + if (index == INVALID_INPUT_ORDINAL) { // We assume that inter-input dependencies are captured in // the _DIRECT_THREAD_SWITCH, _UNSCHEDULE, and _SCHEDULE markers // and that if a switch request targets a thread running elsewhere @@ -3049,7 +3225,7 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu // dynamic switch to whoever happens to be available (and // different timing between tracing and analysis has caused this // miss). - VPRINT(this, 1, + VPRINT(this, 2, "Direct switch (from %d) target input #%d is running " "elsewhere; picking a different target @%" PRIu64 "\n", prev_index, target->index, @@ -3062,24 +3238,47 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu } if (index != INVALID_INPUT_ORDINAL) { // We found a direct switch target above. - } else if (ready_queue_empty() && blocked_time == 0) { - if (prev_index == INVALID_INPUT_ORDINAL) - return eof_or_idle(output, need_lock, prev_index); - auto lock = - std::unique_lock(*inputs_[prev_index].lock); - // If we can't go back to the current input, we're EOF or idle. - // TODO i#6959: We should go the EOF/idle route if - // inputs_[prev_index].unscheduled as otherwise we're ignoring its - // unscheduled transition: although if there are no other threads at - // all (not just an empty queue) this turns into the eof_or_idle() - // all-unscheduled scenario. Once we have some kind of early exit - // option we'll add the unscheduled check here. - if (inputs_[prev_index].at_eof) { - lock.unlock(); - return eof_or_idle(output, need_lock, prev_index); - } else - index = prev_index; // Go back to prior. + } + // XXX: We're grabbing the output ready_queue lock 3x here: + // ready_queue_empty(), set_cur_input()'s add_to_ready_queue(), + // and pop_from_ready_queue(). 
We could call versions of those + // that let the caller hold the lock, but holding it across the other + // calls in between adds complexity. + else if (ready_queue_empty(output) && blocked_time == 0) { + // There's nothing else to run, so either stick with the + // current input or, if it's invalid, go idle/eof. + if (prev_index == INVALID_INPUT_ORDINAL) { + sched_type_t::stream_status_t status = + eof_or_idle(output, prev_index); + if (status != STATUS_STOLE) + return status; + // eof_or_idle stole an input for us, now in .cur_input. + index = outputs_[output].cur_input; + res = STATUS_OK; + } else { + auto lock = + std::unique_lock(*inputs_[prev_index].lock); + // If we can't go back to the current input, we're EOF or idle. + // TODO i#6959: We should go the EOF/idle route if + // inputs_[prev_index].unscheduled as otherwise we're ignoring its + // unscheduled transition: although if there are no other threads + // at all (not just an empty queue) this turns into the + // eof_or_idle() all-unscheduled scenario. Once we have some kind + // of early exit option we'll add the unscheduled check here. + if (inputs_[prev_index].at_eof) { + lock.unlock(); + sched_type_t::stream_status_t status = + eof_or_idle(output, prev_index); + if (status != STATUS_STOLE) + return status; + index = outputs_[output].cur_input; + res = STATUS_OK; + } else + index = prev_index; // Go back to prior. + } } else { + // There's something else to run, or we'll soon be in the queue + // even if it's empty now. // Give up the input before we go to the queue so we can add // ourselves to the queue. If we're the highest priority we // shouldn't switch. The queue preserves FIFO for same-priority @@ -3088,7 +3287,7 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu set_cur_input(output, INVALID_INPUT_ORDINAL); input_info_t *queue_next = nullptr; sched_type_t::stream_status_t status = - pop_from_ready_queue(output, queue_next); + pop_from_ready_queue(output, output, queue_next); if (status != STATUS_OK) { if (status == STATUS_IDLE) { outputs_[output].waiting = true; @@ -3108,10 +3307,13 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu return status; } if (queue_next == nullptr) { - assert(blocked_time == 0 || prev_index == INVALID_INPUT_ORDINAL); - return eof_or_idle(output, need_lock, prev_index); - } - index = queue_next->index; + status = eof_or_idle(output, prev_index); + if (status != STATUS_STOLE) + return status; + index = outputs_[output].cur_input; + res = STATUS_OK; + } else + index = queue_next->index; } } else if (options_.deps == DEPENDENCY_TIMESTAMPS) { uint64_t min_time = std::numeric_limits::max(); @@ -3123,8 +3325,14 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu index = static_cast(i); } } - if (index < 0) - return eof_or_idle(output, need_lock, prev_index); + if (index < 0) { + sched_type_t::stream_status_t status = + eof_or_idle(output, prev_index); + if (status != STATUS_STOLE) + return status; + index = outputs_[output].cur_input; + res = STATUS_OK; + } VPRINT(this, 2, "next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n", @@ -3168,16 +3376,28 @@ scheduler_tmpl_t::pick_next_input(output_ordinal_t outpu } // We can't easily place these stats inside set_cur_input() as we call that to // temporarily give up our input.
- if (prev_index == index) + update_switch_stats(output, prev_index, index); + set_cur_input(output, index); + return res; +} + +template +void +scheduler_tmpl_t::update_switch_stats(output_ordinal_t output, + input_ordinal_t prev_input, + input_ordinal_t new_input) +{ + if (prev_input == new_input) ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_SWITCH_NOP]; - else if (prev_index != INVALID_INPUT_ORDINAL && index != INVALID_INPUT_ORDINAL) + else if (prev_input != INVALID_INPUT_ORDINAL && new_input != INVALID_INPUT_ORDINAL) ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_INPUT]; - else if (index == INVALID_INPUT_ORDINAL) + else if (new_input == INVALID_INPUT_ORDINAL) ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; - else + else { ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_SWITCH_IDLE_TO_INPUT]; - set_cur_input(output, index); - return res; + // Reset the flag so we'll try to steal if we go idle again. + outputs_[output].tried_to_steal_on_idle = false; + } } template @@ -3292,32 +3512,69 @@ scheduler_tmpl_t::process_marker(input_info_t &input, input_ordinal_t target_idx = it->second; VPRINT(this, 3, "input %d re-scheduling input %d @%" PRIu64 "\n", input.index, target_idx, input.reader->get_last_timestamp()); - // Release the input lock before acquiring sched_lock, to meet our lock - // ordering convention to avoid deadlocks. + // Release the input lock before acquiring more input locks input.lock->unlock(); { - bool need_sched_lock; - auto scoped_sched_lock = - acquire_scoped_sched_lock_if_necessary(need_sched_lock); input_info_t *target = &inputs_[target_idx]; - std::lock_guard lock(*target->lock); - if (target->unscheduled) { + std::unique_lock target_lock(*target->lock); + if (target->at_eof) { + VPRINT(this, 3, "input %d at eof ignoring re-schedule\n", target_idx); + } else if (target->unscheduled) { target->unscheduled = false; - if (unscheduled_priority_.find(target)) { - add_to_ready_queue(target); - unscheduled_priority_.erase(target); - } else if (ready_priority_.find(target)) { + bool on_unsched_queue = false; + { + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + if (unscheduled_priority_.queue.find(target)) { + unscheduled_priority_.queue.erase(target); + on_unsched_queue = true; + } + } + // We have to give up the unsched lock before calling add_to_ready_queue + // as it acquires the output lock. + if (on_unsched_queue) { + output_ordinal_t resume_output = target->prev_output; + if (resume_output == INVALID_OUTPUT_ORDINAL) + resume_output = output; + // We can't hold any locks when calling add_to_ready_queue. + // This input is no longer on any queue, so few things can happen + // while we don't hold the input lock: a competing _SCHEDULE will + // not find the output and it can't have blocked_time>0 (invariant + // for things on unsched q); once it's on the new queue we don't + // do anything further here so we're good to go. + target_lock.unlock(); + add_to_ready_queue(resume_output, target); + target_lock.lock(); + } else { // We assume blocked_time is from _ARG_TIMEOUT and is not from // regularly-blocking i/o. We assume i/o getting into the mix is // rare enough or does not matter enough to try to have separate // timeouts. 
if (target->blocked_time > 0) { - VPRINT( - this, 3, - "switchto::resume erasing blocked time for target input %d\n", - target->index); - --num_blocked_; - target->blocked_time = 0; + VPRINT(this, 3, + "switchto::resume erasing blocked time for target " + "input %d\n", + target->index); + output_ordinal_t target_output = target->containing_output; + // There could be no output owner if we're mid-rebalance. + if (target_output != INVALID_OUTPUT_ORDINAL) { + // We can't hold the input lock to acquire the output lock. + target_lock.unlock(); + { + auto scoped_output_lock = + acquire_scoped_output_lock_if_necessary( + target_output); + output_info_t &out = outputs_[target_output]; + if (out.ready_queue.queue.find(target)) { + --out.ready_queue.num_blocked; + } + // Decrement this holding the lock to synch with + // pop_from_ready_queue(). + target->blocked_time = 0; + } + target_lock.lock(); + } else + target->blocked_time = 0; } } } else { @@ -3351,8 +3608,9 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, // with a counter-based time, weighted appropriately for STATUS_IDLE. cur_time = get_time_micros(); } - outputs_[output].cur_time = cur_time; // Invalid values are checked below. - if (!outputs_[output].active) + // Invalid values for cur_time are checked below. + outputs_[output].cur_time->store(cur_time, std::memory_order_release); + if (!outputs_[output].active->load(std::memory_order_acquire)) return sched_type_t::STATUS_IDLE; if (outputs_[output].waiting) { if (options_.mapping == MAP_AS_PREVIOUSLY && @@ -3378,7 +3636,11 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, if (outputs_[output].cur_input < 0) { // This happens with more outputs than inputs. For non-empty outputs we // require cur_input to be set to >=0 during init(). - return eof_or_idle(output, /*hold_sched_lock=*/false, outputs_[output].cur_input); + sched_type_t::stream_status_t status = + eof_or_idle(output, outputs_[output].cur_input); + assert(status != STATUS_OK); + if (status != STATUS_STOLE) + return status; } input = &inputs_[outputs_[output].cur_input]; auto lock = std::unique_lock(*input->lock); @@ -3460,7 +3722,9 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, uint64_t prev_time_in_quantum = 0; // XXX i#6831: Refactor to use subclasses or templates to specialize // scheduler code based on mapping options, to avoid these top-level - // conditionals in many functions? + // conditionals in many functions? The next_record() and pick_next_input() + // could also be put into output_info_t, promoting it to a class and + // subclassing it per mapping mode. if (options_.mapping == MAP_AS_PREVIOUSLY) { assert(outputs_[output].record_index >= 0); if (outputs_[output].record_index >= @@ -3679,6 +3943,7 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, } if (input->needs_roi && options_.mapping != MAP_AS_PREVIOUSLY && !input->regions_of_interest.empty()) { + input_ordinal_t prev_input = input->index; sched_type_t::stream_status_t res = advance_region_of_interest(output, record, *input); if (res == sched_type_t::STATUS_SKIPPED) { @@ -3687,6 +3952,14 @@ scheduler_tmpl_t::next_record(output_ordinal_t output, input->needs_roi = false; input->needs_advance = false; continue; + } else if (res == sched_type_t::STATUS_STOLE) { + // We need to loop to get the new record. 
+ input = &inputs_[outputs_[output].cur_input]; + update_switch_stats(output, prev_input, input->index); + lock.unlock(); + lock = std::unique_lock(*input->lock); + continue; } else if (res != sched_type_t::STATUS_OK) return res; } else { @@ -3798,7 +4071,6 @@ scheduler_tmpl_t::mark_input_eof(input_info_t &input) template typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::eof_or_idle(output_ordinal_t output, - bool hold_sched_lock, input_ordinal_t prev_input) { // XXX i#6831: Refactor to use subclasses or templates to specialize @@ -3813,58 +4085,37 @@ scheduler_tmpl_t::eof_or_idle(output_ordinal_t output, live_replay_output_count_.load(std::memory_order_acquire) == 0)) { assert(options_.mapping != MAP_AS_PREVIOUSLY || outputs_[output].at_eof); return sched_type_t::STATUS_EOF; - } else { - bool need_lock; - auto scoped_lock = hold_sched_lock - ? std::unique_lock() - : acquire_scoped_sched_lock_if_necessary(need_lock); - if (options_.mapping == MAP_TO_ANY_OUTPUT) { - // Workaround to avoid hangs when _SCHEDULE and/or _DIRECT_THREAD_SWITCH - // directives miss their targets (due to running with a subset of the - // original threads, or other scenarios) and we end up with no scheduled - // inputs but a set of unscheduled inputs who will never be scheduled. - VPRINT(this, 4, - "eof_or_idle output=%d live=%d unsched=%zu runq=%zu blocked=%d\n", - output, live_input_count_.load(std::memory_order_acquire), - unscheduled_priority_.size(), ready_priority_.size(), num_blocked_); - if (ready_priority_.empty() && !unscheduled_priority_.empty()) { - if (outputs_[output].wait_start_time == 0) { - outputs_[output].wait_start_time = get_output_time(output); - } else { - uint64_t now = get_output_time(output); - double elapsed_micros = (now - outputs_[output].wait_start_time) * - options_.time_units_per_us; - if (elapsed_micros > options_.block_time_max_us) { - // XXX i#6822: We may want some other options here for what to - // do. We could release just one input at a time, which would be - // the same scheduling order (as we have FIFO in - // unscheduled_priority_) but may take a long time at - // block_time_max_us each; we could declare we're done and just - // exit, maybe under a flag or if we could see what % of total - // records we've processed. - VPRINT(this, 1, - "eof_or_idle moving entire unscheduled queue to ready " - "queue\n"); - while (!unscheduled_priority_.empty()) { - input_info_t *tomove = unscheduled_priority_.top(); - std::lock_guard lock(*tomove->lock); - tomove->unscheduled = false; - ready_priority_.push(tomove); - unscheduled_priority_.pop(); - } - outputs_[output].wait_start_time = 0; - } - } - } else { - outputs_[output].wait_start_time = 0; + } + // Before going idle, try to steal work from another output. + // We start with us+1 to avoid everyone stealing from the low-numbered outputs. + // We only try when we first transition to idle; we rely on rebalancing after that, + // to avoid repeatedly grabbing other outputs' locks. + if (!outputs_[output].tried_to_steal_on_idle) { + outputs_[output].tried_to_steal_on_idle = true; + for (unsigned int i = 1; i < outputs_.size(); ++i) { + output_ordinal_t target = (output + i) % outputs_.size(); + assert(target != output); // Sanity check (we won't reach "output").
+ input_info_t *queue_next = nullptr; + sched_type_t::stream_status_t status = + pop_from_ready_queue(target, output, queue_next); + if (status == STATUS_OK && queue_next != nullptr) { + set_cur_input(output, queue_next->index); + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]; + VPRINT(this, 2, + "eof_or_idle: output %d stole input %d from %d's ready_queue\n", + output, queue_next->index, target); + return STATUS_STOLE; } + // We didn't find anything; loop and check another output. } - outputs_[output].waiting = true; - if (prev_input != INVALID_INPUT_ORDINAL) - ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; - set_cur_input(output, INVALID_INPUT_ORDINAL); - return sched_type_t::STATUS_IDLE; + VPRINT(this, 3, "eof_or_idle: output %d failed to steal from anyone\n", output); } + // We rely on rebalancing to handle the case of every input being unscheduled. + outputs_[output].waiting = true; + if (prev_input != INVALID_INPUT_ORDINAL) + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; + set_cur_input(output, INVALID_INPUT_ORDINAL); + return sched_type_t::STATUS_IDLE; } template @@ -3894,24 +4145,194 @@ scheduler_tmpl_t::set_output_active(output_ordinal_t out { if (options_.mapping != MAP_TO_ANY_OUTPUT) return sched_type_t::STATUS_INVALID; - if (outputs_[output].active == active) + if (outputs_[output].active->load(std::memory_order_acquire) == active) return sched_type_t::STATUS_OK; - outputs_[output].active = active; + outputs_[output].active->store(active, std::memory_order_release); VPRINT(this, 2, "Output stream %d is now %s\n", output, active ? "active" : "inactive"); - std::lock_guard guard(sched_lock_); + std::vector ordinals; if (!active) { // Make the now-inactive output's input available for other cores. // This will reset its quantum too. // We aren't switching on a just-read instruction not passed to the consumer, // if the queue is empty. - if (inputs_[outputs_[output].cur_input].queue.empty()) - inputs_[outputs_[output].cur_input].switching_pre_instruction = true; - set_cur_input(output, INVALID_INPUT_ORDINAL); + input_ordinal_t cur_input = outputs_[output].cur_input; + if (cur_input != INVALID_INPUT_ORDINAL) { + if (inputs_[cur_input].queue.empty()) + inputs_[cur_input].switching_pre_instruction = true; + set_cur_input(output, INVALID_INPUT_ORDINAL); + } + // Move the ready_queue to other outputs. + { + auto lock = acquire_scoped_output_lock_if_necessary(output); + while (!outputs_[output].ready_queue.queue.empty()) { + input_info_t *tomove = outputs_[output].ready_queue.queue.top(); + ordinals.push_back(tomove->index); + outputs_[output].ready_queue.queue.pop(); + } + } } else { outputs_[output].waiting = true; } - return sched_type_t::STATUS_OK; + return rebalance_queues(output, ordinals); +} + +template +void +scheduler_tmpl_t::print_queue_stats() +{ + size_t unsched_size = 0; + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + int live = live_input_count_.load(std::memory_order_acquire); + VPRINT(this, 1, "inputs: %zd scheduleable, %zd unscheduled, %zd eof\n", + live - unsched_size, unsched_size, inputs_.size() - live); + for (unsigned int i = 0; i < outputs_.size(); ++i) { + auto lock = acquire_scoped_output_lock_if_necessary(i); + VPRINT(this, 1, " out #%d: running #%d; %zd in queue; %d blocked\n", i, + // XXX: Reading this is racy; we're ok with that. 
+ outputs_[i].cur_input, outputs_[i].ready_queue.queue.size(), + outputs_[i].ready_queue.num_blocked); + } +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_tmpl_t::rebalance_queues( + output_ordinal_t triggering_output, std::vector inputs_to_add) +{ + std::thread::id nobody; + if (!rebalancer_.compare_exchange_weak(nobody, std::this_thread::get_id(), + std::memory_order_release, + std::memory_order_relaxed)) { + // Someone else is rebalancing. + return sched_type_t::STATUS_OK; + } + sched_type_t::stream_status_t status = sched_type_t::STATUS_OK; + assert(options_.mapping == MAP_TO_ANY_OUTPUT); + VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, + get_output_time(triggering_output)); + // First, update the time to avoid more threads coming here. + last_rebalance_time_.store(get_output_time(triggering_output), + std::memory_order_release); + VPRINT(this, 2, "Before rebalance:\n"); + VDO(this, 2, { print_queue_stats(); }); + ++outputs_[triggering_output] + .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]; + + // Workaround to avoid hangs when _SCHEDULE and/or _DIRECT_THREAD_SWITCH + // directives miss their targets (due to running with a subset of the + // original threads, or other scenarios) and we end up with no scheduled + // inputs but a set of unscheduled inputs who will never be scheduled. + // TODO i#6959: Just exit early instead, maybe under a flag. + // It would help to see what % of total records we've processed. + size_t unsched_size = 0; + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + if (live_input_count_.load(std::memory_order_acquire) == + static_cast(unsched_size)) { + VPRINT(this, 1, "rebalancing moving entire unscheduled queue to ready_queues\n"); + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + while (!unscheduled_priority_.queue.empty()) { + input_info_t *tomove = unscheduled_priority_.queue.top(); + inputs_to_add.push_back(tomove->index); + unscheduled_priority_.queue.pop(); + } + } + for (input_ordinal_t input : inputs_to_add) { + std::lock_guard lock(*inputs_[input].lock); + inputs_[input].unscheduled = false; + } + } + + int live_inputs = live_input_count_.load(std::memory_order_acquire); + int live_outputs = 0; + for (unsigned int i = 0; i < outputs_.size(); ++i) { + if (outputs_[i].active->load(std::memory_order_acquire)) + ++live_outputs; + } + double avg_per_output = live_inputs / static_cast(live_outputs); + unsigned int avg_ceiling = static_cast(std::ceil(avg_per_output)); + unsigned int avg_floor = static_cast(std::floor(avg_per_output)); + int iteration = 0; + do { + // Walk the outputs, filling too-short queues from inputs_to_add and + // shrinking too-long queues into inputs_to_add. We may need a 2nd pass + // for this; and a 3rd pass if bindings prevent even splitting. + VPRINT( + this, 3, + "Rebalance iteration %d inputs_to_add size=%zu avg_per_output=%4.1f %d-%d\n", + iteration, inputs_to_add.size(), avg_per_output, avg_floor, avg_ceiling); + // We're giving up the output locks as we go, so there may be some stealing + // in the middle of our operation, but the rebalancing is approximate anyway. + for (unsigned int i = 0; i < outputs_.size(); ++i) { + if (!outputs_[i].active->load(std::memory_order_acquire)) + continue; + auto lock = acquire_scoped_output_lock_if_necessary(i); + // Only remove on the 1st iteration; later we can exceed due to binding + // constraints. 
+ while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) { + input_info_t *queue_next = nullptr; + // We use our regular pop_from_ready_queue which means we leave + // blocked inputs on the queue: those do not get rebalanced. + // XXX: Should we revisit that? + // + // We remove from the back to avoid penalizing the next-to-run entries + // at the front of the queue by putting them at the back of another + // queue. + status = pop_from_ready_queue_hold_locks(i, INVALID_OUTPUT_ORDINAL, + queue_next, /*from_back=*/true); + if (status == STATUS_OK && queue_next != nullptr) { + VPRINT(this, 3, + "Rebalance iteration %d: output %d giving up input %d\n", + iteration, i, queue_next->index); + inputs_to_add.push_back(queue_next->index); + } else + break; + } + std::vector incompatible_inputs; + // If we reach the 3rd iteration, we have fussy inputs with bindings. + // Try to add them to every output. + while ( + (outputs_[i].ready_queue.queue.size() < avg_ceiling || iteration > 1) && + !inputs_to_add.empty()) { + input_ordinal_t ordinal = inputs_to_add.back(); + inputs_to_add.pop_back(); + input_info_t &input = inputs_[ordinal]; + std::lock_guard input_lock(*input.lock); + if (input.binding.empty() || + input.binding.find(i) != input.binding.end()) { + VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n", + iteration, i, ordinal); + add_to_ready_queue_hold_locks(i, &input); + } else { + incompatible_inputs.push_back(ordinal); + } + } + inputs_to_add.insert(inputs_to_add.end(), incompatible_inputs.begin(), + incompatible_inputs.end()); + } + ++iteration; + if (iteration >= 3 && !inputs_to_add.empty()) { + // This is possible with bindings limited to inactive outputs. + // XXX: Rather than return an error, we could add to the unscheduled queue, + // but do not mark the input unscheduled. Then when an output is + // marked active, we could walk the unscheduled queue and take + // inputs not marked unscheduled. + VPRINT(this, 1, "Rebalance hit impossible binding\n"); + status = sched_type_t::STATUS_IMPOSSIBLE_BINDING; + break; + } + } while (!inputs_to_add.empty()); + VPRINT(this, 2, "After:\n"); + VDO(this, 2, { print_queue_stats(); }); + rebalancer_.store(std::thread::id(), std::memory_order_release); + return status; } template class scheduler_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h index 821644b5c38..48634edd3a9 100644 --- a/clients/drcachesim/scheduler/scheduler.h +++ b/clients/drcachesim/scheduler/scheduler.h @@ -142,6 +142,11 @@ template class scheduler_tmpl_t { * time for #STATUS_IDLE. */ STATUS_IDLE, + /** + * Indicates an input has a binding whose outputs are all marked inactive. + */ + STATUS_IMPOSSIBLE_BINDING, + STATUS_STOLE, /**< Used for internal scheduler purposes. */ }; /** Identifies an input stream by its index. */ @@ -780,6 +785,20 @@ template class scheduler_tmpl_t { */ // TODO i#6959: Once we have -exit_if_all_unscheduled raise this. uint64_t block_time_max_us = 2500; + /** + * The minimum time in microseconds that must have elapsed after an input last + * ran on an output before that input is allowed to be migrated to a different + * output. This value is multiplied by #time_units_per_us to produce a value + * that is compared to the "cur_time" parameter to next_record(). + */ + uint64_t migration_threshold_us = 500; + /** + * The period in microseconds at which rebalancing is performed to keep output + * run queues from becoming uneven. 
This value is multiplied by + * #time_units_per_us to produce a value that is compared to the "cur_time" + * parameter to next_record(). + */ + uint64_t rebalance_period_us = 1500000; }; /** @@ -1207,8 +1226,8 @@ template class scheduler_tmpl_t { /** Default constructor. */ scheduler_tmpl_t() - : ready_priority_(static_cast(get_time_micros())) { + last_rebalance_time_.store(0, std::memory_order_relaxed); } virtual ~scheduler_tmpl_t(); @@ -1308,9 +1327,11 @@ template class scheduler_tmpl_t { std::unique_ptr reader_end; // While the scheduler only hands an input to one output at a time, during // scheduling decisions one thread may need to access another's fields. + // This lock controls access to fields that are modified during scheduling. + // This must be accessed after any output lock. + // If multiple input locks are held at once, they should be acquired in + // increasing "index" order. // We use a unique_ptr to make this moveable for vector storage. - // For inputs not actively assigned to a core but sitting in the ready_queue, - // sched_lock_ suffices to synchronize access. std::unique_ptr lock; // A tid can be duplicated across workloads so we need the pair of // workload index + tid to identify the original input. @@ -1338,7 +1359,12 @@ template class scheduler_tmpl_t { bool needs_advance = false; bool needs_roi = true; bool at_eof = false; + // The output whose ready queue or active run slot we are in. + output_ordinal_t containing_output = INVALID_OUTPUT_ORDINAL; + // The previous containing_output. output_ordinal_t prev_output = INVALID_OUTPUT_ORDINAL; + // The current output where we're actively running. + output_ordinal_t cur_output = INVALID_OUTPUT_ORDINAL; uintptr_t next_timestamp = 0; uint64_t instrs_in_quantum = 0; int instrs_pre_read = 0; @@ -1380,6 +1406,7 @@ template class scheduler_tmpl_t { bool unscheduled = false; // Causes the next unscheduled entry to abort. bool skip_next_unscheduled = false; + uint64_t last_run_time = 0; }; // Format for recording a schedule to disk. A separate sequence of these records @@ -1447,16 +1474,137 @@ template class scheduler_tmpl_t { uint64_t timestamp = 0; } END_PACKED_STRUCTURE; + /////////////////////////////////////////////////////////////////////////// + // Support for the ready queues used to decide who to schedule next: + + // I tried using a lambda capturing "this" so we could store int indices + // in the queues instead of pointers, but hit problems (a weird crash while + // running), so I'm sticking with this solution of a separate struct. + struct InputTimestampComparator { bool operator()(input_info_t *a, input_info_t *b) const { if (a->priority != b->priority) return a->priority < b->priority; // Higher is better. if (a->order_by_timestamp && (a->reader->get_last_timestamp() - a->base_timestamp) != (b->reader->get_last_timestamp() - b->base_timestamp)) { // Lower is better. return (a->reader->get_last_timestamp() - a->base_timestamp) > (b->reader->get_last_timestamp() - b->base_timestamp); } // We use a counter to provide FIFO order for same-priority inputs. return a->queue_counter > b->queue_counter; // Lower is better. } }; + + // Now that lock usage is narrowed to certain routines, we may want to + // consider making this a class and having it own the add/pop routines. + // The complexity is popping for a different output. + struct input_queue_t { explicit input_queue_t(int rand_seed = 0) : lock(new mutex_dbg_owned) , queue(rand_seed) { } // Protects access to this structure.
+        // We use a unique_ptr to make this moveable for vector storage.
+        // An output's ready_queue lock must be acquired *before* any input locks.
+        // Multiple output locks should be acquired in increasing output ordinal order.
+        std::unique_ptr<mutex_dbg_owned> lock;
+        // Inputs ready to be scheduled, sorted by priority and then timestamp if
+        // timestamp dependencies are requested.  We use the timestamp delta from the
+        // first observed timestamp in each workload in order to mix inputs from
+        // different workloads in the same queue.  FIFO ordering is used for
+        // same-priority entries.
+        flexible_queue_t<input_info_t *, InputTimestampComparator> queue;
+        // Queue counter used to provide FIFO for same-priority inputs.
+        uint64_t fifo_counter = 0;
+        // Tracks the count of blocked inputs.
+        int num_blocked = 0;
+    };
+
+    bool
+    need_output_lock();
+
+    std::unique_lock<mutex_dbg_owned>
+    acquire_scoped_output_lock_if_necessary(output_ordinal_t output);
+
+    bool
+    ready_queue_empty(output_ordinal_t output);
+
+    // If input->unscheduled is true and input->blocked_time is 0, input
+    // is placed on the unscheduled_priority_ queue instead.
+    // The caller cannot hold the input's lock: this routine will acquire it.
+    void
+    add_to_ready_queue(output_ordinal_t output, input_info_t *input);
+
+    // Identical to add_to_ready_queue() except the output's lock must be held by the
+    // caller.
+    // The caller must also hold the input's lock.
+    void
+    add_to_ready_queue_hold_locks(output_ordinal_t output, input_info_t *input);
+
+    // The caller must hold the input's lock.
+    void
+    add_to_unscheduled_queue(input_info_t *input);
+
+    uint64_t
+    scale_blocked_time(uint64_t blocked_time) const;
+
+    // The input's lock must be held by the caller.
+    // Returns whether the syscall should incur a switch, and if so sets blocked_time
+    // to how long the input should be considered blocked.
+    bool
+    syscall_incurs_switch(input_info_t *input, uint64_t &blocked_time);
+
+    // "for_output" is which output stream is looking for a new input; only an
+    // input which is able to run on that output will be selected.
+    // for_output can be INVALID_OUTPUT_ORDINAL, which will ignore bindings.
+    // If from_output != for_output (including for_output == INVALID_OUTPUT_ORDINAL)
+    // this is a migration and only migration-ready inputs will be picked.
+    stream_status_t
+    pop_from_ready_queue(output_ordinal_t from_output, output_ordinal_t for_output,
+                         input_info_t *&new_input);
+
+    // Identical to pop_from_ready_queue but the caller must hold both output locks.
+    stream_status_t
+    pop_from_ready_queue_hold_locks(output_ordinal_t from_output,
+                                    output_ordinal_t for_output,
+                                    input_info_t *&new_input, bool from_back = false);
+
+    stream_status_t
+    rebalance_queues(output_ordinal_t triggering_output,
+                     std::vector<input_ordinal_t> inputs_to_add);
+
+    void
+    print_queue_stats();
+
+    void
+    update_switch_stats(output_ordinal_t output, input_ordinal_t prev_input,
+                        input_ordinal_t new_input);
+
+    ///
+    ///////////////////////////////////////////////////////////////////////////
+
+    // We have one output_info_t per output stream, and at most one worker
+    // thread owns one output, so most fields are accessed only by one thread.
+    // The exception is "ready_queue" which can be accessed by other threads;
+    // it is protected using its internal lock.
     struct output_info_t {
         output_info_t(scheduler_tmpl_t<RecordType, ReaderType> *scheduler,
                       output_ordinal_t ordinal,
                       typename spec_type_t::speculator_flags_t speculator_flags,
-                      RecordType last_record_init, int verbosity = 0)
+                      int rand_seed, RecordType last_record_init, int verbosity = 0)
             : self_stream(scheduler, ordinal, verbosity)
             , stream(&self_stream)
+            , ready_queue(rand_seed)
             , speculator(speculator_flags, verbosity)
             , last_record(last_record_init)
         {
+            active = std::unique_ptr<std::atomic<bool>>(new std::atomic<bool>());
+            active->store(true, std::memory_order_relaxed);
+            cur_time =
+                std::unique_ptr<std::atomic<uint64_t>>(new std::atomic<uint64_t>());
+            cur_time->store(0, std::memory_order_relaxed);
         }
         stream_t self_stream;
         // Normally stream points to &self_stream, but for single_lockstep_output
@@ -1471,6 +1619,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
         // lock for dynamically finding the next input, keeping things parallel.
         std::vector<input_ordinal_t> input_indices;
         int input_indices_index = 0;
+        // Inputs ready to be scheduled on this output.
+        input_queue_t ready_queue;
         // Speculation support.
         std::stack<addr_t> speculation_stack; // Stores PC of resumption point.
         speculator_tmpl_t<RecordType> speculator;
@@ -1481,17 +1631,23 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
         // queueing a read-ahead instruction record for start_speculation().
         addr_t prev_speculate_pc = 0;
         RecordType last_record; // Set to TRACE_TYPE_INVALID in constructor.
-        // A list of schedule segments.  These are accessed only while holding
-        // sched_lock_.
+        // A list of schedule segments.  During replay, this is read by other threads,
+        // but it is only written at init time.
         std::vector<schedule_record_t> record;
         int record_index = 0;
         bool waiting = false; // Waiting or idling.
-        bool active = true;
+        // Used to limit stealing to one attempt per transition to idle.
+        bool tried_to_steal_on_idle = false;
+        // This is accessed by other outputs for stealing and rebalancing.
+        // Indirected so we can store it in our vector.
+        std::unique_ptr<std::atomic<bool>> active;
         bool in_kernel_code = false;
         bool in_context_switch_code = false;
         bool hit_switch_code_end = false;
         // Used for time-based quanta.
-        uint64_t cur_time = 0;
+        // This is accessed by other outputs for stealing and rebalancing.
+        // Indirected so we can store it in our vector.
+        std::unique_ptr<std::atomic<uint64_t>> cur_time;
         // Used for MAP_TO_RECORDED_OUTPUT get_output_cpuid().
         int64_t as_traced_cpuid = -1;
         // Used for MAP_AS_PREVIOUSLY with live_replay_output_count_.
@@ -1609,6 +1765,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
 
     // Skips ahead to the next region of interest if necessary.
     // The caller must hold the input.lock.
+    // If STATUS_SKIPPED or STATUS_STOLE is returned, a new next record needs to be read.
     stream_status_t
    advance_region_of_interest(output_ordinal_t output, RecordType &record,
                                input_info_t &input);
@@ -1699,7 +1856,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
     check_valid_input_limits(const input_workload_t &workload,
                              input_reader_info_t &reader_info);
 
-    // The sched_lock_ must be held when this is called.
+    // The caller cannot hold the output or input lock.
     stream_status_t
     set_cur_input(output_ordinal_t output, input_ordinal_t input);
 
@@ -1710,7 +1867,6 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
 
     // Helper for pick_next_input() for MAP_AS_PREVIOUSLY.
     // No input_info_t lock can be held on entry.
-    // The sched_lock_ must be held on entry.
     stream_status_t
     pick_next_input_as_previously(output_ordinal_t output, input_ordinal_t &index);
 
@@ -1851,9 +2007,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
 
     // Determines whether to exit or wait for other outputs when one output
     // runs out of things to do.  May end up scheduling new inputs.
+    // If STATUS_STOLE is returned, a new input was found and its next record needs
+    // to be read.
+    // Never returns STATUS_OK.
     stream_status_t
-    eof_or_idle(output_ordinal_t output, bool hold_sched_lock,
-                input_ordinal_t prev_input);
+    eof_or_idle(output_ordinal_t output, input_ordinal_t prev_input);
 
     // Returns whether the current record for the current input stream scheduled on
     // the 'output_ordinal'-th output stream is from a part of the trace corresponding
@@ -1867,67 +2025,6 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
     get_statistic(output_ordinal_t output,
                   memtrace_stream_t::schedule_statistic_t stat) const;
 
-    ///////////////////////////////////////////////////////////////////////////
-    // Support for ready queues for who to schedule next:
-
-    // I tried using a lambda where we could capture "this" and so use int indices
-    // in the queues instead of pointers but hit problems (weird crash while running)
-    // so I'm sticking with this solution of a separate struct.
-    struct InputTimestampComparator {
-        bool
-        operator()(input_info_t *a, input_info_t *b) const
-        {
-            if (a->priority != b->priority)
-                return a->priority < b->priority; // Higher is better.
-            if (a->order_by_timestamp &&
-                (a->reader->get_last_timestamp() - a->base_timestamp) !=
-                    (b->reader->get_last_timestamp() - b->base_timestamp)) {
-                // Lower is better.
-                return (a->reader->get_last_timestamp() - a->base_timestamp) >
-                    (b->reader->get_last_timestamp() - b->base_timestamp);
-            }
-            // We use a counter to provide FIFO order for same-priority inputs.
-            return a->queue_counter > b->queue_counter; // Lower is better.
-        }
-    };
-
-    bool
-    need_sched_lock();
-
-    std::unique_lock<mutex_dbg_owned>
-    acquire_scoped_sched_lock_if_necessary(bool &need_lock);
-
-    // sched_lock_ must be held by the caller.
-    bool
-    ready_queue_empty();
-
-    // sched_lock_ must be held by the caller.
-    // If input->unscheduled is true and input->blocked_time is 0, input
-    // is placed on the unscheduled_priority_ queue instead.
-    void
-    add_to_ready_queue(input_info_t *input);
-
-    // sched_lock_ must be held by the caller.
-    void
-    add_to_unscheduled_queue(input_info_t *input);
-
-    uint64_t
-    scale_blocked_time(uint64_t blocked_time) const;
-
-    // The input's lock must be held by the caller.
-    // Returns a multiplier for how long the input should be considered blocked.
-    bool
-    syscall_incurs_switch(input_info_t *input, uint64_t &blocked_time);
-
-    // sched_lock_ must be held by the caller.
-    // "for_output" is which output stream is looking for a new input; only an
-    // input which is able to run on that output will be selected.
-    stream_status_t
-    pop_from_ready_queue(output_ordinal_t for_output, input_info_t *&new_input);
-
-    ///
-    ///////////////////////////////////////////////////////////////////////////
-
     // This has the same value as scheduler_options_t.verbosity (for use in VPRINT).
     int verbosity_ = 0;
     const char *output_prefix_ = "[scheduler]";
@@ -1936,27 +2033,17 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
     // Each vector element has a mutex which should be held when accessing its fields.
     std::vector<input_info_t> inputs_;
     // Each vector element is accessed only by its owning thread, except the
-    // record and record_index fields which are accessed under sched_lock_.
+    // ready_queue-related plus record and record_index fields which are accessed under
+    // the output's own lock.
     std::vector<output_info_t> outputs_;
-    // We use a central lock for global scheduling.  We assume the synchronization
-    // cost is outweighed by the simulator's overhead.  This protects concurrent
-    // access to inputs_.size(), outputs_.size(), ready_priority_,
-    // ready_counter_, unscheduled_priority_, and unscheduled_counter_.
-    // This cannot be acquired while holding an input lock: it must
-    // be acquired first, to avoid deadlocks.
-    mutex_dbg_owned sched_lock_;
-    // Inputs ready to be scheduled, sorted by priority and then timestamp if timestamp
-    // dependencies are requested.  We use the timestamp delta from the first observed
-    // timestamp in each workload in order to mix inputs from different workloads in the
-    // same queue.  FIFO ordering is used for same-priority entries.
-    flexible_queue_t<input_info_t *, InputTimestampComparator> ready_priority_;
-    // Inputs that are unscheduled indefinitely unless directly targeted.
-    flexible_queue_t<input_info_t *, InputTimestampComparator> unscheduled_priority_;
-    // Trackes the count of blocked inputs.  Protected by sched_lock_.
-    int num_blocked_ = 0;
-    // Global queue counters used to provide FIFO for same-priority inputs.
-    uint64_t ready_counter_ = 0;
-    uint64_t unscheduled_counter_ = 0;
+    // This lock protects unscheduled_priority_ and unscheduled_counter_.
+    // It should be acquired *after* any output or input lock: it is the
+    // innermost lock.
+    mutex_dbg_owned unsched_lock_;
+    // Inputs that are unscheduled indefinitely until directly targeted.
+    input_queue_t unscheduled_priority_;
+    // Rebalancing coordination.
+    std::atomic<std::thread::id> rebalancer_;
+    std::atomic<uint64_t> last_rebalance_time_;
     // Count of inputs not yet at eof.
     std::atomic<int> live_input_count_;
     // In replay mode, count of outputs not yet at the end of the replay sequence.
diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp
index d9515676db9..9092b233186 100644
--- a/clients/drcachesim/tests/scheduler_unit_tests.cpp
+++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp
@@ -34,12 +34,14 @@
 #undef NDEBUG
 #include
 #include
+#include <cctype>
 #include
 #include
 #include
 #include
 #include
 #include
+#include <unordered_set>
 #include
 #include
 
@@ -556,9 +558,9 @@
     // the usage to persist to their next scheduling which should only have
     // a single letter.
     static const char *const CORE0_SCHED_STRING =
-        "..AA......CCC..EEE..GGGDDDFFFBBBCCC.EEE.AAA.GGG.";
+        "..AA......CCC..EEE..GGGACCCEEEGGGAAACCC.EEGGAAE.G.A.";
     static const char *const CORE1_SCHED_STRING =
-        "..BB......DDD..FFFABCCCEEEAAAGGGDDD.FFF.BBB.____";
+        "..BB......DDD..FFFBDDDFFFBBBDDD.FFF.BBB.____________";
     for (int i = 0; i < NUM_OUTPUTS; i++) {
         std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
     }
@@ -1220,15 +1222,20 @@ test_synthetic()
         inputs[i].push_back(make_exit(tid));
     }
     // Hardcoding here for the 2 outputs and 7 inputs.
-    // We expect 3 letter sequences (our quantum) alternating every-other as each
-    // core alternates. The dots are markers and thread exits.
+    // We make assumptions on the scheduler's initial runqueue assignment
+    // being round-robin, resulting in 4 on core0 (odd parity letters) and 3 on
+    // core1 (even parity letters).
+    // We expect 3 letter sequences (our quantum).
+    // The dots are markers and thread exits.
     // A and B have a voluntary switch after their 1st 2 letters, but we expect
-    // the usage to persist to their next scheduling which should only have
+    // their cpu usage to persist to their next scheduling which should only have
     // a single letter.
+    // Since core0 has an extra input, core1 finishes its runqueue first and then
+    // steals G from core0 (migration threshold is 0) and finishes it off.
static const char *const CORE0_SCHED_STRING = - "..AA......CCC..EEE..GGGDDDFFFBBBCCC.EEE.AAA.GGG."; + "..AA......CCC..EEE..GGGACCCEEEGGGAAACCC.EEE.AAA."; static const char *const CORE1_SCHED_STRING = - "..BB......DDD..FFFABCCCEEEAAAGGGDDD.FFF.BBB.____"; + "..BB......DDD..FFFBDDDFFFBBBDDD.FFF.BBB.GGG.____"; { // Test instruction quanta. std::vector sched_inputs; @@ -1246,6 +1253,9 @@ test_synthetic() sched_ops.quantum_duration_instrs = QUANTUM_DURATION; sched_ops.block_time_multiplier = BLOCK_SCALE; sched_ops.time_units_per_us = 1.; + // Migration is measured in wall-clock-time for instr quanta + // so avoid non-determinism by having no threshold. + sched_ops.migration_threshold_us = 0; scheduler_t scheduler; if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) != scheduler_t::STATUS_SUCCESS) @@ -1259,14 +1269,18 @@ test_synthetic() // is the instances where the same letter appears 3 times without another letter // appearing in between (and ignoring the last letter for an input: EOF doesn't // count as a preempt). - verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/10, + verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/11, /*switch_input_to_idle=*/0, /*switch_idle_to_input=*/0, - /*switch_nop=*/0, /*preempts=*/6, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/7); - verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/11, - /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0, /*switch_nop=*/0, /*preempts=*/8, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/7); + /*direct_successes=*/0, /*migrations=*/1); + verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/10, + /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0, + /*switch_nop=*/0, /*preempts=*/6, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); + assert(scheduler.get_stream(0)->get_schedule_statistic( + memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS) == 0); + assert(scheduler.get_stream(1)->get_schedule_statistic( + memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS) == 1); #ifndef WIN32 // XXX: Windows microseconds on test VMs are very coarse and stay the same // for long periods. Instruction quanta use wall-clock idle times, so @@ -1312,6 +1326,7 @@ test_synthetic() sched_ops.time_units_per_us = 1.; sched_ops.quantum_duration_us = QUANTUM_DURATION; sched_ops.block_time_multiplier = BLOCK_SCALE; + sched_ops.migration_threshold_us = 0; scheduler_t scheduler; if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) != scheduler_t::STATUS_SUCCESS) @@ -1377,6 +1392,8 @@ test_synthetic_time_quanta() sched_ops.quantum_duration_us = 3; // Ensure it waits 10 steps. sched_ops.block_time_multiplier = 10. / (POST_BLOCK_TIME - PRE_BLOCK_TIME); + // Ensure steals happen in this short test. 
+ sched_ops.migration_threshold_us = 0; zipfile_ostream_t outfile(record_fname); sched_ops.schedule_record_ostream = &outfile; if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) != @@ -1396,12 +1413,14 @@ test_synthetic_time_quanta() if (status == scheduler_t::STATUS_OK) { if (memref.marker.tid != expect_tid) { std::cerr << "Expected tid " << expect_tid - << " != " << memref.marker.tid << "\n"; + << " != " << memref.marker.tid << " at time " << time + << "\n"; assert(false); } if (memref.marker.type != expect_type) { std::cerr << "Expected type " << expect_type - << " != " << memref.marker.type << "\n"; + << " != " << memref.marker.type << " at time " << time + << "\n"; assert(false); } } @@ -1422,18 +1441,19 @@ test_synthetic_time_quanta() check_next(cpu0, time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_MARKER); check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR); // Advance cpu1 which is now at its quantum end at time 6 and should switch. + // However, there's no one else in cpu1's runqueue, so it proceeds with TID_B. + check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); + check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); + check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT); + // cpu1 should now steal TID_A from cpu0. check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR); check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_MARKER); - // We just hit a blocking syscall in A so we swap to B. - check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); - // This is another quantum end at 9 but no other input is available. - check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); - check_next(cpu1, time, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT); + // We just hit a blocking syscall in A but there is nothing else to run. check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); - // Finish off C on cpu 0. + // Finish off C on cpu 0. This hits a quantum end but there's no one else. check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR); check_next(cpu0, ++time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_INSTR); check_next(cpu0, time, scheduler_t::STATUS_OK, TID_C, TRACE_TYPE_THREAD_EXIT); @@ -1443,21 +1463,24 @@ test_synthetic_time_quanta() check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); check_next(cpu0, ++time, scheduler_t::STATUS_IDLE); check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu0, ++time, scheduler_t::STATUS_IDLE); + check_next(cpu1, ++time, scheduler_t::STATUS_IDLE); check_next(cpu1, ++time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR); check_next(cpu1, time, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT); check_next(cpu1, ++time, scheduler_t::STATUS_EOF); check_next(cpu0, ++time, scheduler_t::STATUS_EOF); if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS) assert(false); - // Check scheduler stats. + // Check scheduler stats. 2 nops (quantum end but no one else); 1 migration + // (the steal). 
verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/1, /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0, /*switch_nop=*/1, /*preempts=*/2, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/0); - verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/2, - /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/1, - /*switch_nop=*/0, /*preempts=*/1, /*direct_attempts=*/0, /*direct_successes=*/0, /*migrations=*/1); + verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/1, + /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/1, + /*switch_nop=*/1, /*preempts=*/1, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); } { replay_file_checker_t checker; @@ -1496,7 +1519,7 @@ test_synthetic_time_quanta() // so the idle portions at the end here can have variable idle and wait // record counts. We thus just check the start. assert(sched_as_string[0].substr(0, 10) == "..A..CCC._"); - assert(sched_as_string[1].substr(0, 12) == "..BA....BB._"); + assert(sched_as_string[1].substr(0, 12) == "..BBB.A...._"); } #endif } @@ -1572,25 +1595,28 @@ test_synthetic_with_timestamps() // Hardcoding here for the 3x3+1 inputs where the inverted timestamps mean the // priorities are {C,B,A},{F,E,D},{I,H,G},{J} within the workloads. Across // workloads we should start with {C,F,I,J} and then move on to {B,E,H} and finish - // with {A,D,G}. We should interleave within each group -- except once we reach J - // we should completely finish it. + // with {A,D,G}. The scheduler's initial round-robin-in-priority-order allocation + // to runqueues means it will alternate in the priority order C,F,I,J,B,E,H,A,D,G: + // thus core0 has C,I,B,H,D and core1 has F,J,E,A,G. + // We should interleave within each group -- except once we reach J + // we should completely finish it. There should be no migrations. assert( sched_as_string[0] == - ".CC.C.II.IC.CC.F.FF.I.II.FF.F..BB.B.HH.HE.EE.BB.B.HH.H..DD.DA.AA.G.GG.DD.D._"); + ".CC.C.II.IC.CC.I.II.CC.C.II.I..BB.B.HH.HB.BB.H.HH.BB.B.HH.H..DD.DD.DD.DD.D._"); assert(sched_as_string[1] == - ".FF.F.JJ.JJ.JJ.JJ.J.CC.C.II.I..EE.EB.BB.H.HH.EE.E..AA.A.GG.GD.DD.AA.A.GG.G."); + ".FF.F.JJ.JJ.JJ.JJ.J.F.FF.FF.F..EE.EE.EE.EE.E..AA.A.GG.GA.AA.G.GG.AA.A.GG.G."); // Check scheduler stats. # switches is the # of letter transitions; # preempts // is the instances where the same letter appears 3 times without another letter // appearing in between (and ignoring the last letter for an input: EOF doesn't // count as a preempt). - verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/14, + verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/12, /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0, - /*switch_nop=*/0, /*preempts=*/11, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/7); - verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/12, + /*switch_nop=*/2, /*preempts=*/10, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); + verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/9, /*switch_input_to_idle=*/0, /*switch_idle_to_input=*/0, - /*switch_nop=*/2, /*preempts=*/9, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/8); + /*switch_nop=*/5, /*preempts=*/10, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); } static void @@ -1672,21 +1698,21 @@ test_synthetic_with_priorities() // first. J remains uninterrupted due to lower timestamps. 
assert( sched_as_string[0] == - ".BB.B.HH.HE.EE.BB.B.HH.H..FF.F.JJ.JJ.JJ.JJ.J.CC.C.II.I..DD.DA.AA.G.GG.DD.D._"); + ".BB.B.HH.HB.BB.H.HH.BB.B.HH.H..FF.F.JJ.JJ.JJ.JJ.J.F.FF.FF.F..DD.DD.DD.DD.D._"); assert(sched_as_string[1] == - ".EE.EB.BB.H.HH.EE.E..CC.C.II.IC.CC.F.FF.I.II.FF.F..AA.A.GG.GD.DD.AA.A.GG.G."); + ".EE.EE.EE.EE.E..CC.C.II.IC.CC.I.II.CC.C.II.I..AA.A.GG.GA.AA.G.GG.AA.A.GG.G."); // Check scheduler stats. # switches is the # of letter transitions; # preempts // is the instances where the same letter appears 3 times without another letter // appearing in between (and ignoring the last letter for an input: EOF doesn't // count as a preempt). - verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/12, + verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/9, /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0, - /*switch_nop=*/2, /*preempts=*/9, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/8); - verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/14, + /*switch_nop=*/5, /*preempts=*/10, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); + verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/12, /*switch_input_to_idle=*/0, /*switch_idle_to_input=*/0, - /*switch_nop=*/0, /*preempts=*/11, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/7); + /*switch_nop=*/2, /*preempts=*/10, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); } static void @@ -1744,6 +1770,9 @@ test_synthetic_with_bindings_time(bool time_deps) scheduler_t::SCHEDULER_DEFAULTS, /*verbosity=*/3); sched_ops.quantum_duration_instrs = 3; + // Migration is measured in wall-clock-time for instr quanta + // so avoid non-determinism by having no threshold. + sched_ops.migration_threshold_us = 0; scheduler_t scheduler; if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) != scheduler_t::STATUS_SUCCESS) @@ -1753,12 +1782,15 @@ test_synthetic_with_bindings_time(bool time_deps) for (int i = 0; i < NUM_OUTPUTS; i++) { std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } - // We have {A,B,C} on {2,4}, {D,E,F} on {0,1}, and {G,H,I} on {1,2,3}: - assert(sched_as_string[0] == ".DD.D.FF.FD.DD.F.FF.DD.D.FF.F._"); - assert(sched_as_string[1] == ".EE.E.HH.HE.EE.I.II.EE.E.______"); - assert(sched_as_string[2] == ".AA.A.CC.CG.GG.C.CC.HH.H.CC.C."); - assert(sched_as_string[3] == ".GG.G.II.IH.HH.GG.G.II.I._____"); - assert(sched_as_string[4] == ".BB.BA.AA.B.BB.AA.A.BB.B._____"); + // We have {A,B,C} on {2,4}, {D,E,F} on {0,1}, and {G,H,I} on {1,2,3}. + // We should *not* see cores stealing inputs that can't run on them: so we + // should see tail idle time. We should see allowed steals with no migration + // threshold. 
+ assert(sched_as_string[0] == ".DD.D.EE.E.FF.FD.DD.E.EE.F.FF.EE.E.FF.F."); + assert(sched_as_string[1] == ".GG.G.HH.HG.GG.H.HH.HH.H.DD.D.__________"); + assert(sched_as_string[2] == ".AA.A.BB.BA.AA.B.BB.BB.B._______________"); + assert(sched_as_string[3] == ".II.II.II.II.I.GG.G.____________________"); + assert(sched_as_string[4] == ".CC.CC.CC.CC.C.AA.A.____________________"); } static void @@ -1878,11 +1910,11 @@ test_synthetic_with_bindings_weighted() std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } // We have {A,B,C} on {2,4}, {D,E,F} on {0,1}, and {G,H,I} on {1,2,3}: - assert(sched_as_string[0] == ".FF.FF.FF.FF.F..EE.EE.EE.EE.E._"); - assert(sched_as_string[1] == ".II.II.II.II.I..DD.DD.DD.DD.D._"); - assert(sched_as_string[2] == ".CC.CC.CC.CC.C..AA.AA.AA.AA.A._"); - assert(sched_as_string[3] == ".HH.HH.HH.HH.H..GG.GG.GG.GG.G."); - assert(sched_as_string[4] == ".BB.BB.BB.BB.B._______________"); + assert(sched_as_string[0] == ".FF.FF.FF.FF.F..EE.EE.EE.EE.E..DD.DD.DD.DD.D."); + assert(sched_as_string[1] == ".II.II.II.II.I..HH.HH.HH.HH.H._______________"); + assert(sched_as_string[2] == ".CC.CC.CC.CC.C..BB.BB.BB.BB.B._______________"); + assert(sched_as_string[3] == ".GG.GG.GG.GG.G.______________________________"); + assert(sched_as_string[4] == ".AA.AA.AA.AA.A.______________________________"); } static void @@ -2006,20 +2038,22 @@ test_synthetic_with_syscalls_multiple() // with the "." in run_lockstep_simulation(). The omitted "." markers also // explains why the two strings are different lengths. assert(sched_as_string[0] == - "BHHHFFFJJJJJJBEEHHHFFFBCCCEEIIIDDDBAAAGGGDDDB________B_______"); - assert(sched_as_string[1] == "EECCCIIICCCBEEJJJHHHIIIFFFEAAAGGGBDDDAAAGGG________B"); + "BHHHFFFJJJJJJBHHHJJJFFFFFFBHHHDDDDDDDDDB__________B__________B__________B____" + "______B_______B"); + assert(sched_as_string[1] == + "EECCCIIICCCIIIEECCCIIIAAAGGGEEAAAGGEEGAAEGGAG_________"); // Check scheduler stats. # switches is the # of letter transitions; # preempts // is the instances where the same letter appears 3 times without another letter // appearing in between (and ignoring the last letter for an input: EOF doesn't // count as a preempt). - verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/17, - /*switch_input_to_idle=*/2, /*switch_idle_to_input=*/1, - /*switch_nop=*/2, /*preempts=*/11, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/9); - verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/16, - /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/1, - /*switch_nop=*/0, /*preempts=*/10, /*direct_attempts=*/0, - /*direct_successes=*/0, /*migrations=*/11); + verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/11, + /*switch_input_to_idle=*/5, /*switch_idle_to_input=*/5, + /*switch_nop=*/4, /*preempts=*/10, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); + verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/19, + /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0, + /*switch_nop=*/3, /*preempts=*/16, /*direct_attempts=*/0, + /*direct_successes=*/0, /*migrations=*/0); } static void @@ -2104,8 +2138,11 @@ test_synthetic_with_syscalls_single() std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } // We expect an idle CPU every 3 instrs but starting at the 2nd (1-based % 3). 
- assert(sched_as_string[0] == "..A....A....___________________________________A....."); - assert(sched_as_string[1] == "____________A....A.....A....__A.....A....A...._______"); + // With per-output runqueues, cpu1 is idle the whole time. + assert(sched_as_string[0] == + "..A....A....__A....A.....A....__A.....A....A....__A....."); + assert(sched_as_string[1] == + "________________________________________________________"); } static bool @@ -2667,10 +2704,11 @@ test_replay() static constexpr int NUM_INSTRS = 9; static constexpr int QUANTUM_INSTRS = 3; // For our 2 outputs and 7 inputs: - // We expect 3 letter sequences (our quantum) alternating every-other as each - // core alternates; with an odd number the 2nd core finishes early. - static const char *const CORE0_SCHED_STRING = "AAACCCEEEGGGBBBDDDFFFAAA.CCC.EEE.GGG."; - static const char *const CORE1_SCHED_STRING = "BBBDDDFFFAAACCCEEEGGGBBB.DDD.FFF.____"; + // We expect 3 letter sequences (our quantum) alternating every-other with + // odd parity letters on core0 (A,C,E,G) and even parity on core1 (B,D,F). + // With a smaller runqueue, the 2nd core finishes early and steals E. + static const char *const CORE0_SCHED_STRING = "AAACCCEEEGGGAAACCCEEEGGGAAA.CCC.GGG."; + static const char *const CORE1_SCHED_STRING = "BBBDDDFFFBBBDDDFFFBBB.DDD.FFF.EEE.__"; static constexpr memref_tid_t TID_BASE = 100; std::vector inputs[NUM_INPUTS]; @@ -2700,6 +2738,9 @@ test_replay() scheduler_t::SCHEDULER_DEFAULTS, /*verbosity=*/3); sched_ops.quantum_duration_instrs = QUANTUM_INSTRS; + // Migration is measured in wall-clock-time for instr quanta + // so avoid non-determinism by having no threshold. + sched_ops.migration_threshold_us = 0; zipfile_ostream_t outfile(record_fname); sched_ops.schedule_record_ostream = &outfile; @@ -3489,7 +3530,7 @@ test_replay_limit() // The schedule varies by machine load and other factors so we don't // check for any precise ordering. // We do ensure we saw interleaving on at least one output. - assert(switches > 4); + assert(switches > 0); } // Replay. replay_func(); @@ -4258,17 +4299,17 @@ test_inactive() check_next(stream0, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); // End of quantum. check_next(stream0, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_INSTR); - // Make cpu0 inactive and cpu1 active. + // Make cpu1 active and then cpu0 inactive. + status = stream1->set_active(true); + assert(status == scheduler_t::STATUS_OK); status = stream0->set_active(false); assert(status == scheduler_t::STATUS_OK); check_next(stream0, scheduler_t::STATUS_IDLE); - status = stream1->set_active(true); - assert(status == scheduler_t::STATUS_OK); // Now cpu1 should finish things. 
+ check_next(stream1, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT); check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_INSTR); check_next(stream1, scheduler_t::STATUS_OK, TID_B, TRACE_TYPE_THREAD_EXIT); - check_next(stream1, scheduler_t::STATUS_OK, TID_A, TRACE_TYPE_THREAD_EXIT); check_next(stream1, scheduler_t::STATUS_EOF); if (scheduler.write_recorded_schedule() != scheduler_t::STATUS_SUCCESS) assert(false); @@ -4305,8 +4346,8 @@ test_inactive() for (int i = 0; i < NUM_OUTPUTS; i++) { std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } - assert(sched_as_string[0] == "..AABA.__"); - assert(sched_as_string[1] == "..B--BB."); + assert(sched_as_string[0] == "..AAB.___"); + assert(sched_as_string[1] == "..B-ABB."); } #endif // HAS_ZIP } @@ -4795,7 +4836,7 @@ test_unscheduled_fallback() "____________________________________________________________________________" "____________________________________________________________________________" "____________________________________________________________________________" - "_________________________________________________________BBBB.A."; + "___________BBBB.A."; std::vector sched_inputs; sched_inputs.emplace_back(std::move(readers)); @@ -4810,6 +4851,7 @@ test_unscheduled_fallback() sched_ops.blocking_switch_threshold = BLOCK_LATENCY; sched_ops.block_time_multiplier = BLOCK_SCALE; sched_ops.block_time_max_us = BLOCK_TIME_MAX; + sched_ops.rebalance_period_us = BLOCK_TIME_MAX; scheduler_t scheduler; if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) != scheduler_t::STATUS_SUCCESS) @@ -4848,6 +4890,7 @@ test_unscheduled_fallback() sched_ops.blocking_switch_threshold = BLOCK_LATENCY; sched_ops.block_time_multiplier = BLOCK_SCALE; sched_ops.block_time_max_us = BLOCK_TIME_MAX; + sched_ops.rebalance_period_us = BLOCK_TIME_MAX; sched_ops.honor_direct_switches = false; scheduler_t scheduler; if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) != @@ -5176,7 +5219,7 @@ test_kernel_switch_sequences() scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_TIMESTAMPS, scheduler_t::SCHEDULER_DEFAULTS, - /*verbosity=*/4); + /*verbosity=*/3); sched_ops.quantum_duration_instrs = INSTR_QUANTUM; sched_ops.kernel_switch_reader = std::move(switch_reader); sched_ops.kernel_switch_reader_end = std::move(switch_reader_end); @@ -5270,11 +5313,13 @@ test_kernel_switch_sequences() std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; } assert(sched_as_string[0] == - "Av0iii,Ct0iitv0iii,Ep0iipv0iii,Gp0iipv0iii,It0iitv0iii,Cp0iipiii,Ep0iipiii," - "Gp0iipiii,Ap0iipiii,Bt0iitiii,Dp0iipiii,Ft0iitiii,Hp0iipiii______"); - assert(sched_as_string[1] == - "Bv0iii,Dp0iipv0iii,Ft0iitv0iii,Hp0iipv0iii,Ap0iipiii,Bt0iitiii,Dp0iipiii," - "Ft0iitiii,Hp0iipiii,It0iitiii,Cp0iipiii,Ep0iipiii,Gp0iipiii,It0iitiii"); + "Av0iii,Ct0iitv0iii,Ep0iipv0iii,Gp0iipv0iii,It0iitv0iii,Ap0iipiii,Ct0iitiii," + "Ep0iipiii,Gp0iipiii,It0iitiii,Ap0iipiii,Ct0iitiii,Ep0iipiii,Gp0iipiii," + "It0iitiii"); + assert( + sched_as_string[1] == + "Bv0iii,Dp0iipv0iii,Ft0iitv0iii,Hp0iipv0iii,Bp0iipiii,Dp0iipiii,Ft0iitiii," + "Hp0iipiii,Bp0iipiii,Dp0iipiii,Ft0iitiii,Hp0iipiii___________________________"); // Zoom in and check the first sequence record by record with value checks. 
     int idx = 0;
@@ -5577,6 +5622,125 @@ test_record_scheduler()
     check_next(stream0, record_scheduler_t::STATUS_EOF);
 }
 
+static void
+test_rebalancing()
+{
+    std::cerr << "\n----------------\nTesting rebalancing\n";
+    // We want to get the cores into an unbalanced state.
+    // The scheduler will start out with round-robin even assignment.
+    // We use "unschedule" and "direct switch" operations to get all
+    // inputs onto one core.
+    static constexpr int NUM_OUTPUTS = 8;
+    static constexpr int NUM_INPUTS_UNSCHED = 24;
+    static constexpr int BLOCK_LATENCY = 100;
+    static constexpr double BLOCK_SCALE = 1. / (BLOCK_LATENCY);
+    static constexpr int QUANTUM_DURATION = 3 * NUM_OUTPUTS;
+    static constexpr int NUM_INSTRS = QUANTUM_DURATION * 3;
+    static constexpr int REBALANCE_PERIOD = NUM_OUTPUTS * 20 * NUM_INPUTS_UNSCHED;
+    static constexpr int MIGRATION_THRESHOLD = QUANTUM_DURATION;
+    static constexpr memref_tid_t TID_BASE = 100;
+    static constexpr memref_tid_t TID_A = TID_BASE + 0;
+    static constexpr memref_tid_t TID_B = TID_BASE + 1;
+    static constexpr uint64_t TIMESTAMP_START_INSTRS = 9999;
+
+    std::vector<trace_entry_t> refs_controller;
+    refs_controller.push_back(make_thread(TID_A));
+    refs_controller.push_back(make_pid(1));
+    refs_controller.push_back(make_version(TRACE_ENTRY_VERSION));
+    refs_controller.push_back(make_timestamp(1001));
+    refs_controller.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+    // Our controller switches to the first thread, which then switches to
+    // the next, etc.
+    refs_controller.push_back(make_instr(/*pc=*/101));
+    refs_controller.push_back(make_instr(/*pc=*/102));
+    refs_controller.push_back(make_timestamp(1101));
+    refs_controller.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+    refs_controller.push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 999));
+    refs_controller.push_back(
+        make_marker(TRACE_MARKER_TYPE_SYSCALL_ARG_TIMEOUT, BLOCK_LATENCY));
+    refs_controller.push_back(
+        make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_B));
+    refs_controller.push_back(make_timestamp(1201));
+    refs_controller.push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+    refs_controller.push_back(make_instr(/*pc=*/401));
+    refs_controller.push_back(make_exit(TID_A));
+    // Our unsched threads all start unscheduled.
+    std::vector<std::vector<trace_entry_t>> refs_unsched(NUM_INPUTS_UNSCHED);
+    for (int i = 0; i < NUM_INPUTS_UNSCHED; ++i) {
+        refs_unsched[i].push_back(make_thread(TID_B + i));
+        refs_unsched[i].push_back(make_pid(1));
+        refs_unsched[i].push_back(make_version(TRACE_ENTRY_VERSION));
+        refs_unsched[i].push_back(make_timestamp(2001));
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+        // Each of these threads starts unscheduled with no timeout.
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 999));
+        refs_unsched[i].push_back(
+            make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0));
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE, 0));
+        refs_unsched[i].push_back(make_timestamp(3001));
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+        // Once scheduled, wake up the next thread.
+        refs_unsched[i].push_back(make_timestamp(1101 + 100 * i));
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 999));
+        refs_unsched[i].push_back(
+            make_marker(TRACE_MARKER_TYPE_SYSCALL_ARG_TIMEOUT, BLOCK_LATENCY));
+        refs_unsched[i].push_back(
+            make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_B + i + 1));
+        // Give everyone the same timestamp so we alternate on preempts.
+        refs_unsched[i].push_back(make_timestamp(TIMESTAMP_START_INSTRS));
+        refs_unsched[i].push_back(make_marker(TRACE_MARKER_TYPE_CPU_ID, 0));
+        // Now run a bunch of instrs so we'll reach our rebalancing period.
+        for (int instrs = 0; instrs < NUM_INSTRS; ++instrs) {
+            refs_unsched[i].push_back(make_instr(/*pc=*/200 + instrs));
+        }
+        refs_unsched[i].push_back(make_exit(TID_B + i));
+    }
+    std::vector<scheduler_t::input_reader_t> readers;
+    readers.emplace_back(
+        std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_controller)),
+        std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
+    for (int i = 0; i < NUM_INPUTS_UNSCHED; ++i) {
+        readers.emplace_back(
+            std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_unsched[i])),
+            std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B + i);
+    }
+
+    std::vector<scheduler_t::input_workload_t> sched_inputs;
+    sched_inputs.emplace_back(std::move(readers));
+    scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
+                                               scheduler_t::DEPENDENCY_TIMESTAMPS,
+                                               scheduler_t::SCHEDULER_DEFAULTS,
+                                               /*verbosity=*/3);
+    // We use our mock's time==instruction count for a deterministic result.
+    sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
+    sched_ops.time_units_per_us = 1.;
+    sched_ops.quantum_duration_us = QUANTUM_DURATION;
+    sched_ops.block_time_multiplier = BLOCK_SCALE;
+    sched_ops.migration_threshold_us = MIGRATION_THRESHOLD;
+    sched_ops.rebalance_period_us = REBALANCE_PERIOD;
+    scheduler_t scheduler;
+    if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
+        scheduler_t::STATUS_SUCCESS)
+        assert(false);
+    std::vector<std::string> sched_as_string =
+        run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
+    // We should see a lot of migrations away from output 0: roughly the
+    // per-output average moved to each other output, minus the live input.
+    assert(scheduler.get_stream(0)->get_schedule_statistic(
+               memtrace_stream_t::SCHED_STAT_MIGRATIONS) >=
+           (NUM_INPUTS_UNSCHED / NUM_OUTPUTS) * (NUM_OUTPUTS - 1) - 1);
+    for (int i = 0; i < NUM_OUTPUTS; i++) {
+        std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
+        // Ensure we see multiple inputs on each output.
+        std::unordered_set<char> inputs;
+        for (char c : sched_as_string[i]) {
+            if (std::isalpha(c))
+                inputs.insert(c);
+        }
+        assert(inputs.size() >= (NUM_INPUTS_UNSCHED / NUM_OUTPUTS) - 1);
+    }
+}
+
 int
 test_main(int argc, const char *argv[])
 {
@@ -5620,6 +5784,7 @@ test_main(int argc, const char *argv[])
     test_kernel_switch_sequences();
     test_random_schedule();
     test_record_scheduler();
+    test_rebalancing();
 
     dr_standalone_exit();
     return 0;
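
For orientation, the cross-output coordination this patch introduces boils down to three pieces of shared state: each input's last_run_time (gating migrations), the global last_rebalance_time_ (gating how often anyone rebalances), and the rebalancer_ thread id (ensuring a single active rebalancer). Below is a minimal self-contained C++ sketch of that claim/release protocol. It is not code from the patch: the constants kRebalancePeriod and kMigrationThreshold and the free functions migration_ready(), try_claim_rebalance(), and release_rebalance_claim() are invented for illustration, and the real checks live inside the scheduler's pick_next_input() and rebalance_queues() under their own locking.

    #include <atomic>
    #include <cstdint>
    #include <thread>

    // Stand-ins for the option values after scaling by time_units_per_us.
    constexpr uint64_t kRebalancePeriod = 150000;
    constexpr uint64_t kMigrationThreshold = 500;

    std::atomic<std::thread::id> rebalancer;     // Default id means "no one".
    std::atomic<uint64_t> last_rebalance_time(0);

    // An input may move to another output's runqueue (by stealing or
    // rebalancing) only once it has been off-core for long enough.
    bool
    migration_ready(uint64_t input_last_run_time, uint64_t now)
    {
        return now - input_last_run_time >= kMigrationThreshold;
    }

    // Returns whether this worker both reached the rebalance period and won
    // the race to become the single active rebalancer.
    bool
    try_claim_rebalance(uint64_t now)
    {
        if (now - last_rebalance_time.load(std::memory_order_acquire) <
            kRebalancePeriod)
            return false;
        std::thread::id no_one; // A default-constructed id matches no thread.
        return rebalancer.compare_exchange_strong(
            no_one, std::this_thread::get_id(), std::memory_order_acq_rel);
    }

    // On success the claimant redistributes the runqueues, records the time,
    // and releases the claim -- mirroring the
    // rebalancer_.store(std::thread::id(), std::memory_order_release)
    // at the end of rebalance_queues() in the patch above.
    void
    release_rebalance_claim(uint64_t now)
    {
        last_rebalance_time.store(now, std::memory_order_release);
        rebalancer.store(std::thread::id(), std::memory_order_release);
    }

Under this protocol an output that loses the compare-and-swap simply keeps scheduling from its own runqueue and retries at a later quantum boundary, which is why only one output's SCHED_STAT_RUNQUEUE_REBALANCES counter is incremented per rebalance.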