Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Added delete_orphan_task_logs CLI command
Browse files Browse the repository at this point in the history
It fetches the ID and the Task ID for *every* task log entry, and performs
a 'does this task exist' check in Python. The logic lives in Python because I
couldn't find a MongoDB query that can do "foreign key" lookups and delete
the documents whose referenced task document cannot be found.
  • Loading branch information
sybrenstuvel committed Mar 19, 2020
1 parent 99fcd26 commit ac9d065
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 0 deletions.
73 changes: 73 additions & 0 deletions flamenco/cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Commandline interface for Flamenco."""

import logging
from typing import Optional, Dict

from flask import current_app
from flask_script import Manager
from bson import ObjectId

from pillar.cli import manager
from pillar.api.utils import authentication, str2id
Expand Down Expand Up @@ -213,3 +215,74 @@ def runability_check():

log.info('Creating Celery background tasks for runability checks of jobs')
job_runnability_check.schedule_checks()


@manager_flamenco.command
def delete_orphan_task_logs():
    """Remove all task log entries for non-existent tasks.

    Streams over every task log entry, checks in Python whether the
    referenced task still exists (caching the answer per task ID), and
    deletes orphans in batches of ``removal_batch_size``. A keyboard
    interrupt flushes the pending batch before stopping, so partial
    progress is kept.
    """
    from flamenco import current_flamenco

    removal_batch_size = 1000

    tasks_coll = current_flamenco.db('tasks')
    logs_coll = current_flamenco.db('task_logs')

    logs_count = logs_coll.estimated_document_count()
    log.info('Removing orphan task logs. Estimated log count before removal: %d', logs_count)

    # Cache of existence checks, keyed by the task's ObjectId so that each
    # task is looked up in MongoDB at most once.
    task_exists: Dict[ObjectId, bool] = {}

    def check_task_exists(task_id: Optional[ObjectId]) -> bool:
        """Return whether the task with this ID exists, using the cache."""
        if not task_id:
            # It's MongoDB, you never know for sure.
            return False

        exists: Optional[bool] = task_exists.get(task_id)
        if exists is not None:
            return exists

        exists = tasks_coll.count_documents({'_id': task_id}) > 0
        task_exists[task_id] = exists
        return exists

    logs_seen = 0
    logs_removed = 0

    # Only fetch the fields we need; log bodies can be large.
    cursor = logs_coll.find(filter={},
                            projection={
                                '_id': True,
                                'task': True,
                            },
                            batch_size=10000,
                            comment='Orphan task log cleanup'
                            )
    to_remove = []

    def remove_log_batch():
        """Delete the IDs collected in to_remove and reset the batch."""
        nonlocal logs_removed
        delete_result = logs_coll.delete_many({'_id': {'$in': to_remove}})
        log.info("  deleted %d orphan task log entries", delete_result.deleted_count)
        logs_removed += delete_result.deleted_count
        to_remove.clear()

    try:
        for task_log in cursor:
            logs_seen += 1
            if check_task_exists(task_log.get('task')):
                continue

            # Batch up removals.
            to_remove.append(task_log['_id'])
            if len(to_remove) < removal_batch_size:
                continue

            remove_log_batch()
    except KeyboardInterrupt:
        log.info('Received keyboard interrupt, removing %d more log entries and stopping',
                 len(to_remove))

    # Flush the final (or interrupted) partial batch.
    if to_remove:
        remove_log_batch()

    log.info('Deleted %d orphan task logs in total, estimated %d task log entries remaining',
             logs_removed, logs_coll.estimated_document_count())
73 changes: 73 additions & 0 deletions tests/test_job_archival.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,76 @@ def test_resume_archiving(self, mock_archive_job):

mock_archive_job.delay.assert_called_once()
mock_archive_job.delay.assert_called_with(str(self.job1_id))


class CleanOrphanLogsTest(AbstractJobArchivalTest):
    """Test cli.delete_orphan_task_logs.

    This test lives here, and not in a CLI module test file, because it
    exercises the same kind of functionality as the job_archival module and
    requires the same kind of test setup.
    """
    TASK_COUNT = 4        # tasks scheduled for the test job
    UPDATES_PER_TASK = 3  # log entries posted per task
    ORPHAN_COUNT = 1700   # log entries referencing non-existent tasks

    def setUp(self, **kwargs):
        super().setUp(**kwargs)

        self.job_id = self.create_job()
        self.task_ids = [bson.ObjectId(t['_id'])
                         for t in self.do_schedule_tasks()]
        self.enter_app_context()

    def _perform_task_updates(self):
        """Send UPDATES_PER_TASK log entries for each task."""

        # Use the class constant instead of a hard-coded 3, so the
        # assertions (which multiply by UPDATES_PER_TASK) stay in sync.
        for batch_idx in range(self.UPDATES_PER_TASK):
            now = datetime.datetime.now(tz=bson.tz_util.utc)
            update_batch = [
                {'_id': str(bson.ObjectId()),
                 'task_id': str(task_id),
                 'activity': f'testing logging batch {batch_idx}',
                 'log': 40 * f'This is batch {batch_idx} mülti→line log entry\n',
                 'received_on_manager': now}
                for task_id in self.task_ids
            ]
            self.post(f'/api/flamenco/managers/{self.mngr_id}/task-update-batch',
                      json=update_batch,
                      auth_token=self.mngr_token)

    def test_orphan_log_cleanup(self):
        from flamenco import cli

        logs_coll = self.flamenco.db('task_logs')

        # Create some genuine log entries.
        self._perform_task_updates()
        self.assertEqual(self.TASK_COUNT * self.UPDATES_PER_TASK, logs_coll.count_documents({}))

        # Create some orphan log entries pointing at never-existing tasks.
        for orphan_idx in range(self.ORPHAN_COUNT):
            now = datetime.datetime.now(tz=bson.tz_util.utc)
            logs_coll.insert_one({
                "task": bson.ObjectId(),
                "received_on_manager": now,
                "log": f"{now}: orphan log entry #{orphan_idx}"
            })

        tasks_coll = self.flamenco.db('tasks')
        logs_coll = self.flamenco.db('task_logs')

        # Count all the data pre-cleanup.
        self.assertEqual(self.TASK_COUNT, tasks_coll.count_documents({'job': self.job_id}))
        self.assertEqual(self.TASK_COUNT * self.UPDATES_PER_TASK,
                         logs_coll.count_documents({'task': {'$in': self.task_ids}}))
        self.assertEqual(self.TASK_COUNT * self.UPDATES_PER_TASK + self.ORPHAN_COUNT,
                         logs_coll.count_documents({}))

        cli.delete_orphan_task_logs()

        # After the cleanup only the orphan log entries should have been removed.
        self.assertEqual(self.TASK_COUNT, tasks_coll.count_documents({'job': self.job_id}))
        self.assertEqual(self.TASK_COUNT * self.UPDATES_PER_TASK,
                         logs_coll.count_documents({'task': {'$in': self.task_ids}}))
        self.assertEqual(self.TASK_COUNT * self.UPDATES_PER_TASK,
                         logs_coll.count_documents({}))

0 comments on commit ac9d065

Please sign in to comment.