Miscellaneous fixes to BigQuery connector (#959)
* Test nullable column types are parsed correctly for BigQuery

* Prefer non-NoneType types when inferring schema for Table load to BigQuery

If a Parsons Table column has values like `[None, None, True, False]`,
the BigQuery connector will infer that the appropriate type for this
column is NoneType, which it will translate into a STRING type.

This change ensures that, when choosing among the types returned by
petl.typecheck(), the connector picks the first type that isn't
'NoneType' whenever one is available.
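
For illustration, a minimal sketch of that selection logic (variable names here are hypothetical; the real implementation is in `_generate_schema_from_parsons_table` in the diff below):

```python
# Hypothetical sketch of the type-preference rule, not the connector's exact code.
petl_types = ["NoneType", "bool"]  # e.g. from a column like [None, None, True, False]

if "str" in petl_types:
    best_type = "str"
else:
    # Fall back to the first type that isn't NoneType, if any
    non_none_types = [t for t in petl_types if t != "NoneType"]
    best_type = non_none_types[0] if non_none_types else "NoneType"

print(best_type)  # "bool", which maps to BOOLEAN rather than STRING
```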

* Fix BigQuery type map

Source types ultimately come from `petl.typeset`, which calls
`type(v).__name__`. This call returns only the bare type name, without
the source module: e.g. `date` rather than `datetime.date`.
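
This is easy to confirm with the standard library:

```python
import datetime

# type(...).__name__ gives the bare class name with no module prefix
print(type(datetime.date(2024, 1, 31)).__name__)      # "date", not "datetime.date"
print(type(datetime.datetime(2024, 1, 31)).__name__)  # "datetime"
print(type(datetime.time(12, 30)).__name__)           # "time"
```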

* Fix commented-out line to use job_config passed as argument

It looks like this line was accidentally commented out

* Parse Python datetime objects for BigQuery as datetime or timestamp

Python datetime objects may represent timestamps or datetimes in
BigQuery, depending on whether or not they have a timezone attached.
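
For example, the distinction hinges on `tzinfo` (standard library only):

```python
import datetime

naive = datetime.datetime(2024, 1, 31, 12, 0)
aware = datetime.datetime(2024, 1, 31, 12, 0, tzinfo=datetime.timezone.utc)

print(naive.tzinfo)  # None -> treated as a BigQuery DATETIME
print(aware.tzinfo)  # UTC  -> treated as a BigQuery TIMESTAMP
```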

* Only generate schema for BigQuery when table does not already exist

Always passing a schema to BigQuery is unnecessary, and it introduces
situations where the provided schema can mismatch the actual schema.

When the table already exists in BigQuery, fetch the schema from BigQuery instead.
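
A rough sketch of that fallback (assuming a bigquery `client`, a `table_name`, and a hypothetical `generate_schema(tbl)` helper; this mirrors the logic in the diff below):

```python
from google.api_core import exceptions

try:
    existing_table = client.get_table(table_name)
    schema = existing_table.schema  # reuse the table's real schema
except exceptions.NotFound:
    schema = generate_schema(tbl)   # table doesn't exist yet, so infer one from the data
```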

* Ensure that schema and Table columns align
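
In rough terms, the alignment step reorders the schema to match the Parsons Table's column order (a sketch assuming `tbl.columns` and a list of `SchemaField` objects named `schema`; the actual code is in the diff below):

```python
ordered_schema = []
for column in tbl.columns:
    matches = [field for field in schema if field.name.lower() == column.lower()]
    if not matches:
        raise IndexError(f"Column found in Table that was not found in schema: {column}")
    ordered_schema.append(matches[0])
```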

---------

Co-authored-by: Cormac Martinez del Rio <[email protected]>
austinweisgrau and cmdelrio authored Jan 31, 2024
1 parent 0d68302 commit e515096
Showing 2 changed files with 68 additions and 11 deletions.
70 changes: 62 additions & 8 deletions parsons/google/google_bigquery.py
@@ -10,6 +10,10 @@
from google.cloud import bigquery, exceptions
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.job import LoadJobConfig
from google.cloud import exceptions
import google
import petl
from contextlib import contextmanager

from parsons.databases.database_connector import DatabaseConnector
from parsons.databases.table import BaseTable
@@ -26,13 +30,13 @@
"float": "FLOAT",
"int": "INTEGER",
"bool": "BOOLEAN",
"datetime.datetime": "DATETIME",
"datetime.date": "DATE",
"datetime.time": "TIME",
"datetime": "DATETIME",
"date": "DATE",
"time": "TIME",
"dict": "RECORD",
"NoneType": "STRING",
"UUID": "STRING",
"datetime": "DATETIME",
"timestamp": "TIMESTAMP",
}

# Max number of rows that we query at a time, so we can avoid loading huge
@@ -775,10 +779,38 @@ def copy(
"""
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)

# if not job_config:
job_config = bigquery.LoadJobConfig()
if not job_config:
job_config = bigquery.LoadJobConfig()

# It isn't ever actually necessary to generate the schema explicitly here
# BigQuery will attempt to autodetect the schema on its own
# When appending or truncating an existing table, we should not provide a schema here
# It introduces situations where provided schema can mismatch the actual schema
if not job_config.schema:
job_config.schema = self._generate_schema_from_parsons_table(tbl)
if if_exists in ("append", "truncate"):
# It is more robust to fetch the actual existing schema
# than it is to try and infer it based on provided data
try:
bigquery_table = self.client.get_table(table_name)
job_config.schema = bigquery_table.schema
except google.api_core.exceptions.NotFound:
job_config.schema = self._generate_schema_from_parsons_table(tbl)
else:
job_config.schema = self._generate_schema_from_parsons_table(tbl)

# Reorder schema to match table to ensure compatibility
schema = []
for column in tbl.columns:
try:
schema_row = [
i for i in job_config.schema if i.name.lower() == column.lower()
][0]
except IndexError:
raise IndexError(
f"Column found in Table that was not found in schema: {column}"
)
schema.append(schema_row)
job_config.schema = schema

gcs_client = gcs_client or GoogleCloudStorage()
temp_blob_name = f"{uuid.uuid4()}.csv"
@@ -1104,11 +1136,33 @@ def get_row_count(self, schema: str, table_name: str) -> int:
return result["row_count"][0]

def _generate_schema_from_parsons_table(self, tbl):
"""BigQuery schema generation based on contents of Parsons table.
Not usually necessary to use this. BigQuery is able to
natively autodetect schema formats."""
stats = tbl.get_columns_type_stats()
fields = []
for stat in stats:
petl_types = stat["type"]
best_type = "str" if "str" in petl_types else petl_types[0]

# Prefer 'str' if included
# Otherwise choose first type that isn't "NoneType"
# Otherwise choose NoneType
not_none_petl_types = [i for i in petl_types if i != "NoneType"]
if "str" in petl_types:
best_type = "str"
elif not_none_petl_types:
best_type = not_none_petl_types[0]
else:
best_type = "NoneType"

# Python datetimes may be datetime or timestamp in BigQuery
# BigQuery datetimes have no timezone, timestamps do
if best_type == "datetime":
for value in petl.util.base.values(tbl.table, stat["name"]):
if isinstance(value, datetime.datetime) and value.tzinfo:
best_type = "timestamp"

field_type = self._bigquery_type(best_type)
field = bigquery.schema.SchemaField(stat["name"], field_type)
fields.append(field)
9 changes: 6 additions & 3 deletions test/test_databases/test_bigquery.py
@@ -358,6 +358,9 @@ def test_copy(self):

self.assertEqual(bq.copy_from_gcs.call_count, 1)
load_call_args = bq.copy_from_gcs.call_args
job_config = bq.copy_from_gcs.call_args[1]["job_config"]
column_types = [schema_field.field_type for schema_field in job_config.schema]
self.assertEqual(column_types, ["INTEGER", "STRING", "BOOLEAN"])
self.assertEqual(load_call_args[1]["gcs_blob_uri"], tmp_blob_uri)
self.assertEqual(load_call_args[1]["table_name"], table_name)

@@ -397,7 +400,7 @@ def test_copy__if_exists_passed_through(self):
bq = self._build_mock_client_for_copying(table_exists=False)
bq.copy_from_gcs = mock.MagicMock()
table_name = "dataset.table"
if_exists = "append"
if_exists = "drop"

# call the method being tested
bq.copy(
@@ -539,7 +542,7 @@ def _build_mock_cloud_storage_client(self, tmp_blob_uri=""):
def default_table(self):
return Table(
[
{"num": 1, "ltr": "a"},
{"num": 2, "ltr": "b"},
{"num": 1, "ltr": "a", "boolcol": None},
{"num": 2, "ltr": "b", "boolcol": True},
]
)
