Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove provider/channel push/pull of files #3690

Merged
merged 9 commits into from
Nov 26, 2024
28 changes: 0 additions & 28 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,3 @@ def script_dir(self) -> str:
@script_dir.setter
def script_dir(self, value: str) -> None:
pass

@abstractmethod
def push_file(self, source: str, dest_dir: str) -> str:
    ''' Move a file from ``source`` into the destination directory.

    Abstract hook: the concrete channel is responsible for performing
    the move.

    Args:
        source (string) : Full filepath of the file to be moved
        dest_dir (string) : Absolute path of the directory to move to

    Returns:
        destination_path (string)
    '''
    ...

@abstractmethod
def pull_file(self, remote_source: str, local_dir: str) -> str:
    ''' Transport a file on the remote side to a local directory.

    Abstract hook: concrete channels supply the actual transfer.

    Args:
        remote_source (string) : Path of the file on the remote side
        local_dir (string) : Local directory to copy to

    Returns:
        destination_path (string)
    '''
    ...
36 changes: 0 additions & 36 deletions parsl/channels/local/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import logging
import os
import shutil
import subprocess

from parsl.channels.base import Channel
from parsl.channels.errors import FileCopyException
from parsl.utils import RepresentationMixin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,40 +55,6 @@ def execute_wait(self, cmd, walltime=None):

return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))

def push_file(self, source, dest_dir):
    ''' Copy ``source`` into ``dest_dir`` and make the result owner-only (0o700).

    If the source file already lives in dest_dir, a copy is not
    necessary and only the permissions are updated. Otherwise a copy
    is made.

    The two directories are compared after resolving them with
    ``os.path.realpath`` so that equivalent spellings of one directory
    (trailing separator, relative vs. absolute path, symlink) are
    recognised as the same place. Comparing the raw strings would send
    those cases down the copy branch, where ``shutil.copyfile`` refuses
    to copy a file onto itself.

    Args:
        - source (string) : Path to the source file
        - dest_dir (string) : Path to the directory to which the file is to be copied

    Returns:
        - destination_path (String) : Path of the destination file, i.e.
          dest_dir joined with the basename of source

    Raises:
        - FileCopyException : If file copy failed.
    '''

    local_dest = os.path.join(dest_dir, os.path.basename(source))

    source_dir = os.path.realpath(os.path.dirname(source))
    target_dir = os.path.realpath(dest_dir)

    if source_dir == target_dir:
        # Already in place: nothing to copy, just fix the permissions.
        os.chmod(local_dest, 0o700)
        return local_dest

    try:
        shutil.copyfile(source, local_dest)
        os.chmod(local_dest, 0o700)
    except OSError as e:
        # Keep the original OSError attached as the cause for debugging.
        raise FileCopyException(e, "localhost") from e

    return local_dest

def pull_file(self, remote_source, local_dir):
    ''' Fetch ``remote_source`` into ``local_dir``.

    Delegates to push_file with the same two arguments and returns
    whatever push_file returns.
    '''
    local_copy = self.push_file(remote_source, local_dir)
    return local_copy

@property
def script_dir(self):
return self._script_dir
Expand Down
8 changes: 3 additions & 5 deletions parsl/providers/condor/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,14 @@ def submit(self, command, tasks_per_node, job_name="parsl.condor"):
with open(userscript_path, 'w') as f:
f.write(job_config["worker_init"] + '\n' + wrapped_command)

user_script_path = self.channel.push_file(userscript_path, self.channel.script_dir)
the_input_files = [user_script_path] + self.transfer_input_files
the_input_files = [userscript_path] + self.transfer_input_files
job_config["input_files"] = ','.join(the_input_files)
job_config["job_script"] = os.path.basename(user_script_path)
job_config["job_script"] = os.path.basename(userscript_path)

# Construct and move the submit script
self._write_submit_script(template_string, script_path, job_name, job_config)
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

cmd = "condor_submit {0}".format(channel_script_path)
cmd = "condor_submit {0}".format(script_path)
try:
retcode, stdout, stderr = self.execute_wait(cmd)
except Exception as e:
Expand Down
5 changes: 2 additions & 3 deletions parsl/providers/grid_engine/grid_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.sge"):
logger.debug("Writing submit script")
self._write_submit_script(template_string, script_path, job_name, job_config)

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
if self.queue is not None:
cmd = "qsub -q {0} -terse {1}".format(self.queue, channel_script_path)
cmd = "qsub -q {0} -terse {1}".format(self.queue, script_path)
else:
cmd = "qsub -terse {0}".format(channel_script_path)
cmd = "qsub -terse {0}".format(script_path)
retcode, stdout, stderr = self.execute_wait(cmd)

if retcode == 0:
Expand Down
16 changes: 1 addition & 15 deletions parsl/providers/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ class LocalProvider(ExecutionProvider, RepresentationMixin):
Ratio of provisioned task slots to active tasks. A parallelism value of 1 represents aggressive
scaling where as many resources as possible are used; parallelism close to 0 represents
the opposite situation in which as few resources as possible (i.e., min_blocks) are used.
move_files : Optional[Bool]
Should files be moved? By default, Parsl will try to figure this out itself (= None).
If True, then will always move. If False, will never move.
worker_init : str
Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
"""
Expand All @@ -48,8 +45,7 @@ def __init__(self,
max_blocks=1,
worker_init='',
cmd_timeout=30,
parallelism=1,
move_files=None):
parallelism=1):
self.channel = channel
self._label = 'local'
self.nodes_per_block = nodes_per_block
Expand All @@ -61,7 +57,6 @@ def __init__(self,
self.parallelism = parallelism
self.script_dir = None
self.cmd_timeout = cmd_timeout
self.move_files = move_files

# Dictionary that keeps track of jobs, keyed on job_id
self.resources = {}
Expand All @@ -83,7 +78,6 @@ def status(self, job_ids):
if job_dict['status'] and job_dict['status'].terminal:
# We already checked this and it can't change after that
continue
# Script path should point to remote path if _should_move_files() is True
script_path = job_dict['script_path']

alive = self._is_alive(job_dict)
Expand Down Expand Up @@ -137,8 +131,6 @@ def _is_alive(self, job_dict):

def _job_file_path(self, script_path: str, suffix: str) -> str:
path = '{0}{1}'.format(script_path, suffix)
if self._should_move_files():
path = self.channel.pull_file(path, self.script_dir)
return path

def _read_job_file(self, script_path: str, suffix: str) -> str:
Expand Down Expand Up @@ -216,9 +208,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):

job_id = None
remote_pid = None
if self._should_move_files():
logger.debug("Pushing start script")
script_path = self.channel.push_file(script_path, self.channel.script_dir)

logger.debug("Launching")
# We need to capture the exit code and the streams, so we put them in files. We also write
Expand Down Expand Up @@ -254,9 +243,6 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):

return job_id

def _should_move_files(self):
return (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files)

def cancel(self, job_ids):
''' Cancels the jobs specified by a list of job ids

Expand Down
14 changes: 2 additions & 12 deletions parsl/providers/lsf/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class LSFProvider(ClusterProvider, RepresentationMixin):
:class:`~parsl.launchers.SingleNodeLauncher` (the default),
:class:`~parsl.launchers.SrunLauncher`, or
:class:`~parsl.launchers.AprunLauncher`
move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files.
bsub_redirection: Bool
Should a redirection symbol "<" be included when submitting jobs, i.e., Bsub < job_script.
request_by_nodes: Bool
Expand All @@ -92,7 +91,6 @@ def __init__(self,
project=None,
queue=None,
cmd_timeout=120,
move_files=True,
bsub_redirection=False,
request_by_nodes=True,
launcher=SingleNodeLauncher()):
Expand All @@ -112,7 +110,6 @@ def __init__(self,
self.queue = queue
self.cores_per_block = cores_per_block
self.cores_per_node = cores_per_node
self.move_files = move_files
self.bsub_redirection = bsub_redirection
self.request_by_nodes = request_by_nodes

Expand Down Expand Up @@ -230,17 +227,10 @@ def submit(self, command, tasks_per_node, job_name="parsl.lsf"):
logger.debug("Writing submit script")
self._write_submit_script(template_string, script_path, job_name, job_config)

if self.move_files:
logger.debug("moving files")
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
else:
logger.debug("not moving files")
channel_script_path = script_path

if self.bsub_redirection:
cmd = "bsub < {0}".format(channel_script_path)
cmd = "bsub < {0}".format(script_path)
else:
cmd = "bsub {0}".format(channel_script_path)
cmd = "bsub {0}".format(script_path)
retcode, stdout, stderr = super().execute_wait(cmd)

job_id = None
Expand Down
4 changes: 1 addition & 3 deletions parsl/providers/pbspro/pbspro.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,13 @@ def submit(self, command, tasks_per_node, job_name="parsl"):
logger.debug("Writing submit script")
self._write_submit_script(self.template_string, script_path, job_name, job_config)

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

submit_options = ''
if self.queue is not None:
submit_options = '{0} -q {1}'.format(submit_options, self.queue)
if self.account is not None:
submit_options = '{0} -A {1}'.format(submit_options, self.account)

launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path)
launch_cmd = "qsub {0} {1}".format(submit_options, script_path)
retcode, stdout, stderr = self.execute_wait(launch_cmd)

job_id = None
Expand Down
12 changes: 1 addition & 11 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class SlurmProvider(ClusterProvider, RepresentationMixin):
:class:`~parsl.launchers.SingleNodeLauncher` (the default),
:class:`~parsl.launchers.SrunLauncher`, or
:class:`~parsl.launchers.AprunLauncher`
move_files : Optional[Bool]: should files be moved? by default, Parsl will try to move files.
"""

@typeguard.typechecked
Expand All @@ -134,7 +133,6 @@ def __init__(self,
worker_init: str = '',
cmd_timeout: int = 10,
exclusive: bool = True,
move_files: bool = True,
launcher: Launcher = SingleNodeLauncher()):
label = 'slurm'
super().__init__(label,
Expand All @@ -152,7 +150,6 @@ def __init__(self,
self.cores_per_node = cores_per_node
self.mem_per_node = mem_per_node
self.exclusive = exclusive
self.move_files = move_files
self.account = account
self.qos = qos
self.constraint = constraint
Expand Down Expand Up @@ -308,14 +305,7 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
logger.debug("Writing submit script")
self._write_submit_script(template_string, script_path, job_name, job_config)

if self.move_files:
logger.debug("moving files")
channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
else:
logger.debug("not moving files")
channel_script_path = script_path

retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(channel_script_path))
retcode, stdout, stderr = self.execute_wait("sbatch {0}".format(script_path))

if retcode == 0:
for line in stdout.split('\n'):
Expand Down
4 changes: 1 addition & 3 deletions parsl/providers/torque/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,13 @@ def submit(self, command, tasks_per_node, job_name="parsl.torque"):
logger.debug("Writing submit script")
self._write_submit_script(self.template_string, script_path, job_name, job_config)

channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

submit_options = ''
if self.queue is not None:
submit_options = '{0} -q {1}'.format(submit_options, self.queue)
if self.account is not None:
submit_options = '{0} -A {1}'.format(submit_options, self.account)

launch_cmd = "qsub {0} {1}".format(submit_options, channel_script_path)
launch_cmd = "qsub {0} {1}".format(submit_options, script_path)
retcode, stdout, stderr = self.execute_wait(launch_cmd)

job_id = None
Expand Down
Loading