Skip to content

Commit

Permalink
On-demand profiling enhancements
Browse files Browse the repository at this point in the history
1. Add ACTIVITY_TYPES / VERBOSE_LOG_LEVEL / ACTIVITIES_WARMUP_PERIOD_SECS parameters to the dyno gputrace command; they are written into the kineto config
2. Add the dyno gputrace-state command
3. Add the dyno gputrace-child-pids command
  • Loading branch information
qiaoning-mt committed Aug 6, 2024
1 parent 73cec3b commit d48d86f
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 4 deletions.
55 changes: 54 additions & 1 deletion cli/src/commands/gputrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub fn run_gputrace(
profile_start_time: u64,
profile_start_iteration_roundup: u64,
process_limit: u32,
activities: &str,
verbose_log_level: i64,
activities_warmup_period_secs: u64,

) -> Result<()> {
let trigger_config = if iterations > 0 {
format!(
Expand All @@ -37,7 +41,8 @@ pub fn run_gputrace(
)
};

let kineto_config = format!(r#"ACTIVITIES_LOG_FILE={}\n{}"#, log_file, trigger_config);
let kineto_config = format!(r#"ACTIVITIES_LOG_FILE={}\nACTIVITY_TYPES={}\nVERBOSE_LOG_LEVEL={}\nACTIVITIES_WARMUP_PERIOD_SECS={}\n{}"#,
log_file, activities, verbose_log_level, activities_warmup_period_secs, trigger_config);

println!("Kineto config = \n{}", kineto_config);

Expand Down Expand Up @@ -79,3 +84,51 @@ pub fn run_gputrace(

Ok(())
}

/// GputraceState command get GPU profiling state on pytorch apps
pub fn run_gputrace_state(
client: TcpStream,
job_id: u64,
pids: &str,
) -> Result<()> {
let request_json = format!(
r#"
{{
"fn": "getKinetOnDemandProfilingStateRequest",
"job_id": {},
"pids": [{}]
}}"#,
job_id, pids
);

utils::send_msg(&client, &request_json).expect("Error sending message to service");

let resp_str = utils::get_resp(&client).expect("Unable to decode output bytes");

println!("response = {}", resp_str);

Ok(())
}

/// GetGpuTraceChildPids command get GPU profiling child pids on pytorch apps
pub fn get_gputrace_child_pids(
client: TcpStream,
job_id: u64,
) -> Result<()> {
let request_json = format!(
r#"
{{
"fn": "getKinetOnDemandProfilingChildPidsRequest",
"job_id": {}
}}"#,
job_id
);

utils::send_msg(&client, &request_json).expect("Error sending message to service");

let resp_str = utils::get_resp(&client).expect("Unable to decode output bytes");

println!("response = {}", resp_str);

Ok(())
}
45 changes: 45 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,31 @@ enum Command {
/// Max number of processes to profile
#[clap(long, default_value_t = 3)]
process_limit: u32,
/// List of activities to capture trace for (comma separated).
/// if not set, default: cpu_op,user_annotation,gpu_user_annotation,gpu_memcpy,gpu_memset,kernel,external_correlation,musa_runtime,musa_driver,cpu_instant_event,python_function,overhead
#[clap(long, default_value = "")]
activities: String,
/// Verbose log level.
#[clap(long, default_value_t = -1)]
verbose_log_level: i64,
/// Activities warmup period secs.
#[clap(long, default_value_t = 0)]
activities_warmup_period_secs: u64,
},
/// Capture gputrace state
GputraceState {
/// Job id of the application to trace
#[clap(long, default_value_t = 0)]
job_id: u64,
/// List of pids to capture trace for (comma separated).
#[clap(long, default_value = "0")]
pids: String,
},
/// Capture gputrace child pids
GputraceChildPids {
/// Job id of the application to trace
#[clap(long, default_value_t = 0)]
job_id: u64,
},
/// Pause dcgm profiling. This enables running tools like Nsight compute and avoids conflicts.
DcgmPause {
Expand Down Expand Up @@ -116,6 +141,9 @@ fn main() -> Result<()> {
profile_start_time,
profile_start_iteration_roundup,
process_limit,
activities,
verbose_log_level,
activities_warmup_period_secs,
} => gputrace::run_gputrace(
dyno_client,
job_id,
Expand All @@ -126,6 +154,23 @@ fn main() -> Result<()> {
profile_start_time,
profile_start_iteration_roundup,
process_limit,
&activities,
verbose_log_level,
activities_warmup_period_secs,
),
Command::GputraceState {
job_id,
pids,
} => gputrace::run_gputrace_state(
dyno_client,
job_id,
&pids,
),
Command::GputraceChildPids {
job_id,
} => gputrace::get_gputrace_child_pids(
dyno_client,
job_id,
),
Command::DcgmPause { duration_s } => dcgm::run_dcgm_pause(dyno_client, duration_s),
Command::DcgmResume => dcgm::run_dcgm_resume(dyno_client),
Expand Down
65 changes: 64 additions & 1 deletion dynolog/src/LibkinetoConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ int32_t LibkinetoConfigManager::registerLibkinetoContext(
std::string LibkinetoConfigManager::obtainOnDemandConfig(
const std::string& jobId,
const std::vector<int32_t>& pids,
int32_t configType) {
int32_t configType,
int currentRunloopState) {
VLOG(2) << fmt::format(
"obtainOnDemandConfig({}, ({}), {})",
jobId,
Expand All @@ -157,6 +158,7 @@ std::string LibkinetoConfigManager::obtainOnDemandConfig(
std::lock_guard<std::mutex> guard(mutex_);

auto _emplace_result = jobs_[jobId].emplace(pids_set, LibkinetoProcess{});
jobs_child_[jobId].emplace(pids[0]); // put child (leaf) process
const auto& it = _emplace_result.first;
bool newProcess = _emplace_result.second;
struct LibkinetoProcess& process = it->second;
Expand Down Expand Up @@ -187,6 +189,7 @@ std::string LibkinetoConfigManager::obtainOnDemandConfig(
// Track last request time so we know which libkineto instances
// are currently active.
process.lastRequestTime = std::chrono::system_clock::now();
process.currentRunloopState = currentRunloopState;
return ret;
}

Expand Down Expand Up @@ -288,6 +291,66 @@ GpuProfilerResult LibkinetoConfigManager::setOnDemandConfig(
return res;
}

// Called by clients to query the run-loop state most recently reported by a
// libkineto process of the given job.
//
// jobId: job whose registered processes are searched.
// pids:  target PIDs. A process matches when any PID in its ancestry set is
//        contained in `pids`. An empty set, or a set holding only PID 0,
//        matches every process (see the compatibility note below).
//
// Returns the currentRunloopState of the first matching process, or -1 when
// the job is unknown or no process matches.
int LibkinetoConfigManager::getOnDemandProfilingState(
    const std::string& jobId,
    const std::set<int32_t>& pids) {
  LOG(INFO) << fmt::format(
      "Get state of on-demand GPU profiling for job ID {}, pids [{}]",
      jobId,
      fmt::join(pids, ","));

  size_t nPids = pids.size();
  // For backwards compatibility with older versions of the dyno CLI,
  // there are two conditions under which all processes are considered:
  // 1. target PIDs are empty
  // 2. target PIDs contain a single PID, 0.
  // As older versions of the CLI are phased out, 2) will no longer need to be
  // accounted for.
  bool matchAllPids = nPids == 0 || (nPids == 1 && *pids.begin() == 0);
  {
    std::lock_guard<std::mutex> guard(mutex_);
    // Look the job up with find() instead of operator[]: a read-only query
    // must not insert an empty entry for a job id that was never registered.
    auto jobIt = jobs_.find(jobId);
    if (jobIt != jobs_.end()) {
      for (const auto& pair : jobIt->second) {
        for (const auto& pid : pair.first) {
          // Report the process if we find a match or target pids is empty.
          if (matchAllPids || pids.find(pid) != pids.end()) {
            const auto& process = pair.second;
            LOG(INFO) << "Get state of on-demand GPU profiling : " << process.currentRunloopState;
            return process.currentRunloopState;
          }
        }
      }
    }
  }

  LOG(INFO) << "Get state of on-demand GPU profiling : 0 matching processes";

  return -1;
}

// Called by clients to control one or more libkineto instances.
// The config is any legal libkineto on-demand config (see wiki).
// Set config type to indicate whether this request is for
// event profiling, activity profiling or both.
// The limit argument is used when the job uses multiple processes or
// the pid is a parent pid of multiple processes with libkineto.
// For example, when specifying a pid with 8 child processes,
// the limit argument can be used to profile 2 of those.
std::set<int32_t> LibkinetoConfigManager::getOnDemandProfilingChildPids(
const std::string& jobId) {
LOG(INFO) << fmt::format(
"Get child pids of on-demand GPU profiling for job ID {}",
jobId);
return jobs_child_[jobId];
}

int LibkinetoConfigManager::processCount(const std::string& jobId) const {
int count = 0;
std::lock_guard<std::mutex> guard(mutex_);
Expand Down
13 changes: 12 additions & 1 deletion dynolog/src/LibkinetoConfigManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class LibkinetoConfigManager {
std::string obtainOnDemandConfig(
const std::string& jobId,
const std::vector<int32_t>& pids,
int32_t configType);
int32_t configType,
int currentRunloopState);

GpuProfilerResult setOnDemandConfig(
const std::string& jobId,
Expand All @@ -46,6 +47,13 @@ class LibkinetoConfigManager {
int32_t configType,
int32_t limit);

// Returns the currentRunloopState of the first process registered under
// jobId whose pid ancestry contains one of `pids`, or -1 when nothing
// matches. An empty `pids`, or one holding only pid 0, matches any process.
int getOnDemandProfilingState(
const std::string& jobId,
const std::set<int32_t>& pids);

// Returns the set of child (leaf) pids recorded for jobId; obtainOnDemandConfig
// adds the first pid of each request it receives to that set.
std::set<int32_t> getOnDemandProfilingChildPids(
const std::string& jobId);

// Return the number of active libkineto processes
// with the given Chronos / Tangram Job Id
int processCount(const std::string& jobId) const;
Expand All @@ -54,6 +62,7 @@ class LibkinetoConfigManager {
// Per-process state tracked for a libkineto instance (the value type of
// ProcessMap).
struct LibkinetoProcess {
int32_t pid;
// Time of the most recent obtainOnDemandConfig() call for this process; used
// to tell which libkineto instances are currently active.
std::chrono::system_clock::time_point lastRequestTime;
// Run-loop state passed in by the most recent obtainOnDemandConfig() call.
int currentRunloopState;
// NOTE(review): presumably the pending on-demand configs for the event and
// activity profilers respectively -- confirm against setOnDemandConfig().
std::string eventProfilerConfig;
std::string activityProfilerConfig;
};
Expand All @@ -70,6 +79,8 @@ class LibkinetoConfigManager {
// Map of pid ancestry -> LibkinetoProcess
using ProcessMap = std::map<std::set<int32_t>, LibkinetoProcess>;
std::map<std::string, ProcessMap> jobs_;
// Map of jobId -> child pids set
std::map<std::string, std::set<int32_t>> jobs_child_;

// Map of gpu id -> pids
using InstancesPerGpuMap = std::map<int32_t, std::set<int32_t>>;
Expand Down
18 changes: 18 additions & 0 deletions dynolog/src/ServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ GpuProfilerResult ServiceHandler::setKinetOnDemandRequest(
limit);
}

// RPC entry point: reports the on-demand profiling state for the given job
// and pids by delegating to LibkinetoConfigManager.
int ServiceHandler::getKinetOnDemandProfilingStateRequest(
    int job_id,
    const std::set<int>& pids) {
  // Temporarily cast to string while we iteratively migrate to string job
  // id
  const std::string jobIdStr = std::to_string(job_id);
  auto manager = LibkinetoConfigManager::getInstance();
  return manager->getOnDemandProfilingState(jobIdStr, pids);
}

// RPC entry point: returns the child pids recorded for the given job by
// delegating to LibkinetoConfigManager.
std::set<int32_t> ServiceHandler::getKinetOnDemandProfilingChildPidsRequest(
    int job_id) {
  // Temporarily cast to string while we iteratively migrate to string job
  // id
  const std::string jobIdStr = std::to_string(job_id);
  auto manager = LibkinetoConfigManager::getInstance();
  return manager->getOnDemandProfilingChildPids(jobIdStr);
}

bool ServiceHandler::dcgmProfPause(int duration_s) {
if (dcgm_) {
return dcgm_->pauseProfiling(duration_s);
Expand Down
5 changes: 5 additions & 0 deletions dynolog/src/ServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class ServiceHandler {
const std::set<int>& pids,
const std::string& config,
int limit);
// Returns the on-demand profiling state for job_id / pids, as reported by
// LibkinetoConfigManager::getOnDemandProfilingState (job_id is passed on as a
// string).
int getKinetOnDemandProfilingStateRequest(
int job_id,
const std::set<int>& pids);
// Returns the child pids recorded for job_id, as reported by
// LibkinetoConfigManager::getOnDemandProfilingChildPids (job_id is passed on
// as a string).
std::set<int32_t> getKinetOnDemandProfilingChildPidsRequest(
int job_id);
// ... more to come
bool dcgmProfPause(int duration_s);
bool dcgmProfResume();
Expand Down
2 changes: 2 additions & 0 deletions dynolog/src/ipcfabric/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ struct LibkinetoRequest {
int type;
// size of pids
int n;
// current loop state of pids
int currentRunloopState;
// job id of the libkineto process
int64_t jobid;
// pids of the process and its ancestors
Expand Down
32 changes: 32 additions & 0 deletions dynolog/src/rpc/SimpleJsonServerInl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,38 @@ std::string SimpleJsonServer<TServiceHandler>::processOneImpl(
fmt::format("failed with exception = {}", ex.what());
}
}
} else if (request["fn"] == "getKinetOnDemandProfilingStateRequest") {
if (!request.contains("pids")) {
response["status"] = "failed";
} else {
try {
std::vector<int> pids = request.at("pids").get<std::vector<int>>();
std::set<int> pids_set{
pids.begin(), pids.end()}; // TODO directly convert?

int job_id = request.value("job_id", 0);
auto result = handler_->getKinetOnDemandProfilingStateRequest(
job_id, pids_set);
response["currProfilingState"] = result;
} catch (const std::exception& ex) {
LOG(ERROR) << "getKinetOnDemandProfilingStateRequest: parsing exception = "
<< ex.what();
response["status"] =
fmt::format("failed with exception = {}", ex.what());
}
}
} else if (request["fn"] == "getKinetOnDemandProfilingChildPidsRequest") {
try {
int job_id = request.value("job_id", 0);
auto result = handler_->getKinetOnDemandProfilingChildPidsRequest(
job_id);
response["childPids"] = result;
} catch (const std::exception& ex) {
LOG(ERROR) << "getKinetOnDemandProfilingChildPidsRequest: parsing exception = "
<< ex.what();
response["status"] =
fmt::format("failed with exception = {}", ex.what());
}
} else if (request["fn"] == "dcgmProfPause") {
if (!request.contains("duration_s")) {
response["status"] = "failed";
Expand Down
2 changes: 1 addition & 1 deletion dynolog/src/tracing/IPCMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void IPCMonitor::getLibkinetoOnDemandRequest(
std::vector<int32_t> pids(req->pids, req->pids + req->n);
try {
ret_config = LibkinetoConfigManager::getInstance()->obtainOnDemandConfig(
std::to_string(req->jobid), pids, req->type);
std::to_string(req->jobid), pids, req->type, req->currentRunloopState);
VLOG(0) << "getLibkinetoOnDemandRequest() : job id " << req->jobid
<< " pids = " << pids[0];
} catch (const std::runtime_error& ex) {
Expand Down
18 changes: 18 additions & 0 deletions dynolog/tests/rpc/SimpleJsonClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ struct MockServiceHandler {
return result;
}

// Mock: counts the call, remembers its arguments, and always reports -1.
int getKinetOnDemandProfilingStateRequest(
    int job_id,
    const std::set<int>& pids) {
  ++getKinetOnDemandProfilingStateCalls_;
  job_id_ = job_id;
  pids_ = pids;
  return -1;
}

// Mock: counts the call, remembers the job id, and returns the stored pids_.
std::set<int32_t> getKinetOnDemandProfilingChildPidsRequest(int job_id) {
  ++getKinetOnDemandProfilingChildPidsCall_;
  job_id_ = job_id;
  return pids_;
}

bool dcgmProfPause(int duration_s) {
duration_s_ = duration_s;
dcgm_prof_enabled_ = false;
Expand All @@ -63,6 +79,8 @@ struct MockServiceHandler {
GpuProfilerResult result;

int setKinetoOnDemandCalls_ = 0;
int getKinetOnDemandProfilingStateCalls_ = 0;
int getKinetOnDemandProfilingChildPidsCall_ = 0;
int job_id_ = -1;
std::set<int> pids_;
std::string config_;
Expand Down

0 comments on commit d48d86f

Please sign in to comment.