From 8c88848eaf59fe6e471b58a1386c269b805e5a46 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Sun, 16 Jun 2024 22:25:40 +0200
Subject: [PATCH 1/5] :wrench: change logic to see if ds or table exists

---
 target_bigquery/core.py | 35 +++++++++++++++++++++--------------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git a/target_bigquery/core.py b/target_bigquery/core.py
index b541042..4ff31d0 100644
--- a/target_bigquery/core.py
+++ b/target_bigquery/core.py
@@ -39,7 +39,7 @@
 from textwrap import dedent, indent
 from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type, Union
 
-from google.api_core.exceptions import Conflict, Forbidden
+from google.api_core.exceptions import Conflict, Forbidden, NotFound
 from google.cloud import bigquery, bigquery_storage_v1, storage
 from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery.table import TimePartitioning, TimePartitioningType
@@ -166,7 +166,9 @@ def create_table(
         This is a convenience method that wraps the creation of a dataset and
         table in a single method call. It is idempotent and will not create a
         new table if one already exists."""
-        if not hasattr(self, "_dataset"):
+        try:
+            dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
+        except NotFound:
             try:
                 self._dataset = client.create_dataset(
                     self.as_dataset(**kwargs["dataset"]), exists_ok=False
@@ -180,7 +182,9 @@
                     )
                 else:
                     self._dataset = dataset
-        if not hasattr(self, "_table"):
+        try:
+            table = client.get_table(self.as_ref())
+        except NotFound:
             try:
                 self._table = client.create_table(
                     self.as_table(
@@ -317,7 +321,8 @@ def __init__(
         ):
             self.merge_target = copy(self.table)
             self.table = BigQueryTable(
-                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts)
+                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts
+            )
             self.table.create_table(
                 self.client,
                 self.apply_transforms,
@@ -336,7 +341,8 @@
         elif self._is_overwrite_candidate():
             self.overwrite_target = copy(self.table)
             self.table = BigQueryTable(
-                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts)
+                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts
+            )
             self.table.create_table(
                 self.client,
                 self.apply_transforms,
@@ -515,7 +521,7 @@ def clean_up(self) -> None:
             tmp = f"{self.merge_target.name}__tmp"
             dedupe_query = (
                 f"SELECT * FROM {self.table.get_escaped_name()} "
-                f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
+                f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
                 f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
             )
             ctas_tmp = f"CREATE OR REPLACE TEMP TABLE `{tmp}` AS {dedupe_query}"
@@ -807,15 +813,12 @@ def _translate_record_to_bigquery_schema(
     ) -> SchemaField:
         """Translate a JSON schema record into a BigQuery schema."""
         properties = list(schema_property.get("properties", {}).items())
-        
+
         # If no properties defined, store as JSON instead of RECORD
         if len(properties) == 0:
             return SchemaField(name, "JSON", mode)
-        
-        fields = [
-            self._jsonschema_property_to_bigquery_column(col, t)
-            for col, t in properties
-        ]
+
+        fields = [self._jsonschema_property_to_bigquery_column(col, t) for col, t in properties]
         return SchemaField(name, "RECORD", mode, fields=fields)
 
     def _bigquery_field_to_projection(
@@ -892,14 +895,18 @@ def _wrap_json_array(
             )
         v = _v.as_sql().rstrip(", \n")
         return (" " * depth * 2) + indent(
-            dedent(f"""
+            dedent(
+                f"""
             ARRAY(
                 SELECT {v} FROM UNNEST(
                     JSON_QUERY_ARRAY({base}, '{path}.{field.name}')
                 ) AS {field.name}__rows
                 WHERE {_v.projection} IS NOT NULL
-            """ + (" " * depth * 2) + f") AS {field.name},\n").lstrip(),
+            """
+                + (" " * depth * 2)
+                + f") AS {field.name},\n"
+            ).lstrip(),
             " " * depth * 2,
         )
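
Note: the patch above replaces the `hasattr` guards with explicit lookups against BigQuery, creating the dataset or table only when the service reports it missing. A minimal, self-contained sketch of that get-or-create pattern against the public google-cloud-bigquery client follows; the helper name, IDs, and location default are illustrative and not taken from this patch.

from typing import Tuple

from google.api_core.exceptions import NotFound
from google.cloud import bigquery


def ensure_dataset_and_table(
    client: bigquery.Client, dataset_id: str, table_id: str, location: str = "EU"
) -> Tuple[bigquery.Dataset, bigquery.Table]:
    """Get-or-create a dataset and a table, creating each only on NotFound."""
    try:
        dataset = client.get_dataset(dataset_id)  # explicit existence check
    except NotFound:
        spec = bigquery.Dataset(dataset_id)
        spec.location = location
        # create only when missing; exists_ok=True would also absorb a concurrent create
        dataset = client.create_dataset(spec)

    try:
        table = client.get_table(table_id)
    except NotFound:
        table = client.create_table(bigquery.Table(table_id))
    return dataset, table


# Hypothetical usage; the project/dataset/table IDs are placeholders:
# client = bigquery.Client()
# ensure_dataset_and_table(client, "my-project.my_dataset", "my-project.my_dataset.my_table")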

From db756a52d3768806d4ba6e70af22cd71384db4b1 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Sun, 16 Jun 2024 22:33:57 +0200
Subject: [PATCH 2/5] :fire: refactoring a lil bit

---
 target_bigquery/core.py | 28 +++++++++++-----------------
 1 file changed, 11 insertions(+), 17 deletions(-)

diff --git a/target_bigquery/core.py b/target_bigquery/core.py
index 4ff31d0..567e402 100644
--- a/target_bigquery/core.py
+++ b/target_bigquery/core.py
@@ -167,14 +167,11 @@ def create_table(
         table in a single method call. It is idempotent and will not create a
         new table if one already exists."""
         try:
-            dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
+            self._dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
         except NotFound:
             try:
-                self._dataset = client.create_dataset(
-                    self.as_dataset(**kwargs["dataset"]), exists_ok=False
-                )
+                dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
             except (Conflict, Forbidden):
-                dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
                 if dataset.location != kwargs["dataset"]["location"]:
                     raise Exception(
                         f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
@@ -183,20 +180,17 @@
                 else:
                     self._dataset = dataset
         try:
-            table = client.get_table(self.as_ref())
+            self._table = client.get_table(self.as_ref())
         except NotFound:
-            try:
-                self._table = client.create_table(
-                    self.as_table(
-                        apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
-                        **kwargs["table"],
-                    )
+            self._table = client.create_table(
+                self.as_table(
+                    apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
+                    **kwargs["table"],
                 )
-            except Conflict:
-                self._table = client.get_table(self.as_ref())
-        else:
-            # Wait for eventual consistency
-            time.sleep(5)
+            )
+        else:
+            # Wait for eventual consistency
+            time.sleep(5)
         return self._dataset, self._table
 
     def default_table_options(self) -> Dict[str, Any]:

From c290e6d5547f69137d2d97b26cc2a52a5e2d2a59 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Sun, 16 Jun 2024 22:42:22 +0200
Subject: [PATCH 3/5] :fire: undoing some linting

---
 target_bigquery/core.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/target_bigquery/core.py b/target_bigquery/core.py
index 567e402..78280cd 100644
--- a/target_bigquery/core.py
+++ b/target_bigquery/core.py
@@ -315,8 +315,7 @@
         ):
             self.merge_target = copy(self.table)
             self.table = BigQueryTable(
-                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts
-            )
+                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts)
             self.table.create_table(
                 self.client,
                 self.apply_transforms,
@@ -335,8 +334,7 @@
         elif self._is_overwrite_candidate():
             self.overwrite_target = copy(self.table)
             self.table = BigQueryTable(
-                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts
-            )
+                name=f"{self.table_name}__{time.strftime('%Y%m%d%H%M%S')}__{uuid.uuid4()}", **opts)
             self.table.create_table(
                 self.client,
                 self.apply_transforms,
@@ -812,10 +810,7 @@
         if len(properties) == 0:
             return SchemaField(name, "JSON", mode)
 
-        fields = [self._jsonschema_property_to_bigquery_column(col, t) for col, t in properties]
+        fields = [
+            self._jsonschema_property_to_bigquery_column(col, t)
+            for col, t in properties
+        ]
         return SchemaField(name, "RECORD", mode, fields=fields)
 
     def _bigquery_field_to_projection(

From b45c6cf52f4a0d2a7aa9a5bcb6a0d7abe96fb5ae Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Sun, 16 Jun 2024 23:57:46 +0200
Subject: [PATCH 4/5] :wrench: re-thinking try except blocks

---
 target_bigquery/core.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/target_bigquery/core.py b/target_bigquery/core.py
index 78280cd..23c250f 100644
--- a/target_bigquery/core.py
+++ b/target_bigquery/core.py
@@ -169,16 +169,15 @@
         try:
             self._dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
         except NotFound:
-            try:
-                dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
-            except (Conflict, Forbidden):
-                if dataset.location != kwargs["dataset"]["location"]:
-                    raise Exception(
-                        f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
-                        f"does not match specified location: {kwargs['dataset']['location']}"
-                    )
-                else:
-                    self._dataset = dataset
+            dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
+        except (Conflict, Forbidden):
+            if dataset.location != kwargs["dataset"]["location"]:
+                raise Exception(
+                    f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
+                    f"does not match specified location: {kwargs['dataset']['location']}"
+                )
+        else:
+            self._dataset = dataset
         try:
             self._table = client.get_table(self.as_ref())
         except NotFound:

From 2dcbffb2b453a222978a7baff553f8e7f8b09859 Mon Sep 17 00:00:00 2001
From: Alejandro Martinez
Date: Mon, 17 Jun 2024 00:05:51 +0200
Subject: [PATCH 5/5] :wrench: more work on exc block

---
 target_bigquery/core.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/target_bigquery/core.py b/target_bigquery/core.py
index 23c250f..ab77aa2 100644
--- a/target_bigquery/core.py
+++ b/target_bigquery/core.py
@@ -167,7 +167,7 @@
         table in a single method call. It is idempotent and will not create a
         new table if one already exists."""
         try:
-            self._dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
+            dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
         except NotFound:
             dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
         except (Conflict, Forbidden):
@@ -176,7 +176,7 @@
                     f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
                     f"does not match specified location: {kwargs['dataset']['location']}"
                 )
-        else:
+        finally:
             self._dataset = dataset
         try:
             self._table = client.get_table(self.as_ref())
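
Note: the last two commits turn on which branch of a single try with multiple except clauses runs, and on the difference between else and finally. A small, self-contained illustration of that ordering follows; it is not code from the patch, and the traced built-in exceptions stand in for NotFound/Conflict.

def run(step):
    """Trace which blocks of a try/except/else/finally statement execute."""
    trace = []
    try:
        trace.append("try")
        step()
    except LookupError:
        trace.append("except LookupError")
    except PermissionError:
        trace.append("except PermissionError")
    else:
        trace.append("else")     # runs only when the try body raised nothing
    finally:
        trace.append("finally")  # runs on every path, including after an except branch
    return trace


print(run(lambda: None))      # ['try', 'else', 'finally']
print(run(lambda: [].pop()))  # IndexError is a LookupError -> ['try', 'except LookupError', 'finally']

In other words, a trailing assignment placed under else executes only on the success path, while the same assignment under finally also executes after the except branches, which is the behavioral difference between the last two commits.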