Skip to content

Commit

Permalink
Allow history export to galaxy.files plugins.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Dec 26, 2020
1 parent d1d85b0 commit ca5c84e
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 79 deletions.
8 changes: 8 additions & 0 deletions lib/galaxy/job_execution/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ def ensure_configs_directory(work_dir):
if not os.path.exists(configs_dir):
safe_makedirs(configs_dir)
return configs_dir


def create_working_directory_for_job(object_store, job):
    """Create the job's working directory in the object store and return its path.

    Both the creation call and the filename lookup address the same object
    store entry (a directory-only, per-object ``job_work`` location).
    """
    store_kwds = dict(base_dir='job_work', dir_only=True, obj_dir=True)
    object_store.create(job, **store_kwds)
    return object_store.get_filename(job, **store_kwds)
11 changes: 5 additions & 6 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
TaskPathRewriter
)
from galaxy.job_execution.output_collect import collect_extra_files
from galaxy.job_execution.setup import ensure_configs_directory
from galaxy.job_execution.setup import (
create_working_directory_for_job,
ensure_configs_directory,
)
from galaxy.jobs.actions.post import ActionBox
from galaxy.jobs.mapper import (
JobMappingException,
Expand Down Expand Up @@ -1190,11 +1193,7 @@ def tool_working_directory(self):
return os.path.join(self.working_directory, "working")

def _create_working_directory(self, job):
    """Create and return the working directory path for ``job``.

    Delegates to ``create_working_directory_for_job`` so that job-execution
    code shares a single implementation of object-store-backed working
    directory creation.
    """
    return create_working_directory_for_job(self.object_store, job)

def clear_working_directory(self):
job = self.get_job()
Expand Down
27 changes: 27 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,8 @@ def __eq__(self, other):


class JobExportHistoryArchive(RepresentById):
ATTRS_FILENAME_HISTORY = 'history_attrs.txt'

def __init__(self, job=None, history=None, dataset=None, compressed=False,
history_attrs_filename=None):
self.job = job
Expand Down Expand Up @@ -1627,6 +1629,31 @@ def export_name(self):
hname += ".gz"
return hname

@staticmethod
def create_for_history(history, job, sa_session, object_store, compressed):
    """Create a ``JobExportHistoryArchive`` (and its backing ``Dataset``) for exporting ``history``.

    Adds both new objects to ``sa_session``, flushing once so database ids
    are assigned before the dataset is created in ``object_store``, and sets
    ``history_attrs_filename`` to a path inside the dataset's extra-files
    directory.  Returns the new association object; the caller is expected
    to flush/commit the session afterwards.
    """
    # Create dataset that will serve as archive.
    archive_dataset = Dataset()
    sa_session.add(archive_dataset)
    sa_session.flush()  # ensure job.id and archive_dataset.id are available
    object_store.create(archive_dataset)  # set the object store id, create dataset (if applicable)
    # Add association for keeping track of job, history, archive relationship.
    jeha = JobExportHistoryArchive(
        job=job, history=history,
        dataset=archive_dataset,
        compressed=compressed
    )
    sa_session.add(jeha)

    #
    # Create attributes/metadata files for export.
    #
    jeha.dataset.create_extra_files_path()
    temp_output_dir = jeha.dataset.extra_files_path

    history_attrs_filename = os.path.join(temp_output_dir, jeha.ATTRS_FILENAME_HISTORY)
    jeha.history_attrs_filename = history_attrs_filename
    return jeha


class JobImportHistoryArchive(RepresentById):
def __init__(self, job=None, history=None, archive_dir=None):
Expand Down
60 changes: 41 additions & 19 deletions lib/galaxy/tools/actions/history_imp_exp.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import datetime
import logging
import os
import tempfile
from collections import OrderedDict

from galaxy.job_execution.setup import create_working_directory_for_job
from galaxy.tools.actions import ToolAction
from galaxy.tools.imp_exp import (
JobExportHistoryArchiveWrapper,
JobImportHistoryArchiveWrapper
)
from galaxy.util import ready_name_for_url

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,10 +102,7 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
job.galaxy_version = trans.app.config.version_major
session = trans.get_galaxy_session()
job.session_id = session and session.id
if history:
history_id = history.id
else:
history_id = trans.history.id
history_id = history.id
job.history_id = history_id
job.tool_id = tool.id
if trans.user:
Expand All @@ -112,26 +112,32 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
job.state = job.states.WAITING # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters
trans.sa_session.add(job)

# Create dataset that will serve as archive.
archive_dataset = trans.app.model.Dataset()
trans.sa_session.add(archive_dataset)

trans.sa_session.flush() # ensure job.id and archive_dataset.id are available
trans.app.object_store.create(archive_dataset) # set the object store id, create dataset (if applicable)
compressed = incoming['compress']
exporting_to_uri = "directory_uri" in incoming
if not exporting_to_uri:
# see comment below about how this should be transitioned to occurring in a
# job handler or detached MQ-driven thread
jeha = trans.app.model.JobExportHistoryArchive.create_for_history(
history, job, trans.sa_session, trans.app.object_store, compressed
)
store_directory = jeha.temp_directory
else:
# creating a job directory in the web thread is bad (it is slow, bypasses
# dynamic objectstore assignment, etc..) but it is arguably less bad than
# creating a dataset (like above for dataset export case).
# ensure job.id is available
trans.sa_session.flush()
job_directory = create_working_directory_for_job(trans.app.object_store, job)
store_directory = os.path.join(job_directory, "working", "_object_export")
os.makedirs(store_directory)

#
# Setup job and job wrapper.
#

# Add association for keeping track of job, history, archive relationship.
jeha = trans.app.model.JobExportHistoryArchive(job=job, history=history,
dataset=archive_dataset,
compressed=incoming['compress'])
trans.sa_session.add(jeha)

job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job)
cmd_line = job_wrapper.setup_job(jeha, include_hidden=incoming['include_hidden'],
include_deleted=incoming['include_deleted'])
cmd_line = job_wrapper.setup_job(history, store_directory, include_hidden=incoming['include_hidden'],
include_deleted=incoming['include_deleted'],
compressed=compressed)

#
# Add parameters to job_parameter table.
Expand All @@ -140,6 +146,22 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
# Set additional parameters.
incoming['__HISTORY_TO_EXPORT__'] = history.id
incoming['__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__'] = cmd_line
if exporting_to_uri:
directory_uri = incoming["directory_uri"]
file_name = incoming.get("file_name")
if file_name is None:
hname = ready_name_for_url(history.name)
human_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
if compressed:
extension = ".tar.gz"
else:
extension = ".tar"
file_name = f"Galaxy-History-{hname}-{human_timestamp}.{extension}"

file_name = os.path.basename(os.path.abspath(file_name))
sep = "" if directory_uri.endswith("/") else "/"
incoming['__EXPORT_TO_URI__'] = f"{directory_uri}{sep}{file_name}"

for name, value in tool.params_to_strings(incoming, trans.app).items():
job.add_parameter(name, value)

Expand Down
33 changes: 11 additions & 22 deletions lib/galaxy/tools/imp_exp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

log = logging.getLogger(__name__)

ATTRS_FILENAME_HISTORY = 'history_attrs.txt'


class JobImportHistoryArchiveWrapper:
"""
Expand Down Expand Up @@ -80,33 +78,24 @@ def __init__(self, app, job_id):
self.job_id = job_id
self.sa_session = self.app.model.context

def setup_job(self, history, store_directory, include_hidden=False, include_deleted=False, compressed=True):
    """Perform setup for a job that exports ``history`` into an archive.

    Writes the history's attribute/metadata files into ``store_directory``
    and returns the command-line options string for the export tool.  The
    command line includes the Galaxy version, the compression flag, and the
    store directory; it does not include the output file because that must
    be set at runtime.
    """
    app = self.app

    # Symlink files on export; on the worker, files will be tarred up in a
    # dereferenced manner.
    with store.DirectoryModelExportStore(store_directory, app=app, export_files="symlink") as export_store:
        export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted)

    #
    # Create and return command line for running tool.
    #
    options = f"--galaxy-version '{VERSION_MAJOR}'"
    if compressed:
        options += " -G"
    return f"{options} {store_directory}"
16 changes: 16 additions & 0 deletions lib/galaxy/tools/imp_exp/exp_history_to_uri.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<tool id="__EXPORT_HISTORY_URI__" name="Export History to URI" version="0.1" tool_type="export_history">
    <!-- Internal "special" tool: exports a history archive and writes it to a
         galaxy.files destination URI ($__EXPORT_TO_URI__) instead of creating
         a downloadable dataset.  The export_history configfile invokes
         galaxy.tools.imp_exp.export_history.main(); the file_sources
         configfile serializes the available file-source plugins for the
         worker. -->
    <type class="ExportHistoryTool" module="galaxy.tools"/>
    <action module="galaxy.tools.actions.history_imp_exp" class="ExportHistoryToolAction"/>
    <command>python '$export_history' --file-sources '$file_sources' $__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__ '$__EXPORT_TO_URI__'</command>
    <inputs>
        <param name="__HISTORY_TO_EXPORT__" type="hidden"/>
        <param name="compress" type="boolean"/>
        <param name="__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__" type="hidden"/>
        <param name="directory_uri" type="directory_uri" />
        <param name="file_name" type="text" optional="true" />
    </inputs>
    <configfiles>
        <configfile name="export_history">from galaxy.tools.imp_exp.export_history import main; main()</configfile>
        <file_sources name="file_sources" />
    </configfiles>
</tool>
34 changes: 32 additions & 2 deletions lib/galaxy/tools/imp_exp/export_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
-G, --gzip: gzip archive file
"""

import json
import optparse
import os
import shutil
import sys

Expand All @@ -33,6 +35,7 @@ def main(argv=None):
parser = optparse.OptionParser()
parser.add_option('-G', '--gzip', dest='gzip', action="store_true", help='Compress archive using gzip.')
parser.add_option('--galaxy-version', dest='galaxy_version', help='Galaxy version that initiated the command.', default=None)
parser.add_option('--file-sources', type=str, help='file sources json')
(options, args) = parser.parse_args(argv)
galaxy_version = options.galaxy_version
if galaxy_version is None:
Expand All @@ -41,10 +44,37 @@ def main(argv=None):
gzip = bool(options.gzip)
assert len(args) >= 2
temp_directory = args[0]
out_file = args[1]
out_arg = args[1]

destination_uri = None
if "://" in out_arg:
# writing to a file source instead of a dataset path.
destination_uri = out_arg
out_file = "./temp_out_archive"
else:
out_file = out_arg
# Create archive.
return create_archive(temp_directory, out_file, gzip=gzip)
exit = create_archive(temp_directory, out_file, gzip=gzip)
if destination_uri is not None and exit == 0:
_write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
return exit


def _write_to_destination(file_sources_path, out_file, destination_uri):
    """Upload ``out_file`` to ``destination_uri`` using configured file sources.

    ``file_sources_path`` is the path of the JSON file describing available
    galaxy.files plugins.  Raises ``FileNotFoundError`` if ``out_file`` does
    not exist.
    """
    file_sources = get_file_sources(file_sources_path)
    file_source_path = file_sources.get_file_source_path(destination_uri)
    file_source = file_source_path.file_source
    # Explicit check instead of assert: asserts are stripped under `python -O`.
    if not os.path.exists(out_file):
        raise FileNotFoundError(f"out file [{out_file}] does not exist, cannot write to destination")
    file_source.write_from(file_source_path.path, out_file)


def get_file_sources(file_sources_path):
    """Load a ``ConfiguredFileSources`` instance from the JSON file at ``file_sources_path``.

    Raises ``FileNotFoundError`` if the path does not exist.
    """
    # Explicit check instead of assert: asserts are stripped under `python -O`.
    if not os.path.exists(file_sources_path):
        raise FileNotFoundError(f"file sources path [{file_sources_path}] does not exist")
    # Deferred import — NOTE(review): presumably intentional so this script can
    # be imported without pulling in all of galaxy.files up front; confirm.
    from galaxy.files import ConfiguredFileSources
    with open(file_sources_path) as f:
        file_sources_as_dict = json.load(f)
    return ConfiguredFileSources.from_dict(file_sources_as_dict)


if __name__ == "__main__":
Expand Down
1 change: 1 addition & 0 deletions lib/galaxy/tools/special_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# Registry of Galaxy's built-in "special" tools: maps a human-readable
# description to the tool XML path (relative to the tools config directory)
# that implements it.
SPECIAL_TOOLS = {
    "history export": "imp_exp/exp_history_to_archive.xml",
    "history export to uri": "imp_exp/exp_history_to_uri.xml",
    "history import": "imp_exp/imp_history_from_archive.xml",
    "data fetch": "data_fetch.xml",
}
Expand Down
16 changes: 12 additions & 4 deletions lib/galaxy/webapps/base/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ def serve_ready_history_export(self, trans, jeha):
archive = trans.app.object_store.get_filename(jeha.dataset)
return open(archive, mode='rb')

def queue_history_export(self, trans, history, gzip=True, include_hidden=False, include_deleted=False):
def queue_history_export(self, trans, history, gzip=True, include_hidden=False, include_deleted=False, directory_uri=None, file_name=None):
# Convert options to booleans.
if isinstance(gzip, str):
gzip = (gzip in ['True', 'true', 'T', 't'])
Expand All @@ -419,16 +419,24 @@ def queue_history_export(self, trans, history, gzip=True, include_hidden=False,
if isinstance(include_deleted, str):
include_deleted = (include_deleted in ['True', 'true', 'T', 't'])

# Run job to do export.
history_exp_tool = trans.app.toolbox.get_tool('__EXPORT_HISTORY__')
params = {
'history_to_export': history,
'compress': gzip,
'include_hidden': include_hidden,
'include_deleted': include_deleted
}

history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
if directory_uri is None:
export_tool_id = '__EXPORT_HISTORY__'
else:
params['directory_uri'] = directory_uri
params['file_name'] = file_name or None
export_tool_id = '__EXPORT_HISTORY_URI__'

# Run job to do export.
history_exp_tool = trans.app.toolbox.get_tool(export_tool_id)
job, _ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
return job


class ImportsHistoryMixin:
Expand Down
28 changes: 23 additions & 5 deletions lib/galaxy/webapps/galaxy/api/histories.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,34 @@ def archive_export(self, trans, id, **kwds):
# in one object being created.
history = self.manager.get_accessible(self.decode_id(id), trans.user, current_history=trans.history)
jeha = history.latest_export
up_to_date = jeha and jeha.up_to_date
if 'force' in kwds:
up_to_date = False # Temp hack to force rebuild everytime during dev
force = 'force' in kwds # Hack to force rebuild everytime during dev
exporting_to_uri = 'directory_uri' in kwds
# always just issue a new export when exporting to a URI.
up_to_date = not force and not exporting_to_uri and (jeha and jeha.up_to_date)
job = None
if not up_to_date:
# Need to create new JEHA + job.
gzip = kwds.get("gzip", True)
include_hidden = kwds.get("include_hidden", False)
include_deleted = kwds.get("include_deleted", False)
self.queue_history_export(trans, history, gzip=gzip, include_hidden=include_hidden, include_deleted=include_deleted)

directory_uri = kwds.get("directory_uri", None)
file_name = kwds.get("file_name", None)
job = self.queue_history_export(
trans,
history,
gzip=gzip,
include_hidden=include_hidden,
include_deleted=include_deleted,
directory_uri=directory_uri,
file_name=file_name,
)

if exporting_to_uri:
# we don't have a jeha, there will never be a download_url. Just let
# the client poll on the created job_id to determine when the file has been
# written.
job_id = trans.security.encode_id(job.id)
return dict(job_id=job_id)
if up_to_date and jeha.ready:
jeha_id = trans.security.encode_id(jeha.id)
return dict(download_url=url_for("history_archive_download", id=id, jeha_id=jeha_id))
Expand Down
Loading

0 comments on commit ca5c84e

Please sign in to comment.