Skip to content

Commit

Permalink
implement fileops wait
Browse files Browse the repository at this point in the history
  • Loading branch information
sahil-verma-ib committed Jul 30, 2024
1 parent 17a49d7 commit 7f50023
Showing 1 changed file with 48 additions and 6 deletions.
54 changes: 48 additions & 6 deletions ib_cicd/ib_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,28 +442,70 @@ def list_directory(ib_host, folder, api_token):
return paths


def wait_until_job_finishes(ib_host, job_id, job_type, api_token):
def check_file_ops_status(ib_host, job_id, job_type, api_token):
"""
Polls status of an async file operation
(https://instabase.com/docs/apis/v2-apis/filesystem-v2/index.html#poll-status-of-an-async-file-operation)
:param ib_host: (string) IB host url (e.g. https://www.instabase.com)
:param job_id: (string) job id to look into
:param job_type: (string) job type [copy, move, delete, extract]
:param api_token: (string) api token for IB environment
:return: Response object
"""
url = ib_host + f"/api/v2/files/{job_type}/jobs/{job_id}"

headers = {"Authorization": "Bearer {0}".format(api_token)}

resp = requests.get(url, headers=headers, verify=False)

# Verify request is successful
content = json.loads(resp.content)
if resp.status_code != 200 or (
"state" in content and content["state"] == "ERROR"
):
raise Exception(f"Error checking job status: {resp.content}")

return resp


def wait_until_job_finishes(ib_host, job_id, job_type, api_token, operation_type='job'):
"""
Helper function to continuously wait until a job finishes (uses job status api to determine this)
:param ib_host: (string) IB host url (e.g. https://www.instabase.com)
:param job_id: (string) job id to look into
:param job_type: (string) job type [flow, refiner, job, async, group]
:param job_type: (string) job type (operation_type=job: flow, refiner, job, async, group),
(operation_type=file_ops: copy, move, delete, extract)
:param api_token: (string) api token for IB environment
:param operation_type: (string) operation type [flow, file_ops]
:return: bool indicating whether job completed successfully
"""
still_running = True
job_status_callbacks = {
"file_ops": check_file_ops_status,
"job": check_job_status
}
valid_operation_types = {
"file_ops": ['copy', 'move', 'delete', 'extract'],
"job": ['flow', 'refiner', 'job','async', 'group']
}
if job_type not in valid_operation_types[operation_type]:
raise Exception(f"Invalid job type {job_type} for the passed operation type {operation_type}!")

job_status_callback = job_status_callbacks[operation_type]

while still_running:
job_status_response = check_job_status(ib_host, job_id, job_type, api_token)
job_status_response = job_status_callback(ib_host, job_id, job_type, api_token)
job_status_response_content = json.loads(job_status_response.content)
status = job_status_response_content["status"]

state = job_status_response_content["state"]

if status != "OK":
if operation_type == "job" and job_status_response_content["status"] != "OK":
return False

still_running = state != "DONE" and state != "COMPLETE"
still_running = not state in ("DONE", "COMPLETE", "FAILED")
time.sleep(5)

return True
Expand Down

0 comments on commit 7f50023

Please sign in to comment.