fix(ingest/gc): reduce logging, remove unnecessary sleeps #12238

Merged Dec 30, 2024 (3 commits)

Changes from all commits
@@ -170,6 +170,8 @@ class DataProcessCleanupReport(SourceReport):
     sample_removed_aspects_by_type: TopKDict[str, LossyList[str]] = field(
         default_factory=TopKDict
     )
+    num_data_flows_found: int = 0
+    num_data_jobs_found: int = 0


 class DataProcessCleanup:
@@ -265,13 +267,17 @@ def keep_last_n_dpi(
                     self.report.report_failure(
                         f"Exception while deleting DPI: {e}", exc=e
                     )
-                if deleted_count_last_n % self.config.batch_size == 0:
+                if (
+                    deleted_count_last_n % self.config.batch_size == 0
+                    and deleted_count_last_n > 0
+                ):
                     logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
                     if self.config.delay:
                         logger.info(f"Sleeping for {self.config.delay} seconds")
                         time.sleep(self.config.delay)

-        logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
+        if deleted_count_last_n > 0:
+            logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")

Collaborator (on the added `and deleted_count_last_n > 0` condition): Good catch
Collaborator (comment on lines +279 to +280): It seems we are repeating code here, how about refactoring both ifs to

Suggested change:
-        if deleted_count_last_n > 0:
-            logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
+        if deleted_count_last_n > 0:
+            logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
+            if deleted_count_last_n % self.config.batch_size == 0 and self.config.delay:
+                logger.info(f"Sleeping for {self.config.delay} seconds")
+                time.sleep(self.config.delay)
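Another way to remove the duplication (a hypothetical helper, not something proposed in this PR) would be to extract the log-and-maybe-sleep step so both call sites share it:

    # Hypothetical refactor sketch: the in-loop call site would pass
    # sleep=True, the final call site sleep=False.
    def _report_dpi_progress(self, deleted: int, urn: str, sleep: bool) -> None:
        if deleted <= 0:
            return
        logger.info(f"Deleted {deleted} DPIs from {urn}")
        if sleep and deleted % self.config.batch_size == 0 and self.config.delay:
            logger.info(f"Sleeping for {self.config.delay} seconds")
            time.sleep(self.config.delay)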


     def delete_entity(self, urn: str, type: str) -> None:
         assert self.ctx.graph
@@ -351,7 +357,10 @@ def remove_old_dpis(
             except Exception as e:
                 self.report.report_failure(f"Exception while deleting DPI: {e}", exc=e)

-            if deleted_count_retention % self.config.batch_size == 0:
+            if (
+                deleted_count_retention % self.config.batch_size == 0
+                and deleted_count_retention > 0
+            ):
                 logger.info(
                     f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
                 )
@@ -393,6 +402,7 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
             scrollAcrossEntities = result.get("scrollAcrossEntities")
             if not scrollAcrossEntities:
                 raise ValueError("Missing scrollAcrossEntities in response")
+            self.report.num_data_flows_found += scrollAcrossEntities.get("count")
             logger.info(f"Got {scrollAcrossEntities.get('count')} DataFlow entities")

             scroll_id = scrollAcrossEntities.get("nextScrollId")
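For context, the scroll pattern this counter instruments has roughly the shape below (a sketch assembled from the snippet above, not the verbatim source; QUERY and the "searchResults" field name are assumptions):

    # Sketch of DataHub scroll pagination: keep fetching pages until the
    # server stops returning a nextScrollId.
    scroll_id = None
    while True:
        result = self.ctx.graph.execute_graphql(QUERY, {"scroll_id": scroll_id})
        scrollAcrossEntities = result.get("scrollAcrossEntities")
        if not scrollAcrossEntities:
            raise ValueError("Missing scrollAcrossEntities in response")
        self.report.num_data_flows_found += scrollAcrossEntities.get("count")
        yield from scrollAcrossEntities.get("searchResults", [])
        scroll_id = scrollAcrossEntities.get("nextScrollId")
        if not scroll_id:
            break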
@@ -415,8 +425,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         assert self.ctx.graph

         dataFlows: Dict[str, DataFlowEntity] = {}
-        for flow in self.get_data_flows():
-            dataFlows[flow.urn] = flow
+        if self.config.delete_empty_data_flows:
+            for flow in self.get_data_flows():
+                dataFlows[flow.urn] = flow
Collaborator (comment on lines +429 to +430): You might consider using dictionary comprehension to reduce indentations:

          dataFlows = {flow.urn: flow for flow in self.get_data_flows()}
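One wrinkle: the PR now guards this loop with config.delete_empty_data_flows, so a comprehension version would need to keep that guard. A possible shape (a sketch, not part of the PR):

        dataFlows: Dict[str, DataFlowEntity] = (
            {flow.urn: flow for flow in self.get_data_flows()}
            if self.config.delete_empty_data_flows
            else {}
        )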


         scroll_id: Optional[str] = None
         previous_scroll_id: Optional[str] = None
@@ -443,6 +454,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             if not scrollAcrossEntities:
                 raise ValueError("Missing scrollAcrossEntities in response")

+            self.report.num_data_jobs_found += scrollAcrossEntities.get("count")
             logger.info(f"Got {scrollAcrossEntities.get('count')} DataJob entities")

             scroll_id = scrollAcrossEntities.get("nextScrollId")
@@ -481,7 +493,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

         previous_scroll_id = scroll_id

-        logger.info(f"Deleted {deleted_jobs} DataJobs")
+        if deleted_jobs > 0:
+            logger.info(f"Deleted {deleted_jobs} DataJobs")
         # Delete empty dataflows if needed
         if self.config.delete_empty_data_flows:
             deleted_data_flows: int = 0