diff --git a/lib/galaxy/dependencies/pinned-requirements.txt b/lib/galaxy/dependencies/pinned-requirements.txt index e22a77086cc0..12044bca589d 100644 --- a/lib/galaxy/dependencies/pinned-requirements.txt +++ b/lib/galaxy/dependencies/pinned-requirements.txt @@ -7,7 +7,6 @@ aiofiles==23.1.0 ; python_version >= "3.7" and python_version < "3.12" aiohttp==3.8.4 ; python_version >= "3.7" and python_version < "3.12" aioitertools==0.11.0 ; python_version >= "3.7" and python_version < "3.12" aiosignal==1.3.1 ; python_version >= "3.7" and python_version < "3.12" -alembic-utils==0.8.1 ; python_version >= "3.7" and python_version < "3.12" alembic==1.10.3 ; python_version >= "3.7" and python_version < "3.12" amqp==5.1.1 ; python_version >= "3.7" and python_version < "3.12" anyio==3.6.2 ; python_version >= "3.7" and python_version < "3.12" diff --git a/lib/galaxy/managers/history_contents.py b/lib/galaxy/managers/history_contents.py index 068020b63d5c..100e1dece028 100644 --- a/lib/galaxy/managers/history_contents.py +++ b/lib/galaxy/managers/history_contents.py @@ -457,14 +457,6 @@ def _subcontainer_id_map(self, id_list, serialization_params=None): .options(joinedload(component_class.tags)) .options(joinedload(component_class.annotations)) ) - - # This will conditionally join a potentially costly job_state summary - # All the paranoia if-checking makes me wonder if serialization_params - # should really be a property of the manager class instance - if serialization_params and serialization_params.keys: - if "job_state_summary" in serialization_params.keys: - query = query.options(joinedload(component_class.job_state_summary)) - return {row.id: row for row in query.all()} diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 26e7773e137a..b61998b55142 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -56,7 +56,9 @@ BigInteger, bindparam, Boolean, + case, Column, + column, DateTime, desc, event, @@ -126,13 +128,13 @@ ) from galaxy.model.orm.now import now from galaxy.model.orm.util import add_object_to_object_session -from galaxy.model.view import HistoryDatasetCollectionJobStateSummary from galaxy.objectstore import ObjectStore from galaxy.security import get_permitted_actions from galaxy.security.idencoding import IdEncodingHelper from galaxy.security.validate_user_input import validate_password_str from galaxy.util import ( directory_hash_id, + enum_values, listify, ready_name_for_url, unicodify, @@ -6438,15 +6440,6 @@ class HistoryDatasetCollectionAssociation( back_populates="history_dataset_collection_associations", uselist=False, ) - job_state_summary = relationship( - HistoryDatasetCollectionJobStateSummary, - primaryjoin=( - lambda: HistoryDatasetCollectionAssociation.id - == HistoryDatasetCollectionJobStateSummary.__table__.c.hdca_id - ), - foreign_keys=HistoryDatasetCollectionJobStateSummary.__table__.c.hdca_id, - uselist=False, - ) tags = relationship( "HistoryDatasetCollectionTagAssociation", order_by=lambda: HistoryDatasetCollectionTagAssociation.id, @@ -6466,6 +6459,7 @@ class HistoryDatasetCollectionAssociation( dict_dbkeysandextensions_visible_keys = ["dbkeys", "extensions"] editable_keys = ("name", "deleted", "visible") + _job_state_summary = None def __init__(self, deleted=False, visible=True, **kwd): super().__init__(**kwd) @@ -6500,13 +6494,75 @@ def job_source_type(self): else: return None + @property + def job_state_summary(self): + """ + Aggregate counts of jobs by state, stored in a JobStateSummary object. + """ + if not self._job_state_summary: + self._job_state_summary = self._get_job_state_summary() + # if summary exists, but there are no jobs, load zeroes for all other counts (otherwise they will be None) + if self._job_state_summary and self._job_state_summary.all_jobs == 0: + zeroes = [0] * (len(Job.states) + 1) + self._job_state_summary = JobStateSummary._make(zeroes) + + return self._job_state_summary + + def _get_job_state_summary(self): + def build_statement(): + state_label = "state" # used to generate `SELECT job.state AS state`, and then refer to it in aggregates. + + # Select job states joining on icjja > icj > hdca + # (We are selecting Job.id in addition to Job.state because otherwise the UNION operation + # will get rid of duplicates, making aggregates meaningless.) + subq1 = ( + select(Job.id, Job.state.label(state_label)) + .join(ImplicitCollectionJobsJobAssociation, ImplicitCollectionJobsJobAssociation.job_id == Job.id) + .join( + ImplicitCollectionJobs, + ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id, + ) + .join( + HistoryDatasetCollectionAssociation, + HistoryDatasetCollectionAssociation.implicit_collection_jobs_id == ImplicitCollectionJobs.id, + ) + .where(HistoryDatasetCollectionAssociation.id == self.id) + ) + + # Select job states joining on hdca + subq2 = ( + select(Job.id, Job.state.label(state_label)) + .join(HistoryDatasetCollectionAssociation, HistoryDatasetCollectionAssociation.job_id == Job.id) + .where(HistoryDatasetCollectionAssociation.id == self.id) + ) + + # Combine subqueries + subq = subq1.union(subq2) + + # Build and return final query + stm = select().select_from(subq) + # Add aggregate columns for each job state + for state in enum_values(Job.states): + col = func.sum(case((column(state_label) == state, 1), else_=0)).label(state) + stm = stm.add_columns(col) + # Add aggregate column for all jobs + col = func.count("*").label("all_jobs") + stm = stm.add_columns(col) + return stm + + if not object_session(self): + return None # no session means object is not persistant; therefore, it has no associated jobs. + + engine = object_session(self).bind + with engine.connect() as conn: + counts = conn.execute(build_statement()).one() + assert len(counts) == len(Job.states) + 1 # Verify all job states + all jobs are counted + return JobStateSummary._make(counts) + @property def job_state_summary_dict(self): if self.job_state_summary: - states = self.job_state_summary.__dict__.copy() - del states["_sa_instance_state"] - del states["hdca_id"] - return states + return self.job_state_summary._asdict() @property def dataset_dbkeys_and_extensions_summary(self): @@ -10555,3 +10611,6 @@ def receive_init(target, args, kwargs): if obj: add_object_to_object_session(target, obj) return # Once is enough. + + +JobStateSummary = NamedTuple("JobStateSummary", [(value, int) for value in enum_values(Job.states)] + [("all_jobs", int)]) # type: ignore[misc] # Ref https://github.com/python/mypy/issues/848#issuecomment-255237167 diff --git a/lib/galaxy/model/mapping.py b/lib/galaxy/model/mapping.py index 2f0b8752d39f..48c718388766 100644 --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -12,7 +12,6 @@ from galaxy.model.orm.engine_factory import build_engine from galaxy.model.security import GalaxyRBACAgent from galaxy.model.triggers.update_audit_table import install as install_timestamp_triggers -from galaxy.model.view.utils import install_views log = logging.getLogger(__name__) @@ -66,7 +65,6 @@ def init( def create_additional_database_objects(engine): install_timestamp_triggers(engine) - install_views(engine) def configure_model_mapping( diff --git a/lib/galaxy/model/view/__init__.py b/lib/galaxy/model/migrations/alembic/versions_gxy/3356bc2ecfc4_drop_view.py similarity index 57% rename from lib/galaxy/model/view/__init__.py rename to lib/galaxy/model/migrations/alembic/versions_gxy/3356bc2ecfc4_drop_view.py index 04fd7d648709..c5d8772baf41 100644 --- a/lib/galaxy/model/view/__init__.py +++ b/lib/galaxy/model/migrations/alembic/versions_gxy/3356bc2ecfc4_drop_view.py @@ -1,20 +1,21 @@ -""" -Galaxy sql view models -""" -from sqlalchemy import Integer -from sqlalchemy.orm import registry -from sqlalchemy.sql import ( - column, - text, -) +"""drop view -from galaxy.model.view.utils import View +Revision ID: 3356bc2ecfc4 +Revises: 460d0ecd1dd8 +Create Date: 2023-04-10 15:06:01.037416 +""" +from alembic import op -class HistoryDatasetCollectionJobStateSummary(View): - name = "collection_job_state_summary_view" +# revision identifiers, used by Alembic. +revision = "3356bc2ecfc4" +down_revision = "460d0ecd1dd8" +branch_labels = None +depends_on = None - aggregate_state_query = """ + +view_name = "collection_job_state_summary_view" +aggregate_query = """ SELECT hdca_id, SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new, @@ -51,28 +52,12 @@ class HistoryDatasetCollectionJobStateSummary(View): GROUP BY jobstates.hdca_id """ - __view__ = text(aggregate_state_query).columns( - column("hdca_id", Integer), - column("new", Integer), - column("resubmitted", Integer), - column("waiting", Integer), - column("queued", Integer), - column("running", Integer), - column("ok", Integer), - column("error", Integer), - column("failed", Integer), - column("paused", Integer), - column("skipped", Integer), - column("deleted", Integer), - column("deleted_new", Integer), - column("upload", Integer), - column("all_jobs", Integer), - ) - pkeys = {"hdca_id"} - __table__ = View._make_table(name, __view__, pkeys) + +def upgrade(): + stmt = f"DROP VIEW IF EXISTS {view_name}" + op.execute(stmt) -mapper_registry = registry() -mapper_registry.map_imperatively( - HistoryDatasetCollectionJobStateSummary, HistoryDatasetCollectionJobStateSummary.__table__ -) +def downgrade(): + stmt = f"CREATE VIEW {view_name} AS {aggregate_query}" + op.execute(stmt) diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py b/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py index 6b56cb831d64..ff543ae55ff5 100644 --- a/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py +++ b/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py @@ -5,18 +5,7 @@ Create Date: 2023-01-16 11:53:59.783836 """ -from typing import Generator - -from alembic import ( - context, - op, -) -from alembic_utils.pg_view import PGView -from sqlalchemy import text as sql_text -from sqlalchemy.engine.url import make_url -from sqlalchemy.sql.elements import TextClause - -from galaxy.model.view import HistoryDatasetCollectionJobStateSummary +from alembic import op # revision identifiers, used by Alembic. revision = "c39f1de47a04" @@ -24,7 +13,8 @@ branch_labels = None depends_on = None -PREVIOUS_AGGREGATE_QUERY = """ +view_name = "collection_job_state_summary_view" +previous_aggregate_query = """ SELECT hdca_id, SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new, @@ -59,40 +49,53 @@ ) jobstates GROUP BY jobstates.hdca_id """ +new_aggregate_query = """ +SELECT + hdca_id, + SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new, + SUM(CASE WHEN state = 'resubmitted' THEN 1 ELSE 0 END) AS resubmitted, + SUM(CASE WHEN state = 'waiting' THEN 1 ELSE 0 END) AS waiting, + SUM(CASE WHEN state = 'queued' THEN 1 ELSE 0 END) AS queued, + SUM(CASE WHEN state = 'running' THEN 1 ELSE 0 END) AS running, + SUM(CASE WHEN state = 'ok' THEN 1 ELSE 0 END) AS ok, + SUM(CASE WHEN state = 'error' THEN 1 ELSE 0 END) AS error, + SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed, + SUM(CASE WHEN state = 'paused' THEN 1 ELSE 0 END) AS paused, + SUM(CASE WHEN state = 'skipped' THEN 1 ELSE 0 END) AS skipped, + SUM(CASE WHEN state = 'deleted' THEN 1 ELSE 0 END) AS deleted, + SUM(CASE WHEN state = 'deleted_new' THEN 1 ELSE 0 END) AS deleted_new, + SUM(CASE WHEN state = 'upload' THEN 1 ELSE 0 END) AS upload, + SUM(CASE WHEN job_id IS NOT NULL THEN 1 ELSE 0 END) AS all_jobs +FROM ( + SELECT hdca.id AS hdca_id, job.id AS job_id, job.state as state + FROM history_dataset_collection_association hdca + LEFT JOIN implicit_collection_jobs icj + ON icj.id = hdca.implicit_collection_jobs_id + LEFT JOIN implicit_collection_jobs_job_association icjja + ON icj.id = icjja.implicit_collection_jobs_id + LEFT JOIN job + ON icjja.job_id = job.id + UNION -class SqlitePGView(PGView): - def to_sql_statement_create_or_replace(self) -> Generator[TextClause, None, None]: - """Generates a SQL "create or replace view" statement""" - - yield sql_text(f"""DROP VIEW IF EXISTS {self.literal_schema}."{self.signature}";""") - yield sql_text(f"""CREATE VIEW {self.literal_schema}."{self.signature}" AS {self.definition};""") - - -def get_view_instance(definition: str): - try: - url = make_url(context.config.get_main_option("sqlalchemy.url")) - dialect = url.get_dialect().name - except Exception: - # If someone's doing offline migrations they're probably using PostgreSQL - dialect = "postgresql" - if dialect == "postgresql": - ViewClass = PGView - schema = "public" - elif dialect == "sqlite": - ViewClass = SqlitePGView - schema = "main" - else: - raise Exception(f"Don't know how to generate view for dialect '{dialect}'") - return ViewClass(schema=schema, signature="collection_job_state_summary_view", definition=definition) + SELECT hdca.id AS hdca_id, job.id AS job_id, job.state AS state + FROM history_dataset_collection_association hdca + LEFT JOIN job + ON hdca.job_id = job.id +) jobstates +GROUP BY jobstates.hdca_id +""" def upgrade(): - view = get_view_instance(HistoryDatasetCollectionJobStateSummary.aggregate_state_query) - # op.replace_entity comes from alembic_utils plugin - op.replace_entity(view) # type: ignore[attr-defined] + op.execute("BEGIN") + op.execute(f"DROP VIEW IF EXISTS {view_name}") + op.execute(f"CREATE VIEW {view_name} AS {new_aggregate_query}") + op.execute("END") def downgrade(): - view = get_view_instance(PREVIOUS_AGGREGATE_QUERY) - op.replace_entity(view) # type: ignore[attr-defined] + op.execute("BEGIN") + op.execute(f"DROP VIEW IF EXISTS {view_name}") + op.execute(f"CREATE VIEW {view_name} AS {previous_aggregate_query}") + op.execute("END") diff --git a/lib/galaxy/model/view/utils.py b/lib/galaxy/model/view/utils.py deleted file mode 100644 index 14c4cbca8dba..000000000000 --- a/lib/galaxy/model/view/utils.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -View wrappers -""" -from inspect import getmembers - -from sqlalchemy import ( - Column, - MetaData, - Table, -) -from sqlalchemy.ext import compiler -from sqlalchemy.schema import DDLElement - - -class View: - """Base class for Views.""" - - @staticmethod - def _make_table(name, selectable, pkeys): - """Create a view. - - :param name: The name of the view. - :param selectable: SQLAlchemy selectable. - :param pkeys: set of primary keys for the selectable. - """ - columns = [Column(c.name, c.type, primary_key=(c.name in pkeys)) for c in selectable.subquery().columns] - # We do not use the metadata object from model.mapping.py that contains all the Table objects - # because that would create a circular import (create_view is called from View objects - # in model.view; but those View objects are imported into model.mapping.py where the - # metadata object we need is defined). Thus, we do not use the after_create/before_drop - # hooks to automate creating/dropping views. Instead, this is taken care of in install_views(). - - # The metadata object passed to Table() should be empty: this table is internal to a View - # object and is not intended to be created in the database. - return Table(name, MetaData(), *columns) - - -class CreateView(DDLElement): - def __init__(self, name, selectable): - self.name = name - self.selectable = selectable - - -class DropView(DDLElement): - def __init__(self, name): - self.name = name - - -@compiler.compiles(CreateView) -def compile_create_view(element, compiler, **kw): - compiled_selectable = compiler.sql_compiler.process(element.selectable, literal_binds=True) - return f"CREATE VIEW {element.name} AS {compiled_selectable}" - - -@compiler.compiles(DropView) -def compile_drop_view(element, compiler, **kw): - return f"DROP VIEW IF EXISTS {element.name}" - - -def is_view_model(o): - return hasattr(o, "__view__") and issubclass(o, View) - - -def install_views(engine): - import galaxy.model.view - - views = getmembers(galaxy.model.view, is_view_model) - for _, view in views: - # adding DropView here because our unit-testing calls this function when - # it mocks the app and CreateView will attempt to rebuild an existing - # view in a database that is already made, the right answer is probably - # to change the sql that gest emitted when CreateView is rendered. - with engine.begin() as conn: - conn.execute(DropView(view.name)) - conn.execute(CreateView(view.name, view.__view__)) diff --git a/lib/galaxy/util/__init__.py b/lib/galaxy/util/__init__.py index 59e1ce1d22b4..6645688e4e7c 100644 --- a/lib/galaxy/util/__init__.py +++ b/lib/galaxy/util/__init__.py @@ -1905,3 +1905,11 @@ def to_str(self, **kwd): @property def elapsed(self): return time.time() - self.begin + + +def enum_values(enum_class): + """ + Return a list of member values of enumeration enum_class. + Values are in member definition order. + """ + return [value.value for value in enum_class.__members__.values()] diff --git a/pyproject.toml b/pyproject.toml index 652a0109dffb..72ab6fac4504 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,6 @@ url = "https://wheels.galaxyproject.org/simple" a2wsgi = "*" aiofiles = "*" alembic = "*" -alembic_utils = "*" apispec = "*" Babel = "*" bdbag = ">=1.6.3" diff --git a/test/unit/data/model/test_views.py b/test/unit/data/model/test_views.py deleted file mode 100644 index 433c24eb6522..000000000000 --- a/test/unit/data/model/test_views.py +++ /dev/null @@ -1,84 +0,0 @@ -import pytest -from sqlalchemy import ( - Column, - Integer, - MetaData, - Table, -) -from sqlalchemy.sql import ( - column, - text, -) - -from galaxy.model.database_utils import ( - create_database, - sqlalchemy_engine, -) -from galaxy.model.unittest_utils.model_testing_utils import ( - drop_database, - replace_database_in_url, - skip_if_not_mysql_uri, - skip_if_not_postgres_uri, -) -from galaxy.model.view.utils import ( - CreateView, - View, -) - - -@pytest.fixture -def view(): - # A View class we would add to galaxy.model.view - class TestView(View): - name = "testview" - __view__ = text("SELECT id, foo FROM testfoo").columns(column("id", Integer), column("foo", Integer)) - pkeys = {"id"} - View._make_table(name, __view__, pkeys) - - return TestView - - -@skip_if_not_postgres_uri -def test_postgres_create_view(database_name, postgres_url, view): - metadata = MetaData() - make_table(metadata) # table from which the view will select - url = replace_database_in_url(postgres_url, database_name) - query = f"SELECT 1 FROM information_schema.views WHERE table_name = '{view.name}'" - create_database(postgres_url, database_name) - run_view_test(url, metadata, view, query) - drop_database(postgres_url, database_name) - - -def test_sqlite_create_view(sqlite_memory_url, view): - metadata = MetaData() - make_table(metadata) # table from which the view will select - url = sqlite_memory_url - query = f"SELECT 1 FROM sqlite_master WHERE type='view' AND name='{view.name}'" - run_view_test(url, metadata, view, query) - - -@skip_if_not_mysql_uri -def test_mysql_create_view(database_name, mysql_url, view): - metadata = MetaData() - make_table(metadata) # table from which the view will select - url = replace_database_in_url(mysql_url, database_name) - query = f"SELECT 1 FROM information_schema.views WHERE table_name = '{view.name}'" - create_database(mysql_url, database_name) - run_view_test(url, metadata, view, query) - drop_database(mysql_url, database_name) - - -def make_table(metadata): - users = Table( - "testfoo", metadata, Column("id", Integer, primary_key=True), Column("foo", Integer), Column("bar", Integer) - ) - return users - - -def run_view_test(url, metadata, view, query): - with sqlalchemy_engine(url) as engine: - with engine.begin() as conn: - metadata.create_all(conn) # create table in database - conn.execute(CreateView(view.name, view.__view__)) # create view in database - result = conn.execute(text(query)).fetchall() - assert len(result) == 1 # assert that view exists in database diff --git a/test/unit/util/test_utils.py b/test/unit/util/test_utils.py index 828ae610af72..34d7cb95093b 100644 --- a/test/unit/util/test_utils.py +++ b/test/unit/util/test_utils.py @@ -1,6 +1,7 @@ import errno import os import tempfile +from enum import Enum from io import StringIO from typing import Dict @@ -139,3 +140,12 @@ def test_listify() -> None: assert util.listify(d) == [d] o = object() assert util.listify(o) == [o] + + +def test_enum_values(): + class Stuff(str, Enum): + A = "a" + C = "c" + B = "b" + + assert util.enum_values(Stuff) == ["a", "c", "b"]