🔧 change logic to see if ds or table exists #91

Merged
merged 5 commits into from
Jul 1, 2024

Changes from all commits
69 changes: 35 additions & 34 deletions target_bigquery/core.py
@@ -39,7 +39,7 @@
 from textwrap import dedent, indent
 from typing import IO, TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type, Union
 
-from google.api_core.exceptions import Conflict, Forbidden
+from google.api_core.exceptions import Conflict, Forbidden, NotFound
 from google.cloud import bigquery, bigquery_storage_v1, storage
 from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery.table import TimePartitioning, TimePartitioningType
@@ -166,33 +166,30 @@ def create_table(
         This is a convenience method that wraps the creation of a dataset and
         table in a single method call. It is idempotent and will not create
         a new table if one already exists."""
-        if not hasattr(self, "_dataset"):
-            try:
-                self._dataset = client.create_dataset(
-                    self.as_dataset(**kwargs["dataset"]), exists_ok=False
-                )
-            except (Conflict, Forbidden):
-                dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
-                if dataset.location != kwargs["dataset"]["location"]:
-                    raise Exception(
-                        f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
-                        f"does not match specified location: {kwargs['dataset']['location']}"
-                    )
-                else:
-                    self._dataset = dataset
-        if not hasattr(self, "_table"):
-            try:
-                self._table = client.create_table(
-                    self.as_table(
-                        apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
-                        **kwargs["table"],
-                    )
-                )
-            except Conflict:
-                self._table = client.get_table(self.as_ref())
-            else:
-                # Wait for eventual consistency
-                time.sleep(5)
+        try:
+            dataset = client.get_dataset(self.as_dataset(**kwargs["dataset"]))
+        except NotFound:
+            dataset = client.create_dataset(self.as_dataset(**kwargs["dataset"]))
+        except (Conflict, Forbidden):
+            if dataset.location != kwargs["dataset"]["location"]:
+                raise Exception(
+                    f"Location of existing dataset {dataset.dataset_id} ({dataset.location}) "
+                    f"does not match specified location: {kwargs['dataset']['location']}"
+                )
+        finally:
+            self._dataset = dataset
+        try:
+            self._table = client.get_table(self.as_ref())
+        except NotFound:
+            self._table = client.create_table(
+                self.as_table(
+                    apply_transforms and self.ingestion_strategy != IngestionStrategy.FIXED,
+                    **kwargs["table"],
+                )
+            )
+        else:
+            # Wait for eventual consistency
+            time.sleep(5)
         return self._dataset, self._table
 
     def default_table_options(self) -> Dict[str, Any]:
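Reviewer note: the net effect of this hunk is a switch from "create and treat Conflict as already-exists" to "look up first and create only on NotFound". A minimal standalone sketch of that get-or-create flow, assuming a configured google-cloud-bigquery client; the project, dataset, and table names are hypothetical:

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

client = bigquery.Client()
dataset_ref = bigquery.DatasetReference("my-project", "my_dataset")  # hypothetical names

# Cheap metadata lookup first; create only when the lookup raises NotFound.
try:
    dataset = client.get_dataset(dataset_ref)
except NotFound:
    dataset = client.create_dataset(dataset_ref)

table_ref = dataset_ref.table("my_table")  # hypothetical name
try:
    table = client.get_table(table_ref)
except NotFound:
    table = client.create_table(bigquery.Table(table_ref))

This puts the common case (the objects already exist) on the happy path rather than the exception path. The merged code additionally keeps the location check for existing datasets and a five-second eventual-consistency wait, both omitted from the sketch for brevity.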
@@ -515,7 +512,7 @@ def clean_up(self) -> None:
         tmp = f"{self.merge_target.name}__tmp"
         dedupe_query = (
             f"SELECT * FROM {self.table.get_escaped_name()} "
-            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
+            f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {', '.join(f'`{p}`' for p in self.key_properties)} "
             f"ORDER BY COALESCE({', '.join(date_columns)}) DESC) = 1"
         )
         ctas_tmp = f"CREATE OR REPLACE TEMP TABLE `{tmp}` AS {dedupe_query}"
@@ -807,15 +804,15 @@ def _translate_record_to_bigquery_schema(
     ) -> SchemaField:
         """Translate a JSON schema record into a BigQuery schema."""
         properties = list(schema_property.get("properties", {}).items())
 
         # If no properties defined, store as JSON instead of RECORD
         if len(properties) == 0:
             return SchemaField(name, "JSON", mode)
 
         fields = [
-            self._jsonschema_property_to_bigquery_column(col, t)
-            for col, t in properties
-        ]
+            self._jsonschema_property_to_bigquery_column(col, t)
+            for col, t in properties
+        ]
         return SchemaField(name, "RECORD", mode, fields=fields)
 
     def _bigquery_field_to_projection(
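Reviewer note: the behavioral rule in this function is that an object property with no declared sub-properties maps to a JSON column rather than an empty RECORD, which BigQuery does not allow. A minimal sketch of that rule as a free function, with placeholder sub-field typing (the real method translates each property recursively):

from google.cloud.bigquery import SchemaField

def record_to_schema(name: str, schema_property: dict, mode: str = "NULLABLE") -> SchemaField:
    """Sketch of the translation rule above, outside the class."""
    properties = list(schema_property.get("properties", {}).items())
    if len(properties) == 0:
        # Schemaless object: store the whole value as JSON.
        return SchemaField(name, "JSON", mode)
    # Placeholder typing; target-bigquery derives real column types per property.
    fields = [SchemaField(col, "STRING") for col, _ in properties]
    return SchemaField(name, "RECORD", mode, fields=fields)

# A free-form object like {"type": "object"} maps to a JSON column:
assert record_to_schema("payload", {"type": "object"}).field_type == "JSON"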
@@ -892,14 +889,18 @@ def _wrap_json_array(
         )
         v = _v.as_sql().rstrip(", \n")
         return (" " * depth * 2) + indent(
-            dedent(f"""
+            dedent(
+                f"""
             ARRAY(
                 SELECT {v}
                 FROM UNNEST(
                     JSON_QUERY_ARRAY({base}, '{path}.{field.name}')
                 ) AS {field.name}__rows
                 WHERE {_v.projection} IS NOT NULL
-            """ + (" " * depth * 2) + f") AS {field.name},\n").lstrip(),
+            """
+                + (" " * depth * 2)
+                + f") AS {field.name},\n"
+            ).lstrip(),
             " " * depth * 2,
         )
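Reviewer note: this hunk only re-wraps the dedent(...) call (formatter-style; behavior is unchanged), but the string being assembled is worth seeing whole: an ARRAY(SELECT ... FROM UNNEST(JSON_QUERY_ARRAY(...))) projection. A runnable sketch with hypothetical stand-ins for the field, base expression, and inner projection:

from textwrap import dedent, indent

base, path, field_name, depth = "data", "$", "items", 1  # hypothetical inputs
v = f"{field_name}__rows AS value"   # placeholder for _v.as_sql()
proj = f"{field_name}__rows"         # placeholder for _v.projection

fragment = (" " * depth * 2) + indent(
    dedent(
        f"""
    ARRAY(
        SELECT {v}
        FROM UNNEST(
            JSON_QUERY_ARRAY({base}, '{path}.{field_name}')
        ) AS {field_name}__rows
        WHERE {proj} IS NOT NULL
    """
        + (" " * depth * 2)
        + f") AS {field_name},\n"
    ).lstrip(),
    " " * depth * 2,
)
print(fragment)
# -> ARRAY(SELECT items__rows AS value FROM UNNEST(
#        JSON_QUERY_ARRAY(data, '$.items')) AS items__rows
#    WHERE items__rows IS NOT NULL) AS items,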
