Skip to content

Commit

Permalink
Merge pull request #11789 from mvdbeek/history_export_task
Browse files Browse the repository at this point in the history
Run history export setup via celery, if configured
  • Loading branch information
dannon authored Apr 21, 2021
2 parents ddbc67d + 08d4a51 commit a277c24
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 44 deletions.
2 changes: 1 addition & 1 deletion client/src/components/History/model/JobStateSummary.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { STATES } from "./states";

// Job-state-summary lists

const NON_TERMINAL_STATES = [STATES.NEW, STATES.QUEUED, STATES.RUNNING];
const NON_TERMINAL_STATES = [STATES.NEW, STATES.WAITING, STATES.QUEUED, STATES.RUNNING];

const ERROR_STATES = [
STATES.ERROR,
Expand Down
2 changes: 2 additions & 0 deletions client/src/components/History/model/states.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export const STATES = {
SETTING_METADATA: "setting_metadata",
/** was created without a tool */
NEW: "new",
/** job is being created, but not put into job queue yet */
WAITING: "waiting",
/** has no data */
EMPTY: "empty",
/** has successfully completed running */
Expand Down
2 changes: 1 addition & 1 deletion client/src/mvc/history/job-states-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import AJAX_QUEUE from "utils/ajax-queue";

/** ms between fetches when checking running jobs/datasets for updates */
var UPDATE_DELAY = 2000;
var NON_TERMINAL_STATES = ["new", "queued", "running"];
var NON_TERMINAL_STATES = ["new", "queued", "running", "waiting"];
var ERROR_STATES = ["error", "deleted"];
var TERMINAL_STATES = ["ok"].concat(ERROR_STATES);
/** Fetch state on add or just wait for polling to start. */
Expand Down
18 changes: 8 additions & 10 deletions lib/galaxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ def __init__(self, **kwargs):
self._register_singleton(MinimalManagerApp, self)
self.execution_timer_factory = self._register_singleton(ExecutionTimerFactory, ExecutionTimerFactory(self.config))
self.configure_fluent_log()
self.application_stack = self._register_singleton(ApplicationStack, application_stack_instance(app=self))
# Initialize job metrics manager, needs to be in place before
# config so per-destination modifications can be made.
self.job_metrics = self._register_singleton(JobMetrics, JobMetrics(self.config.job_metrics_config_file, app=self))
# Initialize the job management configuration
self.job_config = self._register_singleton(jobs.JobConfiguration)

# Tag handler
self.tag_handler = self._register_singleton(GalaxyTagHandler)
Expand All @@ -167,6 +173,8 @@ def __init__(self, **kwargs):
self.library_folder_manager = self._register_singleton(FolderManager)
self.library_manager = self._register_singleton(LibraryManager)
self.role_manager = self._register_singleton(RoleManager)
from galaxy.jobs.manager import JobManager
self.job_manager = self._register_singleton(JobManager)

# ConfiguredFileSources
self.file_sources = self._register_singleton(ConfiguredFileSources, ConfiguredFileSources.from_app_config(self.config))
Expand Down Expand Up @@ -209,7 +217,6 @@ def __init__(self, **kwargs) -> None:
]
self._register_singleton(StructuredApp, self)
# A lot of postfork initialization depends on the server name, ensure it is set immediately after forking before other postfork functions
self.application_stack = self._register_singleton(ApplicationStack, application_stack_instance(app=self))
self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self)
self.config.reload_sanitize_allowlist(explicit='sanitize_allowlist_file' in kwargs)
self.amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config)
Expand All @@ -234,16 +241,9 @@ def __init__(self, **kwargs) -> None:
# Data providers registry.
self.data_provider_registry = self._register_singleton(DataProviderRegistry)

# Initialize job metrics manager, needs to be in place before
# config so per-destination modifications can be made.
self.job_metrics = self._register_singleton(JobMetrics, JobMetrics(self.config.job_metrics_config_file, app=self))

# Initialize error report plugins.
self.error_reports = self._register_singleton(ErrorReports, ErrorReports(self.config.error_report_file, app=self))

# Initialize the job management configuration
self.job_config = self._register_singleton(jobs.JobConfiguration)

# Setup a Tool Cache
self.tool_cache = self._register_singleton(ToolCache)
self.tool_shed_repository_cache = self._register_singleton(ToolShedRepositoryCache)
Expand Down Expand Up @@ -305,8 +305,6 @@ def __init__(self, **kwargs) -> None:
self.config.oidc_config_file,
self.config.oidc_backends_config_file)
# Start the job manager
from galaxy.jobs import manager
self.job_manager = self._register_singleton(manager.JobManager)
self.application_stack.register_postfork_function(self.job_manager.start)
self.proxy_manager = ProxyManager(self.config)

Expand Down
28 changes: 26 additions & 2 deletions lib/galaxy/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
scoped_session,
)

from galaxy import model
from galaxy.app import MinimalManagerApp
from galaxy.celery import celery_app
from galaxy.jobs.manager import JobManager
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.model import User
from galaxy.util.custom_logging import get_logger
from . import get_galaxy_app

log = get_logger(__name__)
CELERY_TASKS = []


def galaxy_task(func):
CELERY_TASKS.append(func.__name__)
app = get_galaxy_app()
if app:
return magic_bind_to_container(app)(func)
Expand All @@ -24,7 +28,7 @@ def galaxy_task(func):
@galaxy_task
def recalculate_user_disk_usage(session: scoped_session, user_id=None):
if user_id:
user = session.query(User).get(user_id)
user = session.query(model.User).get(user_id)
if user:
user.calculate_and_set_disk_usage()
log.info(f"New user disk usage is {user.disk_usage}")
Expand All @@ -49,3 +53,23 @@ def set_metadata(hda_manager: HDAManager, ldda_manager: LDDAManager, dataset_id,
elif model_class == 'LibraryDatasetDatasetAssociation':
dataset = ldda_manager.by_id(dataset_id)
dataset.datatype.set_meta(dataset)


@celery_app.task(ignore_result=True)
@galaxy_task
def export_history(
    app: MinimalManagerApp,
    sa_session: scoped_session,
    job_manager: JobManager,
    store_directory,
    history_id,
    job_id,
    include_hidden=False,
    include_deleted=False,
):
    """Write a history into an export store directory, then queue its job.

    ``app``, ``sa_session`` and ``job_manager`` are not passed by callers;
    they are presumably supplied by the container binding that
    ``galaxy_task`` applies (confirm against ``magic_bind_to_container``).

    :param store_directory: directory the export store is written to
    :param history_id: id of the ``model.History`` to export
    :param job_id: id of the ``model.Job`` that is queued once the export
        store has been written
    :param include_hidden: forwarded to the export store's ``export_history``
    :param include_deleted: forwarded to the export store's ``export_history``
    """
    source_history = sa_session.query(model.History).get(history_id)
    store_options = dict(app=app, export_files="symlink")
    with model.store.DirectoryModelExportStore(store_directory, **store_options) as history_store:
        history_store.export_history(
            source_history,
            include_hidden=include_hidden,
            include_deleted=include_deleted,
        )

    # Only after the store is written is the job moved to NEW and handed to
    # the job manager; until then it should not be picked up for execution.
    pending_job = sa_session.query(model.Job).get(job_id)
    pending_job.state = model.Job.states.NEW
    sa_session.flush()
    job_manager.enqueue(pending_job)
24 changes: 13 additions & 11 deletions lib/galaxy/tools/actions/history_imp_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr
if trans.user:
# If this is an actual user, run the job as that individual. Otherwise we're running as guest.
job.user_id = trans.user.id
start_job_state = job.state # should be job.states.NEW
job.state = job.states.WAITING # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters
trans.sa_session.add(job)

Expand All @@ -135,10 +134,10 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr
#
# Setup job and job wrapper.
#
job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job)
cmd_line = job_wrapper.setup_job(history, store_directory, include_hidden=incoming['include_hidden'],
include_deleted=incoming['include_deleted'],
compressed=compressed)
cmd_line = f"--galaxy-version '{job.galaxy_version}'"
if compressed:
cmd_line += " -G"
cmd_line = f"{cmd_line} {store_directory}"

#
# Add parameters to job_parameter table.
Expand All @@ -165,11 +164,14 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr

for name, value in tool.params_to_strings(incoming, trans.app).items():
job.add_parameter(name, value)

job.state = start_job_state # job inputs have been configured, restore initial job state

# Queue the job for execution
trans.app.job_manager.enqueue(job, tool=tool)
trans.log_event("Added export history job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id)
trans.sa_session.flush()

job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job.id)
job_wrapper.setup_job(
history,
store_directory,
include_hidden=incoming['include_hidden'],
include_deleted=incoming['include_deleted'],
compressed=compressed)

return job, {}
26 changes: 8 additions & 18 deletions lib/galaxy/tools/imp_exp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from galaxy import model
from galaxy.model import store
from galaxy.util.path import external_chown
from galaxy.version import VERSION_MAJOR

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -93,23 +92,14 @@ def __init__(self, app, job_id):
self.sa_session = self.app.model.context

def setup_job(self, history, store_directory, include_hidden=False, include_deleted=False, compressed=True):
"""Perform setup for job to export a history into an archive.
Method generates attribute files for export, sets the corresponding attributes
in the jeha object, and returns a command line for running the job. The command
line includes the command, inputs, and options; it does not include the output
file because it must be set at runtime.
"""
Perform setup for job to export a history into an archive.
"""
app = self.app

# symlink files on export; on the worker, files will be tarred up in a dereferenced manner.
with store.DirectoryModelExportStore(store_directory, app=app, export_files="symlink") as export_store:
export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted)

#
# Create and return command line for running tool.
#
options = f"--galaxy-version '{VERSION_MAJOR}'"
if compressed:
options += " -G"
return f"{options} {store_directory}"
from galaxy.celery.tasks import export_history
if app.config.enable_celery_tasks:
# symlink files on export; on the worker, files will be tarred up in a dereferenced manner.
export_history.delay(store_directory=store_directory, history_id=history.id, job_id=self.job_id, include_hidden=include_hidden, include_deleted=include_deleted)
else:
export_history(store_directory=store_directory, history_id=history.id, job_id=self.job_id, include_hidden=include_hidden, include_deleted=include_deleted)
16 changes: 16 additions & 0 deletions lib/galaxy_test/base/celery_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from functools import wraps


def rebind_container_to_task(app):
    """Re-bind every registered celery task function to ``app``.

    Sets ``galaxy.app.app`` to ``app`` and then, for each name listed in
    ``galaxy.celery.tasks.CELERY_TASKS``, replaces the module attribute of
    that name with a new ``app.magic_partial`` wrapper around the
    undecorated function.

    :param app: application object exposing ``magic_partial``
    """
    import galaxy.app
    # Keep this assignment ahead of the tasks import below; presumably the
    # task decorators look the app up at import time (TODO confirm).
    galaxy.app.app = app
    from galaxy.celery import tasks

    for task_name in tasks.CELERY_TASKS:
        current = getattr(tasks, task_name)
        # Peel off one layer of wrapping, if any, so the plain function is
        # what gets re-bound.
        undecorated = getattr(current, '__wrapped__', current)
        rebound = wraps(undecorated)(app.magic_partial(undecorated, shared=None))
        setattr(tasks, task_name, rebound)
3 changes: 3 additions & 0 deletions lib/galaxy_test/driver/driver_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from galaxy.util.properties import load_app_properties
from galaxy.webapps.galaxy import buildapp
from galaxy_test.base.api_util import get_admin_api_key, get_user_api_key
from galaxy_test.base.celery_helper import rebind_container_to_task
from galaxy_test.base.env import (
DEFAULT_WEB_HOST,
target_url_parts,
Expand Down Expand Up @@ -626,6 +627,8 @@ def build_galaxy_app(simple_kwargs):
)
# Build the Universe Application
app = GalaxyUniverseApplication(**simple_kwargs)
rebind_container_to_task(app)

log.info("Embedded Galaxy application started")

global galaxy_context
Expand Down
2 changes: 2 additions & 0 deletions test/unit/tools/test_history_imp_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,8 @@ def _import_export(app, h, dest_export=None):
dest_export = os.path.join(dest_parent, "moo.tgz")

job = model.Job()
app.model.session.add(job, h)
app.model.session.flush()
jeha = model.JobExportHistoryArchive.create_for_history(
h, job, app.model.context, app.object_store, compressed=True
)
Expand Down
7 changes: 6 additions & 1 deletion test/unit/unittest_utils/galaxy_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from galaxy.util.bunch import Bunch
from galaxy.util.dbkeys import GenomeBuilds
from galaxy.web_stack import ApplicationStack
from galaxy_test.base.celery_helper import rebind_container_to_task


# =============================================================================
Expand Down Expand Up @@ -86,7 +87,9 @@ def __init__(self, config=None, **kwargs):
self.init_datatypes()
self.job_config = Bunch(
dynamic_params=None,
destinations={}
destinations={},
use_messaging=False,
assign_handler=lambda *args, **kwargs: None
)
self.tool_data_tables = {}
self.dataset_collections_service = None
Expand All @@ -99,6 +102,8 @@ def __init__(self, config=None, **kwargs):
self.auth_manager = AuthManager(self.config)
self.user_manager = UserManager(self)
self.execution_timer_factory = Bunch(get_timer=StructuredExecutionTimer)
self.is_job_handler = False
rebind_container_to_task(self)

def url_for(*args, **kwds):
return "/mock/url"
Expand Down

0 comments on commit a277c24

Please sign in to comment.