i#6938 sched migrate: Make replay record_index atomic
PR #6985 removed the global sched_lock_ that used to synchronize
access to other outputs' record_index fields, leaving a data
race. Since only the index, and not the recorded contents, needs
synchronization, we use an atomic rather than a mutex to coordinate.

Tested by running an internal test under ThreadSanitizer, where a race
is reported without this fix and no race is reported with it.

Issue: #6938
derekbruening committed Sep 20, 2024
1 parent 83eab3f commit 3048a01
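
To make the pattern concrete, a minimal sketch, with illustrative names
rather than the scheduler's real types: the replay record contents are
immutable during replay, so only the advancing index needs cross-thread
coordination, and an atomic index takes the place of the removed lock. The
diff below shows only the acquire-load side; the matching release store on
the owner side is assumed here, since the store site is not in these hunks.
A reader-side companion follows the diff.

// A minimal model of the replay bookkeeping; names here are assumptions,
// not the scheduler's actual types.
#include <atomic>
#include <cstdint>
#include <vector>

struct schedule_entry_t {
    uint64_t timestamp;
};

struct output_t {
    // Immutable once replay starts: safe to read from any thread unlocked.
    std::vector<schedule_entry_t> record;
    // The only mutable shared state; advanced solely by the owning output.
    std::atomic<int> record_index{0};
};

// Owner side: publish progress with a release store so a peer that
// acquire-loads the new index also sees all writes made before it.
void advance(output_t &self)
{
    self.record_index.fetch_add(1, std::memory_order_release);
}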
Showing 1 changed file with 24 additions and 20 deletions.
clients/drcachesim/scheduler/scheduler.cpp
@@ -2999,14 +2999,14 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
         // enforce finer-grained timing replay.
         if (options_.deps == DEPENDENCY_TIMESTAMPS) {
             for (int i = 0; i < static_cast<output_ordinal_t>(outputs_.size()); ++i) {
-                if (i != output &&
-                    outputs_[i].record_index->load(std::memory_order_acquire) + 1 <
-                        static_cast<int>(outputs_[i].record.size()) &&
-                    segment.timestamp > outputs_[i]
-                                            .record[outputs_[i].record_index->load(
-                                                        std::memory_order_acquire) +
-                                                    1]
-                                            .timestamp) {
+                if (i == output)
+                    continue;
+                // Do an atomic load once and use it to de-reference if it's not at the end.
+                // If the target advances to the end concurrently with us that's fine:
+                // an extra wait will just come back here and then continue.
+                int other_index = outputs_[i].record_index->load(std::memory_order_acquire);
+                if (other_index + 1 < static_cast<int>(outputs_[i].record.size()) &&
+                    segment.timestamp > outputs_[i].record[other_index + 1].timestamp) {
                     VPRINT(this, 3,
                            "next_record[%d]: waiting because timestamp %" PRIu64
                            " is ahead of output %d\n",
@@ -3098,11 +3098,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t output,
     if (options_.mapping == MAP_AS_PREVIOUSLY) {
         res = pick_next_input_as_previously(output, index);
         VDO(this, 2, {
-            if (outputs_[output].record_index >= 0 &&
-                outputs_[output].record_index <
-                    static_cast<int>(outputs_[output].record.size())) {
+            // Our own index is only modified by us so we can cache it here.
+            int record_index =
+                outputs_[output].record_index->load(std::memory_order_acquire);
+            if (record_index >= 0 &&
+                record_index < static_cast<int>(outputs_[output].record.size())) {
                 const schedule_record_t &segment =
-                    outputs_[output].record[outputs_[output].record_index];
+                    outputs_[output].record[record_index];
                 int input = segment.key.input;
                 VPRINT(this,
                        (res == sched_type_t::STATUS_IDLE ||
@@ -3621,7 +3623,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
     if (options_.mapping == MAP_AS_PREVIOUSLY &&
         outputs_[output].wait_start_time > 0) {
         uint64_t duration = outputs_[output]
-                                .record[outputs_[output].record_index]
+                                .record[outputs_[output].record_index->load(
+                                    std::memory_order_acquire)]
                                 .value.idle_duration;
         uint64_t now = get_output_time(output);
         if (now - outputs_[output].wait_start_time < duration) {
@@ -3731,21 +3734,22 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
     // could also be put into output_info_t, promoting it to a class and
     // subclassing it per mapping mode.
     if (options_.mapping == MAP_AS_PREVIOUSLY) {
-        assert(outputs_[output].record_index >= 0);
-        if (outputs_[output].record_index >=
-            static_cast<int>(outputs_[output].record.size())) {
+        // Our own index is only modified by us so we can cache it here.
+        int record_index =
+            outputs_[output].record_index->load(std::memory_order_acquire);
+        assert(record_index >= 0);
+        if (record_index >= static_cast<int>(outputs_[output].record.size())) {
             // We're on the last record.
             VPRINT(this, 4, "next_record[%d]: on last record\n", output);
-        } else if (outputs_[output].record[outputs_[output].record_index].type ==
+        } else if (outputs_[output].record[record_index].type ==
                    schedule_record_t::SKIP) {
             VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
             need_new_input = true;
-        } else if (outputs_[output].record[outputs_[output].record_index].type ==
+        } else if (outputs_[output].record[record_index].type ==
                    schedule_record_t::SYNTHETIC_END) {
             VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output);
         } else {
-            const schedule_record_t &segment =
-                outputs_[output].record[outputs_[output].record_index];
+            const schedule_record_t &segment = outputs_[output].record[record_index];
             assert(segment.type == schedule_record_t::DEFAULT);
             uint64_t start = segment.value.start_instruction;
             uint64_t stop = segment.stop_instruction;
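
Reusing the output_t sketch above, the reader side illustrates the
load-once idiom from the first hunk (again with assumed names): a single
acquire load cached in a local keeps the bound check and the de-reference
consistent, where two separate loads could observe different index values.

// Reader side: one acquire load, cached, so the bound check and the
// de-reference agree on the same index value.  Two separate loads (the
// pre-fix shape in the first hunk) could race past the checked bound.
bool is_ahead_of(const output_t &other, uint64_t my_timestamp)
{
    int idx = other.record_index.load(std::memory_order_acquire);
    return idx + 1 < static_cast<int>(other.record.size()) &&
        my_timestamp > other.record[idx + 1].timestamp;
}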
