Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sleep times for task polling configurable #106

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 25 additions & 30 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ def __init__(
self._client = client
self._task_id = task_id

async def poll_task(self, final_status):
async def poll_task(self, final_status, sleep_times):
logger.info(f"Polling task {self._task_id} until status is {final_status}")
resp = await self._client._task_safe(self._task_id, self._responses)
while resp["status"] < final_status:
# The rate limit is handled by the async client so no need to
# add a `sleep` here
try:
await asyncio.sleep(next(sleep_times))
except StopIteration:
raise fe.PollingIterException(self._task_id)

resp = await self._client._task_safe(self._task_id, self._responses)

logger.info(f'Status of {self._task_id} is {resp["status"]}')
Expand Down Expand Up @@ -172,6 +175,12 @@ def __init__(
#: Number of retries in case the rate limit is reached. When it is set to `None`, the
#: client will keep trying until it gets a different status code than 429.
self.num_retries_rate_limit: Optional[int] = None
#: Set the sleep times for the polling of a task. When this is a
#: list, an error will be raised if the task is not finished after
#: the last sleep time. By default this is a list of 250 zeros in this
#: client and the rate will be controlled by the request rate of the
#: `tasks` microservice.
self.polling_sleep_times: list = 250 * [0]
self._api_version: Version = parse("1.13.1")
self._session = httpx.AsyncClient()

Expand Down Expand Up @@ -551,20 +560,6 @@ async def _invalidate(
responses.append(resp)
return self._json_response(responses, 201, allow_none_result=True)

async def _poll_tasks(self, task_id: str, final_status, sleep_time):
    """Poll a task until its status reaches ``final_status``.

    :param task_id: ID of the task to poll
    :param final_status: polling stops once the task status is no longer
        smaller than this value (statuses are compared as strings here —
        e.g. ``"200"`` — matching the API's encoding; TODO confirm against
        the `tasks` microservice docs)
    :param sleep_time: iterator yielding the number of seconds to sleep
        between consecutive polls
    :raises fe.PollingIterException: when ``sleep_time`` is exhausted
        before the task reaches ``final_status``
    :returns: the ``data`` field of the final task response
    """
    logger.info(f"Polling task {task_id} until status is {final_status}")
    resp = await self._task_safe(task_id)
    while resp["status"] < final_status:
        try:
            t = next(sleep_time)
        except StopIteration:
            # Raise a descriptive, catchable error instead of leaking a
            # bare StopIteration when the caller-supplied sleep times
            # run out (consistent with PollingIterException's contract).
            raise fe.PollingIterException(task_id)

        logger.info(
            f'Status of {task_id} is {resp["status"]}, sleeping for {t} sec'
        )
        await asyncio.sleep(t)
        resp = await self._task_safe(task_id)

    logger.info(f'Status of {task_id} is {resp["status"]}')
    return resp["data"]

# Status
async def all_services(self) -> List[t.Service]:
"""Returns a list containing all available micro services with a name, description, and status.
Expand Down Expand Up @@ -1189,8 +1184,8 @@ async def submit(
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])

result = await t.poll_task("200", iter(self.polling_sleep_times))
# Inject taskid in the result
result = await t.poll_task("200")
result["firecrest_taskid"] = json_response["task_id"]
return result

Expand Down Expand Up @@ -1241,7 +1236,7 @@ async def poll(
json_response = self._json_response([resp], 200)
logger.info(f"Job polling task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])
res = await t.poll_task("200")
res = await t.poll_task("200", iter(self.polling_sleep_times))
# When there is no job in the sacct output firecrest will return an empty dictionary instead of list
if isinstance(res, dict):
return list(res.values())
Expand Down Expand Up @@ -1290,7 +1285,7 @@ async def poll_active(
json_response = self._json_response([resp], 200)
logger.info(f"Job active polling task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])
dict_result = await t.poll_task("200")
dict_result = await t.poll_task("200", iter(self.polling_sleep_times))
if len(jobids):
ret = [i for i in dict_result.values() if i["jobid"] in jobids]
else:
Expand Down Expand Up @@ -1325,7 +1320,7 @@ async def nodes(
)
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
result = await t.poll_task("200")
result = await t.poll_task("200", iter(self.polling_sleep_times))
return result

async def partitions(
Expand Down Expand Up @@ -1355,7 +1350,7 @@ async def partitions(
)
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
result = await t.poll_task("200")
result = await t.poll_task("200", iter(self.polling_sleep_times))
return result

async def reservations(
Expand All @@ -1382,7 +1377,7 @@ async def reservations(
)
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
result = await t.poll_task("200")
result = await t.poll_task("200", iter(self.polling_sleep_times))
return result

async def cancel(self, machine: str, job_id: str | int) -> str:
Expand All @@ -1402,7 +1397,7 @@ async def cancel(self, machine: str, job_id: str | int) -> str:
json_response = self._json_response([resp], 200)
logger.info(f"Job cancellation task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

# Storage
async def _internal_transfer(
Expand Down Expand Up @@ -1484,7 +1479,7 @@ async def submit_move_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

async def submit_copy_job(
self,
Expand Down Expand Up @@ -1527,7 +1522,7 @@ async def submit_copy_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

async def submit_compress_job(
self,
Expand Down Expand Up @@ -1571,7 +1566,7 @@ async def submit_compress_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

async def submit_extract_job(
self,
Expand Down Expand Up @@ -1618,7 +1613,7 @@ async def submit_extract_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

async def submit_rsync_job(
self,
Expand Down Expand Up @@ -1661,7 +1656,7 @@ async def submit_rsync_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

async def submit_delete_job(
self,
Expand Down Expand Up @@ -1702,7 +1697,7 @@ async def submit_delete_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
return await t.poll_task("200")
return await t.poll_task("200", iter(self.polling_sleep_times))

async def external_upload(
self, machine: str, source_path: str, target_path: str
Expand Down
63 changes: 39 additions & 24 deletions firecrest/BasicClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,30 @@ def __init__(
) -> None:
self._firecrest_url = firecrest_url
self._authorization = authorization
# This should be used only for blocking operations that require multiple requests,
# not for external upload/download
# This should be used only for blocking operations that require
# multiple requests, not for external upload/download
self._current_method_requests: List[requests.Response] = []
self._verify = verify
self._sa_role = sa_role
#: This attribute will be passed to all the requests that will be made.
#: How many seconds to wait for the server to send data before giving up.
#: After that time a `requests.exceptions.Timeout` error will be raised.
#: This attribute will be passed to all the requests that will be
#: made. How many seconds to wait for the server to send data before
#: giving up. After that time a `requests.exceptions.Timeout` error
#: will be raised.
#:
#: It can be a float or a tuple. More details here: https://requests.readthedocs.io.
self.timeout: Optional[float | Tuple[float, float] | Tuple[float, None]] = None
#: Number of retries in case the rate limit is reached. When it is set to `None`, the
#: client will keep trying until it gets a different status code than 429.
#: It can be a float or a tuple. More details here:
#: https://requests.readthedocs.io.
self.timeout: Optional[
float | Tuple[float, float] | Tuple[float, None]
] = None
#: Number of retries in case the rate limit is reached. When it is
#: set to `None`, the client will keep trying until it gets a
#: different status code than 429.
self.num_retries_rate_limit: Optional[int] = None
#: Set the sleep times for the polling of a task. When this is a
#: list, an error will be raised if the task is not finished after
#: the last sleep time. By default the sleep times will sum to
#: 1 minute and the client will make 236 requests before failing.
self.polling_sleep_times: list = [1, 0.5] + 234 * [0.25]
self._api_version: Version = parse("1.13.1")
self._session = requests.Session()

Expand Down Expand Up @@ -372,9 +382,14 @@ def _poll_tasks(self, task_id: str, final_status, sleep_time):
resp = self._task_safe(task_id)
t = 1
while resp["status"] < final_status:
t = next(sleep_time, t)
try:
t = next(sleep_time)
except StopIteration:
raise fe.PollingIterException(task_id)

logger.info(
f'Status of {task_id} is {resp["status"]}, sleeping for {t} sec'
f'Status of {task_id} is {resp["status"]}, sleeping for {t} '
f'sec'
)
time.sleep(t)
resp = self._task_safe(task_id)
Expand Down Expand Up @@ -1057,7 +1072,7 @@ def submit(

# Inject taskid in the result
result = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)
result["firecrest_taskid"] = json_response["task_id"]
return result
Expand Down Expand Up @@ -1094,7 +1109,7 @@ def poll(
json_response = self._acct_request(machine, jobids, start_time, end_time, page_size, page_number)
logger.info(f"Job polling task: {json_response['task_id']}")
res = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)
# When there is no job in the sacct output firecrest will return an empty dictionary instead of list
if isinstance(res, dict):
Expand Down Expand Up @@ -1131,7 +1146,7 @@ def poll_active(
json_response = self._squeue_request(machine, jobids, page_size, page_number)
logger.info(f"Job active polling task: {json_response['task_id']}")
dict_result = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)
return list(dict_result.values())

Expand Down Expand Up @@ -1163,7 +1178,7 @@ def nodes(
self._current_method_requests.append(resp)
json_response = self._json_response(self._current_method_requests, 200)
result = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)
return result

Expand Down Expand Up @@ -1195,7 +1210,7 @@ def partitions(
self._current_method_requests.append(resp)
json_response = self._json_response(self._current_method_requests, 200)
result = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)
return result

Expand Down Expand Up @@ -1224,7 +1239,7 @@ def reservations(
self._current_method_requests.append(resp)
json_response = self._json_response(self._current_method_requests, 200)
result = self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)
return result

Expand All @@ -1247,7 +1262,7 @@ def cancel(self, machine: str, job_id: str | int) -> str:
json_response = self._json_response(self._current_method_requests, 200)
logger.info(f"Job cancellation task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

# Storage
Expand Down Expand Up @@ -1328,7 +1343,7 @@ def submit_move_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

def submit_copy_job(
Expand Down Expand Up @@ -1371,7 +1386,7 @@ def submit_copy_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

def submit_rsync_job(
Expand Down Expand Up @@ -1414,7 +1429,7 @@ def submit_rsync_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

def submit_delete_job(
Expand Down Expand Up @@ -1455,7 +1470,7 @@ def submit_delete_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

def external_upload(
Expand Down Expand Up @@ -1533,7 +1548,7 @@ def submit_compress_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

def submit_extract_job(
Expand Down Expand Up @@ -1581,7 +1596,7 @@ def submit_extract_job(
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
json_response["task_id"], "200", iter([1, 0.5, 0.25])
json_response["task_id"], "200", iter(self.polling_sleep_times)
)

# Reservation
Expand Down
14 changes: 14 additions & 0 deletions firecrest/FirecrestException.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,17 @@ class StorageDownloadException(FirecrestException):

class StorageUploadException(FirecrestException):
"""Exception raised by a failed external upload"""


class PollingIterException(Exception):
    """Raised when the iterator of polling sleep times is exhausted
    before the polled task has reached its final status.
    """

    def __init__(self, task_id):
        #: ID of the task whose polling ran out of sleep intervals
        self._task_id = task_id

    def __str__(self):
        template = (
            "polling iterator for task {} is exhausted. "
            "Update `polling_sleep_times` of the client to increase "
            "the number of polling attempts."
        )
        return template.format(self._task_id)
4 changes: 3 additions & 1 deletion tests/test_compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ class ValidAuthorization:
def get_access_token(self):
return "VALID_TOKEN"

return firecrest.Firecrest(
client = firecrest.Firecrest(
firecrest_url=fc_server.url_for("/"), authorization=ValidAuthorization()
)
client.polling_sleep_times = [0, 0, 0]
return client


@pytest.fixture
Expand Down
Loading