Skip to content

Commit

Permalink
i#6822 unscheduled: Add start-unscheduled support
Browse files Browse the repository at this point in the history
Adds support for threads starting out in an "unscheduled" state.  This
is accomplished by always reading ahead in each input and looking for
a TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE marker *before* the first
instruction.  Normally such a marker indicates the invocation of a
system call and is after the system call instruction; for
start-unscheduled threads it is present at the system call exit at the
start of the trace.

Changes the scheduler's virtual method process_next_initial_record()
to make the booleans on finding certain markers input-and-output
parameters and moves filetype marker handling and timestamp recording
into the function.  This also fixes a problem where an input's initial
next_timestamp was replaced with the 2nd timestamp if a subclass read
ahead.

The extra readahead causes complexities elsewhere which are addressed:
+ The reader caches the last cpuid to use for synthetic records
  on skipping.
+ Generalizes the existing scheduler handling of readahead (the
  "recorded_in_schedule" field in input_info_t) to store a count of
  pre-read instructions, which will generally be either 0 or 1.  Adds
  a new internal interface get_instr_ordinal() to get the input
  reader's instruction ordinal minus the pre-read count.

Changes raw2trace's virtual function process_marker_additionally() to
process_marker() and moves all marker processing (including
timestamps, which are not markers in the raw format) there, to better
support subclasses inserting start-unscheduled markers and deciding
whether to insert new markers either before or after pre-existing
markers.

Adds a scheduler test for the new feature.

Issue: #6822
  • Loading branch information
derekbruening committed Jun 21, 2024
1 parent 0fb958e commit fd16a95
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 93 deletions.
9 changes: 8 additions & 1 deletion clients/drcachesim/reader/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ reader_t::process_input_entry()
if (first_timestamp_ == 0)
first_timestamp_ = last_timestamp_;
}
} else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_VERSION)
} else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_CPU_ID)
last_cpuid_ = cur_ref_.marker.marker_value;
else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_VERSION)
version_ = cur_ref_.marker.marker_value;
else if (cur_ref_.marker.marker_type == TRACE_MARKER_TYPE_FILETYPE) {
filetype_ = cur_ref_.marker.marker_value;
Expand Down Expand Up @@ -464,6 +466,11 @@ reader_t::skip_instructions_with_timestamp(uint64_t stop_instruction_count)
timestamp.addr = static_cast<addr_t>(last_timestamp_);
}
trace_entry_t cpu = {};
if (last_cpuid_ != 0) {
cpu.type = TRACE_TYPE_MARKER;
cpu.size = TRACE_MARKER_TYPE_CPU_ID;
cpu.addr = static_cast<addr_t>(last_cpuid_);
}
trace_entry_t next_instr = {};
bool prev_was_record_ord = false;
bool found_real_timestamp = false;
Expand Down
1 change: 1 addition & 0 deletions clients/drcachesim/reader/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class reader_t : public std::iterator<std::input_iterator_tag, memref_t>,
uint64_t last_timestamp_ = 0;
uint64_t first_timestamp_ = 0;
trace_entry_t *input_entry_ = nullptr;
uint64_t last_cpuid_ = 0;
// Remember top-level headers for the memtrace_stream_t interface.
uint64_t version_ = 0;
uint64_t filetype_ = 0;
Expand Down
152 changes: 91 additions & 61 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1643,15 +1643,44 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_switch_sequences()
// Examines one record seen during the initial readahead of 'input' and decides
// whether the readahead should continue.
// Returns true to keep reading and false to stop.
// 'found_filetype' is set to true when a TRACE_MARKER_TYPE_FILETYPE marker is seen.
// 'found_timestamp' is set to true when the first timestamp is seen, at which
// point that timestamp is also stored in input.next_timestamp.
// In the marker-scanning logic below, readahead stops at the first non-marker
// record, at a second timestamp, or at a TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE marker.
template <typename RecordType, typename ReaderType>
bool
scheduler_tmpl_t<RecordType, ReaderType>::process_next_initial_record(
input_info_t &input, RecordType record, bool found_filetype, bool found_timestamp)
input_info_t &input, RecordType record, bool &found_filetype, bool &found_timestamp)
{
// TODO i#6822: Always look ahead until the first instruction, looking
// for threads that start out with an exit from an UNSCHEDULE or DIRECT
// syscall so we can have them start out unscheduled: though we can't
// easily know whether there was a timeout unless we read way ahead past
// signal handlers until the syscall exits to look for -ETIMEDOUT.
// Should we have raw2trace look for that?
return !(found_filetype && found_timestamp);
// We want to identify threads that should start out unscheduled as
// we attached in the middle of an _UNSCHEDULE system call.
// That marker *before* any instruction indicates the initial
// exit from such a syscall (the markers anywhere else are added on
// entry to a syscall, after the syscall instruction fetch record).
trace_marker_type_t marker_type;
uintptr_t marker_value;
if (record_type_is_invalid(record)) // Sentinel on first call.
return true; // Keep reading.
// Anything that is not a marker ends the readahead.
if (!record_type_is_marker(record, marker_type, marker_value)) {
VPRINT(this, 3, "Stopping initial readahead at non-marker\n");
return false; // Stop reading.
}
uintptr_t timestamp;
if (marker_type == TRACE_MARKER_TYPE_FILETYPE) {
found_filetype = true;
VPRINT(this, 2, "Input %d filetype %zu\n", input.index, marker_value);
} else if (record_type_is_timestamp(record, timestamp)) {
if (!found_timestamp) {
// next_timestamp must be the first timestamp, even when we read ahead.
input.next_timestamp = timestamp;
found_timestamp = true;
} else {
// Stop at a 2nd timestamp to avoid interval count issues.
VPRINT(this, 3, "Stopping initial readahead at 2nd timestamp\n");
return false;
}
} else if (marker_type == TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE) {
// The input is only marked as starting unscheduled when the
// honor_direct_switches option is set; the readahead stops at this
// marker either way.
if (options_.honor_direct_switches) {
input.unscheduled = true;
// Ignore this marker during regular processing.
input.skip_next_unscheduled = true;
}
return false; // Stop reading.
}
return true; // Keep reading.
}

template <typename RecordType, typename ReaderType>
Expand Down Expand Up @@ -1679,16 +1708,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_input_content(
// the non-consuming queue loop vs the consuming and queue-pushback
// reader loop.
for (const auto &record : input.queue) {
trace_marker_type_t marker_type;
uintptr_t marker_value;
if (record_type_is_marker(record, marker_type, marker_value) &&
marker_type == TRACE_MARKER_TYPE_FILETYPE) {
found_filetype = true;
VPRINT(this, 2, "Input %zu filetype %zu from queue\n", i,
marker_value);
}
if (record_type_is_timestamp(record, input.next_timestamp))
found_timestamp = true;
if (!process_next_initial_record(input, record, found_filetype,
found_timestamp))
break;
Expand All @@ -1705,15 +1724,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_input_content(
}
while (*input.reader != *input.reader_end) {
RecordType record = **input.reader;
if (record_type_is_instr(record)) {
++input.instrs_pre_read;
}
trace_marker_type_t marker_type;
uintptr_t marker_value;
if (record_type_is_marker(record, marker_type, marker_value) &&
marker_type == TRACE_MARKER_TYPE_FILETYPE) {
found_filetype = true;
VPRINT(this, 2, "Input %zu filetype %zu\n", i, marker_value);
}
if (record_type_is_timestamp(record, input.next_timestamp))
found_timestamp = true;
if (!process_next_initial_record(input, record, found_filetype,
found_timestamp))
break;
Expand Down Expand Up @@ -1941,7 +1956,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_input_record_ordinal(
if (index < 0)
return 0;
uint64_t ord = inputs_[index].reader->get_record_ordinal();
if (inputs_[index].reader->get_instruction_ordinal() == 0) {
if (get_instr_ordinal(inputs_[index]) == 0) {
// Account for get_initial_input_content() readahead for filetype/timestamp.
// If this gets any more complex, the scheduler stream should track its
// own counts for every input and just ignore the input stream's tracking.
Expand All @@ -1950,6 +1965,17 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_input_record_ordinal(
return ord;
}

// Returns the instruction ordinal of 'input' from the scheduler's point of view:
// the input reader's instruction ordinal minus the count of instructions that
// were pre-read (input.instrs_pre_read).
// The caller must hold the input's lock.
template <typename RecordType, typename ReaderType>
uint64_t
scheduler_tmpl_t<RecordType, ReaderType>::get_instr_ordinal(input_info_t &input)
{
    uint64_t reader_cur = input.reader->get_instruction_ordinal();
    // Guard the unsigned subtraction below against wrapping around.
    assert(reader_cur >= static_cast<uint64_t>(input.instrs_pre_read));
    // reader_cur is unsigned, so it is printed with PRIu64 rather than PRId64.
    VPRINT(this, 5, "get_instr_ordinal: %" PRIu64 " - %d\n", reader_cur,
           input.instrs_pre_read);
    return reader_cur - input.instrs_pre_read;
}

template <typename RecordType, typename ReaderType>
uint64_t
scheduler_tmpl_t<RecordType, ReaderType>::get_input_first_timestamp(
Expand All @@ -1961,7 +1987,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_input_first_timestamp(
if (index < 0)
return 0;
uint64_t res = inputs_[index].reader->get_first_timestamp();
if (inputs_[index].reader->get_instruction_ordinal() == 0 &&
if (get_instr_ordinal(inputs_[index]) == 0 &&
(!inputs_[index].queue.empty() || inputs_[index].cur_from_queue)) {
// Account for get_initial_input_content() readahead for filetype/timestamp.
res = 0;
Expand All @@ -1980,7 +2006,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_input_last_timestamp(
if (index < 0)
return 0;
uint64_t res = inputs_[index].reader->get_last_timestamp();
if (inputs_[index].reader->get_instruction_ordinal() == 0 &&
if (get_instr_ordinal(inputs_[index]) == 0 &&
(!inputs_[index].queue.empty() || inputs_[index].cur_from_queue)) {
// Account for get_initial_input_content() readahead for filetype/timestamp.
res = 0;
Expand All @@ -1993,7 +2019,8 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
output_ordinal_t output, RecordType &record, input_info_t &input)
{
uint64_t cur_instr = input.reader->get_instruction_ordinal();
uint64_t cur_instr = get_instr_ordinal(input);
uint64_t cur_reader_instr = input.reader->get_instruction_ordinal();
assert(input.cur_region >= 0 &&
input.cur_region < static_cast<int>(input.regions_of_interest.size()));
auto &cur_range = input.regions_of_interest[input.cur_region];
Expand Down Expand Up @@ -2047,8 +2074,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
if (input.in_cur_region && cur_instr >= cur_range.start_instruction - 1)
return sched_type_t::STATUS_OK;

VPRINT(this, 2, "skipping from %" PRIu64 " to %" PRIu64 " instrs for ROI\n",
cur_instr, cur_range.start_instruction);
VPRINT(this, 2,
"skipping from %" PRIu64 " to %" PRIu64 " instrs (%" PRIu64
" in reader) for ROI\n",
cur_instr, cur_range.start_instruction,
cur_range.start_instruction - cur_reader_instr - 1);
if (options_.schedule_record_ostream != nullptr) {
sched_type_t::stream_status_t status = close_schedule_segment(output, input);
if (status != sched_type_t::STATUS_OK)
Expand All @@ -2062,7 +2092,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
if (status != sched_type_t::STATUS_OK)
return status;
}
return skip_instructions(output, input, cur_range.start_instruction - cur_instr - 1);
if (cur_range.start_instruction < cur_reader_instr) {
// We do not support skipping without skipping over the pre-read: we would
// need to extract from the queue.
return sched_type_t::STATUS_INVALID;
}
return skip_instructions(output, input,
cur_range.start_instruction - cur_reader_instr - 1);
}

template <typename RecordType, typename ReaderType>
Expand Down Expand Up @@ -2101,6 +2137,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(output_ordinal_t out
!record_type_is_encoding(input.queue.front())));
clear_input_queue(input);
input.reader->skip_instructions(skip_amount);
VPRINT(this, 3, "skip_instructions: input=%d amount=%" PRIu64 "\n", input.index,
skip_amount);
if (input.instrs_pre_read > 0) {
// We do not support skipping without skipping over the pre-read: we would
// need to extract from the queue.
input.instrs_pre_read = 0;
}
if (*input.reader == *input.reader_end) {
mark_input_eof(input);
// Raise error because the input region is out of bounds, unless the max
Expand Down Expand Up @@ -2186,9 +2229,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::record_schedule_segment(
outputs_[output].record.emplace_back(type, input, start_instruction, stop_instruction,
timestamp);
// The stop is typically updated later in close_schedule_segment().
if (type == schedule_record_t::DEFAULT) {
inputs_[input].recorded_in_schedule = true;
}
return sched_type_t::STATUS_OK;
}

Expand Down Expand Up @@ -2217,7 +2257,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::close_schedule_segment(output_ordinal_
outputs_[output].record.back().timestamp);
return sched_type_t::STATUS_OK;
}
uint64_t instr_ord = input.reader->get_instruction_ordinal();
uint64_t instr_ord = get_instr_ordinal(input);
if (input.at_eof || *input.reader == *input.reader_end) {
// The end is exclusive, so use the max int value.
instr_ord = std::numeric_limits<uint64_t>::max();
Expand Down Expand Up @@ -2499,16 +2539,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::set_cur_input(output_ordinal_t output,

inputs_[input].prev_time_in_quantum = outputs_[output].cur_time;
if (options_.schedule_record_ostream != nullptr) {
uint64_t instr_ord = inputs_[input].reader->get_instruction_ordinal();
if (!inputs_[input].recorded_in_schedule && instr_ord == 1) {
// Due to differing reader->init() vs initial set_cur_input() orderings
// we can have an initial value of 1 for non-initial input streams
// with few markers; we reset to 0 for such cases.
VPRINT(this, 3,
"set_cur_input: adjusting instr_ord from 1 to 0 for input=%d\n",
input);
instr_ord = 0;
}
uint64_t instr_ord = get_instr_ordinal(inputs_[input]);
VPRINT(this, 3, "set_cur_input: recording input=%d start=%" PRId64 "\n", input,
instr_ord);
sched_type_t::stream_status_t status =
Expand Down Expand Up @@ -2544,20 +2575,18 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
VPRINT(this, 5,
"pick_next_input_as_previously[%d]: next replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, index, inputs_[index].reader->get_instruction_ordinal(), segment.type,
output, index, get_instr_ordinal(inputs_[index]), segment.type,
segment.value.start_instruction, segment.stop_instruction);
{
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
if (inputs_[index].reader->get_instruction_ordinal() >
segment.value.start_instruction) {
if (get_instr_ordinal(inputs_[index]) > segment.value.start_instruction) {
VPRINT(this, 1,
"WARNING: next_record[%d]: input %d wants instr #%" PRId64
" but it is already at #%" PRId64 "\n",
output, index, segment.value.start_instruction,
inputs_[index].reader->get_instruction_ordinal());
get_instr_ordinal(inputs_[index]));
}
if (inputs_[index].reader->get_instruction_ordinal() <
segment.value.start_instruction &&
if (get_instr_ordinal(inputs_[index]) < segment.value.start_instruction &&
// Don't wait for an ROI that starts at the beginning.
segment.value.start_instruction > 1 &&
// The output may have begun in the wait state.
Expand Down Expand Up @@ -2628,14 +2657,14 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
return sched_type_t::STATUS_SKIPPED;
} else if (segment.type == schedule_record_t::SKIP) {
std::lock_guard<std::mutex> lock(*inputs_[index].lock);
uint64_t cur_instr = inputs_[index].reader->get_instruction_ordinal();
uint64_t cur_reader_instr = inputs_[index].reader->get_instruction_ordinal();
VPRINT(this, 2,
"next_record[%d]: skipping from %" PRId64 " to %" PRId64
" in %d for schedule\n",
output, cur_instr, segment.stop_instruction, index);
auto status =
skip_instructions(output, inputs_[index],
segment.stop_instruction - cur_instr - 1 /*exclusive*/);
output, cur_reader_instr, segment.stop_instruction, index);
auto status = skip_instructions(output, inputs_[index],
segment.stop_instruction - cur_reader_instr -
1 /*exclusive*/);
// Increment the region to get window id markers with ordinals.
inputs_[index].cur_region++;
if (status != sched_type_t::STATUS_SKIPPED)
Expand Down Expand Up @@ -2702,8 +2731,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
: 2,
"next_record[%d]: replay segment in=%d (@%" PRId64
") type=%d start=%" PRId64 " end=%" PRId64 "\n",
output, input,
inputs_[input].reader->get_instruction_ordinal(),
output, input, get_instr_ordinal(inputs_[input]),
segment.type, segment.value.start_instruction,
segment.stop_instruction);
}
Expand Down Expand Up @@ -3134,7 +3162,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
}
VPRINT(this, 5,
"next_record[%d]: candidate record from %d (@%" PRId64 "): ", output,
input->index, input->reader->get_instruction_ordinal());
input->index, get_instr_ordinal(*input));
if (input->instrs_pre_read > 0 && record_type_is_instr(record))
--input->instrs_pre_read;
VDO(this, 5, print_record(record););
bool need_new_input = false;
bool preempt = false;
Expand Down Expand Up @@ -3170,7 +3200,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// (as just mentioned, it is easier to have a seemingly-redundant entry
// to get into the trace reading loop and then do something like a skip
// from the start rather than adding logic into the setup code).
if (input->reader->get_instruction_ordinal() >= stop &&
if (get_instr_ordinal(*input) >= stop &&
(!input->cur_from_queue || (start == 0 && stop == 0))) {
VPRINT(this, 5,
"next_record[%d]: need new input: at end of segment in=%d "
Expand Down
9 changes: 7 additions & 2 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
bool at_eof = false;
uintptr_t next_timestamp = 0;
uint64_t instrs_in_quantum = 0;
bool recorded_in_schedule = false;
int instrs_pre_read = 0;
// This is a per-workload value, stored in each input for convenience.
uint64_t base_timestamp = 0;
// This equals 'options_.deps == DEPENDENCY_TIMESTAMPS', stored here for
Expand Down Expand Up @@ -1464,7 +1464,7 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// the two bool parameters are what the return value should be based on.
virtual bool
process_next_initial_record(input_info_t &input, RecordType record,
bool found_filetype, bool found_timestamp);
bool &found_filetype, bool &found_timestamp);

// Opens up all the readers for each file in 'path' which may be a directory.
// Returns a map of the thread id of each file to its index in inputs_.
Expand Down Expand Up @@ -1691,6 +1691,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
uint64_t
get_input_record_ordinal(output_ordinal_t output);

// Returns the input instruction ordinal taking into account queued records.
// The caller must hold the input's lock.
uint64_t
get_instr_ordinal(input_info_t &input);

// Returns the first timestamp for the current input stream interface for the
// 'output_ordinal'-th output stream.
uint64_t
Expand Down
Loading

0 comments on commit fd16a95

Please sign in to comment.