Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API support for history import/export from/to galaxy.files plugins #10993

Merged
merged 4 commits into from
Dec 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/galaxy/job_execution/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ def ensure_configs_directory(work_dir):
if not os.path.exists(configs_dir):
safe_makedirs(configs_dir)
return configs_dir


def create_working_directory_for_job(object_store, job):
object_store.create(
job, base_dir='job_work', dir_only=True, obj_dir=True)
working_directory = object_store.get_filename(
job, base_dir='job_work', dir_only=True, obj_dir=True)
return working_directory
11 changes: 5 additions & 6 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
TaskPathRewriter
)
from galaxy.job_execution.output_collect import collect_extra_files
from galaxy.job_execution.setup import ensure_configs_directory
from galaxy.job_execution.setup import (
create_working_directory_for_job,
ensure_configs_directory,
)
from galaxy.jobs.actions.post import ActionBox
from galaxy.jobs.mapper import (
JobMappingException,
Expand Down Expand Up @@ -1190,11 +1193,7 @@ def tool_working_directory(self):
return os.path.join(self.working_directory, "working")

def _create_working_directory(self, job):
self.object_store.create(
job, base_dir='job_work', dir_only=True, obj_dir=True)
working_directory = self.object_store.get_filename(
job, base_dir='job_work', dir_only=True, obj_dir=True)
return working_directory
return create_working_directory_for_job(self.object_store, job)

def clear_working_directory(self):
job = self.get_job()
Expand Down
27 changes: 27 additions & 0 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,8 @@ def __eq__(self, other):


class JobExportHistoryArchive(RepresentById):
ATTRS_FILENAME_HISTORY = 'history_attrs.txt'

def __init__(self, job=None, history=None, dataset=None, compressed=False,
history_attrs_filename=None):
self.job = job
Expand Down Expand Up @@ -1627,6 +1629,31 @@ def export_name(self):
hname += ".gz"
return hname

@staticmethod
def create_for_history(history, job, sa_session, object_store, compressed):
# Create dataset that will serve as archive.
archive_dataset = Dataset()
sa_session.add(archive_dataset)
sa_session.flush() # ensure job.id and archive_dataset.id are available
object_store.create(archive_dataset) # set the object store id, create dataset (if applicable)
# Add association for keeping track of job, history, archive relationship.
jeha = JobExportHistoryArchive(
job=job, history=history,
dataset=archive_dataset,
compressed=compressed
)
sa_session.add(jeha)

#
# Create attributes/metadata files for export.
#
jeha.dataset.create_extra_files_path()
temp_output_dir = jeha.dataset.extra_files_path

history_attrs_filename = os.path.join(temp_output_dir, jeha.ATTRS_FILENAME_HISTORY)
jeha.history_attrs_filename = history_attrs_filename
return jeha


class JobImportHistoryArchive(RepresentById):
def __init__(self, job=None, history=None, archive_dir=None):
Expand Down
60 changes: 41 additions & 19 deletions lib/galaxy/tools/actions/history_imp_exp.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import datetime
import logging
import os
import tempfile
from collections import OrderedDict

from galaxy.job_execution.setup import create_working_directory_for_job
from galaxy.tools.actions import ToolAction
from galaxy.tools.imp_exp import (
JobExportHistoryArchiveWrapper,
JobImportHistoryArchiveWrapper
)
from galaxy.util import ready_name_for_url

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -99,10 +102,7 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
job.galaxy_version = trans.app.config.version_major
session = trans.get_galaxy_session()
job.session_id = session and session.id
if history:
history_id = history.id
else:
history_id = trans.history.id
history_id = history.id
job.history_id = history_id
job.tool_id = tool.id
if trans.user:
Expand All @@ -112,26 +112,32 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
job.state = job.states.WAITING # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters
trans.sa_session.add(job)

# Create dataset that will serve as archive.
archive_dataset = trans.app.model.Dataset()
trans.sa_session.add(archive_dataset)

trans.sa_session.flush() # ensure job.id and archive_dataset.id are available
trans.app.object_store.create(archive_dataset) # set the object store id, create dataset (if applicable)
compressed = incoming['compress']
exporting_to_uri = "directory_uri" in incoming
if not exporting_to_uri:
# see comment below about how this should be transitioned to occuring in a
# job handler or detached MQ-driven thread
jeha = trans.app.model.JobExportHistoryArchive.create_for_history(
history, job, trans.sa_session, trans.app.object_store, compressed
)
store_directory = jeha.temp_directory
else:
# creating a job directory in the web thread is bad (it is slow, bypasses
# dynamic objectstore assignment, etc..) but it is arguably less bad than
# creating a dataset (like above for dataset export case).
# ensure job.id is available
trans.sa_session.flush()
job_directory = create_working_directory_for_job(trans.app.object_store, job)
store_directory = os.path.join(job_directory, "working", "_object_export")
os.makedirs(store_directory)

#
# Setup job and job wrapper.
#

# Add association for keeping track of job, history, archive relationship.
jeha = trans.app.model.JobExportHistoryArchive(job=job, history=history,
dataset=archive_dataset,
compressed=incoming['compress'])
trans.sa_session.add(jeha)

job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job)
cmd_line = job_wrapper.setup_job(jeha, include_hidden=incoming['include_hidden'],
include_deleted=incoming['include_deleted'])
cmd_line = job_wrapper.setup_job(history, store_directory, include_hidden=incoming['include_hidden'],
include_deleted=incoming['include_deleted'],
compressed=compressed)

#
# Add parameters to job_parameter table.
Expand All @@ -140,6 +146,22 @@ def execute(self, tool, trans, incoming={}, set_output_hid=False, overwrite=True
# Set additional parameters.
incoming['__HISTORY_TO_EXPORT__'] = history.id
incoming['__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__'] = cmd_line
if exporting_to_uri:
directory_uri = incoming["directory_uri"]
file_name = incoming.get("file_name")
if file_name is None:
hname = ready_name_for_url(history.name)
human_timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
if compressed:
extension = ".tar.gz"
else:
extension = ".tar"
file_name = f"Galaxy-History-{hname}-{human_timestamp}.{extension}"

file_name = os.path.basename(os.path.abspath(file_name))
sep = "" if directory_uri.endswith("/") else "/"
incoming['__EXPORT_TO_URI__'] = f"{directory_uri}{sep}{file_name}"

for name, value in tool.params_to_strings(incoming, trans.app).items():
job.add_parameter(name, value)

Expand Down
5 changes: 4 additions & 1 deletion lib/galaxy/tools/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -647,4 +647,7 @@ def _history(self):
@property
def _user(self):
history = self._history
return history and history.user
if history:
return history.user
else:
return self.job.user
33 changes: 11 additions & 22 deletions lib/galaxy/tools/imp_exp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

log = logging.getLogger(__name__)

ATTRS_FILENAME_HISTORY = 'history_attrs.txt'


class JobImportHistoryArchiveWrapper:
"""
Expand Down Expand Up @@ -80,33 +78,24 @@ def __init__(self, app, job_id):
self.job_id = job_id
self.sa_session = self.app.model.context

def setup_job(self, jeha, include_hidden=False, include_deleted=False):
""" Perform setup for job to export a history into an archive. Method generates
attribute files for export, sets the corresponding attributes in the jeha
object, and returns a command line for running the job. The command line
includes the command, inputs, and options; it does not include the output
file because it must be set at runtime. """
def setup_job(self, history, store_directory, include_hidden=False, include_deleted=False, compressed=True):
"""Perform setup for job to export a history into an archive.

Method generates attribute files for export, sets the corresponding attributes
in the jeha object, and returns a command line for running the job. The command
line includes the command, inputs, and options; it does not include the output
file because it must be set at runtime.
"""
app = self.app

#
# Create attributes/metadata files for export.
#
jeha.dataset.create_extra_files_path()
temp_output_dir = jeha.dataset.extra_files_path

history = jeha.history
history_attrs_filename = os.path.join(temp_output_dir, ATTRS_FILENAME_HISTORY)
jeha.history_attrs_filename = history_attrs_filename

# symlink files on export, on worker files will tarred up in a dereferenced manner.
with store.DirectoryModelExportStore(temp_output_dir, app=app, export_files="symlink") as export_store:
with store.DirectoryModelExportStore(store_directory, app=app, export_files="symlink") as export_store:
export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted)

#
# Create and return command line for running tool.
#
options = "--galaxy-version '%s'" % VERSION_MAJOR
if jeha.compressed:
options = f"--galaxy-version '{VERSION_MAJOR}'"
if compressed:
options += " -G"
return f"{options} {temp_output_dir}"
return f"{options} {store_directory}"
16 changes: 16 additions & 0 deletions lib/galaxy/tools/imp_exp/exp_history_to_uri.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<tool id="__EXPORT_HISTORY_URI__" name="Export History to URI" version="0.1" tool_type="export_history">
<type class="ExportHistoryTool" module="galaxy.tools"/>
<action module="galaxy.tools.actions.history_imp_exp" class="ExportHistoryToolAction"/>
<command>python '$export_history' --file-sources '$file_sources' $__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__ '$__EXPORT_TO_URI__'</command>
<inputs>
<param name="__HISTORY_TO_EXPORT__" type="hidden"/>
<param name="compress" type="boolean"/>
<param name="__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__" type="hidden"/>
<param name="directory_uri" type="directory_uri" />
<param name="file_name" type="text" optional="true" />
</inputs>
<configfiles>
<configfile name="export_history">from galaxy.tools.imp_exp.export_history import main; main()</configfile>
<file_sources name="file_sources" />
</configfiles>
</tool>
53 changes: 32 additions & 21 deletions lib/galaxy/tools/imp_exp/export_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
-G, --gzip: gzip archive file
"""

import json
import optparse
import os
import shutil
Expand Down Expand Up @@ -34,36 +35,46 @@ def main(argv=None):
parser = optparse.OptionParser()
parser.add_option('-G', '--gzip', dest='gzip', action="store_true", help='Compress archive using gzip.')
parser.add_option('--galaxy-version', dest='galaxy_version', help='Galaxy version that initiated the command.', default=None)
parser.add_option('--file-sources', type=str, help='file sources json')
(options, args) = parser.parse_args(argv)
galaxy_version = options.galaxy_version
if galaxy_version is None:
galaxy_version = "19.01" if len(args) == 4 else "19.05"
galaxy_version = "19.05"

gzip = bool(options.gzip)
if galaxy_version == "19.01":
# This job was created pre 18.0X with old argument style.
out_file = args[3]
temp_directory = os.path.dirname(args[0])
assert len(args) >= 2
temp_directory = args[0]
out_arg = args[1]

destination_uri = None
if "://" in out_arg:
# writing to a file source instead of a dataset path.
destination_uri = out_arg
out_file = "./temp_out_archive"
else:
assert len(args) >= 2
# We have a 19.0X directory argument instead of individual arguments.
temp_directory = args[0]
out_file = args[1]
out_file = out_arg
# Create archive.
exit = create_archive(temp_directory, out_file, gzip=gzip)
if destination_uri is not None and exit == 0:
_write_to_destination(options.file_sources, os.path.abspath(out_file), destination_uri)
return exit

if galaxy_version == "19.01":
history_attrs = os.path.join(temp_directory, 'history_attrs.txt')
dataset_attrs = os.path.join(temp_directory, 'datasets_attrs.txt')
job_attrs = os.path.join(temp_directory, 'jobs_attrs.txt')

shutil.move(args[0], history_attrs)
shutil.move(args[1], dataset_attrs)
provenance_path = args[1] + ".provenance"
if os.path.exists(provenance_path):
shutil.move(provenance_path, dataset_attrs + ".provenance")
shutil.move(args[2], job_attrs)
def _write_to_destination(file_sources_path, out_file, destination_uri):
file_sources = get_file_sources(file_sources_path)
file_source_path = file_sources.get_file_source_path(destination_uri)
file_source = file_source_path.file_source
assert os.path.exists(out_file)
file_source.write_from(file_source_path.path, out_file)

# Create archive.
return create_archive(temp_directory, out_file, gzip=gzip)

def get_file_sources(file_sources_path):
assert os.path.exists(file_sources_path), "file sources path [%s] does not exist" % file_sources_path
from galaxy.files import ConfiguredFileSources
with open(file_sources_path) as f:
file_sources_as_dict = json.load(f)
file_sources = ConfiguredFileSources.from_dict(file_sources_as_dict)
return file_sources


if __name__ == "__main__":
Expand Down
9 changes: 5 additions & 4 deletions lib/galaxy/tools/imp_exp/imp_history_from_archive.xml
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
<tool id="__IMPORT_HISTORY__" name="Import History" version="0.1" tool_type="import_history" profile="16.04">
<type class="ImportHistoryTool" module="galaxy.tools"/>
<action module="galaxy.tools.actions.history_imp_exp" class="ImportHistoryToolAction"/>
<requirements>
<requirement type="package" version="2.23.0">requests</requirement>
</requirements>
<!-- eliminated requirements again, now requires galaxy.files to resolve URIs -->
<command>#from base64 import b64encode#
python '$__tool_directory__/unpack_tar_gz_archive.py'
'${ b64encode(str($__ARCHIVE_SOURCE__).encode('utf-8')).decode('utf-8')}'
'${ b64encode(str($__DEST_DIR__).encode('utf-8')).decode('utf-8')}'
--$__ARCHIVE_TYPE__ --encoded</command>
--$__ARCHIVE_TYPE__ --file-sources '$file_sources' --encoded</command>
<inputs>
<param name="__ARCHIVE_SOURCE__" type="text">
<sanitizer sanitize="False"/>
Expand All @@ -18,4 +16,7 @@ python '$__tool_directory__/unpack_tar_gz_archive.py'
<sanitizer sanitize="False"/>
</param>
</inputs>
<configfiles>
<file_sources name="file_sources" />
</configfiles>
</tool>
Loading