Drop database views #15876

Merged · 15 commits · Apr 26, 2023
1 change: 0 additions & 1 deletion lib/galaxy/dependencies/pinned-requirements.txt
@@ -7,7 +7,6 @@ aiofiles==23.1.0 ; python_version >= "3.7" and python_version < "3.12"
aiohttp==3.8.4 ; python_version >= "3.7" and python_version < "3.12"
aioitertools==0.11.0 ; python_version >= "3.7" and python_version < "3.12"
aiosignal==1.3.1 ; python_version >= "3.7" and python_version < "3.12"
alembic-utils==0.8.1 ; python_version >= "3.7" and python_version < "3.12"
alembic==1.10.3 ; python_version >= "3.7" and python_version < "3.12"
amqp==5.1.1 ; python_version >= "3.7" and python_version < "3.12"
anyio==3.6.2 ; python_version >= "3.7" and python_version < "3.12"
8 changes: 0 additions & 8 deletions lib/galaxy/managers/history_contents.py
@@ -457,14 +457,6 @@ def _subcontainer_id_map(self, id_list, serialization_params=None):
.options(joinedload(component_class.tags))
.options(joinedload(component_class.annotations))
)

# This will conditionally join a potentially costly job_state summary
# All the paranoia if-checking makes me wonder if serialization_params
# should really be a property of the manager class instance
if serialization_params and serialization_params.keys:
if "job_state_summary" in serialization_params.keys:
query = query.options(joinedload(component_class.job_state_summary))

return {row.id: row for row in query.all()}


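With the view-backed relationship gone, this manager no longer needs to conditionally eager-load a job_state_summary: the summary is computed lazily (and cached) the first time it is read from the HDCA. A minimal usage sketch, where hdca stands for a hypothetical persistent HistoryDatasetCollectionAssociation:

summary = hdca.job_state_summary  # runs one aggregate query on first access, then caches
print(summary.ok, summary.error, summary.all_jobs)
print(hdca.job_state_summary_dict)  # the same counts as a plain dict, via NamedTuple._asdict()
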
87 changes: 73 additions & 14 deletions lib/galaxy/model/__init__.py
@@ -56,7 +56,9 @@
BigInteger,
bindparam,
Boolean,
case,
Column,
column,
DateTime,
desc,
event,
@@ -126,13 +128,13 @@
)
from galaxy.model.orm.now import now
from galaxy.model.orm.util import add_object_to_object_session
from galaxy.model.view import HistoryDatasetCollectionJobStateSummary
from galaxy.objectstore import ObjectStore
from galaxy.security import get_permitted_actions
from galaxy.security.idencoding import IdEncodingHelper
from galaxy.security.validate_user_input import validate_password_str
from galaxy.util import (
directory_hash_id,
enum_values,
listify,
ready_name_for_url,
unicodify,
@@ -6438,15 +6440,6 @@ class HistoryDatasetCollectionAssociation(
back_populates="history_dataset_collection_associations",
uselist=False,
)
job_state_summary = relationship(
HistoryDatasetCollectionJobStateSummary,
primaryjoin=(
lambda: HistoryDatasetCollectionAssociation.id
== HistoryDatasetCollectionJobStateSummary.__table__.c.hdca_id
),
foreign_keys=HistoryDatasetCollectionJobStateSummary.__table__.c.hdca_id,
uselist=False,
)
tags = relationship(
"HistoryDatasetCollectionTagAssociation",
order_by=lambda: HistoryDatasetCollectionTagAssociation.id,
@@ -6466,6 +6459,7 @@

dict_dbkeysandextensions_visible_keys = ["dbkeys", "extensions"]
editable_keys = ("name", "deleted", "visible")
_job_state_summary = None

def __init__(self, deleted=False, visible=True, **kwd):
super().__init__(**kwd)
@@ -6500,13 +6494,75 @@ def job_source_type(self):
else:
return None

@property
def job_state_summary(self):
"""
Aggregate counts of jobs by state, stored in a JobStateSummary object.
"""
if not self._job_state_summary:
self._job_state_summary = self._get_job_state_summary()
# if summary exists, but there are no jobs, load zeroes for all other counts (otherwise they will be None)
if self._job_state_summary and self._job_state_summary.all_jobs == 0:
zeroes = [0] * (len(Job.states) + 1)
self._job_state_summary = JobStateSummary._make(zeroes)

return self._job_state_summary

def _get_job_state_summary(self):
def build_statement():
state_label = "state" # used to generate `SELECT job.state AS state`, and then refer to it in aggregates.

# Select job states joining on icjja > icj > hdca
# (We are selecting Job.id in addition to Job.state because otherwise the UNION operation
# will get rid of duplicates, making aggregates meaningless.)
subq1 = (
select(Job.id, Job.state.label(state_label))
.join(ImplicitCollectionJobsJobAssociation, ImplicitCollectionJobsJobAssociation.job_id == Job.id)
.join(
ImplicitCollectionJobs,
ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id,
)
.join(
HistoryDatasetCollectionAssociation,
HistoryDatasetCollectionAssociation.implicit_collection_jobs_id == ImplicitCollectionJobs.id,
)
.where(HistoryDatasetCollectionAssociation.id == self.id)
)

# Select job states joining on hdca
subq2 = (
select(Job.id, Job.state.label(state_label))
.join(HistoryDatasetCollectionAssociation, HistoryDatasetCollectionAssociation.job_id == Job.id)
.where(HistoryDatasetCollectionAssociation.id == self.id)
)

# Combine subqueries
subq = subq1.union(subq2)
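# (For orientation: the statement assembled below has roughly this SQL shape, though
# the exact rendering depends on the SQLAlchemy version and dialect:
#   SELECT
#       SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new,
#       ... one SUM(CASE ...) column per Job state ...,
#       <count of all rows> AS all_jobs
#   FROM (<subq1> UNION <subq2>)
# )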

# Build and return final query
stm = select().select_from(subq)
# Add aggregate columns for each job state
for state in enum_values(Job.states):
col = func.sum(case((column(state_label) == state, 1), else_=0)).label(state)
stm = stm.add_columns(col)
# Add aggregate column for all jobs
col = func.count("*").label("all_jobs")
stm = stm.add_columns(col)
return stm

if not object_session(self):
return None # no session means object is not persistent; therefore, it has no associated jobs.

engine = object_session(self).bind
with engine.connect() as conn:
counts = conn.execute(build_statement()).one()
assert len(counts) == len(Job.states) + 1 # Verify all job states + all jobs are counted
return JobStateSummary._make(counts)

@property
def job_state_summary_dict(self):
if self.job_state_summary:
states = self.job_state_summary.__dict__.copy()
del states["_sa_instance_state"]
del states["hdca_id"]
return states
return self.job_state_summary._asdict()

@property
def dataset_dbkeys_and_extensions_summary(self):
@@ -10555,3 +10611,6 @@ def receive_init(target, args, kwargs):
if obj:
add_object_to_object_session(target, obj)
return # Once is enough.


JobStateSummary = NamedTuple("JobStateSummary", [(value, int) for value in enum_values(Job.states)] + [("all_jobs", int)]) # type: ignore[misc] # Ref https://github.com/python/mypy/issues/848#issuecomment-255237167
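
The JobStateSummary type above is generated dynamically from the Job.states enum, which is why mypy needs the ignore. A self-contained sketch of the same pattern, using a hypothetical three-state enum and a local stand-in for galaxy.util.enum_values:

from enum import Enum
from typing import NamedTuple


class States(str, Enum):
    NEW = "new"
    OK = "ok"
    ERROR = "error"


def enum_values(enum_class):
    # stand-in for galaxy.util.enum_values: the .value of each member, in definition order
    return [member.value for member in enum_class]


Summary = NamedTuple("Summary", [(value, int) for value in enum_values(States)] + [("all_jobs", int)])  # type: ignore[misc]

counts = Summary._make([2, 5, 1, 8])  # one count per state, then the total
assert counts.ok == 5 and counts.all_jobs == 8
assert counts._asdict() == {"new": 2, "ok": 5, "error": 1, "all_jobs": 8}
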
2 changes: 0 additions & 2 deletions lib/galaxy/model/mapping.py
@@ -12,7 +12,6 @@
from galaxy.model.orm.engine_factory import build_engine
from galaxy.model.security import GalaxyRBACAgent
from galaxy.model.triggers.update_audit_table import install as install_timestamp_triggers
from galaxy.model.view.utils import install_views

log = logging.getLogger(__name__)

@@ -66,7 +65,6 @@ def init(

def create_additional_database_objects(engine):
install_timestamp_triggers(engine)
install_views(engine)


def configure_model_mapping(
@@ -1,20 +1,21 @@
"""
Galaxy sql view models
"""
from sqlalchemy import Integer
from sqlalchemy.orm import registry
from sqlalchemy.sql import (
column,
text,
)
"""drop view

from galaxy.model.view.utils import View
Revision ID: 3356bc2ecfc4
Revises: 460d0ecd1dd8
Create Date: 2023-04-10 15:06:01.037416

"""
from alembic import op

class HistoryDatasetCollectionJobStateSummary(View):
name = "collection_job_state_summary_view"
# revision identifiers, used by Alembic.
revision = "3356bc2ecfc4"
down_revision = "460d0ecd1dd8"
branch_labels = None
depends_on = None

aggregate_state_query = """

view_name = "collection_job_state_summary_view"
aggregate_query = """
SELECT
hdca_id,
SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new,
@@ -51,28 +52,12 @@ class HistoryDatasetCollectionJobStateSummary(View):
GROUP BY jobstates.hdca_id
"""

__view__ = text(aggregate_state_query).columns(
column("hdca_id", Integer),
column("new", Integer),
column("resubmitted", Integer),
column("waiting", Integer),
column("queued", Integer),
column("running", Integer),
column("ok", Integer),
column("error", Integer),
column("failed", Integer),
column("paused", Integer),
column("skipped", Integer),
column("deleted", Integer),
column("deleted_new", Integer),
column("upload", Integer),
column("all_jobs", Integer),
)
pkeys = {"hdca_id"}
__table__ = View._make_table(name, __view__, pkeys)

def upgrade():
stmt = f"DROP VIEW IF EXISTS {view_name}"
op.execute(stmt)


mapper_registry = registry()
mapper_registry.map_imperatively(
HistoryDatasetCollectionJobStateSummary, HistoryDatasetCollectionJobStateSummary.__table__
)
def downgrade():
stmt = f"CREATE VIEW {view_name} AS {aggregate_query}"
op.execute(stmt)
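
A quick way to check the migration's effect is SQLAlchemy's inspector; a minimal sketch, with a hypothetical database URL:

from sqlalchemy import create_engine, inspect

engine = create_engine("sqlite:///galaxy.sqlite")  # hypothetical URL; use the Galaxy database URL here
assert "collection_job_state_summary_view" not in inspect(engine).get_view_names()
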
@@ -5,26 +5,16 @@
Create Date: 2023-01-16 11:53:59.783836

"""
from typing import Generator

from alembic import (
context,
op,
)
from alembic_utils.pg_view import PGView
from sqlalchemy import text as sql_text
from sqlalchemy.engine.url import make_url
from sqlalchemy.sql.elements import TextClause

from galaxy.model.view import HistoryDatasetCollectionJobStateSummary
from alembic import op

# revision identifiers, used by Alembic.
revision = "c39f1de47a04"
down_revision = "3100452fa030"
branch_labels = None
depends_on = None

PREVIOUS_AGGREGATE_QUERY = """
view_name = "collection_job_state_summary_view"
previous_aggregate_query = """
SELECT
hdca_id,
SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new,
@@ -59,40 +49,53 @@
) jobstates
GROUP BY jobstates.hdca_id
"""
new_aggregate_query = """
SELECT
hdca_id,
SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new,
SUM(CASE WHEN state = 'resubmitted' THEN 1 ELSE 0 END) AS resubmitted,
SUM(CASE WHEN state = 'waiting' THEN 1 ELSE 0 END) AS waiting,
SUM(CASE WHEN state = 'queued' THEN 1 ELSE 0 END) AS queued,
SUM(CASE WHEN state = 'running' THEN 1 ELSE 0 END) AS running,
SUM(CASE WHEN state = 'ok' THEN 1 ELSE 0 END) AS ok,
SUM(CASE WHEN state = 'error' THEN 1 ELSE 0 END) AS error,
SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed,
SUM(CASE WHEN state = 'paused' THEN 1 ELSE 0 END) AS paused,
SUM(CASE WHEN state = 'skipped' THEN 1 ELSE 0 END) AS skipped,
SUM(CASE WHEN state = 'deleted' THEN 1 ELSE 0 END) AS deleted,
SUM(CASE WHEN state = 'deleted_new' THEN 1 ELSE 0 END) AS deleted_new,
SUM(CASE WHEN state = 'upload' THEN 1 ELSE 0 END) AS upload,
SUM(CASE WHEN job_id IS NOT NULL THEN 1 ELSE 0 END) AS all_jobs
FROM (
SELECT hdca.id AS hdca_id, job.id AS job_id, job.state as state
FROM history_dataset_collection_association hdca
LEFT JOIN implicit_collection_jobs icj
ON icj.id = hdca.implicit_collection_jobs_id
LEFT JOIN implicit_collection_jobs_job_association icjja
ON icj.id = icjja.implicit_collection_jobs_id
LEFT JOIN job
ON icjja.job_id = job.id

UNION

class SqlitePGView(PGView):
def to_sql_statement_create_or_replace(self) -> Generator[TextClause, None, None]:
"""Generates a SQL "create or replace view" statement"""

yield sql_text(f"""DROP VIEW IF EXISTS {self.literal_schema}."{self.signature}";""")
yield sql_text(f"""CREATE VIEW {self.literal_schema}."{self.signature}" AS {self.definition};""")


def get_view_instance(definition: str):
try:
url = make_url(context.config.get_main_option("sqlalchemy.url"))
dialect = url.get_dialect().name
except Exception:
# If someone's doing offline migrations they're probably using PostgreSQL
dialect = "postgresql"
if dialect == "postgresql":
ViewClass = PGView
schema = "public"
elif dialect == "sqlite":
ViewClass = SqlitePGView
schema = "main"
else:
raise Exception(f"Don't know how to generate view for dialect '{dialect}'")
return ViewClass(schema=schema, signature="collection_job_state_summary_view", definition=definition)
SELECT hdca.id AS hdca_id, job.id AS job_id, job.state AS state
FROM history_dataset_collection_association hdca
LEFT JOIN job
ON hdca.job_id = job.id
) jobstates
GROUP BY jobstates.hdca_id
"""


def upgrade():
view = get_view_instance(HistoryDatasetCollectionJobStateSummary.aggregate_state_query)
# op.replace_entity comes from alembic_utils plugin
op.replace_entity(view) # type: ignore[attr-defined]
op.execute("BEGIN")
op.execute(f"DROP VIEW IF EXISTS {view_name}")
op.execute(f"CREATE VIEW {view_name} AS {new_aggregate_query}")
op.execute("END")


def downgrade():
view = get_view_instance(PREVIOUS_AGGREGATE_QUERY)
op.replace_entity(view) # type: ignore[attr-defined]
op.execute("BEGIN")
op.execute(f"DROP VIEW IF EXISTS {view_name}")
op.execute(f"CREATE VIEW {view_name} AS {previous_aggregate_query}")
op.execute("END")
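
With alembic-utils dropped from the pinned requirements, both migrations now manage the view with plain DROP VIEW / CREATE VIEW statements, which run unchanged on PostgreSQL and SQLite (SQLite has no CREATE OR REPLACE VIEW) and make the PGView/SqlitePGView split unnecessary. The shared pattern as a small helper, a sketch rather than Galaxy code, assuming it is called from inside an Alembic migration:

from alembic import op


def replace_view(name: str, definition: str) -> None:
    # Drop-and-recreate works on both backends, unlike CREATE OR REPLACE VIEW.
    op.execute(f"DROP VIEW IF EXISTS {name}")
    op.execute(f"CREATE VIEW {name} AS {definition}")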