
Commit

Merge pull request galaxyproject#16340 from jmchilton/per_object_store_admin

Augment pgcleanup to allow periodically deleting old datasets.
jmchilton authored Nov 2, 2023
2 parents bbe0ee2 + 360b327 commit d3d812c
Showing 5 changed files with 367 additions and 61 deletions.
22 changes: 22 additions & 0 deletions lib/galaxy_test/base/populators.py
@@ -1022,6 +1022,28 @@ def run_collection_creates_list(self, history_id: str, hdca_id: str) -> Response
self.wait_for_history(history_id, assert_ok=True)
return self.run_tool("collection_creates_list", inputs, history_id)

def new_error_dataset(self, history_id: str) -> str:
payload = self.run_tool_payload(
tool_id="test_data_source",
inputs={
"URL": f"file://{os.path.join(os.getcwd(), 'README.rst')}",
"URL_method": "get",
"data_type": "bed",
},
history_id=history_id,
)
create_response = self._post("tools", data=payload)
api_asserts.assert_status_code_is(create_response, 200)
create_object = create_response.json()
api_asserts.assert_has_keys(create_object, "outputs")
assert len(create_object["outputs"]) == 1
output = create_object["outputs"][0]
self.wait_for_history(history_id, assert_ok=False)
# wait=False to allow errors
output_details = self.get_history_dataset_details(history_id, dataset=output, wait=False)
assert output_details["state"] == "error", output_details
return output_details["id"]
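
A minimal usage sketch (the test method name is illustrative, not part of this commit): the helper drives a test_data_source run that is expected to fail, waits on the history, and returns the id of an HDA known to be in the 'error' state.

    # Illustrative only: a method body for a test class that holds a
    # DatasetPopulator, as the integration tests in this commit do.
    def test_with_error_dataset(self):
        history_id = self.dataset_populator.new_history()
        # new_error_dataset asserts the output reached 'error' before returning.
        error_hda_id = self.dataset_populator.new_error_dataset(history_id)
        assert error_hda_id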

def run_exit_code_from_file(self, history_id: str, hdca_id: str) -> dict:
exit_code_inputs = {
"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
6 changes: 3 additions & 3 deletions scripts/cleanup_datasets/admin_cleanup_datasets.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
"""
Mark datasets as deleted that are older than specified cutoff
and (optionaly) with a tool_id that matches the specified search
and (optionally) with a tool_id that matches the specified search
string.
This script is useful for administrators to cleanup after users who
@@ -121,10 +121,10 @@ def main():
default=False,
)
parser.add_argument(
"--smtp", default=None, help="SMTP Server to use to send email. " "Default: [read from galaxy ini file]"
"--smtp", default=None, help="SMTP Server to use to send email. " "Default: [read from galaxy config file]"
)
parser.add_argument(
"--fromaddr", default=None, help="From address to use to send email. " "Default: [read from galaxy ini file]"
"--fromaddr", default=None, help="From address to use to send email. " "Default: [read from galaxy config file]"
)
populate_config_args(parser)

76 changes: 70 additions & 6 deletions scripts/cleanup_datasets/pgcleanup.py
@@ -111,6 +111,10 @@ def __init__(self, app):
self._debug = app.args.debug
self._update_time = app.args.update_time
self._force_retry = app.args.force_retry
if app.args.object_store_id:
self._object_store_id_sql = f" AND dataset.object_store_id = '{app.args.object_store_id}'"
else:
self._object_store_id_sql = ""
self._epoch_time = str(int(time.time()))
self._days = app.args.days
self._config = app.config
@@ -200,6 +204,7 @@ def sql(self):
update_time_sql=self._update_time_sql,
force_retry_sql=self._force_retry_sql,
epoch_time=self._epoch_time,
object_store_id_sql=self._object_store_id_sql,
)

@property
@@ -359,6 +364,7 @@ def sql(self):
update_time_sql=self._update_time_sql,
force_retry_sql=self._force_retry_sql,
epoch_time=self._epoch_time,
object_store_id_sql=self._object_store_id_sql,
)
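
Both sql properties above hand the new object_store_id_sql fragment to str.format over the action's _action_sql template, so the optional filter composes into each action's WHERE clause. A stripped-down sketch of the mechanism, with an invented template and object store id:

    # Simplified illustration of how the optional filter is spliced in.
    template = "UPDATE dataset SET deleted = true WHERE NOT deleted{object_store_id_sql}"
    object_store_id = "files2"  # hypothetical --object-store-id value
    # Empty string when the flag is unset, leaving existing behavior unchanged.
    fragment = f" AND dataset.object_store_id = '{object_store_id}'" if object_store_id else ""
    print(template.format(object_store_id_sql=fragment))
    # -> UPDATE dataset SET deleted = true WHERE NOT deleted AND dataset.object_store_id = 'files2'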


@@ -844,17 +850,68 @@ class PurgeDeletedHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalc
)


class PurgeOldHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalculation, Action):
"""
- Mark purged all HistoryDatasetAssociations that are older than the specified number of days.
- Mark deleted all MetadataFiles whose hda_id is purged in this step.
- Mark deleted all ImplicitlyConvertedDatasetAssociations whose hda_parent_id is purged in this
step.
- Mark purged all HistoryDatasetAssociations for which an ImplicitlyConvertedDatasetAssociation
with matching hda_id is deleted in this step.
"""

force_retry_sql = " AND NOT history_dataset_association.purged"
_action_sql = """
WITH purged_hda_ids
AS ( UPDATE history_dataset_association
SET purged = true, deleted = true{update_time_sql}
FROM dataset
WHERE history_dataset_association.dataset_id = dataset.id AND
dataset.create_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
{force_retry_sql} {object_store_id_sql}
RETURNING history_dataset_association.id,
history_id),
hda_events
AS (INSERT INTO cleanup_event_hda_association
(create_time, cleanup_event_id, hda_id)
SELECT NOW() AT TIME ZONE 'utc', %(event_id)s, id
FROM purged_hda_ids),
{purge_hda_dependencies_sql}
SELECT purged_hda_ids.id AS purged_hda_id,
history.user_id AS recalculate_disk_usage_user_id,
deleted_metadata_file_ids.id AS deleted_metadata_file_id,
deleted_metadata_file_ids.uuid AS deleted_metadata_file_uuid,
deleted_metadata_file_ids.object_store_id AS object_store_id,
deleted_icda_ids.id AS deleted_icda_id,
deleted_icda_ids.hda_id AS deleted_icda_hda_id
FROM purged_hda_ids
LEFT OUTER JOIN history
ON purged_hda_ids.history_id = history.id
LEFT OUTER JOIN deleted_metadata_file_ids
ON deleted_metadata_file_ids.hda_id = purged_hda_ids.id
LEFT OUTER JOIN deleted_icda_ids
ON deleted_icda_ids.hda_parent_id = purged_hda_ids.id
ORDER BY purged_hda_ids.id
"""
causals = (
("purged_hda_id", "deleted_metadata_file_id", "object_store_id"),
("purged_hda_id", "deleted_icda_id", "deleted_icda_hda_id"),
)
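
The _action_sql above chains PostgreSQL data-modifying CTEs: the UPDATE's RETURNING rows feed both the cleanup-event INSERT and the final reporting SELECT in a single statement. A self-contained sketch of the idiom, with invented table names:

    # Simplified form of the UPDATE ... RETURNING -> INSERT chain used above;
    # "items" and "item_event_log" are made-up tables.
    EXAMPLE_SQL = """
    WITH purged_ids
      AS (    UPDATE items
                 SET purged = true, deleted = true
               WHERE create_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
           RETURNING id),
         item_events
      AS (INSERT INTO item_event_log (create_time, item_id)
               SELECT NOW() AT TIME ZONE 'utc', id
                 FROM purged_ids)
    SELECT id FROM purged_ids ORDER BY id
    """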


class PurgeHistorylessHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalculation, Action):
"""
- Mark purged all HistoryDatasetAssociations whose history_id is null.
"""

force_retry_sql = " AND NOT history_dataset_association.purged"
_action_sql = """
WITH purged_hda_ids
AS ( UPDATE history_dataset_association
SET purged = true, deleted = true{update_time_sql}
WHERE history_id IS NULL{force_retry_sql}
AND update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
FROM dataset
WHERE history_id IS NULL{force_retry_sql}{object_store_id_sql}
AND history_dataset_association.update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING id),
hda_events
AS (INSERT INTO cleanup_event_hda_association
@@ -893,7 +950,7 @@ class PurgeErrorHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalcul
AS ( UPDATE history_dataset_association
SET purged = true, deleted = true{update_time_sql}
FROM dataset
WHERE history_dataset_association.dataset_id = dataset.id{force_retry_sql}
WHERE history_dataset_association.dataset_id = dataset.id{force_retry_sql}{object_store_id_sql}
AND dataset.state = 'error'
AND history_dataset_association.update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING history_dataset_association.id as id,
@@ -1037,7 +1094,7 @@ class DeleteExportedHistories(Action):
SET deleted = true{update_time_sql}
FROM job_export_history_archive
WHERE job_export_history_archive.dataset_id = dataset.id
AND NOT deleted
AND NOT deleted {object_store_id_sql}
AND dataset.update_time <= (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING dataset.id),
dataset_events
@@ -1063,7 +1120,7 @@ class DeleteDatasets(Action):
WITH deleted_dataset_ids
AS ( UPDATE dataset
SET deleted = true{update_time_sql}
WHERE NOT deleted
WHERE NOT deleted {object_store_id_sql}
AND NOT EXISTS
(SELECT true
FROM library_dataset_dataset_association
@@ -1097,7 +1154,7 @@ class PurgeDatasets(RemovesDatasets, Action):
WITH purged_dataset_ids
AS ( UPDATE dataset
SET purged = true{update_time_sql}
WHERE deleted{force_retry_sql}
WHERE deleted{force_retry_sql}{object_store_id_sql}
AND update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING id,
uuid,
@@ -1182,6 +1239,13 @@ def __parse_args(self):
default=14,
help="Only perform action(s) on objects that have not been updated since the specified number of days",
)
parser.add_argument(
"--object-store-id",
dest="object_store_id",
type=str,
default=None,
help="Only perform action(s) on objects stored in the target object store (for dataset operations - ignored by user/history centric operations)",
)
parser.add_argument(
"-U",
"--no-update-time",
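
With the new option, an administrator can scope a cleanup run to a single object store. A hypothetical invocation, mirroring how the integration tests shell out to the script; the sequence name purge_old_hdas is inferred from the PurgeOldHDAs class and the snake_case convention of existing sequences such as purge_deleted_histories, and is not confirmed by this diff:

    import subprocess

    # Hypothetical periodic run: purge HDAs older than 180 days that live in
    # the (invented) "files_old" object store.
    subprocess.check_call(
        [
            "python", "scripts/cleanup_datasets/pgcleanup.py",
            "-c", "config/galaxy.yml",
            "--older-than", "180",
            "--object-store-id", "files_old",
            "--sequence", "purge_old_hdas",
        ]
    )
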
90 changes: 38 additions & 52 deletions test/integration/test_scripts.py
@@ -16,7 +16,9 @@
from galaxy_test.driver import integration_util


class TestScriptsIntegration(integration_util.IntegrationTestCase):
class BaseScriptsIntegrationTestCase(integration_util.IntegrationTestCase):
dataset_populator: DatasetPopulator

def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
@@ -26,6 +28,39 @@ def setUp(self):
def handle_galaxy_config_kwds(cls, config):
cls._raw_config = config

def _scripts_check_argparse_help(self, script):
# Test imports and argparse response to --help with 0 exit code.
output = self._scripts_check_output(script, ["--help"])
# Test -h, --help in printed output message.
assert "-h, --help" in output

def _scripts_check_output(self, script, args):
cwd = galaxy_directory()
cmd = ["python", os.path.join(cwd, "scripts", script)] + args
clean_env = {
"PATH": os.environ.get("PATH", None),
} # Don't let testing environment variables interfere with config.
try:
return unicodify(subprocess.check_output(cmd, cwd=cwd, env=clean_env))
except Exception as e:
if isinstance(e, subprocess.CalledProcessError):
raise Exception(f"{unicodify(e)}\nOutput was:\n{unicodify(e.output)}")
raise

def write_config_file(self):
config_dir = self.config_dir
path = os.path.join(config_dir, "galaxy.yml")
self._test_driver.temp_directories.extend([config_dir])
config = self._raw_config
# Update config dict with database_connection, which might be set through env variables
config["database_connection"] = self._app.config.database_connection
with open(path, "w") as f:
yaml.dump({"galaxy": config}, f)

return path
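
Extracting these helpers into BaseScriptsIntegrationTestCase lets other integration modules reuse them; presumably the postgres-only test_pgcleanup removed below now lives in such a module. A hypothetical reuse sketch (class and script name are invented):

    class TestMyScript(BaseScriptsIntegrationTestCase):
        def test_my_script_help(self):
            # Imports the script and verifies it responds to --help.
            self._scripts_check_argparse_help("my_script.py")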


class TestScriptsIntegration(BaseScriptsIntegrationTestCase):
def test_helper(self):
script = "helper.py"
self._scripts_check_argparse_help(script)
@@ -52,25 +87,10 @@ def test_cleanup(self):
assert history_response.status_code == 200
assert history_response.json()["purged"] is True, history_response.json()

def test_pgcleanup(self):
self._skip_unless_postgres()

script = "cleanup_datasets/pgcleanup.py"
def test_admin_cleanup_datasets(self):
script = "cleanup_datasets/admin_cleanup_datasets.py"
self._scripts_check_argparse_help(script)

history_id = self.dataset_populator.new_history()
delete_response = self.dataset_populator._delete(f"histories/{history_id}")
assert delete_response.status_code == 200
assert delete_response.json()["purged"] is False
config_file = self.write_config_file()
output = self._scripts_check_output(
script, ["-c", config_file, "--older-than", "0", "--sequence", "purge_deleted_histories"]
)
print(output)
history_response = self.dataset_populator._get(f"histories/{history_id}")
assert history_response.status_code == 200
assert history_response.json()["purged"] is True, history_response.json()

def test_set_user_disk_usage(self):
script = "set_user_disk_usage.py"
self._scripts_check_argparse_help(script)
@@ -123,9 +143,6 @@ def test_grt_export(self):
export = json.load(f)
assert export["version"] == 3

def test_admin_cleanup_datasets(self):
self._scripts_check_argparse_help("cleanup_datasets/admin_cleanup_datasets.py")

def test_secret_decoder_ring(self):
script = "secret_decoder_ring.py"
self._scripts_check_argparse_help(script)
@@ -143,34 +160,3 @@ def test_galaxy_main(self):
def test_runtime_stats(self):
self._skip_unless_postgres()
self._scripts_check_argparse_help("runtime_stats.py")

def _scripts_check_argparse_help(self, script):
# Test imports and argparse response to --help with 0 exit code.
output = self._scripts_check_output(script, ["--help"])
# Test -h, --help in printed output message.
assert "-h, --help" in output

def _scripts_check_output(self, script, args):
cwd = galaxy_directory()
cmd = ["python", os.path.join(cwd, "scripts", script)] + args
clean_env = {
"PATH": os.environ.get("PATH", None),
} # Don't let testing environment variables interfere with config.
try:
return unicodify(subprocess.check_output(cmd, cwd=cwd, env=clean_env))
except Exception as e:
if isinstance(e, subprocess.CalledProcessError):
raise Exception(f"{unicodify(e)}\nOutput was:\n{unicodify(e.output)}")
raise

def write_config_file(self):
config_dir = self.config_dir
path = os.path.join(config_dir, "galaxy.yml")
self._test_driver.temp_directories.extend([config_dir])
config = self._raw_config
# Update config dict with database_connection, which might be set through env variables
config["database_connection"] = self._app.config.database_connection
with open(path, "w") as f:
yaml.dump({"galaxy": config}, f)

return path