diff --git a/lib/galaxy/managers/datasets.py b/lib/galaxy/managers/datasets.py index 2590f5189a35..13e523ba4ae0 100644 --- a/lib/galaxy/managers/datasets.py +++ b/lib/galaxy/managers/datasets.py @@ -530,7 +530,7 @@ def set_metadata(self, trans, dataset_assoc, overwrite=False, validate=True): if overwrite: self.overwrite_metadata(data) - job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( + job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_trans( self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={"input1": data, "validate": validate}, @@ -866,7 +866,7 @@ def deserialize_datatype(self, item, key, val, **context): assert ( trans ), "Logic error in Galaxy, deserialize_datatype not send a transation object" # TODO: restructure this for stronger typing - job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute( + job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute_via_trans( self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={"input1": item}, overwrite=False ) # overwrite is False as per existing behavior trans.app.job_manager.enqueue(job, tool=trans.app.datatypes_registry.set_external_metadata_tool) diff --git a/lib/galaxy/managers/histories.py b/lib/galaxy/managers/histories.py index 9c34509f65bd..732050789ed5 100644 --- a/lib/galaxy/managers/histories.py +++ b/lib/galaxy/managers/histories.py @@ -418,7 +418,7 @@ def queue_history_export( # Run job to do export. history_exp_tool = trans.app.toolbox.get_tool(export_tool_id) - job, *_ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True) + job, *_ = history_exp_tool.execute(trans, incoming=params, history=history) trans.app.job_manager.enqueue(job, tool=history_exp_tool) return job diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index 18bd68c369d9..0f5173b42c49 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -51,6 +51,7 @@ StoredWorkflow, ) from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.tool_shed.util.repository_util import get_installed_repository from galaxy.tool_shed.util.shed_util_common import set_image_paths from galaxy.tool_util.deps import ( @@ -113,6 +114,7 @@ from galaxy.tools.actions.model_operations import ModelOperationToolAction from galaxy.tools.cache import ToolDocumentCache from galaxy.tools.evaluation import global_tool_errors +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.tools.imp_exp import JobImportHistoryArchiveWrapper from galaxy.tools.parameters import ( check_param, @@ -183,8 +185,18 @@ from galaxy.version import VERSION_MAJOR from galaxy.work.context import proxy_work_context_for_history from .execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + DEFAULT_USE_CACHED_JOB, execute as execute_job, + ExecutionSlice, + JobCallbackT, MappingParameters, + ToolParameterRequestInstanceT, + ToolParameterRequestT, ) if TYPE_CHECKING: @@ -1862,11 +1874,11 @@ def expand_incoming(self, trans, incoming, request_context, input_format="legacy def handle_input( self, trans, - incoming, - history=None, - use_cached_job=False, - preferred_object_store_id: Optional[str] = None, - input_format="legacy", + incoming: ToolParameterRequestT, + history: Optional[model.History] = None, + use_cached_job: bool = DEFAULT_USE_CACHED_JOB, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + input_format: str = "legacy", ): """ Process incoming parameters for this tool from the dict `incoming`, @@ -1942,23 +1954,23 @@ def handle_incoming_errors(self, all_errors): def handle_single_execution( self, trans, - rerun_remap_job_id, - execution_slice, - history, - execution_cache=None, - completed_job=None, - collection_info=None, - job_callback=None, - preferred_object_store_id=None, - flush_job=True, - skip=False, + rerun_remap_job_id: Optional[int], + execution_slice: ExecutionSlice, + history: model.History, + execution_cache: ToolExecutionCache, + completed_job: Optional[model.Job], + collection_info: Optional[MatchingCollections], + job_callback: Optional[JobCallbackT], + preferred_object_store_id: Optional[str], + flush_job: bool, + skip: bool, ): """ Return a pair with whether execution is successful as well as either resulting output data or an error message indicating the problem. """ try: - rval = self.execute( + rval = self._execute( trans, incoming=execution_slice.param_combination, history=history, @@ -2045,18 +2057,67 @@ def get_static_param_values(self, trans): args[key] = param.get_initial_value(trans, None) return args - def execute(self, trans, incoming=None, set_output_hid=True, history=None, **kwargs): + def execute( + self, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[model.History] = None, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + ): """ Execute the tool using parameter values in `incoming`. This just dispatches to the `ToolAction` instance specified by `self.tool_action`. In general this will create a `Job` that when run will build the tool's outputs, e.g. `DefaultToolAction`. + + _execute has many more options but should be accessed through + handle_single_execution. The public interface to execute should be + rarely used and in more specific ways. """ + return self._execute( + trans, + incoming=incoming, + history=history, + set_output_hid=set_output_hid, + flush_job=flush_job, + ) + + def _execute( + self, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[model.History] = None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = None, + completed_job: Optional[model.Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ): if incoming is None: incoming = {} try: return self.tool_action.execute( - self, trans, incoming=incoming, set_output_hid=set_output_hid, history=history, **kwargs + self, + trans, + incoming=incoming, + history=history, + job_params=None, + rerun_remap_job_id=rerun_remap_job_id, + execution_cache=execution_cache, + dataset_collection_elements=dataset_collection_elements, + completed_job=completed_job, + collection_info=collection_info, + job_callback=job_callback, + preferred_object_store_id=preferred_object_store_id, + set_output_hid=set_output_hid, + flush_job=flush_job, + skip=skip, ) except exceptions.ToolExecutionError as exc: job = exc.job @@ -2988,7 +3049,9 @@ class SetMetadataTool(Tool): requires_setting_metadata = False tool_action: "SetMetadataToolAction" - def regenerate_imported_metadata_if_needed(self, hda, history, user, session_id): + def regenerate_imported_metadata_if_needed( + self, hda: model.HistoryDatasetAssociation, history: model.History, user: model.User, session_id: int + ): if hda.has_metadata_files: job, *_ = self.tool_action.execute_via_app( self, diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index 1a16ed58af3c..989ab5a6ad45 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -9,7 +9,9 @@ cast, Dict, List, + Optional, Set, + Tuple, TYPE_CHECKING, Union, ) @@ -24,6 +26,7 @@ from galaxy.job_execution.actions.post import ActionBox from galaxy.managers.context import ProvidesHistoryContext from galaxy.model import ( + History, HistoryDatasetAssociation, Job, LibraryDatasetDatasetAssociation, @@ -31,8 +34,24 @@ ) from galaxy.model.base import transaction from galaxy.model.dataset_collections.builder import CollectionBuilder +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.none_like import NoneDataset from galaxy.objectstore import ObjectStorePopulator +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ( + filter_output, + on_text_for_names, + ToolExecutionCache, +) from galaxy.tools.parameters import update_dataset_ids from galaxy.tools.parameters.basic import ( DataCollectionToolParameter, @@ -54,32 +73,8 @@ log = logging.getLogger(__name__) -class ToolExecutionCache: - """An object mean to cache calculation caused by repeatedly evaluting - the same tool by the same user with slightly different parameters. - """ - - def __init__(self, trans): - self.trans = trans - self.current_user_roles = trans.get_current_user_roles() - self.chrom_info = {} - self.cached_collection_elements = {} - - def get_chrom_info(self, tool_id, input_dbkey): - genome_builds = self.trans.app.genome_builds - custom_build_hack_get_len_from_fasta_conversion = tool_id != "CONVERTER_fasta_to_len" - if custom_build_hack_get_len_from_fasta_conversion and input_dbkey in self.chrom_info: - return self.chrom_info[input_dbkey] - - chrom_info_pair = genome_builds.get_chrom_info( - input_dbkey, - trans=self.trans, - custom_build_hack_get_len_from_fasta_conversion=custom_build_hack_get_len_from_fasta_conversion, - ) - if custom_build_hack_get_len_from_fasta_conversion: - self.chrom_info[input_dbkey] = chrom_info_pair - - return chrom_info_pair +OutputDatasetsT = Dict[str, "DatasetInstance"] +ToolActionExecuteResult = Union[Tuple[Job, OutputDatasetsT, Optional[History]], Tuple[Job, OutputDatasetsT]] class ToolAction: @@ -89,14 +84,31 @@ class ToolAction: """ @abstractmethod - def execute(self, tool, trans, incoming=None, set_output_hid=True, **kwargs): - pass + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: + """Perform target tool action.""" class DefaultToolAction(ToolAction): """Default tool action is to run an external command""" - produces_real_jobs = True + produces_real_jobs: bool = True def _collect_input_datasets( self, @@ -389,21 +401,20 @@ def execute( self, tool, trans, - incoming=None, - return_job=False, - set_output_hid=True, - history=None, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, job_params=None, - rerun_remap_job_id=None, - execution_cache=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, dataset_collection_elements=None, - completed_job=None, - collection_info=None, - job_callback=None, - preferred_object_store_id=None, - flush_job=True, - skip=False, - ): + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: """ Executes a tool, creating job and tool outputs, associating them, and submitting the job to the job queue. If history is not specified, use @@ -424,6 +435,7 @@ def execute( preserved_hdca_tags, all_permissions, ) = self._collect_inputs(tool, trans, incoming, history, current_user_roles, collection_info) + assert history # tell type system we've set history and it is no longer optional # Build name for output datasets based on tool name and input names on_text = self._get_on_text(inp_data) @@ -846,7 +858,7 @@ def _get_on_text(self, inp_data): return on_text_for_names(input_names) - def _new_job_for_session(self, trans, tool, history): + def _new_job_for_session(self, trans, tool, history) -> Tuple[model.Job, Optional[model.GalaxySession]]: job = trans.app.model.Job() job.galaxy_version = trans.app.config.version_major galaxy_session = None @@ -1097,40 +1109,6 @@ def check_elements(elements): self.out_collection_instances[name] = hdca -def on_text_for_names(input_names): - # input_names may contain duplicates... this is because the first value in - # multiple input dataset parameters will appear twice once as param_name - # and once as param_name1. - unique_names = [] - for name in input_names: - if name not in unique_names: - unique_names.append(name) - input_names = unique_names - - # Build name for output datasets based on tool name and input names - if len(input_names) == 0: - on_text = "" - elif len(input_names) == 1: - on_text = input_names[0] - elif len(input_names) == 2: - on_text = "{} and {}".format(*input_names) - elif len(input_names) == 3: - on_text = "{}, {}, and {}".format(*input_names) - else: - on_text = "{}, {}, and others".format(*input_names[:2]) - return on_text - - -def filter_output(tool, output, incoming): - for filter in output.filters: - try: - if not eval(filter.text.strip(), globals(), incoming): - return True # do not create this dataset - except Exception as e: - log.debug(f"Tool {tool.id} output {output.name}: dataset output filter ({filter.text}) failed: {e}") - return False - - def get_ext_or_implicit_ext(hda): if hda.implicitly_converted_parent_datasets: # implicitly_converted_parent_datasets is a list of ImplicitlyConvertedDatasetAssociation diff --git a/lib/galaxy/tools/actions/data_manager.py b/lib/galaxy/tools/actions/data_manager.py index ce7d69e1048c..c24e86fd0afb 100644 --- a/lib/galaxy/tools/actions/data_manager.py +++ b/lib/galaxy/tools/actions/data_manager.py @@ -1,7 +1,27 @@ import logging +from typing import Optional +from galaxy.model import ( + History, + Job, +) from galaxy.model.base import transaction -from . import DefaultToolAction +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache +from . import ( + DefaultToolAction, + ToolActionExecuteResult, +) log = logging.getLogger(__name__) @@ -9,8 +29,41 @@ class DataManagerToolAction(DefaultToolAction): """Tool action used for Data Manager Tools""" - def execute(self, tool, trans, **kwds): - rval = super().execute(tool, trans, **kwds) + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: + rval = super().execute( + tool, + trans, + incoming=incoming, + history=history, + job_params=job_params, + rerun_remap_job_id=rerun_remap_job_id, + execution_cache=execution_cache, + dataset_collection_elements=dataset_collection_elements, + completed_job=completed_job, + collection_info=collection_info, + job_callback=job_callback, + preferred_object_store_id=preferred_object_store_id, + set_output_hid=set_output_hid, + flush_job=flush_job, + skip=skip, + ) if isinstance(rval, tuple) and len(rval) >= 2 and isinstance(rval[0], trans.app.model.Job): assoc = trans.app.model.DataManagerJobAssociation(job=rval[0], data_manager_id=tool.data_manager_id) trans.sa_session.add(assoc) diff --git a/lib/galaxy/tools/actions/history_imp_exp.py b/lib/galaxy/tools/actions/history_imp_exp.py index d98764c405fd..848995c61dac 100644 --- a/lib/galaxy/tools/actions/history_imp_exp.py +++ b/lib/galaxy/tools/actions/history_imp_exp.py @@ -2,10 +2,30 @@ import logging import os import tempfile +from typing import Optional from galaxy.job_execution.setup import create_working_directory_for_job +from galaxy.model import ( + History, + Job, +) from galaxy.model.base import transaction -from galaxy.tools.actions import ToolAction +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.tools.actions import ( + ToolAction, + ToolActionExecuteResult, +) +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.tools.imp_exp import ( JobExportHistoryArchiveWrapper, JobImportHistoryArchiveWrapper, @@ -18,9 +38,26 @@ class ImportHistoryToolAction(ToolAction): """Tool action used for importing a history to an archive.""" - produces_real_jobs = True - - def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, **kwargs): + produces_real_jobs: bool = True + + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: # # Create job. # @@ -78,9 +115,26 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr class ExportHistoryToolAction(ToolAction): """Tool action used for exporting a history to an archive.""" - produces_real_jobs = True - - def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, **kwargs): + produces_real_jobs: bool = True + + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: trans.check_user_activation() # # Get history to export. diff --git a/lib/galaxy/tools/actions/metadata.py b/lib/galaxy/tools/actions/metadata.py index 04db0ab30e17..f7d6ce844a9d 100644 --- a/lib/galaxy/tools/actions/metadata.py +++ b/lib/galaxy/tools/actions/metadata.py @@ -1,10 +1,32 @@ import logging import os from json import dumps +from typing import ( + Any, + Dict, + Optional, +) from galaxy.job_execution.datasets import DatasetPath from galaxy.metadata import get_metadata_compute_strategy +from galaxy.model import ( + History, + Job, + User, +) from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.util import asbool from . import ToolAction @@ -14,47 +36,80 @@ class SetMetadataToolAction(ToolAction): """Tool action used for setting external metadata on an existing dataset""" - produces_real_jobs = False + produces_real_jobs: bool = False + set_output_hid: bool = False def execute( - self, tool, trans, incoming=None, set_output_hid=False, overwrite=True, history=None, job_params=None, **kwargs + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, ): """ Execute using a web transaction. """ + overwrite = True + job, odict = self.execute_via_trans( + tool, + trans, + incoming, + overwrite, + history, + job_params, + ) + # FIXME: can remove this when logging in execute_via_app method. + trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id) + return job, odict + + def execute_via_trans( + self, + tool, + trans, + incoming: Optional[Dict[str, Any]], + overwrite: bool = True, + history: Optional[History] = None, + job_params: Optional[Dict[str, Any]] = None, + ): trans.check_user_activation() session = trans.get_galaxy_session() session_id = session and session.id history_id = trans.history and trans.history.id incoming = incoming or {} - job, odict = self.execute_via_app( + return self.execute_via_app( tool, trans.app, session_id, history_id, trans.user, incoming, - set_output_hid, overwrite, history, job_params, ) - # FIXME: can remove this when logging in execute_via_app method. - trans.log_event(f"Added set external metadata job to the job queue, id: {str(job.id)}", tool_id=job.tool_id) - return job, odict def execute_via_app( self, tool, app, - session_id, - history_id, - user=None, - incoming=None, - set_output_hid=False, - overwrite=True, - history=None, - job_params=None, + session_id: Optional[int], + history_id: Optional[int], + user: Optional[User] = None, + incoming: Optional[Dict[str, Any]] = None, + overwrite: bool = True, + history: Optional[History] = None, + job_params: Optional[Dict[str, Any]] = None, ): """ Execute using application. diff --git a/lib/galaxy/tools/actions/model_operations.py b/lib/galaxy/tools/actions/model_operations.py index bedba42d4aa3..1b18adcf39f6 100644 --- a/lib/galaxy/tools/actions/model_operations.py +++ b/lib/galaxy/tools/actions/model_operations.py @@ -1,12 +1,32 @@ import logging -from typing import TYPE_CHECKING +from typing import ( + Optional, + TYPE_CHECKING, +) +from galaxy.model import ( + History, + Job, +) +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.objectstore import ObjectStorePopulator from galaxy.tools.actions import ( DefaultToolAction, OutputCollections, - ToolExecutionCache, + OutputDatasetsT, + ToolActionExecuteResult, ) +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache if TYPE_CHECKING: from galaxy.managers.context import ProvidesUserContext @@ -15,7 +35,7 @@ class ModelOperationToolAction(DefaultToolAction): - produces_real_jobs = False + produces_real_jobs: bool = False def check_inputs_ready(self, tool, trans, incoming, history, execution_cache=None, collection_info=None): if execution_cache is None: @@ -32,17 +52,20 @@ def execute( self, tool, trans, - incoming=None, - set_output_hid=False, - overwrite=True, - history=None, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, job_params=None, - execution_cache=None, - collection_info=None, - job_callback=None, - skip=False, - **kwargs, - ): + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: incoming = incoming or {} trans.check_user_activation() @@ -65,7 +88,7 @@ def execute( # wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed wrapped_params = self._wrapped_params(trans, tool, incoming) - out_data = {} + out_data: OutputDatasetsT = {} input_collections = {k: v[0][0] for k, v in inp_dataset_collections.items()} output_collections = OutputCollections( trans, @@ -73,7 +96,7 @@ def execute( tool=tool, tool_action=self, input_collections=input_collections, - dataset_collection_elements=kwargs.get("dataset_collection_elements", None), + dataset_collection_elements=dataset_collection_elements, on_text=on_text, incoming=incoming, params=wrapped_params.params, diff --git a/lib/galaxy/tools/actions/upload.py b/lib/galaxy/tools/actions/upload.py index b38a85419513..b85bba71a0d2 100644 --- a/lib/galaxy/tools/actions/upload.py +++ b/lib/galaxy/tools/actions/upload.py @@ -1,14 +1,34 @@ import json import logging import os +from typing import Optional from galaxy.exceptions import RequestParameterMissingException +from galaxy.model import ( + History, + Job, +) from galaxy.model.base import transaction +from galaxy.model.dataset_collections.matching import MatchingCollections from galaxy.model.dataset_collections.structure import UninitializedTree from galaxy.tools.actions import upload_common +from galaxy.tools.execute import ( + DatasetCollectionElementsSliceT, + DEFAULT_DATASET_COLLECTION_ELEMENTS, + DEFAULT_JOB_CALLBACK, + DEFAULT_PREFERRED_OBJECT_STORE_ID, + DEFAULT_RERUN_REMAP_JOB_ID, + DEFAULT_SET_OUTPUT_HID, + JobCallbackT, + ToolParameterRequestInstanceT, +) +from galaxy.tools.execution_helpers import ToolExecutionCache from galaxy.util import ExecutionTimer from galaxy.util.bunch import Bunch -from . import ToolAction +from . import ( + ToolAction, + ToolActionExecuteResult, +) log = logging.getLogger(__name__) @@ -16,7 +36,24 @@ class BaseUploadToolAction(ToolAction): produces_real_jobs = True - def execute(self, tool, trans, incoming=None, history=None, **kwargs): + def execute( + self, + tool, + trans, + incoming: Optional[ToolParameterRequestInstanceT] = None, + history: Optional[History] = None, + job_params=None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + execution_cache: Optional[ToolExecutionCache] = None, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + completed_job: Optional[Job] = None, + collection_info: Optional[MatchingCollections] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, + set_output_hid: bool = DEFAULT_SET_OUTPUT_HID, + flush_job: bool = True, + skip: bool = False, + ) -> ToolActionExecuteResult: trans.check_user_activation() incoming = incoming or {} dataset_upload_inputs = [] diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index cca565294428..121c3d311602 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -29,7 +29,7 @@ tool_output_to_structure, ) from galaxy.tool_util.parser import ToolOutputCollectionPart -from galaxy.tools.actions import ( +from galaxy.tools.execution_helpers import ( filter_output, on_text_for_names, ToolExecutionCache, @@ -45,14 +45,30 @@ BATCH_EXECUTION_MESSAGE = "Created ${job_count} job(s) for tool ${tool_id} request" +CompletedJobsT = Dict[int, Optional[model.Job]] +JobCallbackT = Callable +WorkflowResourceParametersT = Dict[str, Any] +# Input dictionary from the API, may include map/reduce instructions +ToolParameterRequestT = Dict[str, Any] +# Input dictionary extracted from a tool request for running a tool individually +ToolParameterRequestInstanceT = Dict[str, Any] +DatasetCollectionElementsSliceT = Dict[str, model.DatasetCollectionElement] +DEFAULT_USE_CACHED_JOB = False +DEFAULT_PREFERRED_OBJECT_STORE_ID: Optional[str] = None +DEFAULT_RERUN_REMAP_JOB_ID: Optional[int] = None +DEFAULT_JOB_CALLBACK: Optional[JobCallbackT] = None +DEFAULT_DATASET_COLLECTION_ELEMENTS: Optional[DatasetCollectionElementsSliceT] = None +DEFAULT_SET_OUTPUT_HID: bool = True + + class PartialJobExecution(Exception): - def __init__(self, execution_tracker): + def __init__(self, execution_tracker: "ExecutionTracker"): self.execution_tracker = execution_tracker class MappingParameters(NamedTuple): - param_template: Dict[str, Any] - param_combinations: List[Dict[str, Any]] + param_template: ToolParameterRequestT + param_combinations: List[ToolParameterRequestInstanceT] def execute( @@ -60,15 +76,15 @@ def execute( tool: "Tool", mapping_params: MappingParameters, history: model.History, - rerun_remap_job_id: Optional[int] = None, - preferred_object_store_id: Optional[str] = None, + rerun_remap_job_id: Optional[int] = DEFAULT_RERUN_REMAP_JOB_ID, + preferred_object_store_id: Optional[str] = DEFAULT_PREFERRED_OBJECT_STORE_ID, collection_info: Optional[MatchingCollections] = None, workflow_invocation_uuid: Optional[str] = None, invocation_step: Optional[model.WorkflowInvocationStep] = None, max_num_jobs: Optional[int] = None, - job_callback: Optional[Callable] = None, - completed_jobs: Optional[Dict[int, Optional[model.Job]]] = None, - workflow_resource_parameters: Optional[Dict[str, Any]] = None, + job_callback: Optional[JobCallbackT] = DEFAULT_JOB_CALLBACK, + completed_jobs: Optional[CompletedJobsT] = None, + workflow_resource_parameters: Optional[WorkflowResourceParametersT] = None, validate_outputs: bool = False, ): """ @@ -95,7 +111,7 @@ def execute( ) execution_cache = ToolExecutionCache(trans) - def execute_single_job(execution_slice, completed_job, skip=False): + def execute_single_job(execution_slice: "ExecutionSlice", completed_job: Optional[model.Job], skip: bool = False): job_timer = tool.app.execution_timer_factory.get_timer( "internals.galaxy.tools.execute.job_single", SINGLE_EXECUTION_SUCCESS_MESSAGE ) @@ -124,8 +140,8 @@ def execute_single_job(execution_slice, completed_job, skip=False): execution_cache, completed_job, collection_info, - job_callback=job_callback, - preferred_object_store_id=preferred_object_store_id, + job_callback, + preferred_object_store_id, flush_job=False, skip=skip, ) @@ -225,7 +241,17 @@ def execute_single_job(execution_slice, completed_job, skip=False): class ExecutionSlice: - def __init__(self, job_index, param_combination, dataset_collection_elements=None): + job_index: int + param_combination: ToolParameterRequestInstanceT + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] + history: Optional[model.History] + + def __init__( + self, + job_index: int, + param_combination: ToolParameterRequestInstanceT, + dataset_collection_elements: Optional[DatasetCollectionElementsSliceT] = DEFAULT_DATASET_COLLECTION_ELEMENTS, + ): self.job_index = job_index self.param_combination = param_combination self.dataset_collection_elements = dataset_collection_elements diff --git a/lib/galaxy/tools/execution_helpers.py b/lib/galaxy/tools/execution_helpers.py new file mode 100644 index 000000000000..66ae3c853681 --- /dev/null +++ b/lib/galaxy/tools/execution_helpers.py @@ -0,0 +1,71 @@ +"""Helpers meant to assist tool execution. + +Lower-level things that prevent interwoven dependencies between tool code, +tool execution code, and tool action code. +""" + +import logging + +log = logging.getLogger(__name__) + + +class ToolExecutionCache: + """An object meant to cache calculation caused by repeatedly evaluting + the same tool by the same user with slightly different parameters. + """ + + def __init__(self, trans): + self.trans = trans + self.current_user_roles = trans.get_current_user_roles() + self.chrom_info = {} + self.cached_collection_elements = {} + + def get_chrom_info(self, tool_id, input_dbkey): + genome_builds = self.trans.app.genome_builds + custom_build_hack_get_len_from_fasta_conversion = tool_id != "CONVERTER_fasta_to_len" + if custom_build_hack_get_len_from_fasta_conversion and input_dbkey in self.chrom_info: + return self.chrom_info[input_dbkey] + + chrom_info_pair = genome_builds.get_chrom_info( + input_dbkey, + trans=self.trans, + custom_build_hack_get_len_from_fasta_conversion=custom_build_hack_get_len_from_fasta_conversion, + ) + if custom_build_hack_get_len_from_fasta_conversion: + self.chrom_info[input_dbkey] = chrom_info_pair + + return chrom_info_pair + + +def filter_output(tool, output, incoming): + for filter in output.filters: + try: + if not eval(filter.text.strip(), globals(), incoming): + return True # do not create this dataset + except Exception as e: + log.debug(f"Tool {tool.id} output {output.name}: dataset output filter ({filter.text}) failed: {e}") + return False + + +def on_text_for_names(input_names): + # input_names may contain duplicates... this is because the first value in + # multiple input dataset parameters will appear twice once as param_name + # and once as param_name1. + unique_names = [] + for name in input_names: + if name not in unique_names: + unique_names.append(name) + input_names = unique_names + + # Build name for output datasets based on tool name and input names + if len(input_names) == 0: + on_text = "" + elif len(input_names) == 1: + on_text = input_names[0] + elif len(input_names) == 2: + on_text = "{} and {}".format(*input_names) + elif len(input_names) == 3: + on_text = "{}, {}, and {}".format(*input_names) + else: + on_text = "{}, {}, and others".format(*input_names[:2]) + return on_text diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index d0eb5025d5c4..eeece34fe554 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -52,12 +52,12 @@ DefaultToolState, get_safe_version, ) -from galaxy.tools.actions import filter_output from galaxy.tools.execute import ( execute, MappingParameters, PartialJobExecution, ) +from galaxy.tools.execution_helpers import filter_output from galaxy.tools.expressions import do_eval from galaxy.tools.parameters import ( check_param, diff --git a/lib/galaxy_test/api/test_tools.py b/lib/galaxy_test/api/test_tools.py index 4b7ba5244232..efd2266ffb5a 100644 --- a/lib/galaxy_test/api/test_tools.py +++ b/lib/galaxy_test/api/test_tools.py @@ -2512,6 +2512,8 @@ def _run_implicit_collection_and_reduce(self, history_id, param): ], wait=True, ) + details = self.dataset_populator.get_history_dataset_details(history_id, hid=2) + assert details["extension"] == "fasta" self._assert_status_code_is(response, 200) hdca_id = response.json()["outputs"][0]["id"] inputs = { diff --git a/test/unit/app/tools/test_actions.py b/test/unit/app/tools/test_actions.py index 1e6b69528ac7..36056fbc61d9 100644 --- a/test/unit/app/tools/test_actions.py +++ b/test/unit/app/tools/test_actions.py @@ -14,8 +14,8 @@ from galaxy.tools.actions import ( DefaultToolAction, determine_output_format, - on_text_for_names, ) +from galaxy.tools.execution_helpers import on_text_for_names from galaxy.util import XML from galaxy.util.unittest import TestCase @@ -140,7 +140,7 @@ def _simple_execute(self, contents=None, incoming=None): if incoming is None: incoming = dict(param1="moo") self._init_tool(contents) - job, out_data, _ = self.action.execute( + job, out_data, *_ = self.action.execute( tool=self.tool, trans=self.trans, history=self.history,