From 625d8a5f8da843240be2c05c867d5d2873aadf8a Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Thu, 23 Mar 2023 00:55:57 -0400 Subject: [PATCH 1/4] i#5843 scheduler: Add simple interleaving Adds initial support for MAP_TO_ANY_OUTPUT with multiple outputs. Uses a simple queue of ready-to-schedule inputs and implements an instruction-based scheduling quantum. Adds a test. Issue: #5843 --- clients/drcachesim/scheduler/scheduler.cpp | 89 ++++++++++++++----- clients/drcachesim/scheduler/scheduler.h | 12 +++ .../drcachesim/tests/scheduler_unit_tests.cpp | 67 +++++++++++++- 3 files changed, 147 insertions(+), 21 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index 31405ec1673..6685a928169 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -502,7 +502,7 @@ scheduler_tmpl_t::init( for (int i = 0; i < static_cast(inputs_.size()); i++) { size_t index = i % output_count; if (outputs_[index].input_indices.empty()) - outputs_[index].cur_input = i; + set_cur_input(index, i); outputs_[index].input_indices.push_back(i); VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); } @@ -515,7 +515,7 @@ scheduler_tmpl_t::init( if (output_count != 1) return STATUS_ERROR_NOT_IMPLEMENTED; if (inputs_.size() == 1) { - outputs_[0].cur_input = 0; + set_cur_input(0, 0); } else { // The old file_reader_t interleaving would output the top headers for every // thread first and then pick the oldest timestamp once it reached a @@ -526,11 +526,25 @@ scheduler_tmpl_t::init( return res; } } else { - // TODO i#5843: Implement scheduling onto synthetic cores. - // For now we only support a single output stream with no deps. - if (options_.deps != DEPENDENCY_IGNORE || output_count != 1) + // TODO i#5843: Honor deps when scheduling on synthetic cores. + if (options_.deps != DEPENDENCY_IGNORE) return STATUS_ERROR_NOT_IMPLEMENTED; - outputs_[0].cur_input = 0; + // TODO i#5843: Implement time-based quanta. + if (options_.quantum_unit != QUANTUM_INSTRUCTIONS) + return STATUS_ERROR_NOT_IMPLEMENTED; + // Assign initial inputs. + // TODO i#5843: Once we support core bindings and priorities we'll want + // to consider that here. + for (int i = 0; i < static_cast(outputs_.size()); i++) { + if (i < static_cast(inputs_.size())) + set_cur_input(i, i); + else + set_cur_input(i, -1); + } + for (int i = static_cast(outputs_.size()); + i < static_cast(inputs_.size()); i++) { + ready_.push(i); + } } return STATUS_SUCCESS; } @@ -568,8 +582,8 @@ scheduler_tmpl_t::get_initial_timestamps() return STATUS_ERROR_INVALID_PARAMETER; if (input.next_timestamp < min_time) { min_time = input.next_timestamp; - // TODO i#5843: Support more than one input (already checked earlier). - outputs_[0].cur_input = static_cast(i); + // TODO i#5843: Support more than one output (already checked earlier). + set_cur_input(0, static_cast(i)); } } if (outputs_[0].cur_input >= 0) @@ -775,20 +789,46 @@ scheduler_tmpl_t::advance_region_of_interest(int output_ return sched_type_t::STATUS_SKIPPED; } +template +void +scheduler_tmpl_t::set_cur_input(int output_ordinal, + int input_ordinal) +{ + assert(output_ordinal >= 0 && output_ordinal < static_cast(outputs_.size())); + assert(input_ordinal < static_cast(inputs_.size())); + if (outputs_[output_ordinal].cur_input >= 0) + ready_.push(outputs_[output_ordinal].cur_input); + outputs_[output_ordinal].cur_input = input_ordinal; + if (input_ordinal < 0) + return; + inputs_[input_ordinal].instrs_in_quantum = 0; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pick_next_input(int output_ordinal) { - if (options_.mapping == MAP_TO_ANY_OUTPUT) { - // TODO i#5843: Implement synthetic scheduling with instr/time quanta. - // These will require locks and central coordination. - // For now we only support a single output. - return sched_type_t::STATUS_EOF; - } - int index = outputs_[output_ordinal].cur_input; + if (options_.mapping == MAP_TO_ANY_OUTPUT) + std::lock_guard guard(sched_lock_); + // We're only called when we should definitely swap so clear the current. + int prev_index = outputs_[output_ordinal].cur_input; + int index = -1; + set_cur_input(output_ordinal, -1); while (true) { if (index < 0) { - if (options_.deps == DEPENDENCY_TIMESTAMPS) { + if (options_.mapping == MAP_TO_ANY_OUTPUT) { + if (ready_.empty()) { + if (inputs_[prev_index].at_eof) + return sched_type_t::STATUS_EOF; + else + index = prev_index; // Go back to prior. + } else { + // TODO i#5843: Add core binding and priority support. + index = ready_.front(); + ready_.pop(); + } + } else if (options_.deps == DEPENDENCY_TIMESTAMPS) { + // TODO i#5843: This should require a lock for >1 outputs too. uint64_t min_time = 0xffffffffffffffff; for (size_t i = 0; i < inputs_.size(); i++) { if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 && @@ -838,7 +878,7 @@ scheduler_tmpl_t::pick_next_input(int output_ordinal) } break; } - outputs_[output_ordinal].cur_input = index; + set_cur_input(output_ordinal, index); return sched_type_t::STATUS_OK; } @@ -889,11 +929,20 @@ scheduler_tmpl_t::next_record(int output_ordinal, record = **input->reader; } } + bool need_new_input = false; + if (options_.mapping == MAP_TO_ANY_OUTPUT && + options_.quantum_unit == QUANTUM_INSTRUCTIONS && + record_type_is_instr(record)) { + // TODO i#5843: We also want to swap on blocking syscalls. + ++input->instrs_in_quantum; + if (input->instrs_in_quantum > options_.quantum_duration) + need_new_input = true; + } if (options_.deps == DEPENDENCY_TIMESTAMPS && - record_type_is_timestamp(record, input->next_timestamp)) { + record_type_is_timestamp(record, input->next_timestamp)) + need_new_input = true; + if (need_new_input) { int cur_input = outputs_[output_ordinal].cur_input; - // Mark cur_input invalid to ensure pick_next_input() takes action. - outputs_[output_ordinal].cur_input = -1; sched_type_t::stream_status_t res = pick_next_input(output_ordinal); if (res != sched_type_t::STATUS_OK) return res; diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h index 9988a004801..18d6e1f924d 100644 --- a/clients/drcachesim/scheduler/scheduler.h +++ b/clients/drcachesim/scheduler/scheduler.h @@ -42,6 +42,8 @@ #include #include +#include +#include #include #include #include @@ -721,6 +723,7 @@ template class scheduler_tmpl_t { bool needs_roi = true; bool at_eof = false; uintptr_t next_timestamp = 0; + uint64_t instrs_in_quantum = 0; }; struct output_info_t { @@ -771,6 +774,9 @@ template class scheduler_tmpl_t { advance_region_of_interest(int output_ordinal, RecordType &record, input_info_t &input); + void + set_cur_input(int output_ordinal, int input_ordinal); + // Finds the next input stream for the 'output_ordinal'-th output stream. stream_status_t pick_next_input(int output_ordinal); @@ -830,6 +836,12 @@ template class scheduler_tmpl_t { scheduler_options_t options_; std::vector inputs_; std::vector outputs_; + // We use a central lock for global scheduling. We assume the synchronization + // cost is outweighed by the simulator's overhead. This protects concurrent + // access to the inputs_, outputs_, and ready_ fields. + std::mutex sched_lock_; + // Input indices ready to be scheduled. + std::queue ready_; }; /** See #dynamorio::drmemtrace::scheduler_tmpl_t. */ diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index f3baa6062cf..d1d4b44045c 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -82,7 +82,7 @@ class mock_reader_t : public reader_t { }; static trace_entry_t -make_instr(addr_t pc) +make_instr(addr_t pc, memref_tid_t tid = 1) { trace_entry_t entry; entry.type = TRACE_TYPE_INSTR; @@ -682,6 +682,70 @@ test_real_file_queries_and_filters(const char *testdir) #endif } +static void +test_synthetic() +{ + std::cerr << "\n----------------\nTesting synthetic\n"; + static constexpr int NUM_INPUTS = 7; + static constexpr int NUM_OUTPUTS = 2; + static constexpr int NUM_INSTRS = 9; + static constexpr memref_tid_t TID_BASE = 100; + std::vector inputs[NUM_INPUTS]; + std::vector sched_inputs; + for (int i = 0; i < NUM_INPUTS; i++) { + memref_tid_t tid = TID_BASE + i; + inputs[i].push_back(make_thread(tid)); + inputs[i].push_back(make_pid(1)); + for (int j = 0; j < NUM_INSTRS; j++) + inputs[i].push_back(make_instr(42 + j * 4, tid)); + inputs[i].push_back(make_exit(tid)); + std::vector readers; + readers.emplace_back(std::unique_ptr(new mock_reader_t(inputs[i])), + std::unique_ptr(new mock_reader_t()), tid); + sched_inputs.emplace_back(std::move(readers)); + } + scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT, + scheduler_t::DEPENDENCY_IGNORE, + scheduler_t::SCHEDULER_DEFAULTS, + /*verbosity=*/4); + sched_ops.quantum_duration = 3; + scheduler_t scheduler; + if (scheduler.init(sched_inputs, NUM_OUTPUTS, sched_ops) != + scheduler_t::STATUS_SUCCESS) + assert(false); + // Walk the outputs in lockstep for crude but deterministic concurrency. + std::vector outputs(NUM_OUTPUTS, nullptr); + std::vector eof(NUM_OUTPUTS, false); + for (int i = 0; i < NUM_OUTPUTS; i++) + outputs[i] = scheduler.get_stream(i); + int num_eof = 0; + // Record the threads, one char each. + std::vector sched_as_string(NUM_OUTPUTS); + while (num_eof < NUM_OUTPUTS) { + for (int i = 0; i < NUM_OUTPUTS; i++) { + if (eof[i]) + continue; + memref_t memref; + scheduler_t::stream_status_t status = outputs[i]->next_record(memref); + if (status == scheduler_t::STATUS_EOF) { + ++num_eof; + eof[i] = true; + continue; + } + assert(status == scheduler_t::STATUS_OK); + sched_as_string[i] += 'A' + (memref.instr.tid - TID_BASE); + } + } + for (int i = 0; i < NUM_OUTPUTS; i++) { + std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n"; + } + // Hardcoding here for the 2 outputs and 7 inputs. + // We expect 3 letter sequences (our quantum) alternating every-other as each + // core alternates; with an odd number the 2nd core finishes early. + assert(sched_as_string[0] == "AAACCCEEEGGGBBBDDDFFFAAAACCCCEEEEGGGG"); + assert(sched_as_string[1] == "BBBDDDFFFAAACCCEEEGGGBBBBDDDDFFFF"); +} + } // namespace int @@ -695,5 +759,6 @@ main(int argc, const char *argv[]) test_regions(); test_only_threads(); test_real_file_queries_and_filters(argv[1]); + test_synthetic(); return 0; } From d01f88f9a9822ec54f5d794429fd579b2075824b Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Fri, 24 Mar 2023 18:13:56 -0400 Subject: [PATCH 2/4] Fix Windows build warning --- clients/drcachesim/scheduler/scheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index 6685a928169..a17e7de9549 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -502,7 +502,7 @@ scheduler_tmpl_t::init( for (int i = 0; i < static_cast(inputs_.size()); i++) { size_t index = i % output_count; if (outputs_[index].input_indices.empty()) - set_cur_input(index, i); + set_cur_input(static_cast(index), i); outputs_[index].input_indices.push_back(i); VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); } From b82f4393b81b64d3f6b44e9646c34867de6b52a9 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Tue, 28 Mar 2023 19:01:08 -0400 Subject: [PATCH 3/4] Add input_ordinal_t, output_ordinal_t, INVALID_INPUT_ORDINAL, and INVALID_OUTPUT_ORDINAL; this changes the signatures of some previously exported routines but the interfaces is in flux in any case --- clients/drcachesim/scheduler/scheduler.cpp | 117 ++++++++++----------- clients/drcachesim/scheduler/scheduler.h | 68 +++++++----- 2 files changed, 98 insertions(+), 87 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index a17e7de9549..09610565026 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -418,7 +418,7 @@ scheduler_tmpl_t::init( if (!workload.only_threads.empty() && workload.only_threads.find(reader.tid) == workload.only_threads.end()) continue; - int index = static_cast(inputs_.size()); + int index = static_cast(inputs_.size()); inputs_.emplace_back(); input_info_t &input = inputs_.back(); input.index = index; @@ -499,10 +499,10 @@ scheduler_tmpl_t::init( if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT) { // Assign the inputs up front to avoid locks once we're in parallel mode. // We use a simple round-robin static assignment for now. - for (int i = 0; i < static_cast(inputs_.size()); i++) { + for (int i = 0; i < static_cast(inputs_.size()); i++) { size_t index = i % output_count; if (outputs_[index].input_indices.empty()) - set_cur_input(static_cast(index), i); + set_cur_input(static_cast(index), i); outputs_[index].input_indices.push_back(i); VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); } @@ -535,14 +535,14 @@ scheduler_tmpl_t::init( // Assign initial inputs. // TODO i#5843: Once we support core bindings and priorities we'll want // to consider that here. - for (int i = 0; i < static_cast(outputs_.size()); i++) { - if (i < static_cast(inputs_.size())) + for (int i = 0; i < static_cast(outputs_.size()); i++) { + if (i < static_cast(inputs_.size())) set_cur_input(i, i); else - set_cur_input(i, -1); + set_cur_input(i, INVALID_INPUT_ORDINAL); } - for (int i = static_cast(outputs_.size()); - i < static_cast(inputs_.size()); i++) { + for (int i = static_cast(outputs_.size()); + i < static_cast(inputs_.size()); i++) { ready_.push(i); } } @@ -583,7 +583,7 @@ scheduler_tmpl_t::get_initial_timestamps() if (input.next_timestamp < min_time) { min_time = input.next_timestamp; // TODO i#5843: Support more than one output (already checked earlier). - set_cur_input(0, static_cast(i)); + set_cur_input(0, static_cast(i)); } } if (outputs_[0].cur_input >= 0) @@ -604,7 +604,7 @@ scheduler_tmpl_t::open_reader( error_string_ += "Failed to open " + path; return STATUS_ERROR_FILE_OPEN_FAILED; } - int index = static_cast(inputs_.size()); + int index = static_cast(inputs_.size()); inputs_.emplace_back(); input_info_t &input = inputs_.back(); input.index = index; @@ -673,26 +673,26 @@ scheduler_tmpl_t::open_readers( template std::string -scheduler_tmpl_t::get_input_name(int output_ordinal) +scheduler_tmpl_t::get_input_name(output_ordinal_t output) { - int index = outputs_[output_ordinal].cur_input; + int index = outputs_[output].cur_input; if (index < 0) return ""; return inputs_[index].reader->get_stream_name(); } template -int -scheduler_tmpl_t::get_input_ordinal(int output_ordinal) +typename scheduler_tmpl_t::input_ordinal_t +scheduler_tmpl_t::get_input_ordinal(output_ordinal_t output) { - return outputs_[output_ordinal].cur_input; + return outputs_[output].cur_input; } template bool -scheduler_tmpl_t::is_record_synthetic(int output_ordinal) +scheduler_tmpl_t::is_record_synthetic(output_ordinal_t output) { - int index = outputs_[output_ordinal].cur_input; + int index = outputs_[output].cur_input; if (index < 0) return false; return inputs_[index].reader->is_record_synthetic(); @@ -700,11 +700,11 @@ scheduler_tmpl_t::is_record_synthetic(int output_ordinal template memtrace_stream_t * -scheduler_tmpl_t::get_input_stream(int output_ordinal) +scheduler_tmpl_t::get_input_stream(output_ordinal_t output) { - if (output_ordinal < 0 || output_ordinal >= static_cast(outputs_.size())) + if (output < 0 || output >= static_cast(outputs_.size())) return nullptr; - int index = outputs_[output_ordinal].cur_input; + int index = outputs_[output].cur_input; if (index < 0) return nullptr; return inputs_[index].reader.get(); @@ -712,9 +712,8 @@ scheduler_tmpl_t::get_input_stream(int output_ordinal) template typename scheduler_tmpl_t::stream_status_t -scheduler_tmpl_t::advance_region_of_interest(int output_ordinal, - RecordType &record, - input_info_t &input) +scheduler_tmpl_t::advance_region_of_interest( + output_ordinal_t output, RecordType &record, input_info_t &input) { uint64_t cur_instr = input.reader->get_instruction_ordinal(); assert(input.cur_region >= 0 && @@ -762,7 +761,7 @@ scheduler_tmpl_t::advance_region_of_interest(int output_ return sched_type_t::STATUS_REGION_INVALID; } input.in_cur_region = true; - auto &stream = outputs_[output_ordinal].stream; + auto &stream = outputs_[output].stream; // We've documented that an output stream's ordinals ignore skips in its input // streams, so we do not need to remember the input's ordinals pre-skip and increase @@ -791,29 +790,29 @@ scheduler_tmpl_t::advance_region_of_interest(int output_ template void -scheduler_tmpl_t::set_cur_input(int output_ordinal, - int input_ordinal) -{ - assert(output_ordinal >= 0 && output_ordinal < static_cast(outputs_.size())); - assert(input_ordinal < static_cast(inputs_.size())); - if (outputs_[output_ordinal].cur_input >= 0) - ready_.push(outputs_[output_ordinal].cur_input); - outputs_[output_ordinal].cur_input = input_ordinal; - if (input_ordinal < 0) +scheduler_tmpl_t::set_cur_input(output_ordinal_t output, + input_ordinal_t input) +{ + assert(output >= 0 && output < static_cast(outputs_.size())); + assert(input < static_cast(inputs_.size())); + if (outputs_[output].cur_input >= 0) + ready_.push(outputs_[output].cur_input); + outputs_[output].cur_input = input; + if (input < 0) return; - inputs_[input_ordinal].instrs_in_quantum = 0; + inputs_[input].instrs_in_quantum = 0; } template typename scheduler_tmpl_t::stream_status_t -scheduler_tmpl_t::pick_next_input(int output_ordinal) +scheduler_tmpl_t::pick_next_input(output_ordinal_t output) { if (options_.mapping == MAP_TO_ANY_OUTPUT) std::lock_guard guard(sched_lock_); // We're only called when we should definitely swap so clear the current. - int prev_index = outputs_[output_ordinal].cur_input; - int index = -1; - set_cur_input(output_ordinal, -1); + int prev_index = outputs_[output].cur_input; + input_ordinal_t index = INVALID_INPUT_ORDINAL; + set_cur_input(output, INVALID_INPUT_ORDINAL); while (true) { if (index < 0) { if (options_.mapping == MAP_TO_ANY_OUTPUT) { @@ -842,23 +841,23 @@ scheduler_tmpl_t::pick_next_input(int output_ordinal) VPRINT(this, 2, "next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n", - output_ordinal, min_time, index); + output, min_time, index); } else if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT) { // We're done with the prior thread; take the next one that was // pre-allocated to this output (pre-allocated to avoid locks). Invariant: - // the same output_ordinal will not be accessed by two different threads + // the same output will not be accessed by two different threads // simultaneously in this mode, allowing us to support a lock-free // parallel-friendly increment here. - int indices_index = ++outputs_[output_ordinal].input_indices_index; + int indices_index = ++outputs_[output].input_indices_index; if (indices_index >= - static_cast(outputs_[output_ordinal].input_indices.size())) { - VPRINT(this, 2, "next_record[%d]: all at eof\n", output_ordinal); + static_cast(outputs_[output].input_indices.size())) { + VPRINT(this, 2, "next_record[%d]: all at eof\n", output); return sched_type_t::STATUS_EOF; } - index = outputs_[output_ordinal].input_indices[indices_index]; + index = outputs_[output].input_indices[indices_index]; VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n", - output_ordinal, indices_index, index); + output, indices_index, index); } else return sched_type_t::STATUS_INVALID; // reader_t::at_eof_ is true until init() is called. @@ -870,30 +869,30 @@ scheduler_tmpl_t::pick_next_input(int output_ordinal) if (inputs_[index].at_eof || *inputs_[index].reader == *inputs_[index].reader_end) { VPRINT(this, 2, "next_record[%d]: local index %d == input #%d at eof\n", - output_ordinal, outputs_[output_ordinal].input_indices_index, index); + output, outputs_[output].input_indices_index, index); inputs_[index].at_eof = true; - index = -1; + index = INVALID_INPUT_ORDINAL; // Loop and pick next thread. continue; } break; } - set_cur_input(output_ordinal, index); + set_cur_input(output, index); return sched_type_t::STATUS_OK; } template typename scheduler_tmpl_t::stream_status_t -scheduler_tmpl_t::next_record(int output_ordinal, +scheduler_tmpl_t::next_record(output_ordinal_t output, RecordType &record, input_info_t *&input) { - if (outputs_[output_ordinal].cur_input < 0) { + if (outputs_[output].cur_input < 0) { // This happens with more outputs than inputs. For non-empty outputs we // require cur_input to be set to >=0 during init(). return sched_type_t::STATUS_EOF; } - input = &inputs_[outputs_[output_ordinal].cur_input]; + input = &inputs_[outputs_[output].cur_input]; while (true) { if (input->needs_init) { // We pay the cost of this conditional to support ipc_reader_t::init() which @@ -920,10 +919,10 @@ scheduler_tmpl_t::next_record(int output_ordinal, input->needs_advance = true; } if (input->at_eof || *input->reader == *input->reader_end) { - sched_type_t::stream_status_t res = pick_next_input(output_ordinal); + sched_type_t::stream_status_t res = pick_next_input(output); if (res != sched_type_t::STATUS_OK) return res; - input = &inputs_[outputs_[output_ordinal].cur_input]; + input = &inputs_[outputs_[output].cur_input]; continue; } else { record = **input->reader; @@ -942,19 +941,19 @@ scheduler_tmpl_t::next_record(int output_ordinal, record_type_is_timestamp(record, input->next_timestamp)) need_new_input = true; if (need_new_input) { - int cur_input = outputs_[output_ordinal].cur_input; - sched_type_t::stream_status_t res = pick_next_input(output_ordinal); + int cur_input = outputs_[output].cur_input; + sched_type_t::stream_status_t res = pick_next_input(output); if (res != sched_type_t::STATUS_OK) return res; - if (outputs_[output_ordinal].cur_input != cur_input) { + if (outputs_[output].cur_input != cur_input) { input->queue.push_back(record); - input = &inputs_[outputs_[output_ordinal].cur_input]; + input = &inputs_[outputs_[output].cur_input]; continue; } } if (input->needs_roi && !input->regions_of_interest.empty()) { sched_type_t::stream_status_t res = - advance_region_of_interest(output_ordinal, record, *input); + advance_region_of_interest(output, record, *input); if (res == sched_type_t::STATUS_SKIPPED) { // We need either the queue or to re-de-ref the reader so we loop, // but we do not want to come back here. @@ -968,7 +967,7 @@ scheduler_tmpl_t::next_record(int output_ordinal, } break; } - VPRINT(this, 4, "next_record[%d]: from %d: ", output_ordinal, input->index); + VPRINT(this, 4, "next_record[%d]: from %d: ", output, input->index); VDO(this, 4, print_record(record);); return sched_type_t::STATUS_OK; diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h index 18d6e1f924d..61acf1850db 100644 --- a/clients/drcachesim/scheduler/scheduler.h +++ b/clients/drcachesim/scheduler/scheduler.h @@ -109,6 +109,18 @@ template class scheduler_tmpl_t { STATUS_SKIPPED, /**< Used for internal scheduler purposes. */ }; + /** Identifies an input stream by its index. */ + typedef int input_ordinal_t; + + /** Identifies an output stream by its index. */ + typedef int output_ordinal_t; + + /** Sentinel value indicating that no input stream is specified. */ + static constexpr input_ordinal_t INVALID_INPUT_ORDINAL = -1; + + /** Sentinel value indicating that no input stream is specified. */ + static constexpr output_ordinal_t INVALID_OUTPUT_ORDINAL = -1; + /** A bounded sequence of instructions. */ struct range_t { /** Convenience constructor. */ @@ -150,7 +162,7 @@ template class scheduler_tmpl_t { * Limits these threads to this set of output streams. They will not * be scheduled on any other output streams. */ - std::set output_binding; + std::set output_binding; /** * Relative priority for scheduling. The default is 0. */ @@ -543,7 +555,7 @@ template class scheduler_tmpl_t { /** * Returns the ordinal for the current input stream feeding this output stream. */ - int + input_ordinal_t get_input_stream_ordinal() { return scheduler_->get_input_ordinal(ordinal_); @@ -653,9 +665,9 @@ template class scheduler_tmpl_t { * Returns the 'ordinal'-th output stream. */ virtual stream_t * - get_stream(int ordinal) + get_stream(output_ordinal_t ordinal) { - if (ordinal < 0 || ordinal >= static_cast(outputs_.size())) + if (ordinal < 0 || ordinal >= static_cast(outputs_.size())) return nullptr; return &outputs_[ordinal].stream; } @@ -664,27 +676,27 @@ template class scheduler_tmpl_t { virtual int get_input_stream_count() { - return static_cast(inputs_.size()); + return static_cast(inputs_.size()); } /** Returns the #memtrace_stream_t interface for the 'ordinal'-th input stream. */ virtual memtrace_stream_t * - get_input_stream_interface(int input_ordinal) + get_input_stream_interface(input_ordinal_t input) { - if (input_ordinal < 0 || input_ordinal >= static_cast(inputs_.size())) + if (input < 0 || input >= static_cast(inputs_.size())) return nullptr; - return inputs_[input_ordinal].reader.get(); + return inputs_[input].reader.get(); } /** * Returns the name (from get_stream_name()) of the 'ordinal'-th input stream. */ virtual std::string - get_input_stream_name(int ordinal) + get_input_stream_name(input_ordinal_t input) { - if (ordinal < 0 || ordinal >= static_cast(inputs_.size())) + if (input < 0 || input >= static_cast(inputs_.size())) return nullptr; - return inputs_[ordinal].reader->get_stream_name(); + return inputs_[input].reader->get_stream_name(); } /** Returns a string further describing an error code. */ @@ -709,7 +721,7 @@ template class scheduler_tmpl_t { // This is used for read-ahead and inserting synthetic records. // We use a deque so we can iterate over it. std::deque queue; - std::set binding; + std::set binding; int priority = 0; std::vector regions_of_interest; // Index into regions_of_interest. @@ -727,18 +739,18 @@ template class scheduler_tmpl_t { }; struct output_info_t { - output_info_t(scheduler_tmpl_t *scheduler, int ordinal, - int verbosity = 0) + output_info_t(scheduler_tmpl_t *scheduler, + output_ordinal_t ordinal, int verbosity = 0) : stream(scheduler, ordinal, verbosity) { } stream_t stream; // This is an index into the inputs_ vector so -1 is an invalid value. // This is set to >=0 for all non-empty outputs during init(). - int cur_input = -1; + input_ordinal_t cur_input = INVALID_INPUT_ORDINAL; // For static schedules we can populate this up front and avoid needing a // lock for dynamically finding the next input, keeping things parallel. - std::vector input_indices; + std::vector input_indices; int input_indices_index = 0; }; @@ -749,13 +761,13 @@ template class scheduler_tmpl_t { // Returns a map of the thread id of each file to its index in inputs_. scheduler_status_t open_readers(const std::string &path, const std::set &only_threads, - std::unordered_map &workload_tids); + std::unordered_map &workload_tids); // Opens up a single reader for the (non-directory) file in 'path'. // Returns a map of the thread id of the file to its index in inputs_. scheduler_status_t open_reader(const std::string &path, const std::set &only_threads, - std::unordered_map &workload_tids); + std::unordered_map &workload_tids); // Creates a reader for the default file type we support. std::unique_ptr @@ -767,19 +779,19 @@ template class scheduler_tmpl_t { // Advances the 'output_ordinal'-th output stream. stream_status_t - next_record(int output_ordinal, RecordType &record, input_info_t *&input); + next_record(output_ordinal_t output, RecordType &record, input_info_t *&input); // Skips ahead to the next region of interest if necessary. stream_status_t - advance_region_of_interest(int output_ordinal, RecordType &record, + advance_region_of_interest(output_ordinal_t output, RecordType &record, input_info_t &input); void - set_cur_input(int output_ordinal, int input_ordinal); + set_cur_input(output_ordinal_t output, input_ordinal_t input); // Finds the next input stream for the 'output_ordinal'-th output stream. stream_status_t - pick_next_input(int output_ordinal); + pick_next_input(output_ordinal_t output); // If the given record has a thread id field, returns true and the value. bool @@ -812,22 +824,22 @@ template class scheduler_tmpl_t { // Returns the get_stream_name() value for the current input stream scheduled on // the 'output_ordinal'-th output stream. std::string - get_input_name(int output_ordinal); + get_input_name(output_ordinal_t output); // Returns the input ordinal value for the current input stream scheduled on // the 'output_ordinal'-th output stream. - int - get_input_ordinal(int output_ordinal); + input_ordinal_t + get_input_ordinal(output_ordinal_t output); // Returns whether the current record for the current input stream scheduled on // the 'output_ordinal'-th output stream is synthetic. bool - is_record_synthetic(int output_ordinal); + is_record_synthetic(output_ordinal_t output); // Returns the direct handle to the current input stream interface for the // 'output_ordinal'-th output stream. memtrace_stream_t * - get_input_stream(int output_ordinal); + get_input_stream(output_ordinal_t output); // This has the same value as scheduler_options_t.verbosity (for use in VPRINT). int verbosity_ = 0; @@ -841,7 +853,7 @@ template class scheduler_tmpl_t { // access to the inputs_, outputs_, and ready_ fields. std::mutex sched_lock_; // Input indices ready to be scheduled. - std::queue ready_; + std::queue ready_; }; /** See #dynamorio::drmemtrace::scheduler_tmpl_t. */ From a7323d9f8b0a8b6d37cff092c50c80441b43eeb0 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Tue, 28 Mar 2023 19:15:13 -0400 Subject: [PATCH 4/4] Fix lock scope to be full function, conditionally, with unique_lock; fix test sched char sequence to just be instrs; remove unused param --- clients/drcachesim/scheduler/scheduler.cpp | 6 ++++-- clients/drcachesim/tests/scheduler_unit_tests.cpp | 11 ++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index 09610565026..bc23f2be105 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -794,6 +794,7 @@ scheduler_tmpl_t::set_cur_input(output_ordinal_t output, input_ordinal_t input) { assert(output >= 0 && output < static_cast(outputs_.size())); + // 'input' might be INVALID_INPUT_ORDINAL. assert(input < static_cast(inputs_.size())); if (outputs_[output].cur_input >= 0) ready_.push(outputs_[output].cur_input); @@ -807,8 +808,9 @@ template typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pick_next_input(output_ordinal_t output) { - if (options_.mapping == MAP_TO_ANY_OUTPUT) - std::lock_guard guard(sched_lock_); + bool need_lock = (options_.mapping == MAP_TO_ANY_OUTPUT); + auto scoped_lock = need_lock ? std::unique_lock() + : std::unique_lock(sched_lock_); // We're only called when we should definitely swap so clear the current. int prev_index = outputs_[output].cur_input; input_ordinal_t index = INVALID_INPUT_ORDINAL; diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index d1d4b44045c..424e78b35ce 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -82,7 +82,7 @@ class mock_reader_t : public reader_t { }; static trace_entry_t -make_instr(addr_t pc, memref_tid_t tid = 1) +make_instr(addr_t pc) { trace_entry_t entry; entry.type = TRACE_TYPE_INSTR; @@ -697,7 +697,7 @@ test_synthetic() inputs[i].push_back(make_thread(tid)); inputs[i].push_back(make_pid(1)); for (int j = 0; j < NUM_INSTRS; j++) - inputs[i].push_back(make_instr(42 + j * 4, tid)); + inputs[i].push_back(make_instr(42 + j * 4)); inputs[i].push_back(make_exit(tid)); std::vector readers; readers.emplace_back(std::unique_ptr(new mock_reader_t(inputs[i])), @@ -733,7 +733,8 @@ test_synthetic() continue; } assert(status == scheduler_t::STATUS_OK); - sched_as_string[i] += 'A' + (memref.instr.tid - TID_BASE); + if (type_is_instr(memref.instr.type)) + sched_as_string[i] += 'A' + (memref.instr.tid - TID_BASE); } } for (int i = 0; i < NUM_OUTPUTS; i++) { @@ -742,8 +743,8 @@ test_synthetic() // Hardcoding here for the 2 outputs and 7 inputs. // We expect 3 letter sequences (our quantum) alternating every-other as each // core alternates; with an odd number the 2nd core finishes early. - assert(sched_as_string[0] == "AAACCCEEEGGGBBBDDDFFFAAAACCCCEEEEGGGG"); - assert(sched_as_string[1] == "BBBDDDFFFAAACCCEEEGGGBBBBDDDDFFFF"); + assert(sched_as_string[0] == "AAACCCEEEGGGBBBDDDFFFAAACCCEEEGGG"); + assert(sched_as_string[1] == "BBBDDDFFFAAACCCEEEGGGBBBDDDFFF"); } } // namespace