Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First version of distributed materialization #15

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 67 additions & 5 deletions dbt/include/clickhouse/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,29 @@
{% macro on_cluster_clause(label) %}
{% set on_cluster = adapter.get_clickhouse_cluster_name() %}
{%- if on_cluster is not none %}
{{ label }} {{ on_cluster }}
{{ label }} '{{ on_cluster }}' {# -- this one for quoting cluster name; had some issues with names like something-somenthing #}
{%- endif %}
{%- endmacro -%}

{% macro clickhouse__create_distributed_table(relation, local_relation) %}
{%- set cluster = adapter.get_clickhouse_cluster_name() -%}
{%- set columns = clickhouse__get_col_types(local_relation) -%}
{%- set coltypes = columns | map(attribute='coltype') | join(', ') -%}
{%- set sharding = config.get('sharding_key') -%}

{% if sharding is none %}
create table {{ relation }} {{ on_cluster_clause(label="on cluster") }} (
{{ coltypes }}
)
engine = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ local_relation.name }}')
{% else %}
create table {{ relation }} {{ on_cluster_clause(label="on cluster") }} (
{{ coltypes }}
)
engine = Distributed('{{ cluster}}', '{{ relation.schema }}', '{{ local_relation.name }}', {{ sharding }})
{% endif %}
{% endmacro %}

{% macro clickhouse__create_table_as(temporary, relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}

Expand All @@ -57,7 +76,7 @@
{{ order_cols(label="order by") }}
{{ partition_cols(label="partition by") }}
{%- else %}
create table {{ relation.include(database=False) }}
create table {{ relation.include(database=True) }}
{{ on_cluster_clause(label="on cluster") }}
{{ engine_clause(label="engine") }}
{{ order_cols(label="order by") }}
Expand All @@ -68,6 +87,22 @@
)
{%- endmacro %}

{% macro clickhouse__create_empty_table(relation, view_relation) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set columns = clickhouse__get_col_types(view_relation) -%}
{%- set coltypes = columns | map(attribute='coltype') | join(', ') -%}

{{ sql_header if sql_header is not none }}

create table {{ relation }} {{ on_cluster_clause(label="on cluster") }} (
{{ coltypes }}
)
{{ engine_clause(label="engine") }}
{{ order_cols(label="order by") }}
{{ partition_cols(label="partition by") }}

{%- endmacro %}

{% macro clickhouse__create_view_as(relation, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}

Expand Down Expand Up @@ -128,6 +163,22 @@
{% do return(load_result('get_columns_in_relation').table) %}
{% endmacro %}

{% macro clickhouse__get_col_types(relation) %}
{% call statement('get_col_types', fetch_result=True) %}
select
'"' || name || '" ' || type AS coltype
from system.columns
where
table = '{{ relation.identifier }}'
{% if relation.schema %}
and database = '{{ relation.schema }}'
{% endif %}
order by position
{% endcall %}
{% do return(load_result('get_col_types').table) %}
{% endmacro %}


{% macro clickhouse__drop_relation(relation) -%}
{% call statement('drop_relation', auto_begin=False) -%}
drop table if exists {{ relation }} {{ on_cluster_clause(label="on cluster") }}
Expand All @@ -151,11 +202,22 @@

{% macro clickhouse__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_schema = base_relation.schema %}
{% set tmp_relation = base_relation.incorporate(
path={"identifier": tmp_identifier, "schema": None}) -%}
path={"identifier": tmp_identifier, "schema": tmp_schema}) -%}
{% do return(tmp_relation) %}
{% endmacro %}

{% macro clickhouse__make_local_relation(base_relation) %}
{% set local_relation = clickhouse__make_temp_relation(base_relation, '__dbt_local') -%}
{% do return(local_relation) %}
{% endmacro %}

{% macro clickhouse__make_view_relation(base_relation) %}
{% set view_relation = clickhouse__make_temp_relation(base_relation, '__dbt_view') -%}
{% do return(view_relation) %}
{% endmacro %}


{% macro clickhouse__generate_database_name(custom_database_name=none, node=none) -%}
{% do return(None) %}
Expand All @@ -178,6 +240,6 @@

{% macro clickhouse__alter_column_type(relation, column_name, new_column_type) -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} modify column {{ adapter.quote(column_name) }} {{ new_column_type }} {{ on_cluster_clause(label="on cluster") }}
alter table {{ relation }} {{ on_cluster_clause(label="on cluster") }} modify column {{ adapter.quote(column_name) }} {{ new_column_type }}
{% endcall %}
{% endmacro %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{# -- Clickhouse has table engine "distributed". It doesn't physically exist in database, but manages queries, addressed to local tables on nodes in cluster. #}
{# -- It means that you can run selects and inserts on cluster, not only on one particular node. #}
{# -- I find it very useful to incorporate this table engine in dbt-clickhouse to run dbt on whole cluster, not just on nodes separately #}

{# -- The main difficulty about making distributed table is that you cannot create table "as" result of select query #}
{# -- Instead the flow is to make local tables (empty, with declaring variables) then to make distributed table linked to all local tables on cluster #}
{# -- But when you create table "as" you don't have to declare variables. #}
{# -- The main problem is to write macro, that would create this section in DDL corresponding to your model's sql query #}

{% materialization distributed_incremental, adapter = 'clickhouse' -%}

{# -- First we need some helper relations: #}
{# -- - tmp_relation: in-memory relation to make insert into distributed #}
{# -- - local_relation: physical tables on cluster #}
{# -- - wiew_relation: magical temporary relation, that will help us declare wariable #}
{% set unique_key = config.get('unique_key') %}
{% set existing_relation = load_relation(this) %}
{% set target_relation = this.incorporate(type='table') %}
{% set tmp_relation = clickhouse__make_temp_relation(target_relation) %}
{% set local_relation = clickhouse__make_local_relation(target_relation) %}
{% set view_relation = clickhouse__make_view_relation(target_relation) %}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{# -- Main stuff is almost exactly the same, as in base adapter incremental materialization #}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

{% set trigger_full_refresh = (full_rquery,efresh_mode or existing_relation.is_view) %}

{# -- So here is the flow: we make view out of sql. Database spends almost nothing to create that. #}
{# -- But after that we have this view in system.columns and system.tables #}
{# -- Therefore we can pull column names and types out of system tables to paste them into our local and distributed DDL's #}
{% if existing_relation is none or trigger_full_refresh %}
{% do adapter.drop_relation(local_relation) %}
{% do adapter.drop_relation(target_relation) %}
{% do adapter.drop_relation(view_relation) %}
{% do run_query(create_view_as(view_relation, sql)) %}
{% do run_query(clickhouse__create_empty_table(local_relation, view_relation)) %}

{# -- Don't forget to drop helper view #}
{% do adapter.drop_relation(view_relation) %}
{% do run_query(clickhouse__create_distributed_table(target_relation, local_relation)) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% endif %}

{# -- And then we have only to run incremental upsert #}
{# -- Even if it is first time running or full-refresh, we are using incremental upsert, because we cannot use "create table as" #}

{% call statement("main") %}
{{ incremental_upsert(tmp_relation, target_relation, unique_key = unique_key) }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

{# -- TODO #}
{# -- Flow to modify schema on changes #}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{# -- The main desctiption is in comments to distributed_incremental #}

{% materialization distributed_table, adapter = 'clickhouse' %}

{% set unique_key = config.get('none') %}
{% set existing_relation = load_relation(this) %}
{% set target_relation = this.incorporate(type='table') %}
{% set tmp_relation = clickhouse__make_temp_relation(target_relation) %}
{% set local_relation = clickhouse__make_local_relation(target_relation) %}
{% set view_relation = clickhouse__make_view_relation(target_relation) %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

{% do adapter.drop_relation(local_relation) %}
{% do adapter.drop_relation(target_relation) %}
{% do adapter.drop_relation(view_relation) %}
{% do run_query(create_view_as(view_relation, sql)) %}
{% do run_query(clickhouse__create_empty_table(local_relation, view_relation)) %}
{% do adapter.drop_relation(view_relation) %}
{% do run_query(clickhouse__create_distributed_table(target_relation, local_relation)) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}

{% call statement("main") %}
{{ incremental_upsert(tmp_relation, target_relation, unique_key = unique_key) }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
28 changes: 28 additions & 0 deletions dbt/include/clickhouse/macros/materializations/view.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{%- materialization view -%}

{% set target_relation = this.incorporate(type='view') %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}


{{ drop_relation_if_exists(target_relation) }}


-- build model
{% do run_query(create_view_as(target_relation, sql)) %}

-- cleanup
-- move the existing view out of the way


{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ drop_relation_if_exists(backup_relation) }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization -%}