diff --git a/superset/commands/sql_lab/query.py b/superset/commands/sql_lab/query.py new file mode 100644 index 0000000000000..87bc0d28adbb2 --- /dev/null +++ b/superset/commands/sql_lab/query.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +import time +from datetime import datetime, timedelta + +import sqlalchemy as sa + +from superset import db +from superset.commands.base import BaseCommand +from superset.models.sql_lab import Query + +logger = logging.getLogger(__name__) + + +# pylint: disable=consider-using-transaction +class QueryPruneCommand(BaseCommand): + """ + Command to prune the query table by deleting rows older than the specified retention period. + + This command deletes records from the `Query` table that have not been changed within the + specified number of days. It helps in maintaining the database by removing outdated entries + and freeing up space. + + Attributes: + retention_period_days (int): The number of days for which records should be retained. + Records older than this period will be deleted. + """ + + def __init__(self, retention_period_days: int): + """ + :param retention_period_days: Number of days to keep in the query table + """ + self.retention_period_days = retention_period_days + + def run(self) -> None: + """ + Executes the prune command + """ + batch_size = 999 # SQLite has a IN clause limit of 999 + total_deleted = 0 + start_time = time.time() + + # Select all IDs that need to be deleted + ids_to_delete = ( + db.session.execute( + sa.select(Query.id).where( + Query.changed_on + < datetime.now() - timedelta(days=self.retention_period_days) + ) + ) + .scalars() + .all() + ) + + total_rows = len(ids_to_delete) + + logger.info("Total rows to be deleted: %s", total_rows) + + next_logging_threshold = 1 + + # Iterate over the IDs in batches + for i in range(0, total_rows, batch_size): + batch_ids = ids_to_delete[i : i + batch_size] + + # Delete the selected batch using IN clause + result = db.session.execute(sa.delete(Query).where(Query.id.in_(batch_ids))) + + # Update the total number of deleted records + total_deleted += result.rowcount + + # Explicitly commit the transaction given that if an error occurs, we want to ensure that the + # records that have been deleted so far are committed + db.session.commit() + + # Log the number of deleted records every 1% increase in progress + percentage_complete = (total_deleted / total_rows) * 100 + if percentage_complete >= next_logging_threshold: + logger.info( + "Deleted %s rows from the query table older than %s days (%d%% complete)", + total_deleted, + self.retention_period_days, + percentage_complete, + ) + next_logging_threshold += 1 + + elapsed_time = time.time() - start_time + minutes, seconds = divmod(elapsed_time, 60) + formatted_time = f"{int(minutes):02}:{int(seconds):02}" + logger.info( + "Pruning complete: %s rows deleted in %s", total_deleted, formatted_time + ) + + def validate(self) -> None: + pass diff --git a/superset/config.py b/superset/config.py index a39dc432c3712..7e74e930efad8 100644 --- a/superset/config.py +++ b/superset/config.py @@ -999,6 +999,12 @@ class CeleryConfig: # pylint: disable=too-few-public-methods "task": "reports.prune_log", "schedule": crontab(minute=0, hour=0), }, + # Uncomment to enable pruning of the query table + # "prune_query": { + # "task": "prune_query", + # "schedule": crontab(minute=0, hour=0, day_of_month=1), + # "options": {"retention_period_days": 180}, + # }, } diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 652d556e8909b..ce963eff64465 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -25,6 +25,7 @@ from superset.commands.report.exceptions import ReportScheduleUnexpectedError from superset.commands.report.execute import AsyncExecuteReportScheduleCommand from superset.commands.report.log_prune import AsyncPruneReportScheduleLogCommand +from superset.commands.sql_lab.query import QueryPruneCommand from superset.daos.report import ReportScheduleDAO from superset.extensions import celery_app from superset.stats_logger import BaseStatsLogger @@ -119,3 +120,16 @@ def prune_log() -> None: logger.warning("A timeout occurred while pruning report schedule logs: %s", ex) except CommandException: logger.exception("An exception occurred while pruning report schedule logs") + + +@celery_app.task(name="prune_query") +def prune_query() -> None: + stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"] + stats_logger.incr("prune_query") + + try: + QueryPruneCommand( + prune_query.request.properties.get("retention_period_days") + ).run() + except CommandException as ex: + logger.exception("An error occurred while pruning queries: %s", ex)