diff --git a/parsl/channels/base.py b/parsl/channels/base.py index ee0097f0d0..05241b878d 100644 --- a/parsl/channels/base.py +++ b/parsl/channels/base.py @@ -52,31 +52,3 @@ def script_dir(self) -> str: @script_dir.setter def script_dir(self, value: str) -> None: pass - - @abstractmethod - def push_file(self, source: str, dest_dir: str) -> str: - ''' Channel will take care of moving the file from source to the destination - directory - - Args: - source (string) : Full filepath of the file to be moved - dest_dir (string) : Absolute path of the directory to move to - - Returns: - destination_path (string) - ''' - pass - - @abstractmethod - def pull_file(self, remote_source: str, local_dir: str) -> str: - ''' Transport file on the remote side to a local directory - - Args: - remote_source (string): remote_source - local_dir (string): Local directory to copy to - - - Returns: - destination_path (string) - ''' - pass diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py index 1d7a15dfdf..6ef014ac19 100644 --- a/parsl/channels/local/local.py +++ b/parsl/channels/local/local.py @@ -1,10 +1,8 @@ import logging import os -import shutil import subprocess from parsl.channels.base import Channel -from parsl.channels.errors import FileCopyException from parsl.utils import RepresentationMixin logger = logging.getLogger(__name__) @@ -57,40 +55,6 @@ def execute_wait(self, cmd, walltime=None): return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) - def push_file(self, source, dest_dir): - ''' If the source files dirpath is the same as dest_dir, a copy - is not necessary, and nothing is done. Else a copy is made. - - Args: - - source (string) : Path to the source file - - dest_dir (string) : Path to the directory to which the files is to be copied - - Returns: - - destination_path (String) : Absolute path of the destination file - - Raises: - - FileCopyException : If file copy failed. - ''' - - local_dest = os.path.join(dest_dir, os.path.basename(source)) - - # Only attempt to copy if the target dir and source dir are different - if os.path.dirname(source) != dest_dir: - try: - shutil.copyfile(source, local_dest) - os.chmod(local_dest, 0o700) - - except OSError as e: - raise FileCopyException(e, "localhost") - - else: - os.chmod(local_dest, 0o700) - - return local_dest - - def pull_file(self, remote_source, local_dir): - return self.push_file(remote_source, local_dir) - @property def script_dir(self): return self._script_dir diff --git a/parsl/providers/condor/condor.py b/parsl/providers/condor/condor.py index a736386d38..c8142c4026 100644 --- a/parsl/providers/condor/condor.py +++ b/parsl/providers/condor/condor.py @@ -245,16 +245,14 @@ def submit(self, command, tasks_per_node, job_name="parsl.condor"): with open(userscript_path, 'w') as f: f.write(job_config["worker_init"] + '\n' + wrapped_command) - user_script_path = self.channel.push_file(userscript_path, self.channel.script_dir) - the_input_files = [user_script_path] + self.transfer_input_files + the_input_files = [userscript_path] + self.transfer_input_files job_config["input_files"] = ','.join(the_input_files) - job_config["job_script"] = os.path.basename(user_script_path) + job_config["job_script"] = os.path.basename(userscript_path) # Construct and move the submit script self._write_submit_script(template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - cmd = "condor_submit {0}".format(channel_script_path) + cmd = "condor_submit {0}".format(script_path) try: retcode, stdout, stderr = self.execute_wait(cmd) except Exception as e: diff --git a/parsl/providers/grid_engine/grid_engine.py b/parsl/providers/grid_engine/grid_engine.py index e7db987022..ddedcaa3e8 100644 --- a/parsl/providers/grid_engine/grid_engine.py +++ b/parsl/providers/grid_engine/grid_engine.py @@ -142,11 +142,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.sge"): logger.debug("Writing submit script") self._write_submit_script(template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) if self.queue is not None: - cmd = "qsub -q {0} -terse {1}".format(self.queue, channel_script_path) + cmd = "qsub -q {0} -terse {1}".format(self.queue, script_path) else: - cmd = "qsub -terse {0}".format(channel_script_path) + cmd = "qsub -terse {0}".format(script_path) retcode, stdout, stderr = self.execute_wait(cmd) if retcode == 0: diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index f13521466a..5ecf174df2 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -32,9 +32,6 @@ class LocalProvider(ExecutionProvider, RepresentationMixin): Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive scaling where as many resources as possible are used; parallelism close to 0 represents the opposite situation in which as few resources as possible (i.e., min_blocks) are used. - move_files : Optional[Bool] - Should files be moved? By default, Parsl will try to figure this out itself (= None). - If True, then will always move. If False, will never move. worker_init : str Command to be run before starting a worker, such as 'module load Anaconda; source activate env'. """ @@ -48,8 +45,7 @@ def __init__(self, max_blocks=1, worker_init='', cmd_timeout=30, - parallelism=1, - move_files=None): + parallelism=1): self.channel = channel self._label = 'local' self.nodes_per_block = nodes_per_block @@ -61,7 +57,6 @@ def __init__(self, self.parallelism = parallelism self.script_dir = None self.cmd_timeout = cmd_timeout - self.move_files = move_files # Dictionary that keeps track of jobs, keyed on job_id self.resources = {} @@ -83,7 +78,6 @@ def status(self, job_ids): if job_dict['status'] and job_dict['status'].terminal: # We already checked this and it can't change after that continue - # Script path should point to remote path if _should_move_files() is True script_path = job_dict['script_path'] alive = self._is_alive(job_dict) @@ -137,8 +131,6 @@ def _is_alive(self, job_dict): def _job_file_path(self, script_path: str, suffix: str) -> str: path = '{0}{1}'.format(script_path, suffix) - if self._should_move_files(): - path = self.channel.pull_file(path, self.script_dir) return path def _read_job_file(self, script_path: str, suffix: str) -> str: @@ -216,9 +208,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): job_id = None remote_pid = None - if self._should_move_files(): - logger.debug("Pushing start script") - script_path = self.channel.push_file(script_path, self.channel.script_dir) logger.debug("Launching") # We need to capture the exit code and the streams, so we put them in files. We also write @@ -254,9 +243,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"): return job_id - def _should_move_files(self): - return (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files) - def cancel(self, job_ids): ''' Cancels the jobs specified by a list of job ids diff --git a/parsl/providers/lsf/lsf.py b/parsl/providers/lsf/lsf.py index 8f18f5c879..b446b063a4 100644 --- a/parsl/providers/lsf/lsf.py +++ b/parsl/providers/lsf/lsf.py @@ -68,7 +68,6 @@ class LSFProvider(ClusterProvider, RepresentationMixin): :class:`~parsl.launchers.SingleNodeLauncher` (the default), :class:`~parsl.launchers.SrunLauncher`, or :class:`~parsl.launchers.AprunLauncher` - move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files. bsub_redirection: Bool Should a redirection symbol "<" be included when submitting jobs, i.e., Bsub < job_script. request_by_nodes: Bool @@ -92,7 +91,6 @@ def __init__(self, project=None, queue=None, cmd_timeout=120, - move_files=True, bsub_redirection=False, request_by_nodes=True, launcher=SingleNodeLauncher()): @@ -112,7 +110,6 @@ def __init__(self, self.queue = queue self.cores_per_block = cores_per_block self.cores_per_node = cores_per_node - self.move_files = move_files self.bsub_redirection = bsub_redirection self.request_by_nodes = request_by_nodes @@ -230,17 +227,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.lsf"): logger.debug("Writing submit script") self._write_submit_script(template_string, script_path, job_name, job_config) - if self.move_files: - logger.debug("moving files") - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - else: - logger.debug("not moving files") - channel_script_path = script_path - if self.bsub_redirection: - cmd = "bsub < {0}".format(channel_script_path) + cmd = "bsub < {0}".format(script_path) else: - cmd = "bsub {0}".format(channel_script_path) + cmd = "bsub {0}".format(script_path) retcode, stdout, stderr = super().execute_wait(cmd) job_id = None diff --git a/parsl/providers/pbspro/pbspro.py b/parsl/providers/pbspro/pbspro.py index 752f504334..71c958f000 100644 --- a/parsl/providers/pbspro/pbspro.py +++ b/parsl/providers/pbspro/pbspro.py @@ -183,15 +183,13 @@ def submit(self, command, tasks_per_node, job_name="parsl"): logger.debug("Writing submit script") self._write_submit_script(self.template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - submit_options = '' if self.queue is not None: submit_options = '{0} -q {1}'.format(submit_options, self.queue) if self.account is not None: submit_options = '{0} -A {1}'.format(submit_options, self.account) - launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path) + launch_cmd = "qsub {0} {1}".format(submit_options, script_path) retcode, stdout, stderr = self.execute_wait(launch_cmd) job_id = None diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index d201c2f745..9b6f38b9d9 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -110,7 +110,6 @@ class SlurmProvider(ClusterProvider, RepresentationMixin): :class:`~parsl.launchers.SingleNodeLauncher` (the default), :class:`~parsl.launchers.SrunLauncher`, or :class:`~parsl.launchers.AprunLauncher` - move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files. """ @typeguard.typechecked @@ -134,7 +133,6 @@ def __init__(self, worker_init: str = '', cmd_timeout: int = 10, exclusive: bool = True, - move_files: bool = True, launcher: Launcher = SingleNodeLauncher()): label = 'slurm' super().__init__(label, @@ -152,7 +150,6 @@ def __init__(self, self.cores_per_node = cores_per_node self.mem_per_node = mem_per_node self.exclusive = exclusive - self.move_files = move_files self.account = account self.qos = qos self.constraint = constraint @@ -308,14 +305,7 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s logger.debug("Writing submit script") self._write_submit_script(template_string, script_path, job_name, job_config) - if self.move_files: - logger.debug("moving files") - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - else: - logger.debug("not moving files") - channel_script_path = script_path - - retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(channel_script_path)) + retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(script_path)) if retcode == 0: for line in stdout.split('\n'): diff --git a/parsl/providers/torque/torque.py b/parsl/providers/torque/torque.py index c15591706c..7992893abb 100644 --- a/parsl/providers/torque/torque.py +++ b/parsl/providers/torque/torque.py @@ -189,15 +189,13 @@ def submit(self, command, tasks_per_node, job_name="parsl.torque"): logger.debug("Writing submit script") self._write_submit_script(self.template_string, script_path, job_name, job_config) - channel_script_path = self.channel.push_file(script_path, self.channel.script_dir) - submit_options = '' if self.queue is not None: submit_options = '{0} -q {1}'.format(submit_options, self.queue) if self.account is not None: submit_options = '{0} -A {1}'.format(submit_options, self.account) - launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path) + launch_cmd = "qsub {0} {1}".format(submit_options, script_path) retcode, stdout, stderr = self.execute_wait(launch_cmd) job_id = None