diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index 170a6ada3e336f..f9a00d7f009058 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -141,7 +141,9 @@ def _scroll_execution_requests( break if self.report.ergc_read_errors >= self.config.max_read_errors: self.report.failure( - f"ergc({self.instance_id}): too many read errors, aborting." + title="Too many read errors, aborting", + message="Too many read errors, aborting", + context=str(self.instance_id), ) break try: @@ -158,8 +160,11 @@ def _scroll_execution_requests( break params["scrollId"] = document["scrollId"] except Exception as e: - logger.error( - f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}" + self.report.failure( + title="Failed to fetch next batch of execution requests", + message="Failed to fetch next batch of execution requests", + context=str(self.instance_id), + exc=e, ) self.report.ergc_read_errors += 1 @@ -231,8 +236,11 @@ def _delete_entry(self, entry: CleanupRecord) -> None: self.graph.delete_entity(entry.urn, True) except Exception as e: self.report.ergc_delete_errors += 1 - logger.error( - f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}" + self.report.failure( + title="Failed to delete ExecutionRequest", + message="Failed to delete ExecutionRequest", + context=str(self.instance_id), + exc=e, ) def _reached_runtime_limit(self) -> bool: diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 4c0355834f9b4f..cf810d05aa2ca1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -105,6 +105,8 @@ class SoftDeletedEntitiesReport(SourceReport): sample_hard_deleted_aspects_by_type: TopKDict[str, LossyList[str]] = field( default_factory=TopKDict ) + runtime_limit_reached: bool = False + deletion_limit_reached: bool = False class SoftDeletedEntitiesCleanup: @@ -163,6 +165,8 @@ def delete_entity(self, urn: str) -> None: f"Dry run is on otherwise it would have deleted {urn} with hard deletion" ) return + if self._deletion_limit_reached() or self._times_up(): + return self._increment_removal_started_count() self.ctx.graph.delete_entity(urn=urn, hard=True) self.ctx.graph.delete_references_to_urn( @@ -203,11 +207,10 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: for future in done: self._print_report() if future.exception(): - logger.error( - f"Failed to delete entity {futures[future]}: {future.exception()}" - ) self.report.failure( - f"Failed to delete entity {futures[future]}", + title="Failed to delete entity", + message="Failed to delete entity", + context=futures[future], exc=future.exception(), ) self.report.num_soft_deleted_entity_processed += 1 @@ -274,6 +277,26 @@ def _get_urns(self) -> Iterable[str]: ) yield from self._get_soft_deleted_queries() + def _times_up(self) -> bool: + if ( + self.config.runtime_limit_seconds + and time.time() - self.start_time > self.config.runtime_limit_seconds + ): + with self._report_lock: + self.report.runtime_limit_reached = True + return True + return False + + def _deletion_limit_reached(self) -> bool: + if ( + self.config.limit_entities_delete + and self.report.num_hard_deleted > self.config.limit_entities_delete + ): + with self._report_lock: + self.report.deletion_limit_reached = True + return True + return False + def cleanup_soft_deleted_entities(self) -> None: if not self.config.enabled: return @@ -285,24 +308,8 @@ def cleanup_soft_deleted_entities(self) -> None: self._print_report() while len(futures) >= self.config.futures_max_at_time: futures = self._process_futures(futures) - if ( - self.config.limit_entities_delete - and self.report.num_hard_deleted > self.config.limit_entities_delete - ): - logger.info( - f"Limit of {self.config.limit_entities_delete} entities reached. Stopped adding more." - ) + if self._deletion_limit_reached() or self._times_up(): break - if ( - self.config.runtime_limit_seconds - and time.time() - self.start_time - > self.config.runtime_limit_seconds - ): - logger.info( - f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Not submitting more futures." - ) - break - future = executor.submit(self.delete_soft_deleted_entity, urn) futures[future] = urn diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py index 8f00d5e064db85..fde9a3f2e0cf03 100644 --- a/metadata-ingestion/tests/unit/test_gc.py +++ b/metadata-ingestion/tests/unit/test_gc.py @@ -9,6 +9,34 @@ DataProcessCleanupConfig, DataProcessCleanupReport, ) +from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import ( + SoftDeletedEntitiesCleanup, + SoftDeletedEntitiesCleanupConfig, + SoftDeletedEntitiesReport, +) + + +class TestSoftDeletedEntitiesCleanup(unittest.TestCase): + def setUp(self): + self.ctx = PipelineContext(run_id="test_run") + self.ctx.graph = MagicMock() + self.config = SoftDeletedEntitiesCleanupConfig() + self.report = SoftDeletedEntitiesReport() + self.cleanup = SoftDeletedEntitiesCleanup( + self.ctx, self.config, self.report, dry_run=True + ) + + def test_update_report(self): + self.cleanup._update_report( + urn="urn:li:dataset:1", + entity_type="dataset", + ) + self.assertEqual(1, self.report.num_hard_deleted) + self.assertEqual(1, self.report.num_hard_deleted_by_type["dataset"]) + + def test_increment_retained_count(self): + self.cleanup._increment_retained_count() + self.assertEqual(1, self.report.num_soft_deleted_retained_due_to_age) class TestDataProcessCleanup(unittest.TestCase):