From c935c54efc2780d218f37b42545e5c0332ae4913 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sat, 29 Oct 2022 11:35:56 -0700 Subject: [PATCH 1/5] Fix race condition for spot logs --- sky/spot/spot_utils.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sky/spot/spot_utils.py b/sky/spot/spot_utils.py index 5dc402e1ba1..418c0402991 100644 --- a/sky/spot/spot_utils.py +++ b/sky/spot/spot_utils.py @@ -239,10 +239,15 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: spot_status = spot_state.get_status(job_id) continue handle = global_user_state.get_handle_from_cluster_name(cluster_name) - returncode = backend.tail_logs(handle, - job_id=None, - spot_job_id=job_id, - follow=follow) + returncode = 1 + # The cluster can be removed from the table before the spot state is + # updated by the controller. In this case, we should skip the logging, + # and wait for the next round of status check. + if handle is not None: + returncode = backend.tail_logs(handle, + job_id=None, + spot_job_id=job_id, + follow=follow) if returncode == 0: # If the log tailing exit successfully (the real job can be # SUCCEEDED or FAILED), we can safely break the loop. We use the From a66aa263325cec34f28c36a7f16f7ca1b23d130d Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sat, 29 Oct 2022 12:09:36 -0700 Subject: [PATCH 2/5] fix --- sky/spot/spot_utils.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/sky/spot/spot_utils.py b/sky/spot/spot_utils.py index 418c0402991..9b68f6bc08a 100644 --- a/sky/spot/spot_utils.py +++ b/sky/spot/spot_utils.py @@ -231,23 +231,21 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: backend = backends.CloudVmRayBackend() spot_status = spot_state.get_status(job_id) while not spot_status.is_terminal(): - if spot_status != spot_state.SpotStatus.RUNNING: + handle = global_user_state.get_handle_from_cluster_name(cluster_name) + # The cluster can be removed from the table before the spot state is + # updated by the controller. In this case, we should skip the logging, + # and wait for the next round of status check. + if handle is None or spot_status != spot_state.SpotStatus.RUNNING: logger.info(f'INFO: The log is not ready yet, as the spot job ' f'is {spot_status.value}. ' f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.') time.sleep(JOB_STATUS_CHECK_GAP_SECONDS) spot_status = spot_state.get_status(job_id) continue - handle = global_user_state.get_handle_from_cluster_name(cluster_name) - returncode = 1 - # The cluster can be removed from the table before the spot state is - # updated by the controller. In this case, we should skip the logging, - # and wait for the next round of status check. - if handle is not None: - returncode = backend.tail_logs(handle, - job_id=None, - spot_job_id=job_id, - follow=follow) + returncode = backend.tail_logs(handle, + job_id=None, + spot_job_id=job_id, + follow=follow) if returncode == 0: # If the log tailing exit successfully (the real job can be # SUCCEEDED or FAILED), we can safely break the loop. We use the From 80c305393c5982eabb57fc3520b59b41b226dbf3 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sat, 29 Oct 2022 12:15:17 -0700 Subject: [PATCH 3/5] fix comment --- sky/spot/spot_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sky/spot/spot_utils.py b/sky/spot/spot_utils.py index 9b68f6bc08a..f53584eb367 100644 --- a/sky/spot/spot_utils.py +++ b/sky/spot/spot_utils.py @@ -232,9 +232,9 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: spot_status = spot_state.get_status(job_id) while not spot_status.is_terminal(): handle = global_user_state.get_handle_from_cluster_name(cluster_name) - # The cluster can be removed from the table before the spot state is - # updated by the controller. In this case, we should skip the logging, - # and wait for the next round of status check. + # Check the handle: The cluster can be removed from the table before the + # spot state is updated by the controller. In this case, we should skip + # the logging, and wait for the next round of status check. if handle is None or spot_status != spot_state.SpotStatus.RUNNING: logger.info(f'INFO: The log is not ready yet, as the spot job ' f'is {spot_status.value}. ' From 14e5d3b08cc9a4acf19c42ab52a66101623da02e Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Sun, 30 Oct 2022 23:15:58 -0700 Subject: [PATCH 4/5] address comments --- sky/spot/spot_utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sky/spot/spot_utils.py b/sky/spot/spot_utils.py index f53584eb367..a3ffbfc7b0f 100644 --- a/sky/spot/spot_utils.py +++ b/sky/spot/spot_utils.py @@ -230,14 +230,17 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: cluster_name = generate_spot_cluster_name(task_name, job_id) backend = backends.CloudVmRayBackend() spot_status = spot_state.get_status(job_id) - while not spot_status.is_terminal(): + while spot_status is None or not spot_status.is_terminal(): handle = global_user_state.get_handle_from_cluster_name(cluster_name) # Check the handle: The cluster can be removed from the table before the # spot state is updated by the controller. In this case, we should skip # the logging, and wait for the next round of status check. if handle is None or spot_status != spot_state.SpotStatus.RUNNING: - logger.info(f'INFO: The log is not ready yet, as the spot job ' - f'is {spot_status.value}. ' + status_help_str = '' + if (spot_status is not None and + spot_status != spot_state.SpotStatus.RUNNING): + status_help_str = f', as the spot job is {spot_status.value}' + logger.info(f'INFO: The log is not ready yet{status_help_str}. ' f'Waiting for {JOB_STATUS_CHECK_GAP_SECONDS} seconds.') time.sleep(JOB_STATUS_CHECK_GAP_SECONDS) spot_status = spot_state.get_status(job_id) From 855ae907d1929896d9176ac1bbdf439db20cff0e Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 31 Oct 2022 10:40:41 -0700 Subject: [PATCH 5/5] add comment --- sky/spot/spot_utils.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sky/spot/spot_utils.py b/sky/spot/spot_utils.py index a3ffbfc7b0f..0700c1da751 100644 --- a/sky/spot/spot_utils.py +++ b/sky/spot/spot_utils.py @@ -230,11 +230,14 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: cluster_name = generate_spot_cluster_name(task_name, job_id) backend = backends.CloudVmRayBackend() spot_status = spot_state.get_status(job_id) + # spot_status can be None if the controller process just started and has + # not updated the spot status yet. while spot_status is None or not spot_status.is_terminal(): handle = global_user_state.get_handle_from_cluster_name(cluster_name) - # Check the handle: The cluster can be removed from the table before the - # spot state is updated by the controller. In this case, we should skip - # the logging, and wait for the next round of status check. + # Check the handle: The cluster can be preempted and removed from the + # table before the spot state is updated by the controller. In this + # case, we should skip the logging, and wait for the next round of + # status check. if handle is None or spot_status != spot_state.SpotStatus.RUNNING: status_help_str = '' if (spot_status is not None and