Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

i#5843 scheduler: Add simple interleaving #5928

Merged
merged 5 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 109 additions & 59 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
if (!workload.only_threads.empty() &&
workload.only_threads.find(reader.tid) == workload.only_threads.end())
continue;
int index = static_cast<int>(inputs_.size());
int index = static_cast<input_ordinal_t>(inputs_.size());
inputs_.emplace_back();
input_info_t &input = inputs_.back();
input.index = index;
Expand Down Expand Up @@ -499,10 +499,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT) {
// Assign the inputs up front to avoid locks once we're in parallel mode.
// We use a simple round-robin static assignment for now.
for (int i = 0; i < static_cast<int>(inputs_.size()); i++) {
for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); i++) {
size_t index = i % output_count;
if (outputs_[index].input_indices.empty())
outputs_[index].cur_input = i;
set_cur_input(static_cast<input_ordinal_t>(index), i);
outputs_[index].input_indices.push_back(i);
VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index);
}
Expand All @@ -515,7 +515,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
if (output_count != 1)
return STATUS_ERROR_NOT_IMPLEMENTED;
if (inputs_.size() == 1) {
outputs_[0].cur_input = 0;
set_cur_input(0, 0);
} else {
// The old file_reader_t interleaving would output the top headers for every
// thread first and then pick the oldest timestamp once it reached a
Expand All @@ -526,11 +526,25 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
return res;
}
} else {
// TODO i#5843: Implement scheduling onto synthetic cores.
// For now we only support a single output stream with no deps.
if (options_.deps != DEPENDENCY_IGNORE || output_count != 1)
// TODO i#5843: Honor deps when scheduling on synthetic cores.
if (options_.deps != DEPENDENCY_IGNORE)
return STATUS_ERROR_NOT_IMPLEMENTED;
outputs_[0].cur_input = 0;
// TODO i#5843: Implement time-based quanta.
if (options_.quantum_unit != QUANTUM_INSTRUCTIONS)
return STATUS_ERROR_NOT_IMPLEMENTED;
// Assign initial inputs.
// TODO i#5843: Once we support core bindings and priorities we'll want
// to consider that here.
for (int i = 0; i < static_cast<output_ordinal_t>(outputs_.size()); i++) {
if (i < static_cast<input_ordinal_t>(inputs_.size()))
set_cur_input(i, i);
else
set_cur_input(i, INVALID_INPUT_ORDINAL);
}
for (int i = static_cast<output_ordinal_t>(outputs_.size());
i < static_cast<input_ordinal_t>(inputs_.size()); i++) {
ready_.push(i);
}
}
return STATUS_SUCCESS;
}
Expand Down Expand Up @@ -568,8 +582,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_timestamps()
return STATUS_ERROR_INVALID_PARAMETER;
if (input.next_timestamp < min_time) {
min_time = input.next_timestamp;
// TODO i#5843: Support more than one input (already checked earlier).
outputs_[0].cur_input = static_cast<int>(i);
// TODO i#5843: Support more than one output (already checked earlier).
set_cur_input(0, static_cast<input_ordinal_t>(i));
}
}
if (outputs_[0].cur_input >= 0)
Expand All @@ -590,7 +604,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
error_string_ += "Failed to open " + path;
return STATUS_ERROR_FILE_OPEN_FAILED;
}
int index = static_cast<int>(inputs_.size());
int index = static_cast<input_ordinal_t>(inputs_.size());
inputs_.emplace_back();
input_info_t &input = inputs_.back();
input.index = index;
Expand Down Expand Up @@ -659,48 +673,47 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_readers(

template <typename RecordType, typename ReaderType>
std::string
scheduler_tmpl_t<RecordType, ReaderType>::get_input_name(int output_ordinal)
scheduler_tmpl_t<RecordType, ReaderType>::get_input_name(output_ordinal_t output)
{
int index = outputs_[output_ordinal].cur_input;
int index = outputs_[output].cur_input;
if (index < 0)
return "";
return inputs_[index].reader->get_stream_name();
}

template <typename RecordType, typename ReaderType>
int
scheduler_tmpl_t<RecordType, ReaderType>::get_input_ordinal(int output_ordinal)
typename scheduler_tmpl_t<RecordType, ReaderType>::input_ordinal_t
scheduler_tmpl_t<RecordType, ReaderType>::get_input_ordinal(output_ordinal_t output)
{
return outputs_[output_ordinal].cur_input;
return outputs_[output].cur_input;
}

template <typename RecordType, typename ReaderType>
bool
scheduler_tmpl_t<RecordType, ReaderType>::is_record_synthetic(int output_ordinal)
scheduler_tmpl_t<RecordType, ReaderType>::is_record_synthetic(output_ordinal_t output)
{
int index = outputs_[output_ordinal].cur_input;
int index = outputs_[output].cur_input;
if (index < 0)
return false;
return inputs_[index].reader->is_record_synthetic();
}

template <typename RecordType, typename ReaderType>
memtrace_stream_t *
scheduler_tmpl_t<RecordType, ReaderType>::get_input_stream(int output_ordinal)
scheduler_tmpl_t<RecordType, ReaderType>::get_input_stream(output_ordinal_t output)
{
if (output_ordinal < 0 || output_ordinal >= static_cast<int>(outputs_.size()))
if (output < 0 || output >= static_cast<output_ordinal_t>(outputs_.size()))
return nullptr;
int index = outputs_[output_ordinal].cur_input;
int index = outputs_[output].cur_input;
if (index < 0)
return nullptr;
return inputs_[index].reader.get();
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(int output_ordinal,
RecordType &record,
input_info_t &input)
scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
output_ordinal_t output, RecordType &record, input_info_t &input)
{
uint64_t cur_instr = input.reader->get_instruction_ordinal();
assert(input.cur_region >= 0 &&
Expand Down Expand Up @@ -748,7 +761,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(int output_
return sched_type_t::STATUS_REGION_INVALID;
}
input.in_cur_region = true;
auto &stream = outputs_[output_ordinal].stream;
auto &stream = outputs_[output].stream;

// We've documented that an output stream's ordinals ignore skips in its input
// streams, so we do not need to remember the input's ordinals pre-skip and increase
Expand Down Expand Up @@ -776,19 +789,47 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(int output_
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(int output_ordinal)
void
scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,
input_ordinal_t input)
{
if (options_.mapping == MAP_TO_ANY_OUTPUT) {
// TODO i#5843: Implement synthetic scheduling with instr/time quanta.
// These will require locks and central coordination.
// For now we only support a single output.
return sched_type_t::STATUS_EOF;
}
int index = outputs_[output_ordinal].cur_input;
assert(output >= 0 && output < static_cast<output_ordinal_t>(outputs_.size()));
// 'input' might be INVALID_INPUT_ORDINAL.
assert(input < static_cast<input_ordinal_t>(inputs_.size()));
if (outputs_[output].cur_input >= 0)
ready_.push(outputs_[output].cur_input);
outputs_[output].cur_input = input;
if (input < 0)
return;
inputs_[input].instrs_in_quantum = 0;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t output)
{
bool need_lock = (options_.mapping == MAP_TO_ANY_OUTPUT);
auto scoped_lock = need_lock ? std::unique_lock<std::mutex>()
: std::unique_lock<std::mutex>(sched_lock_);
// We're only called when we should definitely swap so clear the current.
int prev_index = outputs_[output].cur_input;
input_ordinal_t index = INVALID_INPUT_ORDINAL;
set_cur_input(output, INVALID_INPUT_ORDINAL);
while (true) {
if (index < 0) {
if (options_.deps == DEPENDENCY_TIMESTAMPS) {
if (options_.mapping == MAP_TO_ANY_OUTPUT) {
if (ready_.empty()) {
if (inputs_[prev_index].at_eof)
return sched_type_t::STATUS_EOF;
else
index = prev_index; // Go back to prior.
} else {
// TODO i#5843: Add core binding and priority support.
index = ready_.front();
ready_.pop();
}
} else if (options_.deps == DEPENDENCY_TIMESTAMPS) {
// TODO i#5843: This should require a lock for >1 outputs too.
uint64_t min_time = 0xffffffffffffffff;
for (size_t i = 0; i < inputs_.size(); i++) {
if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 &&
Expand All @@ -802,23 +843,23 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(int output_ordinal)
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64
" == input #%d\n",
output_ordinal, min_time, index);
output, min_time, index);
} else if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT) {
// We're done with the prior thread; take the next one that was
// pre-allocated to this output (pre-allocated to avoid locks). Invariant:
// the same output_ordinal will not be accessed by two different threads
// the same output will not be accessed by two different threads
// simultaneously in this mode, allowing us to support a lock-free
// parallel-friendly increment here.
int indices_index = ++outputs_[output_ordinal].input_indices_index;
int indices_index = ++outputs_[output].input_indices_index;
if (indices_index >=
static_cast<int>(outputs_[output_ordinal].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output_ordinal);
static_cast<int>(outputs_[output].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output);
return sched_type_t::STATUS_EOF;
}
index = outputs_[output_ordinal].input_indices[indices_index];
index = outputs_[output].input_indices[indices_index];
VPRINT(this, 2,
"next_record[%d]: advancing to local index %d == input #%d\n",
output_ordinal, indices_index, index);
output, indices_index, index);
} else
return sched_type_t::STATUS_INVALID;
// reader_t::at_eof_ is true until init() is called.
Expand All @@ -830,30 +871,30 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(int output_ordinal)
if (inputs_[index].at_eof ||
*inputs_[index].reader == *inputs_[index].reader_end) {
VPRINT(this, 2, "next_record[%d]: local index %d == input #%d at eof\n",
output_ordinal, outputs_[output_ordinal].input_indices_index, index);
output, outputs_[output].input_indices_index, index);
inputs_[index].at_eof = true;
index = -1;
index = INVALID_INPUT_ORDINAL;
// Loop and pick next thread.
continue;
}
break;
}
outputs_[output_ordinal].cur_input = index;
set_cur_input(output, index);
return sched_type_t::STATUS_OK;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
RecordType &record,
input_info_t *&input)
{
if (outputs_[output_ordinal].cur_input < 0) {
if (outputs_[output].cur_input < 0) {
// This happens with more outputs than inputs. For non-empty outputs we
// require cur_input to be set to >=0 during init().
return sched_type_t::STATUS_EOF;
}
input = &inputs_[outputs_[output_ordinal].cur_input];
input = &inputs_[outputs_[output].cur_input];
while (true) {
if (input->needs_init) {
// We pay the cost of this conditional to support ipc_reader_t::init() which
Expand All @@ -880,32 +921,41 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
input->needs_advance = true;
}
if (input->at_eof || *input->reader == *input->reader_end) {
sched_type_t::stream_status_t res = pick_next_input(output_ordinal);
sched_type_t::stream_status_t res = pick_next_input(output);
if (res != sched_type_t::STATUS_OK)
return res;
input = &inputs_[outputs_[output_ordinal].cur_input];
input = &inputs_[outputs_[output].cur_input];
continue;
} else {
record = **input->reader;
}
}
bool need_new_input = false;
if (options_.mapping == MAP_TO_ANY_OUTPUT &&
options_.quantum_unit == QUANTUM_INSTRUCTIONS &&
record_type_is_instr(record)) {
// TODO i#5843: We also want to swap on blocking syscalls.
++input->instrs_in_quantum;
if (input->instrs_in_quantum > options_.quantum_duration)
need_new_input = true;
}
if (options_.deps == DEPENDENCY_TIMESTAMPS &&
derekbruening marked this conversation as resolved.
Show resolved Hide resolved
record_type_is_timestamp(record, input->next_timestamp)) {
int cur_input = outputs_[output_ordinal].cur_input;
// Mark cur_input invalid to ensure pick_next_input() takes action.
outputs_[output_ordinal].cur_input = -1;
sched_type_t::stream_status_t res = pick_next_input(output_ordinal);
record_type_is_timestamp(record, input->next_timestamp))
need_new_input = true;
if (need_new_input) {
int cur_input = outputs_[output].cur_input;
sched_type_t::stream_status_t res = pick_next_input(output);
if (res != sched_type_t::STATUS_OK)
return res;
if (outputs_[output_ordinal].cur_input != cur_input) {
if (outputs_[output].cur_input != cur_input) {
input->queue.push_back(record);
input = &inputs_[outputs_[output_ordinal].cur_input];
input = &inputs_[outputs_[output].cur_input];
continue;
}
}
if (input->needs_roi && !input->regions_of_interest.empty()) {
sched_type_t::stream_status_t res =
advance_region_of_interest(output_ordinal, record, *input);
advance_region_of_interest(output, record, *input);
if (res == sched_type_t::STATUS_SKIPPED) {
// We need either the queue or to re-de-ref the reader so we loop,
// but we do not want to come back here.
Expand All @@ -919,7 +969,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
}
break;
}
VPRINT(this, 4, "next_record[%d]: from %d: ", output_ordinal, input->index);
VPRINT(this, 4, "next_record[%d]: from %d: ", output, input->index);
VDO(this, 4, print_record(record););

return sched_type_t::STATUS_OK;
Expand Down
Loading