Skip to content

Commit

Permalink
🎉 Source Hubspot: Support OAuth (#6465)
Browse files Browse the repository at this point in the history
* Add required scopes to hubspot.md docs

* Update spec with oauth creds.
Update acceptance test config with oauth tests.
Add oauth test creds configs.

* Fix spec.js `oneOf` usage

* Add CDK authenticators usage.
Update source acceptance tests.
Update spec.json.

* Fix source unit tests

* Update hubspot.md scopes docs

* Add spec field titles.
Update comments.

* Update spec.json oauthFlowOutputParameters.

* Update spec.json to pass SAT.
  • Loading branch information
htrueman authored Oct 4, 2021
1 parent 5ff55ab commit 351e998
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 76 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ jobs:
GREENHOUSE_TEST_CREDS_LIMITED: ${{ secrets.GREENHOUSE_TEST_CREDS_LIMITED }}
HARVEST_INTEGRATION_TESTS_CREDS: ${{ secrets.HARVEST_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH }}
INSTAGRAM_INTEGRATION_TESTS_CREDS: ${{ secrets.INSTAGRAM_INTEGRATION_TESTS_CREDS }}
INTERCOM_INTEGRATION_TEST_CREDS: ${{ secrets.INTERCOM_INTEGRATION_TEST_CREDS }}
ITERABLE_INTEGRATION_TEST_CREDS: ${{ secrets.ITERABLE_INTEGRATION_TEST_CREDS }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ jobs:
GREENHOUSE_TEST_CREDS_LIMITED: ${{ secrets.GREENHOUSE_TEST_CREDS_LIMITED }}
HARVEST_INTEGRATION_TESTS_CREDS: ${{ secrets.HARVEST_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH }}
INSTAGRAM_INTEGRATION_TESTS_CREDS: ${{ secrets.INSTAGRAM_INTEGRATION_TESTS_CREDS }}
INTERCOM_INTEGRATION_TEST_CREDS: ${{ secrets.INTERCOM_INTEGRATION_TEST_CREDS }}
ITERABLE_INTEGRATION_TEST_CREDS: ${{ secrets.ITERABLE_INTEGRATION_TEST_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,60 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/config_oauth.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_config_oauth.json"
status: "exception"
- config_path: "integration_tests/invalid_config_wrong_title.json"
status: "exception"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
# TODO: permissions error with Workflows stream for Test Account
configured_catalog_path: "sample_files/configured_catalog_without_workflows.json"
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# subscription_changes: ["timestamp"]
# email_events: ["timestamp"]
configured_catalog_path: "sample_files/configured_catalog.json"
empty_streams:
[
"contact_lists",
"campaigns",
"tickets",
"subscription_changes",
"quotes",
"email_events",
"engagements",
"forms",
"products",
"workflows",
]
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "sample_files/configured_catalog.json"
empty_streams:
[
"companies",
"deals",
"owners",
"contact_lists",
"campaigns",
"tickets",
"subscription_changes",
"quotes",
"email_events",
"engagements",
"forms",
"products",
"workflows",
]
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
# See https://github.com/airbytehq/airbyte/issues/6509
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# subscription_changes: ["timestamp"]
# email_events: ["timestamp"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog_without_workflows.json"
configured_catalog_path: "sample_files/configured_catalog.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "sample_files/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"start_date": "2021-01-01T00:00:00Z",
"credentials": {
"credentials_title": "API Key Credentials",
"api_key": "wrongkey-key1-key2-key3-wrongkey1234"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"start_date": "2021-05-30T00:00:00Z",
"credentials": {
"credentials_title": "OAuth Credentials",
"client_id": "fake-client-id-not1234-id12345",
"client_secret": "fake-client-secret-not1234-secret12345",
"refresh_token": "fake-refresh-token-not1234-token12345"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"start_date": "2021-05-30T00:00:00Z",
"credentials": {
"credentials_title": "Fake Credentials",
"client_id": "fake-client-id-not1234-id12345"
}
}
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-hubspot/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-protocol",
"base-python",
"backoff==1.11.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import sys
import time
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from functools import lru_cache, partial
from http import HTTPStatus
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

import backoff
import pendulum as pendulum
import requests
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
from base_python.entrypoint import logger
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout

Expand Down Expand Up @@ -104,58 +104,26 @@ class API:
USER_AGENT = "Airbyte"

def __init__(self, credentials: Mapping[str, Any]):
self._credentials = {**credentials}
self._session = requests.Session()
credentials_title = credentials.get("credentials_title")

if credentials_title == "OAuth Credentials":
self._session.auth = Oauth2Authenticator(
token_refresh_endpoint=self.BASE_URL + "/oauth/v1/token",
client_id=credentials["client_id"],
client_secret=credentials["client_secret"],
refresh_token=credentials["refresh_token"],
)
elif credentials_title == "API Key Credentials":
self._session.params["hapikey"] = credentials.get("api_key")
else:
raise Exception("No supported `credentials_title` specified. See spec.json for references")

self._session.headers = {
"Content-Type": "application/json",
"User-Agent": self.USER_AGENT,
}

def _acquire_access_token_from_refresh_token(self):
payload = {
"grant_type": "refresh_token",
"redirect_uri": self._credentials["redirect_uri"],
"refresh_token": self._credentials["refresh_token"],
"client_id": self._credentials["client_id"],
"client_secret": self._credentials["client_secret"],
}

resp = requests.post(self.BASE_URL + "/oauth/v1/token", data=payload)
if resp.status_code == HTTPStatus.FORBIDDEN:
raise HubspotInvalidAuth(resp.content, response=resp)

resp.raise_for_status()
auth = resp.json()
self._credentials["access_token"] = auth["access_token"]
self._credentials["refresh_token"] = auth["refresh_token"]
self._credentials["token_expires"] = datetime.utcnow() + timedelta(seconds=auth["expires_in"] - 600)
logger.info(f"Token refreshed. Expires at {self._credentials['token_expires']}")

@property
def api_key(self) -> Optional[str]:
"""Get API Key if set"""
return self._credentials.get("api_key")

@property
def access_token(self) -> Optional[str]:
"""Get Access Token if set, refreshes token if needed"""
if not self._credentials.get("access_token"):
return None

if self._credentials["token_expires"] is None or self._credentials["token_expires"] < datetime.utcnow():
self._acquire_access_token_from_refresh_token()
return self._credentials.get("access_token")

def _add_auth(self, params: Mapping[str, Any] = None) -> Mapping[str, Any]:
"""Add auth info to request params/header"""
params = params or {}

if self.access_token:
self._session.headers["Authorization"] = f"Bearer {self.access_token}"
else:
params["hapikey"] = self.api_key
return params

@staticmethod
def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
"""Handle response"""
Expand Down Expand Up @@ -184,12 +152,14 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M

@retry_connection_handler(max_tries=5, factor=5)
@retry_after_handler(max_tries=3)
def get(self, url: str, params=None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
response = self._session.get(self.BASE_URL + url, params=self._add_auth(params))
def get(self, url: str, params: MutableMapping[str, Any] = None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
response = self._session.get(self.BASE_URL + url, params=params)
return self._parse_and_handle_errors(response)

def post(self, url: str, data: Mapping[str, Any], params=None) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
response = self._session.post(self.BASE_URL + url, params=self._add_auth(params), json=data)
def post(
self, url: str, data: Mapping[str, Any], params: MutableMapping[str, Any] = None
) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
response = self._session.post(self.BASE_URL + url, params=params, json=data)
return self._parse_and_handle_errors(response)


Expand Down Expand Up @@ -505,7 +475,9 @@ def url(self):
"""Entity URL"""
return f"/crm/v3/objects/{self.entity}"

def __init__(self, entity: str = None, associations: List[str] = None, include_archived_only: bool = False, **kwargs):
def __init__(
self, entity: Optional[str] = None, associations: Optional[List[str]] = None, include_archived_only: bool = False, **kwargs
):
super().__init__(**kwargs)
self.entity = entity or self.entity
self.associations = associations or self.associations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#


from typing import Any, Iterator, Mapping, Tuple
from typing import Any, Callable, Iterator, Mapping, Optional, Tuple

from airbyte_protocol import AirbyteStream
from base_python import BaseClient
Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(self, start_date, credentials, **kwargs):

super().__init__(**kwargs)

def _enumerate_methods(self) -> Mapping[str, callable]:
def _enumerate_methods(self) -> Mapping[str, Callable]:
return {name: api.list for name, api in self._apis.items()}

@property
Expand All @@ -78,7 +78,7 @@ def set_stream_state(self, name: str, state: Any):
"""Set state of stream with corresponding name"""
self._apis[name].state = state

def health_check(self) -> Tuple[bool, str]:
def health_check(self) -> Tuple[bool, Optional[str]]:
alive = True
error_msg = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,95 @@
"properties": {
"start_date": {
"type": "string",
"title": "Replication start date",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2017-01-25T00:00:00Z"]
},
"credentials": {
"title": "Authentication mechanism",
"description": "Choose either to provide the API key or the OAuth2.0 credentials",
"type": "object",
"title": "api key",
"required": ["api_key"],
"properties": {
"api_key": {
"description": "Hubspot API Key. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this key.",
"type": "string",
"airbyte_secret": true
"oneOf": [
{
"type": "object",
"title": "Authenticate via Hubspot (Oauth)",
"required": [
"redirect_uri",
"client_id",
"client_secret",
"refresh_token",
"access_token",
"credentials_title"
],
"properties": {
"credentials_title": {
"type": "string",
"title": "Credentials title",
"description": "Name of the credentials set",
"const": "OAuth Credentials",
"enum": ["OAuth Credentials"],
"default": "OAuth Credentials",
"order": 0
},
"client_id": {
"title": "Client ID",
"description": "Hubspot client_id. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this id.",
"type": "string",
"examples": ["123456789000"]
},
"client_secret": {
"title": "Client Secret",
"description": "Hubspot client_secret. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this secret.",
"type": "string",
"examples": ["secret"],
"airbyte_secret": true
},
"refresh_token": {
"title": "Refresh token",
"description": "Hubspot refresh_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help generating the token.",
"type": "string",
"examples": ["refresh_token"],
"airbyte_secret": true
}
}
},
{
"type": "object",
"title": "API key",
"required": ["api_key", "credentials_title"],
"properties": {
"credentials_title": {
"type": "string",
"title": "Credentials title",
"description": "Name of the credentials set",
"const": "API Key Credentials",
"enum": ["API Key Credentials"],
"default": "API Key Credentials",
"order": 0
},
"api_key": {
"title": "API key",
"description": "Hubspot API Key. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this key.",
"type": "string",
"airbyte_secret": true
}
}
}
}
]
}
}
},
"authSpecification": {
"auth_type": "oauth2.0",
"oauth2Specification": {
"rootObject": ["credentials", 0],
"oauthFlowInitParameters": [
["client_id"],
["client_secret"],
["refresh_token"]
],
"oauthFlowOutputParameters": [["refresh_token"]]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@

@pytest.fixture(name="some_credentials")
def some_credentials_fixture():
return {"api_key": "wrong_key"}
return {"credentials_title": "API Key Credentials", "api_key": "wrong_key"}


@pytest.fixture(name="creds_with_wrong_permissions")
def creds_with_wrong_permissions():
return {"api_key": "THIS-IS-THE-API_KEY"}
return {"credentials_title": "API Key Credentials", "api_key": "THIS-IS-THE-API_KEY"}


def test_client_backoff_on_limit_reached(requests_mock, some_credentials):
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_wrong_permissions_api_key(requests_mock, creds_with_wrong_permissions):

# Mock the getter method that handles requests.
def get(url=test_stream.url, params=None):
response = api._session.get(api.BASE_URL + url, params=api._add_auth(params))
response = api._session.get(api.BASE_URL + url, params=params)
return api._parse_and_handle_errors(response)

# Define request params value
Expand Down
Loading

0 comments on commit 351e998

Please sign in to comment.