i#5843 scheduler: Move timestamp ordering into scheduler (#5895)
Implements timestamp ordering in scheduler_t rather than relying on the
old implementation inside file_reader_t.

Adds a sanity test.

Fixes a bug with only_threads and adds a simple test.

Removing the file_reader_t code, along with eliminating the
thread-as-sub-reader API routines, will be done as a separate
refactoring.

Issue: #5843
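
For orientation, here is a minimal usage sketch of the serial, timestamp-interleaved mode this change implements. It is an editorial illustration, not part of the commit: the trace path is a placeholder, and the input_workload_t constructor, get_stream(), and stream_t::next_record() names are assumptions based on the scheduler API this diff builds on.

// Sketch only: drive scheduler_t in serial mode with timestamp-ordered interleaving.
#include <string>
#include <vector>
#include "scheduler.h"

bool
replay_serially(const std::string &trace_dir)
{
    scheduler_t scheduler;
    std::vector<scheduler_t::input_workload_t> workloads;
    workloads.emplace_back(trace_dir); // One multi-threaded trace directory (placeholder).
    scheduler_t::scheduler_options_t ops;
    ops.mapping = scheduler_t::MAP_TO_RECORDED_OUTPUT; // Serial analysis mode.
    ops.deps = scheduler_t::DEPENDENCY_TIMESTAMPS;     // Interleave inputs by timestamp markers.
    if (scheduler.init(workloads, /*output_count=*/1, ops) != scheduler_t::STATUS_SUCCESS)
        return false;
    scheduler_t::stream_t *stream = scheduler.get_stream(0);
    memref_t memref;
    for (scheduler_t::stream_status_t status = stream->next_record(memref);
         status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) {
        if (status != scheduler_t::STATUS_OK)
            return false;
        // Records from all input threads arrive here ordered by their timestamps.
    }
    return true;
}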
derekbruening authored Mar 8, 2023
1 parent 04dfa77 commit 0a26f93
Showing 3 changed files with 305 additions and 61 deletions.
215 changes: 158 additions & 57 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -117,6 +117,11 @@ scheduler_tmpl_t<memref_t, reader_t>::get_reader(const std::string &path, int ve
starts_with(fname, DRMEMTRACE_SERIAL_SCHEDULE_FILENAME) ||
fname == DRMEMTRACE_CPU_SCHEDULE_FILENAME)
continue;
// Skip the auxiliary files.
if (fname == DRMEMTRACE_MODULE_LIST_FILENAME ||
fname == DRMEMTRACE_FUNCTION_LIST_FILENAME ||
fname == DRMEMTRACE_ENCODING_FILENAME)
continue;
# ifdef HAS_SNAPPY
if (ends_with(*iter, ".sz")) {
return std::unique_ptr<reader_t>(
@@ -167,6 +172,18 @@ scheduler_tmpl_t<memref_t, reader_t>::record_type_is_marker(memref_t record,
return true;
}

template <>
bool
scheduler_tmpl_t<memref_t, reader_t>::record_type_is_timestamp(memref_t record,
uintptr_t &value)
{
if (record.marker.type != TRACE_TYPE_MARKER ||
record.marker.marker_type != TRACE_MARKER_TYPE_TIMESTAMP)
return false;
value = record.marker.marker_value;
return true;
}

template <>
memref_t
scheduler_tmpl_t<memref_t, reader_t>::create_region_separator_marker(memref_tid_t tid,
@@ -199,6 +216,10 @@ scheduler_tmpl_t<memref_t, reader_t>::print_record(const memref_t &record)
fprintf(stderr, "tid=%" PRId64 " type=%d", record.instr.tid, record.instr.type);
if (type_is_instr(record.instr.type))
fprintf(stderr, " pc=0x%zx size=%zu", record.instr.addr, record.instr.size);
else if (record.marker.type == TRACE_TYPE_MARKER) {
fprintf(stderr, " marker=0x%d val=%zu", record.marker.marker_type,
record.marker.marker_value);
}
fprintf(stderr, "\n");
}

@@ -256,6 +277,18 @@ scheduler_tmpl_t<trace_entry_t, record_reader_t>::record_type_is_marker(
return true;
}

template <>
bool
scheduler_tmpl_t<trace_entry_t, record_reader_t>::record_type_is_timestamp(
trace_entry_t record, uintptr_t &value)
{
if (record.type != TRACE_TYPE_MARKER ||
static_cast<trace_marker_type_t>(record.size) != TRACE_MARKER_TYPE_TIMESTAMP)
return false;
value = record.addr;
return true;
}

template <>
trace_entry_t
scheduler_tmpl_t<trace_entry_t, record_reader_t>::create_region_separator_marker(
@@ -382,7 +415,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
for (auto &reader : workload.readers) {
if (!reader.reader || !reader.end)
return STATUS_ERROR_INVALID_PARAMETER;
if (workload.only_threads.find(reader.tid) != workload.only_threads.end())
if (!workload.only_threads.empty() &&
workload.only_threads.find(reader.tid) == workload.only_threads.end())
continue;
int index = static_cast<int>(inputs_.size());
inputs_.emplace_back();
@@ -463,18 +497,72 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
// Initially we only support analyzer_t's serial mode.
if (options_.deps != DEPENDENCY_TIMESTAMPS)
return STATUS_ERROR_NOT_IMPLEMENTED;
// Currently file_reader_t is interleaving for us so we have just one input.
// TODO i#5843: Move interleaving logic to scheduler_t.
if (output_count != 1 || inputs_.size() != 1)
// TODO i#5843: Support more than one output.
if (output_count != 1)
return STATUS_ERROR_NOT_IMPLEMENTED;
outputs_[0].cur_input = 0;
if (inputs_.size() == 1) {
outputs_[0].cur_input = 0;
} else {
// The old file_reader_t interleaving would output the top headers for every
// thread first and then pick the oldest timestamp once it reached a
// timestamp. We instead queue those headers so we can start directly with the
// oldest timestamp's thread.
sched_type_t::scheduler_status_t res = get_initial_timestamps();
if (res != STATUS_SUCCESS)
return res;
}
} else {
// TODO i#5843: Implement scheduling onto synthetic cores.
return STATUS_ERROR_NOT_IMPLEMENTED;
// For now we only support a single output stream with no deps.
if (options_.deps != DEPENDENCY_IGNORE || output_count != 1)
return STATUS_ERROR_NOT_IMPLEMENTED;
outputs_[0].cur_input = 0;
}
return STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::get_initial_timestamps()
{
// Read ahead in each input until we find a timestamp record.
// Queue up any skipped records to ensure we present them to the
// output stream(s).
uint64_t min_time = 0xffffffffffffffff;
for (size_t i = 0; i < inputs_.size(); i++) {
input_info_t &input = inputs_[i];
if (input.next_timestamp <= 0) {
for (const auto &record : input.queue) {
if (record_type_is_timestamp(record, input.next_timestamp))
break;
}
}
if (input.next_timestamp <= 0) {
if (input.needs_init) {
input.reader->init();
input.needs_init = false;
}
while (input.reader != input.reader_end) {
RecordType record = **input.reader;
if (record_type_is_timestamp(record, input.next_timestamp))
break;
input.queue.push_back(record);
++(*input.reader);
}
}
if (input.next_timestamp <= 0)
return STATUS_ERROR_INVALID_PARAMETER;
if (input.next_timestamp < min_time) {
min_time = input.next_timestamp;
// TODO i#5843: Support more than one input (already checked earlier).
outputs_[0].cur_input = static_cast<int>(i);
}
}
if (outputs_[0].cur_input >= 0)
return STATUS_SUCCESS;
return STATUS_ERROR_INVALID_PARAMETER;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
@@ -502,14 +590,14 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
RecordType record = **reader;
if (record_type_has_tid(record, tid))
break;
input.queue.push(record);
input.queue.push_back(record);
++(*reader);
}
if (tid == INVALID_THREAD_ID) {
error_string_ = "Failed to read " + path;
return STATUS_ERROR_FILE_READ_FAILED;
}
if (only_threads.find(tid) != only_threads.end())
if (!only_threads.empty() && only_threads.find(tid) == only_threads.end())
return sched_type_t::STATUS_SUCCESS;
VPRINT(this, 2, "Opened reader for tid %" PRId64 " %s\n", tid, path.c_str());
input.tid = tid;
@@ -525,29 +613,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_readers(
const std::string &path, const std::set<memref_tid_t> &only_threads,
std::unordered_map<memref_tid_t, int> &workload_tids)
{
if (options_.mapping == MAP_TO_RECORDED_OUTPUT &&
options_.deps == DEPENDENCY_TIMESTAMPS) {
// TODO i#5843: Move the interleaving-by-timestamp code from
// file_reader_t into this scheduler. For now we leverage the
// file_reader_t code and use a sentinel tid (per-thread scheduling
// modifiers are ignored so we would only need tids for error reporting;
// we'll get those once we move the code).
std::unique_ptr<ReaderType> reader = get_reader(path, verbosity_);
std::unique_ptr<ReaderType> reader_end = get_default_reader();
if (!reader || !reader_end || !reader->init()) {
error_string_ += "Failed to open " + path;
return STATUS_ERROR_FILE_OPEN_FAILED;
}
inputs_.emplace_back();
input_info_t &input = inputs_.back();
input.index = 0;
input.tid = INVALID_THREAD_ID; // See above.
input.reader = std::move(reader);
input.reader_end = std::move(reader_end);
workload_tids[input.tid] = input.index;
return sched_type_t::STATUS_SUCCESS;
}

if (!directory_iterator_t::is_directory(path))
return open_reader(path, only_threads, workload_tids);
directory_iterator_t end;
@@ -562,6 +627,11 @@
starts_with(fname, DRMEMTRACE_SERIAL_SCHEDULE_FILENAME) ||
fname == DRMEMTRACE_CPU_SCHEDULE_FILENAME)
continue;
// Skip the auxiliary files.
if (fname == DRMEMTRACE_MODULE_LIST_FILENAME ||
fname == DRMEMTRACE_FUNCTION_LIST_FILENAME ||
fname == DRMEMTRACE_ENCODING_FILENAME)
continue;
const std::string file = path + DIRSEP + fname;
sched_type_t::scheduler_status_t res =
open_reader(file, only_threads, workload_tids);
@@ -615,7 +685,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(int output_
return sched_type_t::STATUS_EOF;
else {
// We let the user know we're done.
input.queue.push(create_thread_exit(input.tid));
input.queue.push_back(create_thread_exit(input.tid));
input.at_eof = true;
return sched_type_t::STATUS_SKIPPED;
}
@@ -632,7 +702,7 @@
// instr ordinal would include them and so be incorrect) and that we should
// thus skip it all.
while (!input.queue.empty())
input.queue.pop();
input.queue.pop_front();
uint64_t input_start_ref = input.reader->get_record_ordinal();
uint64_t input_start_instr = input.reader->get_instruction_ordinal();
input.reader->skip_instructions(cur_range.start_instruction - cur_instr);
@@ -677,7 +747,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(int output_
// we need to update the view tool to handle a window marker as the very
// first entry).
if (input.cur_region > 0) {
input.queue.push(create_region_separator_marker(input.tid, input.cur_region));
input.queue.push_back(
create_region_separator_marker(input.tid, input.cur_region));
}
return sched_type_t::STATUS_SKIPPED;
}
@@ -686,33 +757,48 @@ template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(int output_ordinal)
{
if (options_.mapping == MAP_TO_RECORDED_OUTPUT) {
// TODO i#5843: For now we have a single input; we'll need to change this
// once we move the file_reader_t interleaving here.
return sched_type_t::STATUS_EOF;
}
if (options_.mapping != MAP_TO_CONSISTENT_OUTPUT) {
if (options_.mapping == MAP_TO_ANY_OUTPUT) {
// TODO i#5843: Implement synthetic scheduling with instr/time quanta.
// These will require locks and central coordination.
return sched_type_t::STATUS_NOT_IMPLEMENTED;
// For now we only support a single output.
return sched_type_t::STATUS_EOF;
}
int index = outputs_[output_ordinal].cur_input;
while (true) {
if (index < 0) {
// We're done with the prior thread; take the next one that was
// pre-allocated to this output (pre-allocated to avoid locks). Invariant:
// the same output_ordinal will not be accessed by two different threads
// simultaneously in this mode, allowing us to support a lock-free
// parallel-friendly increment here.
int indices_index = ++outputs_[output_ordinal].input_indices_index;
if (indices_index >=
static_cast<int>(outputs_[output_ordinal].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output_ordinal);
return sched_type_t::STATUS_EOF;
}
index = outputs_[output_ordinal].input_indices[indices_index];
VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n",
output_ordinal, indices_index, index);
if (options_.deps == DEPENDENCY_TIMESTAMPS) {
uint64_t min_time = 0xffffffffffffffff;
for (size_t i = 0; i < inputs_.size(); i++) {
if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 &&
inputs_[i].next_timestamp < min_time) {
min_time = inputs_[i].next_timestamp;
index = static_cast<int>(i);
}
}
if (index < 0)
return sched_type_t::STATUS_EOF;
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64
" == input #%d\n",
output_ordinal, min_time, index);
} else if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT) {
// We're done with the prior thread; take the next one that was
// pre-allocated to this output (pre-allocated to avoid locks). Invariant:
// the same output_ordinal will not be accessed by two different threads
// simultaneously in this mode, allowing us to support a lock-free
// parallel-friendly increment here.
int indices_index = ++outputs_[output_ordinal].input_indices_index;
if (indices_index >=
static_cast<int>(outputs_[output_ordinal].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output_ordinal);
return sched_type_t::STATUS_EOF;
}
index = outputs_[output_ordinal].input_indices[indices_index];
VPRINT(this, 2,
"next_record[%d]: advancing to local index %d == input #%d\n",
output_ordinal, indices_index, index);
} else
return sched_type_t::STATUS_INVALID;
// reader_t::at_eof_ is true until init() is called.
if (inputs_[index].needs_init) {
inputs_[index].reader->init();
@@ -741,7 +827,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
input_info_t *&input)
{
if (outputs_[output_ordinal].cur_input < 0) {
// This happens with more outputs than inputs for SCHEDULE_RUN_TO_COMPLETION.
// This happens with more outputs than inputs. For non-empty outputs we
// require cur_input to be set to >=0 during init().
return sched_type_t::STATUS_EOF;
}
input = &inputs_[outputs_[output_ordinal].cur_input];
Expand All @@ -758,7 +845,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
}
if (!input->queue.empty()) {
record = input->queue.front();
input->queue.pop();
input->queue.pop_front();
} else {
// We again have a flag check because reader_t::init() does an initial ++
// and so we want to skip that on the first record but perform a ++ prior
@@ -780,6 +867,20 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(int output_ordinal,
record = **input->reader;
}
}
if (options_.deps == DEPENDENCY_TIMESTAMPS &&
record_type_is_timestamp(record, input->next_timestamp)) {
int cur_input = outputs_[output_ordinal].cur_input;
// Mark cur_input invalid to ensure pick_next_input() takes action.
outputs_[output_ordinal].cur_input = -1;
sched_type_t::stream_status_t res = pick_next_input(output_ordinal);
if (res != sched_type_t::STATUS_OK)
return res;
if (outputs_[output_ordinal].cur_input != cur_input) {
input->queue.push_back(record);
input = &inputs_[outputs_[output_ordinal].cur_input];
continue;
}
}
if (input->needs_roi && !input->regions_of_interest.empty()) {
sched_type_t::stream_status_t res =
advance_region_of_interest(output_ordinal, record, *input);
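Editorial note, not part of the diff: the heart of the new ordering is the read-ahead in get_initial_timestamps() plus the oldest-timestamp selection in the DEPENDENCY_TIMESTAMPS branch of pick_next_input(). Below is a simplified, self-contained sketch of that selection step; input_t and pick_oldest_timestamp are stand-in names, not the repository's types.

// Simplified illustration of the oldest-timestamp-first policy used by the scheduler above.
#include <cstddef>
#include <cstdint>
#include <vector>

struct input_t {                 // Stand-in for the scheduler's per-input state (input_info_t).
    bool at_eof = false;
    uint64_t next_timestamp = 0; // 0 means no timestamp marker has been seen yet.
};

// Returns the index of the input whose pending timestamp is oldest, or -1 if none remain.
int
pick_oldest_timestamp(const std::vector<input_t> &inputs)
{
    uint64_t min_time = UINT64_MAX;
    int index = -1;
    for (std::size_t i = 0; i < inputs.size(); i++) {
        if (!inputs[i].at_eof && inputs[i].next_timestamp > 0 &&
            inputs[i].next_timestamp < min_time) {
            min_time = inputs[i].next_timestamp;
            index = static_cast<int>(i);
        }
    }
    return index;
}

The real scheduler applies this once at init time to choose the starting input and again whenever next_record() consumes a timestamp marker; any records read past while searching for a timestamp are buffered in the per-input queue so they are still delivered in order.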

0 comments on commit 0a26f93
