Miscellaneous fixes to BigQuery connector #959

Merged (8 commits) on Jan 31, 2024
70 changes: 62 additions & 8 deletions parsons/google/google_bigquery.py
@@ -10,6 +10,10 @@
 from google.cloud import bigquery, exceptions
 from google.cloud.bigquery import dbapi
 from google.cloud.bigquery.job import LoadJobConfig
+from google.cloud import exceptions
+import google
+import petl
+from contextlib import contextmanager
 
 from parsons.databases.database_connector import DatabaseConnector
 from parsons.databases.table import BaseTable
@@ -26,13 +30,13 @@
     "float": "FLOAT",
     "int": "INTEGER",
     "bool": "BOOLEAN",
-    "datetime.datetime": "DATETIME",
-    "datetime.date": "DATE",
-    "datetime.time": "TIME",
+    "datetime": "DATETIME",
+    "date": "DATE",
+    "time": "TIME",
     "dict": "RECORD",
     "NoneType": "STRING",
     "UUID": "STRING",
-    "datetime": "DATETIME",
+    "timestamp": "TIMESTAMP",
 }
 
 # Max number of rows that we query at a time, so we can avoid loading huge
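
A note on why the dotted keys were dead weight: petl reports column types by class name (type(value).__name__), so a datetime.datetime value shows up as "datetime", never "datetime.datetime". A minimal sketch, assuming only that petl type-name behavior (the map below is an excerpt, not the full constant):

import datetime

# petl derives type names from the value's class name.
value = datetime.datetime.now()
assert type(value).__name__ == "datetime"  # not "datetime.datetime"

# So the old keys like "datetime.datetime" could never match a
# petl-reported type name; the renamed keys above can.
BIGQUERY_TYPE_MAP = {"datetime": "DATETIME", "date": "DATE", "time": "TIME"}  # excerpt
assert BIGQUERY_TYPE_MAP[type(value).__name__] == "DATETIME"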
@@ -775,10 +779,38 @@ def copy(
         """
         tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
 
-        # if not job_config:
-        job_config = bigquery.LoadJobConfig()
+        if not job_config:
+            job_config = bigquery.LoadJobConfig()
 
+        # It isn't ever actually necessary to generate the schema explicitly
+        # here; BigQuery will attempt to autodetect the schema on its own.
+        # When appending to or truncating an existing table, we should not
+        # provide a schema, since a provided schema can mismatch the table's
+        # actual schema.
         if not job_config.schema:
-            job_config.schema = self._generate_schema_from_parsons_table(tbl)
+            if if_exists in ("append", "truncate"):
+                # It is more robust to fetch the actual existing schema
+                # than to try to infer it from the provided data.
+                try:
+                    bigquery_table = self.client.get_table(table_name)
+                    job_config.schema = bigquery_table.schema
+                except google.api_core.exceptions.NotFound:
+                    job_config.schema = self._generate_schema_from_parsons_table(tbl)
+            else:
+                job_config.schema = self._generate_schema_from_parsons_table(tbl)
+
+        # Reorder the schema to match the table's column order, to ensure
+        # compatibility.
+        schema = []
+        for column in tbl.columns:
+            try:
+                schema_row = [
+                    i for i in job_config.schema if i.name.lower() == column.lower()
+                ][0]
+            except IndexError:
+                raise IndexError(
+                    f"Column found in Table that was not found in schema: {column}"
+                )
+            schema.append(schema_row)
+        job_config.schema = schema
 
         gcs_client = gcs_client or GoogleCloudStorage()
         temp_blob_name = f"{uuid.uuid4()}.csv"
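
The behavioral change is easiest to see from the caller's side. A hedged sketch (dataset and table names below are hypothetical, not from this PR):

from parsons import GoogleBigQuery, Table

bq = GoogleBigQuery()
tbl = Table([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])

# Appending to a table that already exists: copy() now fetches the
# destination's real schema via client.get_table(), so a type inferred
# from this batch cannot conflict with the table's actual column types.
bq.copy(tbl, "mydataset.people", if_exists="append")

# Copying to a table that doesn't exist yet still infers the schema
# from the data itself.
bq.copy(tbl, "mydataset.new_people")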
@@ -1104,11 +1136,33 @@ def get_row_count(self, schema: str, table_name: str) -> int:
         return result["row_count"][0]
 
     def _generate_schema_from_parsons_table(self, tbl):
+        """Generate a BigQuery schema based on the contents of a Parsons table.
+
+        It is not usually necessary to use this; BigQuery is able to
+        autodetect schemas natively."""
         stats = tbl.get_columns_type_stats()
         fields = []
         for stat in stats:
             petl_types = stat["type"]
-            best_type = "str" if "str" in petl_types else petl_types[0]
+
+            # Prefer "str" if present.
+            # Otherwise choose the first type that isn't "NoneType".
+            # Otherwise fall back to "NoneType".
+            not_none_petl_types = [i for i in petl_types if i != "NoneType"]
+            if "str" in petl_types:
+                best_type = "str"
+            elif not_none_petl_types:
+                best_type = not_none_petl_types[0]
+            else:
+                best_type = "NoneType"
+
+            # A Python datetime may be a DATETIME or a TIMESTAMP in BigQuery:
+            # BigQuery DATETIMEs have no timezone, TIMESTAMPs do.
+            if best_type == "datetime":
+                for value in petl.util.base.values(tbl.table, stat["name"]):
+                    if isinstance(value, datetime.datetime) and value.tzinfo:
+                        best_type = "timestamp"
 
             field_type = self._bigquery_type(best_type)
             field = bigquery.schema.SchemaField(stat["name"], field_type)
             fields.append(field)
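
To make the inference rules concrete, a small illustrative walkthrough (hypothetical data, not from the PR itself):

import datetime
from parsons import Table

tbl = Table(
    [
        {"mixed": "x", "when": datetime.datetime(2024, 1, 31), "maybe": None},
        {"mixed": 1, "when": datetime.datetime(2024, 1, 31), "maybe": None},
    ]
)
# Per the rules above:
#   mixed -> "str"      -> STRING   ("str" wins whenever it is present)
#   when  -> "datetime" -> DATETIME (naive datetimes carry no tzinfo)
#   maybe -> "NoneType" -> STRING   (an all-NULL column falls back to STRING)
# A timezone-aware value, e.g.
#   datetime.datetime(2024, 1, 31, tzinfo=datetime.timezone.utc)
# would flip "when" to "timestamp" -> TIMESTAMP.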
9 changes: 6 additions & 3 deletions test/test_databases/test_bigquery.py
@@ -358,6 +358,9 @@ def test_copy(self):
 
         self.assertEqual(bq.copy_from_gcs.call_count, 1)
         load_call_args = bq.copy_from_gcs.call_args
+        job_config = bq.copy_from_gcs.call_args[1]["job_config"]
+        column_types = [schema_field.field_type for schema_field in job_config.schema]
+        self.assertEqual(column_types, ["INTEGER", "STRING", "BOOLEAN"])
         self.assertEqual(load_call_args[1]["gcs_blob_uri"], tmp_blob_uri)
         self.assertEqual(load_call_args[1]["table_name"], table_name)
 
@@ -397,7 +400,7 @@ def test_copy__if_exists_passed_through(self):
         bq = self._build_mock_client_for_copying(table_exists=False)
         bq.copy_from_gcs = mock.MagicMock()
         table_name = "dataset.table"
-        if_exists = "append"
+        if_exists = "drop"
 
         # call the method being tested
         bq.copy(
@@ -539,7 +542,7 @@ def _build_mock_cloud_storage_client(self, tmp_blob_uri=""):
     def default_table(self):
         return Table(
             [
-                {"num": 1, "ltr": "a"},
-                {"num": 2, "ltr": "b"},
+                {"num": 1, "ltr": "a", "boolcol": None},
+                {"num": 2, "ltr": "b", "boolcol": True},
             ]
         )
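
The widened fixture is what drives the new schema assertion in test_copy above: boolcol contains a None and a True, so its type set is {"NoneType", "bool"}, and after dropping "NoneType" the first remaining type ("bool") should win. A quick check of that expectation, assuming petl's usual class-name type reporting:

import petl

table = petl.fromdicts(
    [
        {"num": 1, "ltr": "a", "boolcol": None},
        {"num": 2, "ltr": "b", "boolcol": True},
    ]
)
# petl reports column types by class name.
print(petl.typeset(table, "boolcol"))  # {'NoneType', 'bool'}
# Dropping "NoneType" leaves "bool" -> BOOLEAN, matching the
# ["INTEGER", "STRING", "BOOLEAN"] assertion in test_copy.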