Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update clearpiperun to use raw SQL #775

Merged
merged 46 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8770187
timing and memory benchmark
shiblisaleheen Jul 25, 2024
f97605c
delete raw initial
shiblisaleheen Jul 26, 2024
03e3266
adding profiler
shiblisaleheen Jul 29, 2024
bf4e91e
optimisation handling exceptions
shiblisaleheen Sep 5, 2024
3297d4f
Added logging
ddobie Sep 11, 2024
4e75b8c
Updated delete_run
ddobie Sep 11, 2024
dcbafe5
Fix syntax errors
ddobie Sep 11, 2024
ee9de93
Disable triggers to see if that fixes speed issues
ddobie Sep 13, 2024
98da705
Remove memory profiling
ddobie Sep 16, 2024
9783865
Reenabled logging
ddobie Sep 20, 2024
bb49f84
Add end of loop logging, remove tqdm
ddobie Sep 20, 2024
3c7c35b
Remove all tqdm, improve logging slightly
ddobie Sep 20, 2024
88738e7
Added timing
ddobie Sep 20, 2024
68b5f94
Fixed tqdm missing
ddobie Sep 20, 2024
2c65e7d
Fix logging
ddobie Sep 20, 2024
b9bec73
Added units to logging
ddobie Sep 20, 2024
9fcb727
specify source id in logging
ddobie Sep 20, 2024
c7026ab
Toggle triggers
ddobie Sep 20, 2024
a781a8a
clean up clearpiperun
ddobie Oct 17, 2024
2aacc61
Other minor updates
ddobie Oct 17, 2024
e4b7c8d
Fix variable name
ddobie Oct 17, 2024
ab25282
Correctly handle images and skyregions that are associated with multi…
ddobie Oct 18, 2024
892c695
PEP8
ddobie Oct 18, 2024
bfbc131
Merge branch 'dev' into adacs_delete_optimisation_ddobie
ddobie Oct 18, 2024
6827654
Updated changelog
ddobie Oct 18, 2024
06c15bb
Remove commented code
ddobie Oct 18, 2024
5b7f2f6
Remove whitespace - don't know why the linter didn't pick this up
ddobie Oct 18, 2024
558aa40
Merge dev
ddobie Oct 21, 2024
2f90ca6
Update vast_pipeline/management/commands/clearpiperun.py
ddobie Oct 23, 2024
8b9007d
Update vast_pipeline/utils/delete_run.py
ddobie Oct 23, 2024
4f8d4ae
Update vast_pipeline/utils/delete_run.py
ddobie Oct 23, 2024
860c44f
Update vast_pipeline/utils/delete_run.py
ddobie Oct 23, 2024
f90b53b
Update vast_pipeline/utils/delete_run.py
ddobie Oct 23, 2024
f904748
Update vast_pipeline/utils/delete_run.py
ddobie Oct 23, 2024
bf72de5
Update vast_pipeline/utils/delete_run.py
ddobie Oct 23, 2024
156c507
Update vast_pipeline/management/commands/clearpiperun.py
ddobie Oct 23, 2024
2e3a699
Update vast_pipeline/management/commands/clearpiperun.py
ddobie Oct 23, 2024
000b92b
Update vast_pipeline/management/commands/clearpiperun.py
ddobie Oct 23, 2024
f5b4bff
Update vast_pipeline/utils/delete_run.py
ddobie Oct 25, 2024
d0248a4
Update vast_pipeline/utils/delete_run.py
ddobie Oct 25, 2024
478ec18
Update vast_pipeline/utils/delete_run.py
ddobie Oct 25, 2024
6df82d2
Update vast_pipeline/utils/delete_run.py
ddobie Oct 25, 2024
14fdc79
Update vast_pipeline/utils/delete_run.py
ddobie Oct 25, 2024
f30959d
Update vast_pipeline/utils/delete_run.py
ddobie Oct 25, 2024
1ce5a65
Fix logging count
ddobie Oct 25, 2024
0b47009
Clean up logging statements
ddobie Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Added

- Added vast_pipeline.utils.delete_run.py to enable deletion of pipeline runs using raw SQL [#775](https://github.com/askap-vast/vast-pipeline/pull/775)

#### Changed

- Updated clearpiperun to delete runs using raw SQL rather than via django [#775](https://github.com/askap-vast/vast-pipeline/pull/775)
- Shortened forced fits measurement names to ensure they fit within the character limits - remove image prefix and limited to 1000 forced fits per source [#734](https://github.com/askap-vast/vast-pipeline/pull/734)
- Cleaned up Code of Conduct including adding Zenodo DOI [#773](https://github.com/askap-vast/vast-pipeline/pull/773)
- Updated changelog release instructions to remove each release having an empty "Unreleased" section at the start [#772](https://github.com/askap-vast/vast-pipeline/pull/772)
Expand All @@ -24,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#775](https://github.com/askap-vast/vast-pipeline/pull/775): fix, feat: Enabled deletion of pipeline runs directly using SQL rather than via django
- [#734](https://github.com/askap-vast/vast-pipeline/pull/734): Shortened forced fits measurement names
- [#773](https://github.com/askap-vast/vast-pipeline/pull/773): docs: Cleaned up Code of Conduct including adding Zenodo DOI
- [#772](https://github.com/askap-vast/vast-pipeline/pull/772): fix, docs: Fixed changelog formatting and updated changelog release instructions
Expand Down
25 changes: 16 additions & 9 deletions vast_pipeline/management/commands/clearpiperun.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
"""
This module defines the command for clearing a run from the database.
"""

import os
import logging
import shutil
Expand All @@ -13,12 +9,13 @@

from vast_pipeline.models import Run
from vast_pipeline.pipeline.forced_extraction import remove_forced_meas
from vast_pipeline.utils.utils import StopWatch
from ..helpers import get_p_run_name

from ...utils.delete_run import delete_pipeline_run_raw_sql

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""
This script is used to clean the data for pipeline run(s).
Expand Down Expand Up @@ -82,7 +79,8 @@ def handle(self, *args, **options) -> None:
Returns:
None
"""
# configure logging
timer = StopWatch()

if options['verbosity'] > 1:
# set root logger to use the DEBUG level
root_logger = logging.getLogger('')
Expand All @@ -97,8 +95,9 @@ def handle(self, *args, **options) -> None:

piperuns = options['piperuns']
flag_all_runs = True if 'clearall' in piperuns else False

if flag_all_runs:
logger.info('clearing all pipeline run in the database')
logger.info('Clearing all pipeline run in the database')
piperuns = list(Run.objects.values_list('name', flat=True))

for piperun in piperuns:
Expand All @@ -108,14 +107,20 @@ def handle(self, *args, **options) -> None:
except Run.DoesNotExist:
raise CommandError(f'Pipeline run {p_run_name} does not exist')

logger.info("Deleting pipeline '%s' from database", p_run_name)
logger.info("Deleting pipeline run '%s' from database", p_run_name)
with transaction.atomic():
p_run.status = 'DEL'
p_run.save()
p_run.delete()

timer.reset()
delete_pipeline_run_raw_sql(p_run)
t = timer.reset()
logger.info("Time to delete run from database: %.2f sec", t)

# remove forced measurements in db if presents
forced_parquets = remove_forced_meas(p_run.path)
t = timer.reset()
logger.info("Time to delete forced measurements: %.2f sec", t)

# Delete parquet or folder eventually
if not options['keep_parquet'] and not options['remove_all']:
Expand All @@ -132,6 +137,8 @@ def handle(self, *args, **options) -> None:
f'Parquet file "{os.path.basename(parquet)}" not existent'
))
pass
t = timer.reset()
logger.info("Time to delete parquet files: %.2f sec", t)

if options['remove_all']:
logger.info('Deleting pipeline folder')
Expand Down
161 changes: 161 additions & 0 deletions vast_pipeline/utils/delete_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import logging

from django.db import connection
from vast_pipeline.utils.utils import StopWatch

logger = logging.getLogger(__name__)

def _run_raw_sql(command, cursor, debug=False, log=True):
if log:
if debug:
logger.debug("Running %s", command)
else:
logger.info("Running %s", command)
cursor.execute(command)

return

def delete_pipeline_run_raw_sql(p_run):
p_run_id = p_run.pk

with connection.cursor() as cursor:
# Disable triggers
#sql_cmd = "ALTER TABLE vast_pipeline_source DISABLE TRIGGER ALL;"
#_run_raw_sql(sql_cmd, cursor)

# Fetch source IDs associated with the pipeline run
sql_cmd = f"SELECT id FROM vast_pipeline_source WHERE run_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)
source_ids = cursor.fetchall()

# Iterate over each source ID and delete related information
n_source_ids = len(source_ids)
logger.info("Iterating over %d sources to delete tags and relations", n_source_ids)
timer = StopWatch()
for i, source_id_tuple in enumerate(source_ids):
source_id = source_id_tuple[0]

# Delete entries from vast_pipeline_sourcefav for each source_id
sql_cmd = f"DELETE FROM vast_pipeline_sourcefav WHERE source_id = {source_id};"
_run_raw_sql(sql_cmd, cursor, log=False)

# Find source tags related to the source and delete them
sql_cmd = f"SELECT tagulous_source_tags_id FROM vast_pipeline_source_tags WHERE source_id = {source_id};"
_run_raw_sql(sql_cmd, cursor, log=False)
tagulous_source_tags_ids = cursor.fetchall()

for tagulous_source_tags_id_tuple in tagulous_source_tags_ids:
tagulous_source_tags_id = tagulous_source_tags_id_tuple[0]
sql_cmd = f"DELETE FROM vast_pipeline_tagulous_source_tags WHERE id = {tagulous_source_tags_id};"
_run_raw_sql(sql_cmd, cursor, log=False)

# Delete from vast_pipeline_source_tags for the source_id
sql_cmd = f"DELETE FROM vast_pipeline_source_tags WHERE source_id = {source_id};"
_run_raw_sql(sql_cmd, cursor, log=False)

# Delete from related source
sql_cmd = f"DELETE FROM vast_pipeline_relatedsource WHERE from_source_id = {source_id};"
_run_raw_sql(sql_cmd, cursor, log=False)
sql_cmd = f"DELETE FROM vast_pipeline_relatedsource WHERE to_source_id = {source_id};"
_run_raw_sql(sql_cmd, cursor, log=False)

sql_cmd = f"DELETE FROM vast_pipeline_association WHERE source_id = {source_id};"
_run_raw_sql(sql_cmd, cursor, log=False)

if i % 1000 == 0:
logger.info("Finished source id %d (%d of %d)", source_id, i, n_source_ids)
t = timer.reset()
logger.info(f"Time to iterate over %d source ids: %f seconds", n_source_ids, t)

# Delete source
sql_cmd = f"DELETE FROM vast_pipeline_source WHERE run_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)

# Enable triggers
#sql_cmd = "ALTER TABLE vast_pipeline_source ENABLE TRIGGER ALL;"
#_run_raw_sql(sql_cmd, cursor)

# Delete comments
sql_cmd = f"DELETE FROM vast_pipeline_comment WHERE object_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)

# Fetch image IDs associated with the pipeline run
sql_cmd = f"SELECT image_id FROM vast_pipeline_image_run WHERE run_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)
image_ids = cursor.fetchall()

# Iterate over each image ID and delete related information
n_image_ids = len(image_ids)
logger.info("Iterating over %d images to delete measurements and images", n_image_ids)
timer.reset()

for image_id_tuple in image_ids:
image_id = image_id_tuple[0]

# Check if the Image is associated with more than one run
sql_cmd = f"SELECT COUNT(*) FROM vast_pipeline_image_run WHERE image_id={image_id};"
_run_raw_sql(sql_cmd, cursor)
num_occurences = cursor.fetchone()[0]

# Delete the link between the run and the image
sql_cmd = f"DELETE FROM vast_pipeline_image_run WHERE image_id = {image_id} AND run_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)

# If the image is associated with more than one run, do not delete the image.
if num_occurences > 1:
logger.debug("image_id %d is referenced by %d other pipeline runs, not deleting", image_id, num_occurences-1)
continue

try:
sql_cmd = f"DELETE FROM vast_pipeline_measurement WHERE image_id = {image_id};"
_run_raw_sql(sql_cmd, cursor)
except Exception as e:
logger.error("%s %d", e, image_id)
pass

try:
sql_cmd = f"DELETE FROM vast_pipeline_image WHERE id = {image_id};"
_run_raw_sql(sql_cmd, cursor)
except Exception as e:
logger.error("%s %d", e, image_id)
pass
t = timer.reset()
logger.info("Time to iterate over %d image ids: %f seconds", n_image_ids, t)

# Fetch skyregion IDs associated with the pipeline run
sql_cmd = f"SELECT skyregion_id FROM vast_pipeline_skyregion_run WHERE run_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor, debug=True)
sky_ids = cursor.fetchall()

# Iterate over each skyregion ID and delete related information
n_sky_ids = len(sky_ids)
logger.info("Iterating over %d skyregion IDs to delete skyregions", n_sky_ids)
timer.reset()
for sky_id_tuple in sky_ids:
sky_id = sky_id_tuple[0]

# Check if the Image is associated with more than one run
sql_cmd = f"SELECT COUNT(*) FROM vast_pipeline_skyregion_run WHERE skyregion_id={sky_id};"
_run_raw_sql(sql_cmd, cursor)
num_occurences = cursor.fetchone()[0]

sql_cmd = f"DELETE FROM vast_pipeline_skyregion_run WHERE skyregion_id = {sky_id} AND run_id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)

# If the skyregion is associated with more than one run, do not delete the skyregion.
if num_occurences > 1:
logger.debug("skyregion_id %d is referenced by %d other pipeline runs, not deleting", image_id, num_occurences-1)
continue

try:
sql_cmd = f"DELETE FROM vast_pipeline_skyregion WHERE id = {sky_id};"
_run_raw_sql(sql_cmd, cursor)
except Exception as e:
logger.error("%s %d", e, sky_id)
pass
t = timer.reset()
logger.info("Time to iterate over %d sky ids: %f seconds", n_sky_ids, t)

# Finally delete the pipeline run
sql_cmd = f"DELETE FROM vast_pipeline_run WHERE id = {p_run_id};"
_run_raw_sql(sql_cmd, cursor)
Loading