
Commit

Feature/python model v1 (#209)
Co-authored-by: Jeremy Cohen <[email protected]>
Co-authored-by: Emily Rockman <[email protected]>
Co-authored-by: Stu Kilgore <[email protected]>
4 people authored Jul 28, 2022
1 parent 946ec69 commit 55bbd3d
Showing 12 changed files with 296 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
- Implement `create_schema` via SQL, instead of Python method, allowing users to override if desired. `drop_schema` remains a Python method for the time being. ([#182](https://github.com/dbt-labs/dbt-bigquery/issues/182), [#183](https://github.com/dbt-labs/dbt-bigquery/pull/183))
- Added incremental materializations for Python models via Dataproc. ([#226](https://github.com/dbt-labs/dbt-bigquery/pull/226))

### Under the hood
- Implement minimal changes to support dbt Core incremental materialization refactor. ([#232](https://github.com/dbt-labs/dbt-bigquery/issues/232), [#223](https://github.com/dbt-labs/dbt-bigquery/pull/223))
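For context (not part of this diff), the feature this changelog entry describes lets a dbt model be written as a Python file that runs on Dataproc. A rough sketch of such a model is shown below; the model name, upstream ref, and transformation are illustrative assumptions, not taken from this commit.

```python
# models/my_python_model.py -- illustrative sketch, not part of this commit.
# `dbt` and `session` are injected by dbt; on this adapter, `session` is the
# SparkSession of the Dataproc cluster that runs the submitted job.
def model(dbt, session):
    dbt.config(materialized="table")

    # dbt.ref() returns a PySpark DataFrame on this adapter
    upstream = dbt.ref("my_upstream_model")  # hypothetical upstream model

    # Any PySpark transformation; the returned DataFrame becomes the table
    return upstream.limit(100)
```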
21 changes: 15 additions & 6 deletions dbt/adapters/bigquery/connections.py
@@ -116,6 +116,10 @@ class BigQueryCredentials(Credentials):
client_secret: Optional[str] = None
token_uri: Optional[str] = None

dataproc_region: Optional[str] = None
dataproc_cluster_name: Optional[str] = None
gcs_bucket: Optional[str] = None

scopes: Optional[Tuple[str, ...]] = (
"https://www.googleapis.com/auth/bigquery",
"https://www.googleapis.com/auth/cloud-platform",
@@ -154,6 +158,7 @@ def _connection_keys(self):
"job_retries",
"job_creation_timeout_seconds",
"job_execution_timeout_seconds",
"gcs_bucket",
)

@classmethod
@@ -265,7 +270,7 @@ def format_rows_number(self, rows_number):
return f"{rows_number:3.1f}{unit}".strip()

@classmethod
def get_bigquery_credentials(cls, profile_credentials):
def get_google_credentials(cls, profile_credentials) -> GoogleCredentials:
method = profile_credentials.method
creds = GoogleServiceAccountCredentials.Credentials

@@ -295,8 +300,8 @@ def get_bigquery_credentials(cls, profile_credentials):
raise FailedToConnectException(error)

@classmethod
def get_impersonated_bigquery_credentials(cls, profile_credentials):
source_credentials = cls.get_bigquery_credentials(profile_credentials)
def get_impersonated_credentials(cls, profile_credentials):
source_credentials = cls.get_google_credentials(profile_credentials)
return impersonated_credentials.Credentials(
source_credentials=source_credentials,
target_principal=profile_credentials.impersonate_service_account,
@@ -305,11 +310,15 @@ def get_impersonated_bigquery_credentials(cls, profile_credentials):
)

@classmethod
def get_bigquery_client(cls, profile_credentials):
def get_credentials(cls, profile_credentials):
if profile_credentials.impersonate_service_account:
creds = cls.get_impersonated_bigquery_credentials(profile_credentials)
return cls.get_impersonated_credentials(profile_credentials)
else:
creds = cls.get_bigquery_credentials(profile_credentials)
return cls.get_google_credentials(profile_credentials)

@classmethod
def get_bigquery_client(cls, profile_credentials):
creds = cls.get_credentials(profile_credentials)
execution_project = profile_credentials.execution_project
location = getattr(profile_credentials, "location", None)

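The connections.py changes above split credential resolution (`get_google_credentials`, `get_impersonated_credentials`, `get_credentials`) out of BigQuery client construction, so the same Google credentials can also back the GCS and Dataproc clients used for Python models. A minimal sketch of the underlying google-auth flow is shown below; it assumes Application Default Credentials are available, and the service account is a placeholder.

```python
# Minimal sketch of the credential flow the refactor separates out; assumes
# Application Default Credentials. The target service account is a placeholder.
import google.auth
from google.auth import impersonated_credentials

SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

# Roughly what get_google_credentials() resolves for an oauth profile
source_credentials, _project = google.auth.default(scopes=SCOPES)

# Roughly what get_impersonated_credentials() adds when
# impersonate_service_account is set in the profile
creds = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal="dbt-runner@my-project.iam.gserviceaccount.com",  # placeholder
    target_scopes=SCOPES,
)
```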
118 changes: 114 additions & 4 deletions dbt/adapters/bigquery/impl.py
@@ -8,11 +8,14 @@

from dbt import ui # type: ignore
from dbt.adapters.base import BaseAdapter, available, RelationType, SchemaSearchMap, AdapterConfig
from dbt.adapters.base.impl import log_code_execution

from dbt.adapters.cache import _make_key

from dbt.adapters.bigquery.relation import BigQueryRelation
from dbt.adapters.bigquery import BigQueryColumn
from dbt.adapters.bigquery import BigQueryConnectionManager
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse
from dbt.contracts.graph.manifest import Manifest
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
@@ -286,7 +289,8 @@ def get_relation(self, database: str, schema: str, identifier: str) -> BigQueryR
# TODO: the code below is copy-pasted from SQLAdapter.create_schema. Is there a better way?
def create_schema(self, relation: BigQueryRelation) -> None:
# use SQL 'create schema'
relation = relation.without_identifier()
relation = relation.without_identifier() # type: ignore

fire_event(SchemaCreation(relation=_make_key(relation)))
kwargs = {
"relation": relation,
@@ -373,11 +377,11 @@ def _materialize_as_view(self, model: Dict[str, Any]) -> str:
model_database = model.get("database")
model_schema = model.get("schema")
model_alias = model.get("alias")
model_sql = model.get("compiled_sql")
model_code = model.get("compiled_code")

logger.debug("Model SQL ({}):\n{}".format(model_alias, model_sql))
logger.debug("Model SQL ({}):\n{}".format(model_alias, model_code))
self.connections.create_view(
database=model_database, schema=model_schema, table_name=model_alias, sql=model_sql
database=model_database, schema=model_schema, table_name=model_alias, sql=model_code
)
return "CREATE VIEW"

@@ -830,3 +834,109 @@ def run_sql_for_tests(self, sql, fetch, conn=None):
return res[0]
else:
return list(res)

@available.parse_none
@log_code_execution
def submit_python_job(self, parsed_model: dict, compiled_code: str):
# TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead
# of `None` which evaluates to True!

# TODO limit this function to run only when doing the materialization of python nodes
# TODO should we also add a timeout here?

# validate all additional stuff for python is set
schema = parsed_model.get("schema", self.config.credentials.schema)  # parsed_model is a dict
identifier = parsed_model["alias"]
python_required_configs = [
"dataproc_region",
"dataproc_cluster_name",
"gcs_bucket",
]
for required_config in python_required_configs:
if not getattr(self.connections.profile.credentials, required_config):
raise ValueError(
f"Need to supply {required_config} in profile to submit python job"
)
if not hasattr(self, "dataproc_helper"):
self.dataproc_helper = DataProcHelper(self.connections.profile.credentials)
model_file_name = f"{schema}/{identifier}.py"
# upload python file to GCS
self.dataproc_helper.upload_to_gcs(model_file_name, compiled_code)
# submit dataproc job
self.dataproc_helper.submit_dataproc_job(model_file_name)

# TODO proper result for this
message = "OK"
code = None
num_rows = None
bytes_processed = None
return BigQueryAdapterResponse( # type: ignore[call-arg]
_message=message,
rows_affected=num_rows,
code=code,
bytes_processed=bytes_processed,
)


class DataProcHelper:
def __init__(self, credential):
"""_summary_
Args:
credential (_type_): _description_
"""
try:
# Library only needed for python models
from google.cloud import dataproc_v1
from google.cloud import storage
except ImportError:
raise RuntimeError(
"You need to install [dataproc] extras to run python model in dbt-bigquery"
)
self.credential = credential
self.GoogleCredentials = BigQueryConnectionManager.get_credentials(credential)
self.storage_client = storage.Client(
project=self.credential.database, credentials=self.GoogleCredentials
)
self.job_client = dataproc_v1.JobControllerClient(
client_options={
"api_endpoint": "{}-dataproc.googleapis.com:443".format(
self.credential.dataproc_region
)
},
credentials=self.GoogleCredentials,
)

def upload_to_gcs(self, filename: str, compiled_code: str):
bucket = self.storage_client.get_bucket(self.credential.gcs_bucket)
blob = bucket.blob(filename)
blob.upload_from_string(compiled_code)

def submit_dataproc_job(self, filename: str):
# Create the job config.
job = {
"placement": {"cluster_name": self.credential.dataproc_cluster_name},
"pyspark_job": {
"main_python_file_uri": "gs://{}/{}".format(self.credential.gcs_bucket, filename)
},
}
operation = self.job_client.submit_job_as_operation(
request={
"project_id": self.credential.database,
"region": self.credential.dataproc_region,
"job": job,
}
)
response = operation.result()
return response

# TODO: there might be useful results here that we can parse and return
# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
# matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
# output = (
# self.storage_client
# .get_bucket(matches.group(1))
# .blob(f"{matches.group(2)}.000000000")
# .download_as_string()
# )
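`DataProcHelper` uploads the compiled model code to GCS and submits it as a PySpark job, but this diff does not show the generated PySpark template (`py_write_table` is defined elsewhere). The sketch below is a rough, standalone illustration of the kind of script a Dataproc cluster could run, reading and writing BigQuery through the spark-bigquery connector; all project, dataset, and bucket names are placeholders.

```python
# Rough standalone sketch; NOT the template dbt generates. All project,
# dataset, and bucket names are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dbt-python-model").getOrCreate()

# Read an upstream relation through the spark-bigquery connector
upstream = (
    spark.read.format("bigquery")
    .option("table", "my-project.analytics.upstream_table")
    .load()
)

result = upstream.limit(100)  # stand-in for the model's transformation

# Write the result back to BigQuery; indirect writes stage files in a GCS bucket
(
    result.write.format("bigquery")
    .option("temporaryGcsBucket", "my-dbt-python-bucket")
    .mode("overwrite")
    .save("my-project.analytics.my_python_model")
)
```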
45 changes: 29 additions & 16 deletions dbt/include/bigquery/macros/adapters.sql
@@ -41,22 +41,35 @@
{%- do return(bigquery_options(opts)) -%}
{%- endmacro -%}

{% macro bigquery__create_table_as(temporary, relation, sql) -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set raw_cluster_by = config.get('cluster_by', none) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

{{ sql_header if sql_header is not none }}

create or replace table {{ relation }}
{{ partition_by(partition_config) }}
{{ cluster_by(raw_cluster_by) }}
{{ bigquery_table_options(config, model, temporary) }}
as (
{{ sql }}
);
{% macro bigquery__create_table_as(temporary, relation, compiled_code, language='sql') -%}
{%- if language == 'sql' -%}
{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set raw_cluster_by = config.get('cluster_by', none) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- set partition_config = adapter.parse_partition_by(raw_partition_by) -%}

{{ sql_header if sql_header is not none }}

create or replace table {{ relation }}
{{ partition_by(partition_config) }}
{{ cluster_by(raw_cluster_by) }}
{{ bigquery_table_options(config, model, temporary) }}
as (
{{ compiled_code }}
);
{%- elif language == 'python' -%}
{#--
N.B. Python models _can_ write to temp views, however they use a different session and the view has already expired by the time it needs to be used (i.e. in merges for incremental models)

TODO: Deep dive into spark sessions to see if we can reuse a single session for an entire
dbt invocation.
--#}
{{ py_write_table(compiled_code=compiled_code, target_relation=relation.quote(database=False, schema=False, identifier=False)) }}
{%- else -%}
{% do exceptions.raise_compiler_error("bigquery__create_table_as macro didn't get supported language, it got %s" % language) %}
{%- endif -%}

{%- endmacro -%}

57 changes: 41 additions & 16 deletions dbt/include/bigquery/macros/materializations/incremental.sql
@@ -1,13 +1,14 @@
{% macro declare_dbt_max_partition(relation, partition_by, sql) %}
{% macro declare_dbt_max_partition(relation, partition_by, compiled_code, language='sql') %}

{% if '_dbt_max_partition' in sql %}
{#-- TODO: revisit partitioning with python models --#}
{%- if '_dbt_max_partition' in compiled_code and language == 'sql' -%}

declare _dbt_max_partition {{ partition_by.data_type }} default (
select max({{ partition_by.field }}) from {{ this }}
where {{ partition_by.field }} is not null
);

{% endif %}
{%- endif -%}

{% endmacro %}

@@ -74,7 +75,7 @@
{{ declare_dbt_max_partition(this, partition_by, sql) }}

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}
{{ create_table_as(True, tmp_relation, compiled_code) }}
{% else %}
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}
@@ -139,6 +140,7 @@

{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}
{%- set language = model['language'] %}

{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
@@ -160,43 +162,66 @@
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif existing_relation.is_view %}
{#-- There's no way to atomically replace a view with a table on BQ --#}
{{ adapter.drop_relation(existing_relation) }}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% elif full_refresh_mode %}
{#-- If the partition/cluster config has changed, then we must drop and recreate --#}
{% if not adapter.is_replaceable(existing_relation, partition_by, cluster_by) %}
{% do log("Hard refreshing " ~ existing_relation ~ " because it is not replaceable") %}
{{ adapter.drop_relation(existing_relation) }}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% else %}
{%- if language == 'python' and strategy == 'insert_overwrite' -%}
{#-- This lets us move forward assuming no python will be directly templated into a query --#}
{%- set python_unsupported_msg -%}
The 'insert_overwrite' strategy is not yet supported for python models.
{%- endset %}
{% do exceptions.raise_compiler_error(python_unsupported_msg) %}
{%- endif -%}

{% set tmp_relation_exists = false %}
{% if on_schema_change != 'ignore' %} {# Check first, since otherwise we may not build a temp table #}
{% do run_query(
declare_dbt_max_partition(this, partition_by, sql) + create_table_as(True, tmp_relation, sql)
) %}
{% if on_schema_change != 'ignore' or language == 'python' %}
{#-- Check first, since otherwise we may not build a temp table --#}
{#-- Python always needs to create a temp table --#}
{%- call statement('create_tmp_relation', language=language) -%}
{{ declare_dbt_max_partition(this, partition_by, compiled_code, language) +
create_table_as(True, tmp_relation, compiled_code, language)
}}
{%- endcall -%}
{% set tmp_relation_exists = true %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% endif %}

{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = bq_generate_incremental_build_sql(
strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists
) %}

{% endif %}
{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}

{%- if language == 'python' and tmp_relation -%}
{{ adapter.drop_relation(tmp_relation) }}
{%- endif -%}

{%- call statement('main') -%}
{{ build_sql }}
{% endcall %}
{% endif %}

{{ run_hooks(post_hooks) }}

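The incremental materialization above now wraps table creation in `statement('main', language=language)`, always builds a temp table for Python models, and rejects the `insert_overwrite` strategy for them. A hedged example of an incremental Python model that would take the supported merge path is sketched below; the model name, column names, strategy, and filter are illustrative assumptions.

```python
# models/my_incremental_python_model.py -- illustrative sketch only; model,
# column, and config values are assumptions, not taken from this diff.
def model(dbt, session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",  # insert_overwrite is rejected for Python models above
        unique_key="id",               # placeholder key column
    )

    df = dbt.ref("my_upstream_model")  # PySpark DataFrame

    if dbt.is_incremental:
        # Only process recent rows on incremental runs; the filter is illustrative
        df = df.where("loaded_at > current_date() - 7")

    return df
```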
