✨ Add support for location configuration (#13)
- BigQuery dataset location
- GCS bucket location
- Ensure that the GCS bucket and the BigQuery dataset are in the same location
- Tests passed
VinceLegendre authored Feb 3, 2023
1 parent 60432d4 commit 5a884b2
Showing 7 changed files with 128 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -138,3 +138,6 @@ dmypy.json
# Test plugins
plugins/
output/

# IDE
.idea/
52 changes: 27 additions & 25 deletions README.md
@@ -66,6 +66,7 @@ First a valid example to give context to the below including a nested key exampl
"denormalized": true,
"credentials_path": "...",
"dataset": "my_dataset",
"location": "us-central1",
"batch_size": 500,
"column_name_transforms": {
"snake_case": true
@@ -74,31 +75,32 @@ First a valid example to give context to the below including a nested key exampl
```


| Setting | Required | Default | Description |
|:--------------------|:--------:|:-------:|:------------|
| credentials_path | False | None | The path to a gcp credentials json file. |
| credentials_json | False | None | A JSON string of your service account JSON file. |
| project | True | None | The target GCP project to materialize data into. |
| dataset | True | None | The target dataset to materialize data into. |
| batch_size | False | 500 | The maximum number of rows to send in a single batch to the worker. This should be configured based on load method. For `storage_write_api` and `streaming_insert` it should be `<=500`, for the LoadJob sinks, it can be much higher, ie `>100,000` |
| timeout | False | 600 | Default timeout for batch_job and gcs_stage derived LoadJobs. |
| denormalized | False | 0 | Determines whether to denormalize the data before writing to BigQuery. A false value will write data using a fixed JSON column based schema, while a true value will write data using a dynamic schema derived from the tap. |
| method | True | storage_write_api | The method to use for writing to BigQuery. Must be one of `batch_job`, `storage_write_api`, `gcs_stage`, `streaming_insert` |
| generate_view | False | 0 | Determines whether to generate a view based on the SCHEMA message parsed from the tap. Only valid if denormalized=false meaning you are using the fixed JSON column based schema. |
| bucket | False | None | The GCS bucket to use for staging data. Only used if method is gcs_stage. |
| cluster_on_key_properties| False | False | Indicates if we should use the key_properties from the tap to cluster our table. By default, tables created by this target cluster on `_sdc_batched_at`. |
| partition_granularity| False | "month" | Indicates the granularity of the created table partitioning scheme which is based on `_sdc_batched_at`. By default the granularity is monthly. Must be one of: "hour", "day", "month", "year". |
| column_name_transforms.lower| False | None | Lowercase column names. |
| column_name_transforms.quote| False | None | Quote column names in any generated DDL. |
| column_name_transforms.add_underscore_when_invalid| False | None | Add an underscore to the column name if it starts with a digit to make it valid. |
| column_name_transforms.snake_case| False | None | Snake case all incoming column names. Does not apply to fixed schema loads but _does_ apply to the view auto-generated over them. |
| options.storage_write_batch_mode| False | None | By default, we use the default stream (Committed mode) in the storage_write_api load method which results in streaming records which are immediately available and is generally fastest. If this is set to true, we will use the application created streams (Committed mode) to transactionally batch data on STATE messages and at end of pipe. |
| options.process_pool | False | None | By default we use an autoscaling threadpool to write to BigQuery. If set to true, we will use a process pool. |
| options.max_workers_per_stream| False | None | By default, each sink type has a preconfigured max worker limit. This sets an override for maximum number of workers per stream. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth| False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
|:---------------------------------------------------|:--------:|:-----------------:|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| credentials_path | False | None | The path to a gcp credentials json file. |
| credentials_json | False | None | A JSON string of your service account JSON file. |
| project | True | None | The target GCP project to materialize data into. |
| dataset | True | None | The target dataset to materialize data into. |
| location                                            | False    | US                | The location of the target dataset to materialize data into. Also applies to the GCS bucket when using the `gcs_stage` load method. |
| batch_size | False | 500 | The maximum number of rows to send in a single batch to the worker. This should be configured based on load method. For `storage_write_api` and `streaming_insert` it should be `<=500`, for the LoadJob sinks, it can be much higher, ie `>100,000` |
| timeout | False | 600 | Default timeout for batch_job and gcs_stage derived LoadJobs. |
| denormalized | False | 0 | Determines whether to denormalize the data before writing to BigQuery. A false value will write data using a fixed JSON column based schema, while a true value will write data using a dynamic schema derived from the tap. |
| method | True | storage_write_api | The method to use for writing to BigQuery. Must be one of `batch_job`, `storage_write_api`, `gcs_stage`, `streaming_insert` |
| generate_view | False | 0 | Determines whether to generate a view based on the SCHEMA message parsed from the tap. Only valid if denormalized=false meaning you are using the fixed JSON column based schema. |
| bucket | False | None | The GCS bucket to use for staging data. Only used if method is gcs_stage. |
| cluster_on_key_properties | False | False | Indicates if we should use the key_properties from the tap to cluster our table. By default, tables created by this target cluster on `_sdc_batched_at`. |
| partition_granularity | False | "month" | Indicates the granularity of the created table partitioning scheme which is based on `_sdc_batched_at`. By default the granularity is monthly. Must be one of: "hour", "day", "month", "year". |
| column_name_transforms.lower | False | None | Lowercase column names. |
| column_name_transforms.quote | False | None | Quote column names in any generated DDL. |
| column_name_transforms.add_underscore_when_invalid | False | None | Add an underscore to the column name if it starts with a digit to make it valid. |
| column_name_transforms.snake_case | False | None | Snake case all incoming column names. Does not apply to fixed schema loads but _does_ apply to the view auto-generated over them. |
| options.storage_write_batch_mode | False | None | By default, we use the default stream (Committed mode) in the storage_write_api load method which results in streaming records which are immediately available and is generally fastest. If this is set to true, we will use the application created streams (Committed mode) to transactionally batch data on STATE messages and at end of pipe. |
| options.process_pool | False | None | By default we use an autoscaling threadpool to write to BigQuery. If set to true, we will use a process pool. |
| options.max_workers_per_stream | False | None | By default, each sink type has a preconfigured max worker limit. This sets an override for maximum number of workers per stream. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `target-bigquery --about`

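The new `location` setting applies to the BigQuery dataset and, when the `gcs_stage` load method is used, to the GCS staging bucket as well. As a point of reference, here is a minimal sketch of how dataset/bucket co-location can be checked with the official Google Cloud client libraries; the `check_colocated` helper and all names and values are hypothetical and not part of this target's code:

```python
# Hypothetical sketch: verify that a BigQuery dataset and a GCS staging
# bucket live in the same location. Not the target's implementation.
from google.cloud import bigquery, storage


def check_colocated(project: str, dataset_id: str, bucket_name: str) -> None:
    bq = bigquery.Client(project=project)
    gcs = storage.Client(project=project)

    dataset_location = bq.get_dataset(f"{project}.{dataset_id}").location
    bucket_location = gcs.get_bucket(bucket_name).location

    # GCS reports locations in upper case (e.g. "US-CENTRAL1"), so compare
    # case-insensitively against the BigQuery dataset location.
    if dataset_location.upper() != bucket_location.upper():
        raise ValueError(
            f"Dataset {dataset_id} is in {dataset_location}, but bucket "
            f"{bucket_name} is in {bucket_location}; they must match."
        )


if __name__ == "__main__":
    check_colocated("my-project", "my_dataset", "my-staging-bucket")
```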
1 change: 0 additions & 1 deletion meltano.yml
@@ -46,4 +46,3 @@ plugins:
dataset: raw_testing
batch_size: 500
timeout: 300
bucket: ${GCS_BUCKET}
40 changes: 31 additions & 9 deletions target_bigquery/core.py
@@ -127,9 +127,13 @@ def as_table(self, apply_transforms: bool = False, **kwargs) -> bigquery.Table:
setattr(table, option, value)
return table

def as_dataset(self) -> bigquery.Dataset:
"""Returns a Dataset instance for this table."""
return bigquery.Dataset(self.as_dataset_ref())
def as_dataset(self, **kwargs) -> bigquery.Dataset:
"""Returns a Dataset instance for this dataset."""
dataset = bigquery.Dataset(self.as_dataset_ref())
config = {**self.default_dataset_options(), **kwargs}
for option, value in config.items():
setattr(dataset, option, value)
return dataset

def create_table(
self,
@@ -143,13 +147,22 @@ def create_table(
table in a single method call. It is idempotent and will not create
a new table if one already exists."""
if not hasattr(self, "_dataset"):
self._dataset = client.create_dataset(self.as_dataset(), exists_ok=True)
try:
self._dataset = client.create_dataset(self.as_dataset(**kwargs['dataset']), exists_ok=False)
except Conflict:
dataset = client.get_dataset(self.as_dataset(**kwargs['dataset']))
if dataset.location != kwargs['dataset']['location']:
raise Exception(f"Location of existing dataset {dataset.dataset_id} ({dataset.location})"
f" does not match specified location: {kwargs['dataset']['location']}")
else:
self._dataset = dataset
if not hasattr(self, "_table"):
try:
self._table = client.create_table(
self.as_table(
apply_transforms and self.ingestion_strategy is not IngestionStrategy.FIXED,
**kwargs,
apply_transforms
and self.ingestion_strategy != IngestionStrategy.FIXED,
**kwargs['table']
)
)
except Conflict:
@@ -178,6 +191,13 @@ def default_table_options(self) -> Dict[str, Any]:
),
}

@staticmethod
def default_dataset_options() -> Dict[str, Any]:
"""Returns the default dataset options for this dataset."""
return {
"location": "US"
}

def __hash__(self) -> int:
return hash((self.name, self.dataset, self.project, json.dumps(self.jsonschema)))

@@ -316,15 +336,17 @@ def preprocess_record(self, record: Dict[str, Any], context: Dict[str, Any]) ->
)
def create_target(self, key_properties: Optional[List[str]] = None) -> None:
"""Create the table in BigQuery."""
kwargs = {}
kwargs = {"table": {}, "dataset": {}}
if key_properties and self.config.get("cluster_on_key_properties", False):
kwargs["clustering_fields"] = key_properties[:4]
kwargs["table"]["clustering_fields"] = key_properties[:4]
partition_grain: str = self.config.get("partition_granularity")
if partition_grain:
kwargs["time_partitioning"] = TimePartitioning(
kwargs["table"]["time_partitioning"] = TimePartitioning(
type_=PARTITION_STRATEGY[partition_grain.upper()],
field="_sdc_batched_at",
)
location: str = self.config.get("location", BigQueryTable.default_dataset_options()["location"])
kwargs["dataset"]["location"] = location
self.table.create_table(self.client, self.apply_transforms, **kwargs)
if self.generate_view:
self.client.query(
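For context on the hunk above, the following self-contained sketch mirrors the nested keyword-argument structure that `create_target` now builds and `create_table` consumes. The `build_create_kwargs` helper is hypothetical, and `TimePartitioningType` stands in for the module's `PARTITION_STRATEGY` mapping:

```python
# Hypothetical sketch of the {"table": ..., "dataset": ...} kwargs built in
# create_target: "table" options are applied to the bigquery.Table, "dataset"
# options (including the new location) to the bigquery.Dataset.
from typing import Any, Dict, List, Optional

from google.cloud.bigquery import TimePartitioning, TimePartitioningType


def build_create_kwargs(
    key_properties: Optional[List[str]],
    cluster_on_key_properties: bool,
    partition_granularity: Optional[str],
    location: str = "US",
) -> Dict[str, Dict[str, Any]]:
    kwargs: Dict[str, Dict[str, Any]] = {"table": {}, "dataset": {"location": location}}
    if key_properties and cluster_on_key_properties:
        # BigQuery allows at most four clustering columns.
        kwargs["table"]["clustering_fields"] = key_properties[:4]
    if partition_granularity:
        kwargs["table"]["time_partitioning"] = TimePartitioning(
            type_=getattr(TimePartitioningType, partition_granularity.upper()),
            field="_sdc_batched_at",
        )
    return kwargs


print(build_create_kwargs(["id"], True, "month", "us-central1"))
```

With this shape, `create_table` can apply the `dataset` options when creating or validating the dataset and the `table` options when creating the table.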