Skip to content

Commit

Permalink
Remove provider/channel push/pull of files (#3690)
Browse files Browse the repository at this point in the history
This removes push/pull support from channels.

All the HPC providers used channel.push_file to push their batch scripts
to the (no-longer) remote system. This has been a basically dead code
path since removal of non-remote channels:

The channel is always a LocalChannel now (PR #3677)
The "remote" script directory is always the local script directory (PR
#3688) and so `LocalChannel.push_file` always skips making a copy and
returns the path it was given without further action.

So all the removed code is a no-op, and this PR simplifies that into
nothing.

# Changed Behaviour

Some providers had an option to let users decide if scripts (and other
files) would be pushed/pulled to the remote system. Those options are
removed by this PR.

## Type of change

- New feature
- Code maintenance/cleanup
  • Loading branch information
benclifford authored Nov 26, 2024
1 parent 1c3e509 commit 7c2646e
Show file tree
Hide file tree
Showing 9 changed files with 11 additions and 116 deletions.
28 changes: 0 additions & 28 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,3 @@ def script_dir(self) -> str:
@script_dir.setter
def script_dir(self, value: str) -> None:
pass

@abstractmethod
def push_file(self, source: str, dest_dir: str) -> str:
''' Channel will take care of moving the file from source to the destination
directory
Args:
source (string) : Full filepath of the file to be moved
dest_dir (string) : Absolute path of the directory to move to
Returns:
destination_path (string)
'''
pass

@abstractmethod
def pull_file(self, remote_source: str, local_dir: str) -> str:
''' Transport file on the remote side to a local directory
Args:
remote_source (string): remote_source
local_dir (string): Local directory to copy to
Returns:
destination_path (string)
'''
pass
36 changes: 0 additions & 36 deletions parsl/channels/local/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import logging
import os
import shutil
import subprocess

from parsl.channels.base import Channel
from parsl.channels.errors import FileCopyException
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,40 +55,6 @@ def execute_wait(self, cmd, walltime=None):

return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))

def push_file(self, source, dest_dir):
''' If the source files dirpath is the same as dest_dir, a copy
is not necessary, and nothing is done. Else a copy is made.
Args:
- source (string) : Path to the source file
- dest_dir (string) : Path to the directory to which the files is to be copied
Returns:
- destination_path (String) : Absolute path of the destination file
Raises:
- FileCopyException : If file copy failed.
'''

local_dest = os.path.join(dest_dir, os.path.basename(source))

# Only attempt to copy if the target dir and source dir are different
if os.path.dirname(source) != dest_dir:
try:
shutil.copyfile(source, local_dest)
os.chmod(local_dest, 0o700)

except OSError as e:
raise FileCopyException(e, "localhost")

else:
os.chmod(local_dest, 0o700)

return local_dest

def pull_file(self, remote_source, local_dir):
return self.push_file(remote_source, local_dir)

@property
def script_dir(self):
return self._script_dir
Expand Down
8 changes: 3 additions & 5 deletions parsl/providers/condor/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,14 @@ def submit(self, command, tasks_per_node, job_name="parsl.condor"):
with open(userscript_path, 'w') as f:
f.write(job_config["worker_init"] + '\n' + wrapped_command)

user_script_path = self.channel.push_file(userscript_path, self.channel.script_dir)
the_input_files = [user_script_path] + self.transfer_input_files
the_input_files = [userscript_path] + self.transfer_input_files
job_config["input_files"] = ','.join(the_input_files)
job_config["job_script"] = os.path.basename(user_script_path)
job_config["job_script"] = os.path.basename(userscript_path)

# Construct and move the submit script
self._write_submit_script(template_string, script_path, job_name, job_config)
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

cmd = "condor_submit {0}".format(channel_script_path)
cmd = "condor_submit {0}".format(script_path)
try:
retcode, stdout, stderr = self.execute_wait(cmd)
except Exception as e:
Expand Down
5 changes: 2 additions & 3 deletions parsl/providers/grid_engine/grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.sge"):
logger.debug("Writing submit script")
self._write_submit_script(template_string, script_path, job_name, job_config)

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
if self.queue is not None:
cmd = "qsub -q {0} -terse {1}".format(self.queue, channel_script_path)
cmd = "qsub -q {0} -terse {1}".format(self.queue, script_path)
else:
cmd = "qsub -terse {0}".format(channel_script_path)
cmd = "qsub -terse {0}".format(script_path)
retcode, stdout, stderr = self.execute_wait(cmd)

if retcode == 0:
Expand Down
16 changes: 1 addition & 15 deletions parsl/providers/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class LocalProvider(ExecutionProvider, RepresentationMixin):
Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive
scaling where as many resources as possible are used; parallelism close to 0 represents
the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
move_files : Optional[Bool]
Should files be moved? By default, Parsl will try to figure this out itself (= None).
If True, then will always move. If False, will never move.
worker_init : str
Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
"""
Expand All @@ -48,8 +45,7 @@ def __init__(self,
max_blocks=1,
worker_init='',
cmd_timeout=30,
parallelism=1,
move_files=None):
parallelism=1):
self.channel = channel
self._label = 'local'
self.nodes_per_block = nodes_per_block
Expand All @@ -61,7 +57,6 @@ def __init__(self,
self.parallelism = parallelism
self.script_dir = None
self.cmd_timeout = cmd_timeout
self.move_files = move_files

# Dictionary that keeps track of jobs, keyed on job_id
self.resources = {}
Expand All @@ -83,7 +78,6 @@ def status(self, job_ids):
if job_dict['status'] and job_dict['status'].terminal:
# We already checked this and it can't change after that
continue
# Script path should point to remote path if _should_move_files() is True
script_path = job_dict['script_path']

alive = self._is_alive(job_dict)
Expand Down Expand Up @@ -137,8 +131,6 @@ def _is_alive(self, job_dict):

def _job_file_path(self, script_path: str, suffix: str) -> str:
path = '{0}{1}'.format(script_path, suffix)
if self._should_move_files():
path = self.channel.pull_file(path, self.script_dir)
return path

def _read_job_file(self, script_path: str, suffix: str) -> str:
Expand Down Expand Up @@ -216,9 +208,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):

job_id = None
remote_pid = None
if self._should_move_files():
logger.debug("Pushing start script")
script_path = self.channel.push_file(script_path, self.channel.script_dir)

logger.debug("Launching")
# We need to capture the exit code and the streams, so we put them in files. We also write
Expand Down Expand Up @@ -254,9 +243,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):

return job_id

def _should_move_files(self):
return (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files)

def cancel(self, job_ids):
''' Cancels the jobs specified by a list of job ids
Expand Down
14 changes: 2 additions & 12 deletions parsl/providers/lsf/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class LSFProvider(ClusterProvider, RepresentationMixin):
:class:`~parsl.launchers.SingleNodeLauncher` (the default),
:class:`~parsl.launchers.SrunLauncher`, or
:class:`~parsl.launchers.AprunLauncher`
move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files.
bsub_redirection: Bool
Should a redirection symbol "<" be included when submitting jobs, i.e., Bsub < job_script.
request_by_nodes: Bool
Expand All @@ -92,7 +91,6 @@ def __init__(self,
project=None,
queue=None,
cmd_timeout=120,
move_files=True,
bsub_redirection=False,
request_by_nodes=True,
launcher=SingleNodeLauncher()):
Expand All @@ -112,7 +110,6 @@ def __init__(self,
self.queue = queue
self.cores_per_block = cores_per_block
self.cores_per_node = cores_per_node
self.move_files = move_files
self.bsub_redirection = bsub_redirection
self.request_by_nodes = request_by_nodes

Expand Down Expand Up @@ -230,17 +227,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.lsf"):
logger.debug("Writing submit script")
self._write_submit_script(template_string, script_path, job_name, job_config)

if self.move_files:
logger.debug("moving files")
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
else:
logger.debug("not moving files")
channel_script_path = script_path

if self.bsub_redirection:
cmd = "bsub < {0}".format(channel_script_path)
cmd = "bsub < {0}".format(script_path)
else:
cmd = "bsub {0}".format(channel_script_path)
cmd = "bsub {0}".format(script_path)
retcode, stdout, stderr = super().execute_wait(cmd)

job_id = None
Expand Down
4 changes: 1 addition & 3 deletions parsl/providers/pbspro/pbspro.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,13 @@ def submit(self, command, tasks_per_node, job_name="parsl"):
logger.debug("Writing submit script")
self._write_submit_script(self.template_string, script_path, job_name, job_config)

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

submit_options = ''
if self.queue is not None:
submit_options = '{0} -q {1}'.format(submit_options, self.queue)
if self.account is not None:
submit_options = '{0} -A {1}'.format(submit_options, self.account)

launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path)
launch_cmd = "qsub {0} {1}".format(submit_options, script_path)
retcode, stdout, stderr = self.execute_wait(launch_cmd)

job_id = None
Expand Down
12 changes: 1 addition & 11 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
:class:`~parsl.launchers.SingleNodeLauncher` (the default),
:class:`~parsl.launchers.SrunLauncher`, or
:class:`~parsl.launchers.AprunLauncher`
move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files.
"""

@typeguard.typechecked
Expand All @@ -134,7 +133,6 @@ def __init__(self,
worker_init: str = '',
cmd_timeout: int = 10,
exclusive: bool = True,
move_files: bool = True,
launcher: Launcher = SingleNodeLauncher()):
label = 'slurm'
super().__init__(label,
Expand All @@ -152,7 +150,6 @@ def __init__(self,
self.cores_per_node = cores_per_node
self.mem_per_node = mem_per_node
self.exclusive = exclusive
self.move_files = move_files
self.account = account
self.qos = qos
self.constraint = constraint
Expand Down Expand Up @@ -308,14 +305,7 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
logger.debug("Writing submit script")
self._write_submit_script(template_string, script_path, job_name, job_config)

if self.move_files:
logger.debug("moving files")
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
else:
logger.debug("not moving files")
channel_script_path = script_path

retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(channel_script_path))
retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(script_path))

if retcode == 0:
for line in stdout.split('\n'):
Expand Down
4 changes: 1 addition & 3 deletions parsl/providers/torque/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,13 @@ def submit(self, command, tasks_per_node, job_name="parsl.torque"):
logger.debug("Writing submit script")
self._write_submit_script(self.template_string, script_path, job_name, job_config)

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

submit_options = ''
if self.queue is not None:
submit_options = '{0} -q {1}'.format(submit_options, self.queue)
if self.account is not None:
submit_options = '{0} -A {1}'.format(submit_options, self.account)

launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path)
launch_cmd = "qsub {0} {1}".format(submit_options, script_path)
retcode, stdout, stderr = self.execute_wait(launch_cmd)

job_id = None
Expand Down

0 comments on commit 7c2646e

Please sign in to comment.