Skip to content

Commit

Permalink
Support new Marketing Cloud APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesweakley committed Jun 7, 2021
1 parent 4f920c8 commit 7dad9f8
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 38 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ After creating an Omnata Push integration, you will be provided with External Fu
packages:
- git: "https://github.com/omnata-labs/dbt-omnata-push.git"
revision: 0.5.0
revision: 0.6.0
```

Expand Down
2 changes: 1 addition & 1 deletion dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: 'omnata_push'
version: '0.5.0'
version: '0.6.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
Expand Down
81 changes: 45 additions & 36 deletions macros/apps/operations/marketing_cloud_data_extension_upload.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,68 @@
{%- set data_extension_name = config.get('data_extension_name') -%}
{%- set omnata_functions_database = var("omnata_functions_database", target.database) -%}
{%- set omnata_functions_schema = var("omnata_functions_schema", target.schema) -%}

{%- set temp_table = 'temp_'+omnata_push.random_int(10) -%}

{# -- Store the load job details in the jobs table, including the results of checking the data extension #}
{% call statement('main') -%}
insert into {{ ref('omnata_push','sfmc_load_tasks') }} (job_id,load_task_name,object_name,operation,creation_time,creation_metadata)
select '{{ job_id }}',
'{{ load_task_name }}',
'{{ data_extension_name }}',
'{{ operation }}',
current_timestamp(),
"{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_DATA_EXTENSION_MANAGE(PARSE_JSON('{"operation":"ensure_exists",
create temp table {{ temp_table }} as(
select "{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_DATA_EXTENSION_MANAGE(PARSE_JSON('{"operation":"ensure_exists",
"extension_name":"{{ data_extension_name }}",
"force":"{{ force_check }}",
"extension_fields": {{ data_extension_fields | tojson }}
}'))
}')) as metadata_creation_result
)
{%- endcall %}

{# -- Load the data in batches, waiting for the result so that we can store it at the record level #}
{% call statement('main') -%}
insert into {{ ref('omnata_push','sfmc_load_task_logs') }}
with load_source as (
{{ sql }}
insert all
when row_index=1 then
into {{ ref('omnata_push','sfmc_load_tasks') }} (job_id, load_task_name,object_name,operation,creation_time,creation_metadata) values (job_id, load_task_name,object_name,operation,creation_time,creation_metadata)
into {{ ref('omnata_push','sfmc_load_task_logs') }} (job_id, job_log_entry_id,load_task_name,object_name,operation,record,result) values (job_id, job_log_entry_id,load_task_name,object_name,operation,record,result)
else
into {{ ref('omnata_push','sfmc_load_task_logs') }} (job_id, job_log_entry_id,load_task_name,object_name,operation,record,result) values (job_id, job_log_entry_id,load_task_name,object_name,operation,record,result)

with parameters as(
-- This section determines the data import settings
select PARSE_JSON('{"name":"{{ data_extension_name }}","operation":"{{ import_type }}"}') as import_parameters, metadata_creation_result
from {{ temp_table }}
)
,data_indexed as(
,load_source as(
-- This section determines which data is uploaded to Marketing Cloud, and the field names
{{ sql }}
-- --------------------------------------------------------------
),data_indexed as( -- assign row numbers to match results
select row_number() over (partition by null order by null) as row_index,
(row_index/100)::int as batch,
(row_index/100)::int as batch_number,
record
from load_source
)
,data_staged as(
),data_staged as( -- batch records for efficiency, stage the data for upload
select "{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_STAGE_DATA(array_agg(array_construct(row_index,record))) as staged_result
from data_indexed
group by batch
)
,data_imported as(
select "{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_DE_IMPORT(
PARSE_JSON('{"name":"{{ data_extension_name }}","operation":"{{ import_type }}"}'),
any_value(staged_result)
) as import_result
from data_staged
),
data_import_result as(
select any_value(import_result) as import_result_output
from data_imported
group by batch_number
),staged_data_result as(
select any_value(staged_result) as staged_query_id
from data_staged
),data_imported as( -- perform the import
select staged_query_id,"{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_AWAIT_RESULTS_POLL("{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_DE_IMPORT(import_parameters,staged_query_id)) as import_result
from staged_data_result,parameters
),import_results as( -- retrieve the results
select
staged_query_id as job_id,
'{{ load_task_name }}' as load_task_name,
UUID_STRING() as job_log_entry_id,
'{{ data_extension_name }}' as object_name,
'{{ import_type }}' as operation,
current_timestamp() as creation_time,
metadata_creation_result as creation_metadata,
row_index,
record,
SFMC_FETCH_RESULTS(staged_query_id,row_index) as result
from data_indexed,data_imported,parameters
where import_result=true
)
select '{{ job_id }}' as job_id,
UUID_STRING() as job_log_entry_id,
'{{ load_task_name }}',
'{{ data_extension_name }}',
'{{ operation }}',
record,
"{{ omnata_functions_database }}"."{{ omnata_functions_schema }}".SFMC_FETCH_RESULTS(import_result_output,row_index) as result
from data_indexed,data_import_result
select * from import_results
{%- endcall %}

{{ adapter.commit() }}
Expand Down

0 comments on commit 7dad9f8

Please sign in to comment.