diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp
index f4ac0c63701..b59fb49f01d 100644
--- a/clients/drcachesim/scheduler/scheduler.cpp
+++ b/clients/drcachesim/scheduler/scheduler.cpp
@@ -1368,7 +1368,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::read_and_instantiate_traced_schedule()
             VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx);
             set_cur_input(output_idx, INVALID_INPUT_ORDINAL);
             outputs_[output_idx].waiting = true;
-            outputs_[output_idx].record_index = -1;
+            outputs_[output_idx].record_index->store(-1, std::memory_order_release);
         } else {
             VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx,
                    outputs_[output_idx].record[0].key.input);
@@ -2934,20 +2934,20 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
 scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
     output_ordinal_t output, input_ordinal_t &index)
 {
-    if (outputs_[output].record_index + 1 >=
-        static_cast<int>(outputs_[output].record.size())) {
+    // Our own index is only modified by us so we can cache it here.
+    int record_index = outputs_[output].record_index->load(std::memory_order_acquire);
+    if (record_index + 1 >= static_cast<int>(outputs_[output].record.size())) {
         if (!outputs_[output].at_eof) {
             outputs_[output].at_eof = true;
             live_replay_output_count_.fetch_add(-1, std::memory_order_release);
         }
         return eof_or_idle(output, outputs_[output].cur_input);
     }
-    const schedule_record_t &segment =
-        outputs_[output].record[outputs_[output].record_index + 1];
+    const schedule_record_t &segment = outputs_[output].record[record_index + 1];
     if (segment.type == schedule_record_t::IDLE) {
         outputs_[output].waiting = true;
         outputs_[output].wait_start_time = get_output_time(output);
-        ++outputs_[output].record_index;
+        outputs_[output].record_index->fetch_add(1, std::memory_order_release);
         return sched_type_t::STATUS_IDLE;
     }
     index = segment.key.input;
@@ -2969,11 +2969,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
         // Don't wait for an ROI that starts at the beginning.
         segment.value.start_instruction > 1 &&
         // The output may have begun in the wait state.
-        (outputs_[output].record_index == -1 ||
+        (record_index == -1 ||
          // When we skip our separator+timestamp markers are at the
          // prior instr ord so do not wait for that.
-         (outputs_[output].record[outputs_[output].record_index].type !=
-              schedule_record_t::SKIP &&
+         (outputs_[output].record[record_index].type != schedule_record_t::SKIP &&
           // Don't wait if we're at the end and just need the end record.
           segment.type != schedule_record_t::SYNTHETIC_END))) {
         // Some other output stream has not advanced far enough, and we do
@@ -3000,11 +2999,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
     // enforce finer-grained timing replay.
     if (options_.deps == DEPENDENCY_TIMESTAMPS) {
         for (int i = 0; i < static_cast<int>(outputs_.size()); ++i) {
-            if (i != output &&
-                outputs_[i].record_index + 1 <
-                    static_cast<int>(outputs_[i].record.size()) &&
-                segment.timestamp >
-                    outputs_[i].record[outputs_[i].record_index + 1].timestamp) {
+            if (i == output)
+                continue;
+            // Do an atomic load once and use it to dereference if it's not at the
+            // end.  This is safe because if the target advances to the end
+            // concurrently, it will only cause an extra wait that will just come
+            // back here and then continue.
+            int other_index = outputs_[i].record_index->load(std::memory_order_acquire);
+            if (other_index + 1 < static_cast<int>(outputs_[i].record.size()) &&
+                segment.timestamp > outputs_[i].record[other_index + 1].timestamp) {
                 VPRINT(this, 3,
                        "next_record[%d]: waiting because timestamp %" PRIu64
                        " is ahead of output %d\n",
@@ -3032,7 +3035,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
         VPRINT(this, 2, "early end for input %d\n", index);
         // We're done with this entry but we need the queued record to be read,
         // so we do not move past the entry.
-        ++outputs_[output].record_index;
+        outputs_[output].record_index->fetch_add(1, std::memory_order_release);
         return sched_type_t::STATUS_SKIPPED;
     } else if (segment.type == schedule_record_t::SKIP) {
         std::lock_guard<mutex_dbg_owned> lock(*inputs_[index].lock);
@@ -3049,13 +3052,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
         if (status != sched_type_t::STATUS_SKIPPED)
             return sched_type_t::STATUS_INVALID;
         // We're done with the skip so move to and past it.
-        outputs_[output].record_index += 2;
+        outputs_[output].record_index->fetch_add(2, std::memory_order_release);
         return sched_type_t::STATUS_SKIPPED;
     } else {
         VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n",
                output, index, segment.value.start_instruction);
     }
-    ++outputs_[output].record_index;
+    outputs_[output].record_index->fetch_add(1, std::memory_order_release);
     return sched_type_t::STATUS_OK;
 }
 
@@ -3096,11 +3099,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
     if (options_.mapping == MAP_AS_PREVIOUSLY) {
         res = pick_next_input_as_previously(output, index);
         VDO(this, 2, {
-            if (outputs_[output].record_index >= 0 &&
-                outputs_[output].record_index <
-                    static_cast<int>(outputs_[output].record.size())) {
+            // Our own index is only modified by us so we can cache it here.
+            int record_index =
+                outputs_[output].record_index->load(std::memory_order_acquire);
+            if (record_index >= 0 &&
+                record_index < static_cast<int>(outputs_[output].record.size())) {
                 const schedule_record_t &segment =
-                    outputs_[output].record[outputs_[output].record_index];
+                    outputs_[output].record[record_index];
                 int input = segment.key.input;
                 VPRINT(this,
                        (res == sched_type_t::STATUS_IDLE ||
@@ -3619,7 +3624,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
         if (options_.mapping == MAP_AS_PREVIOUSLY &&
            outputs_[output].wait_start_time > 0) {
            uint64_t duration = outputs_[output]
-                                   .record[outputs_[output].record_index]
+                                   .record[outputs_[output].record_index->load(
+                                       std::memory_order_acquire)]
                                    .value.idle_duration;
            uint64_t now = get_output_time(output);
            if (now - outputs_[output].wait_start_time < duration) {
@@ -3729,21 +3735,22 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
     // could also be put into output_info_t, promoting it to a class and
     // subclassing it per mapping mode.
     if (options_.mapping == MAP_AS_PREVIOUSLY) {
-        assert(outputs_[output].record_index >= 0);
-        if (outputs_[output].record_index >=
-            static_cast<int>(outputs_[output].record.size())) {
+        // Our own index is only modified by us so we can cache it here.
+        int record_index =
+            outputs_[output].record_index->load(std::memory_order_acquire);
+        assert(record_index >= 0);
+        if (record_index >= static_cast<int>(outputs_[output].record.size())) {
             // We're on the last record.
             VPRINT(this, 4, "next_record[%d]: on last record\n", output);
-        } else if (outputs_[output].record[outputs_[output].record_index].type ==
+        } else if (outputs_[output].record[record_index].type ==
                    schedule_record_t::SKIP) {
             VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output);
             need_new_input = true;
-        } else if (outputs_[output].record[outputs_[output].record_index].type ==
+        } else if (outputs_[output].record[record_index].type ==
                    schedule_record_t::SYNTHETIC_END) {
             VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output);
         } else {
-            const schedule_record_t &segment =
-                outputs_[output].record[outputs_[output].record_index];
+            const schedule_record_t &segment = outputs_[output].record[record_index];
             assert(segment.type == schedule_record_t::DEFAULT);
             uint64_t start = segment.value.start_instruction;
             uint64_t stop = segment.stop_instruction;
diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h
index 2c6753897bf..ca2bd0c232d 100644
--- a/clients/drcachesim/scheduler/scheduler.h
+++ b/clients/drcachesim/scheduler/scheduler.h
@@ -1587,8 +1587,11 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
     // We have one output_info_t per output stream, and at most one worker
     // thread owns one output, so most fields are accessed only by one thread.
-    // The exception is "ready_queue" which can be accessed by other threads;
+    // One exception is .ready_queue which can be accessed by other threads;
     // it is protected using its internal lock.
+    // Another exception is .record, which is read-only after initialization.
+    // A few other fields are concurrently accessed and are of type std::atomic
+    // to allow that.
     struct output_info_t {
         output_info_t(scheduler_tmpl_t *scheduler, output_ordinal_t ordinal,
@@ -1605,6 +1608,8 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
             cur_time =
                 std::unique_ptr<std::atomic<uint64_t>>(new std::atomic<uint64_t>());
             cur_time->store(0, std::memory_order_relaxed);
+            record_index = std::unique_ptr<std::atomic<int>>(new std::atomic<int>());
+            record_index->store(0, std::memory_order_relaxed);
         }
         stream_t self_stream;
         // Normally stream points to &self_stream, but for single_lockstep_output
@@ -1634,7 +1639,9 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
         // A list of schedule segments. During replay, this is read by other threads,
         // but it is only written at init time.
         std::vector<schedule_record_t> record;
-        int record_index = 0;
+        // This index into the .record vector is read by other threads and is also
+        // written during execution, so it requires atomic accesses.
+        std::unique_ptr<std::atomic<int>> record_index;
         bool waiting = false; // Waiting or idling.
         // Used to limit stealing to one attempt per transition to idle.
         bool tried_to_steal_on_idle = false;
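
---
Reviewer note: below is a minimal standalone sketch of the single-writer/multi-reader
discipline this patch adopts for record_index, for anyone reviewing the memory
orders. It is illustrative only and not scheduler code: the names replay_output_t,
owner, and peer are hypothetical. The owner thread is the sole writer, so it can
safely "cache" its own index with a single load; every advance uses a release store
or fetch_add, which pairs with the acquire load a peer uses to take one snapshot
before any bounds check or dereference (the record vector itself is immutable after
initialization, as the header comment now states).

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for output_info_t. The atomic lives behind a
// unique_ptr, mirroring the patch: std::atomic is neither copyable nor
// movable, so the indirection keeps the struct storable in containers.
struct replay_output_t {
    std::unique_ptr<std::atomic<int>> record_index{new std::atomic<int>(0)};
    std::vector<uint64_t> record; // Read-only after initialization.
};

int main()
{
    replay_output_t out;
    out.record.assign(1000, 0);

    // Owner: the sole writer. Reading back its own index is safe because no
    // other thread ever modifies it.
    std::thread owner([&out]() {
        while (out.record_index->load(std::memory_order_acquire) <
               static_cast<int>(out.record.size())) {
            // The release pairs with the peer's acquire: once the peer sees
            // the new index, it also sees all writes made before this point.
            out.record_index->fetch_add(1, std::memory_order_release);
        }
    });

    // Peer: read-only observer, like the DEPENDENCY_TIMESTAMPS wait loop.
    std::thread peer([&out]() {
        int snapshot = 0;
        while (snapshot < static_cast<int>(out.record.size())) {
            // Load once and use only this snapshot for the bounds check and
            // the dereference, so the index cannot race past the check.
            snapshot = out.record_index->load(std::memory_order_acquire);
            if (snapshot + 1 < static_cast<int>(out.record.size())) {
                uint64_t next = out.record[snapshot + 1]; // record is immutable.
                (void)next;
            }
        }
        std::printf("peer observed final index %d\n", snapshot);
    });

    owner.join();
    peer.join();
    return 0;
}

Compile with, e.g., g++ -std=c++11 -pthread. The sketch busy-waits for brevity;
the scheduler instead returns STATUS_WAIT/STATUS_IDLE and retries, but the
acquire/release pairing is the same.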