Merge pull request #9 from cms-DQM/add_dataset_lumisections
nothingface0 authored Aug 9, 2024
2 parents f720d3b + 3308b19 commit edfa0c1
Showing 4 changed files with 242 additions and 141 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test_package.yaml
@@ -7,7 +7,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: ["3.9", "3.10", "3.11"]
+        python-version: ["3.9", "3.10", "3.11", "3.12"]

     steps:
       - uses: actions/checkout@v4
@@ -34,4 +34,4 @@ jobs:
           SSO_CLIENT_SECRET: ${{ secrets.SSO_CLIENT_SECRET }}
           ENVIRONMENT: ${{ vars.ENVIRONMENT }}
         run: |
-          pytest tests -s --retries 4
+          pytest tests --retries 2

162 changes: 86 additions & 76 deletions runregistry/runregistry.py
@@ -2,6 +2,7 @@
 import time
 import json
 import requests
+import warnings
 from dotenv import load_dotenv
 from cernrequests import get_api_token, get_with_token
 from runregistry.utils import (
@@ -25,6 +26,16 @@
 # Offline table
 WAITING_DQM_GUI_CONSTANT = "waiting dqm gui"

+# Valid Lumisection statuses
+LUMISECTION_STATES = ["GOOD", "BAD", "STANDBY", "EXCLUDED", "NOTSET"]
+
+ONLINE_RUN_STATES = ["SIGNOFF", "OPEN", "COMPLETED"]
+
+OFFLINE_DATASET_STATES = ["SIGNOFF", "OPEN", "COMPLETED", WAITING_DQM_GUI_CONSTANT]
+
+# Time to sleep between JSON creation checks
+JSON_CREATION_SLEEP_TIME = 15
+
 staging_cert = ""
 staging_key = ""
 api_url = ""
@@ -70,14 +81,12 @@ def _get_headers(token: str = ""):


 def _get_token():
-    # if not use_cookies:
-    #     return {"dummy": "yammy"}
     """
     Gets the token required to query RR API through the CERN SSO.
-    :return: the token required to query Run Registry API. In particular 'connect.sid' is the one we are interested in
+    :return: the token required to query Run Registry API.
     """
-    # if os.getenv("ENVIRONMENT") == "development":
-    #     return None
+    if os.getenv("ENVIRONMENT") == "local":
+        return ""
     token, expiration_date = get_api_token(
         client_id=client_id,
         client_secret=client_secret,
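
For reference, a quick sketch of what the new local-environment shortcut enables (the variable name comes from the diff; the rest is illustrative):

    import os

    # With ENVIRONMENT=local, _get_token() short-circuits and returns ""
    # without contacting the CERN SSO, so the client can be pointed at a
    # local Run Registry instance.
    os.environ["ENVIRONMENT"] = "local"
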
@@ -100,7 +109,7 @@ def _get_page(
         query_filter = transform_to_rr_run_filter(run_filter=query_filter)
     elif data_type == "datasets" and not ignore_filter_transformation:
         query_filter = transform_to_rr_dataset_filter(dataset_filter=query_filter)
-    if os.getenv("ENVIRONMENT") == "development":
+    if os.getenv("ENVIRONMENT") in ["development", "local"]:
         print(url)
         print(query_filter)
     payload = json.dumps(
@@ -130,8 +139,12 @@ def get_run(run_number, **kwargs):
     :param run_number: run_number of specified run
     """
     run = get_runs(filter={"run_number": run_number}, **kwargs)
-    if len(run) != 1:
-        return None
+    if not run:
+        return {}
+    if len(run) > 1:
+        raise Exception(
+            f"Unexpected number of results returned for run {run_number} ({len(run)}), was expecting exactly 1"
+        )
     return run[0]


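The new contract is easier to consume than the old None return: missing runs come back as an empty dict, and duplicate matches raise. A usage sketch (the run number is illustrative; an installed, SSO-configured runregistry client is assumed):

    import runregistry

    run = runregistry.get_run(run_number=362761)
    if not run:
        print("run not found")  # {} instead of None after this change
    else:
        print(run["run_number"])
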
@@ -160,10 +173,9 @@ def get_runs(limit=40000, compress_attributes=True, **kwargs):
         "WARNING: fetching more than 10,000 runs from run registry. you probably want to pass a filter into get_runs, or else this will take a while."
     )
     if resource_count > 20000 and "filter" not in kwargs:
-        print(
+        raise Exception(
             "ERROR: For run registry queries that retrieve more than 20,000 runs, you must pass a filter into get_runs, an empty filter get_runs(filter={}) works"
         )
-        return None
     for page_number in range(1, page_count):
         additional_runs = _get_page(
             page=page_number, url=url, data_type="runs", **kwargs
@@ -199,8 +211,12 @@ def get_dataset(run_number, dataset_name="online", **kwargs):
     dataset = get_datasets(
         filter={"run_number": run_number, "dataset_name": dataset_name}, **kwargs
     )
-    if len(dataset) != 1:
-        return None
+    if not dataset:
+        return {}
+    if len(dataset) > 1:
+        raise Exception(
+            f"Unexpected number of results returned for dataset {dataset_name} of run {run_number} ({len(dataset)}), was expecting exactly 1"
+        )
     return dataset[0]


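A matching sketch for get_dataset (run number and dataset name are illustrative):

    import runregistry

    dataset = runregistry.get_dataset(
        run_number=362761, dataset_name="/PromptReco/Collisions2022/DQM"
    )
    if not dataset:
        print("dataset not found")  # {} instead of None, as with get_run
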
@@ -228,10 +244,9 @@ def get_datasets(limit=40000, compress_attributes=True, **kwargs) -> list:
         "WARNING: fetching more than 10,000 datasets. you probably want to pass a filter into get_datasets, or else this will take a while."
     )
     if resource_count > 20000 and "filter" not in kwargs:
-        print(
+        raise Exception(
             "ERROR: For queries that retrieve more than 20,000 datasets, you must pass a filter into get_datasets, an empty filter get_datasets(filter={}) works"
         )
-        return []
     for page_number in range(1, page_count):
         additional_datasets = _get_page(
             page=page_number, url=url, data_type="datasets", **kwargs
@@ -259,14 +274,14 @@ def get_datasets(limit=40000, compress_attributes=True, **kwargs) -> list:
 def get_cycles():
     url = "{}/cycles/global".format(api_url)
     headers = _get_headers(token=_get_token())
-    if os.getenv("ENVIRONMENT") == "development":
+    if os.getenv("ENVIRONMENT") in ["development", "local"]:
         print(url)
     return requests.get(url, headers=headers).json()


 def _get_lumisection_helper(url, run_number, dataset_name="online", **kwargs):
     """
-    Puts the headers for all other lumisection methods
+    Puts the headers and POST data for all other lumisection methods
     """

     headers = _get_headers(token=_get_token())
@@ -292,12 +307,28 @@ def get_oms_lumisections(run_number, dataset_name="online", **kwargs):

 def get_lumisection_ranges(run_number, dataset_name="online", **kwargs):
     """
-    Gets the lumisection ranges of the specified dataset
+    Gets the lumisection ranges of the specified dataset. Returns
+    a list of dicts, each one containing a lumisection "range", dictated
+    by the 'start' and 'stop' keys of the dict. In the same dict,
+    the status, cause, and comments per component are found.
     """
     url = "{}/lumisections/rr_lumisection_ranges".format(api_url)
     return _get_lumisection_helper(url, run_number, dataset_name, **kwargs)


+def get_lumisection_ranges_by_component(run_number, dataset_name="online", **kwargs):
+    """
+    Gets the lumisection ranges of the specified dataset as a dict,
+    where the components are the keys (e.g. 'rpc-rpc'). Each dict value is
+    a list of lumisection "ranges" (dicts) for the specific component. The exact
+    range is dictated by the 'start' and 'stop' keys of the nested dict.
+    Similar to get_lumisection_ranges, but organized by component.
+    """
+    url = "{}/lumisections/rr_lumisection_ranges_by_component".format(api_url)
+    return _get_lumisection_helper(url, run_number, dataset_name, **kwargs)
+
+
 def get_oms_lumisection_ranges(run_number, **kwargs):
     """
     Gets the OMS lumisection ranges of the specified dataset (saved in RR database)
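
A sketch contrasting the flat view with the new per-component view (the run number and component key are illustrative; the 'start'/'stop' keys follow the docstrings above):

    import runregistry

    # Flat view: one list of ranges, each carrying per-component information.
    ranges = runregistry.get_lumisection_ranges(run_number=362761)

    # New per-component view: a dict keyed by component, e.g. 'rpc-rpc'.
    by_component = runregistry.get_lumisection_ranges_by_component(run_number=362761)
    for component, component_ranges in by_component.items():
        for ls_range in component_ranges:
            print(component, ls_range["start"], ls_range["stop"])
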
@@ -320,6 +351,10 @@ def generate_json(json_logic, **kwargs):
     DO NOT USE, USE THE ONE BELOW (create_json)...
     It receives a json logic configuration and returns a json with lumisections which pass the filter
     """
+    warnings.warn(
+        "generate_json is unsafe and will be deprecated. Please use create_json instead",
+        PendingDeprecationWarning,
+    )
     if not isinstance(json_logic, str):
         json_logic = json.dumps(json_logic)
     url = "{}/json_creation/generate".format(api_url)
@@ -357,21 +392,18 @@ def create_json(json_logic, dataset_name_filter, **kwargs):
         response = requests.post(url, headers=headers, data=payload)
         if response.status_code == 200:
             return response.json()["final_json"]
+        elif response.status_code == 202:
+            # still processing
+            print(f"progress creating json: {response.json()['progress']}")
+            time.sleep(JSON_CREATION_SLEEP_TIME)
+        elif response.status_code == 203:
+            # still processing
+            print("json process is submitted and pending, please wait...")
+            time.sleep(JSON_CREATION_SLEEP_TIME)
         else:
-            if response.status_code == 202:
-                # stil processing
-                print("progress creating json: ", response.json()["progress"])
-                time.sleep(15)
-            elif response.status_code == 203:
-                # stil processing
-                print("json process is submited and pending, please wait...")
-                time.sleep(15)
-            elif response.status_code == 500:
-                print("Error creating json")
-                return
-            else:
-                print("error generating json")
-                return
+            raise Exception(
+                f"Error {response.status_code} during JSON creation: {response.text}"
+            )


 # advanced RR operations ==============================================================================
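
From the caller's side, create_json() now either blocks until the JSON is ready (re-polling every JSON_CREATION_SLEEP_TIME seconds on 202/203) or raises. A sketch, where the JSON logic is a placeholder rather than a validated RR configuration and a privileged client is assumed:

    import runregistry

    json_logic = {
        "and": [
            {"==": [{"var": "lumisection.rr.dt-dt"}, "GOOD"]},
        ]
    }
    final_json = runregistry.create_json(
        json_logic=json_logic,
        dataset_name_filter="/PromptReco/Collisions2022/DQM",
    )
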
@@ -381,21 +413,13 @@ def move_runs(from_, to_, run=None, runs=[], **kwargs):
     move run/runs from one state to another
     """
     if not run and not runs:
-        print("move_runs(): no 'run' and 'runs' arguments were provided")
-        return
+        raise ValueError("move_runs(): no 'run' and 'runs' arguments were provided")

-    states = ["SIGNOFF", "OPEN", "COMPLETED"]
-    if from_ not in states or to_ not in states:
-        print(
-            "move_runs(): get states '",
-            from_,
-            "' , '",
-            to_,
-            "', while allowed states are ",
-            states,
-            ", return",
+    if from_ not in ONLINE_RUN_STATES or to_ not in ONLINE_RUN_STATES:
+        raise ValueError(
+            f"move_runs(): got states '{from_}', '{to_}'"
+            f" but allowed states are {ONLINE_RUN_STATES}"
         )
-        return

     url = "%s/runs/move_run/%s/%s" % (api_url, from_, to_)

@@ -408,7 +432,7 @@ def move_runs(from_, to_, run=None, runs=[], **kwargs):
     answers = []
     for run_number in runs:
         payload = json.dumps({"run_number": run_number})
-        answer = requests.post(url, headers=headers, data=payload).json()
+        answer = requests.post(url, headers=headers, data=payload)
         answers.append(answer)

     return answers
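
Callers can now handle invalid states explicitly, and each element of the returned list is the raw requests.Response rather than its decoded JSON. A sketch (run numbers are illustrative):

    import runregistry

    try:
        answers = runregistry.move_runs("OPEN", "SIGNOFF", runs=[362761, 362762])
        for response in answers:
            print(response.status_code)  # Response objects after this change
    except ValueError as err:
        print(err)
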
@@ -419,8 +443,9 @@ def make_significant_runs(run=None, runs=[], **kwargs):
     mark run/runs significant
     """
     if not run and not runs:
-        print("make_significant_runs(): no 'run' and 'runs' arguments were provided")
-        return
+        raise ValueError(
+            "make_significant_runs(): no 'run' and 'runs' arguments were provided"
+        )

     url = "%s/runs/mark_significant" % (api_url)

@@ -445,10 +470,9 @@ def reset_RR_attributes_and_refresh_runs(runs=[], **kwargs):
     """
     runs = __parse_runs_arg(runs)
     if not runs:
-        print(
-            "reset_RR_attributes_and_refresh_runs(): no 'runs' arguments were provided"
+        raise ValueError(
+            "reset_RR_attributes_and_refresh_runs(): no 'runs' argument was provided"
         )
-        return
     headers = _get_headers(token=_get_token())
     answers = []
     for run_number in runs:
@@ -467,10 +491,9 @@ def manually_refresh_components_statuses_for_runs(runs=[], **kwargs):
     runs = __parse_runs_arg(runs)

     if not runs:
-        print(
-            "manually_refresh_components_statuses_for_runs(): no 'runs' arguments were provided, return"
+        raise ValueError(
+            "manually_refresh_components_statuses_for_runs(): no 'runs' argument was provided"
         )
-        return

     headers = _get_headers(token=_get_token())
     answers = []
@@ -496,16 +519,11 @@ def edit_rr_lumisections(
     """
     WIP edit RR lumisections attributes
     """
-    states = ["GOOD", "BAD", "STANDBY", "EXCLUDED", "NONSET"]
-    if status not in states:
-        print(
-            "edit_rr_lumisections(): get status '",
-            status,
-            "', while allowed statuses are ",
-            states,
-            ", return",
+    if status not in LUMISECTION_STATES:
+        raise Exception(
+            f"edit_rr_lumisections(): got status '{status}'"
+            f" but allowed statuses are {LUMISECTION_STATES}"
         )
-        return

     url = "%s/lumisections/edit_rr_lumisections" % (api_url)

@@ -535,21 +553,13 @@ def move_datasets(
     Requires a privileged token.
     """
     if not run and not runs:
-        print("move_datasets(): no 'run' and 'runs' arguments were provided, return")
-        return
+        raise ValueError("move_datasets(): no 'run' and 'runs' arguments were provided")

-    states = ["SIGNOFF", "OPEN", "COMPLETED", WAITING_DQM_GUI_CONSTANT]
-    if from_ not in states or to_ not in states:
-        print(
-            "move_datasets(): get states '",
-            from_,
-            "' , '",
-            to_,
-            "', while allowed states are ",
-            states,
-            ", return",
+    if from_ not in OFFLINE_DATASET_STATES or to_ not in OFFLINE_DATASET_STATES:
+        raise ValueError(
+            f"move_datasets(): got states '{from_}', '{to_}'"
+            f" but allowed states are {OFFLINE_DATASET_STATES}"
         )
-        return

     url = "%s/datasets/%s/move_dataset/%s/%s" % (api_url, workspace, from_, to_)

@@ -559,7 +569,7 @@
         payload = json.dumps(
             {"run_number": run, "dataset_name": dataset_name, "workspace": workspace}
         )
-        return requests.post(url, headers=headers, data=payload)
+        return [requests.post(url, headers=headers, data=payload)]

     answers = []
     for run_number in runs:
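
The single-run path now also returns a list, so both call styles can be consumed uniformly. A sketch (argument names mirror the payload keys visible in the diff; the exact signature may differ, and all values are illustrative):

    import runregistry

    responses = runregistry.move_datasets(
        from_="OPEN",
        to_="SIGNOFF",
        run=362761,
        dataset_name="/PromptReco/Collisions2022/DQM",
        workspace="global",
    )
    for response in responses:
        print(response.status_code)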