Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/tableau): restructure the tableau graphql datasource query #11230

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fcc2450
cursor based pagination
sid-acryl Aug 23, 2024
752cd95
lint fix
sid-acryl Aug 23, 2024
ff54319
error message
sid-acryl Aug 23, 2024
7213497
add a debug log
sid-acryl Aug 23, 2024
d75912c
add fetch size
sid-acryl Aug 24, 2024
1dff5d8
commented upstreamFields and upstreamColumns of published and embedde…
sid-acryl Aug 26, 2024
f182099
existing test-cases are working
sid-acryl Aug 27, 2024
0d2431c
fix test case
sid-acryl Aug 27, 2024
fe70845
fix unit test case
sid-acryl Aug 27, 2024
8cb1d03
Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n…
sid-acryl Aug 27, 2024
464bb43
doc updated
sid-acryl Aug 27, 2024
902aa3c
Merge branch 'cus-2491-tableau-ingestion-hitting-20000-node-limit' of…
sid-acryl Aug 27, 2024
2ec4ca4
Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n…
sid-acryl Aug 27, 2024
dd533b4
Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n…
sid-acryl Aug 28, 2024
127391d
fix the retries_remaining
sid-acryl Aug 28, 2024
b25c6f1
change parent container emit sequence
sid-acryl Aug 28, 2024
a093b15
log message
sid-acryl Aug 29, 2024
02d4655
update sequence
sid-acryl Aug 29, 2024
9cce6c0
Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n…
sid-acryl Aug 30, 2024
439a375
address review comments
sid-acryl Aug 30, 2024
29f9406
generate container for parent first
sid-acryl Sep 3, 2024
dc28a3e
doc updates
sid-acryl Sep 4, 2024
3fcedb7
revert the file
sid-acryl Sep 4, 2024
b8f4428
Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n…
sid-acryl Sep 5, 2024
c5afb8a
address review comments
sid-acryl Sep 5, 2024
1559493
Update metadata-ingestion/src/datahub/ingestion/source/tableau/tablea…
sid-acryl Sep 9, 2024
45a2e07
Merge branch 'master' into cus-2491-tableau-ingestion-hitting-20000-n…
sid-acryl Sep 9, 2024
07baf75
address review comments
sid-acryl Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"tableau = datahub.ingestion.source.tableau.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
"teradata = datahub.ingestion.source.sql.teradata:TeradataSource",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +0,0 @@
from datahub.ingestion.source.tableau.tableau import (
TableauConfig,
TableauSiteSource,
TableauSource,
TableauSourceReport,
)
130 changes: 98 additions & 32 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,9 @@ class UsageStat:
class TableauProject:
id: str
name: str
description: str
description: Optional[str]
parent_id: Optional[str]
parent_name: Optional[str] # Name of parent project
parent_name: Optional[str] # Name of a parent project
path: List[str]


Expand Down Expand Up @@ -943,6 +943,7 @@ def _init_workbook_registry(self) -> None:

def _populate_projects_registry(self) -> None:
if self.server is None:
logger.warning("server is None. Can not initialize the project registry")
return

logger.info("Initializing site project registry")
Expand Down Expand Up @@ -979,11 +980,22 @@ def get_connection_object_page(
query: str,
connection_type: str,
query_filter: str,
current_cursor: Optional[str], # initial value is None
fetch_size: int = 0,
current_cursor: Optional[str],
fetch_size: int = 250,
retry_on_auth_error: bool = True,
retries_remaining: Optional[int] = None,
) -> Tuple[dict, Optional[str], int]:
"""
`current_cursor:` Tableau Server executes the query and returns
a set of records based on the specified fetch_size. It also provides a cursor that
indicates the current position within the result set. In the next API call,
you pass this cursor to retrieve the next batch of records,
with each batch containing up to fetch_size records.
Initial value is None.

`fetch_size:` The number of records to retrieve from Tableau
Server in a single API call, starting from the current cursor position on Tableau Server.
"""
retries_remaining = retries_remaining or self.config.max_retries

logger.debug(
Expand All @@ -1007,7 +1019,7 @@ def get_connection_object_page(

# If ingestion has been running for over 2 hours, the Tableau
# temporary credentials will expire. If this happens, this exception
# will be thrown and we need to re-authenticate and retry.
# will be thrown, and we need to re-authenticate and retry.
self._re_authenticate()
return self.get_connection_object_page(
query=query,
Expand All @@ -1016,7 +1028,7 @@ def get_connection_object_page(
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining,
retries_remaining=retries_remaining - 1,
)
except OSError:
# In tableauseverclient 0.26 (which was yanked and released in 0.28 on 2023-10-04),
Expand All @@ -1034,7 +1046,7 @@ def get_connection_object_page(
fetch_size=fetch_size,
current_cursor=current_cursor,
retry_on_auth_error=False,
retries_remaining=retries_remaining,
retries_remaining=retries_remaining - 1,
)

if c.ERRORS in query_data:
Expand Down Expand Up @@ -1082,8 +1094,11 @@ def get_connection_object_page(
if node_limit_errors:
logger.debug(f"Node Limit Error. query_data {query_data}")
self.report.warning(
title="Node Limit Errors",
message="Increase your tableau node limit",
title="Tableau Data Exceed Predefined Limit",
message="The numbers of record in result set exceeds a predefined limit. Increase the tableau "
"configuration metadata query node limit to higher value. Refer "
"https://help.tableau.com/current/server/en-us/"
"cli_configuration-set_tsm.htm#metadata_nodelimit",
context=f"""{{
"errors": {node_limit_errors},
"connection_type": {connection_type},
Expand Down Expand Up @@ -1163,8 +1178,8 @@ def get_connection_objects(
query=query,
connection_type=connection_type,
query_filter=filter_,
fetch_size=self.config.fetch_size,
current_cursor=current_cursor,
fetch_size=self.config.fetch_size,
)

yield from connection_objects.get(c.NODES) or []
Expand Down Expand Up @@ -3052,33 +3067,84 @@ def _get_ownership(self, user: str) -> Optional[OwnershipClass]:
return None

def emit_project_containers(self) -> Iterable[MetadataWorkUnit]:
for _id, project in self.tableau_project_registry.items():
parent_container_key: Optional[ContainerKey] = None
if project.parent_id:
parent_container_key = self.gen_project_key(project.parent_id)
elif self.config.add_site_container and self.site and self.site.id:
parent_container_key = self.gen_site_key(self.site.id)
visited_project_ids: Set[str] = set()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
visited_project_ids: Set[str] = set()
generated_project_ids: Set[str] = set()


def emit_project_in_topological_order(
project_: TableauProject,
) -> Iterable[MetadataWorkUnit]:
"""
The source_helpers.py file includes a function called auto_browse_path_v2 that automatically generates
BrowsePathsV2. This auto-generation relies on the correct sequencing of MetadataWorkUnit containers.
The emit_project_in_topological_order function ensures that parent containers are emitted before their child
containers.
This is a recursive function.
"""
if project_.parent_id is None:
project_key = self.gen_project_key(project_.id)

if project_key.guid() in visited_project_ids:
return

visited_project_ids.add(project_key.guid())

parent_container_key: Optional[ContainerKey] = None
# set site as parent container
if self.config.add_site_container and self.site and self.site.id:
parent_container_key = self.gen_site_key(self.site.id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this after the visited_project_ids check?

also, why are we adding to visited_project_ids in two places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reorganized the code


yield from gen_containers(
container_key=self.gen_project_key(_id),
name=project.name,
description=project.description,
sub_types=[c.PROJECT],
parent_container_key=parent_container_key,
)
if (
project.parent_id is not None
and project.parent_id not in self.tableau_project_registry
):
# Parent project got skipped because of project_pattern.
# Let's ingest its container name property to show parent container name on DataHub Portal, otherwise
# DataHub Portal will show parent container URN
yield from gen_containers(
container_key=self.gen_project_key(project.parent_id),
name=cast(str, project.parent_name),
container_key=project_key,
name=project_.name,
description=project_.description,
sub_types=[c.PROJECT],
parent_container_key=parent_container_key,
)

return

parent_project: Optional[
TableauProject
] = self.tableau_project_registry.get(project_.parent_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how exactly does this interact with project path filters?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here #L870, connector adding project into the registry if its allowed by pattern.

if (
parent_project is None
): # this project is filtered because of project_pattern, we need to generate MCP
# for this, otherwise DataHub Portal will show parent container URN
assert (
project_.parent_name
), f"project {project_.name} parent project name is None"

parent_project = TableauProject(
id=project_.parent_id,
name=project_.parent_name,
description=None, # No description available for parent project
parent_id=None,
parent_name=None,
path=[],
)

yield from emit_project_in_topological_order(parent_project)

container_key = self.gen_project_key(project_.id)

if container_key.guid() in visited_project_ids:
return

visited_project_ids.add(container_key.guid())

yield from gen_containers(
container_key=container_key,
name=project_.name,
description=project_.description,
sub_types=[c.PROJECT],
parent_container_key=self.gen_project_key(project_.parent_id),
)

for id_, project in self.tableau_project_registry.items():
logger.debug(
f"project {project.name} and it's parent {project.parent_name} and parent id {project.parent_id}"
)
yield from emit_project_in_topological_order(project)

def emit_site_container(self):
if not self.site or not self.site.id:
logger.warning("Can not ingest site container. No site information found.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau import (
from datahub.ingestion.source.tableau.tableau import (
TableauConfig,
TableauSiteSource,
TableauSource,
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_tableau_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

import datahub.ingestion.source.tableau.tableau_constant as c
from datahub.ingestion.source.tableau import TableauSiteSource
from datahub.ingestion.source.tableau.tableau import TableauSiteSource
from datahub.ingestion.source.tableau.tableau_common import (
get_filter_pages,
make_filter,
Expand Down
Loading