diff --git a/client/src/components/History/model/JobStateSummary.js b/client/src/components/History/model/JobStateSummary.js index 30033a43d5c2..afb46f784e9f 100644 --- a/client/src/components/History/model/JobStateSummary.js +++ b/client/src/components/History/model/JobStateSummary.js @@ -9,7 +9,7 @@ import { STATES } from "./states"; // Job-state-summary lists -const NON_TERMINAL_STATES = [STATES.NEW, STATES.QUEUED, STATES.RUNNING]; +const NON_TERMINAL_STATES = [STATES.NEW, STATES.WAITING, STATES.QUEUED, STATES.RUNNING]; const ERROR_STATES = [ STATES.ERROR, diff --git a/client/src/components/History/model/states.js b/client/src/components/History/model/states.js index 3b9ba541280f..832a493ef220 100644 --- a/client/src/components/History/model/states.js +++ b/client/src/components/History/model/states.js @@ -11,6 +11,8 @@ export const STATES = { SETTING_METADATA: "setting_metadata", /** was created without a tool */ NEW: "new", + /** job is being created, but not put into job queue yet */ + WAITING: "waiting", /** has no data */ EMPTY: "empty", /** has successfully completed running */ diff --git a/client/src/mvc/history/job-states-model.js b/client/src/mvc/history/job-states-model.js index 4d7c4d04b75b..167650dd3392 100644 --- a/client/src/mvc/history/job-states-model.js +++ b/client/src/mvc/history/job-states-model.js @@ -4,7 +4,7 @@ import AJAX_QUEUE from "utils/ajax-queue"; /** ms between fetches when checking running jobs/datasets for updates */ var UPDATE_DELAY = 2000; -var NON_TERMINAL_STATES = ["new", "queued", "running"]; +var NON_TERMINAL_STATES = ["new", "queued", "running", "waiting"]; var ERROR_STATES = ["error", "deleted"]; var TERMINAL_STATES = ["ok"].concat(ERROR_STATES); /** Fetch state on add or just wait for polling to start. */ diff --git a/lib/galaxy/app.py b/lib/galaxy/app.py index 1549a7a9e820..fa6b7e6b62ad 100644 --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -153,6 +153,12 @@ def __init__(self, **kwargs): self._register_singleton(MinimalManagerApp, self) self.execution_timer_factory = self._register_singleton(ExecutionTimerFactory, ExecutionTimerFactory(self.config)) self.configure_fluent_log() + self.application_stack = self._register_singleton(ApplicationStack, application_stack_instance(app=self)) + # Initialize job metrics manager, needs to be in place before + # config so per-destination modifications can be made. + self.job_metrics = self._register_singleton(JobMetrics, JobMetrics(self.config.job_metrics_config_file, app=self)) + # Initialize the job management configuration + self.job_config = self._register_singleton(jobs.JobConfiguration) # Tag handler self.tag_handler = self._register_singleton(GalaxyTagHandler) @@ -167,6 +173,8 @@ def __init__(self, **kwargs): self.library_folder_manager = self._register_singleton(FolderManager) self.library_manager = self._register_singleton(LibraryManager) self.role_manager = self._register_singleton(RoleManager) + from galaxy.jobs.manager import JobManager + self.job_manager = self._register_singleton(JobManager) # ConfiguredFileSources self.file_sources = self._register_singleton(ConfiguredFileSources, ConfiguredFileSources.from_app_config(self.config)) @@ -209,7 +217,6 @@ def __init__(self, **kwargs) -> None: ] self._register_singleton(StructuredApp, self) # A lot of postfork initialization depends on the server name, ensure it is set immediately after forking before other postfork functions - self.application_stack = self._register_singleton(ApplicationStack, application_stack_instance(app=self)) self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self) self.config.reload_sanitize_allowlist(explicit='sanitize_allowlist_file' in kwargs) self.amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config) @@ -234,16 +241,9 @@ def __init__(self, **kwargs) -> None: # Data providers registry. self.data_provider_registry = self._register_singleton(DataProviderRegistry) - # Initialize job metrics manager, needs to be in place before - # config so per-destination modifications can be made. - self.job_metrics = self._register_singleton(JobMetrics, JobMetrics(self.config.job_metrics_config_file, app=self)) - # Initialize error report plugins. self.error_reports = self._register_singleton(ErrorReports, ErrorReports(self.config.error_report_file, app=self)) - # Initialize the job management configuration - self.job_config = self._register_singleton(jobs.JobConfiguration) - # Setup a Tool Cache self.tool_cache = self._register_singleton(ToolCache) self.tool_shed_repository_cache = self._register_singleton(ToolShedRepositoryCache) @@ -305,8 +305,6 @@ def __init__(self, **kwargs) -> None: self.config.oidc_config_file, self.config.oidc_backends_config_file) # Start the job manager - from galaxy.jobs import manager - self.job_manager = self._register_singleton(manager.JobManager) self.application_stack.register_postfork_function(self.job_manager.start) self.proxy_manager = ProxyManager(self.config) diff --git a/lib/galaxy/celery/tasks.py b/lib/galaxy/celery/tasks.py index 5f0f02e0a224..878c080f64bf 100644 --- a/lib/galaxy/celery/tasks.py +++ b/lib/galaxy/celery/tasks.py @@ -3,17 +3,21 @@ scoped_session, ) +from galaxy import model +from galaxy.app import MinimalManagerApp from galaxy.celery import celery_app +from galaxy.jobs.manager import JobManager from galaxy.managers.hdas import HDAManager from galaxy.managers.lddas import LDDAManager -from galaxy.model import User from galaxy.util.custom_logging import get_logger from . import get_galaxy_app log = get_logger(__name__) +CELERY_TASKS = [] def galaxy_task(func): + CELERY_TASKS.append(func.__name__) app = get_galaxy_app() if app: return magic_bind_to_container(app)(func) @@ -24,7 +28,7 @@ def galaxy_task(func): @galaxy_task def recalculate_user_disk_usage(session: scoped_session, user_id=None): if user_id: - user = session.query(User).get(user_id) + user = session.query(model.User).get(user_id) if user: user.calculate_and_set_disk_usage() log.info(f"New user disk usage is {user.disk_usage}") @@ -49,3 +53,23 @@ def set_metadata(hda_manager: HDAManager, ldda_manager: LDDAManager, dataset_id, elif model_class == 'LibraryDatasetDatasetAssociation': dataset = ldda_manager.by_id(dataset_id) dataset.datatype.set_meta(dataset) + + +@celery_app.task(ignore_result=True) +@galaxy_task +def export_history( + app: MinimalManagerApp, + sa_session: scoped_session, + job_manager: JobManager, + store_directory, + history_id, + job_id, + include_hidden=False, + include_deleted=False): + history = sa_session.query(model.History).get(history_id) + with model.store.DirectoryModelExportStore(store_directory, app=app, export_files="symlink") as export_store: + export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted) + job = sa_session.query(model.Job).get(job_id) + job.state = model.Job.states.NEW + sa_session.flush() + job_manager.enqueue(job) diff --git a/lib/galaxy/tools/actions/history_imp_exp.py b/lib/galaxy/tools/actions/history_imp_exp.py index 5b881b968206..8e265ae1092c 100644 --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -109,7 +109,6 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr if trans.user: # If this is an actual user, run the job as that individual. Otherwise we're running as guest. job.user_id = trans.user.id - start_job_state = job.state # should be job.states.NEW job.state = job.states.WAITING # we need to set job state to something other than NEW, or else when tracking jobs in db it will be picked up before we have added input / output parameters trans.sa_session.add(job) @@ -135,10 +134,10 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr # # Setup job and job wrapper. # - job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job) - cmd_line = job_wrapper.setup_job(history, store_directory, include_hidden=incoming['include_hidden'], - include_deleted=incoming['include_deleted'], - compressed=compressed) + cmd_line = f"--galaxy-version '{job.galaxy_version}'" + if compressed: + cmd_line += " -G" + cmd_line = f"{cmd_line} {store_directory}" # # Add parameters to job_parameter table. @@ -165,11 +164,14 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr for name, value in tool.params_to_strings(incoming, trans.app).items(): job.add_parameter(name, value) - - job.state = start_job_state # job inputs have been configured, restore initial job state - - # Queue the job for execution - trans.app.job_manager.enqueue(job, tool=tool) - trans.log_event("Added export history job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id) + trans.sa_session.flush() + + job_wrapper = JobExportHistoryArchiveWrapper(trans.app, job.id) + job_wrapper.setup_job( + history, + store_directory, + include_hidden=incoming['include_hidden'], + include_deleted=incoming['include_deleted'], + compressed=compressed) return job, {} diff --git a/lib/galaxy/tools/imp_exp/__init__.py b/lib/galaxy/tools/imp_exp/__init__.py index 6de6442dda19..1989a2afc187 100644 --- a/lib/galaxy/tools/imp_exp/__init__.py +++ b/lib/galaxy/tools/imp_exp/__init__.py @@ -6,7 +6,6 @@ from galaxy import model from galaxy.model import store from galaxy.util.path import external_chown -from galaxy.version import VERSION_MAJOR log = logging.getLogger(__name__) @@ -93,23 +92,14 @@ def __init__(self, app, job_id): self.sa_session = self.app.model.context def setup_job(self, history, store_directory, include_hidden=False, include_deleted=False, compressed=True): - """Perform setup for job to export a history into an archive. - - Method generates attribute files for export, sets the corresponding attributes - in the jeha object, and returns a command line for running the job. The command - line includes the command, inputs, and options; it does not include the output - file because it must be set at runtime. + """ + Perform setup for job to export a history into an archive. """ app = self.app - # symlink files on export, on worker files will tarred up in a dereferenced manner. - with store.DirectoryModelExportStore(store_directory, app=app, export_files="symlink") as export_store: - export_store.export_history(history, include_hidden=include_hidden, include_deleted=include_deleted) - - # - # Create and return command line for running tool. - # - options = f"--galaxy-version '{VERSION_MAJOR}'" - if compressed: - options += " -G" - return f"{options} {store_directory}" + from galaxy.celery.tasks import export_history + if app.config.enable_celery_tasks: + # symlink files on export, on worker files will tarred up in a dereferenced manner. + export_history.delay(store_directory=store_directory, history_id=history.id, job_id=self.job_id, include_hidden=include_hidden, include_deleted=include_deleted) + else: + export_history(store_directory=store_directory, history_id=history.id, job_id=self.job_id, include_hidden=include_hidden, include_deleted=include_deleted) diff --git a/lib/galaxy_test/base/celery_helper.py b/lib/galaxy_test/base/celery_helper.py new file mode 100644 index 000000000000..531272bea960 --- /dev/null +++ b/lib/galaxy_test/base/celery_helper.py @@ -0,0 +1,14 @@ +from functools import wraps + + +def rebind_container_to_task(app): + from galaxy.celery import tasks + + def magic_bind_dynamic(func): + return wraps(func)(app.magic_partial(func, shared=None)) + + for task in tasks.CELERY_TASKS: + task_fn = getattr(tasks, task) + task_fn = getattr(task_fn, '__wrapped__', task_fn) + container_bound_task = magic_bind_dynamic(task_fn) + setattr(tasks, task, container_bound_task) diff --git a/lib/galaxy_test/driver/driver_util.py b/lib/galaxy_test/driver/driver_util.py index a7e323264e46..0bf2bba0419e 100644 --- a/lib/galaxy_test/driver/driver_util.py +++ b/lib/galaxy_test/driver/driver_util.py @@ -34,6 +34,7 @@ from galaxy.util.properties import load_app_properties from galaxy.webapps.galaxy import buildapp from galaxy_test.base.api_util import get_admin_api_key, get_user_api_key +from galaxy_test.base.celery_helper import rebind_container_to_task from galaxy_test.base.env import ( DEFAULT_WEB_HOST, target_url_parts, @@ -626,6 +627,8 @@ def build_galaxy_app(simple_kwargs): ) # Build the Universe Application app = GalaxyUniverseApplication(**simple_kwargs) + rebind_container_to_task(app) + log.info("Embedded Galaxy application started") global galaxy_context diff --git a/test/unit/tools/test_history_imp_exp.py b/test/unit/tools/test_history_imp_exp.py index 8f1d34c59085..d32956766f0f 100644 --- a/test/unit/tools/test_history_imp_exp.py +++ b/test/unit/tools/test_history_imp_exp.py @@ -622,6 +622,8 @@ def _import_export(app, h, dest_export=None): dest_export = os.path.join(dest_parent, "moo.tgz") job = model.Job() + app.model.session.add(job, h) + app.model.session.flush() jeha = model.JobExportHistoryArchive.create_for_history( h, job, app.model.context, app.object_store, compressed=True ) diff --git a/test/unit/unittest_utils/galaxy_mock.py b/test/unit/unittest_utils/galaxy_mock.py index 02e44efe61a6..e8e29ee32eb1 100644 --- a/test/unit/unittest_utils/galaxy_mock.py +++ b/test/unit/unittest_utils/galaxy_mock.py @@ -4,6 +4,7 @@ import os import shutil import tempfile +from functools import wraps from sqlalchemy.orm.scoping import scoped_session @@ -14,6 +15,7 @@ quota, ) from galaxy.auth import AuthManager +from galaxy.celery import tasks from galaxy.datatypes import registry from galaxy.jobs.manager import NoopManager from galaxy.managers.users import UserManager @@ -27,6 +29,7 @@ from galaxy.util.bunch import Bunch from galaxy.util.dbkeys import GenomeBuilds from galaxy.web_stack import ApplicationStack +from galaxy_test.base.celery_helper import rebind_container_to_task # ============================================================================= @@ -86,7 +89,9 @@ def __init__(self, config=None, **kwargs): self.init_datatypes() self.job_config = Bunch( dynamic_params=None, - destinations={} + destinations={}, + use_messaging=False, + assign_handler=lambda *args, **kwargs: None ) self.tool_data_tables = {} self.dataset_collections_service = None @@ -99,6 +104,8 @@ def __init__(self, config=None, **kwargs): self.auth_manager = AuthManager(self.config) self.user_manager = UserManager(self) self.execution_timer_factory = Bunch(get_timer=StructuredExecutionTimer) + self.is_job_handler = False + rebind_container_to_task(self) def url_for(*args, **kwds): return "/mock/url"