From b5110db2eb1f5acb86897dc17b2a2865c5855806 Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Mon, 2 Dec 2024 17:44:31 +0530
Subject: [PATCH 1/2] fix(ingest/gc): delete invalid dpis

---
 .../ingestion/source/gc/dataprocess_cleanup.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 0f35e1a67fede7..20a3ac8d5a565e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -277,7 +277,17 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
         assert self.ctx.graph
 
         dpis = self.fetch_dpis(job.urn, self.config.batch_size)
-        dpis.sort(key=lambda x: x["created"]["time"], reverse=True)
+        try:
+            dpis.sort(key=lambda x: x["created"]["time"], reverse=True)
+        except Exception as e:
+            # Delete all cases where created is not present
+            for dpi in dpis:
+                try:
+                    dpi["created"]["time"]
+                except Exception as e:
+                    self.delete_entity(dpi["urn"], "dataprocessInstance")
+            # We will try again for this job next time
+            return
 
         with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
             if self.config.keep_last_n:

From 90913f7a228176a7a37962cd40e5406bf603bacf Mon Sep 17 00:00:00 2001
From: Aseem Bansal
Date: Mon, 2 Dec 2024 18:00:16 +0530
Subject: [PATCH 2/2] code review feedback

---
 .../ingestion/source/gc/dataprocess_cleanup.py | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 20a3ac8d5a565e..ca67cd6daa045b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -277,17 +277,12 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
         assert self.ctx.graph
 
         dpis = self.fetch_dpis(job.urn, self.config.batch_size)
-        try:
-            dpis.sort(key=lambda x: x["created"]["time"], reverse=True)
-        except Exception as e:
-            # Delete all cases where created is not present
-            for dpi in dpis:
-                try:
-                    dpi["created"]["time"]
-                except Exception as e:
-                    self.delete_entity(dpi["urn"], "dataprocessInstance")
-            # We will try again for this job next time
-            return
+        dpis.sort(
+            key=lambda x: x["created"]["time"]
+            if x["created"] and x["created"]["time"]
+            else 0,
+            reverse=True,
+        )
 
         with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
             if self.config.keep_last_n:
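
Note (editor addition, not part of the patch): a minimal standalone sketch of the defensive sort key introduced in the second commit, assuming hypothetical DPI dicts shaped like the results fetch_dpis() returns. Entries whose "created" timestamp is null fall back to a key of 0 and therefore sort to the end under reverse=True, instead of the sort raising as it did before the fix.

    # Sketch only: hypothetical DPI dicts; real ones come from fetch_dpis().
    dpis = [
        {"urn": "urn:li:dataProcessInstance:a", "created": {"time": 1733140000000}},
        {"urn": "urn:li:dataProcessInstance:b", "created": None},
        {"urn": "urn:li:dataProcessInstance:c", "created": {"time": 1733150000000}},
        {"urn": "urn:li:dataProcessInstance:d", "created": {"time": None}},
    ]

    # Same key expression as the patch: missing/empty "created" falls back to 0,
    # so with reverse=True those entries land at the end instead of raising.
    dpis.sort(
        key=lambda x: x["created"]["time"]
        if x["created"] and x["created"]["time"]
        else 0,
        reverse=True,
    )

    print([d["urn"] for d in dpis])
    # ['urn:li:dataProcessInstance:c', 'urn:li:dataProcessInstance:a',
    #  'urn:li:dataProcessInstance:b', 'urn:li:dataProcessInstance:d']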