Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dereference option in compress method #110

Merged
merged 30 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f598e05
wip
ekouts Apr 29, 2024
34b1e38
wip
ekouts May 2, 2024
96292a5
Merge branch 'main' of github.com:eth-cscs/pyfirecrest into refactor/…
ekouts May 15, 2024
76d210b
Add timers in async client
ekouts May 20, 2024
8314c84
Stash
ekouts May 24, 2024
01c5048
Don't merge reqs by default
ekouts May 28, 2024
28018d1
Bring back the old merge_request
ekouts May 28, 2024
a4019e2
Fix internal dictionaries keys
ekouts May 28, 2024
375bd81
Move decorator to wrapper GET function
ekouts May 28, 2024
3951eee
Merge branch 'main' of github.com:eth-cscs/pyfirecrest into refactor/…
ekouts Jun 3, 2024
1dd06e5
Fix tasks endpoint in reference
ekouts Jun 3, 2024
24f2a7d
Extend compress func in basic client
ekouts Jun 4, 2024
030ec1f
Extend compress func in basic client
ekouts Jun 4, 2024
4a4b87d
Merge branch 'main' of github.com:eth-cscs/pyfirecrest into refactor/…
ekouts Jun 4, 2024
6ea526b
Merge branch 'refactor/async_merge_reqs' of github.com:ekouts/pyfirec…
ekouts Jun 4, 2024
00f9e05
Extend compress and extract func in async client
ekouts Jun 4, 2024
3b346b3
Merge branch 'main' of github.com:eth-cscs/pyfirecrest into feat/exte…
ekouts Jun 13, 2024
c1f5673
Add dereference option in compress methods
ekouts Jun 13, 2024
bfd157e
Fix typo
ekouts Jun 13, 2024
f134d3d
Use exponential backoff for polling
ekouts Jun 13, 2024
eba5286
Add docs for `fail_on_timeout`
ekouts Jun 14, 2024
33d76e1
Merge branch 'feat/extend_compress_and_extract' of github.com:ekouts/…
ekouts Jun 14, 2024
7b7af00
Update docs text and make timeout string a global var
ekouts Jun 18, 2024
37a3e8b
Add comment in the code to explain the `fail_on_timeout` logic
ekouts Jun 18, 2024
e2b61dc
Update docs text and make timeout string a global var
ekouts Jun 18, 2024
60f7f5b
Add comment in the code to explain the `fail_on_timeout` logic
ekouts Jun 18, 2024
008725b
Merge branch 'feat/extend_compress_and_extract' of github.com:ekouts/…
ekouts Jun 18, 2024
239a61f
Fix typing errors
ekouts Jun 18, 2024
f9b78b9
Merge branch 'main' of github.com:eth-cscs/pyfirecrest into feat/comp…
ekouts Jun 19, 2024
14c57b7
Merge branch 'main' of github.com:eth-cscs/pyfirecrest into feat/comp…
ekouts Jul 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions firecrest/AsyncClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ async def compress(
machine: str,
source_path: str,
target_path: str,
dereference: bool = False,
fail_on_timeout: bool = True
) -> str:
"""Compress files using gzip compression.
Expand All @@ -849,10 +850,17 @@ async def compress(

.. warning:: This is available only for FirecREST>=1.16.0
"""
data: dict[str, str | bool] = {
"targetPath": target_path,
"sourcePath": source_path,
}
if dereference:
data["dereference"] = dereference

resp = await self._post_request(
endpoint="/utilities/compress",
additional_headers={"X-Machine-Name": machine},
data={"targetPath": target_path, "sourcePath": source_path},
data=data,
)
# - If the response is 201, the request was successful so we can
# return the target path
Expand All @@ -876,7 +884,8 @@ async def compress(
job_info = await self.submit_compress_job(
machine,
source_path,
target_path
target_path,
dereference
)
jobid = job_info['jobid']
active_jobs = await self.poll_active(
Expand Down Expand Up @@ -1032,7 +1041,7 @@ async def stat(

:param machine: the machine name where the filesystem belongs to
:param target_path: the absolute target path
:param dereference: follow link (default False)
:param dereference: follow symbolic links
:calls: GET `/utilities/stat`
"""
params: dict[str, Any] = {"targetPath": target_path}
Expand Down Expand Up @@ -1617,6 +1626,7 @@ async def _internal_transfer(
account,
ret_response,
extension=None,
dereference=False,
):
data = {"targetPath": target_path}
if source_path:
Expand All @@ -1637,6 +1647,9 @@ async def _internal_transfer(
if extension:
data["extension"] = extension

if dereference:
data["dereference"] = dereference

resp = await self._post_request(
endpoint=endpoint, additional_headers={"X-Machine-Name": machine}, data=data
)
Expand Down Expand Up @@ -1734,6 +1747,7 @@ async def submit_compress_job(
machine: str,
source_path: str,
target_path: str,
dereference: bool = False,
job_name: Optional[str] = None,
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
Expand All @@ -1746,6 +1760,7 @@ async def submit_compress_job(
:param machine: the machine name where the scheduler belongs to
:param source_path: the absolute source path
:param target_path: the absolute target path
:param dereference: follow symbolic links
:param job_name: job name
:param time: limit on the total run time of the job. Acceptable time formats 'minutes', 'minutes:seconds', 'hours:minutes:seconds', 'days-hours', 'days-hours:minutes' and 'days-hours:minutes:seconds'.
:param stage_out_job_id: transfer data after job with ID {stage_out_job_id} is completed
Expand All @@ -1768,6 +1783,7 @@ async def submit_compress_job(
stage_out_job_id,
account,
resp,
dereference=dereference,
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
Expand Down
22 changes: 19 additions & 3 deletions firecrest/BasicClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ def compress(
machine: str,
source_path: str,
target_path: str,
dereference: bool = False,
fail_on_timeout: bool = True
) -> str:
"""Compress files using gzip compression.
Expand All @@ -617,10 +618,17 @@ def compress(

.. warning:: This is available only for FirecREST>=1.16.0
"""
data: dict[str, str | bool] = {
"targetPath": target_path,
"sourcePath": source_path
}
if dereference:
data["dereference"] = dereference

resp = self._post_request(
endpoint="/utilities/compress",
additional_headers={"X-Machine-Name": machine},
data={"targetPath": target_path, "sourcePath": source_path},
data=data,
)
# - If the response is 201, the request was successful so we can
# return the target path
Expand All @@ -644,7 +652,8 @@ def compress(
job_info = self.submit_compress_job(
machine,
source_path,
target_path
target_path,
dereference
)
jobid = job_info['jobid']
active_jobs = self.poll_active(
Expand Down Expand Up @@ -801,7 +810,7 @@ def stat(

:param machine: the machine name where the filesystem belongs to
:param target_path: the absolute target path
:param dereference: follow link (default False)
:param dereference: follow symbolic links
:calls: GET `/utilities/stat`
"""
params: dict[str, Any] = {"targetPath": target_path}
Expand Down Expand Up @@ -1416,6 +1425,7 @@ def _internal_transfer(
stage_out_job_id,
account,
extension=None,
dereference=False,
):
data = {"targetPath": target_path}
if source_path:
Expand All @@ -1436,6 +1446,9 @@ def _internal_transfer(
if extension:
data["extension"] = extension

if dereference:
data["dereference"] = dereference

resp = self._post_request(
endpoint=endpoint, additional_headers={"X-Machine-Name": machine}, data=data
)
Expand Down Expand Up @@ -1651,6 +1664,7 @@ def submit_compress_job(
machine: str,
source_path: str,
target_path: str,
dereference: bool = False,
job_name: Optional[str] = None,
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
Expand All @@ -1663,6 +1677,7 @@ def submit_compress_job(
:param machine: the machine name where the scheduler belongs to
:param source_path: the absolute source path
:param target_path: the absolute target path
:param dereference: follow symbolic links
:param job_name: job name
:param time: limit on the total run time of the job. Acceptable time formats 'minutes', 'minutes:seconds', 'hours:minutes:seconds', 'days-hours', 'days-hours:minutes' and 'days-hours:minutes:seconds'.
:param stage_out_job_id: transfer data after job with ID {stage_out_job_id} is completed
Expand All @@ -1684,6 +1699,7 @@ def submit_compress_job(
time,
stage_out_job_id,
account,
dereference=dereference,
)
logger.info(f"Job submission task: {json_response['task_id']}")
return self._poll_tasks(
Expand Down
8 changes: 7 additions & 1 deletion firecrest/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,13 @@ def submit_compress(
try:
console.print(
client.submit_compress_job(
system, source, destination, job_name, time, jobid, account
machine=system,
source_path=source,
target_path=destination,
job_name=job_name,
time=time,
stage_out_job_id=jobid,
account=account
)
)
except Exception as e:
Expand Down
Loading