From 1fb70779098aa1cedd94804aaaed6ac63aa830cf Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Sun, 12 May 2024 13:04:52 +0200 Subject: [PATCH 1/2] Make polling sleep times configurable --- firecrest/AsyncClient.py | 55 +++++++++++++--------------- firecrest/BasicClient.py | 63 ++++++++++++++++++++------------- firecrest/FirecrestException.py | 14 ++++++++ tests/test_compute.py | 4 ++- 4 files changed, 81 insertions(+), 55 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index cf09e99..2e6012b 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -62,12 +62,15 @@ def __init__( self._client = client self._task_id = task_id - async def poll_task(self, final_status): + async def poll_task(self, final_status, sleep_times): logger.info(f"Polling task {self._task_id} until status is {final_status}") resp = await self._client._task_safe(self._task_id, self._responses) while resp["status"] < final_status: - # The rate limit is handled by the async client so no need to - # add a `sleep` here + try: + await asyncio.sleep(next(sleep_times)) + except StopIteration: + raise fe.PollingIterException(self._task_id) + resp = await self._client._task_safe(self._task_id, self._responses) logger.info(f'Status of {self._task_id} is {resp["status"]}') @@ -172,6 +175,12 @@ def __init__( #: Number of retries in case the rate limit is reached. When it is set to `None`, the #: client will keep trying until it gets a different status code than 429. self.num_retries_rate_limit: Optional[int] = None + #: Set the sleep times for the polling of a task. When this is a + #: a list an error will be raised if the task is not finished after + #: the last sleep time. By default this an list of 100 zeros in this + #: client and the rate will be controlled by the request rate of the + #: `tasks` microservice. + self.polling_sleep_times: list = 100 * [0] self._api_version: Version = parse("1.13.1") self._session = httpx.AsyncClient() @@ -551,20 +560,6 @@ async def _invalidate( responses.append(resp) return self._json_response(responses, 201, allow_none_result=True) - async def _poll_tasks(self, task_id: str, final_status, sleep_time): - logger.info(f"Polling task {task_id} until status is {final_status}") - resp = await self._task_safe(task_id) - while resp["status"] < final_status: - t = next(sleep_time) - logger.info( - f'Status of {task_id} is {resp["status"]}, sleeping for {t} sec' - ) - await asyncio.sleep(t) - resp = await self._task_safe(task_id) - - logger.info(f'Status of {task_id} is {resp["status"]}') - return resp["data"] - # Status async def all_services(self) -> List[t.Service]: """Returns a list containing all available micro services with a name, description, and status. @@ -1189,8 +1184,8 @@ async def submit( logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], [resp]) + result = await t.poll_task("200", iter(self.polling_sleep_times)) # Inject taskid in the result - result = await t.poll_task("200") result["firecrest_taskid"] = json_response["task_id"] return result @@ -1241,7 +1236,7 @@ async def poll( json_response = self._json_response([resp], 200) logger.info(f"Job polling task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], [resp]) - res = await t.poll_task("200") + res = await t.poll_task("200", iter(self.polling_sleep_times)) # When there is no job in the sacct output firecrest will return an empty dictionary instead of list if isinstance(res, dict): return list(res.values()) @@ -1290,7 +1285,7 @@ async def poll_active( json_response = self._json_response([resp], 200) logger.info(f"Job active polling task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], [resp]) - dict_result = await t.poll_task("200") + dict_result = await t.poll_task("200", iter(self.polling_sleep_times)) if len(jobids): ret = [i for i in dict_result.values() if i["jobid"] in jobids] else: @@ -1325,7 +1320,7 @@ async def nodes( ) json_response = self._json_response([resp], 200) t = ComputeTask(self, json_response["task_id"], [resp]) - result = await t.poll_task("200") + result = await t.poll_task("200", iter(self.polling_sleep_times)) return result async def partitions( @@ -1355,7 +1350,7 @@ async def partitions( ) json_response = self._json_response([resp], 200) t = ComputeTask(self, json_response["task_id"], [resp]) - result = await t.poll_task("200") + result = await t.poll_task("200", iter(self.polling_sleep_times)) return result async def reservations( @@ -1382,7 +1377,7 @@ async def reservations( ) json_response = self._json_response([resp], 200) t = ComputeTask(self, json_response["task_id"], [resp]) - result = await t.poll_task("200") + result = await t.poll_task("200", iter(self.polling_sleep_times)) return result async def cancel(self, machine: str, job_id: str | int) -> str: @@ -1402,7 +1397,7 @@ async def cancel(self, machine: str, job_id: str | int) -> str: json_response = self._json_response([resp], 200) logger.info(f"Job cancellation task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], [resp]) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) # Storage async def _internal_transfer( @@ -1484,7 +1479,7 @@ async def submit_move_job( ) logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], resp) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) async def submit_copy_job( self, @@ -1527,7 +1522,7 @@ async def submit_copy_job( ) logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], resp) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) async def submit_compress_job( self, @@ -1571,7 +1566,7 @@ async def submit_compress_job( ) logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], resp) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) async def submit_extract_job( self, @@ -1618,7 +1613,7 @@ async def submit_extract_job( ) logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], resp) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) async def submit_rsync_job( self, @@ -1661,7 +1656,7 @@ async def submit_rsync_job( ) logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], resp) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) async def submit_delete_job( self, @@ -1702,7 +1697,7 @@ async def submit_delete_job( ) logger.info(f"Job submission task: {json_response['task_id']}") t = ComputeTask(self, json_response["task_id"], resp) - return await t.poll_task("200") + return await t.poll_task("200", iter(self.polling_sleep_times)) async def external_upload( self, machine: str, source_path: str, target_path: str diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py index 5748d54..5986d8f 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/BasicClient.py @@ -125,20 +125,30 @@ def __init__( ) -> None: self._firecrest_url = firecrest_url self._authorization = authorization - # This should be used only for blocking operations that require multiple requests, - # not for external upload/download + # This should be used only for blocking operations that require + # multiple requests, not for external upload/download self._current_method_requests: List[requests.Response] = [] self._verify = verify self._sa_role = sa_role - #: This attribute will be passed to all the requests that will be made. - #: How many seconds to wait for the server to send data before giving up. - #: After that time a `requests.exceptions.Timeout` error will be raised. + #: This attribute will be passed to all the requests that will be + #: made. How many seconds to wait for the server to send data before + #: giving up. After that time a `requests.exceptions.Timeout` error + #: will be raised. #: - #: It can be a float or a tuple. More details here: https://requests.readthedocs.io. - self.timeout: Optional[float | Tuple[float, float] | Tuple[float, None]] = None - #: Number of retries in case the rate limit is reached. When it is set to `None`, the - #: client will keep trying until it gets a different status code than 429. + #: It can be a float or a tuple. More details here: + #: https://requests.readthedocs.io. + self.timeout: Optional[ + float | Tuple[float, float] | Tuple[float, None] + ] = None + #: Number of retries in case the rate limit is reached. When it is + #: set to `None`, the client will keep trying until it gets a + #: different status code than 429. self.num_retries_rate_limit: Optional[int] = None + #: Set the sleep times for the polling of a task. When this is a + #: a list an error will be raised if the task is not finished after + #: the last sleep time. By default the sleep times will sum to + #: 10.5sec. + self.polling_sleep_times: list = [1, 0.5] + 4 * 60 * 9 * [0.25] self._api_version: Version = parse("1.13.1") self._session = requests.Session() @@ -372,9 +382,14 @@ def _poll_tasks(self, task_id: str, final_status, sleep_time): resp = self._task_safe(task_id) t = 1 while resp["status"] < final_status: - t = next(sleep_time, t) + try: + t = next(sleep_time) + except StopIteration: + raise fe.PollingIterException(task_id) + logger.info( - f'Status of {task_id} is {resp["status"]}, sleeping for {t} sec' + f'Status of {task_id} is {resp["status"]}, sleeping for {t} ' + f'sec' ) time.sleep(t) resp = self._task_safe(task_id) @@ -1057,7 +1072,7 @@ def submit( # Inject taskid in the result result = self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) result["firecrest_taskid"] = json_response["task_id"] return result @@ -1094,7 +1109,7 @@ def poll( json_response = self._acct_request(machine, jobids, start_time, end_time, page_size, page_number) logger.info(f"Job polling task: {json_response['task_id']}") res = self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) # When there is no job in the sacct output firecrest will return an empty dictionary instead of list if isinstance(res, dict): @@ -1131,7 +1146,7 @@ def poll_active( json_response = self._squeue_request(machine, jobids, page_size, page_number) logger.info(f"Job active polling task: {json_response['task_id']}") dict_result = self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) return list(dict_result.values()) @@ -1163,7 +1178,7 @@ def nodes( self._current_method_requests.append(resp) json_response = self._json_response(self._current_method_requests, 200) result = self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) return result @@ -1195,7 +1210,7 @@ def partitions( self._current_method_requests.append(resp) json_response = self._json_response(self._current_method_requests, 200) result = self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) return result @@ -1224,7 +1239,7 @@ def reservations( self._current_method_requests.append(resp) json_response = self._json_response(self._current_method_requests, 200) result = self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) return result @@ -1247,7 +1262,7 @@ def cancel(self, machine: str, job_id: str | int) -> str: json_response = self._json_response(self._current_method_requests, 200) logger.info(f"Job cancellation task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) # Storage @@ -1328,7 +1343,7 @@ def submit_move_job( ) logger.info(f"Job submission task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) def submit_copy_job( @@ -1371,7 +1386,7 @@ def submit_copy_job( ) logger.info(f"Job submission task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) def submit_rsync_job( @@ -1414,7 +1429,7 @@ def submit_rsync_job( ) logger.info(f"Job submission task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) def submit_delete_job( @@ -1455,7 +1470,7 @@ def submit_delete_job( ) logger.info(f"Job submission task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) def external_upload( @@ -1533,7 +1548,7 @@ def submit_compress_job( ) logger.info(f"Job submission task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) def submit_extract_job( @@ -1581,7 +1596,7 @@ def submit_extract_job( ) logger.info(f"Job submission task: {json_response['task_id']}") return self._poll_tasks( - json_response["task_id"], "200", iter([1, 0.5, 0.25]) + json_response["task_id"], "200", iter(self.polling_sleep_times) ) # Reservation diff --git a/firecrest/FirecrestException.py b/firecrest/FirecrestException.py index 127a4b7..fddf6dd 100644 --- a/firecrest/FirecrestException.py +++ b/firecrest/FirecrestException.py @@ -98,3 +98,17 @@ class StorageDownloadException(FirecrestException): class StorageUploadException(FirecrestException): """Exception raised by a failed external upload""" + + +class PollingIterException(Exception): + """Exception raised when the polling iterator is exhausted""" + + def __init__(self, task_id): + self._task_id = task_id + + def __str__(self): + return ( + f"polling iterator for task {self._task_id} " + f"is exhausted. Update `polling_sleep_times` of the client " + f"to increase the number of polling attempts." + ) diff --git a/tests/test_compute.py b/tests/test_compute.py index f45394b..2794fc7 100644 --- a/tests/test_compute.py +++ b/tests/test_compute.py @@ -20,9 +20,11 @@ class ValidAuthorization: def get_access_token(self): return "VALID_TOKEN" - return firecrest.Firecrest( + client = firecrest.Firecrest( firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization() ) + client.polling_sleep_times = [0, 0, 0] + return client @pytest.fixture From 24c626a540da250b252af5470d7e7b0a3a426633 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 29 May 2024 17:03:22 +0300 Subject: [PATCH 2/2] Decrease max polling sleep time --- firecrest/AsyncClient.py | 4 ++-- firecrest/BasicClient.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py index 2e6012b..ff09f98 100644 --- a/firecrest/AsyncClient.py +++ b/firecrest/AsyncClient.py @@ -177,10 +177,10 @@ def __init__( self.num_retries_rate_limit: Optional[int] = None #: Set the sleep times for the polling of a task. When this is a #: a list an error will be raised if the task is not finished after - #: the last sleep time. By default this an list of 100 zeros in this + #: the last sleep time. By default this an list of 250 zeros in this #: client and the rate will be controlled by the request rate of the #: `tasks` microservice. - self.polling_sleep_times: list = 100 * [0] + self.polling_sleep_times: list = 250 * [0] self._api_version: Version = parse("1.13.1") self._session = httpx.AsyncClient() diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py index 5986d8f..39d5e32 100644 --- a/firecrest/BasicClient.py +++ b/firecrest/BasicClient.py @@ -147,8 +147,8 @@ def __init__( #: Set the sleep times for the polling of a task. When this is a #: a list an error will be raised if the task is not finished after #: the last sleep time. By default the sleep times will sum to - #: 10.5sec. - self.polling_sleep_times: list = [1, 0.5] + 4 * 60 * 9 * [0.25] + #: 1 minute and the client will make 236 requests before failing. + self.polling_sleep_times: list = [1, 0.5] + 234 * [0.25] self._api_version: Version = parse("1.13.1") self._session = requests.Session()