i#6685 core-shard: Add only_shards drmemtrace filter #6925

Merged: 4 commits, Aug 22, 2024
11 changes: 6 additions & 5 deletions clients/drcachesim/analyzer.cpp
@@ -40,6 +40,7 @@
#include <iostream>
#include <limits>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
@@ -228,7 +229,8 @@ analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t()
template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
const std::string &trace_path, memref_tid_t only_thread, int verbosity,
const std::string &trace_path, const std::set<memref_tid_t> &only_threads,
const std::set<int> &only_shards, int verbosity,
typename sched_type_t::scheduler_options_t options)
{
verbosity_ = verbosity;
@@ -245,9 +247,8 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
regions.emplace_back(skip_instrs_ + 1, 0);
}
typename sched_type_t::input_workload_t workload(trace_path, regions);
if (only_thread != INVALID_THREAD_ID) {
workload.only_threads.insert(only_thread);
}
workload.only_threads = only_threads;
workload.only_shards = only_shards;
if (regions.empty() && skip_to_timestamp_ > 0) {
workload.times_of_interest.emplace_back(skip_to_timestamp_, 0);
}
@@ -369,7 +370,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t(
// The scheduler will call reader_t::init() for each input file. We assume
// that won't block (analyzer_multi_t separates out IPC readers).
typename sched_type_t::scheduler_options_t sched_ops;
if (!init_scheduler(trace_path, INVALID_THREAD_ID, verbosity, std::move(sched_ops))) {
if (!init_scheduler(trace_path, {}, {}, verbosity, std::move(sched_ops))) {
success_ = false;
error_string_ = "Failed to create scheduler";
return;
7 changes: 5 additions & 2 deletions clients/drcachesim/analyzer.h
@@ -47,6 +47,7 @@

#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
@@ -216,9 +217,11 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
operator=(const analyzer_worker_data_t &) = delete;
};

// Pass INVALID_THREAD_ID for only_thread to include all threads.
bool
init_scheduler(const std::string &trace_path, memref_tid_t only_thread, int verbosity,
init_scheduler(const std::string &trace_path,
// To include all threads/shards, use empty sets.
const std::set<memref_tid_t> &only_threads,
const std::set<int> &only_shards, int verbosity,
typename sched_type_t::scheduler_options_t options);

// For core-sharded, worker_count_ must be set prior to calling this; for parallel
44 changes: 40 additions & 4 deletions clients/drcachesim/analyzer_multi.cpp
@@ -346,6 +346,34 @@ record_analyzer_multi_t::create_analysis_tool_from_options(const std::string &to
* Other analyzer_multi_tmpl_t routines that do not need to be specialized.
*/

template <typename RecordType, typename ReaderType>
std::string
analyzer_multi_tmpl_t<RecordType, ReaderType>::set_input_limit(
std::set<memref_tid_t> &only_threads, std::set<int> &only_shards)
{
bool valid_limit = true;
if (op_only_thread.get_value() != 0) {
if (!op_only_threads.get_value().empty() || !op_only_shards.get_value().empty())
valid_limit = false;
only_threads.insert(op_only_thread.get_value());
} else if (!op_only_threads.get_value().empty()) {
if (!op_only_shards.get_value().empty())
valid_limit = false;
std::vector<std::string> tids = split_by(op_only_threads.get_value(), ",");
for (const std::string &tid : tids) {
only_threads.insert(strtol(tid.c_str(), nullptr, 10));
}
} else if (!op_only_shards.get_value().empty()) {
std::vector<std::string> tids = split_by(op_only_shards.get_value(), ",");
for (const std::string &tid : tids) {
only_shards.insert(strtol(tid.c_str(), nullptr, 10));
}
}
if (!valid_limit)
return "Only one of -only_thread, -only_threads, and -only_shards can be set.";
return "";
}
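
For readers unfamiliar with the project's split_by() helper used above, the standalone sketch below shows the same comma-separated parsing in isolation. It is an illustration only: std::stringstream and std::stoi stand in for the split_by() and strtol() calls in set_input_limit(), and the option value is made up.

```cpp
// Standalone sketch (not project code): turns a comma-separated option string
// such as "2,7,11" into a std::set of ordinals, mirroring what
// set_input_limit() does for -only_threads and -only_shards.
#include <iostream>
#include <set>
#include <sstream>
#include <string>

static std::set<int>
parse_ordinal_list(const std::string &csv)
{
    std::set<int> result;
    std::stringstream stream(csv);
    std::string token;
    while (std::getline(stream, token, ',')) {
        if (!token.empty())
            result.insert(std::stoi(token));
    }
    return result;
}

int
main()
{
    // Prints 2, 7, 11: duplicates and ordering are handled by the std::set.
    for (int ordinal : parse_ordinal_list("2,7,11"))
        std::cout << ordinal << "\n";
    return 0;
}
```
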

template <typename RecordType, typename ReaderType>
analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
{
@@ -457,7 +485,16 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
if (!op_indir.get_value().empty()) {
std::string tracedir =
raw2trace_directory_t::tracedir_from_rawdir(op_indir.get_value());
if (!this->init_scheduler(tracedir, op_only_thread.get_value(),

std::set<memref_tid_t> only_threads;
std::set<int> only_shards;
std::string res = set_input_limit(only_threads, only_shards);
if (!res.empty()) {
this->success_ = false;
this->error_string_ = res;
return;
}
if (!this->init_scheduler(tracedir, only_threads, only_shards,
op_verbose.get_value(), std::move(sched_ops)))
this->success_ = false;
} else if (op_infile.get_value().empty()) {
@@ -479,9 +516,8 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
}
} else {
// Legacy file.
if (!this->init_scheduler(op_infile.get_value(),
INVALID_THREAD_ID /*all threads*/,
op_verbose.get_value(), std::move(sched_ops)))
if (!this->init_scheduler(op_infile.get_value(), {}, {}, op_verbose.get_value(),
std::move(sched_ops)))
this->success_ = false;
}
if (!init_analysis_tools()) {
3 changes: 3 additions & 0 deletions clients/drcachesim/analyzer_multi.h
@@ -92,6 +92,9 @@ class analyzer_multi_tmpl_t : public analyzer_tmpl_t<RecordType, ReaderType> {
cache_simulator_knobs_t *
get_cache_simulator_knobs();

std::string
set_input_limit(std::set<memref_tid_t> &only_threads, std::set<int> &only_shards);

std::unique_ptr<std::istream> serial_schedule_file_;
// This is read in a single stream by invariant_checker and so is not
// an archive_istream_t.
20 changes: 19 additions & 1 deletion clients/drcachesim/common/options.cpp
@@ -545,7 +545,25 @@ droption_t<int>
op_only_thread(DROPTION_SCOPE_FRONTEND, "only_thread", 0,
"Only analyze this thread (0 means all)",
"Limits analyis to the single "
"thread with the given identifier. 0 enables all threads.");
"thread with the given identifier. 0 enables all threads. "
"Applies only to -indir, not to -infile. "
"Cannot be combined with -only_threads or -only_shards.");

droption_t<std::string>
op_only_threads(DROPTION_SCOPE_FRONTEND, "only_threads", "",
"Only analyze these comma-separated threads",
"Limits analyis to the list of comma-separated thread ids. "
"Applies only to -indir, not to -infile. "
"Cannot be combined with -only_thread or -only_shards.");
droption_t<std::string>
op_only_shards(DROPTION_SCOPE_FRONTEND, "only_shards", "",
"Only analyze these comma-separated shard ordinals",
"Limits analyis to the list of comma-separated shard ordinals. "
"A shard is typically an input thread but might be a core for "
"core-sharded-on-disk traces. The ordinal is 0-based and indexes "
"into the sorted order of input filenames. "
"Applies only to -indir, not to -infile. "
"Cannot be combined with -only_thread or -only_threads.");

droption_t<bytesize_t> op_skip_instrs(
DROPTION_SCOPE_FRONTEND, "skip_instrs", 0, "Number of instructions to skip",
2 changes: 2 additions & 0 deletions clients/drcachesim/common/options.h
@@ -165,6 +165,8 @@ extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t>
extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t>
op_interval_instr_count;
extern dynamorio::droption::droption_t<int> op_only_thread;
extern dynamorio::droption::droption_t<std::string> op_only_threads;
extern dynamorio::droption::droption_t<std::string> op_only_shards;
extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t> op_skip_instrs;
extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t> op_skip_refs;
extern dynamorio::droption::droption_t<uint64_t> op_skip_to_timestamp;
106 changes: 83 additions & 23 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -664,6 +664,33 @@ scheduler_tmpl_t<RecordType, ReaderType>::stream_t::set_active(bool active)
* Scheduler.
*/

template <typename RecordType, typename ReaderType>
bool
scheduler_tmpl_t<RecordType, ReaderType>::check_valid_input_limits(
const input_workload_t &workload, input_reader_info_t &reader_info)
{
if (!workload.only_shards.empty()) {
for (input_ordinal_t ord : workload.only_shards) {
if (ord < 0 || ord >= static_cast<input_ordinal_t>(reader_info.input_count)) {
error_string_ = "only_shards entry " + std::to_string(ord) +
" out of bounds for a shard ordinal";
return false;
}
}
}
if (!workload.only_threads.empty()) {
for (memref_tid_t tid : workload.only_threads) {
if (reader_info.unfiltered_tids.find(tid) ==
reader_info.unfiltered_tids.end()) {
error_string_ = "only_threads entry " + std::to_string(tid) +
" not found in workload inputs";
return false;
}
}
}
return true;
}
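
To make the new fields concrete, here is a minimal caller-side sketch of a scheduler user opting into the filter. It assumes the scheduler_t typedefs plus the input_workload_t constructor and init() signature implied elsewhere in this diff; the trace directory and the chosen ordinals are hypothetical. only_shards and only_threads are exactly the fields that check_valid_input_limits() above validates (in-bounds ordinals, tids present in the workload).

```cpp
// Caller-side sketch under the assumptions stated above; not a drop-in test.
#include <string>
#include <utility>
#include <vector>

#include "scheduler.h"

using dynamorio::drmemtrace::scheduler_t;

static bool
build_filtered_scheduler(scheduler_t &scheduler, const std::string &trace_dir)
{
    scheduler_t::input_workload_t workload(trace_dir, {}); // No regions of interest.
    // Keep only the 1st and 3rd inputs, in sorted-filename order.
    workload.only_shards = { 0, 2 };
    // Mutually exclusive alternative: filter by thread id instead.
    // workload.only_threads = { 1234567 };
    std::vector<scheduler_t::input_workload_t> workloads;
    workloads.push_back(std::move(workload));
    scheduler_t::scheduler_options_t options;
    return scheduler.init(workloads, /*output_count=*/1, std::move(options)) ==
        scheduler_t::STATUS_SUCCESS;
}
```
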

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::init(
@@ -679,16 +706,26 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
auto &workload = workload_inputs[workload_idx];
if (workload.struct_size != sizeof(input_workload_t))
return STATUS_ERROR_INVALID_PARAMETER;
std::unordered_map<memref_tid_t, int> workload_tids;
if (!workload.only_threads.empty() && !workload.only_shards.empty())
return STATUS_ERROR_INVALID_PARAMETER;
input_reader_info_t reader_info;
reader_info.only_threads = workload.only_threads;
reader_info.only_shards = workload.only_shards;
if (workload.path.empty()) {
if (workload.readers.empty())
return STATUS_ERROR_INVALID_PARAMETER;
for (auto &reader : workload.readers) {
reader_info.input_count = workload.readers.size();
for (int i = 0; i < static_cast<int>(workload.readers.size()); ++i) {
auto &reader = workload.readers[i];
if (!reader.reader || !reader.end)
return STATUS_ERROR_INVALID_PARAMETER;
reader_info.unfiltered_tids.insert(reader.tid);
if (!workload.only_threads.empty() &&
workload.only_threads.find(reader.tid) == workload.only_threads.end())
continue;
if (!workload.only_shards.empty() &&
workload.only_shards.find(i) == workload.only_shards.end())
continue;
int index = static_cast<input_ordinal_t>(inputs_.size());
inputs_.emplace_back();
input_info_t &input = inputs_.back();
Expand All @@ -699,22 +736,24 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
input.reader = std::move(reader.reader);
input.reader_end = std::move(reader.end);
input.needs_init = true;
workload_tids[input.tid] = input.index;
reader_info.tid2input[input.tid] = input.index;
tid2input_[workload_tid_t(workload_idx, input.tid)] = index;
}
} else {
if (!workload.readers.empty())
return STATUS_ERROR_INVALID_PARAMETER;
sched_type_t::scheduler_status_t res =
open_readers(workload.path, workload.only_threads, workload_tids);
open_readers(workload.path, reader_info);
if (res != STATUS_SUCCESS)
return res;
for (const auto &it : workload_tids) {
for (const auto &it : reader_info.tid2input) {
inputs_[it.second].workload = workload_idx;
workload2inputs[workload_idx].push_back(it.second);
tid2input_[workload_tid_t(workload_idx, it.first)] = it.second;
}
}
if (!check_valid_input_limits(workload, reader_info))
return STATUS_ERROR_INVALID_PARAMETER;
if (!workload.times_of_interest.empty()) {
for (const auto &modifiers : workload.thread_modifiers) {
if (!modifiers.regions_of_interest.empty()) {
@@ -723,7 +762,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
}
}
sched_type_t::scheduler_status_t status =
create_regions_from_times(workload_tids, workload);
create_regions_from_times(reader_info.tid2input, workload);
if (status != sched_type_t::STATUS_SUCCESS)
return STATUS_ERROR_INVALID_PARAMETER;
}
@@ -734,7 +773,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
std::vector<memref_tid_t> workload_tid_vector;
if (modifiers.tids.empty()) {
// Apply to all tids that have not already been modified.
for (const auto entry : workload_tids) {
for (const auto entry : reader_info.tid2input) {
if (!inputs_[entry.second].has_modifier)
workload_tid_vector.push_back(entry.first);
}
@@ -744,9 +783,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
// We assume the overhead of copying the modifiers for every thread is
// not high and the simplified code is worthwhile.
for (memref_tid_t tid : *which_tids) {
if (workload_tids.find(tid) == workload_tids.end())
if (reader_info.tid2input.find(tid) == reader_info.tid2input.end())
return STATUS_ERROR_INVALID_PARAMETER;
int index = workload_tids[tid];
int index = reader_info.tid2input[tid];
input_info_t &input = inputs_[index];
input.has_modifier = true;
input.binding = modifiers.output_binding;
@@ -1788,9 +1827,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_input_content(

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
const std::string &path, const std::set<memref_tid_t> &only_threads,
std::unordered_map<memref_tid_t, int> &workload_tids)
scheduler_tmpl_t<RecordType, ReaderType>::open_reader(const std::string &path,
input_ordinal_t input_ordinal,
input_reader_info_t &reader_info)
{
if (path.empty() || directory_iterator_t::is_directory(path))
return STATUS_ERROR_INVALID_PARAMETER;
@@ -1803,10 +1842,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
inputs_.emplace_back();
input_info_t &input = inputs_.back();
input.index = index;
// We need the tid up front. Rather than assume it's still part of the filename, we
// read the first record (we generalize to read until we find the first but we
// We need the tid up front. Rather than assume it's still part of the filename,
// we read the first record (we generalize to read until we find the first but we
// expect it to be the first after PR #5739 changed the order file_reader_t passes
// them to reader_t) to find it.
// XXX: For core-sharded-on-disk traces, this tid is just the first one for
// this core; it would be better to read the filetype and not match any tid
// for such files? Should we call get_initial_input_content() to do that?
std::unique_ptr<ReaderType> reader_end = get_default_reader();
memref_tid_t tid = INVALID_THREAD_ID;
while (*reader != *reader_end) {
@@ -1820,32 +1862,44 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
error_string_ = "Failed to read " + path;
return STATUS_ERROR_FILE_READ_FAILED;
}
if (!only_threads.empty() && only_threads.find(tid) == only_threads.end()) {
// For core-sharded inputs that start idle the tid might be IDLE_THREAD_ID.
// That means the size of unfiltered_tids will not be the total input
// size, which is why we have a separate input_count.
reader_info.unfiltered_tids.insert(tid);
++reader_info.input_count;
if (!reader_info.only_threads.empty() &&
reader_info.only_threads.find(tid) == reader_info.only_threads.end()) {
inputs_.pop_back();
return sched_type_t::STATUS_SUCCESS;
}
if (!reader_info.only_shards.empty() &&
reader_info.only_shards.find(input_ordinal) == reader_info.only_shards.end()) {
inputs_.pop_back();
return sched_type_t::STATUS_SUCCESS;
}
VPRINT(this, 1, "Opened reader for tid %" PRId64 " %s\n", tid, path.c_str());
input.tid = tid;
input.reader = std::move(reader);
input.reader_end = std::move(reader_end);
workload_tids[tid] = index;
reader_info.tid2input[tid] = index;
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::open_readers(
const std::string &path, const std::set<memref_tid_t> &only_threads,
std::unordered_map<memref_tid_t, int> &workload_tids)
scheduler_tmpl_t<RecordType, ReaderType>::open_readers(const std::string &path,
input_reader_info_t &reader_info)
{
if (!directory_iterator_t::is_directory(path))
return open_reader(path, only_threads, workload_tids);
if (!directory_iterator_t::is_directory(path)) {
return open_reader(path, 0, reader_info);
}
directory_iterator_t end;
directory_iterator_t iter(path);
if (!iter) {
error_string_ = "Failed to list directory " + path + ": " + iter.error_string();
return sched_type_t::STATUS_ERROR_FILE_OPEN_FAILED;
}
std::vector<std::string> files;
for (; iter != end; ++iter) {
const std::string fname = *iter;
if (fname == "." || fname == ".." ||
@@ -1858,8 +1912,14 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_readers(
fname == DRMEMTRACE_ENCODING_FILENAME)
continue;
const std::string file = path + DIRSEP + fname;
sched_type_t::scheduler_status_t res =
open_reader(file, only_threads, workload_tids);
files.push_back(file);
}
// Sort so we can have reliable shard ordinals for only_shards.
// We assume leading 0's are used for important numbers embedded in the path,
// so that a regular sort keeps numeric order.
std::sort(files.begin(), files.end());
for (int i = 0; i < static_cast<int>(files.size()); ++i) {
sched_type_t::scheduler_status_t res = open_reader(files[i], i, reader_info);
if (res != sched_type_t::STATUS_SUCCESS)
return res;
}
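
The sketch below is a standalone illustration (not project code) of the sorting assumption stated in the comment above: with zero-padded numbers in the file names, std::sort's lexicographic order matches numeric order, so an -only_shards ordinal is simply a file's position after the sort. The file names here are invented.

```cpp
// Standalone sketch demonstrating stable shard ordinals from a plain sort.
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

int
main()
{
    std::vector<std::string> files = {
        "drmemtrace.app.0002.trace.zip",
        "drmemtrace.app.0000.trace.zip",
        "drmemtrace.app.0010.trace.zip",
    };
    std::sort(files.begin(), files.end());
    // Shard ordinal == position after the sort: 0000 -> 0, 0002 -> 1, 0010 -> 2.
    for (size_t i = 0; i < files.size(); ++i)
        std::cout << "shard " << i << ": " << files[i] << "\n";
    return 0;
}
```
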