From 50c282c7cacac2d2cd7a24d0ec03dffff5330678 Mon Sep 17 00:00:00 2001 From: Harry Ankers Date: Mon, 30 Sep 2024 18:56:00 +0100 Subject: [PATCH 1/2] Feat: Add the option to use shell with the metaflow python Runner --- metaflow/runner/metaflow_runner.py | 8 +++++ metaflow/runner/subprocess_manager.py | 42 ++++++++++++++++++++------- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/metaflow/runner/metaflow_runner.py b/metaflow/runner/metaflow_runner.py index 161d1706d62..cd3acb4fbf8 100644 --- a/metaflow/runner/metaflow_runner.py +++ b/metaflow/runner/metaflow_runner.py @@ -218,6 +218,9 @@ class Runner(object): directory is used. file_read_timeout : int, default 3600 The timeout until which we try to read the runner attribute file. + shell : bool, default False + If True, the runner will execute commands directly through the shell. + Forwarding commands to shell can provide a performance improvement. **kwargs : Any Additional arguments that you would pass to `python myflow.py` before the `run` command. @@ -231,6 +234,7 @@ def __init__( env: Optional[Dict] = None, cwd: Optional[str] = None, file_read_timeout: int = 3600, + shell: bool = False, **kwargs ): # these imports are required here and not at the top @@ -261,6 +265,8 @@ def __init__( self.top_level_kwargs = kwargs self.api = MetaflowAPI.from_cli(self.flow_file, start) + self.shell = shell + def __enter__(self) -> "Runner": return self @@ -320,6 +326,7 @@ def run(self, **kwargs) -> ExecutingRun: env=self.env_vars, cwd=self.cwd, show_output=self.show_output, + shell=self.shell, ) command_obj = self.spm.get(pid) @@ -354,6 +361,7 @@ def resume(self, **kwargs): env=self.env_vars, cwd=self.cwd, show_output=self.show_output, + shell=self.shell, ) command_obj = self.spm.get(pid) diff --git a/metaflow/runner/subprocess_manager.py b/metaflow/runner/subprocess_manager.py index c8016244ea0..ee47b089aef 100644 --- a/metaflow/runner/subprocess_manager.py +++ b/metaflow/runner/subprocess_manager.py @@ -1,5 +1,6 @@ import asyncio import os +import shlex import time import shutil import signal @@ -67,6 +68,7 @@ def run_command( env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None, show_output: bool = False, + shell: bool = False, ) -> int: """ Run a command synchronously and return its process ID. @@ -87,6 +89,9 @@ def run_command( CommandManager object: - command_obj.log_files["stdout"] - command_obj.log_files["stderr"] + shell : bool, default False + Whether to run the command in a shell or not. + Forwarding the command to shell can provide a performance improvement. Returns ------- int @@ -94,7 +99,7 @@ def run_command( """ command_obj = CommandManager(command, env, cwd) - pid = command_obj.run(show_output=show_output) + pid = command_obj.run(show_output=show_output, shell=shell) self.commands[pid] = command_obj command_obj.sync_wait() return pid @@ -245,7 +250,7 @@ def sync_wait(self): self.stdout_thread.join() self.stderr_thread.join() - def run(self, show_output: bool = False): + def run(self, show_output: bool = False, shell=False): """ Run the subprocess synchronously. This can only be called once. @@ -258,6 +263,9 @@ def run(self, show_output: bool = False): They can be accessed later by reading the files present in: - self.log_files["stdout"] - self.log_files["stderr"] + shell : bool, default False + Whether to run the command in a shell or not. + Forwarding the command to shell can provide a performance improvement. """ if not self.run_called: @@ -274,15 +282,27 @@ def stream_to_stdout_and_file(pipe, log_file): pipe.close() try: - self.process = subprocess.Popen( - self.command, - cwd=self.cwd, - env=self.env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=1, - universal_newlines=True, - ) + if shell: + self.process = subprocess.Popen( + shlex.join(self.command), + cwd=self.cwd, + env=self.env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, + universal_newlines=True, + shell=True, + ) + else: + self.process = subprocess.Popen( + self.command, + cwd=self.cwd, + env=self.env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, + universal_newlines=True, + ) self.log_files["stdout"] = stdout_logfile self.log_files["stderr"] = stderr_logfile From e244d151d8c9ee2e344848b168abbc9f0c5f3acd Mon Sep 17 00:00:00 2001 From: Harry Ankers Date: Mon, 30 Sep 2024 19:05:08 +0100 Subject: [PATCH 2/2] empty-commit for pre-commit