Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flyin - vscode resume task #2017

Merged
merged 10 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# Heartbeat file of code-server. exit_handler checks whether this file exists
# (and, per its docstring, how recently it was modified) to detect an idle server.
HEARTBEAT_PATH = os.path.expanduser("~/.local/share/code-server/heartbeat")

# Names of the helper scripts generated for the user; launch.json refers to
# them relative to the current working directory.
INTERACTIVE_DEBUGGING_FILE_NAME = "flyin_interactive_entrypoint.py"
RESUME_TASK_FILE_NAME = "flyin_resume_task.py"
# Config keys to store in task template
VSCODE_TYPE_KEY = "flyin_type"
VSCODE_PORT_KEY = "flyin_port"
129 changes: 110 additions & 19 deletions plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from threading import Event
import json
import multiprocessing
import os
import platform
import shutil
import signal
import subprocess
import sys
import tarfile
import time
from typing import Callable, Optional
from flytekitplugins.flyin.utils import load_module_from_path

import fsspec

Expand All @@ -23,6 +26,7 @@
HEARTBEAT_PATH,
INTERACTIVE_DEBUGGING_FILE_NAME,
MAX_IDLE_SECONDS,
RESUME_TASK_FILE_NAME,
)


Expand All @@ -44,25 +48,36 @@

def exit_handler(
child_process: multiprocessing.Process,
fn,
args,
kwargs,
max_idle_seconds: int = 180,
post_execute: Optional[Callable] = None,
):
"""
Check the modified time of ~/.local/share/code-server/heartbeat.
If it is older than max_idle_second seconds, kill the container.
Otherwise, check again every HEARTBEAT_CHECK_SECONDS.
1. Check the modified time of ~/.local/share/code-server/heartbeat.
If it is older than max_idle_seconds seconds, kill the container.
Otherwise, check again every HEARTBEAT_CHECK_SECONDS.
2. Wait for user to resume the task. If resume_task is set, terminate the VSCode server, reload the task function, and run it with the input of the task.

Args:
child_process (multiprocessing.Process, optional): The process to be terminated.
max_idle_seconds (int, optional): The duration in seconds to live after no activity detected.
post_execute (function, optional): The function to be executed before the vscode is self-terminated.
"""

def terminate_process():
if post_execute is not None:
post_execute()
logger.info("Post execute function executed successfully!")
child_process.terminate()
child_process.join()

Check warning on line 74 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L69-L74

Added lines #L69 - L74 were not covered by tests

logger = flytekit.current_context().logging
start_time = time.time()
delta = 0

while True:
while not resume_task.is_set():

Check warning on line 80 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L80

Added line #L80 was not covered by tests
if not os.path.exists(HEARTBEAT_PATH):
delta = time.time() - start_time
logger.info(f"Code server has not been connected since {delta} seconds ago.")
Expand All @@ -74,14 +89,27 @@
# If the time from last connection is longer than max idle seconds, terminate the vscode server.
if delta > max_idle_seconds:
logger.info(f"VSCode server is idle for more than {max_idle_seconds} seconds. Terminating...")
if post_execute is not None:
post_execute()
logger.info("Post execute function executed successfully!")
child_process.terminate()
child_process.join()
terminate_process()

Check warning on line 92 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L92

Added line #L92 was not covered by tests
sys.exit()

time.sleep(HEARTBEAT_CHECK_SECONDS)
# Wait for HEARTBEAT_CHECK_SECONDS seconds, but return immediately when resume_task is set.
resume_task.wait(timeout=HEARTBEAT_CHECK_SECONDS)

Check warning on line 96 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L96

Added line #L96 was not covered by tests
troychiu marked this conversation as resolved.
Show resolved Hide resolved

# User has resumed the task.
terminate_process()

Check warning on line 99 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L99

Added line #L99 was not covered by tests

# Reload the task function since it may be modified.
task_function = getattr(

Check warning on line 102 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L102

Added line #L102 was not covered by tests
load_module_from_path(fn.__module__, os.path.join(os.getcwd(), f"{fn.__module__}.py")), fn.__name__
)

# Get the actual function from the task.
while hasattr(task_function, "__wrapped__"):
if isinstance(task_function, vscode):
task_function = task_function.__wrapped__
break
task_function = task_function.__wrapped__
return task_function(*args, **kwargs)

Check warning on line 112 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L107-L112

Added lines #L107 - L112 were not covered by tests


def download_file(url, target_dir: Optional[str] = "."):
Expand Down Expand Up @@ -231,7 +259,36 @@
with open(INTERACTIVE_DEBUGGING_FILE_NAME, "w") as file:
file.write(python_script)

# Generate a launch.json

def prepare_resume_task_python():
    """
    Generate a Python script for users to resume the task.

    The script is written to RESUME_TASK_FILE_NAME, relative to the current
    working directory. When the user runs it, it asks for confirmation and,
    if the answer is "Y", sends SIGTERM to the process that generated the
    script; any other answer cancels the operation.
    """

    file_name = RESUME_TASK_FILE_NAME
    # The PID of the current process is interpolated at generation time, so the
    # generated script always signals this process, not whoever executes it.
    # Everything else in the template is literal script text.
    python_script = f"""import os
import signal

if __name__ == "__main__":
    print("Terminating server and resuming task.")
    answer = input("This operation will kill the server. All unsaved data will be lost, and you will no longer be able to connect to it. Do you really want to terminate? (Y/N): ").strip().upper()
    if answer == 'Y':
        PID = {os.getpid()}
        os.kill(PID, signal.SIGTERM)
        print(f"The server has been terminated and the task has been resumed.")
    else:
        print("Operation canceled.")
"""

    with open(file_name, "w") as file:
        file.write(python_script)

Check warning on line 284 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L283-L284

Added lines #L283 - L284 were not covered by tests


def prepare_launch_json():
"""
Generate the launch.json for users to easily launch interactive debugging and task resumption.
"""

launch_json = {
"version": "0.2.0",
"configurations": [
Expand All @@ -242,7 +299,15 @@
"program": os.path.join(os.getcwd(), INTERACTIVE_DEBUGGING_FILE_NAME),
"console": "integratedTerminal",
"justMyCode": True,
}
},
{
"name": "Resume Task",
"type": "python",
"request": "launch",
"program": os.path.join(os.getcwd(), RESUME_TASK_FILE_NAME),
"console": "integratedTerminal",
"justMyCode": True,
},
],
}

Expand All @@ -254,6 +319,14 @@
json.dump(launch_json, file, indent=4)


def resume_task_handler(signum, frame):
    """
    Signal handler that flags the task for resumption.

    Sets the module-level ``resume_task`` event; neither argument is used.

    Args:
        signum: The number of the received signal (unused).
        frame: The stack frame passed to the handler (unused).
    """
    resume_task.set()

Check warning on line 326 in plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-flyin/flytekitplugins/flyin/vscode_lib/decorator.py#L326

Added line #L326 was not covered by tests


# Set by resume_task_handler (registered for SIGTERM) and polled by
# exit_handler to learn that the user asked to resume the task.
resume_task = Event()
# Value reported under the link-type key by get_extra_config.
VSCODE_TYPE_VALUE = "vscode"


Expand All @@ -274,8 +347,10 @@
1. Overrides the user function with a VSCode setup function.
2. Download vscode server and extension from remote to local.
3. Prepare the interactive debugging Python script and launch.json.
4. Launches and monitors the VSCode server.
5. Terminates if the server is idle for a set duration.
4. Prepare task resumption script.
5. Launches and monitors the VSCode server.
6. Register signal handler for task resumption.
7. Terminates if the server is idle for a set duration or the user triggers task resumption.

Args:
fn (function, optional): The user function to be decorated. Defaults to None.
Expand Down Expand Up @@ -321,7 +396,7 @@

# 1. If the decorator is disabled, we don't launch the VSCode server.
# 2. When the user uses pyflyte run or python to execute the task, we don't launch the VSCode server.
# Only when user use pyflyte run --remote to submit the task to cluster, we launch the VSCode server.
# Only when user use pyflyte run --remote to submit the task to cluster, we launch the VSCode server.
if not self.enable or ctx.execution_state.is_local_execution():
return self.fn(*args, **kwargs)

Expand All @@ -341,18 +416,34 @@
# 1. Downloads the VSCode server from Internet to local.
download_vscode(self._config)

# 2. Prepare the interactive debugging Python script and launch.json.
# 2. Prepare the interactive debugging Python script.
prepare_interactive_python(self.fn)

# 3. Launches and monitors the VSCode server.
# 3. Prepare the task resumption Python script.
prepare_resume_task_python()

# 4. Prepare the launch.json
prepare_launch_json()

# 5. Launches and monitors the VSCode server.
# Run the function in the background
child_process = multiprocessing.Process(
target=execute_command,
kwargs={"cmd": f"code-server --bind-addr 0.0.0.0:{self.port} --auth none"},
)

child_process.start()
exit_handler(child_process, self.max_idle_seconds, self._post_execute)

# 6. Register the signal handler for task resumption. This should be after creating the subprocess so that the subprocess won't inherit the signal handler.
signal.signal(signal.SIGTERM, resume_task_handler)

return exit_handler(
child_process=child_process,
fn=self.fn,
args=args,
kwargs=kwargs,
max_idle_seconds=self.max_idle_seconds,
post_execute=self._post_execute,
)

def get_extra_config(self):
    """
    Return the extra config entries for this decorator.

    Returns:
        A dict mapping the link-type key to VSCODE_TYPE_VALUE and the port
        key to the configured port rendered as a string.
    """
    extra_config = {
        self.LINK_TYPE_KEY: VSCODE_TYPE_VALUE,
        self.PORT_KEY: str(self.port),
    }
    return extra_config
Loading