Skip to content

Commit

Permalink
fix(ingestion): looker - various fixes (#4394)
Browse files Browse the repository at this point in the history
Co-authored-by: Shirshanka Das <[email protected]>
  • Loading branch information
gabe-lyons and shirshanka authored Mar 15, 2022
1 parent e8f6c4c commit 431ba4b
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 15 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/source_docs/looker.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `view_browse_pattern` | | `/{env}/{platform}/{project}/views/{name}` | Pattern for providing browse paths to views. Allowed variables are `{project}`, `{model}`, `{name}`, `{platform}` and `{env}` |
| `explore_naming_pattern` | | `{model}.explore.{name}` | Pattern for providing dataset names to explores. Allowed variables are `{project}`, `{model}`, `{name}` |
| `explore_browse_pattern` | | `/{env}/{platform}/{project}/explores/{model}.{name}` | Pattern for providing browse paths to explores. Allowed variables are `{project}`, `{model}`, `{name}`, `{platform}` and `{env}` |
| `transport_options` | | | Populates the [TransportOptions](https://github.com/looker-open-source/sdk-codegen/blob/94d6047a0d52912ac082eb91616c1e7c379ab262/python/looker_sdk/rtl/transport.py#L70) struct for looker client |
| `max_threads` | | `os.cpuCount or 40` | Max parallelism for Looker API calls |


Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/source_docs/lookml.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `github_info.branch` | | `main` | The default branch in your repo that you want urls to point to. Typically `main` or `master` |
| `github_info.base_url` | | `https://github.com` | The base url for your github coordinates |
| `sql_parser` | | `datahub.utilities.sql_parser.DefaultSQLParser` | See note below. |
| `transport_options` | | | Populates the [TransportOptions](https://github.com/looker-open-source/sdk-codegen/blob/94d6047a0d52912ac082eb91616c1e7c379ab262/python/looker_sdk/rtl/transport.py#L70) struct for looker client |

Note! The integration can use an SQL parser to try to parse the tables the views depends on. This parsing is disabled by default,
but can be enabled by setting `parse_table_names_from_sql: True`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
Expand Down
34 changes: 24 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import looker_sdk
from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from looker_sdk.sdk.api31.models import (
Dashboard,
Expand Down Expand Up @@ -66,6 +67,7 @@ class LookerAPIConfig(ConfigModel):
client_id: str
client_secret: str
base_url: str
transport_options: Optional[TransportOptions]


class LookerAPI:
Expand All @@ -82,7 +84,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
# try authenticating current user to check connectivity
# (since it's possible to initialize an invalid client without any complaints)
try:
self.client.me()
self.client.me(transport_options=config.transport_options)
except SDKError as e:
raise ConfigurationError(
"Failed to initialize Looker client. Please check your configuration."
Expand Down Expand Up @@ -408,7 +410,7 @@ def _get_looker_dashboard_element( # noqa: C901
fields = self._get_fields_from_query(element.look.query)
if element.look.query.view is not None:
explores = [element.look.query.view]
logger.info(
logger.debug(
"Element {}: Explores added: {}".format(element.title, explores)
)
for exp in explores:
Expand Down Expand Up @@ -576,8 +578,8 @@ def _make_explore_metadata_events(
events, explore_id, start_time, end_time = future.result()
explore_events.extend(events)
self.reporter.report_upstream_latency(start_time, end_time)
logger.info(
f"Running time of fetch_one_explore for {explore_id}: {(end_time-start_time).total_seconds()}"
logger.debug(
f"Running time of fetch_one_explore for {explore_id}: {(end_time - start_time).total_seconds()}"
)

return explore_events
Expand All @@ -593,7 +595,11 @@ def fetch_one_explore(
start_time = datetime.datetime.now()
events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = []
looker_explore = LookerExplore.from_api(
model, explore, self.client, self.reporter
model,
explore,
self.client,
self.reporter,
self.source_config.transport_options,
)
if looker_explore is not None:
events = (
Expand Down Expand Up @@ -755,7 +761,9 @@ def process_dashboard(
"user_id",
]
dashboard_object = self.client.dashboard(
dashboard_id=dashboard_id, fields=",".join(fields)
dashboard_id=dashboard_id,
fields=",".join(fields),
transport_options=self.source_config.transport_options,
)
except SDKError:
# A looker dashboard could be deleted in between the list and the get
Expand Down Expand Up @@ -786,9 +794,15 @@ def process_dashboard(
return workunits, dashboard_id, start_time, datetime.datetime.now()

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
dashboards = self.client.all_dashboards(fields="id")
dashboards = self.client.all_dashboards(
fields="id", transport_options=self.source_config.transport_options
)
deleted_dashboards = (
self.client.search_dashboards(fields="id", deleted="true")
self.client.search_dashboards(
fields="id",
deleted="true",
transport_options=self.source_config.transport_options,
)
if self.source_config.include_deleted
else []
)
Expand All @@ -809,8 +823,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
]
for async_workunit in concurrent.futures.as_completed(async_workunits):
work_units, dashboard_id, start_time, end_time = async_workunit.result()
logger.info(
f"Running time of process_dashboard for {dashboard_id} = {(end_time-start_time).total_seconds()}"
logger.debug(
f"Running time of process_dashboard for {dashboard_id} = {(end_time - start_time).total_seconds()}"
)
self.reporter.report_upstream_latency(start_time, end_time)
for mwu in work_units:
Expand Down
11 changes: 7 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/looker_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict, Iterable, List, Optional, Tuple, Union

from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from pydantic.class_validators import validator

Expand Down Expand Up @@ -309,10 +310,9 @@ def _get_field_type(
# attempt Postgres modified type
type_class = resolve_postgres_modified_type(native_type)

# if still not found, report a warning
# if still not found, log and continue
if type_class is None:
reporter.report_warning(
native_type,
logger.info(
f"The type '{native_type}' is not recognized for field type, setting as NullTypeClass.",
)
type_class = NullTypeClass
Expand Down Expand Up @@ -500,9 +500,12 @@ def from_api( # noqa: C901
explore_name: str,
client: Looker31SDK,
reporter: SourceReport,
transport_options: Optional[TransportOptions],
) -> Optional["LookerExplore"]: # noqa: C901
try:
explore = client.lookml_model_explore(model, explore_name)
explore = client.lookml_model_explore(
model, explore_name, transport_options=transport_options
)
views = set()
if explore.joins is not None and explore.joins != []:
if explore.view_name is not None and explore.view_name != explore.name:
Expand Down
9 changes: 8 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pydantic
from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from looker_sdk.sdk.api31.models import DBConnection
from pydantic import root_validator, validator
Expand Down Expand Up @@ -143,6 +144,7 @@ class LookMLSourceConfig(LookerCommonConfig):
sql_parser: str = "datahub.utilities.sql_parser.DefaultSQLParser"
api: Optional[LookerAPIConfig]
project_name: Optional[str]
transport_options: Optional[TransportOptions]

@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
Expand Down Expand Up @@ -577,6 +579,7 @@ def from_looker_dict(
fields,
)
# also store the view logic and materialization
view_logic = looker_viewfile.raw_file_content
if "sql" in derived_table:
view_logic = derived_table["sql"]
view_lang = "sql"
Expand Down Expand Up @@ -1003,7 +1006,11 @@ def get_project_name(self, model_name: str) -> str:
self.looker_client is not None
), "Failed to find a configured Looker API client"
try:
model = self.looker_client.lookml_model(model_name, "project_name")
model = self.looker_client.lookml_model(
model_name,
"project_name",
transport_options=self.source_config.transport_options,
)
assert (
model.project_name is not None
), f"Failed to find a project name for model {model_name}"
Expand Down

0 comments on commit 431ba4b

Please sign in to comment.