Skip to content

Commit

Permalink
Merge pull request galaxyproject#17020 from jdavcs/dev_sa20_job_index_query
Browse files Browse the repository at this point in the history

Upgrade job manager's index_query method to SA2.0
  • Loading branch information
jdavcs authored Nov 14, 2023
2 parents c85298b + 01e627f commit 4db3b94
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 74 deletions.
141 changes: 73 additions & 68 deletions lib/galaxy/managers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
datetime,
)

import sqlalchemy
from boltons.iterutils import remap
from pydantic import (
BaseModel,
Expand All @@ -27,7 +28,6 @@

from galaxy import model
from galaxy.exceptions import (
AdminRequiredException,
ItemAccessibilityException,
ObjectNotFound,
RequestParameterInvalidException,
Expand All @@ -41,9 +41,13 @@
from galaxy.managers.hdas import HDAManager
from galaxy.managers.lddas import LDDAManager
from galaxy.model import (
ImplicitCollectionJobsJobAssociation,
Job,
JobParameter,
User,
Workflow,
WorkflowInvocation,
WorkflowInvocationStep,
YIELD_PER_ROWS,
)
from galaxy.model.base import transaction
Expand Down Expand Up @@ -100,76 +104,50 @@ def __init__(self, app: StructuredApp):
self.app = app
self.dataset_manager = DatasetManager(app)

def index_query(self, trans, payload: JobIndexQueryPayload):
def index_query(self, trans, payload: JobIndexQueryPayload) -> sqlalchemy.engine.Result:
is_admin = trans.user_is_admin
user_details = payload.user_details

decoded_user_id = payload.user_id
history_id = payload.history_id
workflow_id = payload.workflow_id
invocation_id = payload.invocation_id
search = payload.search
order_by = payload.order_by

if is_admin:
if decoded_user_id is not None:
query = trans.sa_session.query(model.Job).filter(model.Job.user_id == decoded_user_id)
else:
query = trans.sa_session.query(model.Job)
if user_details:
query = query.outerjoin(model.Job.user)

else:
if user_details:
raise AdminRequiredException("Only admins can index the jobs with user details enabled")
if decoded_user_id is not None and decoded_user_id != trans.user.id:
raise AdminRequiredException("Only admins can index the jobs of others")
query = trans.sa_session.query(model.Job).filter(model.Job.user_id == trans.user.id)

def build_and_apply_filters(query, objects, filter_func):
def build_and_apply_filters(stmt, objects, filter_func):
if objects is not None:
if isinstance(objects, (str, date, datetime)):
query = query.filter(filter_func(objects))
stmt = stmt.where(filter_func(objects))
elif isinstance(objects, list):
t = []
for obj in objects:
t.append(filter_func(obj))
query = query.filter(or_(*t))
return query
stmt = stmt.where(or_(*t))
return stmt

query = build_and_apply_filters(query, payload.states, lambda s: model.Job.state == s)
query = build_and_apply_filters(query, payload.tool_ids, lambda t: model.Job.tool_id == t)
query = build_and_apply_filters(query, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t))
query = build_and_apply_filters(query, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin)
query = build_and_apply_filters(query, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax)

history_id = payload.history_id
workflow_id = payload.workflow_id
invocation_id = payload.invocation_id
if history_id is not None:
query = query.filter(model.Job.history_id == history_id)
if workflow_id or invocation_id:
def add_workflow_jobs():
wfi_step = select(WorkflowInvocationStep)
if workflow_id is not None:
wfi_step = (
trans.sa_session.query(model.WorkflowInvocationStep)
.join(model.WorkflowInvocation)
.join(model.Workflow)
.filter(
model.Workflow.stored_workflow_id == workflow_id,
)
.subquery()
wfi_step.join(WorkflowInvocation).join(Workflow).where(Workflow.stored_workflow_id == workflow_id)
)
elif invocation_id is not None:
wfi_step = (
trans.sa_session.query(model.WorkflowInvocationStep)
.filter(model.WorkflowInvocationStep.workflow_invocation_id == invocation_id)
.subquery()
)
query1 = query.join(wfi_step)
query2 = query.join(model.ImplicitCollectionJobsJobAssociation).join(
wfi_step = wfi_step.where(WorkflowInvocationStep.workflow_invocation_id == invocation_id)
wfi_step = wfi_step.subquery()

stmt1 = stmt.join(wfi_step)
stmt2 = stmt.join(ImplicitCollectionJobsJobAssociation).join(
wfi_step,
model.ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
== wfi_step.c.implicit_collection_jobs_id,
)
query = query1.union(query2)
# Ensure the result is models, not tuples
sq = stmt1.union(stmt2).subquery()
# SQLite won't recognize Job.foo as a valid column for the ORDER BY clause due to the UNION clause, so we'll use the subquery `columns` collection (`sq.c`).
# Ref: https://github.com/galaxyproject/galaxy/pull/16852#issuecomment-1804676322
return select(aliased(Job, sq)), sq.c

search = payload.search
if search:
def add_search_criteria(stmt):
search_filters = {
"tool": "tool",
"t": "tool",
Expand All @@ -191,36 +169,63 @@ def build_and_apply_filters(query, objects, filter_func):
"h": "handler",
}
)
assert search
parsed_search = parse_filters_structured(search, search_filters)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
if key == "user":
query = query.filter(text_column_filter(model.User.email, term))
stmt = stmt.where(text_column_filter(User.email, term))
elif key == "tool":
query = query.filter(text_column_filter(model.Job.tool_id, term))
stmt = stmt.where(text_column_filter(Job.tool_id, term))
elif key == "handler":
query = query.filter(text_column_filter(model.Job.handler, term))
stmt = stmt.where(text_column_filter(Job.handler, term))
elif key == "runner":
query = query.filter(text_column_filter(model.Job.job_runner_name, term))
stmt = stmt.where(text_column_filter(Job.job_runner_name, term))
elif isinstance(term, RawTextTerm):
columns = [model.Job.tool_id]
columns = [Job.tool_id]
if user_details:
columns.append(model.User.email)
columns.append(User.email)
if is_admin:
columns.append(model.Job.handler)
columns.append(model.Job.job_runner_name)
query = query.filter(raw_text_column_filter(columns, term))
columns.append(Job.handler)
columns.append(Job.job_runner_name)
stmt = stmt.filter(raw_text_column_filter(columns, term))
return stmt

stmt = select(Job)

if is_admin:
if decoded_user_id is not None:
stmt = stmt.where(Job.user_id == decoded_user_id)
if user_details:
stmt = stmt.outerjoin(Job.user)
else:
stmt = stmt.where(Job.user_id == trans.user.id)

stmt = build_and_apply_filters(stmt, payload.states, lambda s: model.Job.state == s)
stmt = build_and_apply_filters(stmt, payload.tool_ids, lambda t: model.Job.tool_id == t)
stmt = build_and_apply_filters(stmt, payload.tool_ids_like, lambda t: model.Job.tool_id.like(t))
stmt = build_and_apply_filters(stmt, payload.date_range_min, lambda dmin: model.Job.update_time >= dmin)
stmt = build_and_apply_filters(stmt, payload.date_range_max, lambda dmax: model.Job.update_time <= dmax)

if history_id is not None:
stmt = stmt.where(Job.history_id == history_id)

order_by_columns = Job
if workflow_id or invocation_id:
stmt, order_by_columns = add_workflow_jobs()

if search:
stmt = add_search_criteria(stmt)

if payload.order_by == JobIndexSortByEnum.create_time:
order_by = model.Job.create_time.desc()
if order_by == JobIndexSortByEnum.create_time:
stmt = stmt.order_by(order_by_columns.create_time.desc())
else:
order_by = model.Job.update_time.desc()
query = query.order_by(order_by)
stmt = stmt.order_by(order_by_columns.update_time.desc())

query = query.offset(payload.offset)
query = query.limit(payload.limit)
return query
stmt = stmt.offset(payload.offset)
stmt = stmt.limit(payload.limit)
return trans.sa_session.scalars(stmt)

def job_lock(self) -> JobLock:
return JobLock(active=self.app.job_manager.job_lock)
Expand Down
27 changes: 21 additions & 6 deletions lib/galaxy/webapps/galaxy/services/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import (
Any,
Dict,
Optional,
)

from galaxy import (
Expand Down Expand Up @@ -59,15 +60,18 @@ def index(
):
security = trans.security
is_admin = trans.user_is_admin
if payload.view == JobIndexViewEnum.admin_job_list:
view = payload.view
if view == JobIndexViewEnum.admin_job_list:
payload.user_details = True
user_details = payload.user_details
if payload.view == JobIndexViewEnum.admin_job_list and not is_admin:
raise exceptions.AdminRequiredException("Only admins can use the admin_job_list view")
query = self.job_manager.index_query(trans, payload)
decoded_user_id = payload.user_id

if not is_admin:
self._check_nonadmin_access(view, user_details, decoded_user_id, trans.user.id)

jobs = self.job_manager.index_query(trans, payload)
out = []
view = payload.view
for job in query.yield_per(model.YIELD_PER_ROWS):
for job in jobs.yield_per(model.YIELD_PER_ROWS):
job_dict = job.to_dict(view, system_details=is_admin)
j = security.encode_all_ids(job_dict, True)
if view == JobIndexViewEnum.admin_job_list:
Expand All @@ -77,3 +81,14 @@ def index(
out.append(j)

return out

def _check_nonadmin_access(
    self, view: str, user_details: bool, decoded_user_id: Optional[DecodedDatabaseIdField], trans_user_id: int
):
    """Reject a job-index request that asks for admin-only features.

    The checks run in a fixed order and the first one that matches decides
    the error message:

    1. ``view`` equals ``JobIndexViewEnum.admin_job_list``;
    2. ``user_details`` is truthy;
    3. ``decoded_user_id`` is given and differs from ``trans_user_id``.

    :param view: requested serialization view of the job index.
    :param user_details: whether user details were requested.
    :param decoded_user_id: user id the listing is filtered by, or ``None``.
    :param trans_user_id: id compared against ``decoded_user_id``
        (presumably the requesting user's id; confirm against the caller).
    :raises exceptions.AdminRequiredException: if any of the checks matches.
    """
    denial_reason = None
    if view == JobIndexViewEnum.admin_job_list:
        denial_reason = "Only admins can use the admin_job_list view"
    elif user_details:
        denial_reason = "Only admins can index the jobs with user details enabled"
    elif decoded_user_id is not None and decoded_user_id != trans_user_id:
        denial_reason = "Only admins can index the jobs of others"

    # Single raise site; nothing is raised when no rule matched.
    if denial_reason is not None:
        raise exceptions.AdminRequiredException(denial_reason)

0 comments on commit 4db3b94

Please sign in to comment.