From d02fe9287217d232b2d3fe223089e71531dc1c5f Mon Sep 17 00:00:00 2001 From: Love Eklund Date: Fri, 4 Oct 2024 15:14:09 +0200 Subject: [PATCH 1/6] updated updat_schema function --- target_bigquery/core.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/target_bigquery/core.py b/target_bigquery/core.py index 3cf69cc..6e50fdd 100644 --- a/target_bigquery/core.py +++ b/target_bigquery/core.py @@ -204,6 +204,9 @@ def create_table( time.sleep(5) return self._dataset, self._table + def get_current_schema(self) -> List[bigquery.SchemaField]: + return self._table.schema + def default_table_options(self) -> Dict[str, Any]: """Returns the default table options for this table.""" schema_dump = json.dumps(self.jsonschema) @@ -587,10 +590,11 @@ class Denormalized: def update_schema(self: BaseBigQuerySink) -> None: # type: ignore """Update the target schema.""" table = self.table.as_table() - current_schema = table.schema[:] - mut_schema = table.schema[:] - for expected_field in self.table.get_resolved_schema(self.apply_transforms): - if not any(field.name == expected_field.name for field in current_schema): + resolved_schema = self.table.get_resolved_schema(self.apply_transforms) + current_schema = self.table.get_current_schema()[:] + mut_schema = current_schema[:] + for expected_field in resolved_schema: + if not any(field.name == expected_field.name for field in mut_schema): mut_schema.append(expected_field) if len(mut_schema) > len(current_schema): table.schema = mut_schema From 375cf990388b22048b76e92a6c7b2c41c6773726 Mon Sep 17 00:00:00 2001 From: Love Eklund Date: Fri, 4 Oct 2024 15:15:47 +0200 Subject: [PATCH 2/6] adding space to trigger workflow --- .github/workflows/ci.yml | 2 +- target_bigquery/core.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5466b80..5b94f33 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ on: - "**.py" - "poetry.lock" push: - branches: [main] + branches: [main, bugfix/fix_update_schema_to_read_current_schema_in_target_table] paths: - "**.py" - "poetry.lock" diff --git a/target_bigquery/core.py b/target_bigquery/core.py index 6e50fdd..286de27 100644 --- a/target_bigquery/core.py +++ b/target_bigquery/core.py @@ -588,7 +588,7 @@ class Denormalized: reraise=True, ) def update_schema(self: BaseBigQuerySink) -> None: # type: ignore - """Update the target schema.""" + """Update the target schema. """ table = self.table.as_table() resolved_schema = self.table.get_resolved_schema(self.apply_transforms) current_schema = self.table.get_current_schema()[:] From dcd4273f6682cd6f692600897637d09a479dffa7 Mon Sep 17 00:00:00 2001 From: Love Eklund Date: Fri, 4 Oct 2024 15:18:35 +0200 Subject: [PATCH 3/6] removing space to trigger tests --- target_bigquery/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_bigquery/core.py b/target_bigquery/core.py index 286de27..6e50fdd 100644 --- a/target_bigquery/core.py +++ b/target_bigquery/core.py @@ -588,7 +588,7 @@ class Denormalized: reraise=True, ) def update_schema(self: BaseBigQuerySink) -> None: # type: ignore - """Update the target schema. """ + """Update the target schema.""" table = self.table.as_table() resolved_schema = self.table.get_resolved_schema(self.apply_transforms) current_schema = self.table.get_current_schema()[:] From a078760354da0cebf884a63717d0d8202ca4cfa5 Mon Sep 17 00:00:00 2001 From: Love Eklund Date: Fri, 4 Oct 2024 15:48:43 +0200 Subject: [PATCH 4/6] adding space to trigger workflow --- target_bigquery/storage_write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_bigquery/storage_write.py b/target_bigquery/storage_write.py index 8f66cd7..f80d6f8 100644 --- a/target_bigquery/storage_write.py +++ b/target_bigquery/storage_write.py @@ -53,7 +53,7 @@ # Stream specific constant MAX_IN_FLIGHT = 15 -"""Maximum number of concurrent requests per worker be processed by grpc before awaiting.""" +"""Maximum number of concurrent requests per worker be processed by grpc before awaiting. """ Dispatcher = Callable[[types.AppendRowsRequest], writer.AppendRowsFuture] StreamComponents = Tuple[Field, writer.AppendRowsStream, Dispatcher] From 1477ff2d1f36b551b8cfab802fba8ae45f2bb113 Mon Sep 17 00:00:00 2001 From: Love Eklund Date: Fri, 4 Oct 2024 15:51:33 +0200 Subject: [PATCH 5/6] removing space to trigger workflow --- target_bigquery/storage_write.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_bigquery/storage_write.py b/target_bigquery/storage_write.py index f80d6f8..8f66cd7 100644 --- a/target_bigquery/storage_write.py +++ b/target_bigquery/storage_write.py @@ -53,7 +53,7 @@ # Stream specific constant MAX_IN_FLIGHT = 15 -"""Maximum number of concurrent requests per worker be processed by grpc before awaiting. """ +"""Maximum number of concurrent requests per worker be processed by grpc before awaiting.""" Dispatcher = Callable[[types.AppendRowsRequest], writer.AppendRowsFuture] StreamComponents = Tuple[Field, writer.AppendRowsStream, Dispatcher] From 6f5b01cb40d81de48a34f2ba77255deb442d7740 Mon Sep 17 00:00:00 2001 From: Love Eklund Date: Fri, 4 Oct 2024 16:09:57 +0200 Subject: [PATCH 6/6] updating update_schema and adding function to get current version of tables schema --- target_bigquery/core.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/target_bigquery/core.py b/target_bigquery/core.py index 3cf69cc..6e50fdd 100644 --- a/target_bigquery/core.py +++ b/target_bigquery/core.py @@ -204,6 +204,9 @@ def create_table( time.sleep(5) return self._dataset, self._table + def get_current_schema(self) -> List[bigquery.SchemaField]: + return self._table.schema + def default_table_options(self) -> Dict[str, Any]: """Returns the default table options for this table.""" schema_dump = json.dumps(self.jsonschema) @@ -587,10 +590,11 @@ class Denormalized: def update_schema(self: BaseBigQuerySink) -> None: # type: ignore """Update the target schema.""" table = self.table.as_table() - current_schema = table.schema[:] - mut_schema = table.schema[:] - for expected_field in self.table.get_resolved_schema(self.apply_transforms): - if not any(field.name == expected_field.name for field in current_schema): + resolved_schema = self.table.get_resolved_schema(self.apply_transforms) + current_schema = self.table.get_current_schema()[:] + mut_schema = current_schema[:] + for expected_field in resolved_schema: + if not any(field.name == expected_field.name for field in mut_schema): mut_schema.append(expected_field) if len(mut_schema) > len(current_schema): table.schema = mut_schema