Enable BigQuery schema auto-detection with partitioning and clustering hints (#1806)

* Add autodetect schema with hints test for BigQuery table builder

Signed-off-by: Marcel Coetzee <[email protected]>

* Use SDK to set hints for autodetect_schema path

Signed-off-by: Marcel Coetzee <[email protected]>

* Pass timestamp test

Signed-off-by: Marcel Coetzee <[email protected]>

* Remove redundant test

Signed-off-by: Marcel Coetzee <[email protected]>

* Extract BigQuery load job configuration into own method

Signed-off-by: Marcel Coetzee <[email protected]>

* moves pipeline tests to pipelines

---------

Signed-off-by: Marcel Coetzee <[email protected]>
Co-authored-by: Marcin Rudolf <[email protected]>
Pipboyguy and rudolfix authored Sep 14, 2024
1 parent 12e2f12 commit 325d927
Showing 3 changed files with 264 additions and 122 deletions.
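
Context for the change: dlt's BigQuery destination can skip CREATE/ALTER and let BigQuery auto-detect the table schema from the loaded files; this commit makes the partition, cluster and table-description hints survive that path by setting them on the load job itself, while a table-expiration hint combined with auto-detection is rejected explicitly. A minimal usage sketch follows; it assumes the `bigquery_adapter` in this dlt version exposes an `autodetect_schema` flag (the flag name is an assumption based on the internal `should_autodetect_schema` check, while `partition` and `cluster` appear in the tests below), and the column, dataset and pipeline names are placeholders.

import dlt
from dlt.destinations.adapters import bigquery_adapter

@dlt.resource(columns=[{"name": "event_ts", "data_type": "timestamp"}])
def events():
    # toy rows; in practice these come from your source
    yield {"event_ts": "2024-09-14T00:00:00Z", "user_id": 1}

# Ask BigQuery to infer the schema from the data, while still day-partitioning on
# `event_ts` and clustering on `user_id` via the load job configuration.
bigquery_adapter(events, autodetect_schema=True, partition="event_ts", cluster=["user_id"])

pipeline = dlt.pipeline(pipeline_name="bq_autodetect", destination="bigquery", dataset_name="events")
pipeline.run(events)
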
55 changes: 40 additions & 15 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -1,33 +1,29 @@
import functools
import os
from pathlib import Path
import time
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, cast

import google.cloud.bigquery as bigquery # noqa: I250
from google.api_core import exceptions as api_core_exceptions
from google.cloud import exceptions as gcp_exceptions
from google.api_core import retry
from google.cloud import exceptions as gcp_exceptions
from google.cloud.bigquery.retry import _RETRYABLE_REASONS

from dlt.common import logger
from dlt.common.runtime.signals import sleep
from dlt.common.json import json
from dlt.common.destination import DestinationCapabilitiesContext, PreparedTableSchema
from dlt.common.destination.reference import (
    HasFollowupJobs,
    FollowupJobRequest,
    TLoadJobState,
    RunnableLoadJob,
    SupportsStagingDestination,
    LoadJob,
)
from dlt.common.json import json
from dlt.common.runtime.signals import sleep
from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns
from dlt.common.schema.typing import TColumnType
from dlt.common.schema.utils import get_inherited_table_hint
from dlt.common.schema.utils import get_inherited_table_hint, get_columns_names_with_prop
from dlt.common.storages.load_package import destination_state
from dlt.common.typing import DictStrAny
from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
from dlt.destinations.exceptions import (
    DatabaseTransientException,
    DatabaseUndefinedRelation,
@@ -49,6 +45,7 @@
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration
from dlt.destinations.impl.bigquery.sql_client import BigQuerySqlClient, BQ_TERMINAL_REASONS
from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset
from dlt.destinations.job_impl import DestinationJsonlLoadJob, DestinationParquetLoadJob
from dlt.destinations.job_impl import ReferenceFollowupJobRequest
from dlt.destinations.sql_jobs import SqlMergeFollowupJob

@@ -227,8 +224,8 @@ def create_load_job(
    def _get_table_update_sql(
        self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
    ) -> List[str]:
        # return empty columns which will skip table CREATE or ALTER
        # to let BigQuery autodetect table from data
        # Return empty columns which will skip table CREATE or ALTER to let BigQuery
        # auto-detect table from data.
        table = self.prepare_load_table(table_name)
        if should_autodetect_schema(table):
            return []
@@ -410,11 +407,8 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
            max_bad_records=0,
        )
        if should_autodetect_schema(table):
            # allow BigQuery to infer and evolve the schema, note that dlt is not
            # creating such tables at all
            job_config.autodetect = True
            job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
            job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
            # Allow BigQuery to infer and evolve the schema, note that dlt is not creating such tables at all.
            job_config = self._set_user_hints_with_schema_autodetection(table, job_config)

        if bucket_path:
            return self.sql_client.native_connection.load_table_from_uri(
@@ -434,6 +428,37 @@ def _create_load_job(self, table: PreparedTableSchema, file_path: str) -> bigque
                timeout=self.config.file_upload_timeout,
            )

    def _set_user_hints_with_schema_autodetection(
        self, table: PreparedTableSchema, job_config: bigquery.LoadJobConfig
    ) -> bigquery.LoadJobConfig:
        job_config.autodetect = True
        job_config.schema_update_options = bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
        if partition_column_ := get_columns_names_with_prop(table, PARTITION_HINT):
            partition_column = partition_column_[0]
            col_dtype = table["columns"][partition_column]["data_type"]
            if col_dtype == "date":
                job_config.time_partitioning = bigquery.TimePartitioning(field=partition_column)
            elif col_dtype == "timestamp":
                job_config.time_partitioning = bigquery.TimePartitioning(
                    type_=bigquery.TimePartitioningType.DAY, field=partition_column
                )
            elif col_dtype == "bigint":
                job_config.range_partitioning = bigquery.RangePartitioning(
                    field=partition_column,
                    range_=bigquery.PartitionRange(start=-172800000, end=691200000, interval=86400),
                )
        if clustering_columns := get_columns_names_with_prop(table, CLUSTER_HINT):
            job_config.clustering_fields = clustering_columns
        if table_description := table.get(TABLE_DESCRIPTION_HINT, False):
            job_config.destination_table_description = table_description
        if table_expiration := table.get(TABLE_EXPIRATION_HINT, False):
            raise ValueError(
                f"Table expiration time ({table_expiration}) can't be set with BigQuery type"
                " auto-detection enabled!"
            )
        return job_config

    def _retrieve_load_job(self, file_path: str) -> bigquery.LoadJob:
        job_id = BigQueryLoadJob.get_job_id_from_file_path(file_path)
        return cast(bigquery.LoadJob, self.sql_client.native_connection.get_job(job_id))
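
For reference, the new `_set_user_hints_with_schema_autodetection` helper above amounts to the following standalone google-cloud-bigquery configuration for a resource with a `timestamp` partition column and one cluster column. This is an illustration with placeholder field names, not code from the commit.

import google.cloud.bigquery as bigquery

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
)
# a timestamp partition hint becomes daily time partitioning on the destination table
job_config.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY, field="event_ts"
)
# a cluster hint becomes clustering fields on the load job
job_config.clustering_fields = ["user_id"]
job_config.destination_table_description = "events loaded with schema auto-detection"

Note that the helper deliberately raises a ValueError when a table-expiration hint is combined with auto-detection, rather than silently dropping the hint.
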
114 changes: 8 additions & 106 deletions tests/load/bigquery/test_bigquery_table_builder.py
@@ -1,37 +1,33 @@
import os
from copy import deepcopy
from typing import Iterator, Dict, Any, List
from dlt.common.destination.exceptions import DestinationSchemaTampered
from dlt.common.schema.exceptions import SchemaIdentifierNormalizationCollision
from dlt.destinations.impl.bigquery.bigquery_adapter import (
    PARTITION_HINT,
    CLUSTER_HINT,
)

import google
import pytest
import sqlfluff
from google.cloud.bigquery import Table

import dlt
from dlt.common.configuration import resolve_configuration
from dlt.common.configuration.specs import (
    GcpServiceAccountCredentialsWithoutDefaults,
    GcpServiceAccountCredentials,
)
from dlt.common.destination.exceptions import DestinationSchemaTampered
from dlt.common.pendulum import pendulum
from dlt.common.schema import Schema, utils
from dlt.common.schema.exceptions import SchemaIdentifierNormalizationCollision
from dlt.common.utils import custom_environ
from dlt.common.utils import uniq_id

from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
from dlt.destinations import bigquery
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient
from dlt.destinations.adapters import bigquery_adapter
from dlt.destinations.exceptions import DestinationSchemaWillNotUpdate
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient
from dlt.destinations.impl.bigquery.bigquery_adapter import (
    PARTITION_HINT,
    CLUSTER_HINT,
)
from dlt.destinations.impl.bigquery.configuration import BigQueryClientConfiguration

from dlt.extract import DltResource

from tests.load.utils import (
    destinations_configs,
    DestinationTestConfiguration,
@@ -1043,97 +1039,3 @@ def some_data() -> Iterator[Dict[str, str]]:
    bigquery_adapter(some_data, table_expiration_datetime="2030-01-01")

    assert some_data._hints["x-bigquery-table-expiration"] == pendulum.datetime(2030, 1, 1)  # type: ignore


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_adapter_additional_table_hints_table_expiration(
    destination_config: DestinationTestConfiguration,
) -> None:
    @dlt.resource(columns=[{"name": "col1", "data_type": "text"}])
    def no_hints() -> Iterator[Dict[str, str]]:
        yield from [{"col1": str(i)} for i in range(10)]

    hints = bigquery_adapter(
        no_hints.with_name(new_name="hints"), table_expiration_datetime="2030-01-01"
    )

    @dlt.source(max_table_nesting=0)
    def sources() -> List[DltResource]:
        return [no_hints, hints]

    pipeline = destination_config.setup_pipeline(
        f"bigquery_{uniq_id()}",
        dev_mode=True,
    )

    pipeline.run(sources())

    with pipeline.sql_client() as c:
        nc: google.cloud.bigquery.client.Client = c.native_connection

        fqtn_no_hints = c.make_qualified_table_name("no_hints", escape=False)
        fqtn_hints = c.make_qualified_table_name("hints", escape=False)

        no_hints_table = nc.get_table(fqtn_no_hints)
        hints_table = nc.get_table(fqtn_hints)

        assert not no_hints_table.expires
        assert hints_table.expires == pendulum.datetime(2030, 1, 1, 0)


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(default_sql_configs=True, subset=["bigquery"]),
    ids=lambda x: x.name,
)
def test_adapter_merge_behaviour(
    destination_config: DestinationTestConfiguration,
) -> None:
    @dlt.resource(
        columns=[
            {"name": "col1", "data_type": "text"},
            {"name": "col2", "data_type": "bigint"},
            {"name": "col3", "data_type": "double"},
        ]
    )
    def hints() -> Iterator[Dict[str, Any]]:
        yield from [{"col1": str(i), "col2": i, "col3": float(i)} for i in range(10)]

    bigquery_adapter(hints, table_expiration_datetime="2030-01-01", cluster=["col1"])
    bigquery_adapter(
        hints,
        table_description="A small table somewhere in the cosmos...",
        partition="col2",
    )

    pipeline = destination_config.setup_pipeline(
        f"bigquery_{uniq_id()}",
        dev_mode=True,
    )

    pipeline.run(hints)

    with pipeline.sql_client() as c:
        nc: google.cloud.bigquery.client.Client = c.native_connection

        table_fqtn = c.make_qualified_table_name("hints", escape=False)

        table: Table = nc.get_table(table_fqtn)

        table_cluster_fields = [] if table.clustering_fields is None else table.clustering_fields

        # Test merging behaviour.
        assert table.expires == pendulum.datetime(2030, 1, 1, 0)
        assert ["col1"] == table_cluster_fields, "`hints` table IS NOT clustered by `col1`."
        assert table.description == "A small table somewhere in the cosmos..."

        if not table.range_partitioning:
            raise ValueError("`hints` table IS NOT clustered on a column.")
        else:
            assert (
                table.range_partitioning.field == "col2"
            ), "`hints` table IS NOT clustered on column `col2`."
