Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#5843 scheduler: Add direct thread switch support #6424

Merged
merged 4 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions clients/drcachesim/common/trace_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,14 @@ typedef enum {
TRACE_MARKER_TYPE_SYSCALL_FAILED,

/**
* This marker is emitted prior to a system call that causes an immediate switch to
* another thread on the same core (with the current thread entering an unscheduled
* state), bypassing the kernel scheduler's normal dynamic switch code based on run
* queues. The marker value holds the thread id of the target thread.
* This marker is emitted prior to a system call (but after the system call's
* #TRACE_MARKER_TYPE_SYSCALL and #TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL markers)
* that causes an immediate switch to another thread on the same core (with the
* current thread entering an unscheduled state), bypassing the kernel scheduler's
* normal dynamic switch code based on run queues. The marker value holds the
* thread id of the target thread. This marker should normally follow a
* #TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL marker, since such a switch can
* always block if the target needs to be migrated.
*/
TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH,

Expand Down
51 changes: 47 additions & 4 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
input.reader_end = std::move(reader.end);
input.needs_init = true;
workload_tids[input.tid] = input.index;
tid2input_[workload_tid_t(workload_idx, input.tid)] = index;
}
} else {
if (!workload.readers.empty())
Expand All @@ -556,6 +557,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
for (const auto &it : workload_tids) {
inputs_[it.second].workload = workload_idx;
workload2inputs[workload_idx].push_back(it.second);
tid2input_[workload_tid_t(workload_idx, it.first)] = it.second;
}
}
for (const auto &modifiers : workload.thread_modifiers) {
Expand Down Expand Up @@ -1567,9 +1569,9 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,
input_ordinal_t input)
{
// XXX i#5843: Merge tracking of current inputs with ready_queue_ to better manage
// XXX i#5843: Merge tracking of current inputs with ready_priority_ to better manage
// the possible 3 states of each input (a live cur_input for an output stream, in
// the ready_queue_, or at EOF).
// the ready_queue_, or at EOF) (4 states once we add i/o wait times).
assert(output >= 0 && output < static_cast<output_ordinal_t>(outputs_.size()));
// 'input' might be INVALID_INPUT_ORDINAL.
assert(input < static_cast<input_ordinal_t>(inputs_.size()));
Expand Down Expand Up @@ -1764,7 +1766,35 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (res != sched_type_t::STATUS_OK)
return res;
} else if (options_.mapping == MAP_TO_ANY_OUTPUT) {
if (ready_queue_empty()) {
if (prev_index != INVALID_INPUT_ORDINAL &&
inputs_[prev_index].switch_to_input != INVALID_INPUT_ORDINAL) {
input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input];
inputs_[prev_index].switch_to_input = INVALID_INPUT_ORDINAL;
// TODO i#5843: Once we add i/o wait times, we should also check
// the sleeping queue and wake up the target.
// We should probably implement the "Merge tracking..." proposal
// from the comment at the top of set_cur_input() too.
if (ready_priority_.find(target)) {
VPRINT(this, 2, "next_record[%d]: direct switch to input %d\n",
output, target->index);
ready_priority_.erase(target);
index = target->index;
} else {
// TODO i#5843: If the target is running on another output, we
// need to do a forced migration by setting a flag to force a
// preempt and presumably waiting (STATUS_WAIT or STATUS_IDLE?)
// here until the input is available.
// For now we print a message so we can notice when this
// happens, but we ignore the direct switch request.
fprintf(stderr,
"Direct switch target input #%d is running elsewhere and "
"forced migration is NYI\n",
target->index);
}
}
if (index != INVALID_INPUT_ORDINAL) {
// We found a direct switch target above.
} else if (ready_queue_empty()) {
if (prev_index == INVALID_INPUT_ORDINAL)
return sched_type_t::STATUS_EOF;
std::lock_guard<std::mutex> lock(*inputs_[prev_index].lock);
Expand Down Expand Up @@ -1979,7 +2009,20 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// XXX: We may prefer to stop before the return value marker for futex,
// or a kernel xfer marker, but our recorded format is on instr
// boundaries so we live with those being before the switch.
if (record_type_is_instr(record)) {
if (record_type_is_marker(record, marker_type, marker_value) &&
marker_type == TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH) {
memref_tid_t target_tid = marker_value;
auto it =
tid2input_.find(workload_tid_t(input->workload, target_tid));
if (it == tid2input_.end()) {
VPRINT(this, 1,
"Failed to find input for target switch thread %" PRId64
"\n",
target_tid);
} else {
input->switch_to_input = it->second;
}
} else if (record_type_is_instr(record)) {
// Assume it will block and we should switch to a different input.
need_new_input = true;
in_wait_state = true;
Expand Down
26 changes: 25 additions & 1 deletion clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -945,8 +945,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool order_by_timestamp = false;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t queue_counter = 0;
// Used to switch on the insruction *after* a blocking syscall.
// Used to switch on the instruction *after* a blocking syscall.
bool processing_blocking_syscall = false;
input_ordinal_t switch_to_input = INVALID_INPUT_ORDINAL;
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
// Used to switch before we've read the next instruction.
bool switching_pre_instruction = false;
// Used for time-based quanta.
Expand Down Expand Up @@ -1282,6 +1283,29 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
flexible_queue_t<input_info_t *, InputTimestampComparator> ready_priority_;
// Global ready queue counter used to provide FIFO for same-priority inputs.
uint64_t ready_counter_ = 0;
// Map from workload,tid pair to input.
// Key pairing a workload ordinal with a thread id. Two keys compare equal
// only when both the workload and the tid match.
struct workload_tid_t {
    int workload;
    memref_tid_t tid;

    // Builds the key for thread 'tid' of workload 'wl'.
    workload_tid_t(int wl, memref_tid_t tid)
        : workload(wl)
        , tid(tid)
    {
    }

    // Field-wise equality.
    bool
    operator==(const workload_tid_t &rhs) const
    {
        if (workload != rhs.workload)
            return false;
        return tid == rhs.tid;
    }
};
// Hash functor for workload_tid_t keys.
struct workload_tid_hash_t {
    // Returns a hash that mixes both fields of 'wt'.
    // A plain XOR of the two field hashes collides systematically: with the
    // identity integer hash that standard libraries commonly use, the pairs
    // (0, 101) and (1, 100) produce the same value. The fields are instead
    // folded together with an order-sensitive combine (the same recipe as
    // boost::hash_combine), so swapping or offsetting the fields changes the
    // result.
    std::size_t
    operator()(const workload_tid_t &wt) const
    {
        std::size_t seed = std::hash<int>()(wt.workload);
        seed ^= std::hash<memref_tid_t>()(wt.tid) + 0x9e3779b9 + (seed << 6) +
            (seed >> 2);
        return seed;
    }
};
std::unordered_map<workload_tid_t, input_ordinal_t, workload_tid_hash_t> tid2input_;
};

/** See #dynamorio::drmemtrace::scheduler_tmpl_t. */
Expand Down
95 changes: 95 additions & 0 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2964,6 +2964,100 @@ test_inactive()
#endif // HAS_ZIP
}

// Checks scheduler handling of TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH markers.
// Three mock thread inputs (A, B, C) are scheduled onto NUM_OUTPUTS outputs via
// run_lockstep_simulation() and each output's schedule string is compared
// against a hard-coded expected string.
static void
test_direct_switch()
{
std::cerr << "\n----------------\nTesting direct switches\n";
static constexpr int NUM_OUTPUTS = 2;
static constexpr int QUANTUM_DURATION = 100; // Never reached.
static constexpr memref_tid_t TID_BASE = 100;
static constexpr memref_tid_t TID_A = TID_BASE + 0;
static constexpr memref_tid_t TID_B = TID_BASE + 1;
static constexpr memref_tid_t TID_C = TID_BASE + 2;
// Thread A: two instructions, then a syscall whose direct-switch marker
// names thread C as the target, then one final instruction.
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
make_pid(1),
make_timestamp(101),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/101),
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
make_instr(/*pc=*/102),
make_timestamp(102),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_marker(TRACE_MARKER_TYPE_SYSCALL, 999),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_C),
make_timestamp(401),
make_instr(/*pc=*/401),
make_exit(TID_A),
};
// Thread B: twenty instructions (pc 201-220) with no syscall or switch
// markers.
std::vector<trace_entry_t> refs_B = {
make_thread(TID_B), make_pid(1),
make_timestamp(201), make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/201), make_instr(/*pc=*/202),
make_instr(/*pc=*/203), make_instr(/*pc=*/204),
make_instr(/*pc=*/205), make_instr(/*pc=*/206),
make_instr(/*pc=*/207), make_instr(/*pc=*/208),
make_instr(/*pc=*/209), make_instr(/*pc=*/210),
make_instr(/*pc=*/211), make_instr(/*pc=*/212),
make_instr(/*pc=*/213), make_instr(/*pc=*/214),
make_instr(/*pc=*/215), make_instr(/*pc=*/216),
make_instr(/*pc=*/217), make_instr(/*pc=*/218),
make_instr(/*pc=*/219), make_instr(/*pc=*/220),
make_exit(TID_B),
};
// Thread C: same shape as A, but its direct-switch marker targets A. It ends
// with a second direct-switch marker whose target tid has no input.
// NOTE(review): the second timestamp below (102) is lower than C's first
// timestamp (301) and matches thread A's value -- looks copied from refs_A;
// confirm it is intentional.
std::vector<trace_entry_t> refs_C = {
make_thread(TID_C),
make_pid(1),
make_timestamp(301),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_instr(/*pc=*/301),
make_instr(/*pc=*/302),
make_timestamp(102),
make_marker(TRACE_MARKER_TYPE_CPU_ID, 0),
make_marker(TRACE_MARKER_TYPE_SYSCALL, 999),
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_A),
make_timestamp(501),
make_instr(/*pc=*/501),
// Test a non-existent target: should be ignored, but not crash.
make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0),
make_marker(TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH, TID_BASE + 3),
make_exit(TID_C),
};
// One reader entry per thread. The second, default-constructed mock_reader_t
// is presumably the end-of-stream sentinel -- confirm against input_reader_t.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_C)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_C);
// The lockstep treats markers like instructions so we have a long sequence
// of dots before we do the direct swap to C and back.
// TODO i#5843: Once we have i/o wait times we can make this more interesting
// by adding a D and having C be unschedule-able at switch time.
static const char *const CORE0_SCHED_STRING = "..AA........CC......A.C...";
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
static const char *const CORE1_SCHED_STRING = "..BBBBBBBBBBBBBBBBBBBB.";

// All three readers form a single workload. Per the option names, inputs may
// be mapped to any output and timestamp dependencies are ignored.
std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
sched_ops.quantum_duration = QUANTUM_DURATION;
scheduler_t scheduler;
// NOTE(review): assert() is compiled out when NDEBUG is defined, so an init
// failure (and the schedule checks at the end) would pass silently in such a
// build -- confirm the tests are always built with asserts enabled.
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE);
// Print each output's schedule so a mismatch can be diagnosed from the log.
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
assert(sched_as_string[1] == CORE1_SCHED_STRING);
}

int
test_main(int argc, const char *argv[])
{
Expand Down Expand Up @@ -2996,6 +3090,7 @@ test_main(int argc, const char *argv[])
test_replay_as_traced();
test_replay_as_traced_i6107_workaround();
test_inactive();
test_direct_switch();

dr_standalone_exit();
return 0;
Expand Down
Loading