From 0a26f9369a87ebb52b6e8d740b848915705e0358 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 8 Mar 2023 09:29:39 -0500 Subject: [PATCH] i#5843 scheduler: Move timestamp ordering into scheduler (#5895) Implements timestamp ordering in scheduler_t rather than relying on the old implementation inside file_reader_t. Adds a sanity test. Fixes a bug with only_threads and adds a simple test. Removing the file_reader_t code, along with eliminating the thread-as-sub-reader API routines, will be done as a separate refactoring. Issue: #5843 --- clients/drcachesim/scheduler/scheduler.cpp | 215 +++++++++++++----- clients/drcachesim/scheduler/scheduler.h | 15 +- .../drcachesim/tests/scheduler_unit_tests.cpp | 136 ++++++++++- 3 files changed, 305 insertions(+), 61 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index b26a9193859..5101899665a 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -117,6 +117,11 @@ scheduler_tmpl_t::get_reader(const std::string &path, int ve starts_with(fname, DRMEMTRACE_SERIAL_SCHEDULE_FILENAME) || fname == DRMEMTRACE_CPU_SCHEDULE_FILENAME) continue; + // Skip the auxiliary files. 
+ if (fname == DRMEMTRACE_MODULE_LIST_FILENAME || + fname == DRMEMTRACE_FUNCTION_LIST_FILENAME || + fname == DRMEMTRACE_ENCODING_FILENAME) + continue; # ifdef HAS_SNAPPY if (ends_with(*iter, ".sz")) { return std::unique_ptr( @@ -167,6 +172,18 @@ scheduler_tmpl_t::record_type_is_marker(memref_t record, return true; } +template <> +bool +scheduler_tmpl_t::record_type_is_timestamp(memref_t record, + uintptr_t &value) +{ + if (record.marker.type != TRACE_TYPE_MARKER || + record.marker.marker_type != TRACE_MARKER_TYPE_TIMESTAMP) + return false; + value = record.marker.marker_value; + return true; +} + template <> memref_t scheduler_tmpl_t::create_region_separator_marker(memref_tid_t tid, @@ -199,6 +216,10 @@ scheduler_tmpl_t::print_record(const memref_t &record) fprintf(stderr, "tid=%" PRId64 " type=%d", record.instr.tid, record.instr.type); if (type_is_instr(record.instr.type)) fprintf(stderr, " pc=0x%zx size=%zu", record.instr.addr, record.instr.size); + else if (record.marker.type == TRACE_TYPE_MARKER) { + fprintf(stderr, " marker=%d val=%zu", record.marker.marker_type, + record.marker.marker_value); + } fprintf(stderr, "\n"); } @@ -256,6 +277,18 @@ scheduler_tmpl_t::record_type_is_marker( return true; } +template <> +bool +scheduler_tmpl_t::record_type_is_timestamp( + trace_entry_t record, uintptr_t &value) +{ + if (record.type != TRACE_TYPE_MARKER || + static_cast(record.size) != TRACE_MARKER_TYPE_TIMESTAMP) + return false; + value = record.addr; + return true; +} + template <> trace_entry_t scheduler_tmpl_t::create_region_separator_marker( @@ -382,7 +415,8 @@ scheduler_tmpl_t::init( for (auto &reader : workload.readers) { if (!reader.reader || !reader.end) return STATUS_ERROR_INVALID_PARAMETER; - if (workload.only_threads.find(reader.tid) != workload.only_threads.end()) + if (!workload.only_threads.empty() && + workload.only_threads.find(reader.tid) == workload.only_threads.end()) continue; int index = static_cast(inputs_.size()); inputs_.emplace_back(); @@
-463,18 +497,72 @@ scheduler_tmpl_t::init( // Initially we only support analyzer_t's serial mode. if (options_.deps != DEPENDENCY_TIMESTAMPS) return STATUS_ERROR_NOT_IMPLEMENTED; - // Currently file_reader_t is interleaving for us so we have just one input. - // TODO i#5843: Move interleaving logic to scheduler_t. - if (output_count != 1 || inputs_.size() != 1) + // TODO i#5843: Support more than one output. + if (output_count != 1) return STATUS_ERROR_NOT_IMPLEMENTED; - outputs_[0].cur_input = 0; + if (inputs_.size() == 1) { + outputs_[0].cur_input = 0; + } else { + // The old file_reader_t interleaving would output the top headers for every + // thread first and then pick the oldest timestamp once it reached a + // timestamp. We instead queue those headers so we can start directly with the + // oldest timestamp's thread. + sched_type_t::scheduler_status_t res = get_initial_timestamps(); + if (res != STATUS_SUCCESS) + return res; + } } else { // TODO i#5843: Implement scheduling onto synthetic cores. - return STATUS_ERROR_NOT_IMPLEMENTED; + // For now we only support a single output stream with no deps. + if (options_.deps != DEPENDENCY_IGNORE || output_count != 1) + return STATUS_ERROR_NOT_IMPLEMENTED; + outputs_[0].cur_input = 0; } return STATUS_SUCCESS; } +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_tmpl_t::get_initial_timestamps() +{ + // Read ahead in each input until we find a timestamp record. + // Queue up any skipped records to ensure we present them to the + // output stream(s). 
+ uint64_t min_time = 0xffffffffffffffff; + for (size_t i = 0; i < inputs_.size(); i++) { + input_info_t &input = inputs_[i]; + if (input.next_timestamp <= 0) { + for (const auto &record : input.queue) { + if (record_type_is_timestamp(record, input.next_timestamp)) + break; + } + } + if (input.next_timestamp <= 0) { + if (input.needs_init) { + input.reader->init(); + input.needs_init = false; + } + while (*input.reader != *input.reader_end) { + RecordType record = **input.reader; + if (record_type_is_timestamp(record, input.next_timestamp)) + break; + input.queue.push_back(record); + ++(*input.reader); + } + } + if (input.next_timestamp <= 0) + return STATUS_ERROR_INVALID_PARAMETER; + if (input.next_timestamp < min_time) { + min_time = input.next_timestamp; + // TODO i#5843: Support more than one output (already checked earlier). + outputs_[0].cur_input = static_cast(i); + } + } + if (outputs_[0].cur_input >= 0) + return STATUS_SUCCESS; + return STATUS_ERROR_INVALID_PARAMETER; +} + template typename scheduler_tmpl_t::scheduler_status_t scheduler_tmpl_t::open_reader( @@ -502,14 +590,14 @@ scheduler_tmpl_t::open_reader( RecordType record = **reader; if (record_type_has_tid(record, tid)) break; - input.queue.push(record); + input.queue.push_back(record); ++(*reader); } if (tid == INVALID_THREAD_ID) { error_string_ = "Failed to read " + path; return STATUS_ERROR_FILE_READ_FAILED; } - if (only_threads.find(tid) != only_threads.end()) + if (!only_threads.empty() && only_threads.find(tid) == only_threads.end()) return sched_type_t::STATUS_SUCCESS; VPRINT(this, 2, "Opened reader for tid %" PRId64 " %s\n", tid, path.c_str()); input.tid = tid; @@ -525,29 +613,6 @@ scheduler_tmpl_t::open_readers( const std::string &path, const std::set &only_threads, std::unordered_map &workload_tids) { - if (options_.mapping == MAP_TO_RECORDED_OUTPUT && - options_.deps == DEPENDENCY_TIMESTAMPS) { - // TODO i#5843: Move the interleaving-by-timestamp code from - // file_reader_t into this
scheduler. For now we leverage the - // file_reader_t code and use a sentinel tid (per-thread scheduling - // modifiers are ignored so we would only need tids for error reporting; - // we'll get those once we move the code). - std::unique_ptr reader = get_reader(path, verbosity_); - std::unique_ptr reader_end = get_default_reader(); - if (!reader || !reader_end || !reader->init()) { - error_string_ += "Failed to open " + path; - return STATUS_ERROR_FILE_OPEN_FAILED; - } - inputs_.emplace_back(); - input_info_t &input = inputs_.back(); - input.index = 0; - input.tid = INVALID_THREAD_ID; // See above. - input.reader = std::move(reader); - input.reader_end = std::move(reader_end); - workload_tids[input.tid] = input.index; - return sched_type_t::STATUS_SUCCESS; - } - if (!directory_iterator_t::is_directory(path)) return open_reader(path, only_threads, workload_tids); directory_iterator_t end; @@ -562,6 +627,11 @@ scheduler_tmpl_t::open_readers( starts_with(fname, DRMEMTRACE_SERIAL_SCHEDULE_FILENAME) || fname == DRMEMTRACE_CPU_SCHEDULE_FILENAME) continue; + // Skip the auxiliary files. + if (fname == DRMEMTRACE_MODULE_LIST_FILENAME || + fname == DRMEMTRACE_FUNCTION_LIST_FILENAME || + fname == DRMEMTRACE_ENCODING_FILENAME) + continue; const std::string file = path + DIRSEP + fname; sched_type_t::scheduler_status_t res = open_reader(file, only_threads, workload_tids); @@ -615,7 +685,7 @@ scheduler_tmpl_t::advance_region_of_interest(int output_ return sched_type_t::STATUS_EOF; else { // We let the user know we're done. - input.queue.push(create_thread_exit(input.tid)); + input.queue.push_back(create_thread_exit(input.tid)); input.at_eof = true; return sched_type_t::STATUS_SKIPPED; } @@ -632,7 +702,7 @@ scheduler_tmpl_t::advance_region_of_interest(int output_ // instr ordinal would include them and so be incorrect) and that we should // thus skip it all. 
while (!input.queue.empty()) - input.queue.pop(); + input.queue.pop_front(); uint64_t input_start_ref = input.reader->get_record_ordinal(); uint64_t input_start_instr = input.reader->get_instruction_ordinal(); input.reader->skip_instructions(cur_range.start_instruction - cur_instr); @@ -677,7 +747,8 @@ scheduler_tmpl_t::advance_region_of_interest(int output_ // we need to update the view tool to handle a window marker as the very // first entry). if (input.cur_region > 0) { - input.queue.push(create_region_separator_marker(input.tid, input.cur_region)); + input.queue.push_back( + create_region_separator_marker(input.tid, input.cur_region)); } return sched_type_t::STATUS_SKIPPED; } @@ -686,33 +757,48 @@ template typename scheduler_tmpl_t::stream_status_t scheduler_tmpl_t::pick_next_input(int output_ordinal) { - if (options_.mapping == MAP_TO_RECORDED_OUTPUT) { - // TODO i#5843: For now we have a single input; we'll need to change this - // once we move the file_reader_t interleaving here. - return sched_type_t::STATUS_EOF; - } - if (options_.mapping != MAP_TO_CONSISTENT_OUTPUT) { + if (options_.mapping == MAP_TO_ANY_OUTPUT) { // TODO i#5843: Implement synthetic scheduling with instr/time quanta. // These will require locks and central coordination. - return sched_type_t::STATUS_NOT_IMPLEMENTED; + // For now we only support a single output. + return sched_type_t::STATUS_EOF; } int index = outputs_[output_ordinal].cur_input; while (true) { if (index < 0) { - // We're done with the prior thread; take the next one that was - // pre-allocated to this output (pre-allocated to avoid locks). Invariant: - // the same output_ordinal will not be accessed by two different threads - // simultaneously in this mode, allowing us to support a lock-free - // parallel-friendly increment here. 
- int indices_index = ++outputs_[output_ordinal].input_indices_index; - if (indices_index >= - static_cast(outputs_[output_ordinal].input_indices.size())) { - VPRINT(this, 2, "next_record[%d]: all at eof\n", output_ordinal); - return sched_type_t::STATUS_EOF; - } - index = outputs_[output_ordinal].input_indices[indices_index]; - VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n", - output_ordinal, indices_index, index); + if (options_.deps == DEPENDENCY_TIMESTAMPS) { + uint64_t min_time = 0xffffffffffffffff; + for (size_t i = 0; i < inputs_.size(); i++) { + if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 && + inputs_[i].next_timestamp < min_time) { + min_time = inputs_[i].next_timestamp; + index = static_cast(i); + } + } + if (index < 0) + return sched_type_t::STATUS_EOF; + VPRINT(this, 2, + "next_record[%d]: advancing to timestamp %" PRIu64 + " == input #%d\n", + output_ordinal, min_time, index); + } else if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT) { + // We're done with the prior thread; take the next one that was + // pre-allocated to this output (pre-allocated to avoid locks). Invariant: + // the same output_ordinal will not be accessed by two different threads + // simultaneously in this mode, allowing us to support a lock-free + // parallel-friendly increment here. + int indices_index = ++outputs_[output_ordinal].input_indices_index; + if (indices_index >= + static_cast(outputs_[output_ordinal].input_indices.size())) { + VPRINT(this, 2, "next_record[%d]: all at eof\n", output_ordinal); + return sched_type_t::STATUS_EOF; + } + index = outputs_[output_ordinal].input_indices[indices_index]; + VPRINT(this, 2, + "next_record[%d]: advancing to local index %d == input #%d\n", + output_ordinal, indices_index, index); + } else + return sched_type_t::STATUS_INVALID; // reader_t::at_eof_ is true until init() is called. 
if (inputs_[index].needs_init) { inputs_[index].reader->init(); @@ -741,7 +827,8 @@ scheduler_tmpl_t::next_record(int output_ordinal, input_info_t *&input) { if (outputs_[output_ordinal].cur_input < 0) { - // This happens with more outputs than inputs for SCHEDULE_RUN_TO_COMPLETION. + // This happens with more outputs than inputs. For non-empty outputs we + // require cur_input to be set to >=0 during init(). return sched_type_t::STATUS_EOF; } input = &inputs_[outputs_[output_ordinal].cur_input]; @@ -758,7 +845,7 @@ scheduler_tmpl_t::next_record(int output_ordinal, } if (!input->queue.empty()) { record = input->queue.front(); - input->queue.pop(); + input->queue.pop_front(); } else { // We again have a flag check because reader_t::init() does an initial ++ // and so we want to skip that on the first record but perform a ++ prior @@ -780,6 +867,20 @@ scheduler_tmpl_t::next_record(int output_ordinal, record = **input->reader; } } + if (options_.deps == DEPENDENCY_TIMESTAMPS && + record_type_is_timestamp(record, input->next_timestamp)) { + int cur_input = outputs_[output_ordinal].cur_input; + // Mark cur_input invalid to ensure pick_next_input() takes action. 
+ outputs_[output_ordinal].cur_input = -1; + sched_type_t::stream_status_t res = pick_next_input(output_ordinal); + if (res != sched_type_t::STATUS_OK) + return res; + if (outputs_[output_ordinal].cur_input != cur_input) { + input->queue.push_back(record); + input = &inputs_[outputs_[output_ordinal].cur_input]; + continue; + } + } if (input->needs_roi && !input->regions_of_interest.empty()) { sched_type_t::stream_status_t res = advance_region_of_interest(output_ordinal, record, *input); diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h index 0ee3c0c210f..ea259965ad9 100644 --- a/clients/drcachesim/scheduler/scheduler.h +++ b/clients/drcachesim/scheduler/scheduler.h @@ -41,7 +41,7 @@ */ #include -#include +#include #include #include #include @@ -624,7 +624,8 @@ template class scheduler_tmpl_t { memref_tid_t tid = INVALID_THREAD_ID; // If non-empty these records should be returned before incrementing the reader. // This is used for read-ahead and inserting synthetic records. - std::queue queue; + // We use a deque so we can iterate over it. + std::deque queue; std::set binding; int priority = 0; std::vector regions_of_interest; @@ -634,6 +635,7 @@ template class scheduler_tmpl_t { bool needs_advance = false; bool needs_roi = true; bool at_eof = false; + uintptr_t next_timestamp = 0; }; struct output_info_t { @@ -643,6 +645,8 @@ template class scheduler_tmpl_t { { } stream_t stream; + // This is an index into the inputs_ vector so -1 is an invalid value. + // This is set to >=0 for all non-empty outputs during init(). int cur_input = -1; // For static schedules we can populate this up front and avoid needing a // lock for dynamically finding the next input, keeping things parallel. @@ -650,6 +654,9 @@ template class scheduler_tmpl_t { int input_indices_index = 0; }; + scheduler_status_t + get_initial_timestamps(); + // Opens up all the readers for each file in 'path' which may be a directory. 
// Returns a map of the thread id of each file to its index in inputs_. scheduler_status_t @@ -695,6 +702,10 @@ template class scheduler_tmpl_t { bool record_type_is_marker(RecordType record, trace_marker_type_t &type, uintptr_t &value); + // If the given record is a timestamp, returns true and its fields. + bool + record_type_is_timestamp(RecordType record, uintptr_t &value); + // Creates the marker we insert between regions of interest. RecordType create_region_separator_marker(memref_tid_t tid, uintptr_t value); diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index 4affbbd5e4c..58368903594 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -145,6 +145,89 @@ make_exit(memref_tid_t tid) return memref; } +static memref_t +make_version(memref_tid_t tid, int version) +{ + memref_t memref; + memref.marker.tid = tid; + memref.marker.type = TRACE_TYPE_MARKER; + memref.marker.marker_type = TRACE_MARKER_TYPE_VERSION; + memref.marker.marker_value = version; + return memref; +} + +static memref_t +make_timestamp(memref_tid_t tid, uint64_t timestamp) +{ + memref_t memref; + memref.marker.tid = tid; + memref.marker.type = TRACE_TYPE_MARKER; + memref.marker.marker_type = TRACE_MARKER_TYPE_TIMESTAMP; + memref.marker.marker_value = timestamp; + return memref; +} + +static void +test_serial() +{ + static constexpr memref_tid_t TID_A = 42; + static constexpr memref_tid_t TID_B = 99; + std::vector refs_A = { + /* clang-format off */ + // Include a header to test the scheduler queuing it. + make_version(TID_A, 4), + // Each timestamp is followed by an instr whose PC==time. 
+ make_timestamp(TID_A, 10), + make_instr(TID_A, 10), + make_timestamp(TID_A, 30), + make_instr(TID_A, 30), + make_timestamp(TID_A, 50), + make_instr(TID_A, 50), + make_exit(TID_A), + /* clang-format on */ + }; + std::vector refs_B = { + /* clang-format off */ + make_version(TID_B, 4), + make_timestamp(TID_B, 20), + make_instr(TID_B, 20), + make_timestamp(TID_B, 40), + make_instr(TID_B, 40), + make_timestamp(TID_B, 60), + make_instr(TID_B, 60), + make_exit(TID_B), + /* clang-format on */ + }; + std::vector readers; + readers.emplace_back(std::unique_ptr(new mock_reader_t(refs_A)), + std::unique_ptr(new mock_reader_t()), TID_A); + readers.emplace_back(std::unique_ptr(new mock_reader_t(refs_B)), + std::unique_ptr(new mock_reader_t()), TID_B); + scheduler_t scheduler; + std::vector sched_inputs; + sched_inputs.emplace_back(std::move(readers)); + if (scheduler.init(sched_inputs, 1, + scheduler_t::make_scheduler_serial_options(/*verbosity=*/4)) != + scheduler_t::STATUS_SUCCESS) + assert(false); + auto *stream = scheduler.get_stream(0); + memref_t memref; + uint64_t last_timestamp = 0; + memref_tid_t last_timestamp_tid = INVALID_THREAD_ID; + for (scheduler_t::stream_status_t status = stream->next_record(memref); + status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) { + assert(status == scheduler_t::STATUS_OK); + if (memref.marker.type == TRACE_TYPE_MARKER && + memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP) { + assert(memref.marker.marker_value > last_timestamp); + last_timestamp = memref.marker.marker_value; + // In our test case we have alternating threads. 
+ assert(last_timestamp_tid != memref.marker.tid); + last_timestamp_tid = memref.marker.tid; + } + } +} + static void test_parallel() { @@ -245,9 +328,12 @@ test_regions() std::vector sched_inputs; sched_inputs.emplace_back(std::move(readers)); sched_inputs[0].thread_modifiers.push_back(scheduler_t::input_thread_info_t(regions)); + // Since reader_t::skip_instructions() is unfinished and does not repeat timestamps, + // we can't use the serial options as it will fail without timestamps. if (scheduler.init(sched_inputs, 1, - scheduler_t::make_scheduler_serial_options(/*verbosity=*/4)) != - scheduler_t::STATUS_SUCCESS) + scheduler_t::scheduler_options_t( + scheduler_t::MAP_TO_ANY_OUTPUT, scheduler_t::DEPENDENCY_IGNORE, + /*verbosity=*/4)) != scheduler_t::STATUS_SUCCESS) assert(false); int ordinal = 0; auto *stream = scheduler.get_stream(0); @@ -280,13 +366,59 @@ test_regions() assert(ordinal == 5); } +static void +test_only_threads() +{ + static constexpr memref_tid_t TID_A = 42; + static constexpr memref_tid_t TID_B = 99; + static constexpr memref_tid_t TID_C = 7; + std::vector refs_A = { + make_instr(TID_A, 50), + make_exit(TID_A), + }; + std::vector refs_B = { + make_instr(TID_B, 60), + make_exit(TID_B), + }; + std::vector refs_C = { + make_instr(TID_C, 60), + make_exit(TID_C), + }; + std::vector readers; + readers.emplace_back(std::unique_ptr(new mock_reader_t(refs_A)), + std::unique_ptr(new mock_reader_t()), TID_A); + readers.emplace_back(std::unique_ptr(new mock_reader_t(refs_B)), + std::unique_ptr(new mock_reader_t()), TID_B); + readers.emplace_back(std::unique_ptr(new mock_reader_t(refs_C)), + std::unique_ptr(new mock_reader_t()), TID_C); + + scheduler_t scheduler; + std::vector sched_inputs; + sched_inputs.emplace_back(std::move(readers)); + sched_inputs[0].only_threads.insert(TID_B); + if (scheduler.init(sched_inputs, 1, + scheduler_t::make_scheduler_serial_options(/*verbosity=*/4)) != + scheduler_t::STATUS_SUCCESS) + assert(false); + auto *stream =
scheduler.get_stream(0); + memref_t memref; + uint64_t last_timestamp = 0; + for (scheduler_t::stream_status_t status = stream->next_record(memref); + status != scheduler_t::STATUS_EOF; status = stream->next_record(memref)) { + assert(status == scheduler_t::STATUS_OK); + assert(memref.instr.tid == TID_B); + } +} + } // namespace int main(int argc, const char *argv[]) { + test_serial(); test_parallel(); test_param_checks(); test_regions(); + test_only_threads(); return 0; }