Skip to content

Commit

Permalink
Use SKIP LOCKED only if database supports it
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Apr 16, 2021
1 parent 44b988c commit 5fb4ffb
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 17 deletions.
6 changes: 3 additions & 3 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ def fail(self, message, exception=False, tool_stdout="", tool_stderr="", exit_co
# Pause any dependent jobs (and those jobs' outputs)
for dep_job_assoc in dataset.dependent_jobs:
self.pause(dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state.")
job.set_final_state(job.states.ERROR)
job.set_final_state(job.states.ERROR, supports_skip_locked=self.app.application_stack.supports_skip_locked())
job.command_line = unicodify(self.command_line)
job.info = message
# TODO: Put setting the stdout, stderr, and exit code in one place
Expand Down Expand Up @@ -1408,7 +1408,7 @@ def change_state(self, state, info=False, flush=True, job=None):
job.info = info
job.set_state(state)
self.sa_session.add(job)
job.update_output_states()
job.update_output_states(self.app.application_stack.supports_skip_locked())
if flush:
self.sa_session.flush()

Expand Down Expand Up @@ -1776,7 +1776,7 @@ def fail():

# Finally set the job state. This should only happen *after* all
# dataset creation, and will allow us to eliminate force_history_refresh.
job.set_final_state(final_job_state)
job.set_final_state(final_job_state, supports_skip_locked=self.app.application_stack.supports_skip_locked())
if not job.tasks:
# If job was composed of tasks, don't attempt to recollect statistics
self._collect_metrics(job, job_metrics_directory)
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ def __delete(self, job, error_msg):
if error_msg is not None:
final_state = job.states.ERROR
job.info = error_msg
job.set_final_state(final_state)
job.set_final_state(final_state, supports_skip_locked=self.app.application_stack.supports_skip_locked())
self.sa_session.add(job)
self.sa_session.flush()

Expand Down
33 changes: 24 additions & 9 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
true,
type_coerce,
types)
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext import hybrid
from sqlalchemy.orm import (
aliased,
Expand Down Expand Up @@ -1177,22 +1178,36 @@ def to_dict(self, view='collection', system_details=False):

return rval

def update_hdca_update_time_for_job(self, update_time, sa_session):
def update_hdca_update_time_for_job(self, update_time, sa_session, supports_skip_locked):
subq = sa_session.query(HistoryDatasetCollectionAssociation.id) \
.join(ImplicitCollectionJobs) \
.join(ImplicitCollectionJobsJobAssociation) \
.filter(ImplicitCollectionJobsJobAssociation.job_id == self.id) \
.with_for_update(skip_locked=True).subquery()
.filter(ImplicitCollectionJobsJobAssociation.job_id == self.id)
if supports_skip_locked:
subq = subq.with_for_update(skip_locked=True).subquery()
implicit_statement = HistoryDatasetCollectionAssociation.table.update() \
.where(HistoryDatasetCollectionAssociation.table.c.id.in_(subq)) \
.values(update_time=update_time)
explicit_statement = HistoryDatasetCollectionAssociation.table.update() \
.where(HistoryDatasetCollectionAssociation.table.c.job_id == self.id) \
.values(update_time=update_time)
sa_session.execute(implicit_statement)
sa_session.execute(explicit_statement)

def set_final_state(self, final_state):
if supports_skip_locked:
sa_session.execute(implicit_statement)
else:
conn = sa_session.connection(execution_options={'isolation_level': 'SERIALIZABLE'})
with conn.begin() as trans:
try:
conn.execute(implicit_statement)
trans.commit()
except OperationalError as e:
# If this is a serialization failure on PostgreSQL, then e.orig is a psycopg2 TransactionRollbackError
# and should have attribute `code`. Other engines should just report the message and move on.
if int(getattr(e.orig, 'pgcode', -1)) != 40001:
log.debug(f"Updating implicit collection uptime_time for job {self.id} failed (this is expected for large collections and not a problem): {unicodify(e)}")
trans.rollback()

def set_final_state(self, final_state, supports_skip_locked):
self.set_state(final_state)
# TODO: migrate to where-in subqueries?
statement = '''
Expand All @@ -1203,7 +1218,7 @@ def set_final_state(self, final_state):
sa_session = object_session(self)
update_time = galaxy.model.orm.now.now()
log.debug(f'Final state update time: {update_time}')
self.update_hdca_update_time_for_job(update_time=update_time, sa_session=sa_session)
self.update_hdca_update_time_for_job(update_time=update_time, sa_session=sa_session, supports_skip_locked=supports_skip_locked)
params = {
'job_id': self.id,
'update_time': update_time
Expand All @@ -1230,7 +1245,7 @@ def command_version(self):
for dataset_assoc in self.output_datasets:
return dataset_assoc.dataset.tool_version

def update_output_states(self):
def update_output_states(self, supports_skip_locked):
# TODO: migrate to where-in subqueries?
statements = ['''
UPDATE dataset
Expand Down Expand Up @@ -1275,7 +1290,7 @@ def update_output_states(self):
''']
sa_session = object_session(self)
update_time = galaxy.model.orm.now.now()
self.update_hdca_update_time_for_job(update_time=update_time, sa_session=sa_session)
self.update_hdca_update_time_for_job(update_time=update_time, sa_session=sa_session, supports_skip_locked=supports_skip_locked)
params = {
'job_id': self.id,
'state': self.state,
Expand Down
8 changes: 4 additions & 4 deletions test/unit/managers/test_HistoryContentsManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,15 @@ def test_update_time_filter(self):
contents.append(self.add_list_collection_to_history(history, contents[4:6]))

self.log("should allow filtering by update_time")
# in the case of collections we have to change the collection.collection (ugh) to change the update_time
contents[3].collection.populated_state = 'big ball of mud'
# change the update_time by updating the name
contents[3].name = 'big ball of mud'
self.app.model.context.flush()
update_time = contents[3].collection.update_time
update_time = contents[3].update_time

def get_update_time(item):
update_time = getattr(item, 'update_time', None)
if not update_time:
update_time = item.collection.update_time
update_time = item.update_time
return update_time

results = self.contents_manager.contents(history, filters=[parsed_filter("orm", column('update_time') >= update_time)])
Expand Down

0 comments on commit 5fb4ffb

Please sign in to comment.