Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#5843 scheduler: Add simple interleaving #5928

Merged
merged 5 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 69 additions & 20 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
for (int i = 0; i < static_cast<int>(inputs_.size()); i++) {
size_t index = i % output_count;
if (outputs_[index].input_indices.empty())
outputs_[index].cur_input = i;
set_cur_input(static_cast<int>(index), i);
outputs_[index].input_indices.push_back(i);
VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index);
}
Expand All @@ -515,7 +515,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
if (output_count != 1)
return STATUS_ERROR_NOT_IMPLEMENTED;
if (inputs_.size() == 1) {
outputs_[0].cur_input = 0;
set_cur_input(0, 0);
} else {
// The old file_reader_t interleaving would output the top headers for every
// thread first and then pick the oldest timestamp once it reached a
Expand All @@ -526,11 +526,25 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
return res;
}
} else {
// TODO i#5843: Implement scheduling onto synthetic cores.
// For now we only support a single output stream with no deps.
if (options_.deps != DEPENDENCY_IGNORE || output_count != 1)
// TODO i#5843: Honor deps when scheduling on synthetic cores.
if (options_.deps != DEPENDENCY_IGNORE)
return STATUS_ERROR_NOT_IMPLEMENTED;
outputs_[0].cur_input = 0;
// TODO i#5843: Implement time-based quanta.
if (options_.quantum_unit != QUANTUM_INSTRUCTIONS)
return STATUS_ERROR_NOT_IMPLEMENTED;
// Assign initial inputs.
// TODO i#5843: Once we support core bindings and priorities we'll want
// to consider that here.
for (int i = 0; i < static_cast<int>(outputs_.size()); i++) {
if (i < static_cast<int>(inputs_.size()))
set_cur_input(i, i);
else
set_cur_input(i, -1);
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
}
for (int i = static_cast<int>(outputs_.size());
i < static_cast<int>(inputs_.size()); i++) {
ready_.push(i);
}
}
return STATUS_SUCCESS;
}
Expand Down Expand Up @@ -568,8 +582,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_timestamps()
return STATUS_ERROR_INVALID_PARAMETER;
if (input.next_timestamp < min_time) {
min_time = input.next_timestamp;
// TODO i#5843: Support more than one input (already checked earlier).
outputs_[0].cur_input = static_cast<int>(i);
// TODO i#5843: Support more than one output (already checked earlier).
set_cur_input(0, static_cast<int>(i));
}
}
if (outputs_[0].cur_input >= 0)
Expand Down Expand Up @@ -775,20 +789,46 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(int output_
return sched_type_t::STATUS_SKIPPED;
}

// Makes 'input_ordinal' the current input of the 'output_ordinal'-th output.
// The input that was current on that output beforehand (if any) is appended to
// ready_ so that it can be picked again later.  A negative 'input_ordinal'
// leaves the output without a current input.  The instrs_in_quantum counter of
// a newly selected input is reset to 0.
// NOTE(review): presumably callers hold sched_lock_ whenever several outputs
// can run concurrently, since this touches inputs_, outputs_, and ready_ --
// confirm against the callers.
template <typename RecordType, typename ReaderType>
void
scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(int output_ordinal,
                                                        int input_ordinal)
{
    assert(output_ordinal >= 0 && output_ordinal < static_cast<int>(outputs_.size()));
    // 'input_ordinal' may be negative, meaning "no current input".
    assert(input_ordinal < static_cast<int>(inputs_.size()));
    const int prev_input = outputs_[output_ordinal].cur_input;
    // Do not queue an input that is staying on this output: it would otherwise
    // be current here and simultaneously sit in ready_ for another output to
    // pick up.
    if (prev_input >= 0 && prev_input != input_ordinal)
        ready_.push(prev_input);
    outputs_[output_ordinal].cur_input = input_ordinal;
    if (input_ordinal < 0)
        return;
    // Start a fresh quantum for the selected input.
    inputs_[input_ordinal].instrs_in_quantum = 0;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(int output_ordinal)
{
if (options_.mapping == MAP_TO_ANY_OUTPUT) {
// TODO i#5843: Implement synthetic scheduling with instr/time quanta.
// These will require locks and central coordination.
// For now we only support a single output.
return sched_type_t::STATUS_EOF;
}
int index = outputs_[output_ordinal].cur_input;
if (options_.mapping == MAP_TO_ANY_OUTPUT)
std::lock_guard<std::mutex> guard(sched_lock_);
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
// We're only called when we should definitely swap so clear the current.
int prev_index = outputs_[output_ordinal].cur_input;
int index = -1;
set_cur_input(output_ordinal, -1);
while (true) {
if (index < 0) {
if (options_.deps == DEPENDENCY_TIMESTAMPS) {
if (options_.mapping == MAP_TO_ANY_OUTPUT) {
if (ready_.empty()) {
if (inputs_[prev_index].at_eof)
return sched_type_t::STATUS_EOF;
else
index = prev_index; // Go back to prior.
} else {
// TODO i#5843: Add core binding and priority support.
index = ready_.front();
ready_.pop();
}
} else if (options_.deps == DEPENDENCY_TIMESTAMPS) {
// TODO i#5843: This should require a lock for >1 outputs too.
uint64_t min_time = 0xffffffffffffffff;
for (size_t i = 0; i < inputs_.size(); i++) {
if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 &&
Expand Down Expand Up @@ -838,7 +878,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(int output_ordinal)
}
break;
}
outputs_[output_ordinal].cur_input = index;
set_cur_input(output_ordinal, index);
return sched_type_t::STATUS_OK;
}

Expand Down Expand Up @@ -889,11 +929,20 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
record = **input->reader;
}
}
bool need_new_input = false;
if (options_.mapping == MAP_TO_ANY_OUTPUT &&
options_.quantum_unit == QUANTUM_INSTRUCTIONS &&
record_type_is_instr(record)) {
// TODO i#5843: We also want to swap on blocking syscalls.
++input->instrs_in_quantum;
if (input->instrs_in_quantum > options_.quantum_duration)
need_new_input = true;
}
if (options_.deps == DEPENDENCY_TIMESTAMPS &&
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
record_type_is_timestamp(record, input->next_timestamp)) {
record_type_is_timestamp(record, input->next_timestamp))
need_new_input = true;
if (need_new_input) {
int cur_input = outputs_[output_ordinal].cur_input;
// Mark cur_input invalid to ensure pick_next_input() takes action.
outputs_[output_ordinal].cur_input = -1;
sched_type_t::stream_status_t res = pick_next_input(output_ordinal);
if (res != sched_type_t::STATUS_OK)
return res;
Expand Down
12 changes: 12 additions & 0 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

#include <assert.h>
#include <deque>
#include <mutex>
#include <queue>
#include <set>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -721,6 +723,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool needs_roi = true;
bool at_eof = false;
uintptr_t next_timestamp = 0;
uint64_t instrs_in_quantum = 0;
};

struct output_info_t {
Expand Down Expand Up @@ -771,6 +774,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
advance_region_of_interest(int output_ordinal, RecordType &record,
input_info_t &input);

void
set_cur_input(int output_ordinal, int input_ordinal);

// Finds the next input stream for the 'output_ordinal'-th output stream.
stream_status_t
pick_next_input(int output_ordinal);
Expand Down Expand Up @@ -830,6 +836,12 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
scheduler_options_t options_;
std::vector<input_info_t> inputs_;
std::vector<output_info_t> outputs_;
// We use a central lock for global scheduling. We assume the synchronization
// cost is outweighed by the simulator's overhead. This protects concurrent
// access to the inputs_, outputs_, and ready_ fields.
std::mutex sched_lock_;
// Input indices ready to be scheduled.
std::queue<int> ready_;
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
};

/** See #dynamorio::drmemtrace::scheduler_tmpl_t. */
Expand Down
67 changes: 66 additions & 1 deletion clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class mock_reader_t : public reader_t {
};

static trace_entry_t
make_instr(addr_t pc)
make_instr(addr_t pc, memref_tid_t tid = 1)
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
{
trace_entry_t entry;
entry.type = TRACE_TYPE_INSTR;
Expand Down Expand Up @@ -682,6 +682,70 @@ test_real_file_queries_and_filters(const char *testdir)
#endif
}

static void
test_synthetic()
{
std::cerr << "\n----------------\nTesting synthetic\n";
static constexpr int NUM_INPUTS = 7;
static constexpr int NUM_OUTPUTS = 2;
static constexpr int NUM_INSTRS = 9;
static constexpr memref_tid_t TID_BASE = 100;
std::vector<trace_entry_t> inputs[NUM_INPUTS];
std::vector<scheduler_t::input_workload_t> sched_inputs;
for (int i = 0; i < NUM_INPUTS; i++) {
memref_tid_t tid = TID_BASE + i;
inputs[i].push_back(make_thread(tid));
inputs[i].push_back(make_pid(1));
for (int j = 0; j < NUM_INSTRS; j++)
inputs[i].push_back(make_instr(42 + j * 4, tid));
inputs[i].push_back(make_exit(tid));
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), tid);
sched_inputs.emplace_back(std::move(readers));
}
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_IGNORE,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/4);
sched_ops.quantum_duration = 3;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
// Walk the outputs in lockstep for crude but deterministic concurrency.
std::vector<scheduler_t::stream_t *> outputs(NUM_OUTPUTS, nullptr);
std::vector<bool> eof(NUM_OUTPUTS, false);
for (int i = 0; i < NUM_OUTPUTS; i++)
outputs[i] = scheduler.get_stream(i);
int num_eof = 0;
// Record the threads, one char each.
std::vector<std::string> sched_as_string(NUM_OUTPUTS);
while (num_eof < NUM_OUTPUTS) {
for (int i = 0; i < NUM_OUTPUTS; i++) {
if (eof[i])
continue;
memref_t memref;
scheduler_t::stream_status_t status = outputs[i]->next_record(memref);
if (status == scheduler_t::STATUS_EOF) {
++num_eof;
eof[i] = true;
continue;
}
assert(status == scheduler_t::STATUS_OK);
sched_as_string[i] += 'A' + (memref.instr.tid - TID_BASE);
}
}
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
// Hardcoding here for the 2 outputs and 7 inputs.
// We expect 3 letter sequences (our quantum) alternating every-other as each
// core alternates; with an odd number the 2nd core finishes early.
assert(sched_as_string[0] == "AAACCCEEEGGGBBBDDDFFFAAAACCCCEEEEGGGG");
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
assert(sched_as_string[1] == "BBBDDDFFFAAACCCEEEGGGBBBBDDDDFFFF");
}

} // namespace

int
Expand All @@ -695,5 +759,6 @@ main(int argc, const char *argv[])
test_regions();
test_only_threads();
test_real_file_queries_and_filters(argv[1]);
test_synthetic();
return 0;
}