Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Hubspot: Support OAuth #6465

Merged
merged 13 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ jobs:
GREENHOUSE_TEST_CREDS_LIMITED: ${{ secrets.GREENHOUSE_TEST_CREDS_LIMITED }}
HARVEST_INTEGRATION_TESTS_CREDS: ${{ secrets.HARVEST_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH }}
INSTAGRAM_INTEGRATION_TESTS_CREDS: ${{ secrets.INSTAGRAM_INTEGRATION_TESTS_CREDS }}
INTERCOM_INTEGRATION_TEST_CREDS: ${{ secrets.INTERCOM_INTEGRATION_TEST_CREDS }}
ITERABLE_INTEGRATION_TEST_CREDS: ${{ secrets.ITERABLE_INTEGRATION_TEST_CREDS }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ jobs:
GREENHOUSE_TEST_CREDS_LIMITED: ${{ secrets.GREENHOUSE_TEST_CREDS_LIMITED }}
HARVEST_INTEGRATION_TESTS_CREDS: ${{ secrets.HARVEST_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH }}
INSTAGRAM_INTEGRATION_TESTS_CREDS: ${{ secrets.INSTAGRAM_INTEGRATION_TESTS_CREDS }}
INTERCOM_INTEGRATION_TEST_CREDS: ${{ secrets.INTERCOM_INTEGRATION_TEST_CREDS }}
ITERABLE_INTEGRATION_TEST_CREDS: ${{ secrets.ITERABLE_INTEGRATION_TEST_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,59 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "secrets/config_oauth.json"
htrueman marked this conversation as resolved.
Show resolved Hide resolved
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
- config_path: "integration_tests/invalid_config_oauth.json"
status: "exception"
- config_path: "integration_tests/invalid_config_wrong_title.json"
status: "exception"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
# TODO: permissions error with Workflows stream for Test Account
configured_catalog_path: "sample_files/configured_catalog_without_workflows.json"
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# subscription_changes: ["timestamp"]
# email_events: ["timestamp"]
configured_catalog_path: "sample_files/configured_catalog.json"
empty_streams:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are there so many empty streams? what can we do to populate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is on me. I'll update you there after I do a research. So I'll either create an issue to fill those streams with data or fill as much streams as possible.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an issue #6711

[
"contact_lists",
"campaigns",
"tickets",
"subscription_changes",
"quotes",
"email_events",
"engagements",
"forms",
"products",
"workflows",
]
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "sample_files/configured_catalog.json"
empty_streams:
[
"companies",
"deals",
"owners",
"contact_lists",
"campaigns",
"tickets",
"subscription_changes",
"quotes",
"email_events",
"engagements",
"forms",
"products",
"workflows",
]
# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
htrueman marked this conversation as resolved.
Show resolved Hide resolved
htrueman marked this conversation as resolved.
Show resolved Hide resolved
# - config_path: "secrets/config.json"
# configured_catalog_path: "sample_files/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# subscription_changes: ["timestamp"]
# email_events: ["timestamp"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "sample_files/configured_catalog_without_workflows.json"
configured_catalog_path: "sample_files/configured_catalog.json"
- config_path: "secrets/config_oauth.json"
configured_catalog_path: "sample_files/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"start_date": "2021-01-01T00:00:00Z",
"credentials": {
"credentials_title": "API Key Credentials",
"api_key": "wrongkey-key1-key2-key3-wrongkey1234"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"start_date": "2021-05-30T00:00:00Z",
"credentials": {
"credentials_title": "OAuth Credentials",
"client_id": "fake-client-id-not1234-id12345",
"client_secret": "fake-client-secret-not1234-secret12345",
"refresh_token": "fake-refresh-token-not1234-token12345"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"start_date": "2021-05-30T00:00:00Z",
"credentials": {
"credentials_title": "Fake Credentials",
"client_id": "fake-client-id-not1234-id12345"
}
}
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-hubspot/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-protocol",
"base-python",
"backoff==1.11.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import sys
import time
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from functools import lru_cache, partial
from http import HTTPStatus
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

import backoff
import pendulum as pendulum
import requests
from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
htrueman marked this conversation as resolved.
Show resolved Hide resolved
from base_python.entrypoint import logger
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout

Expand Down Expand Up @@ -118,64 +118,41 @@ def log_giveup(_details):


class API:
"""
htrueman marked this conversation as resolved.
Show resolved Hide resolved
Oauth2Authenticator(
token_refresh_endpoint="https://api.hubapi.com/oauth/v1/token",
client_id="fake-client-id-not1234-id12345",
client_secret="fake-client-secret-not1234-secret12345",
refresh_token="fake-refresh-token-not1234-token12345"
)
"""

"""Hubspot API interface, authorize, retrieve and post, supports backoff logic"""

BASE_URL = "https://api.hubapi.com"
USER_AGENT = "Airbyte"

def __init__(self, credentials: Mapping[str, Any]):
self._credentials = {**credentials}
self._session = requests.Session()
credentials_title = credentials.get("credentials_title")

if credentials_title == "OAuth Credentials":
self._session.auth = Oauth2Authenticator(
token_refresh_endpoint=self.BASE_URL + "/oauth/v1/token",
client_id=credentials["client_id"],
client_secret=credentials["client_secret"],
refresh_token=credentials["refresh_token"],
)
elif credentials_title == "API Key Credentials":
self._session.params["hapikey"] = credentials.get("api_key")
else:
raise Exception("No supported `credentials_title` specified. See spec.json for references")

self._session.headers = {
"Content-Type": "application/json",
"User-Agent": self.USER_AGENT,
}

def _acquire_access_token_from_refresh_token(self):
htrueman marked this conversation as resolved.
Show resolved Hide resolved
payload = {
"grant_type": "refresh_token",
"redirect_uri": self._credentials["redirect_uri"],
"refresh_token": self._credentials["refresh_token"],
"client_id": self._credentials["client_id"],
"client_secret": self._credentials["client_secret"],
}

resp = requests.post(self.BASE_URL + "/oauth/v1/token", data=payload)
if resp.status_code == HTTPStatus.FORBIDDEN:
raise HubspotInvalidAuth(resp.content, response=resp)

resp.raise_for_status()
auth = resp.json()
self._credentials["access_token"] = auth["access_token"]
self._credentials["refresh_token"] = auth["refresh_token"]
self._credentials["token_expires"] = datetime.utcnow() + timedelta(seconds=auth["expires_in"] - 600)
logger.info(f"Token refreshed. Expires at {self._credentials['token_expires']}")

@property
def api_key(self) -> Optional[str]:
"""Get API Key if set"""
return self._credentials.get("api_key")

@property
def access_token(self) -> Optional[str]:
"""Get Access Token if set, refreshes token if needed"""
if not self._credentials.get("access_token"):
return None

if self._credentials["token_expires"] is None or self._credentials["token_expires"] < datetime.utcnow():
self._acquire_access_token_from_refresh_token()
return self._credentials.get("access_token")

def _add_auth(self, params: Mapping[str, Any] = None) -> Mapping[str, Any]:
"""Add auth info to request params/header"""
params = params or {}

if self.access_token:
self._session.headers["Authorization"] = f"Bearer {self.access_token}"
else:
params["hapikey"] = self.api_key
return params

@staticmethod
def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
"""Handle response"""
Expand Down Expand Up @@ -204,12 +181,14 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M

@retry_connection_handler(max_tries=5, factor=5)
@retry_after_handler(max_tries=3)
def get(self, url: str, params=None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
response = self._session.get(self.BASE_URL + url, params=self._add_auth(params))
def get(self, url: str, params: MutableMapping[str, Any] = None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
response = self._session.get(self.BASE_URL + url, params=params)
return self._parse_and_handle_errors(response)

def post(self, url: str, data: Mapping[str, Any], params=None) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
response = self._session.post(self.BASE_URL + url, params=self._add_auth(params), json=data)
def post(
self, url: str, data: Mapping[str, Any], params: MutableMapping[str, Any] = None
) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
response = self._session.post(self.BASE_URL + url, params=params, json=data)
return self._parse_and_handle_errors(response)


Expand Down Expand Up @@ -525,7 +504,9 @@ def url(self):
"""Entity URL"""
return f"/crm/v3/objects/{self.entity}"

def __init__(self, entity: str = None, associations: List[str] = None, include_archived_only: bool = False, **kwargs):
def __init__(
self, entity: Optional[str] = None, associations: Optional[List[str]] = None, include_archived_only: bool = False, **kwargs
):
super().__init__(**kwargs)
self.entity = entity or self.entity
self.associations = associations or self.associations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#


from typing import Any, Iterator, Mapping, Tuple
from typing import Any, Callable, Iterator, Mapping, Optional, Tuple

from airbyte_protocol import AirbyteStream
from base_python import BaseClient
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(self, start_date, credentials, **kwargs):

super().__init__(**kwargs)

def _enumerate_methods(self) -> Mapping[str, callable]:
def _enumerate_methods(self) -> Mapping[str, Callable]:
return {name: api.list for name, api in self._apis.items()}

@property
Expand All @@ -98,7 +98,7 @@ def set_stream_state(self, name: str, state: Any):
"""Set state of stream with corresponding name"""
self._apis[name].state = state

def health_check(self) -> Tuple[bool, str]:
def health_check(self) -> Tuple[bool, Optional[str]]:
alive = True
error_msg = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,77 @@
},
"credentials": {
"type": "object",
htrueman marked this conversation as resolved.
Show resolved Hide resolved
"title": "api key",
"required": ["api_key"],
"properties": {
"api_key": {
"description": "Hubspot API Key. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this key.",
"type": "string",
"airbyte_secret": true
"oneOf": [
{
"type": "object",
"title": "api key",
htrueman marked this conversation as resolved.
Show resolved Hide resolved
"required": ["api_key", "credentials_title"],
"properties": {
"credentials_title": {
"type": "string",
"const": "API Key Credentials",
"enum": ["API Key Credentials"],
"default": "API Key Credentials",
"order": 0
},
"api_key": {
htrueman marked this conversation as resolved.
Show resolved Hide resolved
"description": "Hubspot API Key. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this key.",
"type": "string",
"airbyte_secret": true
}
}
},
{
"type": "object",
"title": "oauth",
htrueman marked this conversation as resolved.
Show resolved Hide resolved
"required": [
"redirect_uri",
"client_id",
"client_secret",
"refresh_token",
"access_token",
"credentials_title"
],
"properties": {
"credentials_title": {
"type": "string",
"const": "OAuth Credentials",
"enum": ["OAuth Credentials"],
"default": "OAuth Credentials",
"order": 0
},
"client_id": {
"description": "Hubspot client_id. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this id.",
"type": "string",
"examples": ["123456789000"]
},
"client_secret": {
"description": "Hubspot client_secret. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help finding this secret.",
"type": "string",
"examples": ["secret"],
"airbyte_secret": true
},
"refresh_token": {
"description": "Hubspot refresh_token. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\">docs</a> if you need help generating the token.",
"type": "string",
"examples": ["refresh_token"],
"airbyte_secret": true
}
}
}
}
]
}
htrueman marked this conversation as resolved.
Show resolved Hide resolved
}
},
"authSpecification": {
"auth_type": "oauth2.0",
"oauth2Specification": {
"rootObject": ["credentials", 1],
"oauthFlowInitParameters": [
["credentials", "client_id"],
["credentials", "client_secret"],
["credentials", "refresh_token"]
htrueman marked this conversation as resolved.
Show resolved Hide resolved
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@

@pytest.fixture(name="some_credentials")
def some_credentials_fixture():
return {"api_key": "wrong_key"}
return {"credentials_title": "API Key Credentials", "api_key": "wrong_key"}


@pytest.fixture(name="creds_with_wrong_permissions")
def creds_with_wrong_permissions():
return {"api_key": "THIS-IS-THE-API_KEY"}
return {"credentials_title": "API Key Credentials", "api_key": "THIS-IS-THE-API_KEY"}


def test_client_backoff_on_limit_reached(requests_mock, some_credentials):
Expand Down Expand Up @@ -110,7 +110,7 @@ def test_wrong_permissions_api_key(requests_mock, creds_with_wrong_permissions):

# Mock the getter method that handles requests.
def get(url=test_stream.url, params=None):
response = api._session.get(api.BASE_URL + url, params=api._add_auth(params))
response = api._session.get(api.BASE_URL + url, params=params)
return api._parse_and_handle_errors(response)

# Define request params value
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/hubspot.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ This connector supports only authentication with API Key. To obtain API key for

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.16 | 2021-09-27 | [6465](https://github.com/airbytehq/airbyte/pull/6465) | Implement OAuth support. Use CDK authenticator instead of connector specific authenticator |
| 0.1.15 | 2021-09-23 | [6374](https://github.com/airbytehq/airbyte/pull/6374) | Use correct schema for `owners` stream |
| 0.1.14 | 2021-09-08 | [5693](https://github.com/airbytehq/airbyte/pull/5693) | Include deal_to_contact association when pulling deal stream and include contact ID in contact stream |
| 0.1.13 | 2021-09-08 | [5834](https://github.com/airbytehq/airbyte/pull/5834) | Fixed array fields without items property in schema |
Expand Down
Loading