Skip to content

Commit

Permalink
remove sys.stdout
Browse files Browse the repository at this point in the history
  • Loading branch information
Wael Abid committed Jun 30, 2022
2 parents dbf75d2 + c3bcd5b commit 53871f9
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 66 deletions.
10 changes: 4 additions & 6 deletions ludwig/utils/misc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,12 @@ def set_saved_weights_in_checkpoint_flag(config):


def processify(func):
"""Decorator to run a function as a process.
Be sure that every argument and the return value
is *pickable*.
The created process is joined, so the code does not
run in parallel.
"""Decorator to run a function as a process. Be sure that every argument and the return value is *pickable*.
The created process is joined, so the code does not run in parallel.
Taken from https://gist.github.com/schlamar/2311116
"""

def process_func(q, *args, **kwargs):
try:
ret = func(q, *args, **kwargs)
Expand All @@ -181,4 +179,4 @@ def wrapper(*args, **kwargs):

return p, queue

return wrapper
return wrapper
148 changes: 89 additions & 59 deletions ludwig/utils/tracker.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,62 @@
"""
some parts are inspired from
https://github.com/Breakend/experiment-impact-tracker/blob/master/experiment_impact_tracker/compute_tracker.py
"""
"""some parts are inspired from https://github.com/Breakend/experiment-impact-
tracker/blob/master/experiment_impact_tracker/compute_tracker.py."""

import psutil
import shutil
import multiprocessing
import os
import shutil
import sys
import time
import torch
import multiprocessing
import traceback
import sys

from queue import Empty as EmptyQueueException

import psutil
import torch
from gpustat.core import GPUStatCollection

from ludwig.utils.data_utils import save_json, load_json
from ludwig.utils.misc_utils import processify
from ludwig.globals import LUDWIG_VERSION
from ludwig.utils.data_utils import load_json, save_json
from ludwig.utils.misc_utils import processify

<<<<<<< HEAD
from experiment_impact_tracker.py_environment.common import get_python_packages_and_versions
from experiment_impact_tracker.gpu.nvidia import get_gpu_info
from experiment_impact_tracker.cpu.common import get_my_cpu_info
=======
# disabling print because the following imports are verbose
f = open(os.devnull, "w")
sys.stdout = f
from experiment_impact_tracker.cpu.common import get_my_cpu_info
from experiment_impact_tracker.gpu.nvidia import get_gpu_info
from experiment_impact_tracker.py_environment.common import get_python_packages_and_versions

f.close()
sys.stdout = sys.__stdout__
>>>>>>> c3bcd5b967b8b2f50abaf5f9477ddbca504a19fc

STOP_MESSAGE = "stop"

STOP_MESSAGE = 'stop'


@processify
def monitor(queue, info, output_dir, logging_interval):
"""
Monitors hardware resource use as part of a separate process.
"""Monitors hardware resource use as part of a separate process.
Populate `info` with system specific metrics (GPU, CPU, RAM, swap) at a `logging_interval` interval
and saves the output in `output_dir`.
Populate `info` with system specific metrics (GPU, CPU, RAM, swap) at a `logging_interval` interval and saves the
output in `output_dir`.
"""
for key in info['system']:
if 'gpu_' in key:
info['system'][key]['memory_used'] = []
info['system']['cpu_utilization'] = []
info['system']['ram_utilization'] = []
info['system']['swap_utilization'] = []
for key in info["system"]:
if "gpu_" in key:
info["system"][key]["memory_used"] = []
info["system"]["cpu_utilization"] = []
info["system"]["ram_utilization"] = []
info["system"]["swap_utilization"] = []

while True:
try:
message = queue.get(block=False)
if isinstance(message, str):
if message == STOP_MESSAGE:
save_json(os.path.join(output_dir, info['tag'] + "_temp.json"), info)
save_json(os.path.join(output_dir, info["tag"] + "_temp.json"), info)
return
else:
queue.put(message)
Expand All @@ -55,18 +65,22 @@ def monitor(queue, info, output_dir, logging_interval):

gpu_infos = GPUStatCollection.new_query()
for i, gpu_info in enumerate(gpu_infos):
gpu_key = 'gpu_{}'.format(i)
info['system'][gpu_key]['memory_used'].append(gpu_info.memory_used)
info['system']['cpu_utilization'].append(psutil.cpu_percent())
info['system']['ram_utilization'].append(psutil.virtual_memory().percent)
info['system']['swap_utilization'].append(psutil.swap_memory().percent)
gpu_key = f"gpu_{i}"
info["system"][gpu_key]["memory_used"].append(gpu_info.memory_used)
info["system"]["cpu_utilization"].append(psutil.cpu_percent())
info["system"]["ram_utilization"].append(psutil.virtual_memory().percent)
info["system"]["swap_utilization"].append(psutil.swap_memory().percent)
time.sleep(logging_interval)


class Tracker:
<<<<<<< HEAD
"""
Track system resource (hardware and software) usage by a chunk of code.
"""
=======
"""Track system resource (hardware and software) usage by a chunk of code."""
>>>>>>> c3bcd5b967b8b2f50abaf5f9477ddbca504a19fc

def __init__(self, tag, output_dir, logging_interval=1, num_batches=None, num_examples=None):
"""
Expand All @@ -78,49 +92,50 @@ def __init__(self, tag, output_dir, logging_interval=1, num_batches=None, num_ex
"""
self.output_dir = output_dir
self.tag = tag
self.info = {'tag': self.tag, 'system': {}}
self.info = {"tag": self.tag, "system": {}}
self.num_batches = num_batches
self.num_examples = num_examples
self.logging_interval = logging_interval
self.launched = False
os.makedirs(os.path.join(self.output_dir), exist_ok=True)

def populate_static_information(self):
"""
Populates the report with static software and hardware information.
"""
self.info['ludwig_version'] = LUDWIG_VERSION
self.info['start_disk_usage'] = shutil.disk_usage(os.path.expanduser('~')).used
"""Populates the report with static software and hardware information."""
self.info["ludwig_version"] = LUDWIG_VERSION
self.info["start_disk_usage"] = shutil.disk_usage(os.path.expanduser("~")).used

# CPU information
<<<<<<< HEAD
self.info['system']['python_packages_and_versions'] = [str(package) for package in
get_python_packages_and_versions()]
=======
self.info["system"]["python_packages_and_versions"] = [
str(package) for package in get_python_packages_and_versions()
]
>>>>>>> c3bcd5b967b8b2f50abaf5f9477ddbca504a19fc
cpu_info = get_my_cpu_info()
self.info['system']['cpu_architecture'] = cpu_info['arch']
self.info['system']['num_cpu'] = cpu_info['count']
self.info['system']['cpu_name'] = cpu_info['brand_raw']
self.info["system"]["cpu_architecture"] = cpu_info["arch"]
self.info["system"]["num_cpu"] = cpu_info["count"]
self.info["system"]["cpu_name"] = cpu_info["brand_raw"]

# GPU information
gpu_infos = get_gpu_info()
for i, gpu_info in enumerate(gpu_infos):
gpu_key = 'gpu_{}'.format(i)
self.info['system'][gpu_key] = {}
self.info['system'][gpu_key]['name'] = gpu_info['name']
self.info['system'][gpu_key]['total_memory'] = gpu_info['total_memory']
self.info['system'][gpu_key]['driver_version'] = gpu_info['driver_version']
self.info['system'][gpu_key]['cuda_version'] = gpu_info['cuda_version']
gpu_key = f"gpu_{i}"
self.info["system"][gpu_key] = {}
self.info["system"][gpu_key]["name"] = gpu_info["name"]
self.info["system"][gpu_key]["total_memory"] = gpu_info["total_memory"]
self.info["system"][gpu_key]["driver_version"] = gpu_info["driver_version"]
self.info["system"][gpu_key]["cuda_version"] = gpu_info["cuda_version"]

torch.cuda.synchronize()
self.info['start_time'] = time.time()
self.info['num_examples'] = self.num_examples
self.info["start_time"] = time.time()
self.info["num_examples"] = self.num_examples

def __enter__(self):
"""
Populates static information and forks process to monitor resource
usage.
"""
"""Populates static information and forks process to monitor resource usage."""
if self.launched:
raise ValueError('Tracker already launched.')
raise ValueError("Tracker already launched.")

self.populate_static_information()

Expand All @@ -140,24 +155,24 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""
Waits for monitoring process to exit.
Computes and postprocesses more metrics.
Saves report.
"""Waits for monitoring process to exit.
Computes and postprocesses more metrics. Saves report.
"""
self.queue.put(STOP_MESSAGE)
torch.cuda.synchronize()
self.p.join()

self.info = load_json(os.path.join(self.output_dir, self.info['tag'] + "_temp.json"))
os.remove(os.path.join(self.output_dir, self.info['tag'] + "_temp.json"))
self.info = load_json(os.path.join(self.output_dir, self.info["tag"] + "_temp.json"))
os.remove(os.path.join(self.output_dir, self.info["tag"] + "_temp.json"))

self.info['end_time'] = time.time()
self.info['{}_total_duration'.format(self.tag)] = self.info['end_time'] - self.info['start_time']
self.info["end_time"] = time.time()
self.info[f"{self.tag}_total_duration"] = self.info["end_time"] - self.info["start_time"]

# if self.num_batches:
# self.info['per_batch_duration'] = self.info['{}_total_duration'.format(self.tag)] / self.num_batches
if self.num_examples:
<<<<<<< HEAD
self.info['per_example_duration'] = self.info['{}_total_duration'.format(self.tag)] / self.num_examples
self.info['examples_per_second'] = self.num_examples / self.info['{}_total_duration'.format(self.tag)]
self.info['end_disk_usage'] = shutil.disk_usage(os.path.expanduser('~')).used
Expand All @@ -171,3 +186,18 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.info['system']['max_swap_utilization'] = max(self.info['system']['swap_utilization'])

save_json(os.path.join(self.output_dir, self.info['tag'] + "_metrics.json"), self.info)
=======
self.info["per_example_duration"] = self.info[f"{self.tag}_total_duration"] / self.num_examples
self.info["examples_per_second"] = self.num_examples / self.info[f"{self.tag}_total_duration"]
self.info["end_disk_usage"] = shutil.disk_usage(os.path.expanduser("~")).used
self.info["disk_footprint"] = self.info["end_disk_usage"] - self.info["start_disk_usage"]

for key in self.info["system"]:
if "gpu_" in key:
self.info["system"][key]["max_memory_used"] = max(self.info["system"][key]["memory_used"])
self.info["system"]["max_cpu_utilization"] = max(self.info["system"]["cpu_utilization"])
self.info["system"]["max_ram_utilization"] = max(self.info["system"]["ram_utilization"])
self.info["system"]["max_swap_utilization"] = max(self.info["system"]["swap_utilization"])

save_json(os.path.join(self.output_dir, self.info["tag"] + "_metrics.json"), self.info)
>>>>>>> c3bcd5b967b8b2f50abaf5f9477ddbca504a19fc
2 changes: 1 addition & 1 deletion requirements_tracker.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
experiment_impact_tracker
gpustat
psutil
psutil

0 comments on commit 53871f9

Please sign in to comment.