Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(looker): various looker fixes #4394

Merged
Merged
1 change: 1 addition & 0 deletions metadata-ingestion/source_docs/looker.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `view_browse_pattern` | | `/{env}/{platform}/{project}/views/{name}` | Pattern for providing browse paths to views. Allowed variables are `{project}`, `{model}`, `{name}`, `{platform}` and `{env}` |
| `explore_naming_pattern` | | `{model}.explore.{name}` | Pattern for providing dataset names to explores. Allowed variables are `{project}`, `{model}`, `{name}` |
| `explore_browse_pattern` | | `/{env}/{platform}/{project}/explores/{model}.{name}` | Pattern for providing browse paths to explores. Allowed variables are `{project}`, `{model}`, `{name}`, `{platform}` and `{env}` |
| `transport_options` | | | Populates the [TransportOptions](https://github.com/looker-open-source/sdk-codegen/blob/94d6047a0d52912ac082eb91616c1e7c379ab262/python/looker_sdk/rtl/transport.py#L70) struct for looker client |
| `max_threads` | | `os.cpu_count() or 40` | Max parallelism for Looker API calls |


Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/source_docs/lookml.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `github_info.branch` | | `main` | The default branch in your repo that you want urls to point to. Typically `main` or `master` |
| `github_info.base_url` | | `https://github.com` | The base url for your github coordinates |
| `sql_parser` | | `datahub.utilities.sql_parser.DefaultSQLParser` | See note below. |
| `transport_options` | | | Populates the [TransportOptions](https://github.com/looker-open-source/sdk-codegen/blob/94d6047a0d52912ac082eb91616c1e7c379ab262/python/looker_sdk/rtl/transport.py#L70) struct for looker client |

Note! The integration can use an SQL parser to try to parse the tables the views depends on. This parsing is disabled by default,
but can be enabled by setting `parse_table_names_from_sql: True`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
Expand Down
34 changes: 24 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import looker_sdk
from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from looker_sdk.sdk.api31.models import (
Dashboard,
Expand Down Expand Up @@ -66,6 +67,7 @@ class LookerAPIConfig(ConfigModel):
client_id: str
client_secret: str
base_url: str
transport_options: Optional[TransportOptions]


class LookerAPI:
Expand All @@ -82,7 +84,7 @@ def __init__(self, config: LookerAPIConfig) -> None:
# try authenticating current user to check connectivity
# (since it's possible to initialize an invalid client without any complaints)
try:
self.client.me()
self.client.me(transport_options=config.transport_options)
except SDKError as e:
raise ConfigurationError(
"Failed to initialize Looker client. Please check your configuration."
Expand Down Expand Up @@ -408,7 +410,7 @@ def _get_looker_dashboard_element( # noqa: C901
fields = self._get_fields_from_query(element.look.query)
if element.look.query.view is not None:
explores = [element.look.query.view]
logger.info(
logger.debug(
"Element {}: Explores added: {}".format(element.title, explores)
)
for exp in explores:
Expand Down Expand Up @@ -576,8 +578,8 @@ def _make_explore_metadata_events(
events, explore_id, start_time, end_time = future.result()
explore_events.extend(events)
self.reporter.report_upstream_latency(start_time, end_time)
logger.info(
f"Running time of fetch_one_explore for {explore_id}: {(end_time-start_time).total_seconds()}"
logger.debug(
f"Running time of fetch_one_explore for {explore_id}: {(end_time - start_time).total_seconds()}"
)

return explore_events
Expand All @@ -593,7 +595,11 @@ def fetch_one_explore(
start_time = datetime.datetime.now()
events: List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]] = []
looker_explore = LookerExplore.from_api(
model, explore, self.client, self.reporter
model,
explore,
self.client,
self.reporter,
self.source_config.transport_options,
)
if looker_explore is not None:
events = (
Expand Down Expand Up @@ -755,7 +761,9 @@ def process_dashboard(
"user_id",
]
dashboard_object = self.client.dashboard(
dashboard_id=dashboard_id, fields=",".join(fields)
dashboard_id=dashboard_id,
fields=",".join(fields),
transport_options=self.source_config.transport_options,
)
except SDKError:
# A looker dashboard could be deleted in between the list and the get
Expand Down Expand Up @@ -786,9 +794,15 @@ def process_dashboard(
return workunits, dashboard_id, start_time, datetime.datetime.now()

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
dashboards = self.client.all_dashboards(fields="id")
dashboards = self.client.all_dashboards(
fields="id", transport_options=self.source_config.transport_options
)
deleted_dashboards = (
self.client.search_dashboards(fields="id", deleted="true")
self.client.search_dashboards(
fields="id",
deleted="true",
transport_options=self.source_config.transport_options,
)
if self.source_config.include_deleted
else []
)
Expand All @@ -809,8 +823,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
]
for async_workunit in concurrent.futures.as_completed(async_workunits):
work_units, dashboard_id, start_time, end_time = async_workunit.result()
logger.info(
f"Running time of process_dashboard for {dashboard_id} = {(end_time-start_time).total_seconds()}"
logger.debug(
f"Running time of process_dashboard for {dashboard_id} = {(end_time - start_time).total_seconds()}"
)
self.reporter.report_upstream_latency(start_time, end_time)
for mwu in work_units:
Expand Down
11 changes: 7 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/looker_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict, Iterable, List, Optional, Tuple, Union

from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from pydantic.class_validators import validator

Expand Down Expand Up @@ -309,10 +310,9 @@ def _get_field_type(
# attempt Postgres modified type
type_class = resolve_postgres_modified_type(native_type)

# if still not found, report a warning
# if still not found, log and continue
if type_class is None:
reporter.report_warning(
native_type,
logger.info(
f"The type '{native_type}' is not recognized for field type, setting as NullTypeClass.",
)
type_class = NullTypeClass
Expand Down Expand Up @@ -500,9 +500,12 @@ def from_api( # noqa: C901
explore_name: str,
client: Looker31SDK,
reporter: SourceReport,
transport_options: Optional[TransportOptions],
) -> Optional["LookerExplore"]: # noqa: C901
try:
explore = client.lookml_model_explore(model, explore_name)
explore = client.lookml_model_explore(
model, explore_name, transport_options=transport_options
)
views = set()
if explore.joins is not None and explore.joins != []:
if explore.view_name is not None and explore.view_name != explore.name:
Expand Down
9 changes: 8 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pydantic
from looker_sdk.error import SDKError
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api31.methods import Looker31SDK
from looker_sdk.sdk.api31.models import DBConnection
from pydantic import root_validator, validator
Expand Down Expand Up @@ -143,6 +144,7 @@ class LookMLSourceConfig(LookerCommonConfig):
sql_parser: str = "datahub.utilities.sql_parser.DefaultSQLParser"
api: Optional[LookerAPIConfig]
project_name: Optional[str]
transport_options: Optional[TransportOptions]

@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
Expand Down Expand Up @@ -577,6 +579,7 @@ def from_looker_dict(
fields,
)
# also store the view logic and materialization
view_logic = looker_viewfile.raw_file_content
if "sql" in derived_table:
view_logic = derived_table["sql"]
view_lang = "sql"
Expand Down Expand Up @@ -1003,7 +1006,11 @@ def get_project_name(self, model_name: str) -> str:
self.looker_client is not None
), "Failed to find a configured Looker API client"
try:
model = self.looker_client.lookml_model(model_name, "project_name")
model = self.looker_client.lookml_model(
model_name,
"project_name",
transport_options=self.source_config.transport_options,
)
assert (
model.project_name is not None
), f"Failed to find a project name for model {model_name}"
Expand Down