Skip to content

Commit

Permalink
[low-code connectors]: Assert there are no custom top-level fields (#…
Browse files Browse the repository at this point in the history
…15489)

* move components to definitions field

* Also update the references

* validate the top level fields and add version

* raise exception on unknown fields

* newline

* unit tests

* set version to 0.1.0

* newline
  • Loading branch information
girarda committed Aug 11, 2022
1 parent a3ee499 commit 0f780fd
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ class ReadException(Exception):
"""
Raise when there is an error reading data from an API Source
"""


class InvalidConnectorDefinitionException(Exception):
"""
Raise when the connector definition is invalid
"""
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.read_exception import ReadException
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.streams import Stream
Expand All @@ -16,6 +17,8 @@
class YamlDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a yaml file"""

VALID_TOP_LEVEL_FIELDS = {"definitions", "streams", "check", "version"}

def __init__(self, path_to_yaml):
"""
:param path_to_yaml: Path to the yaml file describing the source
Expand All @@ -25,6 +28,11 @@ def __init__(self, path_to_yaml):
self._path_to_yaml = path_to_yaml
self._source_config = self._read_and_parse_yaml_file(path_to_yaml)

# Stopgap to protect the top-level namespace until it's validated through the schema
unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
if unknown_fields:
raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.read_exception import ReadException
from airbyte_cdk.sources.declarative.exceptions import ReadException
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import os
import tempfile
import unittest

from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource


class TestYamlDeclarativeSource(unittest.TestCase):
def test_source_is_created_if_toplevel_fields_are_known(self):
content = """
version: "version"
streams: "streams"
check: "check"
"""
temporary_file = TestFileContent(content)
YamlDeclarativeSource(temporary_file.filename)

def test_source_is_not_created_if_toplevel_fields_are_unknown(self):
content = """
version: "version"
streams: "streams"
check: "check"
not_a_valid_field: "error"
"""
temporary_file = TestFileContent(content)
with self.assertRaises(InvalidConnectorDefinitionException):
YamlDeclarativeSource(temporary_file.filename)


class TestFileContent:
def __init__(self, content):
self.file = tempfile.NamedTemporaryFile(mode="w", delete=False)

with self.file as f:
f.write(content)

@property
def filename(self):
return self.file.name

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
os.unlink(self.filename)
Original file line number Diff line number Diff line change
@@ -1,42 +1,44 @@
schema_loader:
type: JsonSchema
file_path: "./source_{{snakeCase name}}/schemas/\{{ options['name'] }}.json"
selector:
type: RecordSelector
extractor:
type: JelloExtractor
transform: "_"
requester:
type: HttpRequester
name: "\{{ options['name'] }}"
http_method: "GET"
authenticator:
type: BearerAuthenticator
api_token: "\{{ config['api_key'] }}"
retriever:
type: SimpleRetriever
$options:
url_base: TODO "your_api_base_url"
name: "\{{ options['name'] }}"
primary_key: "\{{ options['primary_key'] }}"
record_selector:
$ref: "*ref(selector)"
paginator:
type: NoPagination
customers_stream:
type: DeclarativeStream
$options:
name: "customers"
primary_key: "id"
version: "0.1.0"

definitions:
schema_loader:
$ref: "*ref(schema_loader)"
type: JsonSchema
file_path: "./source_{{snakeCase name}}/schemas/\{{ options['name'] }}.json"
selector:
type: RecordSelector
extractor:
type: JelloExtractor
transform: "_"
requester:
type: HttpRequester
name: "\{{ options['name'] }}"
http_method: "GET"
authenticator:
type: BearerAuthenticator
api_token: "\{{ config['api_key'] }}"
retriever:
$ref: "*ref(retriever)"
requester:
$ref: "*ref(requester)"
path: TODO "your_endpoint_path"
type: SimpleRetriever
$options:
url_base: TODO "your_api_base_url"
name: "\{{ options['name'] }}"
primary_key: "\{{ options['primary_key'] }}"
record_selector:
$ref: "*ref(definitions.selector)"
paginator:
type: NoPagination

streams:
- "*ref(customers_stream)"
- type: DeclarativeStream
$options:
name: "customers"
primary_key: "id"
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
requester:
$ref: "*ref(definitions.requester)"
path: TODO "your_endpoint_path"
check:
type: CheckStream
stream_names: ["customers"]

0 comments on commit 0f780fd

Please sign in to comment.