diff --git a/target_bigquery/core.py b/target_bigquery/core.py
index b541042..ab77aa2 100644
--- a/target_bigquery/core.py
+++ b/target_bigquery/core.py
@@ -39,7 +39,7 @@
 from textwrap import dedent, indent
 from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type, Union
 
-from google.api_core.exceptions import Conflict, Forbidden
+from google.api_core.exceptions import Conflict, Forbidden, NotFound
 from google.cloud import bigquery, bigquery_storage_v1, storage
 from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery.table import TimePartitioning, TimePartitioningType
@@ -166,33 +166,30 @@ def create_table(
         This is a convenience method that wraps the creation of a dataset and
         table in a single method call. It is idempotent and will not create a
         new table if one already exists."""
-        if not hasattr(self, "_dataset"):
-            try:
-                self._dataset = client.create_dataset(
-                    self.as_dataset(**kwargs["dataset"]), exists_ok=False
+        try:
+            dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
+        except NotFound:
+            dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
+        except (Conflict, Forbidden):
+            if dataset.location != kwargs["dataset"]["location"]:
+                raise Exception(
+                    f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
+                    f"does not match specified location: {kwargs['dataset']['location']}"
                 )
-            except (Conflict, Forbidden):
-                dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
-                if dataset.location != kwargs["dataset"]["location"]:
-                    raise Exception(
-                        f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
-                        f"does not match specified location: {kwargs['dataset']['location']}"
-                    )
-                else:
-                    self._dataset = dataset
-        if not hasattr(self, "_table"):
-            try:
-                self._table = client.create_table(
-                    self.as_table(
-                        apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
-                        **kwargs["table"],
-                    )
+        finally:
+            self._dataset = dataset
+        try:
+            self._table = client.get_table(self.as_ref())
+        except NotFound:
+            self._table = client.create_table(
+                self.as_table(
+                    apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
+                    **kwargs["table"],
                 )
-            except Conflict:
-                self._table = client.get_table(self.as_ref())
-            else:
-                # Wait for eventual consistency
-                time.sleep(5)
+            )
+        else:
+            # Wait for eventual consistency
+            time.sleep(5)
         return self._dataset, self._table
 
     def default_table_options(self) -> Dict[str, Any]:
@@ -515,7 +512,7 @@ def clean_up(self) -> None:
         tmp = f"{self.merge_target.name}__tmp"
         dedupe_query = (
             f"SELECT * FROM {self.table.get_escaped_name()} "
-            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
+            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
             f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
         )
         ctas_tmp = f"CREATE OR REPLACE TEMP TABLE `{tmp}` AS {dedupe_query}"
@@ -807,15 +804,15 @@ def _translate_record_to_bigquery_schema(
     ) -> SchemaField:
         """Translate a JSON schema record into a BigQuery schema."""
 
         properties = list(schema_property.get("properties", {}).items())
-
+        # If no properties defined, store as JSON instead of RECORD
         if len(properties) == 0:
             return SchemaField(name, "JSON", mode)
-
+
         fields = [
-            self._jsonschema_property_to_bigquery_column(col, t)
-            for col, t in properties
-        ]
+            self._jsonschema_property_to_bigquery_column(col, t)
+            for col, t in properties
+        ]
         return SchemaField(name, "RECORD", mode, fields=fields)
 
     def _bigquery_field_to_projection(
@@ -892,14 +889,18 @@ def _wrap_json_array(
         )
         v = _v.as_sql().rstrip(", \n")
         return (" " * depth * 2) + indent(
-            dedent(f"""
+            dedent(
+                f"""
             ARRAY(
                 SELECT {v}
                 FROM UNNEST(
                     JSON_QUERY_ARRAY({base}, '{path}.{field.name}')
                 ) AS {field.name}__rows
                 WHERE {_v.projection} IS NOT NULL
-            """ + (" " * depth * 2) + f") AS {field.name},\n").lstrip(),
+            """
+                + (" " * depth * 2)
+                + f") AS {field.name},\n"
+            ).lstrip(),
             " " * depth * 2,
         )
 
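
The create_table() hunk above replaces create-then-catch-Conflict with a get-then-create flow. A minimal standalone sketch of the same pattern, assuming a hypothetical ensure_dataset() helper and placeholder project/dataset names:

    from google.api_core.exceptions import NotFound
    from google.cloud import bigquery

    def ensure_dataset(
        client: bigquery.Client, dataset_id: str, location: str
    ) -> bigquery.Dataset:
        # Probe first: NotFound means the dataset does not exist yet, so
        # create it. An existing dataset is returned directly, without
        # relying on Conflict from create_dataset() to signal "already
        # exists".
        try:
            return client.get_dataset(dataset_id)
        except NotFound:
            dataset = bigquery.Dataset(dataset_id)
            dataset.location = location
            return client.create_dataset(dataset)

    # Usage (placeholder names):
    # ds = ensure_dataset(bigquery.Client(), "my-project.my_dataset", "US")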
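
The clean_up() hunk touches the dedupe statement; the QUALIFY ROW_NUMBER() pattern it builds keeps exactly one row per key. A small sketch of how such a statement is assembled, with placeholder table, key, and date columns:

    key_properties = ["id"]                       # placeholder keys
    date_columns = ["_sdc_extracted_at"]          # placeholder ordering columns
    table_name = "`project.dataset.stream__tmp`"  # placeholder table

    dedupe_query = (
        f"SELECT * FROM {table_name} "
        f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in key_properties)} "
        f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
    )
    # Keeps the newest row per `id`; older duplicates are filtered out.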
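
The _translate_record_to_bigquery_schema() hunk documents a JSON fallback: an object schema with no declared properties becomes a JSON column rather than a RECORD, since BigQuery rejects RECORD fields without subfields. A sketch of that decision in isolation, with a hypothetical record_or_json() name and STRING standing in for real per-property type translation:

    from google.cloud.bigquery import SchemaField

    def record_or_json(
        name: str, schema_property: dict, mode: str = "NULLABLE"
    ) -> SchemaField:
        properties = list(schema_property.get("properties", {}).items())
        if not properties:
            # No subfields declared: store the value as a JSON column.
            return SchemaField(name, "JSON", mode)
        fields = [SchemaField(col, "STRING", "NULLABLE") for col, _ in properties]
        return SchemaField(name, "RECORD", mode, fields=fields)

    # record_or_json("payload", {"type": "object"})       -> JSON column
    # record_or_json("user", {"properties": {"id": {}}})  -> RECORD, one field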