From 82f05c312f2ebcf7a5b71a4cc9b881848164a90b Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Fri, 20 Aug 2021 15:38:28 -0300 Subject: [PATCH 1/7] pg ssh normalization squashed --- .../airbyte/integrations/base/ssh/readme.md | 14 +- .../bases/base-normalization/.dockerignore | 1 + .../bases/base-normalization/Dockerfile | 3 + .../bases/base-normalization/build.gradle | 11 ++ .../bases/base-normalization/entrypoint.sh | 4 + .../transform_catalog/stream_processor.py | 181 ++++++------------ .../transform_config/transform.py | 109 +++++++---- .../unit_tests/test_transform_config.py | 133 +++++++++++++ .../bases/source-acceptance-test/Dockerfile | 2 +- .../unit_tests/test_utils.py | 132 +++++++------ ...hKeyPostgresDestinationAcceptanceTest.java | 2 +- ...wordPostgresDestinationAcceptanceTest.java | 2 +- .../SshPostgresDestinationAcceptanceTest.java | 2 +- .../views/auth/SignupPage/SignupPage.tsx | 1 + .../workers/DbtTransformationRunner.java | 5 +- .../DefaultNormalizationRunner.java | 36 ++-- .../dbt_transformation_entrypoint.sh | 6 + .../src/main/resources/sshtunneling.sh | 64 +++++++ 18 files changed, 451 insertions(+), 257 deletions(-) create mode 100644 airbyte-workers/src/main/resources/sshtunneling.sh diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md index 4f99198029eca..fc347b985bda0 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/readme.md @@ -1,13 +1,14 @@ -# Developing an SSH Source +# Developing an SSH Connector ## Goal -Easy development of any source that needs the ability to connect to a resource via SSH Tunnel. +Easy development of any connector that needs the ability to connect to a resource via SSH Tunnel. ## Overview Our SSH connector support is designed to be easy to plug into any existing connector. There are a few major pieces to consider: 1. Add SSH Configuration to the Spec - for SSH, we need to take in additional configuration, so we need to inject extra fields into the connector configuration. 2. Add SSH Logic to the Connector - before the connector code begins to execute we need to start an SSH tunnel. This library provides logic to create that tunnel (and clean it up). 3. Acceptance Testing - it is a good practice to include acceptance testing for the SSH version of a connector for at least one of the SSH types (password or ssh key). While unit testing for the SSH functionality exists in this package (coming soon), high-level acceptance testing to make sure this feature works with the individual connector belongs in the connector. +4. Normalization Support for Destinations - if the connector is a destination and supports normalization, there's a small change required in the normalization code to update the config so that dbt uses the right credentials for the SSH tunnel. ## How To @@ -21,6 +22,15 @@ Our SSH connector support is designed to be easy to plug into any existing conne ### Acceptance Testing 1. The only difference between existing acceptance testing and acceptance testing with SSH is that the configuration that is used for testing needs to contain additional fields. You can see the `Postgres Source ssh key creds` in lastpass to see an example of what that might look like. Those credentials leverage an existing bastion host in our test infrastructure. 
(As future work, we want to get rid of the need to use a static bastion server and instead do it in docker so we can run it all locally.) +### Normalization Support for Destinations +1. The core functionality for ssh tunnelling with normalization is already in place but you'll need to add a small tweak to `transform_config/transform.py` in the normalization module. Find the function `transform_{connector}()` and add at the start: + ``` + if TransformConfig.is_ssh_tunnelling(config): + config = TransformConfig.get_ssh_altered_config(config, port_key="port", host_key="host") + ``` + Replace port_key and host_key as necessary. Look at `transform_postgres()` to see an example. +2. If your `host_key="host"` and `port_key="port"` then step 1 should be sufficient. However if the key names differ for your connector, you will also need to add some logic into `sshtunneling.sh` (within airbyte-workers) to handle this, as currently it assumes that the keys are exactly `host` and `port`. + ## Misc ### How to wrap the protocol in an SSH Tunnel diff --git a/airbyte-integrations/bases/base-normalization/.dockerignore b/airbyte-integrations/bases/base-normalization/.dockerignore index c1b470f707044..53f6d44e83e9d 100644 --- a/airbyte-integrations/bases/base-normalization/.dockerignore +++ b/airbyte-integrations/bases/base-normalization/.dockerignore @@ -1,6 +1,7 @@ * !Dockerfile !entrypoint.sh +!build/sshtunneling.sh !setup.py !normalization !dbt-project-template diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index 1155b2410081b..3fce26df39ea9 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -17,8 +17,11 @@ RUN pip install cx_Oracle COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte +RUN apt-get update && apt-get install -y jq sshpass + WORKDIR /airbyte COPY entrypoint.sh . +COPY build/sshtunneling.sh . WORKDIR /airbyte/normalization_code COPY normalization ./normalization diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index 8b9aed890c061..c90d16893b5b3 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -8,9 +8,20 @@ airbytePython { } dependencies { + implementation project(':airbyte-workers') implementation files(project(':airbyte-integrations:bases:airbyte-protocol').airbyteDocker.outputs) } +// we need to access the sshtunneling script from airbyte-workers for ssh support +task copySshScript(type: Copy, dependsOn: [project(':airbyte-workers').processResources]) { + from "${project(':airbyte-workers').buildDir}/resources/main" + into "${buildDir}" + include "sshtunneling.sh" + if(inputs.sourceFiles.empty) throw new StopExecutionException("Couldn't find sshtunneling.sh to copy") +} +test.dependsOn copySshScript +assemble.dependsOn copySshScript + installReqs.dependsOn(":airbyte-integrations:bases:airbyte-protocol:installReqs") integrationTest.dependsOn(build) diff --git a/airbyte-integrations/bases/base-normalization/entrypoint.sh b/airbyte-integrations/bases/base-normalization/entrypoint.sh index 08c20eb32976a..74678fb0f2f7e 100755 --- a/airbyte-integrations/bases/base-normalization/entrypoint.sh +++ b/airbyte-integrations/bases/base-normalization/entrypoint.sh @@ -94,8 +94,12 @@ function main() { case "$CMD" in run) configuredbt + . 
/airbyte/sshtunneling.sh + openssh $CONFIG_FILE "${PROJECT_DIR}/localsshport.json" + trap 'closessh' EXIT # Run dbt to compile and execute the generated normalization models dbt run --profiles-dir "${PROJECT_DIR}" --project-dir "${PROJECT_DIR}" + closessh ;; configure-dbt) configuredbt diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index 61b222b12a954..b915377ef9494 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -24,7 +24,6 @@ import os -import re from typing import Dict, List, Optional, Tuple from airbyte_protocol.models.airbyte_protocol import DestinationSyncMode, SyncMode @@ -45,7 +44,6 @@ is_string, is_timestamp_with_time_zone, jinja_call, - remove_jinja, ) # using too many columns breaks ephemeral materialization (somewhere between 480 and 490 columns) @@ -79,7 +77,6 @@ def __init__( stream_name: str, destination_type: DestinationType, raw_schema: str, - default_schema: str, schema: str, source_sync_mode: SyncMode, destination_sync_mode: DestinationSyncMode, @@ -112,8 +109,6 @@ def __init__( self.sql_outputs: Dict[str, str] = {} self.parent: Optional["StreamProcessor"] = None self.is_nested_array: bool = False - self.default_schema: str = default_schema - self.airbyte_emitted_at = "_airbyte_emitted_at" @staticmethod def create_from_parent( @@ -135,7 +130,6 @@ def create_from_parent( stream_name=child_name, destination_type=parent.destination_type, raw_schema=parent.raw_schema, - default_schema=parent.default_schema, schema=parent.schema, # Nested Streams don't inherit parents sync modes? 
source_sync_mode=SyncMode.full_refresh, @@ -157,7 +151,6 @@ def create( stream_name: str, destination_type: DestinationType, raw_schema: str, - default_schema: str, schema: str, source_sync_mode: SyncMode, destination_sync_mode: DestinationSyncMode, @@ -190,7 +183,6 @@ def create( stream_name, destination_type, raw_schema, - default_schema, schema, source_sync_mode, destination_sync_mode, @@ -236,21 +228,14 @@ def process(self) -> List["StreamProcessor"]: ) if self.destination_sync_mode.value == DestinationSyncMode.append_dedup.value: from_table = self.add_to_outputs(self.generate_dedup_record_model(from_table, column_names), is_intermediate=True, suffix="ab4") - if self.destination_type == DestinationType.ORACLE: - where_clause = '\nwhere "_AIRBYTE_ROW_NUM" = 1' - else: - where_clause = "\nwhere _airbyte_row_num = 1" + where_clause = "\nwhere _airbyte_row_num = 1" from_table = self.add_to_outputs( self.generate_scd_type_2_model(from_table, column_names) + where_clause, is_intermediate=False, column_count=column_count, suffix="scd", ) - if self.destination_type == DestinationType.ORACLE: - where_clause = '\nwhere "_AIRBYTE_ACTIVE_ROW" = 1' - else: - where_clause = "\nwhere _airbyte_active_row = 1" - + where_clause = "\nwhere _airbyte_active_row = True" from_table = self.add_to_outputs( self.generate_final_model(from_table, column_names) + where_clause, is_intermediate=False, column_count=column_count ) @@ -327,10 +312,6 @@ def find_children_streams(self, from_table: str, column_names: Dict[str, Tuple[s return children def generate_json_parsing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: - if self.destination_type == DestinationType.ORACLE: - table_alias = "" - else: - table_alias = "as table_alias" template = Template( """ -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema @@ -342,15 +323,13 @@ def generate_json_parsing_model(self, from_table: str, column_names: Dict[str, T {%- for field in fields %} {{ field }}, {%- endfor %} - {{ col_emitted_at }} -from {{ from_table }} {{ table_alias }} + _airbyte_emitted_at +from {{ from_table }} as table_alias {{ unnesting_after_query }} {{ sql_table_comment }} """ ) sql = template.render( - col_emitted_at=self.get_emitted_at(), - table_alias=table_alias, unnesting_before_query=self.unnesting_before_query(), parent_hash_id=self.parent_hash_id(), fields=self.extract_json_columns(column_names), @@ -360,16 +339,14 @@ def generate_json_parsing_model(self, from_table: str, column_names: Dict[str, T ) return sql - def get_emitted_at(self, in_jinja: bool = False): - return self.name_transformer.normalize_column_name(self.airbyte_emitted_at, in_jinja, False) - def extract_json_columns(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]: return [ - self.extract_json_column(field, self.json_column_name, self.properties[field], column_names[field][0], "table_alias") + StreamProcessor.extract_json_column(field, self.json_column_name, self.properties[field], column_names[field][0], "table_alias") for field in column_names ] - def extract_json_column(self, property_name: str, json_column_name: str, definition: Dict, column_name: str, table_alias: str) -> str: + @staticmethod + def extract_json_column(property_name: str, json_column_name: str, definition: Dict, column_name: str, table_alias: str) -> str: json_path = [property_name] # In some cases, some destination aren't able to parse the JSON blob using the original property name # we make 
their life easier by using a pre-populated and sanitized column name instead... @@ -377,7 +354,6 @@ def extract_json_column(self, property_name: str, json_column_name: str, definit table_alias = f"{table_alias}" if "unnested_column_value" in json_column_name: table_alias = "" - json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path})") if "type" in definition: if is_array(definition["type"]): @@ -386,7 +362,6 @@ def extract_json_column(self, property_name: str, json_column_name: str, definit json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})") elif is_simple_property(definition["type"]): json_extract = jinja_call(f"json_extract_scalar({json_column_name}, {json_path}, {normalized_json_path})") - return f"{json_extract} as {column_name}" def generate_column_typing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: @@ -400,13 +375,12 @@ def generate_column_typing_model(self, from_table: str, column_names: Dict[str, {%- for field in fields %} {{ field }}, {%- endfor %} - {{ col_emitted_at }} + _airbyte_emitted_at from {{ from_table }} {{ sql_table_comment }} """ ) sql = template.render( - col_emitted_at=self.get_emitted_at(), parent_hash_id=self.parent_hash_id(), fields=self.cast_property_types(column_names), from_table=jinja_call(from_table), @@ -443,31 +417,28 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column: else: print(f"WARN: Unknown type {definition['type']} for column {property_name} at {self.current_json_path()}") return column_name - return f"cast({column_name} as {sql_type}) as {column_name}" def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: - template = Template( """ -- SQL model to build a hash column based on the values of this record select + *, {{ '{{' }} dbt_utils.surrogate_key([ {%- if parent_hash_id %} - {{ parent_hash_id }}, + '{{ parent_hash_id }}', {%- endif %} {%- for field in fields %} {{ field }}, {%- endfor %} - ]) {{ '}}' }} as {{ hash_id }}, - tmp.* -from {{ from_table }} tmp + ]) {{ '}}' }} as {{ hash_id }} +from {{ from_table }} {{ sql_table_comment }} """ ) - sql = template.render( - parent_hash_id=self.parent_hash_id(in_jinja=True), + parent_hash_id=self.parent_hash_id(), fields=self.safe_cast_to_strings(column_names), hash_id=self.hash_id(), from_table=jinja_call(from_table), @@ -476,68 +447,44 @@ def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tup return sql def safe_cast_to_strings(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]: - - return [ - StreamProcessor.safe_cast_to_string(self.properties[field], column_names[field][1], self.destination_type) - for field in column_names - ] + return [StreamProcessor.safe_cast_to_string(self.properties[field], column_names[field][1]) for field in column_names] @staticmethod - def safe_cast_to_string(definition: Dict, column_name: str, destination_type: DestinationType) -> str: + def safe_cast_to_string(definition: Dict, column_name: str) -> str: """ - Note that the result from this static method should always be used within a - jinja context (for example, from jinja macro surrogate_key call) - - The jinja_remove function is necessary because of Oracle database, some columns - are created with {{ quote('column_name') }} and reused the same fields for this - operation. Because the quote is injected inside a jinja macro we need to remove - the curly brackets. 
+ Note that the result from this static method should always be used within a jinja context (for example, from jinja macro surrogate_key call) """ - if "type" not in definition: - col = column_name + return column_name elif is_boolean(definition["type"]): - col = f"boolean_to_string({column_name})" + return f"boolean_to_string({column_name})" elif is_array(definition["type"]): - col = f"array_to_string({column_name})" + return f"array_to_string({column_name})" else: - col = column_name - - if destination_type == DestinationType.ORACLE: - quote_in_parenthesis = re.compile(r"quote\((.*)\)") - return remove_jinja(col) if quote_in_parenthesis.findall(col) else col - - return col + return column_name def generate_dedup_record_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: template = Template( """ -- SQL model to prepare for deduplicating records based on the hash record column select + *, row_number() over ( partition by {{ hash_id }} - order by {{ col_emitted_at }} asc - ) as {{ active_row }}, - tmp.* -from {{ from_table }} tmp + order by _airbyte_emitted_at asc + ) as _airbyte_row_num +from {{ from_table }} {{ sql_table_comment }} """ ) sql = template.render( - active_row=self.process_col("_airbyte_row_num"), - col_emitted_at=self.get_emitted_at(), - hash_id=self.hash_id(), - from_table=jinja_call(from_table), - sql_table_comment=self.sql_table_comment(include_from_table=True), + hash_id=self.hash_id(), from_table=jinja_call(from_table), sql_table_comment=self.sql_table_comment(include_from_table=True) ) return sql - def process_col(self, col: str): - return self.name_transformer.normalize_column_name(col) - def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: - - scd_sql_template = """ + template = Template( + """ -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key select {%- if parent_hash_id %} @@ -546,42 +493,29 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {%- for field in fields %} {{ field }}, {%- endfor %} - {{ cursor_field }} as {{ airbyte_start_at }}, - lag({{ cursor_field }}) over ( - partition by {{ primary_key }} - order by {{ cursor_field }} {{ order_null }}, {{ cursor_field }} desc, {{ col_emitted_at }} desc - ) as {{ airbyte_end_at }}, - case when lag({{ cursor_field }}) over ( - partition by {{ primary_key }} - order by {{ cursor_field }} {{ order_null }}, {{ cursor_field }} desc, {{ col_emitted_at }} desc{{ cdc_updated_at_order }} - ) is null {{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, - {{ col_emitted_at }}, - {{ hash_id }} + {{ cursor_field }} as _airbyte_start_at, + lag({{ cursor_field }}) over ( + partition by {{ primary_key }} + order by {{ cursor_field }} is null asc, {{ cursor_field }} desc, _airbyte_emitted_at desc + ) as _airbyte_end_at, + lag({{ cursor_field }}) over ( + partition by {{ primary_key }} + order by {{ cursor_field }} is null asc, {{ cursor_field }} desc, _airbyte_emitted_at desc{{ cdc_updated_at_order }} + ) is null {{ cdc_active_row }}as _airbyte_active_row, + _airbyte_emitted_at, + {{ hash_id }} from {{ from_table }} {{ sql_table_comment }} """ - - template = Template(scd_sql_template) - - order_null = "is null asc" - if self.destination_type == DestinationType.ORACLE: - order_null = "asc nulls first" + ) cdc_active_row_pattern = "" cdc_updated_order_pattern = "" if "_ab_cdc_deleted_at" in column_names.keys(): - col_cdc_deleted_at = 
self.name_transformer.normalize_column_name("_ab_cdc_deleted_at") - col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at") - cdc_active_row_pattern = f"and {col_cdc_deleted_at} is null " - cdc_updated_order_pattern = f", {col_cdc_updated_at} desc" + cdc_active_row_pattern = "and _ab_cdc_deleted_at is null " + cdc_updated_order_pattern = ", _ab_cdc_updated_at desc" sql = template.render( - order_null=order_null, - airbyte_start_at=self.name_transformer.normalize_column_name("_airbyte_start_at"), - airbyte_end_at=self.name_transformer.normalize_column_name("_airbyte_end_at"), - active_row=self.name_transformer.normalize_column_name("_airbyte_active_row"), - lag_emitted_at=self.get_emitted_at(in_jinja=True), - col_emitted_at=self.get_emitted_at(), parent_hash_id=self.parent_hash_id(), fields=self.list_fields(column_names), cursor_field=self.get_cursor_field(column_names), @@ -594,20 +528,18 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup ) return sql - def get_cursor_field(self, column_names: Dict[str, Tuple[str, str]], in_jinja: bool = False) -> str: + def get_cursor_field(self, column_names: Dict[str, Tuple[str, str]]) -> str: if not self.cursor_field: - cursor = self.name_transformer.normalize_column_name("_airbyte_emitted_at", in_jinja) + return "_airbyte_emitted_at" elif len(self.cursor_field) == 1: if not is_airbyte_column(self.cursor_field[0]): - cursor = column_names[self.cursor_field[0]][0] + return column_names[self.cursor_field[0]][0] else: # using an airbyte generated column - cursor = self.cursor_field[0] + return self.cursor_field[0] else: raise ValueError(f"Unsupported nested cursor field {'.'.join(self.cursor_field)} for stream {self.stream_name}") - return cursor - def get_primary_key(self, column_names: Dict[str, Tuple[str, str]]) -> str: if self.primary_key and len(self.primary_key) > 0: return ", ".join([self.get_primary_key_from_path(column_names, path) for path in self.primary_key]) @@ -647,14 +579,13 @@ def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[st {%- for field in fields %} {{ field }}, {%- endfor %} - {{ col_emitted_at }}, + _airbyte_emitted_at, {{ hash_id }} from {{ from_table }} {{ sql_table_comment }} """ ) sql = template.render( - col_emitted_at=self.get_emitted_at(), parent_hash_id=self.parent_hash_id(), fields=self.list_fields(column_names), hash_id=self.hash_id(), @@ -663,7 +594,8 @@ def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[st ) return sql - def list_fields(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]: + @staticmethod + def list_fields(column_names: Dict[str, Tuple[str, str]]) -> List[str]: return [column_names[field][0] for field in column_names] def add_to_outputs(self, sql: str, is_intermediate: bool, column_count: int = 0, suffix: str = "") -> str: @@ -687,10 +619,7 @@ def add_to_outputs(self, sql: str, is_intermediate: bool, column_count: int = 0, if file_name != table_name: header = jinja_call(f'config(alias="{table_name}", schema="{schema}", tags=[{tags}])') else: - if self.destination_type == DestinationType.ORACLE: - header = jinja_call(f'config(schema="{self.default_schema}", tags=[{tags}])') - else: - header = jinja_call(f'config(schema="{schema}", tags=[{tags}])') + header = jinja_call(f'config(schema="{schema}", tags=[{tags}])') self.sql_outputs[ output ] = f""" @@ -736,20 +665,18 @@ def sql_table_comment(self, include_from_table: bool = False) -> str: result += f" from {from_table}" return 
result - def hash_id(self, in_jinja: bool = False) -> str: - hash_id_col = f"_airbyte_{self.normalized_stream_name()}_hashid" + def hash_id(self) -> str: if self.parent: if self.normalized_stream_name().lower() == self.parent.stream_name.lower(): level = len(self.json_path) - hash_id_col = f"_airbyte_{self.normalized_stream_name()}_{level}_hashid" - - return self.name_transformer.normalize_column_name(hash_id_col, in_jinja) + return self.name_transformer.normalize_column_name(f"_airbyte_{self.normalized_stream_name()}_{level}_hashid") + return self.name_transformer.normalize_column_name(f"_airbyte_{self.normalized_stream_name()}_hashid") # Nested Streams - def parent_hash_id(self, in_jinja: bool = False) -> str: + def parent_hash_id(self) -> str: if self.parent: - return self.parent.hash_id(in_jinja) + return self.parent.hash_id() return "" def unnesting_before_query(self) -> str: diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index 368377effbbcf..1cb2ca4167b73 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -27,6 +27,7 @@ import json import os import pkgutil +import socket from enum import Enum from typing import Any, Dict @@ -39,7 +40,6 @@ class DestinationType(Enum): redshift = "redshift" snowflake = "snowflake" mysql = "mysql" - oracle = "oracle" class TransformConfig: @@ -47,12 +47,10 @@ def run(self, args): inputs = self.parse(args) original_config = self.read_json_config(inputs["config"]) integration_type = inputs["integration_type"] - - transformed_dbt_project = self.transform_dbt_project(integration_type) - self.write_yaml_config(inputs["output_path"], transformed_dbt_project, "dbt_project.yml") - transformed_config = self.transform(integration_type, original_config) - self.write_yaml_config(inputs["output_path"], transformed_config, "profiles.yml") + self.write_yaml_config(inputs["output_path"], transformed_config) + if self.is_ssh_tunnelling(original_config): + self.write_ssh_port(inputs["output_path"], self.pick_a_port()) @staticmethod def parse(args): @@ -72,18 +70,6 @@ def parse(args): "output_path": parsed_args.out, } - def transform_dbt_project(self, integration_type: DestinationType): - data = pkgutil.get_data(self.__class__.__module__.split(".")[0], "transform_config/dbt_project_base.yml") - if not data: - raise FileExistsError("Failed to load profile_base.yml") - base_project = yaml.load(data, Loader=yaml.FullLoader) - - if integration_type.value == DestinationType.oracle.value: - base_project["quoting"]["database"] = False - base_project["quoting"]["identifier"] = False - - return base_project - def transform(self, integration_type: DestinationType, config: Dict[str, Any]): data = pkgutil.get_data(self.__class__.__module__.split(".")[0], "transform_config/profile_base.yml") if not data: @@ -96,7 +82,6 @@ def transform(self, integration_type: DestinationType, config: Dict[str, Any]): DestinationType.redshift.value: self.transform_redshift, DestinationType.snowflake.value: self.transform_snowflake, DestinationType.mysql.value: self.transform_mysql, - DestinationType.oracle.value: self.transform_oracle, }[integration_type.value](config) # merge pre-populated base_profile with destination-specific configuration. 
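Editorial aside (illustrative only, not part of this patch): the next hunk adds the SSH helpers `is_ssh_tunnelling`, `is_port_free`, `pick_a_port`, and `get_ssh_altered_config` to `TransformConfig`. A minimal sketch of how a destination-specific `transform_*()` function would opt into them is shown below; `transform_mydest` and its field names are hypothetical, while `transform_postgres()` later in this hunk is the real usage introduced by this patch.

```python
# Sketch only: assumes the connector's config uses "host" and "port" as key names,
# matching what sshtunneling.sh expects when it opens the tunnel.
from typing import Any, Dict

from normalization.transform_config.transform import TransformConfig


def transform_mydest(config: Dict[str, Any]) -> Dict[str, Any]:
    # When an SSH tunnel is configured, point dbt at the local end of the tunnel
    # (localhost plus a locally picked free port) instead of the remote database host.
    if TransformConfig.is_ssh_tunnelling(config):
        config = TransformConfig.get_ssh_altered_config(config, port_key="port", host_key="host")
    return {
        "type": "mydest",          # hypothetical dbt adapter name
        "host": config["host"],
        "port": config["port"],
        "user": config["username"],
        "pass": config["password"],
        "dbname": config["database"],
        "schema": config["schema"],
        "threads": 8,
    }
```

The rewritten host/port must agree with the port persisted by `write_ssh_port()` (see below), since `sshtunneling.sh` reads that same file when it opens the tunnel before dbt runs.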
@@ -104,6 +89,55 @@ def transform(self, integration_type: DestinationType, config: Dict[str, Any]): return base_profile + @staticmethod + def is_ssh_tunnelling(config: Dict[str, Any]) -> bool: + tunnel_methods = ["SSH_KEY_AUTH", "SSH_PASSWORD_AUTH"] + if ( + "tunnel_method" in config.keys() + and "tunnel_method" in config["tunnel_method"] + and config["tunnel_method"]["tunnel_method"].upper() in tunnel_methods + ): + return True + else: + return False + + @staticmethod + def is_port_free(port: int) -> bool: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + try: + s.bind(("localhost", port)) + except Exception as e: + print(f"port {port} unsuitable: {e}") + return False + else: + print(f"port {port} is free") + return True + + @staticmethod + def pick_a_port() -> int: + """ + This function finds a free port, starting with 50001 and adding 1 until we find an open port. + """ + port_to_check = 50001 # just past start of dynamic port range (49152:65535) + while not TransformConfig.is_port_free(port_to_check): + port_to_check += 1 + # error if we somehow hit end of port range + if port_to_check > 65535: + raise RuntimeError("Couldn't find a free port to use.") + return port_to_check + + @staticmethod + def get_ssh_altered_config(config: Dict[str, Any], port_key: str = "port", host_key: str = "host") -> Dict[str, Any]: + """ + This should be called only if ssh tunneling is on. + It will return config with appropriately altered port and host values + """ + # make a copy of config rather than mutate in place + ssh_ready_config = {k: v for k, v in config.items()} + ssh_ready_config[port_key] = TransformConfig.pick_a_port() + ssh_ready_config[host_key] = "localhost" + return ssh_ready_config + @staticmethod def transform_bigquery(config: Dict[str, Any]): print("transform_bigquery") @@ -126,6 +160,10 @@ def transform_bigquery(config: Dict[str, Any]): @staticmethod def transform_postgres(config: Dict[str, Any]): print("transform_postgres") + + if TransformConfig.is_ssh_tunnelling(config): + config = TransformConfig.get_ssh_altered_config(config, port_key="port", host_key="host") + # https://docs.getdbt.com/reference/warehouse-profiles/postgres-profile dbt_config = { "type": "postgres", @@ -196,22 +234,6 @@ def transform_mysql(config: Dict[str, Any]): } return dbt_config - @staticmethod - def transform_oracle(config: Dict[str, Any]): - print("transform_oracle") - # https://github.com/techindicium/dbt-oracle#configure-your-profile - dbt_config = { - "type": "oracle", - "host": config["host"], - "user": config["username"], - "pass": config["password"], - "port": config["port"], - "dbname": config["sid"], - "schema": config["schema"], - "threads": 4, - } - return dbt_config - @staticmethod def read_json_config(input_path: str): with open(input_path, "r") as file: @@ -219,12 +241,25 @@ def read_json_config(input_path: str): return json.loads(contents) @staticmethod - def write_yaml_config(output_path: str, config: Dict[str, Any], filename: str): + def write_yaml_config(output_path: str, config: Dict[str, Any]): if not os.path.exists(output_path): os.makedirs(output_path) - with open(os.path.join(output_path, filename), "w") as fh: + with open(os.path.join(output_path, "profiles.yml"), "w") as fh: fh.write(yaml.dump(config)) + @staticmethod + def write_ssh_port(output_path: str, port: int): + """ + This function writes a small json file with content like {"port":xyz} + This is being used only when ssh tunneling. 
+ We do this because we need to decide on and save this port number into our dbt config + and then use that same port in sshtunneling.sh when opening the tunnel. + """ + if not os.path.exists(output_path): + os.makedirs(output_path) + with open(os.path.join(output_path, "localsshport.json"), "w") as fh: + json.dump({"port": port}, fh) + def main(args=None): TransformConfig().run(args) diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py index 50968b1fcb138..3d478729afbaa 100644 --- a/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py +++ b/airbyte-integrations/bases/base-normalization/unit_tests/test_transform_config.py @@ -24,6 +24,8 @@ import os +import socket +import time import pytest from normalization.transform_catalog.transform import extract_schema @@ -47,6 +49,104 @@ def before_all_tests(self, request): yield os.chdir(request.config.invocation_dir) + def test_is_ssh_tunnelling(self): + def single_test(config, expected_output): + assert TransformConfig.is_ssh_tunnelling(config) == expected_output + + inputs = [ + ({}, False), + ( + { + "type": "postgres", + "dbname": "my_db", + "host": "airbyte.io", + "pass": "password123", + "port": 5432, + "schema": "public", + "threads": 32, + "user": "a user", + }, + False, + ), + ( + { + "type": "postgres", + "dbname": "my_db", + "host": "airbyte.io", + "pass": "password123", + "port": 5432, + "schema": "public", + "threads": 32, + "user": "a user", + "tunnel_method": { + "tunnel_host": "1.2.3.4", + "tunnel_method": "SSH_PASSWORD_AUTH", + "tunnel_port": 22, + "tunnel_user": "user", + "tunnel_user_password": "pass", + }, + }, + True, + ), + ( + { + "type": "postgres", + "dbname": "my_db", + "host": "airbyte.io", + "pass": "password123", + "port": 5432, + "schema": "public", + "threads": 32, + "user": "a user", + "tunnel_method": { + "tunnel_method": "SSH_KEY_AUTH", + }, + }, + True, + ), + ( + { + "type": "postgres", + "dbname": "my_db", + "host": "airbyte.io", + "pass": "password123", + "port": 5432, + "schema": "public", + "threads": 32, + "user": "a user", + "tunnel_method": { + "nothing": "nothing", + }, + }, + False, + ), + ] + for input_tuple in inputs: + single_test(input_tuple[0], input_tuple[1]) + + def test_is_port_free(self): + # to test that this accurately identifies 'free' ports, we'll find a 'free' port and then try to use it + test_port = 13055 + while not TransformConfig.is_port_free(test_port): + test_port += 1 + if test_port > 65535: + raise RuntimeError("couldn't find a free port...") + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", test_port)) + # if we haven't failed then we accurately identified a 'free' port. 
+ # now we can test for accurate identification of 'in-use' port since we're using it + assert TransformConfig.is_port_free(test_port) is False + + # and just for good measure now that our context manager is closed (and port open again) + time.sleep(1) + assert TransformConfig.is_port_free(test_port) is True + + def test_pick_a_port(self): + supposedly_open_port = TransformConfig.pick_a_port() + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", supposedly_open_port)) + def test_transform_bigquery(self): input = {"project_id": "my_project_id", "dataset_id": "my_dataset_id", "credentials_json": '{ "type": "service_account-json" }'} @@ -108,6 +208,39 @@ def test_transform_postgres(self): assert expected == actual assert extract_schema(actual) == "public" + def test_transform_postgres_ssh(self): + input = { + "host": "airbyte.io", + "port": 5432, + "username": "a user", + "password": "password123", + "database": "my_db", + "schema": "public", + "tunnel_method": { + "tunnel_host": "1.2.3.4", + "tunnel_method": "SSH_PASSWORD_AUTH", + "tunnel_port": 22, + "tunnel_user": "user", + "tunnel_user_password": "pass", + }, + } + port = TransformConfig.pick_a_port() + + actual = TransformConfig().transform_postgres(input) + expected = { + "type": "postgres", + "dbname": "my_db", + "host": "localhost", + "pass": "password123", + "port": port, + "schema": "public", + "threads": 32, + "user": "a user", + } + + assert expected == actual + assert extract_schema(actual) == "public" + def test_transform_snowflake(self): input = { "host": "http://123abc.us-east-7.aws.snowflakecomputing.com", diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 2979e5506ac38..3aef6cfc616d8 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -9,7 +9,7 @@ COPY setup.py ./ COPY pytest.ini ./ RUN pip install . 
-LABEL io.airbyte.version=0.1.18 +LABEL io.airbyte.version=0.1.17 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"] diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py index ad361703c786d..ecfec41c5b404 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py @@ -28,78 +28,74 @@ @pytest.fixture(name="not_sorted_data") def not_sorted_data_fixture(): - return [ - { - "date_created": "0001-01-01T00:00:00", - "date_updated": "0001-01-01T00:00:00", - "editable": False, - "id": "superuser", - "name": "Super User", - "organization_id": "orga_ya3w9oMjeLtWe7zFGZr63Dz8ruBbjybG0EIUdUXaESi", - "permissions": [ - "bulk_edit", - "delete_own_opportunities", - "export", - "manage_group_numbers", - "manage_email_sequences", - "delete_leads", - "call_coach_listen", - "call_coach_barge", - "manage_others_tasks", - "manage_others_activities", - "delete_own_tasks", - "manage_customizations", - "manage_team_smart_views", - "bulk_delete", - "manage_team_email_templates", - "bulk_email", - "merge_leads", - "calling", - "bulk_sequence_subscriptions", - "bulk_import", - "delete_own_activities", - "manage_others_opportunities", - ], - } - ] + return [{ + "date_created": "0001-01-01T00:00:00", + "date_updated": "0001-01-01T00:00:00", + "editable": False, + "id": "superuser", + "name": "Super User", + "organization_id": "orga_ya3w9oMjeLtWe7zFGZr63Dz8ruBbjybG0EIUdUXaESi", + "permissions": [ + "bulk_edit", + "delete_own_opportunities", + "export", + "manage_group_numbers", + "manage_email_sequences", + "delete_leads", + "call_coach_listen", + "call_coach_barge", + "manage_others_tasks", + "manage_others_activities", + "delete_own_tasks", + "manage_customizations", + "manage_team_smart_views", + "bulk_delete", + "manage_team_email_templates", + "bulk_email", + "merge_leads", + "calling", + "bulk_sequence_subscriptions", + "bulk_import", + "delete_own_activities", + "manage_others_opportunities" + ] + }] @pytest.fixture(name="sorted_data") def sorted_data_fixture(): - return [ - { - "date_created": "0001-01-01T00:00:00", - "date_updated": "0001-01-01T00:00:00", - "editable": False, - "id": "superuser", - "name": "Super User", - "organization_id": "orga_ya3w9oMjeLtWe7zFGZr63Dz8ruBbjybG0EIUdUXaESi", - "permissions": [ - "bulk_delete", - "bulk_edit", - "bulk_email", - "bulk_import", - "bulk_sequence_subscriptions", - "call_coach_barge", - "call_coach_listen", - "calling", - "delete_leads", - "delete_own_activities", - "delete_own_opportunities", - "delete_own_tasks", - "export", - "manage_customizations", - "manage_email_sequences", - "manage_group_numbers", - "manage_others_activities", - "manage_others_opportunities", - "manage_others_tasks", - "manage_team_email_templates", - "manage_team_smart_views", - "merge_leads", - ], - } - ] + return [{ + "date_created": "0001-01-01T00:00:00", + "date_updated": "0001-01-01T00:00:00", + "editable": False, + "id": "superuser", + "name": "Super User", + "organization_id": "orga_ya3w9oMjeLtWe7zFGZr63Dz8ruBbjybG0EIUdUXaESi", + "permissions": [ + "bulk_delete", + "bulk_edit", + "bulk_email", + "bulk_import", + "bulk_sequence_subscriptions", + "call_coach_barge", + "call_coach_listen", + "calling", + "delete_leads", + "delete_own_activities", + "delete_own_opportunities", + 
"delete_own_tasks", + "export", + "manage_customizations", + "manage_email_sequences", + "manage_group_numbers", + "manage_others_activities", + "manage_others_opportunities", + "manage_others_tasks", + "manage_team_email_templates", + "manage_team_smart_views", + "merge_leads" + ] + }] def test_compare_two_records(not_sorted_data, sorted_data): diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java index 1b8105a6f9ef1..0c5fa25ba0462 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshKeyPostgresDestinationAcceptanceTest.java @@ -30,7 +30,7 @@ public class SshKeyPostgresDestinationAcceptanceTest extends SshPostgresDestinat @Override public Path getConfigFilePath() { - return Path.of("ssh-key-config.json"); + return Path.of("secrets/ssh-key-config.json"); } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java index f4cdcb017ed29..0bc064de2ef87 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPasswordPostgresDestinationAcceptanceTest.java @@ -30,7 +30,7 @@ public class SshPasswordPostgresDestinationAcceptanceTest extends SshPostgresDes @Override public Path getConfigFilePath() { - return Path.of("ssh-pwd-config.json"); + return Path.of("secrets/ssh-pwd-config.json"); } } diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java index e192125c2d4ec..082632901b815 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/SshPostgresDestinationAcceptanceTest.java @@ -73,7 +73,7 @@ protected JsonNode getConfig() { } private JsonNode getConfigFromSecretsFile() { - return Jsons.deserialize(IOs.readFile(Path.of("secrets/ssh-pwd-config.json"))); + return Jsons.deserialize(IOs.readFile(getConfigFilePath())); } @Override diff --git a/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx b/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx index 6e9d55313786d..0a48aebe13fdf 100644 --- 
a/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx +++ b/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx @@ -17,6 +17,7 @@ import { FormTitle } from "../components/FormTitle"; import CheckBoxControl from "../components/CheckBoxControl"; import { useAuthService } from "packages/cloud/services/auth/AuthService"; import { FieldError } from "packages/cloud/lib/errors/FieldError"; +import config from "config"; const MarginBlock = styled.div` margin-bottom: 15px; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java index b67b1d7d12d40..248d37900e10b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DbtTransformationRunner.java @@ -82,7 +82,10 @@ public boolean run(String jobId, int attempt, Path jobRoot, JsonNode config, Res public boolean transform(String jobId, int attempt, Path jobRoot, JsonNode config, ResourceRequirements resourceRequirements, OperatorDbt dbtConfig) throws Exception { try { - final Map files = ImmutableMap.of(DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh")); + final Map files = ImmutableMap.of( + DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh"), + "sshtunneling.sh", MoreResources.readResource("sshtunneling.sh") + ); final List dbtArguments = new ArrayList<>(); dbtArguments.add(DBT_ENTRYPOINT_SH); if (Strings.isNullOrEmpty(dbtConfig.getDbtArguments())) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 92799f79021e5..9ec53b4fc2838 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -68,12 +68,12 @@ public DefaultNormalizationRunner(final DestinationType destinationType, final P } @Override - public boolean configureDbt(String jobId, - int attempt, - Path jobRoot, - JsonNode config, - ResourceRequirements resourceRequirements, - OperatorDbt dbtConfig) + public boolean configureDbt(final String jobId, + final int attempt, + final Path jobRoot, + final JsonNode config, + final ResourceRequirements resourceRequirements, + final OperatorDbt dbtConfig) throws Exception { final Map files = ImmutableMap.of( WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config)); @@ -97,12 +97,12 @@ public boolean configureDbt(String jobId, } @Override - public boolean normalize(String jobId, - int attempt, - Path jobRoot, - JsonNode config, - ConfiguredAirbyteCatalog catalog, - ResourceRequirements resourceRequirements) + public boolean normalize(final String jobId, + final int attempt, + final Path jobRoot, + final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final ResourceRequirements resourceRequirements) throws Exception { final Map files = ImmutableMap.of( WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config), @@ -114,11 +114,11 @@ public boolean normalize(String jobId, "--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME); } - private boolean runProcess(String jobId, - int attempt, - Path jobRoot, - Map files, - ResourceRequirements resourceRequirements, + private boolean runProcess(final 
String jobId, + final int attempt, + final Path jobRoot, + final Map files, + final ResourceRequirements resourceRequirements, final String... args) throws Exception { try { @@ -130,7 +130,7 @@ private boolean runProcess(String jobId, WorkerUtils.wait(process); return process.exitValue() == 0; - } catch (Exception e) { + } catch (final Exception e) { // make sure we kill the process on failure to avoid zombies. if (process != null) { WorkerUtils.cancelProcess(process); diff --git a/airbyte-workers/src/main/resources/dbt_transformation_entrypoint.sh b/airbyte-workers/src/main/resources/dbt_transformation_entrypoint.sh index 4d95ea1ece54f..9de36b5b72931 100644 --- a/airbyte-workers/src/main/resources/dbt_transformation_entrypoint.sh +++ b/airbyte-workers/src/main/resources/dbt_transformation_entrypoint.sh @@ -34,6 +34,10 @@ if [[ -f "${CWD}/bq_keyfile.json" ]]; then cp "${CWD}/bq_keyfile.json" /tmp/bq_keyfile.json fi +. $CWD/sshtunneling.sh +openssh $CWD/destination_config.json $CWD/localsshport.json +trap 'closessh' EXIT + # Add mandatory flags profiles-dir and project-dir when calling dbt when necessary case "${CONTAINS_PROFILES_DIR}-${CONTAINS_PROJECT_DIR}" in true-true) @@ -53,3 +57,5 @@ case "${CONTAINS_PROFILES_DIR}-${CONTAINS_PROJECT_DIR}" in dbt $@ "--profiles-dir=${CWD}" "--project-dir=${CWD}/git_repo" --profile normalize ;; esac + +closessh diff --git a/airbyte-workers/src/main/resources/sshtunneling.sh b/airbyte-workers/src/main/resources/sshtunneling.sh new file mode 100644 index 0000000000000..41bfdff0a3665 --- /dev/null +++ b/airbyte-workers/src/main/resources/sshtunneling.sh @@ -0,0 +1,64 @@ +# This function opens an ssh tunnel if required using values provided in config. +# Requires two arguments, + # path to config file ($1) + # path to file containing local port to use ($2) +function openssh() { + # check if jq is missing, and if so try to install it.. + # this is janky but for custom dbt transform we can't be sure jq is installed as using user docker image + if ! command -v jq &> /dev/null ; then + echo "CRITICAL: jq not installed... attempting to install on the fly but will fail if unable." 
+ { apt-get update && apt-get install -y jq; } || + apk --update add jq || + { yum install epel-release -y && yum install jq -y; } || + { dnf install epel-release -y && dnf install jq -y; } || exit 1 + fi + # tunnel_db_host and tunnel_db_port currently rely on the destination's spec using "host" and "port" as keys for these values + # if adding ssh support for a new destination where this is not the case, extra logic will be needed to capture these dynamically + tunnel_db_host=$(cat $1 | jq -r '.host') + tunnel_db_port=$(cat $1 | jq -r '.port') + tunnel_method=$(cat $1 | jq -r '.tunnel_method.tunnel_method' | tr '[:lower:]' '[:upper:]') + tunnel_username=$(cat $1 | jq -r '.tunnel_method.tunnel_user') + tunnel_host=$(cat $1 | jq -r '.tunnel_method.tunnel_host') + tunnel_local_port=$(cat $2 | jq -r '.port') + # set a path for a control socket, allowing us to close this specific ssh connection when desired + tmpcontrolsocket="/tmp/sshsocket${tunnel_db_remote_port}-${RANDOM}" + if [[ ${tunnel_method} = "SSH_KEY_AUTH" ]] ; then + echo "Detected tunnel method SSH_KEY_AUTH for normalization" + # create a temporary file to hold ssh key and trap to delete on EXIT + trap 'rm -f "$tmpkeyfile"' EXIT + tmpkeyfile=$(mktemp /tmp/xyzfile.XXXXXXXXXXX) || exit 1 + echo "$(cat $1 | jq -r '.tunnel_method.ssh_key')" > $tmpkeyfile + # -f=background -N=no remote command -M=master mode StrictHostKeyChecking=no auto-adds host + echo "Running: ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -i {key file} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}" + ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -i $tmpkeyfile -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} && + sshopen="yes" && + echo "ssh tunnel opened" + rm -f $tmpkeyfile + elif [[ ${tunnel_method} = "SSH_PASSWORD_AUTH" ]] ; then + echo "Detected tunnel method SSH_PASSWORD_AUTH for normalization" + if ! command -v sshpass &> /dev/null ; then + echo "CRITICAL: sshpass not installed... attempting to install on the fly but will fail if unable." + { apt-get update && apt-get install -y sshpass; } || + { apk add --update openssh && apk --update add sshpass; } || + { yum install epel-release -y && yum install sshpass -y; } || + { dnf install epel-release -y && dnf install sshpass -y; } || exit 1 + fi + # put ssh password in env var for use in sshpass. Better than directly passing with -p + export SSHPASS=$(cat $1 | jq -r '.tunnel_method.tunnel_user_password') + echo "Running: sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}" + sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} && + sshopen="yes" && + echo "ssh tunnel opened" + fi +} + +# This function checks if $sshopen variable has been set and if so, closes the ssh open via $tmpcontrolsocket +# This only works after calling openssh() +function closessh() { + # $sshopen $tmpcontrolsocket comes from openssh() function + if [ ! 
-z "$sshopen" ] ; then + ssh -S $tmpcontrolsocket -O exit ${tunnel_host} && + echo "closed ssh tunnel" + trap 'rm -f "$tmpcontrolsocket"' EXIT + fi +} From 55524e4db03cf51c45268250b9e725851ef56584 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 7 Sep 2021 14:44:27 -0700 Subject: [PATCH 2/7] fix build file --- .../bases/base-normalization/build.gradle | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index c90d16893b5b3..9df5d98b497ba 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -17,10 +17,18 @@ task copySshScript(type: Copy, dependsOn: [project(':airbyte-workers').processRe from "${project(':airbyte-workers').buildDir}/resources/main" into "${buildDir}" include "sshtunneling.sh" - if(inputs.sourceFiles.empty) throw new StopExecutionException("Couldn't find sshtunneling.sh to copy") } -test.dependsOn copySshScript -assemble.dependsOn copySshScript + +// make sure the copy task above worked (if it fails, it fails silently annoyingly) +task checkSshScriptCopy(type: Task, dependsOn: copySshScript) { + doFirst { + assert file("${buildDir}/sshtunneling.sh").exists() : + "Copy of sshtunneling.sh failed, check that it is present in airbyte-workers." + } +} + +test.dependsOn checkSshScriptCopy +assemble.dependsOn checkSshScriptCopy installReqs.dependsOn(":airbyte-integrations:bases:airbyte-protocol:installReqs") integrationTest.dependsOn(build) From 2c0bf673cb2b4bbf63fada32ff435c22f49683a2 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 7 Sep 2021 14:45:53 -0700 Subject: [PATCH 3/7] process threm --- .../transform_catalog/stream_processor.py | 181 ++++++++++++------ 1 file changed, 127 insertions(+), 54 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py index b915377ef9494..61b222b12a954 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/stream_processor.py @@ -24,6 +24,7 @@ import os +import re from typing import Dict, List, Optional, Tuple from airbyte_protocol.models.airbyte_protocol import DestinationSyncMode, SyncMode @@ -44,6 +45,7 @@ is_string, is_timestamp_with_time_zone, jinja_call, + remove_jinja, ) # using too many columns breaks ephemeral materialization (somewhere between 480 and 490 columns) @@ -77,6 +79,7 @@ def __init__( stream_name: str, destination_type: DestinationType, raw_schema: str, + default_schema: str, schema: str, source_sync_mode: SyncMode, destination_sync_mode: DestinationSyncMode, @@ -109,6 +112,8 @@ def __init__( self.sql_outputs: Dict[str, str] = {} self.parent: Optional["StreamProcessor"] = None self.is_nested_array: bool = False + self.default_schema: str = default_schema + self.airbyte_emitted_at = "_airbyte_emitted_at" @staticmethod def create_from_parent( @@ -130,6 +135,7 @@ def create_from_parent( stream_name=child_name, destination_type=parent.destination_type, raw_schema=parent.raw_schema, + default_schema=parent.default_schema, schema=parent.schema, # Nested Streams don't inherit parents sync modes? 
source_sync_mode=SyncMode.full_refresh, @@ -151,6 +157,7 @@ def create( stream_name: str, destination_type: DestinationType, raw_schema: str, + default_schema: str, schema: str, source_sync_mode: SyncMode, destination_sync_mode: DestinationSyncMode, @@ -183,6 +190,7 @@ def create( stream_name, destination_type, raw_schema, + default_schema, schema, source_sync_mode, destination_sync_mode, @@ -228,14 +236,21 @@ def process(self) -> List["StreamProcessor"]: ) if self.destination_sync_mode.value == DestinationSyncMode.append_dedup.value: from_table = self.add_to_outputs(self.generate_dedup_record_model(from_table, column_names), is_intermediate=True, suffix="ab4") - where_clause = "\nwhere _airbyte_row_num = 1" + if self.destination_type == DestinationType.ORACLE: + where_clause = '\nwhere "_AIRBYTE_ROW_NUM" = 1' + else: + where_clause = "\nwhere _airbyte_row_num = 1" from_table = self.add_to_outputs( self.generate_scd_type_2_model(from_table, column_names) + where_clause, is_intermediate=False, column_count=column_count, suffix="scd", ) - where_clause = "\nwhere _airbyte_active_row = True" + if self.destination_type == DestinationType.ORACLE: + where_clause = '\nwhere "_AIRBYTE_ACTIVE_ROW" = 1' + else: + where_clause = "\nwhere _airbyte_active_row = 1" + from_table = self.add_to_outputs( self.generate_final_model(from_table, column_names) + where_clause, is_intermediate=False, column_count=column_count ) @@ -312,6 +327,10 @@ def find_children_streams(self, from_table: str, column_names: Dict[str, Tuple[s return children def generate_json_parsing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: + if self.destination_type == DestinationType.ORACLE: + table_alias = "" + else: + table_alias = "as table_alias" template = Template( """ -- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema @@ -323,13 +342,15 @@ def generate_json_parsing_model(self, from_table: str, column_names: Dict[str, T {%- for field in fields %} {{ field }}, {%- endfor %} - _airbyte_emitted_at -from {{ from_table }} as table_alias + {{ col_emitted_at }} +from {{ from_table }} {{ table_alias }} {{ unnesting_after_query }} {{ sql_table_comment }} """ ) sql = template.render( + col_emitted_at=self.get_emitted_at(), + table_alias=table_alias, unnesting_before_query=self.unnesting_before_query(), parent_hash_id=self.parent_hash_id(), fields=self.extract_json_columns(column_names), @@ -339,14 +360,16 @@ def generate_json_parsing_model(self, from_table: str, column_names: Dict[str, T ) return sql + def get_emitted_at(self, in_jinja: bool = False): + return self.name_transformer.normalize_column_name(self.airbyte_emitted_at, in_jinja, False) + def extract_json_columns(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]: return [ - StreamProcessor.extract_json_column(field, self.json_column_name, self.properties[field], column_names[field][0], "table_alias") + self.extract_json_column(field, self.json_column_name, self.properties[field], column_names[field][0], "table_alias") for field in column_names ] - @staticmethod - def extract_json_column(property_name: str, json_column_name: str, definition: Dict, column_name: str, table_alias: str) -> str: + def extract_json_column(self, property_name: str, json_column_name: str, definition: Dict, column_name: str, table_alias: str) -> str: json_path = [property_name] # In some cases, some destination aren't able to parse the JSON blob using the original property name # we make 
their life easier by using a pre-populated and sanitized column name instead... @@ -354,6 +377,7 @@ def extract_json_column(property_name: str, json_column_name: str, definition: D table_alias = f"{table_alias}" if "unnested_column_value" in json_column_name: table_alias = "" + json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path})") if "type" in definition: if is_array(definition["type"]): @@ -362,6 +386,7 @@ def extract_json_column(property_name: str, json_column_name: str, definition: D json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})") elif is_simple_property(definition["type"]): json_extract = jinja_call(f"json_extract_scalar({json_column_name}, {json_path}, {normalized_json_path})") + return f"{json_extract} as {column_name}" def generate_column_typing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: @@ -375,12 +400,13 @@ def generate_column_typing_model(self, from_table: str, column_names: Dict[str, {%- for field in fields %} {{ field }}, {%- endfor %} - _airbyte_emitted_at + {{ col_emitted_at }} from {{ from_table }} {{ sql_table_comment }} """ ) sql = template.render( + col_emitted_at=self.get_emitted_at(), parent_hash_id=self.parent_hash_id(), fields=self.cast_property_types(column_names), from_table=jinja_call(from_table), @@ -417,28 +443,31 @@ def cast_property_type(self, property_name: str, column_name: str, jinja_column: else: print(f"WARN: Unknown type {definition['type']} for column {property_name} at {self.current_json_path()}") return column_name + return f"cast({column_name} as {sql_type}) as {column_name}" def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: + template = Template( """ -- SQL model to build a hash column based on the values of this record select - *, {{ '{{' }} dbt_utils.surrogate_key([ {%- if parent_hash_id %} - '{{ parent_hash_id }}', + {{ parent_hash_id }}, {%- endif %} {%- for field in fields %} {{ field }}, {%- endfor %} - ]) {{ '}}' }} as {{ hash_id }} -from {{ from_table }} + ]) {{ '}}' }} as {{ hash_id }}, + tmp.* +from {{ from_table }} tmp {{ sql_table_comment }} """ ) + sql = template.render( - parent_hash_id=self.parent_hash_id(), + parent_hash_id=self.parent_hash_id(in_jinja=True), fields=self.safe_cast_to_strings(column_names), hash_id=self.hash_id(), from_table=jinja_call(from_table), @@ -447,44 +476,68 @@ def generate_id_hashing_model(self, from_table: str, column_names: Dict[str, Tup return sql def safe_cast_to_strings(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]: - return [StreamProcessor.safe_cast_to_string(self.properties[field], column_names[field][1]) for field in column_names] + + return [ + StreamProcessor.safe_cast_to_string(self.properties[field], column_names[field][1], self.destination_type) + for field in column_names + ] @staticmethod - def safe_cast_to_string(definition: Dict, column_name: str) -> str: + def safe_cast_to_string(definition: Dict, column_name: str, destination_type: DestinationType) -> str: """ - Note that the result from this static method should always be used within a jinja context (for example, from jinja macro surrogate_key call) + Note that the result from this static method should always be used within a + jinja context (for example, from jinja macro surrogate_key call) + + The jinja_remove function is necessary because of Oracle database, some columns + are created with {{ quote('column_name') }} and reused 
the same fields for this + operation. Because the quote is injected inside a jinja macro we need to remove + the curly brackets. """ + if "type" not in definition: - return column_name + col = column_name elif is_boolean(definition["type"]): - return f"boolean_to_string({column_name})" + col = f"boolean_to_string({column_name})" elif is_array(definition["type"]): - return f"array_to_string({column_name})" + col = f"array_to_string({column_name})" else: - return column_name + col = column_name + + if destination_type == DestinationType.ORACLE: + quote_in_parenthesis = re.compile(r"quote\((.*)\)") + return remove_jinja(col) if quote_in_parenthesis.findall(col) else col + + return col def generate_dedup_record_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: template = Template( """ -- SQL model to prepare for deduplicating records based on the hash record column select - *, row_number() over ( partition by {{ hash_id }} - order by _airbyte_emitted_at asc - ) as _airbyte_row_num -from {{ from_table }} + order by {{ col_emitted_at }} asc + ) as {{ active_row }}, + tmp.* +from {{ from_table }} tmp {{ sql_table_comment }} """ ) sql = template.render( - hash_id=self.hash_id(), from_table=jinja_call(from_table), sql_table_comment=self.sql_table_comment(include_from_table=True) + active_row=self.process_col("_airbyte_row_num"), + col_emitted_at=self.get_emitted_at(), + hash_id=self.hash_id(), + from_table=jinja_call(from_table), + sql_table_comment=self.sql_table_comment(include_from_table=True), ) return sql + def process_col(self, col: str): + return self.name_transformer.normalize_column_name(col) + def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tuple[str, str]]) -> str: - template = Template( - """ + + scd_sql_template = """ -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key select {%- if parent_hash_id %} @@ -493,29 +546,42 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup {%- for field in fields %} {{ field }}, {%- endfor %} - {{ cursor_field }} as _airbyte_start_at, - lag({{ cursor_field }}) over ( - partition by {{ primary_key }} - order by {{ cursor_field }} is null asc, {{ cursor_field }} desc, _airbyte_emitted_at desc - ) as _airbyte_end_at, - lag({{ cursor_field }}) over ( - partition by {{ primary_key }} - order by {{ cursor_field }} is null asc, {{ cursor_field }} desc, _airbyte_emitted_at desc{{ cdc_updated_at_order }} - ) is null {{ cdc_active_row }}as _airbyte_active_row, - _airbyte_emitted_at, - {{ hash_id }} + {{ cursor_field }} as {{ airbyte_start_at }}, + lag({{ cursor_field }}) over ( + partition by {{ primary_key }} + order by {{ cursor_field }} {{ order_null }}, {{ cursor_field }} desc, {{ col_emitted_at }} desc + ) as {{ airbyte_end_at }}, + case when lag({{ cursor_field }}) over ( + partition by {{ primary_key }} + order by {{ cursor_field }} {{ order_null }}, {{ cursor_field }} desc, {{ col_emitted_at }} desc{{ cdc_updated_at_order }} + ) is null {{ cdc_active_row }} then 1 else 0 end as {{ active_row }}, + {{ col_emitted_at }}, + {{ hash_id }} from {{ from_table }} {{ sql_table_comment }} """ - ) + + template = Template(scd_sql_template) + + order_null = "is null asc" + if self.destination_type == DestinationType.ORACLE: + order_null = "asc nulls first" cdc_active_row_pattern = "" cdc_updated_order_pattern = "" if "_ab_cdc_deleted_at" in column_names.keys(): - cdc_active_row_pattern = "and _ab_cdc_deleted_at is 
null " - cdc_updated_order_pattern = ", _ab_cdc_updated_at desc" + col_cdc_deleted_at = self.name_transformer.normalize_column_name("_ab_cdc_deleted_at") + col_cdc_updated_at = self.name_transformer.normalize_column_name("_ab_cdc_updated_at") + cdc_active_row_pattern = f"and {col_cdc_deleted_at} is null " + cdc_updated_order_pattern = f", {col_cdc_updated_at} desc" sql = template.render( + order_null=order_null, + airbyte_start_at=self.name_transformer.normalize_column_name("_airbyte_start_at"), + airbyte_end_at=self.name_transformer.normalize_column_name("_airbyte_end_at"), + active_row=self.name_transformer.normalize_column_name("_airbyte_active_row"), + lag_emitted_at=self.get_emitted_at(in_jinja=True), + col_emitted_at=self.get_emitted_at(), parent_hash_id=self.parent_hash_id(), fields=self.list_fields(column_names), cursor_field=self.get_cursor_field(column_names), @@ -528,18 +594,20 @@ def generate_scd_type_2_model(self, from_table: str, column_names: Dict[str, Tup ) return sql - def get_cursor_field(self, column_names: Dict[str, Tuple[str, str]]) -> str: + def get_cursor_field(self, column_names: Dict[str, Tuple[str, str]], in_jinja: bool = False) -> str: if not self.cursor_field: - return "_airbyte_emitted_at" + cursor = self.name_transformer.normalize_column_name("_airbyte_emitted_at", in_jinja) elif len(self.cursor_field) == 1: if not is_airbyte_column(self.cursor_field[0]): - return column_names[self.cursor_field[0]][0] + cursor = column_names[self.cursor_field[0]][0] else: # using an airbyte generated column - return self.cursor_field[0] + cursor = self.cursor_field[0] else: raise ValueError(f"Unsupported nested cursor field {'.'.join(self.cursor_field)} for stream {self.stream_name}") + return cursor + def get_primary_key(self, column_names: Dict[str, Tuple[str, str]]) -> str: if self.primary_key and len(self.primary_key) > 0: return ", ".join([self.get_primary_key_from_path(column_names, path) for path in self.primary_key]) @@ -579,13 +647,14 @@ def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[st {%- for field in fields %} {{ field }}, {%- endfor %} - _airbyte_emitted_at, + {{ col_emitted_at }}, {{ hash_id }} from {{ from_table }} {{ sql_table_comment }} """ ) sql = template.render( + col_emitted_at=self.get_emitted_at(), parent_hash_id=self.parent_hash_id(), fields=self.list_fields(column_names), hash_id=self.hash_id(), @@ -594,8 +663,7 @@ def generate_final_model(self, from_table: str, column_names: Dict[str, Tuple[st ) return sql - @staticmethod - def list_fields(column_names: Dict[str, Tuple[str, str]]) -> List[str]: + def list_fields(self, column_names: Dict[str, Tuple[str, str]]) -> List[str]: return [column_names[field][0] for field in column_names] def add_to_outputs(self, sql: str, is_intermediate: bool, column_count: int = 0, suffix: str = "") -> str: @@ -619,7 +687,10 @@ def add_to_outputs(self, sql: str, is_intermediate: bool, column_count: int = 0, if file_name != table_name: header = jinja_call(f'config(alias="{table_name}", schema="{schema}", tags=[{tags}])') else: - header = jinja_call(f'config(schema="{schema}", tags=[{tags}])') + if self.destination_type == DestinationType.ORACLE: + header = jinja_call(f'config(schema="{self.default_schema}", tags=[{tags}])') + else: + header = jinja_call(f'config(schema="{schema}", tags=[{tags}])') self.sql_outputs[ output ] = f""" @@ -665,18 +736,20 @@ def sql_table_comment(self, include_from_table: bool = False) -> str: result += f" from {from_table}" return result - def hash_id(self) -> str: 
+ def hash_id(self, in_jinja: bool = False) -> str: + hash_id_col = f"_airbyte_{self.normalized_stream_name()}_hashid" if self.parent: if self.normalized_stream_name().lower() == self.parent.stream_name.lower(): level = len(self.json_path) - return self.name_transformer.normalize_column_name(f"_airbyte_{self.normalized_stream_name()}_{level}_hashid") - return self.name_transformer.normalize_column_name(f"_airbyte_{self.normalized_stream_name()}_hashid") + hash_id_col = f"_airbyte_{self.normalized_stream_name()}_{level}_hashid" + + return self.name_transformer.normalize_column_name(hash_id_col, in_jinja) # Nested Streams - def parent_hash_id(self) -> str: + def parent_hash_id(self, in_jinja: bool = False) -> str: if self.parent: - return self.parent.hash_id() + return self.parent.hash_id(in_jinja) return "" def unnesting_before_query(self) -> str: From 63c708d2b01225821f7f700b6ff7365cc9d30f08 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 7 Sep 2021 14:51:49 -0700 Subject: [PATCH 4/7] fix transform.py --- .../transform_config/transform.py | 39 +++++++++++++++++-- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index 1cb2ca4167b73..af9bfc9e527d1 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -40,6 +40,7 @@ class DestinationType(Enum): redshift = "redshift" snowflake = "snowflake" mysql = "mysql" + oracle = "oracle" class TransformConfig: @@ -47,8 +48,12 @@ def run(self, args): inputs = self.parse(args) original_config = self.read_json_config(inputs["config"]) integration_type = inputs["integration_type"] + + transformed_dbt_project = self.transform_dbt_project(integration_type) + self.write_yaml_config(inputs["output_path"], transformed_dbt_project, "dbt_project.yml") + transformed_config = self.transform(integration_type, original_config) - self.write_yaml_config(inputs["output_path"], transformed_config) + self.write_yaml_config(inputs["output_path"], transformed_config, "profiles.yml") if self.is_ssh_tunnelling(original_config): self.write_ssh_port(inputs["output_path"], self.pick_a_port()) @@ -70,6 +75,18 @@ def parse(args): "output_path": parsed_args.out, } + def transform_dbt_project(self, integration_type: DestinationType): + data = pkgutil.get_data(self.__class__.__module__.split(".")[0], "transform_config/dbt_project_base.yml") + if not data: + raise FileExistsError("Failed to load profile_base.yml") + base_project = yaml.load(data, Loader=yaml.FullLoader) + + if integration_type.value == DestinationType.oracle.value: + base_project["quoting"]["database"] = False + base_project["quoting"]["identifier"] = False + + return base_project + def transform(self, integration_type: DestinationType, config: Dict[str, Any]): data = pkgutil.get_data(self.__class__.__module__.split(".")[0], "transform_config/profile_base.yml") if not data: @@ -234,6 +251,22 @@ def transform_mysql(config: Dict[str, Any]): } return dbt_config + @staticmethod + def transform_oracle(config: Dict[str, Any]): + print("transform_oracle") + # https://github.com/techindicium/dbt-oracle#configure-your-profile + dbt_config = { + "type": "oracle", + "host": config["host"], + "user": config["username"], + "pass": config["password"], + "port": config["port"], + 
"dbname": config["sid"], + "schema": config["schema"], + "threads": 4, + } + return dbt_config + @staticmethod def read_json_config(input_path: str): with open(input_path, "r") as file: @@ -241,10 +274,10 @@ def read_json_config(input_path: str): return json.loads(contents) @staticmethod - def write_yaml_config(output_path: str, config: Dict[str, Any]): + def write_yaml_config(output_path: str, config: Dict[str, Any], filename: str): if not os.path.exists(output_path): os.makedirs(output_path) - with open(os.path.join(output_path, "profiles.yml"), "w") as fh: + with open(os.path.join(output_path, filename), "w") as fh: fh.write(yaml.dump(config)) @staticmethod From 3775ca684710e065bbbca5e555bd57586f688940 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 7 Sep 2021 14:52:27 -0700 Subject: [PATCH 5/7] fix transform.py 2 --- .../normalization/transform_config/transform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index af9bfc9e527d1..d325079a3ba5d 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -99,6 +99,7 @@ def transform(self, integration_type: DestinationType, config: Dict[str, Any]): DestinationType.redshift.value: self.transform_redshift, DestinationType.snowflake.value: self.transform_snowflake, DestinationType.mysql.value: self.transform_mysql, + DestinationType.oracle.value: self.transform_oracle, }[integration_type.value](config) # merge pre-populated base_profile with destination-specific configuration. From fa2d5ed5fccdc6a3a2c33e9d8db58e9b7cd96e02 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 7 Sep 2021 14:53:10 -0700 Subject: [PATCH 6/7] fix version --- airbyte-integrations/bases/source-acceptance-test/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 3aef6cfc616d8..2979e5506ac38 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -9,7 +9,7 @@ COPY setup.py ./ COPY pytest.ini ./ RUN pip install . 
-LABEL io.airbyte.version=0.1.17 +LABEL io.airbyte.version=0.1.18 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"] From 7bb977afeedf09d0bf77887a6aa1b9c99e2a6279 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 7 Sep 2021 14:54:11 -0700 Subject: [PATCH 7/7] webapp --- .../src/packages/cloud/views/auth/SignupPage/SignupPage.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx b/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx index 0a48aebe13fdf..6e9d55313786d 100644 --- a/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx +++ b/airbyte-webapp/src/packages/cloud/views/auth/SignupPage/SignupPage.tsx @@ -17,7 +17,6 @@ import { FormTitle } from "../components/FormTitle"; import CheckBoxControl from "../components/CheckBoxControl"; import { useAuthService } from "packages/cloud/services/auth/AuthService"; import { FieldError } from "packages/cloud/lib/errors/FieldError"; -import config from "config"; const MarginBlock = styled.div` margin-bottom: 15px;
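
A quick way to see what the new Oracle code path in `transform_config/transform.py` (patches 4 and 5 above) produces is to call it directly. The sketch below is illustrative only and not part of the patch series: it assumes the `normalization` package from this repo is installed so that `normalization.transform_config.transform` is importable along with its bundled YAML resources (`profile_base.yml`, `dbt_project_base.yml`), and every value in the sample connector config is a placeholder.

```
# Illustrative sketch (not part of the patches above): exercise the new Oracle
# branch of TransformConfig and inspect the dbt artifacts it produces.
# Assumes the normalization package from this repo is installed; all config
# values below are placeholders, not real credentials.
import yaml

from normalization.transform_config.transform import DestinationType, TransformConfig

# Keys match what transform_oracle() reads from the destination config.
oracle_config = {
    "host": "oracle.example.internal",
    "port": 1521,
    "sid": "AIRBYTE",
    "username": "airbyte_user",
    "password": "secret",
    "schema": "AIRBYTE_SCHEMA",
}

transformer = TransformConfig()

# Destination-specific profile block that ends up in profiles.yml; note that the
# Oracle SID from the connector config is mapped to dbt-oracle's "dbname" field.
profile = transformer.transform_oracle(oracle_config)
print(yaml.dump(profile, default_flow_style=False))

# dbt_project.yml overrides; for Oracle the base project gets database and
# identifier quoting disabled before it is written out.
dbt_project = transformer.transform_dbt_project(DestinationType.oracle)
print(dbt_project["quoting"])
```

Splitting the output into two files is what the new `filename` argument to `write_yaml_config` enables: `run()` now writes the project settings to `dbt_project.yml` and the credentials to `profiles.yml` in the same output directory, with the quoting override applied only when the integration type is Oracle.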