From d1c1db02979078a0fad63f679faceb0c4f9475db Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Mon, 6 Mar 2023 23:21:50 -0500 Subject: [PATCH] i#5843 scheduler: Refactor readers to be single-input Removes multi-input support from file_reader_t and other readers now that the scheduler_t owns that. Specifically: + Removes read_next_thread_entry() and requires that read_next_entry() always check the queue (via a provided helper function). + Removes skip_thread_instructions() and refactors the pre-skip header reading and the post-skip walking while remembering timestamps. Places these latter two inside reader_t for use by all readers, with zipfile overriding just the fast skip in the middle and sharing all the other code. This refactoring and sharing solves the problem of missing timestamps when skipping from the middle. + Removes the arrays of data for multiple inputs from file_reader_t and all subclasses. Updates the view_test to use a scheduler for its multiple-input mock reader. While at it, removes is_complete(). Issue: #5843, #5538 --- api/docs/debug_memtrace.dox | 4 +- api/docs/release.dox | 3 + clients/drcachesim/CMakeLists.txt | 3 +- .../reader/compressed_file_reader.cpp | 62 ++-- .../reader/compressed_file_reader.h | 4 +- clients/drcachesim/reader/file_reader.cpp | 74 +--- clients/drcachesim/reader/file_reader.h | 319 +++--------------- clients/drcachesim/reader/ipc_reader.cpp | 5 +- clients/drcachesim/reader/ipc_reader.h | 9 +- clients/drcachesim/reader/reader.cpp | 131 +++++-- clients/drcachesim/reader/reader.h | 46 +-- .../drcachesim/reader/snappy_file_reader.cpp | 37 +- .../drcachesim/reader/snappy_file_reader.h | 3 +- .../drcachesim/reader/zipfile_file_reader.cpp | 200 ++++------- .../drcachesim/reader/zipfile_file_reader.h | 12 +- .../drcachesim/tests/scheduler_unit_tests.cpp | 9 +- clients/drcachesim/tests/view_test.cpp | 114 ++++--- clients/drcachesim/tracer/raw2trace.h | 8 +- 18 files changed, 398 insertions(+), 645 deletions(-) diff --git a/api/docs/debug_memtrace.dox b/api/docs/debug_memtrace.dox index 9da0ad59ac4..9c893d1d55b 100644 --- a/api/docs/debug_memtrace.dox +++ b/api/docs/debug_memtrace.dox @@ -1,5 +1,5 @@ /* ****************************************************************************** - * Copyright (c) 2010-2022 Google, Inc. All rights reserved. + * Copyright (c) 2010-2023 Google, Inc. All rights reserved. * ******************************************************************************/ /* @@ -100,7 +100,7 @@ Type cheat sheet (from trace_type_t enum): - 0x19 header - 0x16 thread - 0x18 pid -- 0x1c marker: 2=timestamp; 3=cpuid +- 0x1c marker: 2=timestamp; 3=cpuid; 0xc=version; 9=filetype - 0x0a instr (non-cti) - 0x0e direct call - 0x00 load diff --git a/api/docs/release.dox b/api/docs/release.dox index f9f7d5582fe..bbfce0afafc 100644 --- a/api/docs/release.dox +++ b/api/docs/release.dox @@ -149,6 +149,9 @@ changes: - Removed the drcachesim external iterator analyzer interface. Users should instead use the new #dynamorio::drmemtrace::scheduler_tmpl_t interface for direct control over iteration. See \ref sec_drcachesim_sched for example code. + - Refactored the drmemtrace reader and file reader classes to better fit the + new scheduler model: now each reader owns just one single stream of records + with all multi-stream interleaving owned by the scheduler. Further non-compatibility-affecting changes include: - Added AArchXX support for attaching to a running process. diff --git a/clients/drcachesim/CMakeLists.txt b/clients/drcachesim/CMakeLists.txt index d1b7fe27dfa..7a50fef6511 100644 --- a/clients/drcachesim/CMakeLists.txt +++ b/clients/drcachesim/CMakeLists.txt @@ -715,7 +715,8 @@ if (BUILD_TESTS) add_executable(tool.drcacheoff.view_test tests/view_test.cpp reader/file_reader.cpp) configure_DynamoRIO_standalone(tool.drcacheoff.view_test) add_win32_flags(tool.drcacheoff.view_test) - target_link_libraries(tool.drcacheoff.view_test drmemtrace_view drmemtrace_raw2trace) + target_link_libraries(tool.drcacheoff.view_test drmemtrace_view drmemtrace_raw2trace + drmemtrace_analyzer) use_DynamoRIO_extension(tool.drcacheoff.view_test drreg_static) use_DynamoRIO_extension(tool.drcacheoff.view_test drcovlib_static) use_DynamoRIO_extension(tool.drcacheoff.view_test drdecode) diff --git a/clients/drcachesim/reader/compressed_file_reader.cpp b/clients/drcachesim/reader/compressed_file_reader.cpp index bfbaa78cdd8..6113c266476 100644 --- a/clients/drcachesim/reader/compressed_file_reader.cpp +++ b/clients/drcachesim/reader/compressed_file_reader.cpp @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2017-2022 Google, Inc. All rights reserved. + * Copyright (c) 2017-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -46,25 +46,24 @@ open_single_file_common(const std::string &path, gzFile &out) return out != nullptr; } -bool -read_next_thread_entry_common(gzip_reader_t *gzip, OUT trace_entry_t *entry, - OUT bool *eof) +trace_entry_t * +read_next_entry_common(gzip_reader_t *gzip, bool *eof) { if (gzip->cur_buf >= gzip->max_buf) { int len = gzread(gzip->file, gzip->buf, sizeof(gzip->buf)); // Returns less than asked-for for end of file, or –1 for error. // We should always get a multiple of the record size. - if (len < static_cast(sizeof(*entry)) || - len % static_cast(sizeof(*entry)) != 0) { + if (len < static_cast(sizeof(trace_entry_t)) || + len % static_cast(sizeof(trace_entry_t)) != 0) { *eof = (len >= 0); - return false; + return nullptr; } gzip->cur_buf = gzip->buf; gzip->max_buf = gzip->buf + (len / sizeof(*gzip->max_buf)); } - *entry = *gzip->cur_buf; + trace_entry_t *res = gzip->cur_buf; ++gzip->cur_buf; - return true; + return res; } } // namespace @@ -78,9 +77,8 @@ template <> /* clang-format on */ file_reader_t::~file_reader_t() { - for (auto &gzip : input_files_) - gzclose(gzip.file); - delete[] thread_eof_; + if (input_file_.file != nullptr) + gzclose(input_file_.file); } template <> @@ -91,34 +89,24 @@ file_reader_t::open_single_file(const std::string &path) if (!open_single_file_common(path, file)) return false; VPRINT(this, 1, "Opened input file %s\n", path.c_str()); - input_files_.emplace_back(file); + input_file_ = gzip_reader_t(file); return true; } template <> -bool -file_reader_t::read_next_thread_entry(size_t thread_index, - OUT trace_entry_t *entry, - OUT bool *eof) +trace_entry_t * +file_reader_t::read_next_entry() { - if (!read_next_thread_entry_common(&input_files_[thread_index], entry, eof)) - return false; - VPRINT(this, 4, "Read from thread #%zd file: type=%s (%d), size=%d, addr=%zu\n", - thread_index, trace_type_names[entry->type], entry->type, entry->size, - entry->addr); - return true; -} - -template <> -bool -file_reader_t::is_complete() -{ - // The gzip reading interface does not support seeking to SEEK_END so there - // is no efficient way to read the footer. - // We could have the trace file writer seek back and set a bit at the start. - // Currently we are forced to not use this function. - // XXX: Should we just remove this interface, then? - return false; + trace_entry_t *entry = read_queue(); + if (entry != nullptr) + return entry; + entry = read_next_entry_common(&input_file_, &at_eof_); + if (entry == nullptr) + return entry; + VPRINT(this, 4, "Read from file: type=%s (%d), size=%d, addr=%zu\n", + trace_type_names[entry->type], entry->type, entry->size, entry->addr); + entry_copy_ = *entry; + return &entry_copy_; } namespace dynamorio { @@ -154,8 +142,10 @@ template <> bool record_file_reader_t::read_next_entry() { - if (!read_next_thread_entry_common(input_file_.get(), &cur_entry_, &eof_)) + trace_entry_t *entry = read_next_entry_common(input_file_.get(), &eof_); + if (entry == nullptr) return false; + cur_entry_ = *entry; VPRINT(this, 4, "Read from file: type=%s (%d), size=%d, addr=%zu\n", trace_type_names[cur_entry_.type], cur_entry_.type, cur_entry_.size, cur_entry_.addr); diff --git a/clients/drcachesim/reader/compressed_file_reader.h b/clients/drcachesim/reader/compressed_file_reader.h index b54984371ab..19095bb4a66 100644 --- a/clients/drcachesim/reader/compressed_file_reader.h +++ b/clients/drcachesim/reader/compressed_file_reader.h @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2017-2022 Google, Inc. All rights reserved. + * Copyright (c) 2017-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -40,6 +40,8 @@ #include "record_file_reader.h" struct gzip_reader_t { + gzip_reader_t() + : file(nullptr) {}; explicit gzip_reader_t(gzFile file) : file(file) { diff --git a/clients/drcachesim/reader/file_reader.cpp b/clients/drcachesim/reader/file_reader.cpp index 940de8206d7..a305780b085 100644 --- a/clients/drcachesim/reader/file_reader.cpp +++ b/clients/drcachesim/reader/file_reader.cpp @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2016-2022 Google, Inc. All rights reserved. + * Copyright (c) 2016-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -38,75 +38,33 @@ template <> /* clang-format on */ file_reader_t::~file_reader_t() { - for (auto fstream : input_files_) { - delete fstream; - } - delete[] thread_eof_; + delete input_file_; } template <> bool file_reader_t::open_single_file(const std::string &path) { - auto fstream = new std::ifstream(path, std::ifstream::binary); - if (!*fstream) + input_file_ = new std::ifstream(path, std::ifstream::binary); + if (!*input_file_) return false; VPRINT(this, 1, "Opened input file %s\n", path.c_str()); - input_files_.push_back(fstream); - return true; -} - -template <> -bool -file_reader_t::read_next_thread_entry(size_t thread_index, - OUT trace_entry_t *entry, - OUT bool *eof) -{ - if (!input_files_[thread_index]->read((char *)entry, sizeof(*entry))) { - *eof = input_files_[thread_index]->eof(); - return false; - } - VPRINT(this, 4, "Read from thread #%zd file: type=%s (%d), size=%d, addr=%zu\n", - thread_index, trace_type_names[entry->type], entry->type, entry->size, - entry->addr); return true; } template <> -bool -file_reader_t::is_complete() +trace_entry_t * +file_reader_t::read_next_entry() { - // FIXME i#3230: this code is in the middle of refactoring for split thread files. - // We support the is_complete() call from analyzer_multi for a single file for now, - // but may have to abandon this function altogether since gzFile doesn't support it. - bool opened_temporarily = false; - if (input_files_.empty()) { - // Supporting analyzer_multi calling before init() for a single legacy file. - opened_temporarily = true; - if (!input_path_list_.empty() || input_path_.empty() || - directory_iterator_t::is_directory(input_path_)) - return false; // Not supported. - if (!open_single_file(input_path_)) - return false; - } - bool res = false; - for (auto fstream : input_files_) { - res = false; - std::streampos pos = fstream->tellg(); - fstream->seekg(-(int)sizeof(trace_entry_t), fstream->end); - // Avoid reaching eof b/c we can't seek away from it. - if (fstream->read((char *)&entry_copy_.type, sizeof(entry_copy_.type)) && - entry_copy_.type == TRACE_TYPE_FOOTER) - res = true; - fstream->seekg(pos); - if (!res) - break; - } - if (opened_temporarily) { - // Put things back for init(). - for (auto fstream : input_files_) - delete fstream; - input_files_.clear(); + trace_entry_t *from_queue = read_queue(); + if (from_queue != nullptr) + return from_queue; + if (!input_file_->read((char *)&entry_copy_, sizeof(entry_copy_))) { + at_eof_ = input_file_->eof(); + return nullptr; } - return res; + VPRINT(this, 4, "Read from file: type=%s (%d), size=%d, addr=%zu\n", + trace_type_names[entry_copy_.type], entry_copy_.type, entry_copy_.size, + entry_copy_.addr); + return &entry_copy_; } diff --git a/clients/drcachesim/reader/file_reader.h b/clients/drcachesim/reader/file_reader.h index 65325092310..e1c7d8ba729 100644 --- a/clients/drcachesim/reader/file_reader.h +++ b/clients/drcachesim/reader/file_reader.h @@ -78,26 +78,17 @@ template class file_reader_t : public reader_t { { online_ = false; } - explicit file_reader_t(const std::vector &path_list, int verbosity = 0) - : reader_t(verbosity, "[file_reader]") - , input_path_list_(path_list) - { - online_ = false; - } virtual ~file_reader_t(); bool init() override { at_eof_ = false; - if (!open_input_files()) + if (!open_input_file()) return false; ++*this; return true; } - virtual bool - is_complete(); - std::string get_stream_name() const override { @@ -107,287 +98,83 @@ template class file_reader_t : public reader_t { return input_path_.substr(ind + 1); } - reader_t & - skip_instructions(uint64_t instruction_count) override - { - if (instruction_count == 0) - return *this; - if (input_files_.size() > 1) { - // TODO i#5538: For fast thread-interleaved (whether serial here or the - // forthcoming per-cpu iteration) we need to read in the schedule file(s) - // that raw2trace writes out so that we can compute how far to separately - // fast-skip in each interleaved thread by calling the per-thread version. - // We'll also need to update the memref pid+tid state since we're not - // repeating top headers in every thread after a skip. For now this is a - // slow linear walk. - return reader_t::skip_instructions(instruction_count); - } - // If the user asks to skip from the very start, we still need to find the chunk - // count marker and drain the header queue and populate the stream header values. - // XXX: We assume the page size is the final header; it is complex to wait for - // the timestamp as we don't want to read it yet. - while (page_size_ == 0) { - input_entry_ = read_next_entry(); - process_input_entry(); - } - if (!queues_[0].empty()) - ERRMSG("Failed to drain header queue\n"); - bool eof = false; - if (!skip_thread_instructions(0, instruction_count, &eof) || eof) - at_eof_ = true; - return *this; - } - protected: - bool - read_next_thread_entry(size_t thread_index, OUT trace_entry_t *entry, - OUT bool *eof) override; + trace_entry_t * + read_next_entry() override; virtual bool open_single_file(const std::string &path); virtual bool - open_input_files() + open_input_file() { - if (!input_path_list_.empty()) { - for (const std::string &path : input_path_list_) { - if (!open_single_file(path)) { - ERRMSG("Failed to open %s\n", path.c_str()); - return false; - } - } - } else if (directory_iterator_t::is_directory(input_path_)) { - VPRINT(this, 1, "Iterating directory %s\n", input_path_.c_str()); - directory_iterator_t end; - directory_iterator_t iter(input_path_); - if (!iter) { - ERRMSG("Failed to list directory %s: %s", input_path_.c_str(), - iter.error_string().c_str()); - return false; - } - for (; iter != end; ++iter) { - std::string fname = *iter; - if (fname == "." || fname == ".." || - starts_with(fname, DRMEMTRACE_SERIAL_SCHEDULE_FILENAME) || - fname == DRMEMTRACE_CPU_SCHEDULE_FILENAME) - continue; - // Skip the auxiliary files. - if (fname == DRMEMTRACE_MODULE_LIST_FILENAME || - fname == DRMEMTRACE_FUNCTION_LIST_FILENAME || - fname == DRMEMTRACE_ENCODING_FILENAME) - continue; - VPRINT(this, 2, "Found file %s\n", fname.c_str()); - if (!open_single_file(input_path_ + DIRSEP + fname)) { - ERRMSG("Failed to open %s\n", fname.c_str()); - return false; - } - } - } else { - if (!open_single_file(input_path_)) { - ERRMSG("Failed to open %s\n", input_path_.c_str()); - return false; - } - } - if (input_files_.empty()) { - ERRMSG("No thread files found."); + if (!open_single_file(input_path_)) { + ERRMSG("Failed to open %s\n", input_path_.c_str()); return false; } - thread_count_ = input_files_.size(); - queues_.resize(input_files_.size()); - tids_.resize(input_files_.size()); - timestamps_.resize(input_files_.size()); - times_.resize(input_files_.size(), 0); - // We can't take the address of a vector element so we use a raw array. - thread_eof_ = new bool[input_files_.size()]; - memset(thread_eof_, 0, input_files_.size() * sizeof(*thread_eof_)); - - // First read the tid and pid entries which precede any timestamps_. + // First read the tid and pid entries which precede any timestamps. // We hand out the tid to the output on every thread switch, and the pid // the very first time for the thread. - trace_entry_t header, next, pid = {}; - for (index_ = 0; index_ < input_files_.size(); ++index_) { - if (!read_next_thread_entry(index_, &header, &thread_eof_[index_]) || - header.type != TRACE_TYPE_HEADER) { - ERRMSG("Invalid header for input file #%zu\n", index_); - return false; - } - // We can handle the older version 1 as well which simply omits the - // early marker with the arch tag, and version 2 which only differs wrt - // TRACE_MARKER_TYPE_KERNEL_EVENT.. - if (header.addr > TRACE_ENTRY_VERSION) { - ERRMSG( - "Cannot handle version #%zu (expect version <= #%u) for input file " - "#%zu\n", - header.addr, TRACE_ENTRY_VERSION, index_); + trace_entry_t *entry; + trace_entry_t header = {}, pid = {}, tid = {}; + entry = read_next_entry(); + if (entry == nullptr || entry->type != TRACE_TYPE_HEADER) { + ERRMSG("Invalid header\n"); + return false; + } + header = *entry; + // We can handle the older version 1 as well which simply omits the + // early marker with the arch tag, and version 2 which only differs wrt + // TRACE_MARKER_TYPE_KERNEL_EVENT.. + if (entry->addr > TRACE_ENTRY_VERSION) { + ERRMSG("Cannot handle version #%zu (expect version <= #%u)\n", entry->addr, + TRACE_ENTRY_VERSION); + return false; + } + // Read the meta entries until we hit the pid. + // We want to pass the tid+pid to the reader *before* any markers, + // even though 2 markers preced the tid+pid in the file. + std::queue marker_queue; + while ((entry = read_next_entry()) != nullptr) { + if (entry->type == TRACE_TYPE_PID) { + // We assume the pid entry is the last, right before the timestamp. + pid = *entry; + break; + } else if (entry->type == TRACE_TYPE_THREAD) + tid = *entry; + else if (entry->type == TRACE_TYPE_MARKER) + marker_queue.push(*entry); + else { + ERRMSG("Unexpected trace sequence\n"); return false; } - // Read the meta entries until we hit the pid. - // We want to pass the tid+pid to the reader *before* any markers, - // even though 2 markers preced the tid+pid in the file. - std::queue marker_queue; - while (read_next_thread_entry(index_, &next, &thread_eof_[index_])) { - if (next.type == TRACE_TYPE_PID) { - // We assume the pid entry is the last, right before the timestamp. - pid = next; - break; - } else if (next.type == TRACE_TYPE_THREAD) - tids_[index_] = next; - else if (next.type == TRACE_TYPE_MARKER) - marker_queue.push(next); - else { - ERRMSG("Unexpected trace sequence for input file #%zu\n", index_); - return false; - } - } - VPRINT(this, 2, "Read thread #%zd header: ver=%zu, pid=%zu, tid=%zu\n", - index_, header.addr, pid.addr, tids_[index_].addr); - // The reader expects us to own the header and pass the tid as - // the first entry. - queues_[index_].push(tids_[index_]); - queues_[index_].push(pid); - while (!marker_queue.empty()) { - queues_[index_].push(marker_queue.front()); - marker_queue.pop(); - } } - index_ = input_files_.size(); - - return true; - } - - trace_entry_t * - read_next_entry() override - { - // We read the thread files simultaneously in lockstep and merge them into - // a single interleaved stream in timestamp order. - // When a thread file runs out we leave its times_[] entry as 0 and its file at - // eof. - while (thread_count_ > 0) { - if (index_ >= input_files_.size()) { - // Pick the next thread by looking for the smallest timestamp. - uint64_t min_time = 0xffffffffffffffff; - size_t next_index = 0; - for (size_t i = 0; i < times_.size(); ++i) { - if (times_[i] == 0 && !thread_eof_[i]) { - if (!read_next_thread_entry(i, ×tamps_[i], - &thread_eof_[i])) { - ERRMSG("Failed to read from input file #%zu\n", i); - return nullptr; - } - if (timestamps_[i].type != TRACE_TYPE_MARKER && - timestamps_[i].size != TRACE_MARKER_TYPE_TIMESTAMP) { - ERRMSG("Missing timestamp entry in input file #%zu\n", i); - return nullptr; - } - times_[i] = timestamps_[i].addr; - VPRINT(this, 3, - "Thread #%zu timestamp is @0x" ZHEX64_FORMAT_STRING "\n", - i, times_[i]); - } - if (times_[i] != 0 && times_[i] < min_time) { - min_time = times_[i]; - next_index = i; - } - } - VPRINT(this, 2, - "Next thread in timestamp order is #%zu @0x" ZHEX64_FORMAT_STRING - "\n", - next_index, times_[next_index]); - index_ = next_index; - times_[index_] = 0; // Read from file for this thread's next timestamp. - // If the queue is not empty, it should contain the initial tid;pid. - if ((queues_[index_].empty() || - queues_[index_].front().type != TRACE_TYPE_THREAD) && - // For a single thread (or already-interleaved file) we do not need - // thread entries before each timestamp. - input_files_.size() > 1) - queues_[index_].push(tids_[index_]); - queues_[index_].push(timestamps_[index_]); - } - if (!queues_[index_].empty()) { - entry_copy_ = queues_[index_].front(); - queues_[index_].pop(); - return &entry_copy_; - } - VPRINT(this, 4, "About to read thread #%zu\n", index_); - if (!read_next_thread_entry(index_, &entry_copy_, &thread_eof_[index_])) { - if (thread_eof_[index_]) { - VPRINT(this, 2, "Thread #%zu at eof\n", index_); - --thread_count_; - if (thread_count_ == 0) { - VPRINT(this, 2, "All threads at eof\n"); - at_eof_ = true; - break; - } - times_[index_] = 0; - index_ = input_files_.size(); // Request thread scan. - continue; - } else { - ERRMSG("Failed to read from input file #%zu\n", index_); - return nullptr; - } - } - if (entry_copy_.type == TRACE_TYPE_MARKER && - entry_copy_.size == TRACE_MARKER_TYPE_TIMESTAMP) { - VPRINT(this, 3, "Thread #%zu timestamp 0x" ZHEX64_FORMAT_STRING "\n", - index_, (uint64_t)entry_copy_.addr); - times_[index_] = entry_copy_.addr; - timestamps_[index_] = entry_copy_; - index_ = input_files_.size(); // Request thread scan. - continue; - } - return &entry_copy_; + VPRINT(this, 2, "Read header: ver=%zu, pid=%zu, tid=%zu\n", header.addr, pid.addr, + tid.addr); + // The reader expects us to own the header and pass the tid as + // the first entry. + queue_.push(tid); + queue_.push(pid); + while (!marker_queue.empty()) { + queue_.push(marker_queue.front()); + marker_queue.pop(); } - return nullptr; + return true; } - virtual bool - skip_thread_instructions(size_t thread_index, uint64_t instruction_count, - OUT bool *eof) + // Provided so that instantiations can specialize. + reader_t & + skip_instructions(uint64_t instruction_count) override { - // TODO i#5538,i#5843: Once we move interleaving code from file_reader_t - // into scheduler_t, remove these _thread variants: read_next_thread_entry() - // and skip_thread_instructions(). Use reader_t::skip_instructions() here - // (don't override). - - // Default implementation for file types that have no fast seeking and must do a - // linear walk. - // FIXME i#5538,i#5843: This is broken as it goes too far: see how - // reader_t::skip_instructions() does it. We'll fix when we refactor (see - // comment at top of function). - uint64_t stop_count_ = cur_instr_count_ + instruction_count + 1; - while (cur_instr_count_ < stop_count_) { - if (!read_next_thread_entry(thread_index, &entry_copy_, eof)) - return false; - // Update core state. - input_entry_ = &entry_copy_; - process_input_entry(); - // TODO i#5538: Remember the last timestamp+cpu and insert it; share - // code with the zipfile reader. - } - return true; + return reader_t::skip_instructions(instruction_count); } // Protected for access by mock_file_reader_t. - std::vector input_files_; + T input_file_; private: std::string input_path_; - std::vector input_path_list_; - trace_entry_t entry_copy_; - // The current thread we're processing is "index". If it's set to input_files_.size() - // that means we need to pick a new thread. - size_t index_; - size_t thread_count_; - // TODO i#5843: Once we move interleaving code from file_reader_t into scheduler_t - // these will all become singleton as one class instance here will have just one - // thread. We can merge queues_ here into the new reader_t::queue_. - std::vector> queues_; - std::vector tids_; - std::vector timestamps_; - std::vector times_; - bool *thread_eof_ = nullptr; }; #endif /* _FILE_READER_H_ */ diff --git a/clients/drcachesim/reader/ipc_reader.cpp b/clients/drcachesim/reader/ipc_reader.cpp index 1635c5b2572..1d7b3a1dd5a 100644 --- a/clients/drcachesim/reader/ipc_reader.cpp +++ b/clients/drcachesim/reader/ipc_reader.cpp @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2015-2022 Google, Inc. All rights reserved. + * Copyright (c) 2015-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -92,6 +92,9 @@ ipc_reader_t::~ipc_reader_t() trace_entry_t * ipc_reader_t::read_next_entry() { + trace_entry_t *from_queue = read_queue(); + if (from_queue != nullptr) + return from_queue; ++cur_buf_; if (cur_buf_ >= end_buf_) { ssize_t sz = pipe_.read(buf_, sizeof(buf_)); // blocking read diff --git a/clients/drcachesim/reader/ipc_reader.h b/clients/drcachesim/reader/ipc_reader.h index 8ac19feaaa5..447708cde7b 100644 --- a/clients/drcachesim/reader/ipc_reader.h +++ b/clients/drcachesim/reader/ipc_reader.h @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2015-2020 Google, Inc. All rights reserved. + * Copyright (c) 2015-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -60,13 +60,6 @@ class ipc_reader_t : public reader_t { trace_entry_t * read_next_entry() override; - bool - read_next_thread_entry(size_t, trace_entry_t *, bool *) override - { - // Only an interleaved stream is supported. - return false; - } - private: named_pipe_t pipe_; bool creation_success_; diff --git a/clients/drcachesim/reader/reader.cpp b/clients/drcachesim/reader/reader.cpp index b5ea179014f..1fc5d3051d3 100644 --- a/clients/drcachesim/reader/reader.cpp +++ b/clients/drcachesim/reader/reader.cpp @@ -47,14 +47,13 @@ reader_t::operator*() } trace_entry_t * -reader_t::read_next_entry_or_queue() +reader_t::read_queue() { - if (!queue_.empty()) { - local_entry_ = queue_.front(); - queue_.pop(); - return &local_entry_; - } - return read_next_entry(); + if (queue_.empty()) + return nullptr; + entry_copy_ = queue_.front(); + queue_.pop(); + return &entry_copy_; } reader_t & @@ -63,7 +62,7 @@ reader_t::operator++() // We bail if we get a partial read, or EOF, or any error. while (true) { if (bundle_idx_ == 0 /*not in instr bundle*/) - input_entry_ = read_next_entry_or_queue(); + input_entry_ = read_next_entry(); if (input_entry_ == NULL) { if (!at_eof_) { ERRMSG("Trace is truncated\n"); @@ -337,6 +336,25 @@ reader_t::use_prev(trace_entry_t *prev) input_entry_ = prev; } +void +reader_t::pre_skip_instructions() +{ + // If the user asks to skip from the very start, we still need to find the chunk + // count marker and drain the header queue and populate the stream header values. + // XXX: We assume the page size is the final header; it is complex to wait for + // the timestamp as we don't want to read it yet. + while (page_size_ == 0) { + input_entry_ = read_next_entry(); + if (input_entry_->type != TRACE_TYPE_MARKER || + input_entry_->size == TRACE_MARKER_TYPE_TIMESTAMP) { + // Likely some mock in a test with no page size header: just move on. + queue_.push(*input_entry_); + break; + } + process_input_entry(); + } +} + reader_t & reader_t::skip_instructions(uint64_t instruction_count) { @@ -349,37 +367,98 @@ reader_t::skip_instructions(uint64_t instruction_count) at_eof_ = true; return *this; } + pre_skip_instructions(); + return skip_instructions_with_timestamp(cur_instr_count_ + instruction_count); +} + +reader_t & +reader_t::skip_instructions_with_timestamp(uint64_t stop_instruction_count) +{ // This base class has no fast seeking and must do a linear walk. // We have +1 because we need to skip the memrefs of the final skipped // instr, so we look for the 1st unskipped instr: but we do not want to // process it so we do not use the ++ operator function. - uint64_t stop_count = cur_instr_count_ + instruction_count + 1; + uint64_t stop_count = stop_instruction_count + 1; + trace_entry_t timestamp = {}; + trace_entry_t cpu = {}; + trace_entry_t next_instr = {}; + bool prev_was_record_ord = false; + bool found_real_timestamp = false; while (cur_instr_count_ < stop_count) { // End condition is never reached. - // TODO i#5538: Remember the last timestamp+cpu and insert it; share - // code with the zipfile reader. - // TODO i#5538: If skipping from the start, Record all of the header values - // until the first timestamp and present them as new memtrace_stream_t - // interfaces. // Remember the prior entry to use as the cur entry when we hit the - // too-far instr. + // too-far instr if we didn't find a timestamp. if (input_entry_ != nullptr) // Only at start: and we checked for skipping 0. - local_entry_ = *input_entry_; - input_entry_ = read_next_entry_or_queue(); - if (input_entry_ == nullptr) { - if (!at_eof_) { - ERRMSG("Trace is truncated\n"); - assert(false); - at_eof_ = true; - } + entry_copy_ = *input_entry_; + trace_entry_t *next = read_next_entry(); + if (next == nullptr) { + VPRINT(this, 1, "Failed to read next entry\n"); + at_eof_ = true; return *this; } + // We need to pass up memrefs for the final skipped instr, but we don't + // want to process_input_entry() on the first unskipped instr so we can + // insert the timestamp+cpu first. if (cur_instr_count_ + 1 == stop_count && - type_is_instr(static_cast(input_entry_->type))) { - queue_.push(*input_entry_); - use_prev(&local_entry_); + type_is_instr(static_cast(next->type))) { + next_instr = *next; break; } + // To examine the produced memrefs we'd have to have the base reader + // expose these hidden entries. It is simpler for us to read the + // trace_entry_t directly prior to processing by the base class. + if (next->type == TRACE_TYPE_MARKER) { + if (next->size == TRACE_MARKER_TYPE_RECORD_ORDINAL) { + cur_ref_count_ = next->addr; + prev_was_record_ord = true; + VPRINT(this, 4, "Found record ordinal marker: new ord %" PRIu64 "\n", + cur_ref_count_); + } else if (next->size == TRACE_MARKER_TYPE_TIMESTAMP) { + timestamp = *next; + if (prev_was_record_ord) + --cur_ref_count_; // Invisible to ordinals. + else + found_real_timestamp = true; + } else if (next->size == TRACE_MARKER_TYPE_CPU_ID) { + cpu = *next; + if (prev_was_record_ord) + --cur_ref_count_; // Invisible to ordinals. + } else + prev_was_record_ord = false; + } else + prev_was_record_ord = false; + // Update core state. + input_entry_ = next; + process_input_entry(); + } + if (timestamp.type == TRACE_TYPE_MARKER && cpu.type == TRACE_TYPE_MARKER) { + // Insert the two markers. + if (!found_real_timestamp) { + VPRINT(this, 4, "Using duplicate timestamp\n"); + // These synthetic entries are not real records in the unskipped trace, so we + // do not associate record counts with them. + suppress_ref_count_ = 1; + if (cpu.type == TRACE_TYPE_MARKER) + ++suppress_ref_count_; + } else { + // These are not invisible but we already counted them in the loop above + // so we need to avoid a double-count. + VPRINT(this, 4, "Found real timestamp: walking back ord from %" PRIu64 "\n", + cur_ref_count_); + --cur_ref_count_; + if (cpu.type == TRACE_TYPE_MARKER) + --cur_ref_count_; + } + entry_copy_ = timestamp; + input_entry_ = &entry_copy_; process_input_entry(); + if (cpu.type == TRACE_TYPE_MARKER) + queue_.push(cpu); + queue_.push(next_instr); + } else { + // We missed the markers somehow. + VPRINT(this, 1, "Skip failed to find both timestamp and cpu\n"); + queue_.push(next_instr); + use_prev(&entry_copy_); } return *this; } diff --git a/clients/drcachesim/reader/reader.h b/clients/drcachesim/reader/reader.h index 9eb0ce6b510..25092627ff1 100644 --- a/clients/drcachesim/reader/reader.h +++ b/clients/drcachesim/reader/reader.h @@ -110,18 +110,25 @@ class reader_t : public std::iterator, virtual reader_t & operator++(); - // Skips records until "count" instruction records have been passed. - // This will skip top-level headers for a thread; it is up to the caller - // to first observe those before skipping, if needed. For interleaved-thread - // iteration, top-level headers in other threads will be skipped as well - // (but generally speaking these are identical to the initial thread). - // TODO i#5538: Add access to these header values from #memtrace_stream_t - // and document it here. - // TODO i#5538: Skipping from the middle will not always duplicate the - // last timestamp,cpu. + // Skips records until "count" instruction records have been passed. If any + // timestamp (plus cpuid) is skipped, the most recent skipped timestamp will be + // duplicated prior to the target instruction. Top-level headers will still be + // observed. This generally should call pre_skip_instructions() to observe the + // headers, perform any fast skipping, and then should call + // skip_instructions_with_timestamp() to properly duplicate the prior timestamp. virtual reader_t & skip_instructions(uint64_t instruction_count); + virtual void + pre_skip_instructions(); + + // Performs a simple walk until it sees the instruction whose ordinal is + // "stop_instruction_count" along with all of its associated records such as + // memrefs. If any timestamp (plus cpuid) is skipped, the most recent skipped + // timestamp will be duplicated prior to the target instruction. + virtual reader_t & + skip_instructions_with_timestamp(uint64_t stop_instruction_count); + // Supplied for subclasses that may fail in their constructors. virtual bool operator!() @@ -182,24 +189,21 @@ class reader_t : public std::iterator, } protected: - // This reads the next entry from the stream of entries from all threads interleaved - // in timestamp order. + // This reads the next entry from the single stream of entries (or from the + // local queue if non-empty). If it returns false it will set at_eof_ to distinguish + // end-of-file from an error. It should call read_queue() first before reading + // a new entry from the input stream. virtual trace_entry_t * read_next_entry() = 0; - // This first checks the local queue before calling read_next_entry(). - // in timestamp order. + // Returns and removes the entry (nullptr if none) from the local queue. + // This should be called by read_next_entry() prior to reading a new record + // from the input stream. virtual trace_entry_t * - read_next_entry_or_queue(); + read_queue(); // Replaces the just-read record with the prior record, supplied here. // Separated into a virtual method for overriding in test mock readers. virtual void use_prev(trace_entry_t *prev); - // This reads the next entry from the single stream of entries - // from the specified thread. If it returns false it will set *eof to distinguish - // end-of-file from an error. - virtual bool - read_next_thread_entry(size_t thread_index, OUT trace_entry_t *entry, - OUT bool *eof) = 0; // This updates internal state for the just-read input_entry_. // Returns whether a new memref record is now available. virtual bool @@ -230,7 +234,7 @@ class reader_t : public std::iterator, // We store into this queue records already read from the input but not // yet returned to the iterator. std::queue queue_; - trace_entry_t local_entry_; // For use in returning a queue entry. + trace_entry_t entry_copy_; // For use in returning a queue entry. private: struct encoding_info_t { diff --git a/clients/drcachesim/reader/snappy_file_reader.cpp b/clients/drcachesim/reader/snappy_file_reader.cpp index 58a85b7495f..180f10b137a 100644 --- a/clients/drcachesim/reader/snappy_file_reader.cpp +++ b/clients/drcachesim/reader/snappy_file_reader.cpp @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2019-2022 Google, Inc. All rights reserved. + * Copyright (c) 2019-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -218,32 +218,25 @@ file_reader_t::open_single_file(const std::string &path) if (!*file) return false; VPRINT(this, 1, "Opened snappy input file %s\n", path.c_str()); - input_files_.emplace_back(file); + input_file_ = snappy_reader_t(file); return true; } template <> -bool -file_reader_t::read_next_thread_entry(size_t thread_index, - OUT trace_entry_t *entry, - OUT bool *eof) +trace_entry_t * +file_reader_t::read_next_entry() { - int len = input_files_[thread_index].read(sizeof(*entry), entry); + trace_entry_t *from_queue = read_queue(); + if (from_queue != nullptr) + return from_queue; + int len = input_file_.read(sizeof(entry_copy_), &entry_copy_); // Returns less than asked-for for end of file, or –1 for error. - if (len < (int)sizeof(*entry)) { - *eof = input_files_[thread_index].eof(); - return false; + if (len < (int)sizeof(entry_copy_)) { + at_eof_ = input_file_.eof(); + return nullptr; } - VPRINT(this, 4, "Read from thread #%zd file: type=%s (%d), size=%d, addr=%zu\n", - thread_index, trace_type_names[entry->type], entry->type, entry->size, - entry->addr); - return true; -} - -template <> -bool -file_reader_t::is_complete() -{ - // Not supported, similar to gzip reader. - return false; + VPRINT(this, 4, "Read from file: type=%s (%d), size=%d, addr=%zu\n", + trace_type_names[entry_copy_.type], entry_copy_.type, entry_copy_.size, + entry_copy_.addr); + return &entry_copy_; } diff --git a/clients/drcachesim/reader/snappy_file_reader.h b/clients/drcachesim/reader/snappy_file_reader.h index 5bcdacc1bae..63ae65670f7 100644 --- a/clients/drcachesim/reader/snappy_file_reader.h +++ b/clients/drcachesim/reader/snappy_file_reader.h @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2019-2022 Google, Inc. All rights reserved. + * Copyright (c) 2019-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -51,6 +51,7 @@ class snappy_reader_t : snappy_consts_t { public: + snappy_reader_t() = default; snappy_reader_t(std::ifstream *stream); // Read 'size' bytes into the 'to'. diff --git a/clients/drcachesim/reader/zipfile_file_reader.cpp b/clients/drcachesim/reader/zipfile_file_reader.cpp index c67106850e4..4c6f07d1663 100644 --- a/clients/drcachesim/reader/zipfile_file_reader.cpp +++ b/clients/drcachesim/reader/zipfile_file_reader.cpp @@ -42,9 +42,8 @@ template <> /* clang-format on */ file_reader_t::~file_reader_t() { - for (auto &zipfile : input_files_) - unzClose(zipfile.file); - delete[] thread_eof_; + if (input_file_.file != nullptr) + unzClose(input_file_.file); } template <> @@ -54,7 +53,7 @@ file_reader_t::open_single_file(const std::string &path) unzFile file = unzOpen(path.c_str()); if (file == nullptr) return false; - input_files_.emplace_back(file); + input_file_ = zipfile_reader_t(file); if (unzGoToFirstFile(file) != UNZ_OK || unzOpenCurrentFile(file) != UNZ_OK) return false; VPRINT(this, 1, "Opened input file %s\n", path.c_str()); @@ -62,13 +61,13 @@ file_reader_t::open_single_file(const std::string &path) } template <> -bool -file_reader_t::read_next_thread_entry(size_t thread_index, - OUT trace_entry_t *entry, - OUT bool *eof) +trace_entry_t * +file_reader_t::read_next_entry() { - *eof = false; - zipfile_reader_t *zipfile = &input_files_[thread_index]; + trace_entry_t *from_queue = read_queue(); + if (from_queue != nullptr) + return from_queue; + zipfile_reader_t *zipfile = &input_file_; if (zipfile->cur_buf >= zipfile->max_buf) { int num_read = unzReadCurrentFile(zipfile->file, zipfile->buf, sizeof(zipfile->buf)); @@ -80,180 +79,107 @@ file_reader_t::read_next_thread_entry(size_t thread_index, // This call is expensive if we do it every time. unzGetCurrentFileInfo64(zipfile->file, nullptr, name, sizeof(name), nullptr, 0, nullptr, 0); - VPRINT(this, 3, - "Thread #%zd hit end of component %s; opening next component\n", - thread_index, name); + VPRINT(this, 3, "Hit end of component %s; opening next component\n", + name); } #endif - // read_next_entry() stores the last entry into entry_copy_. + // read_next_entry() stored the last-read entry into entry_copy_. if ((entry_copy_.type != TRACE_TYPE_MARKER || entry_copy_.size != TRACE_MARKER_TYPE_CHUNK_FOOTER) && entry_copy_.type != TRACE_TYPE_FOOTER) { VPRINT(this, 1, "Chunk is missing footer: truncation detected\n"); - return false; + return nullptr; } if (unzCloseCurrentFile(zipfile->file) != UNZ_OK) - return false; + return nullptr; int res = unzGoToNextFile(zipfile->file); if (res != UNZ_OK) { if (res == UNZ_END_OF_LIST_OF_FILE) { - VPRINT(this, 2, "Thread #%zd hit EOF\n", thread_index); - *eof = true; + VPRINT(this, 2, "Hit EOF\n"); + at_eof_ = true; } - return false; + return nullptr; } if (unzOpenCurrentFile(zipfile->file) != UNZ_OK) - return false; + return nullptr; num_read = unzReadCurrentFile(zipfile->file, zipfile->buf, sizeof(zipfile->buf)); } - if (num_read < static_cast(sizeof(*entry))) { - VPRINT(this, 1, "Thread #%zd failed to read: returned %d\n", thread_index, - num_read); - return false; + if (num_read < static_cast(sizeof(entry_copy_))) { + VPRINT(this, 1, "Failed to read: returned %d\n", num_read); + return nullptr; } zipfile->cur_buf = zipfile->buf; zipfile->max_buf = zipfile->buf + (num_read / sizeof(*zipfile->max_buf)); } - *entry = *zipfile->cur_buf; + entry_copy_ = *zipfile->cur_buf; ++zipfile->cur_buf; - VPRINT(this, 4, "Read from thread #%zd: type=%s (%d), size=%d, addr=%zu\n", - thread_index, trace_type_names[entry->type], entry->type, entry->size, - entry->addr); - return true; -} - -template <> -bool -file_reader_t::is_complete() -{ - // We could call unzeof() but we need the thread index. - // XXX: We should remove or change this interface. It is not used now. - return false; + VPRINT(this, 4, "Read: type=%s (%d), size=%d, addr=%zu\n", + trace_type_names[entry_copy_.type], entry_copy_.type, entry_copy_.size, + entry_copy_.addr); + return &entry_copy_; } template <> -bool -file_reader_t::skip_thread_instructions(size_t thread_index, - uint64_t instruction_count, - OUT bool *eof) +reader_t & +file_reader_t::skip_instructions(uint64_t instruction_count) { if (instruction_count == 0) - return true; - VPRINT(this, 2, "Thread #%zd skipping %" PRIi64 " instrs\n", thread_index, - instruction_count); - trace_entry_t timestamp = {}; - trace_entry_t cpu = {}; - const memref_t &memref = **this; - if (memref.marker.type == TRACE_TYPE_MARKER && - memref.marker.marker_type == TRACE_MARKER_TYPE_TIMESTAMP) { - timestamp = entry_copy_; + return *this; + VPRINT(this, 2, "Skipping %" PRIi64 " instrs\n", instruction_count); + pre_skip_instructions(); + if (chunk_instr_count_ == 0) { + VPRINT(this, 1, "Failed to record chunk instr count\n"); + at_eof_ = true; + return *this; } - zipfile_reader_t *zipfile = &input_files_[thread_index]; + zipfile_reader_t *zipfile = &input_file_; // We assume our unzGoToNextFile loop is plenty performant and we don't need to // know the chunk names to use with a single unzLocateFile. - uint64_t stop_count_ = cur_instr_count_ + instruction_count + 1; + uint64_t stop_count = cur_instr_count_ + instruction_count + 1; VPRINT(this, 2, "stop=%" PRIi64 " cur=%" PRIi64 " chunk=%" PRIi64 " est=%" PRIi64 "\n", - stop_count_, cur_instr_count_, chunk_instr_count_, + stop_count, cur_instr_count_, chunk_instr_count_, cur_instr_count_ + (chunk_instr_count_ - (cur_instr_count_ % chunk_instr_count_))); // First, quickly skip over chunks to reach the chunk containing the target. while (cur_instr_count_ + (chunk_instr_count_ - (cur_instr_count_ % chunk_instr_count_)) < - stop_count_) { - if (unzCloseCurrentFile(zipfile->file) != UNZ_OK) - return false; + stop_count) { + if (unzCloseCurrentFile(zipfile->file) != UNZ_OK) { + VPRINT(this, 1, "Failed to close zip subfile\n"); + at_eof_ = true; + return *this; + } int res = unzGoToNextFile(zipfile->file); if (res != UNZ_OK) { - if (res == UNZ_END_OF_LIST_OF_FILE) { - VPRINT(this, 2, "Thread #%zd hit EOF\n", thread_index); - *eof = true; - } - return false; + if (res == UNZ_END_OF_LIST_OF_FILE) + VPRINT(this, 2, "Hit EOF\n"); + else + VPRINT(this, 2, "Failed to go to next zip subfile\n"); + at_eof_ = true; + return *this; + } + if (unzOpenCurrentFile(zipfile->file) != UNZ_OK) { + VPRINT(this, 1, "Failed to open zip subfile\n"); + at_eof_ = true; + return *this; } - if (unzOpenCurrentFile(zipfile->file) != UNZ_OK) - return false; cur_instr_count_ += chunk_instr_count_ - (cur_instr_count_ % chunk_instr_count_); - VPRINT(this, 2, "Thread #%zd at %" PRIi64 " instrs at start of new chunk\n", - thread_index, cur_instr_count_); + VPRINT(this, 2, "At %" PRIi64 " instrs at start of new chunk\n", + cur_instr_count_); VPRINT(this, 2, "zip chunk stop=%" PRIi64 " cur=%" PRIi64 " chunk=%" PRIi64 " end-of-chunk=%" PRIi64 "\n", - stop_count_, cur_instr_count_, chunk_instr_count_, + stop_count, cur_instr_count_, chunk_instr_count_, cur_instr_count_ + (chunk_instr_count_ - (cur_instr_count_ % chunk_instr_count_))); // Clear cached data from the prior chunk. zipfile->cur_buf = zipfile->max_buf; } - // We have to linearly walk the last mile. - bool prev_was_record_ord = false; - bool found_real_timestamp = false; - while (cur_instr_count_ < stop_count_) { // End condition is never reached. - if (!read_next_thread_entry(thread_index, &entry_copy_, eof)) - return false; - // We need to pass up memrefs for the final skipped instr, but we don't - // want to process_input_entry() on the first unskipped instr so we can - // insert the timestamp+cpu first. - if (cur_instr_count_ + 1 == stop_count_ && - type_is_instr(static_cast(entry_copy_.type))) - break; - // To examine the produced memrefs we'd have to have the base reader - // expose these hidden entries. It is simpler for us to read the - // trace_entry_t directly prior to processing by the base class. - if (entry_copy_.type == TRACE_TYPE_MARKER) { - if (entry_copy_.size == TRACE_MARKER_TYPE_RECORD_ORDINAL) { - cur_ref_count_ = entry_copy_.addr; - prev_was_record_ord = true; - VPRINT(this, 4, "Found record ordinal marker: new ord %" PRIu64 "\n", - cur_ref_count_); - } else if (entry_copy_.size == TRACE_MARKER_TYPE_TIMESTAMP) { - timestamp = entry_copy_; - if (prev_was_record_ord) - --cur_ref_count_; // Invisible to ordinals. - else - found_real_timestamp = true; - } else if (entry_copy_.size == TRACE_MARKER_TYPE_CPU_ID) { - cpu = entry_copy_; - if (prev_was_record_ord) - --cur_ref_count_; // Invisible to ordinals. - } else - prev_was_record_ord = false; - } else - prev_was_record_ord = false; - // Update core state. - input_entry_ = &entry_copy_; - process_input_entry(); - } - if (timestamp.type == TRACE_TYPE_MARKER && cpu.type == TRACE_TYPE_MARKER) { - // Insert the two markers. - trace_entry_t instr = entry_copy_; - entry_copy_ = timestamp; - if (!found_real_timestamp) { - // These synthetic entries are not real records in the unskipped trace, so we - // do not associate record counts with them. - suppress_ref_count_ = 1; - if (cpu.type == TRACE_TYPE_MARKER) - ++suppress_ref_count_; - } else { - // These are not invisible but we already counted them in the loop above - // so we need to avoid a double-count. - VPRINT(this, 4, "Found real timestamp: walking back ord from %" PRIu64 "\n", - cur_ref_count_); - --cur_ref_count_; - if (cpu.type == TRACE_TYPE_MARKER) - --cur_ref_count_; - } - process_input_entry(); - if (cpu.type == TRACE_TYPE_MARKER) - queues_[thread_index].push(cpu); - queues_[thread_index].push(instr); - } else { - // We missed the markers somehow; fall back to just process the instr. - // TODO i#5538: For skipping from the middle we need to have the - // base reader cache the last timestamp,cpu. - VPRINT(this, 1, "Skip failed to find both timestamp and cpu\n"); - process_input_entry(); - } - return true; + // Now do a linear walk the rest of the way, remembering timestamps (we have + // duplicated timestamps at the start of the chunk to cover any skipped in + // the fast chunk jumps we just did). + // Subtract 1 to pass the target instr itself. + return skip_instructions_with_timestamp(stop_count - 1); } diff --git a/clients/drcachesim/reader/zipfile_file_reader.h b/clients/drcachesim/reader/zipfile_file_reader.h index 3caea97082f..8f318af09f5 100644 --- a/clients/drcachesim/reader/zipfile_file_reader.h +++ b/clients/drcachesim/reader/zipfile_file_reader.h @@ -1,5 +1,5 @@ /* ********************************************************** - * Copyright (c) 2017-2022 Google, Inc. All rights reserved. + * Copyright (c) 2017-2023 Google, Inc. All rights reserved. * **********************************************************/ /* @@ -40,6 +40,10 @@ #include "file_reader.h" struct zipfile_reader_t { + zipfile_reader_t() + : file(nullptr) + { + } explicit zipfile_reader_t(unzFile file) : file(file) { @@ -59,9 +63,7 @@ typedef file_reader_t zipfile_file_reader_t; * class declaration. */ template <> -bool -file_reader_t::skip_thread_instructions(size_t thread_index, - uint64_t instruction_count, - OUT bool *eof); +reader_t & +file_reader_t::skip_instructions(uint64_t instruction_count); #endif /* _ZIPFILE_FILE_READER_H_ */ diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index 58368903594..c8c785b7e9d 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -90,6 +90,9 @@ class mock_reader_t : public reader_t { trace_entry_t * read_next_entry() override { + trace_entry_t *entry = read_queue(); + if (entry != nullptr) + return entry; // We need this to work just well enough for reader_t::skip_instructions // to identify instr entries. ++index_; @@ -106,12 +109,6 @@ class mock_reader_t : public reader_t { --index_; queue_.pop(); } - bool - read_next_thread_entry(size_t thread_index, OUT trace_entry_t *entry, - OUT bool *eof) override - { - return true; - } std::string get_stream_name() const override { diff --git a/clients/drcachesim/tests/view_test.cpp b/clients/drcachesim/tests/view_test.cpp index 86e5bf95b60..49bf64e7857 100644 --- a/clients/drcachesim/tests/view_test.cpp +++ b/clients/drcachesim/tests/view_test.cpp @@ -42,6 +42,7 @@ #include "../common/memref.h" #include "../tracer/raw2trace.h" #include "../reader/file_reader.h" +#include "../scheduler/scheduler.h" #include "memref_gen.h" #undef ASSERT @@ -61,19 +62,14 @@ } \ } while (0) +using namespace dynamorio::drmemtrace; + // These are for our mock serial reader and must be in the same namespace // as file_reader_t's declaration. template <> file_reader_t>::~file_reader_t() { } -template <> -bool -file_reader_t>::is_complete() -{ - return false; -} - template <> bool file_reader_t>::open_single_file(const std::string &path) @@ -82,11 +78,10 @@ file_reader_t>::open_single_file(const std::string &p } template <> -bool -file_reader_t>::read_next_thread_entry( - size_t thread_index, OUT trace_entry_t *entry, OUT bool *eof) +trace_entry_t * +file_reader_t>::read_next_entry() { - return false; + return nullptr; } namespace { @@ -415,48 +410,55 @@ run_limit_tests(void *drcontext) } /*************************************************************************** - * Serial reader mock. + * File reader mock. */ class mock_file_reader_t : public file_reader_t> { public: mock_file_reader_t() { + at_eof_ = true; } - mock_file_reader_t(const std::vector> &entries) - : file_reader_t(std::vector(1, "non-empty")) + mock_file_reader_t(const std::vector &entries) { - input_files_ = entries; - pos_.resize(input_files_.size(), 0); - init(); + input_file_ = entries; + pos_ = 0; } - bool - read_next_thread_entry(size_t thread_index, OUT trace_entry_t *entry, - OUT bool *eof) override + trace_entry_t * + read_next_entry() override { - if (pos_[thread_index] >= input_files_[thread_index].size()) { - *eof = true; - return false; + if (at_eof_) + return nullptr; + trace_entry_t *entry = read_queue(); + if (entry != nullptr) + return entry; + if (pos_ >= input_file_.size()) { + at_eof_ = true; + return nullptr; } - *entry = input_files_[thread_index][pos_[thread_index]]; - ++pos_[thread_index]; - return true; + entry = &input_file_[pos_]; + ++pos_; + return entry; } private: - std::vector pos_; + size_t pos_; }; std::string run_serial_test_helper(view_t &view, - const std::vector> &entries) + const std::vector> &entries, + const std::vector &tids) + { class local_stream_t : public default_memtrace_stream_t { public: local_stream_t(view_t &view, - const std::vector> &entries) + const std::vector> &entries, + const std::vector &tids) : view_(view) , entries_(entries) + , tids_(tids) { } @@ -468,10 +470,27 @@ run_serial_test_helper(view_t &view, std::stringstream capture; std::streambuf *prior = std::cerr.rdbuf(capture.rdbuf()); // Run the tool. - mock_file_reader_t serial(entries_); - mock_file_reader_t end; - for (; serial != end; ++serial) { - const memref_t memref = *serial; + std::vector readers; + for (size_t i = 0; i < entries_.size(); i++) { + readers.emplace_back( + std::unique_ptr( + new mock_file_reader_t(entries_[i])), + std::unique_ptr(new mock_file_reader_t()), + tids_[i]); + } + scheduler_t scheduler; + std::vector sched_inputs; + sched_inputs.emplace_back(std::move(readers)); + if (scheduler.init(sched_inputs, 1, + scheduler_t::make_scheduler_serial_options()) != + scheduler_t::STATUS_SUCCESS) + assert(false); + auto *stream = scheduler.get_stream(0); + memref_t memref; + for (scheduler_t::stream_status_t status = stream->next_record(memref); + status != scheduler_t::STATUS_EOF; + status = stream->next_record(memref)) { + assert(status == scheduler_t::STATUS_OK); ++ref_count_; if (type_is_instr(memref.instr.type)) ++instr_count_; @@ -498,11 +517,12 @@ run_serial_test_helper(view_t &view, private: view_t &view_; const std::vector> &entries_; + const std::vector &tids_; uint64_t ref_count_ = 0; uint64_t instr_count_ = 0; }; - local_stream_t stream(view, entries); + local_stream_t stream(view, entries, tids); return stream.run(); } @@ -512,6 +532,7 @@ bool run_single_thread_chunk_test(void *drcontext) { const memref_tid_t t1 = 3; + std::vector tids = { t1 }; std::vector> entries = { { { TRACE_TYPE_HEADER, 0, { 0x1 } }, { TRACE_TYPE_MARKER, TRACE_MARKER_TYPE_VERSION, { 3 } }, @@ -544,7 +565,7 @@ run_single_thread_chunk_test(void *drcontext) )DELIM"; instrlist_t *ilist_unused = nullptr; view_nomod_test_t view(drcontext, *ilist_unused, 0, 0, 0); - std::string res = run_serial_test_helper(view, entries); + std::string res = run_serial_test_helper(view, entries, tids); // Make 64-bit match our 32-bit expect string. res = std::regex_replace(res, std::regex("0x000000000000002a"), "0x0000002a"); if (res != expect) { @@ -559,9 +580,10 @@ run_serial_chunk_test(void *drcontext) { // We ensure headers are not omitted incorrectly, which they were // in the first implementation of the reader skipping dup headers: - // i#/5538#issuecomment-1407235283 + // i#5538#issuecomment-1407235283 const memref_tid_t t1 = 3; const memref_tid_t t2 = 7; + std::vector tids = { t1, t2 }; std::vector> entries = { { { TRACE_TYPE_HEADER, 0, { 0x1 } }, @@ -601,17 +623,15 @@ run_serial_chunk_test(void *drcontext) 2 0: 3 3 0: 3 4 0: 3 + 5 0: 3 + 6 0: 3 + 7 1: 3 ifetch 4 byte(s) @ 0x0000002a non-branch + 8 2: 3 ifetch 4 byte(s) @ 0x0000002a non-branch ------------------------------------------------------------ - 5 0: 7 - 6 0: 7 - 7 0: 7 - 8 0: 7 ------------------------------------------------------------- - 9 0: 3 - 10 0: 3 - 11 1: 3 ifetch 4 byte(s) @ 0x0000002a non-branch - 12 2: 3 ifetch 4 byte(s) @ 0x0000002a non-branch ------------------------------------------------------------- + 9 2: 7 + 10 2: 7 + 11 2: 7 + 12 2: 7 13 2: 7 14 2: 7 15 3: 7 ifetch 4 byte(s) @ 0x0000002a non-branch @@ -627,7 +647,7 @@ run_serial_chunk_test(void *drcontext) )DELIM"; instrlist_t *ilist_unused = nullptr; view_nomod_test_t view(drcontext, *ilist_unused, 0, 0, 0); - std::string res = run_serial_test_helper(view, entries); + std::string res = run_serial_test_helper(view, entries, tids); // Make 64-bit match our 32-bit expect string. res = std::regex_replace(res, std::regex("0x000000000000002a"), "0x0000002a"); if (res != expect) { diff --git a/clients/drcachesim/tracer/raw2trace.h b/clients/drcachesim/tracer/raw2trace.h index 01f2163a3a3..06d2d761bd5 100644 --- a/clients/drcachesim/tracer/raw2trace.h +++ b/clients/drcachesim/tracer/raw2trace.h @@ -1743,12 +1743,6 @@ class memref_counter_t : public reader_t { { return nullptr; }; - bool - read_next_thread_entry(size_t thread_index, OUT trace_entry_t *entry, - OUT bool *eof) override - { - return false; - } std::string get_stream_name() const override { @@ -1757,7 +1751,7 @@ class memref_counter_t : public reader_t { int entry_memref_count(const trace_entry_t *entry) { - // Mirror file_reader_t::open_input_files(). + // Mirror file_reader_t::open_input_file(). // In particular, we need to skip TRACE_TYPE_HEADER and to pass the // tid and pid to the reader before the 2 markers in front of them. if (!saw_pid_) {