Sarc 229 - Add function load_job_series() #89

Merged: 29 commits into master from sarc-229 on Nov 30, 2023
Changes from all 29 commits:
448ccc4
Add signature for load_job_series().
notoraptor Nov 17, 2023
a2979c4
Add old code for load_job_series
notoraptor Nov 17, 2023
f5f25d4
Add DUMMY_STATS and optimize imports.
notoraptor Nov 17, 2023
91fcec3
Update old code.
notoraptor Nov 17, 2023
9715371
Write correctly load_job_series().
notoraptor Nov 17, 2023
c691034
Update .gitignore.
notoraptor Nov 17, 2023
bc3311a
Fix and test count_jobs()
notoraptor Nov 17, 2023
56a991c
Add doc.
notoraptor Nov 17, 2023
fc14a47
Fix and test load_job_series.
notoraptor Nov 20, 2023
7daa5fb
Add a desc to tqdm
notoraptor Nov 20, 2023
1167a12
Reformat code.
notoraptor Nov 20, 2023
e4f329b
Improve testing for clip_time.
notoraptor Nov 20, 2023
95f925c
load_job_series(): make job fields filtering and callback call at the…
notoraptor Nov 21, 2023
e0fa912
Add a test to check end_time in data frames.
notoraptor Nov 21, 2023
412ba4c
Add test for stored statistics.
notoraptor Nov 21, 2023
9879845
Rename tests from test_get_jobs_* to test_load_job_series_*
notoraptor Nov 21, 2023
7a357b0
Use a single time to set all missing end times in a call to load_job_…
notoraptor Nov 22, 2023
8238bff
Mock with a more recent time.
notoraptor Nov 22, 2023
a1959c7
Update sarc/jobs/series.py
notoraptor Nov 23, 2023
b05dfc7
Update sarc/jobs/series.py
notoraptor Nov 23, 2023
26d792f
Merge job series and job dict, with job series fields having preceden…
notoraptor Nov 23, 2023
6e0d8f2
Get gres_gpu from job.requested
notoraptor Nov 24, 2023
bd317ed
Remove duplicated fields.
notoraptor Nov 24, 2023
f67329f
List data frame columns in docstring.
notoraptor Nov 24, 2023
fb81374
Update sarc/jobs/series.py
notoraptor Nov 27, 2023
2f60e3a
Flatten job's `allocated` and `requested` fields.
notoraptor Nov 27, 2023
64a1fcc
Flatten job.allocated anyway before further computations, to include …
notoraptor Nov 27, 2023
0c15c58
Merge branch 'master' into sarc-229
nurbal Nov 28, 2023
798c9be
Remove useless lines.
notoraptor Nov 30, 2023
2 changes: 2 additions & 0 deletions .gitignore
@@ -16,3 +16,5 @@ sarc-cache/
_build
sarc_mongo
*-checkpoint.ipynb
+sarc-test-cache
+dbconfig.txt
2 changes: 1 addition & 1 deletion sarc/jobs/__init__.py
@@ -1,2 +1,2 @@
-from .job import SlurmJob, get_job, get_jobs
+from .job import SlurmJob, count_jobs, get_job, get_jobs
from .series import get_job_time_series, get_job_time_series_metric_names
76 changes: 69 additions & 7 deletions sarc/jobs/job.py
@@ -206,17 +206,16 @@ def get_clusters():


# pylint: disable=too-many-branches,dangerous-default-value
-def get_jobs(
+def _compute_jobs_query(
    *,
    cluster: str | ClusterConfig | None = None,
    job_id: int | list[int] | None = None,
    job_state: str | SlurmState | None = None,
    user: str | None = None,
    start: str | datetime | None = None,
    end: str | datetime | None = None,
    query_options: dict | None = None,
-) -> Iterable[SlurmJob]:
-    """Get jobs that match the query.
+) -> dict:
+    """Compute the MongoDB query dict used to match the given arguments.

    Arguments:
        cluster: The cluster on which to search for jobs.
@@ -225,9 +224,6 @@ def get_jobs(
        end: Get all jobs that have a status before that time.
        query_options: Additional options to pass to MongoDB (limit, etc.)
    """
-    if query_options is None:
-        query_options = {}
-
    cluster_name = cluster
    if isinstance(cluster, ClusterConfig):
        cluster_name = cluster.name
@@ -278,6 +274,72 @@
        ]
    }

    return query


def count_jobs(
    *,
    cluster: str | ClusterConfig | None = None,
    job_id: int | list[int] | None = None,
    job_state: str | SlurmState | None = None,
    user: str | None = None,
    start: str | datetime | None = None,
    end: str | datetime | None = None,
    query_options: dict | None = None,
) -> int:
    """Count jobs that match the query.

    Arguments:
        cluster: The cluster on which to search for jobs.
        job_id: The id or a list of ids to select.
        start: Get all jobs that have a status after that time.
        end: Get all jobs that have a status before that time.
        query_options: Additional options to pass to MongoDB (limit, etc.)
    """
    query = _compute_jobs_query(
        cluster=cluster,
        job_id=job_id,
        job_state=job_state,
        user=user,
        start=start,
        end=end,
    )
    if query_options is None:
        query_options = {}
    return config().mongo.database_instance.jobs.count_documents(query, **query_options)


def get_jobs(
    *,
    cluster: str | ClusterConfig | None = None,
    job_id: int | list[int] | None = None,
    job_state: str | SlurmState | None = None,
    user: str | None = None,
    start: str | datetime | None = None,
    end: str | datetime | None = None,
    query_options: dict | None = None,
) -> Iterable[SlurmJob]:
    """Get jobs that match the query.

    Arguments:
        cluster: The cluster on which to search for jobs.
        job_id: The id or a list of ids to select.
        start: Get all jobs that have a status after that time.
        end: Get all jobs that have a status before that time.
        query_options: Additional options to pass to MongoDB (limit, etc.)
    """
    if query_options is None:
        query_options = {}

    query = _compute_jobs_query(
        cluster=cluster,
        job_id=job_id,
        job_state=job_state,
        user=user,
        start=start,
        end=end,
    )

    coll = jobs_collection()

    return coll.find_by(query, **query_options)
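For context, a minimal usage sketch of the refactored pair; the cluster name "patate" and the date are example values borrowed from the tests further down:

    from datetime import datetime

    from sarc.config import MTL
    from sarc.jobs import count_jobs, get_jobs

    # Both functions build the same MongoDB query via _compute_jobs_query(),
    # so (absent query_options such as a limit) the count matches the number
    # of jobs yielded by get_jobs().
    start = datetime(2023, 2, 15, tzinfo=MTL)
    assert count_jobs(cluster="patate", start=start) == len(
        list(get_jobs(cluster="patate", start=start))
    )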
155 changes: 152 additions & 3 deletions sarc/jobs/series.py
@@ -1,14 +1,16 @@
from __future__ import annotations

-from datetime import datetime
-from typing import TYPE_CHECKING
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Callable

import numpy as np
import pandas
from pandas import DataFrame
from prometheus_api_client import MetricRangeDataFrame
from tqdm import tqdm

from sarc.config import MTL, UTC
-from sarc.jobs.job import JobStatistics, Statistics
+from sarc.jobs.job import JobStatistics, Statistics, count_jobs, get_jobs

if TYPE_CHECKING:
    from sarc.jobs.sacct import SlurmJob
@@ -218,6 +220,153 @@ def compute_job_statistics(job: SlurmJob):
    )


DUMMY_STATS = {
    label: np.nan
    for label in [
        "gpu_utilization",
        "cpu_utilization",
        "gpu_memory",
        "gpu_power",
        "system_memory",
    ]
}


# pylint: disable=too-many-statements,fixme
def load_job_series(
    *,
    fields: None | list[str] | dict[str, str] = None,
    clip_time: bool = False,
    callback: None | Callable = None,
    **jobs_args,
) -> pandas.DataFrame:
    """
    Query jobs from the database and return them in a DataFrame, including full
    user info for each job.

    Parameters
    ----------
    fields: list or dict
        Job fields to include in the DataFrame. By default, all fields are included.
        A dictionary may be passed to select fields and rename them in the DataFrame.
        In that case, the keys are the original field names and the values are the
        names they will have in the DataFrame.
    clip_time: bool
        Whether to clip job durations to the interval [`start`, `end`].
        A ValueError is raised if `clip_time` is True and either `start` or `end`
        is None. Defaults to False.
    callback: Callable
        Callable invoked after each processed job with the list of job dictionaries
        accumulated so far, in the format in which they will appear in the DataFrame.
    **jobs_args
        Arguments passed to `get_jobs` to query jobs from the database.

    Returns
    -------
    DataFrame
        Pandas DataFrame containing the jobs, with the following columns:
        - all fields returned by SlurmJob.dict(), except the raw `requested` and
          `allocated` fields, which are flattened into "requested.*" and
          "allocated.*" columns (e.g. "allocated.gres_gpu", "allocated.cpu")
        - job series fields: "gpu_utilization", "cpu_utilization", "gpu_memory",
          "gpu_power", "system_memory"
        - optional fields, added if clip_time is True:
          "unclipped_start" and "unclipped_end"
    """

    # If fields is a list, convert it to a renaming dict with same old and new names.
    if isinstance(fields, list):
        fields = {key: key for key in fields}

    start = jobs_args.get("start", None)
    end = jobs_args.get("end", None)

    total = count_jobs(**jobs_args)

    rows = []
    now = datetime.now(tz=MTL)
    # Fetch all jobs from the clusters
    for job in tqdm(get_jobs(**jobs_args), total=total, desc="load job series"):
        if job.end_time is None:
            job.end_time = now

        # For some reason the start time is not reliable (it is often equal to the
        # submit time), so we infer it from end_time and elapsed_time.
        job.start_time = job.end_time - timedelta(seconds=job.elapsed_time)

        unclipped_start = None
        unclipped_end = None
        if clip_time:
            if start is None:
                raise ValueError("Clip time: missing start")
            if end is None:
                raise ValueError("Clip time: missing end")
            # Clip the job to the time range we are interested in.
            unclipped_start = job.start_time
            job.start_time = max(job.start_time, start)
            unclipped_end = job.end_time
            job.end_time = min(job.end_time, end)
            # The elapsed time could be negative if the job started after `end`.
            # We do not filter such jobs out, because they were submitted before
            # `end` and we still want to compute their wait time.
            job.elapsed_time = max((job.end_time - job.start_time).total_seconds(), 0)

        if job.stored_statistics is None:
            job_series = DUMMY_STATS.copy()
        else:
            job_series = job.stored_statistics.dict()
            job_series = {k: select_stat(k, v) for k, v in job_series.items()}

        # Flatten job.requested and job.allocated into job_series
        job_series.update(
            {f"requested.{key}": value for key, value in job.requested.dict().items()}
        )
        job_series.update(
            {f"allocated.{key}": value for key, value in job.allocated.dict().items()}
        )
        # Additional computations for the flattened job.allocated fields.
        # TODO: Why is it possible to have billing smaller than gres_gpu?
        billing = job.allocated.billing or 0
        gres_gpu = job.requested.gres_gpu or 0
        if gres_gpu:
            job_series["allocated.gres_gpu"] = max(billing, gres_gpu)
            job_series["allocated.cpu"] = job.allocated.cpu
        else:
            job_series["allocated.gres_gpu"] = 0
            job_series["allocated.cpu"] = (
                max(billing, job.allocated.cpu) if job.allocated.cpu else 0
            )

        if clip_time:
            job_series["unclipped_start"] = unclipped_start
            job_series["unclipped_end"] = unclipped_end

        # Merge the job series and the job dict,
        # with job series fields taking precedence if necessary.
        # Do not include the raw requested and allocated fields anymore.
        final_job_dict = job.dict(exclude={"requested", "allocated"})
        final_job_dict.update(job_series)
        job_series = final_job_dict

        if fields is not None:
            job_series = {
                new_name: job_series[old_name] for old_name, new_name in fields.items()
            }
        rows.append(job_series)
        if callback:
            callback(rows)

    return pandas.DataFrame(rows)


def select_stat(name, dist):
    if not dist:
        return np.nan

    if name in ["system_memory", "gpu_memory"]:
        return dist["max"]

    return dist["median"]


slurm_job_metric_names = [
    "slurm_job_core_usage",
    "slurm_job_core_usage_total",
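To make the resulting DataFrame concrete, a hedged usage sketch of load_job_series(); the "user" field is an assumption (any SlurmJob.dict() field would do), while "allocated.gres_gpu" is one of the flattened columns described above:

    from datetime import datetime

    from sarc.config import MTL
    from sarc.jobs.series import load_job_series

    start = datetime(2023, 2, 15, tzinfo=MTL)
    end = datetime(2023, 2, 18, tzinfo=MTL)

    # Keep two columns, renaming one, and clip job durations to [start, end].
    # "user" is assumed to be among the SlurmJob.dict() fields.
    df = load_job_series(
        fields={"user": "user", "allocated.gres_gpu": "gpus_allocated"},
        clip_time=True,
        start=start,
        end=end,
    )
    print(df.head())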
73 changes: 73 additions & 0 deletions tests/functional/jobs/test_func_count_jobs.py
@@ -0,0 +1,73 @@
from datetime import datetime

import pytest

from sarc.config import MTL, config
from sarc.jobs import count_jobs, get_jobs
from sarc.jobs.job import get_job

parameters = {
    "no_cluster": {},
    "cluster_str": {"cluster": "patate"},
    "job_state": {"job_state": "COMPLETED"},
    "one_job": {"job_id": 10},
    "one_job_wrong_cluster": {"job_id": 10, "cluster": "patate"},
    "many_jobs": {"job_id": [8, 9]},
    "no_jobs": {"job_id": []},
    "start_only": {"start": datetime(2023, 2, 19, tzinfo=MTL)},
    "end_only": {"end": datetime(2023, 2, 16, tzinfo=MTL)},
    "start_str_only": {"start": "2023-02-19"},
    "end_str_only": {"end": "2023-02-16"},
    "start_and_end": {
        "start": datetime(2023, 2, 15, tzinfo=MTL),
        "end": datetime(2023, 2, 18, tzinfo=MTL),
    },
    "user": {"user": "beaubonhomme"},
    "resubmitted": {"job_id": 1_000_000},
}


@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
@pytest.mark.parametrize("params", parameters.values(), ids=parameters.keys())
def test_count_jobs(params, file_regression):
    jobs = list(get_jobs(**params))
    assert len(jobs) == count_jobs(**params)
    file_regression.check(
        f"Found {len(jobs)} job(s):\n"
        + "\n".join([job.json(exclude={"id": True}, indent=4) for job in jobs])
    )


@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
def test_count_jobs_cluster_cfg(file_regression):
    jobs = list(get_jobs(cluster=config().clusters["fromage"]))
    assert len(jobs) == count_jobs(cluster=config().clusters["fromage"])
    file_regression.check(
        f"Found {len(jobs)} job(s):\n"
        + "\n".join([job.json(exclude={"id": True}, indent=4) for job in jobs])
    )


@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
def test_count_jobs_wrong_job_id():
    with pytest.raises(TypeError, match="job_id must be an int or a list of ints"):
        count_jobs(job_id="wrong id")


@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
def test_count_job():
    jbs = list(get_jobs(cluster="patate"))
    assert len(jbs) == count_jobs(cluster="patate")
    jb = get_job(cluster="patate")
    assert jb in jbs


@pytest.mark.usefixtures("read_only_db", "tzlocal_is_mtl")
def test_get_job_resubmitted():
    assert count_jobs(job_id=1_000_000) == 2
    jb1, jb2 = get_jobs(job_id=1_000_000)
    jb = get_job(job_id=1_000_000)

    assert jb is not None
    assert jb1.submit_time != jb2.submit_time
    assert jb.submit_time == max(jb1.submit_time, jb2.submit_time)