From 88fb30e0da7a2f1340447e4cc3ae98042ddf9ea9 Mon Sep 17 00:00:00 2001
From: Seth Foster
Date: Thu, 19 Mar 2020 13:03:09 -0400
Subject: [PATCH] Delete jobs without loading objects first

This commit is intended to speed up the cleanup_jobs command in awx.
The old method takes 7+ hours to delete 1 million old jobs; the new
method takes around 6 minutes.

It leverages a sub-classed Collector, called AWXCollector, that does
not load objects into memory before deleting them. Instead, querysets,
which are lazily evaluated, are used in the places where Collector
normally keeps a list of objects.

Finally, a couple of tests ensure parity between the stock Collector
and AWXCollector. That is, any object that is updated or removed from
the database using Collector should see identical operations when
using AWXCollector.

tower issue 1103
---
 awx/main/management/commands/cleanup_jobs.py |  46 +++--
 awx/main/management/commands/deletion.py     | 177 +++++++++++++++++
 .../functional/commands/test_cleanup_jobs.py | 179 ++++++++++++++++++
 3 files changed, 385 insertions(+), 17 deletions(-)
 create mode 100644 awx/main/management/commands/deletion.py
 create mode 100644 awx/main/tests/functional/commands/test_cleanup_jobs.py

diff --git a/awx/main/management/commands/cleanup_jobs.py b/awx/main/management/commands/cleanup_jobs.py
index 511072834abf..dd6304e24201 100644
--- a/awx/main/management/commands/cleanup_jobs.py
+++ b/awx/main/management/commands/cleanup_jobs.py
@@ -21,6 +21,8 @@
     disable_computed_fields
 )
 
+from awx.main.management.commands.deletion import AWXCollector, pre_delete
+
 
 class Command(BaseCommand):
     '''
@@ -57,27 +59,37 @@ def add_arguments(self, parser):
                             action='store_true',
                             dest='only_workflow_jobs',
                             help='Remove workflow jobs')
+
     def cleanup_jobs(self):
-        #jobs_qs = Job.objects.exclude(status__in=('pending', 'running'))
-        #jobs_qs = jobs_qs.filter(created__lte=self.cutoff)
         skipped, deleted = 0, 0
-        jobs = Job.objects.filter(created__lt=self.cutoff)
-        for job in jobs.iterator():
-            job_display = '"%s" (%d host summaries, %d events)' % \
-                (str(job),
-                 job.job_host_summaries.count(), job.job_events.count())
-            if job.status in ('pending', 'waiting', 'running'):
-                action_text = 'would skip' if self.dry_run else 'skipping'
-                self.logger.debug('%s %s job %s', action_text, job.status, job_display)
-                skipped += 1
+
+        batch_size = 1000000
+
+        while True:
+            # get queryset for available jobs to remove
+            qs = Job.objects.filter(created__lt=self.cutoff).exclude(status__in=['pending', 'waiting', 'running'])
+            # get pk list for the first N (batch_size) objects
+            pk_list = qs[0:batch_size].values_list('pk')
+            # You cannot delete queries with a SQL LIMIT set, so we must
+            # create a new query from this pk_list
+            qs_batch = Job.objects.filter(pk__in=pk_list)
+            just_deleted = 0
+            if not self.dry_run:
+                del_query = pre_delete(qs_batch)
+                collector = AWXCollector(del_query.db)
+                collector.collect(del_query)
+                _, models_deleted = collector.delete()
+                if models_deleted:
+                    just_deleted = models_deleted['main.Job']
+                deleted += just_deleted
             else:
-                action_text = 'would delete' if self.dry_run else 'deleting'
-                self.logger.info('%s %s', action_text, job_display)
-                if not self.dry_run:
-                    job.delete()
-                deleted += 1
+                just_deleted = 0  # break from loop, this is a dry run
+                deleted = qs.count()
+
+            if just_deleted == 0:
+                break
 
-        skipped += Job.objects.filter(created__gte=self.cutoff).count()
+        skipped += (Job.objects.filter(created__gte=self.cutoff) | Job.objects.filter(status__in=['pending', 'waiting', 'running'])).count()
         return skipped, deleted
 
     def cleanup_ad_hoc_commands(self):
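
Note on the batching loop above: Django will not execute a DELETE with a
SQL LIMIT attached, so a sliced queryset cannot be deleted directly; each
pass instead materializes one batch of pks and re-filters on pk__in. In
isolation, the same pattern looks roughly like the sketch below (the
helper name and batch size are illustrative, not part of this patch):

    def delete_in_batches(qs, batch_size=1000):
        '''Delete the rows matched by `qs` in pk batches; return total deleted.'''
        total = 0
        while True:
            # A sliced queryset cannot be .delete()'d, so collect pks first...
            pk_list = list(qs.values_list('pk', flat=True)[:batch_size])
            if not pk_list:
                return total
            # ...then delete through a fresh, unsliced queryset.
            count, _ = qs.model.objects.filter(pk__in=pk_list).delete()
            total += count
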
diff --git a/awx/main/management/commands/deletion.py b/awx/main/management/commands/deletion.py
new file mode 100644
index 000000000000..8fc78d540f23
--- /dev/null
+++ b/awx/main/management/commands/deletion.py
@@ -0,0 +1,177 @@
+from django.contrib.contenttypes.models import ContentType
+from django.db.models.deletion import (
+    DO_NOTHING, Collector, get_candidate_relations_to_delete,
+)
+from collections import Counter, OrderedDict
+from django.db import transaction
+from django.db.models import sql
+
+
+def bulk_related_objects(field, objs, using):
+    # This overrides the method in django.contrib.contenttypes.fields.py
+    """
+    Return all objects related to ``objs`` via this ``GenericRelation``.
+    """
+    return field.remote_field.model._base_manager.db_manager(using).filter(**{
+        "%s__pk" % field.content_type_field_name: ContentType.objects.db_manager(using).get_for_model(
+            field.model, for_concrete_model=field.for_concrete_model).pk,
+        "%s__in" % field.object_id_field_name: list(objs.values_list('pk', flat=True))
+    })
+
+
+def pre_delete(qs):
+    # taken from the .delete() method in django.db.models.query.py
+    assert qs.query.can_filter(), \
+        "Cannot use 'limit' or 'offset' with delete."
+
+    if qs._fields is not None:
+        raise TypeError("Cannot call delete() after .values() or .values_list()")
+
+    del_query = qs._chain()
+
+    # The delete is actually 2 queries - one to find related objects,
+    # and one to delete. Make sure that the discovery of related
+    # objects is performed on the same database as the deletion.
+    del_query._for_write = True
+
+    # Disable non-supported fields.
+    del_query.query.select_for_update = False
+    del_query.query.select_related = False
+    del_query.query.clear_ordering(force_empty=True)
+    return del_query
+
+
+class AWXCollector(Collector):
+
+    def add(self, objs, source=None, nullable=False, reverse_dependency=False):
+        """
+        Add 'objs' to the collection of objects to be deleted. If the call is
+        the result of a cascade, 'source' should be the model that caused it,
+        and 'nullable' should be set to True if the relation can be null.
+
+        Return a queryset of the objects that were collected.
+        """
+        if not objs.exists():
+            return objs
+        model = objs.model
+        self.data.setdefault(model, [])
+        self.data[model].append(objs)
+        # Nullable relationships can be ignored -- they are nulled out before
+        # deleting, and therefore do not affect the order in which objects have
+        # to be deleted.
+        if source is not None and not nullable:
+            if reverse_dependency:
+                source, model = model, source
+            self.dependencies.setdefault(
+                source._meta.concrete_model, set()).add(model._meta.concrete_model)
+        return objs
+
+    def add_field_update(self, field, value, objs):
+        """
+        Schedule a field update. 'objs' must be a homogeneous iterable
+        collection of model instances (e.g. a QuerySet).
+        """
+        if not objs.exists():
+            return
+        model = objs.model
+        self.field_updates.setdefault(model, {})
+        self.field_updates[model].setdefault((field, value), [])
+        self.field_updates[model][(field, value)].append(objs)
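+
+    # NOTE: collect() below mirrors django.db.models.deletion.Collector.collect(),
+    # but every value it handles is a queryset rather than a list of model
+    # instances, so nothing is loaded into memory until a query such as
+    # .exists() actually executes.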
+    def collect(self, objs, source=None, nullable=False, collect_related=True,
+                source_attr=None, reverse_dependency=False, keep_parents=False):
+        """
+        Add 'objs' to the collection of objects to be deleted as well as all
+        parent instances. 'objs' must be a homogeneous iterable collection of
+        model instances (e.g. a QuerySet). If 'collect_related' is True,
+        related objects will be handled by their respective on_delete handler.
+
+        If the call is the result of a cascade, 'source' should be the model
+        that caused it and 'nullable' should be set to True if the relation
+        can be null.
+
+        If 'reverse_dependency' is True, 'source' will be deleted before the
+        current model, rather than after. (Needed for cascading to parent
+        models, the one case in which the cascade follows the forwards
+        direction of an FK rather than the reverse direction.)
+
+        If 'keep_parents' is True, data of parent models will not be deleted.
+        """
+
+        if hasattr(objs, 'polymorphic_disabled'):
+            objs.polymorphic_disabled = True
+
+        if self.can_fast_delete(objs):
+            self.fast_deletes.append(objs)
+            return
+        new_objs = self.add(objs, source, nullable,
+                            reverse_dependency=reverse_dependency)
+        if not new_objs.exists():
+            return
+
+        model = new_objs.model
+
+        if not keep_parents:
+            # Recursively collect concrete model's parent models, but not their
+            # related objects. These will be found by meta.get_fields()
+            concrete_model = model._meta.concrete_model
+            for ptr in concrete_model._meta.parents.keys():
+                if ptr:
+                    parent_objs = ptr.objects.filter(pk__in=new_objs.values_list('pk', flat=True))
+                    self.collect(parent_objs, source=model,
+                                 collect_related=False,
+                                 reverse_dependency=True)
+        if collect_related:
+            parents = model._meta.parents
+            for related in get_candidate_relations_to_delete(model._meta):
+                # Preserve parent reverse relationships if keep_parents=True.
+                if keep_parents and related.model in parents:
+                    continue
+                field = related.field
+                if field.remote_field.on_delete == DO_NOTHING:
+                    continue
+                related_qs = self.related_objects(related, new_objs)
+                if self.can_fast_delete(related_qs, from_field=field):
+                    self.fast_deletes.append(related_qs)
+                elif related_qs:
+                    field.remote_field.on_delete(self, field, related_qs, self.using)
+            for field in model._meta.private_fields:
+                if hasattr(field, 'bulk_related_objects'):
+                    # It's something like a generic foreign key.
+                    sub_objs = bulk_related_objects(field, new_objs, self.using)
+                    self.collect(sub_objs, source=model, nullable=True)
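+
+    # NOTE: delete() runs the usual phases (field updates, fast deletes,
+    # batched row deletes) inside one transaction, but it first resolves
+    # every stored queryset to a concrete pk list, since deletes executed
+    # earlier in the transaction could change what those querysets match.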
+    def delete(self):
+        self.sort()
+
+        # collect pk_list before deletion (once things start to delete,
+        # queries might not be able to retrieve the pk list)
+        del_dict = OrderedDict()
+        for model, instances in self.data.items():
+            del_dict.setdefault(model, [])
+            for inst in instances:
+                del_dict[model] += list(inst.values_list('pk', flat=True))
+
+        deleted_counter = Counter()
+
+        with transaction.atomic(using=self.using, savepoint=False):
+
+            # update fields
+            for model, instances_for_fieldvalues in self.field_updates.items():
+                for (field, value), instances in instances_for_fieldvalues.items():
+                    for inst in instances:
+                        query = sql.UpdateQuery(model)
+                        query.update_batch(inst.values_list('pk', flat=True),
+                                           {field.name: value}, self.using)
+
+            # fast deletes
+            for qs in self.fast_deletes:
+                count = qs._raw_delete(using=self.using)
+                deleted_counter[qs.model._meta.label] += count
+
+            # delete instances
+            for model, pk_list in del_dict.items():
+                query = sql.DeleteQuery(model)
+                count = query.delete_batch(pk_list, self.using)
+                deleted_counter[model._meta.label] += count
+
+        return sum(deleted_counter.values()), dict(deleted_counter)
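
For reference, the calling sequence cleanup_jobs uses with this module is
pre_delete() to prepare the queryset, collect() to gather cascades, then
delete(). A minimal sketch, assuming a configured AWX environment and the
default database (the status filter is illustrative):

    from awx.main.management.commands.deletion import AWXCollector, pre_delete
    from awx.main.models import Job

    del_query = pre_delete(Job.objects.filter(status='successful'))
    collector = AWXCollector(del_query.db)
    collector.collect(del_query)            # walks related querysets; no instances loaded
    total, per_model = collector.delete()   # per_model is e.g. {'main.Job': 3, ...}
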
diff --git a/awx/main/tests/functional/commands/test_cleanup_jobs.py b/awx/main/tests/functional/commands/test_cleanup_jobs.py
new file mode 100644
index 000000000000..ed506983383a
--- /dev/null
+++ b/awx/main/tests/functional/commands/test_cleanup_jobs.py
@@ -0,0 +1,179 @@
+import pytest
+from datetime import datetime, timedelta
+from pytz import timezone
+from collections import OrderedDict
+
+from django.db.models.deletion import Collector, SET_NULL, CASCADE
+from django.core.management import call_command
+
+from awx.main.management.commands.deletion import AWXCollector
+from awx.main.models import (
+    JobTemplate, User, Job, JobEvent, Notification,
+    WorkflowJobNode, JobHostSummary
+)
+
+
+@pytest.fixture
+def setup_environment(inventory, project, machine_credential, host, notification_template, label):
+    '''
+    Create old jobs and new jobs, with various other objects to hit the
+    related fields of Job. This makes sure on_delete() effects are tested
+    properly.
+    '''
+    old_jobs = []
+    new_jobs = []
+    days = 10
+    days_str = str(days)
+
+    jt = JobTemplate.objects.create(name='testjt', inventory=inventory, project=project)
+    jt.credentials.add(machine_credential)
+    jt_user = User.objects.create(username='jobtemplateuser')
+    jt.execute_role.members.add(jt_user)
+
+    notification = Notification()
+    notification.notification_template = notification_template
+    notification.save()
+
+    for i in range(3):
+        # create jobs with the current timestamp
+        job1 = jt.create_job()
+        job1.created = datetime.now(tz=timezone('UTC'))
+        job1.save()
+        JobEvent.create_from_data(job_id=job1.pk, uuid='abc123', event='runner_on_start',
+                                  stdout='a' * 1025).save()
+        new_jobs.append(job1)
+
+        # create jobs 10 days ago
+        job2 = jt.create_job()
+        job2.created = datetime.now(tz=timezone('UTC')) - timedelta(days=days)
+        job2.save()
+        job2.dependent_jobs.add(job1)
+        JobEvent.create_from_data(job_id=job2.pk, uuid='abc123', event='runner_on_start',
+                                  stdout='a' * 1025).save()
+        old_jobs.append(job2)
+
+    jt.last_job = job2
+    jt.current_job = job2
+    jt.save()
+    host.last_job = job2
+    host.save()
+    notification.unifiedjob_notifications.add(job2)
+    label.unifiedjob_labels.add(job2)
+    jn = WorkflowJobNode.objects.create(job=job2)
+    jn.save()
+    jh = JobHostSummary.objects.create(job=job2)
+    jh.save()
+
+    return (old_jobs, new_jobs, days_str)
+
+
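+# test_cleanup_jobs predicts, via reverse-relation introspection on the Job
+# model, which related rows should be CASCADE-deleted and which should be
+# SET_NULL'd, then verifies both outcomes after running the command.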
+@pytest.mark.django_db
+def test_cleanup_jobs(setup_environment):
+    (old_jobs, new_jobs, days_str) = setup_environment
+
+    # related_fields
+    related = [f for f in Job._meta.get_fields(include_hidden=True)
+               if f.auto_created and not f.concrete and
+               (f.one_to_one or f.one_to_many)]
+
+    job = old_jobs[-1]  # last job
+
+    # gather related objects for job
+    related_should_be_removed = {}
+    related_should_be_null = {}
+    for r in related:
+        qs = r.related_model._base_manager.using('default').filter(
+            **{"%s__in" % r.field.name: [job.pk]}
+        )
+        if qs.exists():
+            if r.field.remote_field.on_delete == CASCADE:
+                related_should_be_removed[qs.model] = set(qs.values_list('pk', flat=True))
+            if r.field.remote_field.on_delete == SET_NULL:
+                related_should_be_null[(qs.model, r.field.name)] = set(qs.values_list('pk', flat=True))
+
+    assert related_should_be_removed
+    assert related_should_be_null
+
+    call_command('cleanup_jobs', '--days', days_str)
+
+    # make sure old jobs are removed
+    assert not Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]).exists()
+
+    # make sure new jobs are untouched
+    assert len(new_jobs) == Job.objects.filter(pk__in=[obj.pk for obj in new_jobs]).count()
+
+    # make sure related objects are destroyed or set to NULL (None)
+    for model, values in related_should_be_removed.items():
+        assert not model.objects.filter(pk__in=values).exists()
+
+    for (model, fieldname), values in related_should_be_null.items():
+        for v in values:
+            assert not getattr(model.objects.get(pk=v), fieldname)
+
+
+@pytest.mark.django_db
+def test_awxcollector(setup_environment):
+    '''
+    Efforts to improve the performance of cleanup_jobs involved
+    sub-classing the Django Collector class. This test checks for
+    parity between the Django Collector and the modified AWXCollector
+    class. AWXCollector is used in cleanup_jobs to bulk-delete old
+    jobs from the database.
+
+    Specifically, Collector has four dictionaries to check:
+    .dependencies, .data, .fast_deletes, and .field_updates
+
+    These tests convert each dictionary from AWXCollector (after
+    running .collect on jobs) from querysets to sets of objects. The
+    final result should be a dictionary that is equivalent to Django's
+    Collector.
+    '''
+    (old_jobs, new_jobs, days_str) = setup_environment
+
+    collector = Collector('default')
+    collector.collect(old_jobs)
+
+    awx_col = AWXCollector('default')
+    # awx_col accepts a queryset as input
+    awx_col.collect(Job.objects.filter(pk__in=[obj.pk for obj in old_jobs]))
+
+    # check that dependencies are the same
+    assert awx_col.dependencies == collector.dependencies
+
+    # check that objects to delete are the same
+    awx_del_dict = OrderedDict()
+    for model, instances in awx_col.data.items():
+        awx_del_dict.setdefault(model, set())
+        for inst in instances:
+            # .update() will put each object in a queryset into the set
+            awx_del_dict[model].update(inst)
+    assert awx_del_dict == collector.data
+
+    # check that field updates are the same
+    awx_del_dict = OrderedDict()
+    for model, instances_for_fieldvalues in awx_col.field_updates.items():
+        awx_del_dict.setdefault(model, {})
+        for (field, value), instances in instances_for_fieldvalues.items():
+            awx_del_dict[model].setdefault((field, value), set())
+            for inst in instances:
+                awx_del_dict[model][(field, value)].update(inst)
+
+    # Collector field updates don't use the base (polymorphic parent) model,
+    # e.g. it will use JobTemplate instead of UnifiedJobTemplate. Therefore,
+    # we need to rebuild the dictionary and grab the model from the field
+    collector_del_dict = OrderedDict()
+    for model, instances_for_fieldvalues in collector.field_updates.items():
+        for (field, value), instances in instances_for_fieldvalues.items():
+            collector_del_dict.setdefault(field.model, {})
+            collector_del_dict[field.model][(field, value)] = collector.field_updates[model][(field, value)]
+    assert awx_del_dict == collector_del_dict
+
+    # check that fast deletes are the same
+    collector_fast_deletes = set()
+    for q in collector.fast_deletes:
+        collector_fast_deletes.update(q)
+
+    awx_col_fast_deletes = set()
+    for q in awx_col.fast_deletes:
+        awx_col_fast_deletes.update(q)
+    assert collector_fast_deletes == awx_col_fast_deletes
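
The command itself can be exercised the same way the first test does, via
call_command (or awx-manage on an installed system). Only the --days flag
appears in this diff; the command also honors a dry-run mode through the
self.dry_run attribute, but that flag's exact name should be checked
against the command's add_arguments. A sketch:

    from django.core.management import call_command

    # remove eligible jobs older than 30 days (cutoff value is illustrative)
    call_command('cleanup_jobs', '--days', '30')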