-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/iceberg): Iceberg performance improvement #11182
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
verbose code from the ingestor code
) | ||
tables_count = 0 | ||
for namespace in namespaces: | ||
tables = catalog.list_tables(namespace) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this fail? If it can maybe we should handle it to not fail all the namespaces.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added handling of exceptions there.
tables_count = 0 | ||
for namespace in namespaces: | ||
tables = catalog.list_tables(namespace) | ||
tables_count += len(tables) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we want to add this to report?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
tables = catalog.list_tables(namespace) | ||
tables_count += len(tables) | ||
LOGGER.debug( | ||
f"Retrieved {len(tables)} tables for namespace: {namespace}, in total retrieved {tables_count}, first 10: {tables[:10]}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should go as well to report
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
LOGGER.error("Failed to get catalog", exc_info=True) | ||
self.report.report_failure("get-catalog", f"Failed to get catalog: {e}") | ||
return | ||
thread_local = threading.local() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? Why not just pass catalog to _process_dataset?
def _process_dataset(dataset_path, catalog):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catalog object can not be used across threads (at least not for every catalog type).
|
||
for dataset_path in self._get_datasets(catalog): | ||
def _process_dataset(dataset_path): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type info missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
) | ||
if row_count: | ||
# Iterating through fieldPaths introduces unwanted stats for list element fields... | ||
for field_path, field_id in table.schema()._name_to_id.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
table.schema can't be null right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the definition of the function - not, but of course it might happen. If it happens we will simply report failure and not send the datasetProfile
aspect - then continue with the next table. I think it is reasonable behavior.
time_taken = timer.elapsed_seconds() | ||
self.report.report_table_profiling_time(time_taken) | ||
LOGGER.debug( | ||
"Finished profiling of dataset: %s in %s", dataset_name, time_taken |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please us fstring instead of string formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
profiling_table_timings: TimingClass = field(default_factory=TimingClass) | ||
listed_namespaces: int = 0 | ||
total_listed_tables: int = 0 | ||
tables_listed_per_namespace: Dict = field(default_factory=dict) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use TopKDict here?
datahub/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py
Line 98 in d660db1
num_lineage_total_log_entries: TopKDict[str, int] = field( |
This will keep the top 10 namespaces only.
Checklist