This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

fix external process error #2279

Merged: 5 commits, Mar 8, 2019
72 changes: 43 additions & 29 deletions src/job-exporter/src/collector.py
@@ -176,19 +176,24 @@ def as_array(self):

class AtomicRef(object):
""" a thread safe way to store and get object,
should not modify data get from this ref """
def __init__(self):
callers should not modify data obtained from this ref;
each get and set call must provide a time object,
so the ref can decide whether the data is out of date,
returning None once it has expired """
def __init__(self, decay_time):
self.data = None
self.date_in_produced = datetime.datetime.now()
self.decay_time = decay_time
self.lock = threading.RLock()

def get_and_set(self, new_data):
data = None
def set(self, data, now):
with self.lock:
data, self.data = self.data, new_data
return data
self.data, self.date_in_produced = data, now

def get(self):
def get(self, now):
with self.lock:
if self.date_in_produced + self.decay_time < now:
return None
return self.data
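
Since AtomicRef now refuses to serve stale data, here is a minimal usage sketch (not part of this diff; AtomicRefSketch is a hypothetical stand-in mirroring the class above):

```python
import datetime
import threading

class AtomicRefSketch(object):
    """ illustrative stand-in for the reworked collector.AtomicRef """
    def __init__(self, decay_time):
        self.data = None
        self.date_in_produced = datetime.datetime.now()
        self.decay_time = decay_time
        self.lock = threading.RLock()

    def set(self, data, now):
        with self.lock:
            self.data, self.date_in_produced = data, now

    def get(self, now):
        with self.lock:
            if self.date_in_produced + self.decay_time < now:
                return None  # data is older than decay_time, treat it as missing
            return self.data

# The producer (e.g. GpuCollector) stamps data with the time it was collected;
# the consumer passes its own "now" and gets None once the data has gone stale.
ref = AtomicRefSketch(datetime.timedelta(seconds=60))
t0 = datetime.datetime.now()
ref.set({"0": "gpu info"}, t0)
assert ref.get(t0 + datetime.timedelta(seconds=30)) == {"0": "gpu info"}  # still fresh
assert ref.get(t0 + datetime.timedelta(seconds=61)) is None               # expired
```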


@@ -220,7 +225,7 @@ def collect(self):
with self.collector_histogram.time():
self.iteration_counter.labels(name=self.name).inc()
try:
self.atomic_ref.get_and_set(self.collect_impl())
self.atomic_ref.set(self.collect_impl(), datetime.datetime.now())
except Exception as e:
logger.exception("%s collector get an exception", self.name)

@@ -235,17 +240,17 @@ def collect_impl(self):
pass


def instantiate_collector(name, sleep_time, collector_class, *args):
def instantiate_collector(name, sleep_time, decay_time, collector_class, *args):
""" test cases helper fn to instantiate a collector """
atomic_ref = AtomicRef()
atomic_ref = AtomicRef(decay_time)
return atomic_ref, collector_class(name, sleep_time, atomic_ref, iteration_counter, *args)


def make_collector(name, sleep_time, collector_class, *args):
def make_collector(name, sleep_time, decay_time, collector_class, *args):
""" other module should use this fn to init a collector, this fn start a thread
to run the collector and return an atomic_ref so outside world can get metrics
collected by this collector """
atomic_ref, instance = instantiate_collector(name, sleep_time, collector_class, *args)
atomic_ref, instance = instantiate_collector(name, sleep_time, decay_time, collector_class, *args)

t = threading.Thread(
target=instance.collect,
@@ -348,30 +353,40 @@ def convert_to_metrics(gpu_info, zombie_info, pid_to_cid_fn, mem_leak_thrashold)
if len(info.pids) > 0:
pids_use_gpu[minor]= info.pids

if zombie_info is not None and len(zombie_info) > 0 and len(pids_use_gpu) > 0:
logger.debug("pids_use_gpu is %s, zombie_info is %s", pids_use_gpu, zombie_info)
if len(pids_use_gpu) > 0:
if zombie_info is None:
zombie_info = []

for minor, pids in pids_use_gpu.items():
for pid in pids:
found, z_id = pid_to_cid_fn(pid)
if found and z_id in zombie_info:
# found corresponding container
zombie_container.add_metric([minor, z_id], 1)
logger.debug("pid %s has found %s, z_id %s", pid, found, z_id)
if found:
# NOTE: zombie_info is a set of short docker container ids, but
# z_id is a full id.
for zombie_id in zombie_info:
if z_id.startswith(zombie_id):
# found corresponding container
zombie_container.add_metric([minor, zombie_id], 1)
else:
external_process.add_metric([minor, pid], 1)
logger.warning("found gpu used by external %s, zombie container %s",
external_process, zombie_container)
if len(zombie_container.samples) > 0 or len(external_process.samples) > 0:
logger.warning("found gpu used by external %s, zombie container %s",
external_process, zombie_container)

return [core_utils, mem_utils, ecc_errors, mem_leak,
external_process, zombie_container]
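
The startswith comparison above is the heart of the fix; a self-contained illustration, with the ids borrowed from the new test case later in this diff:

```python
# docker stats reports short (12-char) container ids, while pid_to_cid_fn
# resolves a pid to the full 64-char id, so an exact membership check never matches.
zombie_info = {"ce5de12d6275"}  # short ids of known zombie containers
z_id = "ce5de12d6275dc05c9ec5b7f58484f075f4775d8f54f6a4be3dc1439344df356"  # full id for one pid

matched = [zombie_id for zombie_id in zombie_info if z_id.startswith(zombie_id)]
print(matched)  # ['ce5de12d6275'] -> the pid belongs to a known zombie container
```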

def collect_impl(self):
zombie_info = self.zombie_info_ref.get_and_set(None)

gpu_info = nvidia.nvidia_smi(GpuCollector.cmd_histogram,
GpuCollector.cmd_timeout)

logger.debug("get gpu_info %s", gpu_info)

self.gpu_info_ref.get_and_set(gpu_info)
now = datetime.datetime.now()
self.gpu_info_ref.set(gpu_info, now)
zombie_info = self.zombie_info_ref.get(now)

if gpu_info is not None:
return GpuCollector.convert_to_metrics(gpu_info, zombie_info,
@@ -418,7 +433,7 @@ class ContainerCollector(Collector):
"node-exporter",
"job-exporter",
"yarn-exporter",
"nvidia-drivers",
"nvidia-drivers",
"docker-cleaner"
]))

@@ -442,13 +457,12 @@ def collect_impl(self):
ContainerCollector.iftop_histogram,
ContainerCollector.iftop_timeout)

# set it to None so if nvidia-smi hangs till next time we get,
# we will get None
gpu_infos = self.gpu_info_ref.get_and_set(None)

stats_obj = docker_stats.stats(ContainerCollector.stats_histogram,
ContainerCollector.stats_timeout)
self.stats_info_ref.get_and_set(stats_obj)

now = datetime.datetime.now()
gpu_infos = self.gpu_info_ref.get(now)
self.stats_info_ref.set(stats_obj, now)

logger.debug("all_conns is %s", all_conns)
logger.debug("gpu_info is %s", gpu_infos)
@@ -711,9 +725,9 @@ def update_zombie_count(self, stats):
def collect_impl(self):
# stats_info will be None if docker-stats hung last time or
# the cached data has outlived decay_time
stats_info = self.stats_info_ref.get_and_set(None)
stats_info = self.stats_info_ref.get(datetime.datetime.now())
all_zombies = self.update_zombie_count(stats_info)
self.zombie_ids_ref.get_and_set(all_zombies)
self.zombie_ids_ref.set(all_zombies, datetime.datetime.now())


class ProcessCollector(Collector):
2 changes: 1 addition & 1 deletion src/job-exporter/src/docker_stats.py
@@ -87,7 +87,7 @@ def stats(histogram, timeout):
try:
result = utils.exec_cmd([
"docker", "stats", "--no-stream", "--format",
"table {{.Container}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"],
"table {{.ID}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"],
histogram=histogram,
timeout=timeout)
return parse_docker_stats(result)
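
For context, a hedged sketch of invoking the adjusted command directly; the example row is illustrative, not output captured for this PR:

```python
# Illustrative only: with {{.ID}} the first column is the short container id,
# which is what the zombie/GPU matching in collector.py keys on.
import subprocess

out = subprocess.check_output([
    "docker", "stats", "--no-stream", "--format",
    "table {{.ID}},{{.Name}},{{.CPUPerc}},{{.MemUsage}},{{.NetIO}},{{.BlockIO}},{{.MemPerc}}"])
# A data row then looks roughly like (values made up):
# ce5de12d6275,job-container-0,1.23%,1.5GiB / 32GiB,10MB / 2MB,0B / 0B,4.69%
```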
27 changes: 15 additions & 12 deletions src/job-exporter/src/main.py
@@ -8,6 +8,7 @@
import signal
import faulthandler
import gc
import datetime

import prometheus_client
from prometheus_client import Gauge
@@ -34,10 +35,10 @@ def __init__(self, atomic_refs):
def collect(self):
data = []

now = datetime.datetime.now()

for ref in self.atomic_refs:
# set None to achieve
# https://github.com/Microsoft/pai/issues/1764#issuecomment-442733098
d = ref.get_and_set(None)
d = ref.get(now)
if d is not None:
data.extend(d)

@@ -129,27 +130,29 @@ def main(args):

configured_gpu_counter.set(get_gpu_count("/gpu-config/gpu-configuration.json"))

decay_time = datetime.timedelta(seconds=args.interval * 2)

# used to exchange gpu info between GpuCollector and ContainerCollector
gpu_info_ref = collector.AtomicRef()
gpu_info_ref = collector.AtomicRef(decay_time)

# used to exchange docker stats info between ContainerCollector and ZombieCollector
stats_info_ref = collector.AtomicRef()
stats_info_ref = collector.AtomicRef(decay_time)

# used to exchange zombie info between GpuCollector and ZombieCollector
zombie_info_ref = collector.AtomicRef()
zombie_info_ref = collector.AtomicRef(decay_time)

interval = args.interval
# All collectors except container_collector spend little time calling external
# commands to get metrics, so they sleep the full 30s to align with the prometheus
# scrape interval. The 99th percentile latency of the container_collector loop is
# around 20s, so it only sleeps ~10s to fit the same scrape interval.
collector_args = [
("docker_daemon_collector", interval, collector.DockerCollector),
("gpu_collector", interval / 2, collector.GpuCollector, gpu_info_ref, zombie_info_ref, args.threshold),
("container_collector", interval - 18, collector.ContainerCollector,
("docker_daemon_collector", interval, decay_time, collector.DockerCollector),
("gpu_collector", interval, decay_time, collector.GpuCollector, gpu_info_ref, zombie_info_ref, args.threshold),
("container_collector", max(0, interval - 18), decay_time, collector.ContainerCollector,
gpu_info_ref, stats_info_ref, args.interface),
("zombie_collector", interval, collector.ZombieCollector, stats_info_ref, zombie_info_ref),
("process_collector", interval, collector.ProcessCollector),
("zombie_collector", interval, decay_time, collector.ZombieCollector, stats_info_ref, zombie_info_ref),
("process_collector", interval, decay_time, collector.ProcessCollector),
]

refs = list(map(lambda x: collector.make_collector(*x), collector_args))
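
A worked example of the timing values, assuming the default --interval of 30 seconds (the default is unchanged by this PR):

```python
import datetime

interval = 30                                          # default --interval
decay_time = datetime.timedelta(seconds=interval * 2)  # 60s: data older than two scrape intervals reads as None
container_collector_sleep = max(0, interval - 18)      # 12s, since that loop itself takes ~20s at the 99th percentile
```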
@@ -168,7 +171,7 @@ def main(args):
parser = argparse.ArgumentParser()
parser.add_argument("--log", "-l", help="log dir to store log", default="/datastorage/prometheus")
parser.add_argument("--port", "-p", help="port to expose metrics", default="9102")
parser.add_argument("--interval", "-i", help="prometheus scrape interval", type=int, default=30)
parser.add_argument("--interval", "-i", help="prometheus scrape interval second", type=int, default=30)
parser.add_argument("--interface", "-n", help="network interface for job-exporter to listen on", required=True)
parser.add_argument("--threshold", "-t", help="memory threshold to consider gpu memory leak", type=int, default=20 * 1024 * 1024)
args = parser.parse_args()
91 changes: 86 additions & 5 deletions src/job-exporter/test/test_collector.py
@@ -76,6 +76,7 @@ def test_impl(self):
_, c = collector.instantiate_collector(
"test_docker_collector1",
0.5,
datetime.timedelta(seconds=1),
collector.DockerCollector)

self.assert_metrics(c.collect_impl())
@@ -86,11 +87,12 @@ def test_base_collector(self):
ref = collector.make_collector(
"test_docker_collector2",
0.5,
datetime.timedelta(seconds=10),
collector.DockerCollector)

metrics = None
for i in range(10):
metrics = ref.get()
metrics = ref.get(datetime.datetime.now())
if metrics is not None:
break
time.sleep(0.1)
@@ -107,12 +109,14 @@ def setUp(self):
# in from name, we need to differentiate name using time.
t = str(time.time()).replace(".", "_")

decay_time = datetime.timedelta(seconds=1)
_, self.collector = collector.instantiate_collector(
"test_zombie_collector" + t,
0.5,
decay_time,
collector.ZombieCollector,
collector.AtomicRef(),
collector.AtomicRef())
collector.AtomicRef(decay_time),
collector.AtomicRef(decay_time))

def test_update_zombie_count_type1(self):
start = datetime.datetime.now()
@@ -198,7 +202,7 @@ def test_convert_to_metrics(self):

zombie_info = {"abc", "def"}

pid_to_cid_mapping = {33: "def", 22: "ghi", 44: "jkl"} # only 33 is zombie
pid_to_cid_mapping = {33: "def", 22: "ghi"} # only 33 is zombie

metrics = GpuCollector.convert_to_metrics(gpu_info, zombie_info,
self.make_pid_to_cid_fn(pid_to_cid_mapping), 20 * 1024)
@@ -222,7 +226,6 @@ def test_convert_to_metrics(self):
self.assertEqual(target_mem_leak, mem_leak)

target_external_process = collector.gen_gpu_used_by_external_process_counter()
target_external_process.add_metric(["0", 22], 1)
target_external_process.add_metric(["0", 44], 1)
self.assertEqual(target_external_process, external_process)

@@ -307,5 +310,83 @@ def test_convert_to_metrics(self):
target_mem_leak.add_metric(["3"], 1)
self.assertEqual(target_mem_leak, mem_leak)

def test_convert_to_metrics_with_no_zombie_info_BUGFIX(self):
gpu_info = {"0": nvidia.NvidiaGpuStatus(20, 21, [22, 33, 44],
nvidia.EccError())}

# an empty zombie_info should still produce the external process metric
zombie_info = []

pid_to_cid_mapping = {33: "def", 22: "ghi"} # only 44 is external process

metrics = GpuCollector.convert_to_metrics(gpu_info, zombie_info,
self.make_pid_to_cid_fn(pid_to_cid_mapping), 20 * 1024)

core_utils, mem_utils, ecc_errors, mem_leak, external_process, zombie_container = metrics

self.assertEqual(0, len(zombie_container.samples))
self.assertEqual(1, len(external_process.samples))
self.assertEqual("0", external_process.samples[0].labels["minor_number"])
self.assertEqual(44, external_process.samples[0].labels["pid"])

# a None zombie_info should still produce the external process metric
zombie_info = None

metrics = GpuCollector.convert_to_metrics(gpu_info, zombie_info,
self.make_pid_to_cid_fn(pid_to_cid_mapping), 20 * 1024)

core_utils, mem_utils, ecc_errors, mem_leak, external_process, zombie_container = metrics

self.assertEqual(0, len(zombie_container.samples))
self.assertEqual(1, len(external_process.samples))
self.assertEqual("0", external_process.samples[0].labels["minor_number"])
self.assertEqual(44, external_process.samples[0].labels["pid"])

def test_convert_to_metrics_with_real_id_BUGFIX(self):
gpu_info = {"0": nvidia.NvidiaGpuStatus(20, 21, [22],
nvidia.EccError())}

# zombie_info holds the short container id, while the pid maps to the full id
zombie_info = {"ce5de12d6275"}

pid_to_cid_mapping = {22: "ce5de12d6275dc05c9ec5b7f58484f075f4775d8f54f6a4be3dc1439344df356"}

metrics = GpuCollector.convert_to_metrics(gpu_info, zombie_info,
self.make_pid_to_cid_fn(pid_to_cid_mapping), 20 * 1024)

core_utils, mem_utils, ecc_errors, mem_leak, external_process, zombie_container = metrics

self.assertEqual(1, len(zombie_container.samples))
self.assertEqual("0", zombie_container.samples[0].labels["minor_number"])
self.assertEqual("ce5de12d6275", zombie_container.samples[0].labels["container_id"])

class TestAtomicRef(base.TestBase):
"""
Test AtomicRef in collector.py
"""

def test_expiration(self):
ref = collector.AtomicRef(datetime.timedelta(seconds=10))

now = datetime.datetime.now()

delta = datetime.timedelta(seconds=1)

ref.set(1, now)

self.assertEquals(1, ref.get(now))
self.assertEquals(1, ref.get(now - delta))
self.assertEquals(1, ref.get(now + delta))
self.assertEquals(1, ref.get(now + delta * 10))
self.assertEquals(None, ref.get(now + delta * 11))
self.assertEquals(1, ref.get(now + delta * 10))

ref.set(2, now + delta)
self.assertEquals(2, ref.get(now))
self.assertEquals(2, ref.get(now + delta * 10))
self.assertEquals(2, ref.get(now + delta * 11))
self.assertEquals(None, ref.get(now + delta * 12))


if __name__ == '__main__':
unittest.main()