i#6938 sched migrate: Make replay record_index atomic #6995

Merged
3 commits merged on Sep 21, 2024
clients/drcachesim/scheduler/scheduler.cpp: 65 changes (36 additions & 29 deletions)
@@ -1368,7 +1368,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_and_instantiate_traced_schedule()
VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
- outputs_[output_idx].record_index = -1;
+ outputs_[output_idx].record_index->store(-1, std::memory_order_release);
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
@@ -2934,20 +2934,20 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
output_ordinal_t output, input_ordinal_t &index)
{
- if (outputs_[output].record_index + 1 >=
-     static_cast<int>(outputs_[output].record.size())) {
+ // Our own index is only modified by us so we can cache it here.
+ int record_index = outputs_[output].record_index->load(std::memory_order_acquire);
+ if (record_index + 1 >= static_cast<int>(outputs_[output].record.size())) {
if (!outputs_[output].at_eof) {
outputs_[output].at_eof = true;
live_replay_output_count_.fetch_add(-1, std::memory_order_release);
}
return eof_or_idle(output, outputs_[output].cur_input);
}
- const schedule_record_t &segment =
-     outputs_[output].record[outputs_[output].record_index + 1];
+ const schedule_record_t &segment = outputs_[output].record[record_index + 1];
if (segment.type == schedule_record_t::IDLE) {
outputs_[output].waiting = true;
outputs_[output].wait_start_time = get_output_time(output);
- ++outputs_[output].record_index;
+ outputs_[output].record_index->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_IDLE;
}
index = segment.key.input;
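The hunk above converts each read of the owner's record_index into a single acquire load cached in a local, and each increment into a release fetch_add. A minimal standalone sketch of that single-writer pattern, where `record_t`, `output_t`, and `advance()` are hypothetical stand-ins rather than DynamoRIO types:

```cpp
#include <atomic>
#include <cstdio>
#include <memory>
#include <vector>

struct record_t {
    int payload;
};
struct output_t {
    std::vector<record_t> record; // Read-only after initialization.
    std::unique_ptr<std::atomic<int>> record_index{new std::atomic<int>(0)};
};

// Runs only on the thread that owns "out". Since that thread is the sole
// writer of record_index, one acquire load can be cached in a local and
// reused for both the bounds check and the dereference; the release
// increment publishes the advance to concurrently reading threads.
void advance(output_t &out)
{
    int index = out.record_index->load(std::memory_order_acquire);
    if (index + 1 >= static_cast<int>(out.record.size()))
        return; // At the end of the recorded schedule.
    std::printf("replaying segment %d\n", out.record[index + 1].payload);
    out.record_index->fetch_add(1, std::memory_order_release);
}
```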
@@ -2969,11 +2969,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// Don't wait for an ROI that starts at the beginning.
segment.value.start_instruction > 1 &&
// The output may have begun in the wait state.
- (outputs_[output].record_index == -1 ||
+ (record_index == -1 ||
// When we skip our separator+timestamp markers are at the
// prior instr ord so do not wait for that.
- (outputs_[output].record[outputs_[output].record_index].type !=
-     schedule_record_t::SKIP &&
+ (outputs_[output].record[record_index].type != schedule_record_t::SKIP &&
// Don't wait if we're at the end and just need the end record.
segment.type != schedule_record_t::SYNTHETIC_END))) {
// Some other output stream has not advanced far enough, and we do
@@ -3000,11 +2999,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// enforce finer-grained timing replay.
if (options_.deps == DEPENDENCY_TIMESTAMPS) {
for (int i = 0; i < static_cast<output_ordinal_t>(outputs_.size()); ++i) {
- if (i != output &&
-     outputs_[i].record_index + 1 <
-         static_cast<int>(outputs_[i].record.size()) &&
-     segment.timestamp >
-         outputs_[i].record[outputs_[i].record_index + 1].timestamp) {
+ if (i == output)
+     continue;
+ // Do an atomic load once and use it to de-reference if it's not at the end.
+ // This is safe because if the target advances to the end concurrently it
+ // will only cause an extra wait that will just come back here and then
+ // continue.
+ int other_index = outputs_[i].record_index->load(std::memory_order_acquire);
+ if (other_index + 1 < static_cast<int>(outputs_[i].record.size()) &&
+     segment.timestamp > outputs_[i].record[other_index + 1].timestamp) {
VPRINT(this, 3,
"next_record[%d]: waiting because timestamp %" PRIu64
" is ahead of output %d\n",
@@ -3032,7 +3035,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
VPRINT(this, 2, "early end for input %d\n", index);
// We're done with this entry but we need the queued record to be read,
// so we do not move past the entry.
- ++outputs_[output].record_index;
+ outputs_[output].record_index->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_SKIPPED;
} else if (segment.type == schedule_record_t::SKIP) {
std::lock_guard<mutex_dbg_owned> lock(*inputs_[index].lock);
@@ -3049,13 +3052,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
if (status != sched_type_t::STATUS_SKIPPED)
return sched_type_t::STATUS_INVALID;
// We're done with the skip so move to and past it.
- outputs_[output].record_index += 2;
+ outputs_[output].record_index->fetch_add(2, std::memory_order_release);
return sched_type_t::STATUS_SKIPPED;
} else {
VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n",
output, index, segment.value.start_instruction);
}
- ++outputs_[output].record_index;
+ outputs_[output].record_index->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_OK;
}

@@ -3096,11 +3099,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (options_.mapping == MAP_AS_PREVIOUSLY) {
res = pick_next_input_as_previously(output, index);
VDO(this, 2, {
- if (outputs_[output].record_index >= 0 &&
-     outputs_[output].record_index <
-         static_cast<int>(outputs_[output].record.size())) {
+ // Our own index is only modified by us so we can cache it here.
+ int record_index =
+     outputs_[output].record_index->load(std::memory_order_acquire);
+ if (record_index >= 0 &&
+     record_index < static_cast<int>(outputs_[output].record.size())) {
const schedule_record_t &segment =
- outputs_[output].record[outputs_[output].record_index];
+ outputs_[output].record[record_index];
int input = segment.key.input;
VPRINT(this,
(res == sched_type_t::STATUS_IDLE ||
@@ -3619,7 +3624,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (options_.mapping == MAP_AS_PREVIOUSLY &&
outputs_[output].wait_start_time > 0) {
uint64_t duration = outputs_[output]
- .record[outputs_[output].record_index]
+ .record[outputs_[output].record_index->load(
+     std::memory_order_acquire)]
.value.idle_duration;
uint64_t now = get_output_time(output);
if (now - outputs_[output].wait_start_time < duration) {
@@ -3729,21 +3735,22 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// could also be put into output_info_t, promoting it to a class and
// subclassing it per mapping mode.
if (options_.mapping == MAP_AS_PREVIOUSLY) {
- assert(outputs_[output].record_index >= 0);
- if (outputs_[output].record_index >=
-     static_cast<int>(outputs_[output].record.size())) {
+ // Our own index is only modified by us so we can cache it here.
+ int record_index =
+     outputs_[output].record_index->load(std::memory_order_acquire);
+ assert(record_index >= 0);
+ if (record_index >= static_cast<int>(outputs_[output].record.size())) {
// We're on the last record.
VPRINT(this, 4, "next_record[%d]: on last record\n", output);
- } else if (outputs_[output].record[outputs_[output].record_index].type ==
+ } else if (outputs_[output].record[record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
need_new_input = true;
- } else if (outputs_[output].record[outputs_[output].record_index].type ==
+ } else if (outputs_[output].record[record_index].type ==
schedule_record_t::SYNTHETIC_END) {
VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output);
} else {
- const schedule_record_t &segment =
-     outputs_[output].record[outputs_[output].record_index];
+ const schedule_record_t &segment = outputs_[output].record[record_index];
assert(segment.type == schedule_record_t::DEFAULT);
uint64_t start = segment.value.start_instruction;
uint64_t stop = segment.stop_instruction;
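The timestamp-dependence loop above peeks at indices owned by other outputs. As the in-diff comment notes, loading the index once is enough: the cached copy keeps the bounds check and the dereference consistent, and a stale value at worst produces one unnecessary wait before the caller re-checks. A sketch of that reader side, with a hypothetical `must_wait_for()` and the same stand-in types as before:

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <vector>

struct record_t {
    uint64_t timestamp;
};
struct output_t {
    std::vector<record_t> record; // Read-only after initialization.
    std::unique_ptr<std::atomic<int>> record_index{new std::atomic<int>(0)};
};

// Called from a thread that does not own "other". The single load means a
// concurrent advance by the owner cannot slip in between the size check and
// the array access; record itself is never resized after init, so indexing
// with the (possibly stale) cached value is always safe.
bool must_wait_for(const output_t &other, uint64_t my_timestamp)
{
    int other_index = other.record_index->load(std::memory_order_acquire);
    return other_index + 1 < static_cast<int>(other.record.size()) &&
        my_timestamp > other.record[other_index + 1].timestamp;
}
```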
clients/drcachesim/scheduler/scheduler.h: 11 changes (9 additions & 2 deletions)
@@ -1587,8 +1587,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {

// We have one output_info_t per output stream, and at most one worker
// thread owns one output, so most fields are accessed only by one thread.
// The exception is "ready_queue" which can be accessed by other threads;
// One exception is .ready_queue which can be accessed by other threads;
// it is protected using its internal lock.
+ // Another exception is .record, which is read-only after initialization.
+ // A few other fields are concurrently accessed and are of type std::atomic to allow
+ // that.
struct output_info_t {
output_info_t(scheduler_tmpl_t<RecordType, ReaderType> *scheduler,
output_ordinal_t ordinal,
@@ -1605,6 +1608,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
cur_time =
std::unique_ptr<std::atomic<uint64_t>>(new std::atomic<uint64_t>());
cur_time->store(0, std::memory_order_relaxed);
+ record_index = std::unique_ptr<std::atomic<int>>(new std::atomic<int>());
+ record_index->store(0, std::memory_order_relaxed);
}
stream_t self_stream;
// Normally stream points to &self_stream, but for single_lockstep_output
@@ -1634,7 +1639,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// A list of schedule segments. During replay, this is read by other threads,
// but it is only written at init time.
std::vector<schedule_record_t> record;
- int record_index = 0;
+ // This index into the .record vector is read by other threads and also written
+ // during execution, so it requires atomic accesses.
+ std::unique_ptr<std::atomic<int>> record_index;
bool waiting = false; // Waiting or idling.
// Used to limit stealing to one attempt per transition to idle.
bool tried_to_steal_on_idle = false;
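record_index is held through a std::unique_ptr rather than as a plain std::atomic<int> member, mirroring the existing cur_time field. A plausible reason, assumed here rather than stated in the diff: std::atomic is neither copyable nor movable, so a struct embedding one directly cannot live in a growable std::vector such as the scheduler's outputs_. The indirection restores movability, as this sketch with hypothetical `direct_t`/`indirect_t` shows:

```cpp
#include <atomic>
#include <memory>
#include <vector>

struct direct_t {
    std::atomic<int> index; // Non-copyable and non-movable...
};
// ...so this would not compile: vector growth must move or copy elements.
// std::vector<direct_t> v;
// v.push_back(direct_t{});

struct indirect_t {
    // The unique_ptr is movable even though the atomic it owns is not, and
    // the atomic's address stays stable across vector reallocations.
    std::unique_ptr<std::atomic<int>> index{new std::atomic<int>(0)};
};

int main()
{
    std::vector<indirect_t> v;
    v.push_back(indirect_t{}); // OK: moves the pointer, not the atomic.
    v.front().index->store(1, std::memory_order_relaxed);
    return 0;
}
```

(In C++14, std::make_unique would shorten the construction; the diff keeps the raw-new style of the neighboring cur_time initialization.)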