i#6938 sched migrate: Make replay record_index atomic (#6995)
PR #6985 removed the global sched_lock_ which used to synchronize
access to other outputs' record_index fields, leaving a data
race (affecting only the rough timing across outputs, where an output
waits if it gets ahead of other outputs' timestamps).
Since only the index, and not the recorded contents, needs
synchronization, we use an atomic rather than a mutex to coordinate.

Tested by running an internal test under ThreadSanitizer, where a race
is reported without this fix and no race is reported with the fix.

Issue: #6938
derekbruening authored Sep 21, 2024
1 parent 513f584 commit 409bc80
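
As a rough sketch of the pattern this fix adopts (not code from this PR; worker_state, the 10-record gate, and all names below are illustrative): each index has exactly one writer, its owning worker, which advances it with release operations, while any peer may observe it with acquire loads to pace itself. The recorded contents need no lock because they are read-only once recorded.

#include <array>
#include <atomic>
#include <cstdio>
#include <thread>

struct worker_state {
    // Written only by the owning worker (release); read by any worker (acquire).
    std::atomic<int> record_index{0};
};

int
main()
{
    std::array<worker_state, 2> workers;
    auto run = [&workers](int me, int peer) {
        for (int step = 0; step < 1000; ++step) {
            // Crude pacing: do not run more than 10 records ahead of the peer,
            // mirroring the "rough timing across outputs" the commit describes.
            while (workers[peer].record_index.load(std::memory_order_acquire) <
                   step - 10) {
                std::this_thread::yield();
            }
            // Only the owner ever modifies its own index.
            workers[me].record_index.fetch_add(1, std::memory_order_release);
        }
    };
    std::thread t0(run, 0, 1), t1(run, 1, 0);
    t0.join();
    t1.join();
    std::printf("final indices: %d %d\n",
                workers[0].record_index.load(std::memory_order_acquire),
                workers[1].record_index.load(std::memory_order_acquire));
    return 0;
}
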
Showing 2 changed files with 45 additions and 31 deletions.
65 changes: 36 additions & 29 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -1368,7 +1368,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_and_instantiate_traced_schedule()
VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx);
set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
outputs_[output_idx].waiting = true;
outputs_[output_idx].record_index = -1;
outputs_[output_idx].record_index->store(-1, std::memory_order_release);
} else {
VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
outputs_[output_idx].record[0].key.input);
@@ -2934,20 +2934,20 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
output_ordinal_t output, input_ordinal_t &index)
{
if (outputs_[output].record_index + 1 >=
static_cast<int>(outputs_[output].record.size())) {
// Our own index is only modified by us so we can cache it here.
int record_index = outputs_[output].record_index->load(std::memory_order_acquire);
if (record_index + 1 >= static_cast<int>(outputs_[output].record.size())) {
if (!outputs_[output].at_eof) {
outputs_[output].at_eof = true;
live_replay_output_count_.fetch_add(-1, std::memory_order_release);
}
return eof_or_idle(output, outputs_[output].cur_input);
}
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index + 1];
const schedule_record_t &segment = outputs_[output].record[record_index + 1];
if (segment.type == schedule_record_t::IDLE) {
outputs_[output].waiting = true;
outputs_[output].wait_start_time = get_output_time(output);
++outputs_[output].record_index;
outputs_[output].record_index->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_IDLE;
}
index = segment.key.input;
Expand All @@ -2969,11 +2969,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// Don't wait for an ROI that starts at the beginning.
segment.value.start_instruction > 1 &&
// The output may have begun in the wait state.
(outputs_[output].record_index == -1 ||
(record_index == -1 ||
// When we skip our separator+timestamp markers are at the
// prior instr ord so do not wait for that.
(outputs_[output].record[outputs_[output].record_index].type !=
schedule_record_t::SKIP &&
(outputs_[output].record[record_index].type != schedule_record_t::SKIP &&
// Don't wait if we're at the end and just need the end record.
segment.type != schedule_record_t::SYNTHETIC_END))) {
// Some other output stream has not advanced far enough, and we do
@@ -3000,11 +2999,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
// enforce finer-grained timing replay.
if (options_.deps == DEPENDENCY_TIMESTAMPS) {
for (int i = 0; i < static_cast<output_ordinal_t>(outputs_.size()); ++i) {
if (i != output &&
outputs_[i].record_index + 1 <
static_cast<int>(outputs_[i].record.size()) &&
segment.timestamp >
outputs_[i].record[outputs_[i].record_index + 1].timestamp) {
if (i == output)
continue;
// Do an atomic load once and use it to de-reference if it's not at the end.
// This is safe because if the target advances to the end concurrently it
// will only cause an extra wait that will just come back here and then
// continue.
int other_index = outputs_[i].record_index->load(std::memory_order_acquire);
if (other_index + 1 < static_cast<int>(outputs_[i].record.size()) &&
segment.timestamp > outputs_[i].record[other_index + 1].timestamp) {
VPRINT(this, 3,
"next_record[%d]: waiting because timestamp %" PRIu64
" is ahead of output %d\n",
@@ -3032,7 +3035,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
VPRINT(this, 2, "early end for input %d\n", index);
// We're done with this entry but we need the queued record to be read,
// so we do not move past the entry.
++outputs_[output].record_index;
outputs_[output].record_index->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_SKIPPED;
} else if (segment.type == schedule_record_t::SKIP) {
std::lock_guard<mutex_dbg_owned> lock(*inputs_[index].lock);
@@ -3049,13 +3052,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
if (status != sched_type_t::STATUS_SKIPPED)
return sched_type_t::STATUS_INVALID;
// We're done with the skip so move to and past it.
outputs_[output].record_index += 2;
outputs_[output].record_index->fetch_add(2, std::memory_order_release);
return sched_type_t::STATUS_SKIPPED;
} else {
VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n",
output, index, segment.value.start_instruction);
}
++outputs_[output].record_index;
outputs_[output].record_index->fetch_add(1, std::memory_order_release);
return sched_type_t::STATUS_OK;
}

@@ -3096,11 +3099,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
if (options_.mapping == MAP_AS_PREVIOUSLY) {
res = pick_next_input_as_previously(output, index);
VDO(this, 2, {
if (outputs_[output].record_index >= 0 &&
outputs_[output].record_index <
static_cast<int>(outputs_[output].record.size())) {
// Our own index is only modified by us so we can cache it here.
int record_index =
outputs_[output].record_index->load(std::memory_order_acquire);
if (record_index >= 0 &&
record_index < static_cast<int>(outputs_[output].record.size())) {
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
outputs_[output].record[record_index];
int input = segment.key.input;
VPRINT(this,
(res == sched_type_t::STATUS_IDLE ||
@@ -3619,7 +3624,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
if (options_.mapping == MAP_AS_PREVIOUSLY &&
outputs_[output].wait_start_time > 0) {
uint64_t duration = outputs_[output]
.record[outputs_[output].record_index]
.record[outputs_[output].record_index->load(
std::memory_order_acquire)]
.value.idle_duration;
uint64_t now = get_output_time(output);
if (now - outputs_[output].wait_start_time < duration) {
@@ -3729,21 +3735,22 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
// could also be put into output_info_t, promoting it to a class and
// subclassing it per mapping mode.
if (options_.mapping == MAP_AS_PREVIOUSLY) {
assert(outputs_[output].record_index >= 0);
if (outputs_[output].record_index >=
static_cast<int>(outputs_[output].record.size())) {
// Our own index is only modified by us so we can cache it here.
int record_index =
outputs_[output].record_index->load(std::memory_order_acquire);
assert(record_index >= 0);
if (record_index >= static_cast<int>(outputs_[output].record.size())) {
// We're on the last record.
VPRINT(this, 4, "next_record[%d]: on last record\n", output);
} else if (outputs_[output].record[outputs_[output].record_index].type ==
} else if (outputs_[output].record[record_index].type ==
schedule_record_t::SKIP) {
VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
need_new_input = true;
} else if (outputs_[output].record[outputs_[output].record_index].type ==
} else if (outputs_[output].record[record_index].type ==
schedule_record_t::SYNTHETIC_END) {
VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output);
} else {
const schedule_record_t &segment =
outputs_[output].record[outputs_[output].record_index];
const schedule_record_t &segment = outputs_[output].record[record_index];
assert(segment.type == schedule_record_t::DEFAULT);
uint64_t start = segment.value.start_instruction;
uint64_t stop = segment.stop_instruction;
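
To make the memory-ordering choice in the timestamp-gating loop of pick_next_input_as_previously above concrete, here is a hedged, stand-alone condensation (record_t, peer_records, and should_wait are illustrative names, not scheduler API): the peer's index is loaded once with acquire semantics and reused for both the bounds check and the array access; if the peer advances concurrently, the worst case is one unnecessary wait, which the caller simply retries.

#include <atomic>
#include <cstdint>
#include <vector>

struct record_t {
    uint64_t timestamp;
};

// Returns true if a record with timestamp `my_next_timestamp` should wait for
// the peer output to catch up. One acquire load of the peer's index is taken
// and reused; a stale value can only cause an extra, harmless wait.
bool
should_wait(uint64_t my_next_timestamp, const std::vector<record_t> &peer_records,
            const std::atomic<int> &peer_index)
{
    int idx = peer_index.load(std::memory_order_acquire);
    if (idx + 1 >= static_cast<int>(peer_records.size()))
        return false; // Peer is already at its end; nothing to wait for.
    return my_next_timestamp > peer_records[idx + 1].timestamp;
}
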
11 changes: 9 additions & 2 deletions clients/drcachesim/scheduler/scheduler.h
@@ -1587,8 +1587,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {

// We have one output_info_t per output stream, and at most one worker
// thread owns one output, so most fields are accessed only by one thread.
// The exception is "ready_queue" which can be accessed by other threads;
// One exception is .ready_queue which can be accessed by other threads;
// it is protected using its internal lock.
// Another exception is .record, which is read-only after initialization.
// A few other fields are concurrently accessed and are of type std::atomic to allow
// that.
struct output_info_t {
output_info_t(scheduler_tmpl_t<RecordType, ReaderType> *scheduler,
output_ordinal_t ordinal,
@@ -1605,6 +1608,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
cur_time =
std::unique_ptr<std::atomic<uint64_t>>(new std::atomic<uint64_t>());
cur_time->store(0, std::memory_order_relaxed);
record_index = std::unique_ptr<std::atomic<int>>(new std::atomic<int>());
record_index->store(0, std::memory_order_relaxed);
}
stream_t self_stream;
// Normally stream points to &self_stream, but for single_lockstep_output
@@ -1634,7 +1639,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
// A list of schedule segments. During replay, this is read by other threads,
// but it is only written at init time.
std::vector<schedule_record_t> record;
int record_index = 0;
// This index into the .record vector is read by other threads and also written
// during execution, so it requires atomic accesses.
std::unique_ptr<std::atomic<int>> record_index;
bool waiting = false; // Waiting or idling.
// Used to limit stealing to one attempt per transition to idle.
bool tried_to_steal_on_idle = false;
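
A note on the std::unique_ptr<std::atomic<int>> wrapper (my reading of the change, mirroring how cur_time is already handled in this struct): std::atomic is neither copyable nor movable, so holding one directly would make output_info_t awkward to keep in containers that need to move elements; heap-allocating it behind a unique_ptr keeps the struct movable. A minimal illustration with hypothetical names:

#include <atomic>
#include <memory>
#include <vector>

struct indirect_t {
    indirect_t()
    {
        index = std::unique_ptr<std::atomic<int>>(new std::atomic<int>());
        index->store(0, std::memory_order_relaxed);
    }
    // unique_ptr is movable, so indirect_t is movable even though the atomic is not.
    std::unique_ptr<std::atomic<int>> index;
};

int
main()
{
    std::vector<indirect_t> v;
    v.push_back(indirect_t()); // OK: moves the unique_ptr.
    // A member of type std::atomic<int> held directly would make this push_back
    // ill-formed, since std::atomic has no copy or move constructor.
    return 0;
}
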
