fix(ingest/gc): Adding test and more checks to gc source #12027

Merged: 2 commits, Dec 5, 2024
@@ -208,22 +208,28 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
         dpis = []
         start = 0
         while True:
-            job_query_result = self.ctx.graph.execute_graphql(
-                DATA_PROCESS_INSTANCES_QUERY,
-                {"dataJobUrn": job_urn, "start": start, "count": batch_size},
-            )
-            job_data = job_query_result.get("dataJob")
-            if not job_data:
-                raise ValueError(f"Error getting job {job_urn}")
-
-            runs_data = job_data.get("runs")
-            if not runs_data:
-                raise ValueError(f"Error getting runs for {job_urn}")
-
-            runs = runs_data.get("runs")
-            dpis.extend(runs)
-            start += batch_size
-            if len(runs) < batch_size:
+            try:
+                job_query_result = self.ctx.graph.execute_graphql(
+                    DATA_PROCESS_INSTANCES_QUERY,
+                    {"dataJobUrn": job_urn, "start": start, "count": batch_size},
+                )
+                job_data = job_query_result.get("dataJob")
+                if not job_data:
+                    logger.error(f"Error getting job {job_urn}")
+                    break
+
+                runs_data = job_data.get("runs")
+                if not runs_data:
+                    logger.error(f"Error getting runs for {job_urn}")
+                    break
+
+                runs = runs_data.get("runs")
+                dpis.extend(runs)
+                start += batch_size
+                if len(runs) < batch_size:
+                    break
+            except Exception as e:
+                logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
                 break
         return dpis
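The reworked loop above turns hard failures into logged errors: a missing dataJob, missing runs, or a GraphQL exception now logs and returns whatever DPIs were already collected instead of raising. A minimal sketch of exercising that behavior against a mocked graph, modeled on the new unit test (the run_id, error message, and expected empty result are illustrative assumptions, not part of the diff):

from unittest.mock import MagicMock

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.gc.dataprocess_cleanup import (
    DataProcessCleanup,
    DataProcessCleanupConfig,
    DataProcessCleanupReport,
)

ctx = PipelineContext(run_id="gc_sketch")
ctx.graph = MagicMock()
# Simulate the GraphQL endpoint failing outright.
ctx.graph.execute_graphql.side_effect = Exception("GMS unavailable")

cleanup = DataProcessCleanup(
    ctx, DataProcessCleanupConfig(), DataProcessCleanupReport(), dry_run=True
)
# With the try/except in place this should log the error and return [],
# where the old code would have propagated the exception.
print(cleanup.fetch_dpis("urn:li:dataJob:1", 10))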

@@ -243,8 +249,12 @@ def keep_last_n_dpi(
                 futures[future] = dpi
 
             for future in as_completed(futures):
-                deleted_count_last_n += 1
-                futures[future]["deleted"] = True
+                try:
+                    future.result()
+                    deleted_count_last_n += 1
+                    futures[future]["deleted"] = True
+                except Exception as e:
+                    logger.error(f"Exception while deleting DPI: {e}")
 
                 if deleted_count_last_n % self.config.batch_size == 0:
                     logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
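The important part of this hunk is calling future.result() before counting: previously a delete_entity call that raised inside the thread pool was still counted and marked as deleted. A small self-contained sketch of the same as_completed error-handling pattern, with generic names that are not taken from the source:

import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def delete_entity(urn: str) -> None:
    # Stand-in for the real deletion call; one URN is made to fail.
    if urn.endswith("3"):
        raise RuntimeError(f"failed to delete {urn}")


deleted = 0
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(delete_entity, f"urn:li:dpi:{i}"): i for i in range(5)}
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from the worker thread
            deleted += 1
        except Exception as e:
            logger.error(f"Exception while deleting DPI: {e}")

print(deleted)  # 4: the failing URN is logged rather than counted as deleted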
@@ -279,7 +289,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
         dpis = self.fetch_dpis(job.urn, self.config.batch_size)
         dpis.sort(
             key=lambda x: x["created"]["time"]
-            if x["created"] and x["created"]["time"]
+            if "created" in x and "time" in x["created"]
             else 0,
             reverse=True,
         )
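The revised sort key treats a DPI with no created timestamp as time 0 rather than raising a KeyError, so such records sort to the end of the newest-first ordering. A quick illustration with hypothetical records:

dpis = [
    {"urn": "urn:li:dataprocessInstance:a", "created": {"time": 1700000000000}},
    {"urn": "urn:li:dataprocessInstance:b"},  # no "created" aspect at all
    {"urn": "urn:li:dataprocessInstance:c", "created": {"time": 1800000000000}},
]
dpis.sort(
    key=lambda x: x["created"]["time"]
    if "created" in x and "time" in x["created"]
    else 0,
    reverse=True,
)
print([d["urn"] for d in dpis])  # newest first; the timestamp-less record comes last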
@@ -314,15 +324,23 @@ def remove_old_dpis(
             if dpi.get("deleted"):
                 continue
 
-            if dpi["created"]["time"] < retention_time * 1000:
+            if (
+                "created" not in dpi
+                or "time" not in dpi["created"]
+                or dpi["created"]["time"] < retention_time * 1000
+            ):
                 future = executor.submit(
                     self.delete_entity, dpi["urn"], "dataprocessInstance"
                 )
                 futures[future] = dpi
 
         for future in as_completed(futures):
-            deleted_count_retention += 1
-            futures[future]["deleted"] = True
+            try:
+                future.result()
+                deleted_count_retention += 1
+                futures[future]["deleted"] = True
+            except Exception as e:
+                logger.error(f"Exception while deleting DPI: {e}")
 
             if deleted_count_retention % self.config.batch_size == 0:
                 logger.info(
@@ -378,8 +396,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 dataFlows[flow.urn] = flow
 
         scroll_id: Optional[str] = None
+        previous_scroll_id: Optional[str] = None
+
         dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)
         deleted_jobs: int = 0
+
         while True:
             result = self.ctx.graph.execute_graphql(
                 DATAJOB_QUERY,
@@ -426,9 +447,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 else:
                     dataJobs[datajob_entity.flow_urn].append(datajob_entity)
 
-            if not scroll_id:
+            if not scroll_id or previous_scroll_id == scroll_id:
                 break
 
+            previous_scroll_id = scroll_id
+
         logger.info(f"Deleted {deleted_jobs} DataJobs")
         # Delete empty dataflows if needed
         if self.config.delete_empty_data_flows:
@@ -443,4 +466,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                     if deleted_jobs % self.config.batch_size == 0:
                         logger.info(f"Deleted {deleted_data_flows} DataFlows")
         logger.info(f"Deleted {deleted_data_flows} DataFlows")
+
         return []
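Tracking previous_scroll_id protects the job-listing loop from spinning forever if the backend keeps returning the same scrollId. The guard in isolation, as a rough sketch built around an assumed fetch_page(scroll_id) -> (items, next_scroll_id) helper that is not part of the source:

from typing import Callable, List, Optional, Tuple


def scroll_all(fetch_page: Callable[[Optional[str]], Tuple[List[str], Optional[str]]]) -> List[str]:
    items: List[str] = []
    scroll_id: Optional[str] = None
    previous_scroll_id: Optional[str] = None
    while True:
        page, scroll_id = fetch_page(scroll_id)
        items.extend(page)
        # Stop on an empty cursor, or when the server echoes the same cursor back.
        if not scroll_id or previous_scroll_id == scroll_id:
            break
        previous_scroll_id = scroll_id
    return items


pages = {None: (["job-1"], "s1"), "s1": (["job-2"], "s1")}  # server keeps echoing "s1"
print(scroll_all(lambda sid: pages[sid]))  # ['job-1', 'job-2'] instead of an infinite loop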
metadata-ingestion/tests/unit/test_gc.py: 109 additions, 0 deletions (new file)
@@ -0,0 +1,109 @@
import unittest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.gc.dataprocess_cleanup import (
    DataJobEntity,
    DataProcessCleanup,
    DataProcessCleanupConfig,
    DataProcessCleanupReport,
)


class TestDataProcessCleanup(unittest.TestCase):
    def setUp(self):
        self.ctx = PipelineContext(run_id="test_run")
        self.ctx.graph = MagicMock()
        self.config = DataProcessCleanupConfig()
        self.report = DataProcessCleanupReport()
        self.cleanup = DataProcessCleanup(
            self.ctx, self.config, self.report, dry_run=True
        )

    @patch(
        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
    )
    def test_delete_dpi_from_datajobs(self, mock_fetch_dpis):
        job = DataJobEntity(
            urn="urn:li:dataJob:1",
            flow_urn="urn:li:dataFlow:1",
            lastIngested=int(datetime.now(timezone.utc).timestamp()),
            jobId="job1",
            dataPlatformInstance="urn:li:dataPlatformInstance:1",
            total_runs=10,
        )
        mock_fetch_dpis.return_value = [
            {
                "urn": f"urn:li:dataprocessInstance:{i}",
                "created": {
                    "time": int(datetime.now(timezone.utc).timestamp() + i) * 1000
                },
            }
            for i in range(10)
        ]
        self.cleanup.delete_dpi_from_datajobs(job)
        self.assertEqual(5, self.report.num_aspects_removed)

    @patch(
        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
    )
    def test_delete_dpi_from_datajobs_without_dpis(self, mock_fetch_dpis):
        job = DataJobEntity(
            urn="urn:li:dataJob:1",
            flow_urn="urn:li:dataFlow:1",
            lastIngested=int(datetime.now(timezone.utc).timestamp()),
            jobId="job1",
            dataPlatformInstance="urn:li:dataPlatformInstance:1",
            total_runs=10,
        )
        mock_fetch_dpis.return_value = []
        self.cleanup.delete_dpi_from_datajobs(job)
        self.assertEqual(0, self.report.num_aspects_removed)

    @patch(
        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
    )
    def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis):
        job = DataJobEntity(
            urn="urn:li:dataJob:1",
            flow_urn="urn:li:dataFlow:1",
            lastIngested=int(datetime.now(timezone.utc).timestamp()),
            jobId="job1",
            dataPlatformInstance="urn:li:dataPlatformInstance:1",
            total_runs=10,
        )
        mock_fetch_dpis.return_value = [
            {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
        ] + [
            {
                "urn": "urn:li:dataprocessInstance:11",
                "created": {"time": int(datetime.now(timezone.utc).timestamp() * 1000)},
            }
        ]
        self.cleanup.delete_dpi_from_datajobs(job)
        self.assertEqual(10, self.report.num_aspects_removed)

    def test_fetch_dpis(self):
        assert self.cleanup.ctx.graph
        self.cleanup.ctx.graph = MagicMock()
        self.cleanup.ctx.graph.execute_graphql.return_value = {
            "dataJob": {
                "runs": {
                    "runs": [
                        {
                            "urn": "urn:li:dataprocessInstance:1",
                            "created": {
                                "time": int(datetime.now(timezone.utc).timestamp())
                            },
                        }
                    ]
                }
            }
        }
        dpis = self.cleanup.fetch_dpis("urn:li:dataJob:1", 10)
        self.assertEqual(len(dpis), 1)


if __name__ == "__main__":
    unittest.main()
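To run the new module locally, a typical invocation (assuming pytest is installed in the metadata-ingestion environment) is: pytest metadata-ingestion/tests/unit/test_gc.py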