Skip to content

Commit

Permalink
fix(ingest/gc): logging and stopping fix (#12266)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 3, 2025
1 parent b76db33 commit f9e2c49
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ def _scroll_execution_requests(
break
if self.report.ergc_read_errors >= self.config.max_read_errors:
self.report.failure(
f"ergc({self.instance_id}): too many read errors, aborting."
title="Too many read errors, aborting",
message="Too many read errors, aborting",
context=str(self.instance_id),
)
break
try:
Expand All @@ -158,8 +160,11 @@ def _scroll_execution_requests(
break
params["scrollId"] = document["scrollId"]
except Exception as e:
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
self.report.failure(
title="Failed to fetch next batch of execution requests",
message="Failed to fetch next batch of execution requests",
context=str(self.instance_id),
exc=e,
)
self.report.ergc_read_errors += 1

Expand Down Expand Up @@ -231,8 +236,11 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.ergc_delete_errors += 1
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
self.report.failure(
title="Failed to delete ExecutionRequest",
message="Failed to delete ExecutionRequest",
context=str(self.instance_id),
exc=e,
)

def _reached_runtime_limit(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class SoftDeletedEntitiesReport(SourceReport):
sample_hard_deleted_aspects_by_type: TopKDict[str, LossyList[str]] = field(
default_factory=TopKDict
)
runtime_limit_reached: bool = False
deletion_limit_reached: bool = False


class SoftDeletedEntitiesCleanup:
Expand Down Expand Up @@ -163,6 +165,8 @@ def delete_entity(self, urn: str) -> None:
f"Dry run is on otherwise it would have deleted {urn} with hard deletion"
)
return
if self._deletion_limit_reached() or self._times_up():
return
self._increment_removal_started_count()
self.ctx.graph.delete_entity(urn=urn, hard=True)
self.ctx.graph.delete_references_to_urn(
Expand Down Expand Up @@ -203,11 +207,10 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]:
for future in done:
self._print_report()
if future.exception():
logger.error(
f"Failed to delete entity {futures[future]}: {future.exception()}"
)
self.report.failure(
f"Failed to delete entity {futures[future]}",
title="Failed to delete entity",
message="Failed to delete entity",
context=futures[future],
exc=future.exception(),
)
self.report.num_soft_deleted_entity_processed += 1
Expand Down Expand Up @@ -274,6 +277,26 @@ def _get_urns(self) -> Iterable[str]:
)
yield from self._get_soft_deleted_queries()

def _times_up(self) -> bool:
if (
self.config.runtime_limit_seconds
and time.time() - self.start_time > self.config.runtime_limit_seconds
):
with self._report_lock:
self.report.runtime_limit_reached = True
return True
return False

def _deletion_limit_reached(self) -> bool:
if (
self.config.limit_entities_delete
and self.report.num_hard_deleted > self.config.limit_entities_delete
):
with self._report_lock:
self.report.deletion_limit_reached = True
return True
return False

def cleanup_soft_deleted_entities(self) -> None:
if not self.config.enabled:
return
Expand All @@ -285,24 +308,8 @@ def cleanup_soft_deleted_entities(self) -> None:
self._print_report()
while len(futures) >= self.config.futures_max_at_time:
futures = self._process_futures(futures)
if (
self.config.limit_entities_delete
and self.report.num_hard_deleted > self.config.limit_entities_delete
):
logger.info(
f"Limit of {self.config.limit_entities_delete} entities reached. Stopped adding more."
)
if self._deletion_limit_reached() or self._times_up():
break
if (
self.config.runtime_limit_seconds
and time.time() - self.start_time
> self.config.runtime_limit_seconds
):
logger.info(
f"Runtime limit of {self.config.runtime_limit_seconds} seconds reached. Not submitting more futures."
)
break

future = executor.submit(self.delete_soft_deleted_entity, urn)
futures[future] = urn

Expand Down
28 changes: 28 additions & 0 deletions metadata-ingestion/tests/unit/test_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,34 @@
DataProcessCleanupConfig,
DataProcessCleanupReport,
)
from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
SoftDeletedEntitiesCleanup,
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)


class TestSoftDeletedEntitiesCleanup(unittest.TestCase):
def setUp(self):
self.ctx = PipelineContext(run_id="test_run")
self.ctx.graph = MagicMock()
self.config = SoftDeletedEntitiesCleanupConfig()
self.report = SoftDeletedEntitiesReport()
self.cleanup = SoftDeletedEntitiesCleanup(
self.ctx, self.config, self.report, dry_run=True
)

def test_update_report(self):
self.cleanup._update_report(
urn="urn:li:dataset:1",
entity_type="dataset",
)
self.assertEqual(1, self.report.num_hard_deleted)
self.assertEqual(1, self.report.num_hard_deleted_by_type["dataset"])

def test_increment_retained_count(self):
self.cleanup._increment_retained_count()
self.assertEqual(1, self.report.num_soft_deleted_retained_due_to_age)


class TestDataProcessCleanup(unittest.TestCase):
Expand Down

0 comments on commit f9e2c49

Please sign in to comment.