diff --git a/lib/galaxy/job_execution/setup.py b/lib/galaxy/job_execution/setup.py
index c511df77bec1..48af3aa1c9fe 100644
--- a/lib/galaxy/job_execution/setup.py
+++ b/lib/galaxy/job_execution/setup.py
@@ -9,3 +9,11 @@ def ensure_configs_directory(work_dir):
     if not os.path.exists(configs_dir):
         safe_makedirs(configs_dir)
     return configs_dir
+
+
+def create_working_directory_for_job(object_store, job):
+    object_store.create(
+        job, base_dir='job_work', dir_only=True, obj_dir=True)
+    working_directory = object_store.get_filename(
+        job, base_dir='job_work', dir_only=True, obj_dir=True)
+    return working_directory
diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py
index dd061c10abb6..fb46d84a86ff 100644
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -40,7 +40,10 @@
     TaskPathRewriter
 )
 from galaxy.job_execution.output_collect import collect_extra_files
-from galaxy.job_execution.setup import ensure_configs_directory
+from galaxy.job_execution.setup import (
+    create_working_directory_for_job,
+    ensure_configs_directory,
+)
 from galaxy.jobs.actions.post import ActionBox
 from galaxy.jobs.mapper import (
     JobMappingException,
@@ -1190,11 +1193,7 @@ def tool_working_directory(self):
         return os.path.join(self.working_directory, "working")
 
     def _create_working_directory(self, job):
-        self.object_store.create(
-            job, base_dir='job_work', dir_only=True, obj_dir=True)
-        working_directory = self.object_store.get_filename(
-            job, base_dir='job_work', dir_only=True, obj_dir=True)
-        return working_directory
+        return create_working_directory_for_job(self.object_store, job)
 
     def clear_working_directory(self):
         job = self.get_job()
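The helper extracted here lets code outside of `JobWrapper` (notably the tool action changed later in this patch) allocate a per-job working directory through the object store. A minimal usage sketch, assuming `object_store` and `job` are an already-configured `galaxy.objectstore.ObjectStore` and a `galaxy.model.Job` whose id has been flushed:

```python
# Sketch only: object_store and job are assumed to already exist
# (a configured ObjectStore and a flushed galaxy.model.Job).
from galaxy.job_execution.setup import create_working_directory_for_job

working_directory = create_working_directory_for_job(object_store, job)
# working_directory is the object-store-assigned per-job directory --
# the same path JobWrapper._create_working_directory now returns.
```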
diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py
index edb4ed85961a..fc719e181d1f 100644
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -1585,6 +1585,8 @@ def __eq__(self, other):
 
 
 class JobExportHistoryArchive(RepresentById):
+    ATTRS_FILENAME_HISTORY = 'history_attrs.txt'
+
     def __init__(self, job=None, history=None, dataset=None, compressed=False,
                  history_attrs_filename=None):
         self.job = job
@@ -1627,6 +1629,31 @@ def export_name(self):
             hname += ".gz"
         return hname
 
+    @staticmethod
+    def create_for_history(history, job, sa_session, object_store, compressed):
+        # Create dataset that will serve as archive.
+        archive_dataset = Dataset()
+        sa_session.add(archive_dataset)
+        sa_session.flush()  # ensure job.id and archive_dataset.id are available
+        object_store.create(archive_dataset)  # set the object store id, create dataset (if applicable)
+        # Add association for keeping track of job, history, archive relationship.
+        jeha = JobExportHistoryArchive(
+            job=job, history=history,
+            dataset=archive_dataset,
+            compressed=compressed
+        )
+        sa_session.add(jeha)
+
+        #
+        # Create attributes/metadata files for export.
+        #
+        jeha.dataset.create_extra_files_path()
+        temp_output_dir = jeha.dataset.extra_files_path
+
+        history_attrs_filename = os.path.join(temp_output_dir, jeha.ATTRS_FILENAME_HISTORY)
+        jeha.history_attrs_filename = history_attrs_filename
+        return jeha
+
 
 class JobImportHistoryArchive(RepresentById):
     def __init__(self, job=None, history=None, archive_dir=None):
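`create_for_history` now owns the session flush, object store registration, and attrs-file bookkeeping that previously lived inline in the web controller action. A rough sketch of a caller, with `sa_session` and `object_store` standing in for the application's scoped session and object store:

```python
# Sketch: driving the new factory (mirrors the tool action changes below).
# history, job, sa_session, and object_store are assumed to already exist.
from galaxy.model import JobExportHistoryArchive

jeha = JobExportHistoryArchive.create_for_history(
    history, job, sa_session, object_store, compressed=True
)
# The export is then staged into the archive dataset's extra-files path:
store_directory = jeha.temp_directory
```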
diff --git a/lib/galaxy/tools/actions/history_imp_exp.py b/lib/galaxy/tools/actions/history_imp_exp.py
index bed203969695..fc4eaef23dd4 100644
--- a/lib/galaxy/tools/actions/history_imp_exp.py
+++ b/lib/galaxy/tools/actions/history_imp_exp.py
@@ -1,13 +1,16 @@
+import datetime
 import logging
 import os
 import tempfile
 from collections import OrderedDict
 
+from galaxy.job_execution.setup import create_working_directory_for_job
 from galaxy.tools.actions import ToolAction
 from galaxy.tools.imp_exp import (
     JobExportHistoryArchiveWrapper,
     JobImportHistoryArchiveWrapper
 )
+from galaxy.util import ready_name_for_url
 
 log = logging.getLogger(__name__)
 
@@ -99,10 +102,7 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
         job.galaxy_version = trans.app.config.version_major
         session = trans.get_galaxy_session()
         job.session_id = session and session.id
-        if history:
-            history_id = history.id
-        else:
-            history_id = trans.history.id
+        history_id = history.id
         job.history_id = history_id
         job.tool_id = tool.id
         if trans.user:
@@ -112,26 +112,32 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
         job.state = job.states.WAITING  # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters
         trans.sa_session.add(job)
 
-        # Create dataset that will serve as archive.
-        archive_dataset = trans.app.model.Dataset()
-        trans.sa_session.add(archive_dataset)
-
-        trans.sa_session.flush()  # ensure job.id and archive_dataset.id are available
-        trans.app.object_store.create(archive_dataset)  # set the object store id, create dataset (if applicable)
+        compressed = incoming['compress']
+        exporting_to_uri = "directory_uri" in incoming
+        if not exporting_to_uri:
+            # see comment below about how this should be transitioned to occurring in a
+            # job handler or detached MQ-driven thread
+            jeha = trans.app.model.JobExportHistoryArchive.create_for_history(
+                history, job, trans.sa_session, trans.app.object_store, compressed
+            )
+            store_directory = jeha.temp_directory
+        else:
+            # creating a job directory in the web thread is bad (it is slow, bypasses
+            # dynamic objectstore assignment, etc..) but it is arguably less bad than
+            # creating a dataset (like above for the dataset export case).
+            # ensure job.id is available
+            trans.sa_session.flush()
+            job_directory = create_working_directory_for_job(trans.app.object_store, job)
+            store_directory = os.path.join(job_directory, "working", "_object_export")
+            os.makedirs(store_directory)
 
         #
         # Setup job and job wrapper.
         #
-
-        # Add association for keeping track of job, history, archive relationship.
-        jeha = trans.app.model.JobExportHistoryArchive(job=job, history=history,
-                                                       dataset=archive_dataset,
-                                                       compressed=incoming['compress'])
-        trans.sa_session.add(jeha)
-
         job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job)
-        cmd_line = job_wrapper.setup_job(jeha, include_hidden=incoming['include_hidden'],
-                                         include_deleted=incoming['include_deleted'])
+        cmd_line = job_wrapper.setup_job(history, store_directory, include_hidden=incoming['include_hidden'],
+                                         include_deleted=incoming['include_deleted'],
+                                         compressed=compressed)
 
         #
         # Add parameters to job_parameter table.
@@ -140,6 +146,22 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
         # Set additional parameters.
         incoming['__HISTORY_TO_EXPORT__'] = history.id
         incoming['__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__'] = cmd_line
+        if exporting_to_uri:
+            directory_uri = incoming["directory_uri"]
+            file_name = incoming.get("file_name")
+            if file_name is None:
+                hname = ready_name_for_url(history.name)
+                human_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
+                if compressed:
+                    extension = ".tar.gz"
+                else:
+                    extension = ".tar"
+                file_name = f"Galaxy-History-{hname}-{human_timestamp}{extension}"
+
+            file_name = os.path.basename(os.path.abspath(file_name))
+            sep = "" if directory_uri.endswith("/") else "/"
+            incoming['__EXPORT_TO_URI__'] = f"{directory_uri}{sep}{file_name}"
+
         for name, value in tool.params_to_strings(incoming, trans.app).items():
             job.add_parameter(name, value)
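When no `file_name` is supplied, the action derives one from the history name and a timestamp; the `sep` logic keeps scheme-only URIs like `gxftp://` from gaining a spurious separator. An illustrative trace of the computation (all values made up):

```python
# Illustrative values only -- mirrors the naming logic added above.
import datetime
import os

directory_uri = "gxftp://"    # user-supplied file source directory URI
hname = "for_export_default"  # what ready_name_for_url(history.name) might return
human_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
file_name = f"Galaxy-History-{hname}-{human_timestamp}.tar.gz"

file_name = os.path.basename(os.path.abspath(file_name))  # defend against path segments
sep = "" if directory_uri.endswith("/") else "/"
print(f"{directory_uri}{sep}{file_name}")
# e.g. gxftp://Galaxy-History-for_export_default-20200417103059.tar.gz
```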
diff --git a/lib/galaxy/tools/imp_exp/__init__.py b/lib/galaxy/tools/imp_exp/__init__.py
index 09ce4d451376..e8b74b87191c 100644
--- a/lib/galaxy/tools/imp_exp/__init__.py
+++ b/lib/galaxy/tools/imp_exp/__init__.py
@@ -10,8 +10,6 @@
 
 log = logging.getLogger(__name__)
 
-ATTRS_FILENAME_HISTORY = 'history_attrs.txt'
-
 
 class JobImportHistoryArchiveWrapper:
     """
@@ -80,33 +78,24 @@ def __init__(self, app, job_id):
         self.job_id = job_id
         self.sa_session = self.app.model.context
 
-    def setup_job(self, jeha, include_hidden=False, include_deleted=False):
-        """ Perform setup for job to export a history into an archive. Method generates
-        attribute files for export, sets the corresponding attributes in the jeha
-        object, and returns a command line for running the job. The command line
-        includes the command, inputs, and options; it does not include the output
-        file because it must be set at runtime. """
+    def setup_job(self, history, store_directory, include_hidden=False, include_deleted=False, compressed=True):
+        """Perform setup for job to export a history into an archive.
+        Method generates attribute files for export and returns a command line
+        for running the job. The command line includes the command, inputs, and
+        options; it does not include the output file because it must be set at
+        runtime.
+        """
         app = self.app
 
-        #
-        # Create attributes/metadata files for export.
-        #
-        jeha.dataset.create_extra_files_path()
-        temp_output_dir = jeha.dataset.extra_files_path
-
-        history = jeha.history
-        history_attrs_filename = os.path.join(temp_output_dir, ATTRS_FILENAME_HISTORY)
-        jeha.history_attrs_filename = history_attrs_filename
-
         # symlink files on export, on worker files will be tarred up in a dereferenced manner.
-        with store.DirectoryModelExportStore(temp_output_dir, app=app, export_files="symlink") as export_store:
+        with store.DirectoryModelExportStore(store_directory, app=app, export_files="symlink") as export_store:
             export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted)
 
         #
         # Create and return command line for running tool.
         #
-        options = "--galaxy-version '%s'" % VERSION_MAJOR
-        if jeha.compressed:
+        options = f"--galaxy-version '{VERSION_MAJOR}'"
+        if compressed:
             options += " -G"
-        return f"{options} {temp_output_dir}"
+        return f"{options} {store_directory}"
diff --git a/lib/galaxy/tools/imp_exp/exp_history_to_uri.xml b/lib/galaxy/tools/imp_exp/exp_history_to_uri.xml
new file mode 100644
index 000000000000..3aa54fc5772a
--- /dev/null
+++ b/lib/galaxy/tools/imp_exp/exp_history_to_uri.xml
@@ -0,0 +1,16 @@
+<tool id="__EXPORT_HISTORY_URI__" name="Export History to URI" version="0.1" tool_type="export_history">
+  <type class="ExportHistoryTool" module="galaxy.tools" />
+  <action module="galaxy.tools.actions.history_imp_exp" class="ExportHistoryToolAction"/>
+  <command>python '$export_history' --file-sources '$file_sources' $__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__ '$__EXPORT_TO_URI__'</command>
+  <inputs>
+    <param name="__HISTORY_TO_EXPORT__" type="hidden" />
+    <param name="compress" type="boolean" />
+    <param name="__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__" type="hidden" />
+    <param name="__EXPORT_TO_URI__" type="hidden" />
+  </inputs>
+  <configfiles>
+    <file_sources name="file_sources" />
+    <configfile name="export_history">from galaxy.tools.imp_exp.export_history import main; main()</configfile>
+  </configfiles>
+  <outputs/>
+</tool>
diff --git a/lib/galaxy/tools/imp_exp/export_history.py b/lib/galaxy/tools/imp_exp/export_history.py
index 67a941e3f239..856fc7410570 100644
--- a/lib/galaxy/tools/imp_exp/export_history.py
+++ b/lib/galaxy/tools/imp_exp/export_history.py
@@ -6,7 +6,9 @@
     -G, --gzip: gzip archive file
 """
 
+import json
 import optparse
+import os
 import shutil
 import sys
 
@@ -33,6 +35,7 @@ def main(argv=None):
     parser = optparse.OptionParser()
     parser.add_option('-G', '--gzip', dest='gzip', action="store_true", help='Compress archive using gzip.')
     parser.add_option('--galaxy-version', dest='galaxy_version', help='Galaxy version that initiated the command.', default=None)
+    parser.add_option('--file-sources', type=str, help='file sources json')
     (options, args) = parser.parse_args(argv)
     galaxy_version = options.galaxy_version
     if galaxy_version is None:
@@ -41,10 +44,37 @@ def main(argv=None):
     gzip = bool(options.gzip)
     assert len(args) >= 2
     temp_directory = args[0]
-    out_file = args[1]
+    out_arg = args[1]
+    destination_uri = None
+    if "://" in out_arg:
+        # writing to a file source instead of a dataset path.
+        destination_uri = out_arg
+        out_file = "./temp_out_archive"
+    else:
+        out_file = out_arg
 
     # Create archive.
-    return create_archive(temp_directory, out_file, gzip=gzip)
+    exit = create_archive(temp_directory, out_file, gzip=gzip)
+    if destination_uri is not None and exit == 0:
+        _write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
+    return exit
+
+
+def _write_to_destination(file_sources_path, out_file, destination_uri):
+    file_sources = get_file_sources(file_sources_path)
+    file_source_path = file_sources.get_file_source_path(destination_uri)
+    file_source = file_source_path.file_source
+    assert os.path.exists(out_file)
+    file_source.write_from(file_source_path.path, out_file)
+
+
+def get_file_sources(file_sources_path):
+    assert os.path.exists(file_sources_path), "file sources path [%s] does not exist" % file_sources_path
+    from galaxy.files import ConfiguredFileSources
+    with open(file_sources_path) as f:
+        file_sources_as_dict = json.load(f)
+    file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
+    return file_sources
 
 
 if __name__ == "__main__":
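`main` now dispatches on the shape of its second positional argument: a plain path keeps the old dataset-backed behavior, while anything containing `://` is treated as a destination URI and pushed through `ConfiguredFileSources` after the archive is built. Both invocations, sketched with hypothetical paths:

```python
# Both modes of the export script; all paths here are made up.
from galaxy.tools.imp_exp import export_history

# Classic mode: the second positional argument is a dataset path.
export_history.main(["--gzip", "/tmp/export_staging", "/tmp/archive.tgz"])

# URI mode: "://" in the output argument routes the archive through the
# file source described by the --file-sources JSON document.
export_history.main([
    "--gzip",
    "--file-sources", "/tmp/file_sources.json",
    "/tmp/export_staging",
    "gxftp://Galaxy-History-example.tar.gz",
])
```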
diff --git a/lib/galaxy/tools/special_tools.py b/lib/galaxy/tools/special_tools.py
index 5665118bce2f..952cb1bc9bad 100644
--- a/lib/galaxy/tools/special_tools.py
+++ b/lib/galaxy/tools/special_tools.py
@@ -5,6 +5,7 @@
 SPECIAL_TOOLS = {
     "history export": "imp_exp/exp_history_to_archive.xml",
+    "history export to uri": "imp_exp/exp_history_to_uri.xml",
     "history import": "imp_exp/imp_history_from_archive.xml",
     "data fetch": "data_fetch.xml",
 }
diff --git a/lib/galaxy/webapps/base/controller.py b/lib/galaxy/webapps/base/controller.py
index ec11105714bf..3e7ca1fc63e5 100644
--- a/lib/galaxy/webapps/base/controller.py
+++ b/lib/galaxy/webapps/base/controller.py
@@ -410,7 +410,7 @@ def serve_ready_history_export(self, trans, jeha):
                 archive = trans.app.object_store.get_filename(jeha.dataset)
                 return open(archive, mode='rb')
 
-    def queue_history_export(self, trans, history, gzip=True, include_hidden=False, include_deleted=False):
+    def queue_history_export(self, trans, history, gzip=True, include_hidden=False, include_deleted=False, directory_uri=None, file_name=None):
         # Convert options to booleans.
         if isinstance(gzip, str):
             gzip = (gzip in ['True', 'true', 'T', 't'])
@@ -419,8 +419,6 @@ def queue_history_export(self, trans, history, gzip=True, include_hidden=False,
         if isinstance(include_deleted, str):
             include_deleted = (include_deleted in ['True', 'true', 'T', 't'])
 
-        # Run job to do export.
-        history_exp_tool = trans.app.toolbox.get_tool('__EXPORT_HISTORY__')
         params = {
             'history_to_export': history,
             'compress': gzip,
@@ -428,7 +426,17 @@ def queue_history_export(self, trans, history, gzip=True, include_hidden=False,
             'include_deleted': include_deleted
         }
 
-        history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
+        if directory_uri is None:
+            export_tool_id = '__EXPORT_HISTORY__'
+        else:
+            params['directory_uri'] = directory_uri
+            params['file_name'] = file_name or None
+            export_tool_id = '__EXPORT_HISTORY_URI__'
+
+        # Run job to do export.
+        history_exp_tool = trans.app.toolbox.get_tool(export_tool_id)
+        job, _ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
+        return job
 
 
 class ImportsHistoryMixin:
diff --git a/lib/galaxy/webapps/galaxy/api/histories.py b/lib/galaxy/webapps/galaxy/api/histories.py
index 6c28784c4cdc..274a8d48f984 100644
--- a/lib/galaxy/webapps/galaxy/api/histories.py
+++ b/lib/galaxy/webapps/galaxy/api/histories.py
@@ -468,16 +468,34 @@ def archive_export(self, trans, id, **kwds):
         #   in one object being created.
         history = self.manager.get_accessible(self.decode_id(id), trans.user, current_history=trans.history)
         jeha = history.latest_export
-        up_to_date = jeha and jeha.up_to_date
-        if 'force' in kwds:
-            up_to_date = False  # Temp hack to force rebuild everytime during dev
+        force = 'force' in kwds  # Hack to force rebuild every time during dev
+        exporting_to_uri = 'directory_uri' in kwds
+        # always just issue a new export when exporting to a URI.
+        up_to_date = not force and not exporting_to_uri and (jeha and jeha.up_to_date)
+        job = None
         if not up_to_date:
             # Need to create new JEHA + job.
             gzip = kwds.get("gzip", True)
            include_hidden = kwds.get("include_hidden", False)
             include_deleted = kwds.get("include_deleted", False)
-            self.queue_history_export(trans, history, gzip=gzip, include_hidden=include_hidden, include_deleted=include_deleted)
-
+            directory_uri = kwds.get("directory_uri", None)
+            file_name = kwds.get("file_name", None)
+            job = self.queue_history_export(
+                trans,
+                history,
+                gzip=gzip,
+                include_hidden=include_hidden,
+                include_deleted=include_deleted,
+                directory_uri=directory_uri,
+                file_name=file_name,
+            )
+
+        if exporting_to_uri:
+            # We don't have a jeha; there will never be a download_url. Just let
+            # the client poll on the created job_id to determine when the file has
+            # been written.
+            job_id = trans.security.encode_id(job.id)
+            return dict(job_id=job_id)
         if up_to_date and jeha.ready:
             jeha_id = trans.security.encode_id(jeha.id)
             return dict(download_url=url_for("history_archive_download", id=id, jeha_id=jeha_id))
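From the client's perspective the endpoint now has two response shapes: the JEHA-backed flow eventually yields a `download_url`, while the URI flow immediately returns a `job_id` to poll. A hypothetical client session sketched with `requests` (the base URL, key, and ids are placeholders):

```python
# Hypothetical client flow for the URI export mode; URL, key, and ids are
# placeholders. The jobs endpoint is used only to poll job state.
import requests

base = "https://galaxy.example.org/api"
headers = {"x-api-key": "<api key>"}

response = requests.put(
    f"{base}/histories/<encoded history id>/exports",
    json={"directory_uri": "gxftp://", "file_name": "my_export.tgz"},
    headers=headers,
)
job_id = response.json()["job_id"]  # URI mode: no download_url is ever returned

# Poll until the job completes; the archive then exists on the file source.
state = requests.get(f"{base}/jobs/{job_id}", headers=headers).json()["state"]
```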
diff --git a/lib/galaxy_test/api/test_histories.py b/lib/galaxy_test/api/test_histories.py
index c0c43892f21f..b8ed1c8e1546 100644
--- a/lib/galaxy_test/api/test_histories.py
+++ b/lib/galaxy_test/api/test_histories.py
@@ -210,12 +210,7 @@ def setUp(self):
 
     def test_import_export(self):
         history_name = "for_export_default"
-        history_id = self.dataset_populator.new_history(name=history_name)
-        self.dataset_populator.new_dataset(history_id, content="1 2 3")
-        deleted_hda = self.dataset_populator.new_dataset(history_id, content="1 2 3", wait=True)
-        self.dataset_populator.delete_dataset(history_id, deleted_hda["id"])
-        deleted_details = self.dataset_populator.get_history_dataset_details(history_id, id=deleted_hda["id"])
-        assert deleted_details["deleted"]
+        history_id = self.dataset_populator.setup_history_for_export_testing(history_name)
         imported_history_id = self._reimport_history(history_id, history_name, wait_on_history_length=2)
 
         def upload_job_check(job):
diff --git a/lib/galaxy_test/base/populators.py b/lib/galaxy_test/base/populators.py
index f62b30cac4ed..c218968209b8 100644
--- a/lib/galaxy_test/base/populators.py
+++ b/lib/galaxy_test/base/populators.py
@@ -608,19 +608,37 @@ def validated():
             "dataset validation"
         )
 
-    def export_url(self, history_id, data, check_download=True):
+    def setup_history_for_export_testing(self, history_name):
+        history_id = self.new_history(name=history_name)
+        self.new_dataset(history_id, content="1 2 3")
+        deleted_hda = self.new_dataset(history_id, content="1 2 3", wait=True)
+        self.delete_dataset(history_id, deleted_hda["id"])
+        deleted_details = self.get_history_dataset_details(history_id, id=deleted_hda["id"])
+        assert deleted_details["deleted"]
+        return history_id
+
+    def prepare_export(self, history_id, data):
         url = "histories/%s/exports" % history_id
         put_response = self._put(url, data)
-        api_asserts.assert_status_code_is(put_response, 202)
+        put_response.raise_for_status()
 
-        def export_ready_response():
-            put_response = self._put(url)
-            if put_response.status_code == 202:
-                return None
+        if put_response.status_code == 202:
+            def export_ready_response():
+                put_response = self._put(url)
+                if put_response.status_code == 202:
+                    return None
+                return put_response
+
+            put_response = wait_on(export_ready_response, desc="export ready")
+            api_asserts.assert_status_code_is(put_response, 200)
             return put_response
+        else:
+            job_desc = put_response.json()
+            assert "job_id" in job_desc
+            return self.wait_for_job(job_desc["job_id"])
 
-        put_response = wait_on(export_ready_response, desc="export ready")
-        api_asserts.assert_status_code_is(put_response, 200)
+    def export_url(self, history_id, data, check_download=True):
+        put_response = self.prepare_export(history_id, data)
         response = put_response.json()
         api_asserts.assert_has_keys(response, "download_url")
         download_url = response["download_url"]
diff --git a/test/integration/test_remote_files_histories.py b/test/integration/test_remote_files_histories.py
index a03a968de434..06425fa8df9d 100644
--- a/test/integration/test_remote_files_histories.py
+++ b/test/integration/test_remote_files_histories.py
@@ -30,3 +30,32 @@ def test_history_import_from_ftp_dir(self):
         imported_history_id = self.dataset_populator.import_history_and_wait_for_name(import_data, "API Test History")
         self.dataset_populator.wait_on_history_length(imported_history_id, 2)
         self.dataset_populator.delete_history(imported_history_id)
+
+    def test_history_export_to_ftp_dir(self):
+        # need to reference user_ftp_dir before test to ensure directory is created
+        assert not os.path.exists(os.path.join(self.user_ftp_dir, "from_export.tgz"))
+
+        history_name = "for_export_default"
+        history_id = self.dataset_populator.setup_history_for_export_testing(history_name)
+        export_data = {
+            "directory_uri": "gxftp://",
+            "file_name": "from_export.tgz",
+        }
+        self.dataset_populator.prepare_export(
+            history_id,
+            export_data,
+        )
+        assert os.path.exists(os.path.join(self.user_ftp_dir, "from_export.tgz"))
+
+        # Also test with default file_name...
+        export_data = {
+            "directory_uri": "gxftp://",
+        }
+        self.dataset_populator.prepare_export(
+            history_id,
+            export_data,
+        )
+        exports = os.listdir(self.user_ftp_dir)
+        assert len(exports) == 2
+        default_exports = [e for e in exports if e.startswith("Galaxy-History-")]
+        assert len(default_exports) == 1
diff --git a/test/unit/tools/test_history_imp_exp.py b/test/unit/tools/test_history_imp_exp.py
index 070db91aca6c..fe3e8cd49670 100644
--- a/test/unit/tools/test_history_imp_exp.py
+++ b/test/unit/tools/test_history_imp_exp.py
@@ -621,13 +621,12 @@ def _import_export(app, h, dest_export=None):
         dest_parent = tempfile.mkdtemp()
         dest_export = os.path.join(dest_parent, "moo.tgz")
 
-    dataset = model.Dataset(id=100)
-
-    jeha = model.JobExportHistoryArchive(job=model.Job(), history=h,
-                                         dataset=dataset,
-                                         compressed=True)
-    wrapper = JobExportHistoryArchiveWrapper(app, 1)
-    wrapper.setup_job(jeha)
+    job = model.Job()
+    jeha = model.JobExportHistoryArchive.create_for_history(
+        h, job, app.model.context, app.object_store, compressed=True
+    )
+    wrapper = JobExportHistoryArchiveWrapper(app, job.id)
+    wrapper.setup_job(h, jeha.temp_directory)
 
     from galaxy.tools.imp_exp import export_history
     ret = export_history.main(["--gzip", jeha.temp_directory, dest_export])