feat(ingest): add bigquery-queries source (#10994)
Co-authored-by: Harshal Sheth <[email protected]>
mayurinehate and hsheth2 authored Aug 26, 2024
1 parent 7b28677 commit 223650d
Showing 36 changed files with 12,799 additions and 629 deletions.
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/bigquery/bigquery_recipe.yml
@@ -1,8 +1,7 @@
source:
type: bigquery
config:
# `schema_pattern` for BQ Datasets
schema_pattern:
dataset_pattern:
allow:
- finance_bq_dataset
table_pattern:
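The recipe change above renames the BigQuery dataset filter key from schema_pattern to dataset_pattern. For context, here is a minimal sketch of how such an allow/deny pattern is evaluated; the AllowDenyPattern class and its import path are assumptions inferred from the .allowed() calls that appear later in this diff.

# Sketch only: AllowDenyPattern and its import path are assumptions,
# inferred from the pattern.allowed() usage elsewhere in this diff.
from datahub.configuration.common import AllowDenyPattern

# Mirrors the recipe's dataset_pattern block: only the finance dataset is allowed.
dataset_pattern = AllowDenyPattern(allow=["finance_bq_dataset"])

print(dataset_pattern.allowed("finance_bq_dataset"))    # True
print(dataset_pattern.allowed("marketing_bq_dataset"))  # False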
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
@@ -359,6 +359,7 @@
"google-cloud-datacatalog-lineage==0.2.2",
}
| classification_lib,
"bigquery-queries": sql_common | bigquery_common | sqlglot_lib,
"clickhouse": sql_common | clickhouse_common,
"clickhouse-usage": sql_common | usage_common | clickhouse_common,
"cockroachdb": sql_common | postgres_common | {"sqlalchemy-cockroachdb<2.0.0"},
@@ -668,6 +669,7 @@
"athena = datahub.ingestion.source.sql.athena:AthenaSource",
"azure-ad = datahub.ingestion.source.identity.azure_ad:AzureADSource",
"bigquery = datahub.ingestion.source.bigquery_v2.bigquery:BigqueryV2Source",
"bigquery-queries = datahub.ingestion.source.bigquery_v2.bigquery_queries:BigQueryQueriesSource",
"clickhouse = datahub.ingestion.source.sql.clickhouse:ClickHouseSource",
"clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource",
"cockroachdb = datahub.ingestion.source.sql.cockroachdb:CockroachDBSource",
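The two setup.py additions above register a bigquery-queries plugin extra and its source entry point. Below is a rough sketch of how the new source class could be resolved through that entry point; the group name "datahub.ingestion.source.plugins" is an assumption (it is not shown in this hunk), and the lookup uses only the standard library.

# Sketch only: the entry-point group name is an assumption; the entry value
# "datahub.ingestion.source.bigquery_v2.bigquery_queries:BigQueryQueriesSource" comes from the diff.
from importlib.metadata import entry_points  # Python 3.10+ signature

eps = entry_points(group="datahub.ingestion.source.plugins")
(queries_ep,) = [ep for ep in eps if ep.name == "bigquery-queries"]
BigQueryQueriesSource = queries_ep.load()  # imports and returns the registered source class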
16 changes: 16 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -50,6 +50,7 @@
UpstreamLineageClass,
_Aspect as AspectAbstract,
)
from datahub.metadata.urns import CorpGroupUrn, CorpUserUrn
from datahub.utilities.urn_encoder import UrnEncoder
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
@@ -224,6 +225,21 @@ def make_user_urn(username: str) -> str:
)


def make_actor_urn(actor: str) -> Union[CorpUserUrn, CorpGroupUrn]:
"""
Makes a user urn if the input is not a user or group urn already
"""
return (
CorpUserUrn(actor)
if not actor.startswith(("urn:li:corpuser:", "urn:li:corpGroup:"))
else (
CorpUserUrn.from_string(actor)
if actor.startswith("urn:li:corpuser:")
else CorpGroupUrn.from_string(actor)
)
)


def make_group_urn(groupname: str) -> str:
"""
Makes a group urn if the input is not a user or group urn already
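A quick usage sketch for the new make_actor_urn helper added above, assuming the Urn classes render in the standard urn:li:... form.

# Sketch only: illustrates the three branches of make_actor_urn.
from datahub.emitter.mce_builder import make_actor_urn

make_actor_urn("jdoe")                       # bare name -> CorpUserUrn ("urn:li:corpuser:jdoe")
make_actor_urn("urn:li:corpuser:jdoe")       # already a user urn -> parsed into a CorpUserUrn
make_actor_urn("urn:li:corpGroup:data-eng")  # group urn -> parsed into a CorpGroupUrn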
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -4,7 +4,6 @@
import os
from typing import Iterable, List, Optional

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@@ -21,22 +20,23 @@
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigqueryTableIdentifier,
BigQueryTableRef,
)
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryProject,
BigQuerySchemaApi,
get_projects,
)
from datahub.ingestion.source.bigquery_v2.bigquery_schema_gen import (
BigQuerySchemaGenerator,
)
from datahub.ingestion.source.bigquery_v2.bigquery_test_connection import (
BigQueryTestConnection,
)
from datahub.ingestion.source.bigquery_v2.common import (
BigQueryFilter,
BigQueryIdentifierBuilder,
)
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
@@ -109,8 +109,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
self.config.sharded_table_pattern
)
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

self.bigquery_data_dictionary = BigQuerySchemaApi(
report=BigQueryV2Report().schema_api_perf,
@@ -123,6 +121,8 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
)

self.sql_parser_schema_resolver = self._init_schema_resolver()
self.filters = BigQueryFilter(self.config, self.report)
self.identifiers = BigQueryIdentifierBuilder(self.config, self.report)

redundant_lineage_run_skip_handler: Optional[
RedundantLineageRunSkipHandler
@@ -139,7 +139,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.lineage_extractor = BigqueryLineageExtractor(
config,
self.report,
dataset_urn_builder=self.gen_dataset_urn_from_raw_ref,
identifiers=self.identifiers,
redundant_run_skip_handler=redundant_lineage_run_skip_handler,
)

@@ -156,7 +156,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
config,
self.report,
schema_resolver=self.sql_parser_schema_resolver,
dataset_urn_builder=self.gen_dataset_urn_from_raw_ref,
identifiers=self.identifiers,
redundant_run_skip_handler=redundant_usage_run_skip_handler,
)

@@ -179,7 +179,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.domain_registry,
self.sql_parser_schema_resolver,
self.profiler,
self.gen_dataset_urn,
self.identifiers,
)

self.add_config_to_report()
@@ -232,7 +232,11 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
projects = self._get_projects()
projects = get_projects(
self.bq_schema_extractor.schema_api,
self.report,
self.filters,
)
if not projects:
return

@@ -256,89 +260,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.bq_schema_extractor.table_refs,
)

def _get_projects(self) -> List[BigqueryProject]:
logger.info("Getting projects")

if self.config.project_ids or self.config.project_id:
project_ids = self.config.project_ids or [self.config.project_id] # type: ignore
return [
BigqueryProject(id=project_id, name=project_id)
for project_id in project_ids
]

if self.config.project_labels:
return list(self._query_project_list_from_labels())

return list(self._query_project_list())

def _query_project_list_from_labels(self) -> Iterable[BigqueryProject]:
projects = self.bigquery_data_dictionary.get_projects_with_labels(
self.config.project_labels
)

if not projects: # Report failure on exception and if empty list is returned
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project with any of the specified label(s). "
"Maybe resourcemanager.projects.list permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)

for project in projects:
if self.config.project_id_pattern.allowed(project.id):
yield project
else:
self.report.report_dropped(project.id)

def _query_project_list(self) -> Iterable[BigqueryProject]:
try:
projects = self.bigquery_data_dictionary.get_projects()

if (
not projects
): # Report failure on exception and if empty list is returned
self.report.failure(
title="Get projects didn't return any project. ",
message="Maybe resourcemanager.projects.get permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)
except Exception as e:
self.report.failure(
title="Failed to get BigQuery Projects",
message="Maybe resourcemanager.projects.get permission is missing for the service account. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account.",
exc=e,
)
projects = []

for project in projects:
if self.config.project_id_pattern.allowed(project.id):
yield project
else:
self.report.report_dropped(project.id)

def gen_dataset_urn(
self, project_id: str, dataset_name: str, table: str, use_raw_name: bool = False
) -> str:
datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table)
return make_dataset_urn(
self.platform,
(
str(datahub_dataset_name)
if not use_raw_name
else datahub_dataset_name.raw_table_name()
),
self.config.env,
)

def gen_dataset_urn_from_raw_ref(self, ref: BigQueryTableRef) -> str:
return self.gen_dataset_urn(
ref.table_identifier.project_id,
ref.table_identifier.dataset,
ref.table_identifier.table,
use_raw_name=True,
)

def get_report(self) -> BigQueryV2Report:
return self.report

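The project-listing helpers removed above (_get_projects, _query_project_list_from_labels, _query_project_list) are replaced by the shared get_projects function imported from bigquery_schema and driven by the new BigQueryFilter. A rough sketch of what that shared helper might look like, reconstructed from the removed methods; the filters.filter_config attribute is a hypothetical name, and the real implementation in bigquery_schema.py may differ.

# Sketch only: reconstructed from the removed _get_projects/_query_project_list logic.
# filters.filter_config is a hypothetical attribute; the real BigQueryFilter API may differ.
from typing import List

from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
    BigqueryProject,
    BigQuerySchemaApi,
)


def get_projects(schema_api: BigQuerySchemaApi, report, filters) -> List[BigqueryProject]:
    conf = filters.filter_config  # hypothetical: the filter wraps the source config
    if conf.project_ids:  # an explicit project list short-circuits discovery
        return [BigqueryProject(id=pid, name=pid) for pid in conf.project_ids]
    projects = schema_api.get_projects()  # otherwise list projects visible to the service account
    return [p for p in projects if conf.project_id_pattern.allowed(p.id)]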
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py
@@ -7,7 +7,6 @@

from dateutil import parser

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.utilities.parsing_util import (
get_first_missing_key,
get_first_missing_key_any,
@@ -213,13 +212,6 @@ def get_sanitized_table_ref(self) -> "BigQueryTableRef":
BigqueryTableIdentifier.from_string_name(sanitized_table)
)

def to_urn(self, env: str) -> str:
return make_dataset_urn(
"bigquery",
f"{self.table_identifier.project_id}.{self.table_identifier.dataset}.{self.table_identifier.table}",
env,
)

def __str__(self) -> str:
return f"projects/{self.table_identifier.project_id}/datasets/{self.table_identifier.dataset}/tables/{self.table_identifier.table}"

@@ -294,19 +286,27 @@ def from_entry(
job.get("jobName", {}).get("jobId"),
),
project_id=job.get("jobName", {}).get("projectId"),
default_dataset=job_query_conf["defaultDataset"]
if job_query_conf["defaultDataset"]
else None,
start_time=parser.parse(job["jobStatistics"]["startTime"])
if job["jobStatistics"]["startTime"]
else None,
end_time=parser.parse(job["jobStatistics"]["endTime"])
if job["jobStatistics"]["endTime"]
else None,
numAffectedRows=int(job["jobStatistics"]["queryOutputRowCount"])
if "queryOutputRowCount" in job["jobStatistics"]
and job["jobStatistics"]["queryOutputRowCount"]
else None,
default_dataset=(
job_query_conf["defaultDataset"]
if job_query_conf["defaultDataset"]
else None
),
start_time=(
parser.parse(job["jobStatistics"]["startTime"])
if job["jobStatistics"]["startTime"]
else None
),
end_time=(
parser.parse(job["jobStatistics"]["endTime"])
if job["jobStatistics"]["endTime"]
else None
),
numAffectedRows=(
int(job["jobStatistics"]["queryOutputRowCount"])
if "queryOutputRowCount" in job["jobStatistics"]
and job["jobStatistics"]["queryOutputRowCount"]
else None
),
statementType=job_query_conf.get("statementType", "UNKNOWN"),
)
# destinationTable
@@ -376,18 +376,26 @@ def from_exported_bigquery_audit_metadata(
query=query_config["query"],
job_name=job["jobName"],
project_id=QueryEvent._get_project_id_from_job_name(job["jobName"]),
default_dataset=query_config["defaultDataset"]
if query_config.get("defaultDataset")
else None,
start_time=parser.parse(job["jobStats"]["startTime"])
if job["jobStats"]["startTime"]
else None,
end_time=parser.parse(job["jobStats"]["endTime"])
if job["jobStats"]["endTime"]
else None,
numAffectedRows=int(query_stats["outputRowCount"])
if query_stats.get("outputRowCount")
else None,
default_dataset=(
query_config["defaultDataset"]
if query_config.get("defaultDataset")
else None
),
start_time=(
parser.parse(job["jobStats"]["startTime"])
if job["jobStats"]["startTime"]
else None
),
end_time=(
parser.parse(job["jobStats"]["endTime"])
if job["jobStats"]["endTime"]
else None
),
numAffectedRows=(
int(query_stats["outputRowCount"])
if query_stats.get("outputRowCount")
else None
),
statementType=query_config.get("statementType", "UNKNOWN"),
)
# jobName
@@ -445,18 +453,26 @@ def from_entry_v2(
timestamp=row.timestamp,
actor_email=payload["authenticationInfo"]["principalEmail"],
query=query_config["query"],
default_dataset=query_config["defaultDataset"]
if "defaultDataset" in query_config and query_config["defaultDataset"]
else None,
start_time=parser.parse(job["jobStats"]["startTime"])
if job["jobStats"]["startTime"]
else None,
end_time=parser.parse(job["jobStats"]["endTime"])
if job["jobStats"]["endTime"]
else None,
numAffectedRows=int(query_stats["outputRowCount"])
if "outputRowCount" in query_stats and query_stats["outputRowCount"]
else None,
default_dataset=(
query_config["defaultDataset"]
if "defaultDataset" in query_config and query_config["defaultDataset"]
else None
),
start_time=(
parser.parse(job["jobStats"]["startTime"])
if job["jobStats"]["startTime"]
else None
),
end_time=(
parser.parse(job["jobStats"]["endTime"])
if job["jobStats"]["endTime"]
else None
),
numAffectedRows=(
int(query_stats["outputRowCount"])
if "outputRowCount" in query_stats and query_stats["outputRowCount"]
else None
),
statementType=query_config.get("statementType", "UNKNOWN"),
)
query_event.job_name = job.get("jobName")
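The QueryEvent constructors above were only re-wrapped so that each conditional expression sits inside parentheses; behavior is unchanged. In isolation, the shared optional-field parsing pattern looks roughly like this (the field names follow the audit-log payloads used above).

# Sketch only: the optional-field guard pattern used by the QueryEvent constructors.
from dateutil import parser

job_stats = {"startTime": "2024-08-26T10:00:00Z", "endTime": "", "queryOutputRowCount": "42"}

start_time = parser.parse(job_stats["startTime"]) if job_stats["startTime"] else None
end_time = parser.parse(job_stats["endTime"]) if job_stats["endTime"] else None  # empty string -> None
num_affected_rows = (
    int(job_stats["queryOutputRowCount"])
    if "queryOutputRowCount" in job_stats and job_stats["queryOutputRowCount"]
    else None
)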