diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml
index 6f79ae9d1f46..1b4f2e50e9b6 100644
--- a/.github/workflows/publish-command.yml
+++ b/.github/workflows/publish-command.yml
@@ -116,6 +116,7 @@ jobs:
GREENHOUSE_TEST_CREDS_LIMITED: ${{ secrets.GREENHOUSE_TEST_CREDS_LIMITED }}
HARVEST_INTEGRATION_TESTS_CREDS: ${{ secrets.HARVEST_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS }}
+ HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH }}
INSTAGRAM_INTEGRATION_TESTS_CREDS: ${{ secrets.INSTAGRAM_INTEGRATION_TESTS_CREDS }}
INTERCOM_INTEGRATION_TEST_CREDS: ${{ secrets.INTERCOM_INTEGRATION_TEST_CREDS }}
ITERABLE_INTEGRATION_TEST_CREDS: ${{ secrets.ITERABLE_INTEGRATION_TEST_CREDS }}
diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml
index 0275fe11d14f..bc1a31b2a318 100644
--- a/.github/workflows/test-command.yml
+++ b/.github/workflows/test-command.yml
@@ -111,6 +111,7 @@ jobs:
GREENHOUSE_TEST_CREDS_LIMITED: ${{ secrets.GREENHOUSE_TEST_CREDS_LIMITED }}
HARVEST_INTEGRATION_TESTS_CREDS: ${{ secrets.HARVEST_INTEGRATION_TESTS_CREDS }}
HUBSPOT_INTEGRATION_TESTS_CREDS: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS }}
+ HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH: ${{ secrets.HUBSPOT_INTEGRATION_TESTS_CREDS_OAUTH }}
INSTAGRAM_INTEGRATION_TESTS_CREDS: ${{ secrets.INSTAGRAM_INTEGRATION_TESTS_CREDS }}
INTERCOM_INTEGRATION_TEST_CREDS: ${{ secrets.INTERCOM_INTEGRATION_TEST_CREDS }}
ITERABLE_INTEGRATION_TEST_CREDS: ${{ secrets.ITERABLE_INTEGRATION_TEST_CREDS }}
diff --git a/airbyte-integrations/connectors/source-hubspot/acceptance-test-config.yml b/airbyte-integrations/connectors/source-hubspot/acceptance-test-config.yml
index ceb78e017f5d..a47ab37f6198 100644
--- a/airbyte-integrations/connectors/source-hubspot/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-hubspot/acceptance-test-config.yml
@@ -5,21 +5,60 @@ tests:
connection:
- config_path: "secrets/config.json"
status: "succeed"
+ - config_path: "secrets/config_oauth.json"
+ status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
+ - config_path: "integration_tests/invalid_config_oauth.json"
+ status: "exception"
+ - config_path: "integration_tests/invalid_config_wrong_title.json"
+ status: "exception"
discovery:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
- # TODO: permissions error with Workflows stream for Test Account
- configured_catalog_path: "sample_files/configured_catalog_without_workflows.json"
-# incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
-# - config_path: "secrets/config.json"
-# configured_catalog_path: "sample_files/configured_catalog.json"
-# future_state_path: "integration_tests/abnormal_state.json"
-# cursor_paths:
-# subscription_changes: ["timestamp"]
-# email_events: ["timestamp"]
+ configured_catalog_path: "sample_files/configured_catalog.json"
+ empty_streams:
+ [
+ "contact_lists",
+ "campaigns",
+ "tickets",
+ "subscription_changes",
+ "quotes",
+ "email_events",
+ "engagements",
+ "forms",
+ "products",
+ "workflows",
+ ]
+ - config_path: "secrets/config_oauth.json"
+ configured_catalog_path: "sample_files/configured_catalog.json"
+ empty_streams:
+ [
+ "companies",
+ "deals",
+ "owners",
+ "contact_lists",
+ "campaigns",
+ "tickets",
+ "subscription_changes",
+ "quotes",
+ "email_events",
+ "engagements",
+ "forms",
+ "products",
+ "workflows",
+ ]
+ # incremental: fixme (eugene): '<=' not supported between instances of 'int' and 'str'
+ # See https://github.com/airbytehq/airbyte/issues/6509
+ # - config_path: "secrets/config.json"
+ # configured_catalog_path: "sample_files/configured_catalog.json"
+ # future_state_path: "integration_tests/abnormal_state.json"
+ # cursor_paths:
+ # subscription_changes: ["timestamp"]
+ # email_events: ["timestamp"]
full_refresh:
- config_path: "secrets/config.json"
- configured_catalog_path: "sample_files/configured_catalog_without_workflows.json"
+ configured_catalog_path: "sample_files/configured_catalog.json"
+ - config_path: "secrets/config_oauth.json"
+ configured_catalog_path: "sample_files/configured_catalog.json"
diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config.json
index bc19c743fbd7..81743cdb1d40 100644
--- a/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config.json
+++ b/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config.json
@@ -1,6 +1,7 @@
{
"start_date": "2021-01-01T00:00:00Z",
"credentials": {
+ "credentials_title": "API Key Credentials",
"api_key": "wrongkey-key1-key2-key3-wrongkey1234"
}
}
diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config_oauth.json b/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config_oauth.json
new file mode 100644
index 000000000000..ec526ff6c580
--- /dev/null
+++ b/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config_oauth.json
@@ -0,0 +1,9 @@
+{
+ "start_date": "2021-05-30T00:00:00Z",
+ "credentials": {
+ "credentials_title": "OAuth Credentials",
+ "client_id": "fake-client-id-not1234-id12345",
+ "client_secret": "fake-client-secret-not1234-secret12345",
+ "refresh_token": "fake-refresh-token-not1234-token12345"
+ }
+}
diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config_wrong_title.json b/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config_wrong_title.json
new file mode 100644
index 000000000000..ba1fa4f623d6
--- /dev/null
+++ b/airbyte-integrations/connectors/source-hubspot/integration_tests/invalid_config_wrong_title.json
@@ -0,0 +1,7 @@
+{
+ "start_date": "2021-05-30T00:00:00Z",
+ "credentials": {
+ "credentials_title": "Fake Credentials",
+ "client_id": "fake-client-id-not1234-id12345"
+ }
+}
diff --git a/airbyte-integrations/connectors/source-hubspot/setup.py b/airbyte-integrations/connectors/source-hubspot/setup.py
index 7c7f547f4ad1..d15d84cb5506 100644
--- a/airbyte-integrations/connectors/source-hubspot/setup.py
+++ b/airbyte-integrations/connectors/source-hubspot/setup.py
@@ -6,6 +6,7 @@
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = [
+ "airbyte-cdk~=0.1",
"airbyte-protocol",
"base-python",
"backoff==1.11.1",
diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py
index 8daa72820e23..071f103aaff2 100644
--- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py
+++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/api.py
@@ -6,7 +6,6 @@
import sys
import time
from abc import ABC, abstractmethod
-from datetime import datetime, timedelta
from functools import lru_cache, partial
from http import HTTPStatus
from typing import Any, Callable, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union
@@ -14,6 +13,7 @@
import backoff
import pendulum as pendulum
import requests
+from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator
from base_python.entrypoint import logger
from source_hubspot.errors import HubspotAccessDenied, HubspotInvalidAuth, HubspotRateLimited, HubspotTimeout
@@ -104,58 +104,26 @@ class API:
USER_AGENT = "Airbyte"
def __init__(self, credentials: Mapping[str, Any]):
- self._credentials = {**credentials}
self._session = requests.Session()
+ credentials_title = credentials.get("credentials_title")
+
+ if credentials_title == "OAuth Credentials":
+ self._session.auth = Oauth2Authenticator(
+ token_refresh_endpoint=self.BASE_URL + "/oauth/v1/token",
+ client_id=credentials["client_id"],
+ client_secret=credentials["client_secret"],
+ refresh_token=credentials["refresh_token"],
+ )
+ elif credentials_title == "API Key Credentials":
+ self._session.params["hapikey"] = credentials.get("api_key")
+ else:
+ raise Exception("No supported `credentials_title` specified. See spec.json for references")
+
self._session.headers = {
"Content-Type": "application/json",
"User-Agent": self.USER_AGENT,
}
- def _acquire_access_token_from_refresh_token(self):
- payload = {
- "grant_type": "refresh_token",
- "redirect_uri": self._credentials["redirect_uri"],
- "refresh_token": self._credentials["refresh_token"],
- "client_id": self._credentials["client_id"],
- "client_secret": self._credentials["client_secret"],
- }
-
- resp = requests.post(self.BASE_URL + "/oauth/v1/token", data=payload)
- if resp.status_code == HTTPStatus.FORBIDDEN:
- raise HubspotInvalidAuth(resp.content, response=resp)
-
- resp.raise_for_status()
- auth = resp.json()
- self._credentials["access_token"] = auth["access_token"]
- self._credentials["refresh_token"] = auth["refresh_token"]
- self._credentials["token_expires"] = datetime.utcnow() + timedelta(seconds=auth["expires_in"] - 600)
- logger.info(f"Token refreshed. Expires at {self._credentials['token_expires']}")
-
- @property
- def api_key(self) -> Optional[str]:
- """Get API Key if set"""
- return self._credentials.get("api_key")
-
- @property
- def access_token(self) -> Optional[str]:
- """Get Access Token if set, refreshes token if needed"""
- if not self._credentials.get("access_token"):
- return None
-
- if self._credentials["token_expires"] is None or self._credentials["token_expires"] < datetime.utcnow():
- self._acquire_access_token_from_refresh_token()
- return self._credentials.get("access_token")
-
- def _add_auth(self, params: Mapping[str, Any] = None) -> Mapping[str, Any]:
- """Add auth info to request params/header"""
- params = params or {}
-
- if self.access_token:
- self._session.headers["Authorization"] = f"Bearer {self.access_token}"
- else:
- params["hapikey"] = self.api_key
- return params
-
@staticmethod
def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
"""Handle response"""
@@ -184,12 +152,14 @@ def _parse_and_handle_errors(response) -> Union[MutableMapping[str, Any], List[M
@retry_connection_handler(max_tries=5, factor=5)
@retry_after_handler(max_tries=3)
- def get(self, url: str, params=None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
- response = self._session.get(self.BASE_URL + url, params=self._add_auth(params))
+ def get(self, url: str, params: MutableMapping[str, Any] = None) -> Union[MutableMapping[str, Any], List[MutableMapping[str, Any]]]:
+ response = self._session.get(self.BASE_URL + url, params=params)
return self._parse_and_handle_errors(response)
- def post(self, url: str, data: Mapping[str, Any], params=None) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
- response = self._session.post(self.BASE_URL + url, params=self._add_auth(params), json=data)
+ def post(
+ self, url: str, data: Mapping[str, Any], params: MutableMapping[str, Any] = None
+ ) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
+ response = self._session.post(self.BASE_URL + url, params=params, json=data)
return self._parse_and_handle_errors(response)
@@ -505,7 +475,9 @@ def url(self):
"""Entity URL"""
return f"/crm/v3/objects/{self.entity}"
- def __init__(self, entity: str = None, associations: List[str] = None, include_archived_only: bool = False, **kwargs):
+ def __init__(
+ self, entity: Optional[str] = None, associations: Optional[List[str]] = None, include_archived_only: bool = False, **kwargs
+ ):
super().__init__(**kwargs)
self.entity = entity or self.entity
self.associations = associations or self.associations
diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py
index 5d2431ed4f2c..42533bfcb588 100644
--- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py
+++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/client.py
@@ -3,7 +3,7 @@
#
-from typing import Any, Iterator, Mapping, Tuple
+from typing import Any, Callable, Iterator, Mapping, Optional, Tuple
from airbyte_protocol import AirbyteStream
from base_python import BaseClient
@@ -53,7 +53,7 @@ def __init__(self, start_date, credentials, **kwargs):
super().__init__(**kwargs)
- def _enumerate_methods(self) -> Mapping[str, callable]:
+ def _enumerate_methods(self) -> Mapping[str, Callable]:
return {name: api.list for name, api in self._apis.items()}
@property
@@ -78,7 +78,7 @@ def set_stream_state(self, name: str, state: Any):
"""Set state of stream with corresponding name"""
self._apis[name].state = state
- def health_check(self) -> Tuple[bool, str]:
+ def health_check(self) -> Tuple[bool, Optional[str]]:
alive = True
error_msg = None
diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.json b/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.json
index 77d0bb325c19..e3accc60a1df 100644
--- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.json
+++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.json
@@ -9,22 +9,95 @@
"properties": {
"start_date": {
"type": "string",
+ "title": "Replication start date",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"description": "UTC date and time in the format 2017-01-25T00:00:00Z. Any data before this date will not be replicated.",
"examples": ["2017-01-25T00:00:00Z"]
},
"credentials": {
+ "title": "Authentication mechanism",
+ "description": "Choose either to provide the API key or the OAuth2.0 credentials",
"type": "object",
- "title": "api key",
- "required": ["api_key"],
- "properties": {
- "api_key": {
- "description": "Hubspot API Key. See our docs if you need help finding this key.",
- "type": "string",
- "airbyte_secret": true
+ "oneOf": [
+ {
+ "type": "object",
+ "title": "Authenticate via Hubspot (Oauth)",
+ "required": [
+ "redirect_uri",
+ "client_id",
+ "client_secret",
+ "refresh_token",
+ "access_token",
+ "credentials_title"
+ ],
+ "properties": {
+ "credentials_title": {
+ "type": "string",
+ "title": "Credentials title",
+ "description": "Name of the credentials set",
+ "const": "OAuth Credentials",
+ "enum": ["OAuth Credentials"],
+ "default": "OAuth Credentials",
+ "order": 0
+ },
+ "client_id": {
+ "title": "Client ID",
+ "description": "Hubspot client_id. See our docs if you need help finding this id.",
+ "type": "string",
+ "examples": ["123456789000"]
+ },
+ "client_secret": {
+ "title": "Client Secret",
+ "description": "Hubspot client_secret. See our docs if you need help finding this secret.",
+ "type": "string",
+ "examples": ["secret"],
+ "airbyte_secret": true
+ },
+ "refresh_token": {
+ "title": "Refresh token",
+ "description": "Hubspot refresh_token. See our docs if you need help generating the token.",
+ "type": "string",
+ "examples": ["refresh_token"],
+ "airbyte_secret": true
+ }
+ }
+ },
+ {
+ "type": "object",
+ "title": "API key",
+ "required": ["api_key", "credentials_title"],
+ "properties": {
+ "credentials_title": {
+ "type": "string",
+ "title": "Credentials title",
+ "description": "Name of the credentials set",
+ "const": "API Key Credentials",
+ "enum": ["API Key Credentials"],
+ "default": "API Key Credentials",
+ "order": 0
+ },
+ "api_key": {
+ "title": "API key",
+ "description": "Hubspot API Key. See our docs if you need help finding this key.",
+ "type": "string",
+ "airbyte_secret": true
+ }
+ }
}
- }
+ ]
}
}
+ },
+ "authSpecification": {
+ "auth_type": "oauth2.0",
+ "oauth2Specification": {
+ "rootObject": ["credentials", 0],
+ "oauthFlowInitParameters": [
+ ["client_id"],
+ ["client_secret"],
+ ["refresh_token"]
+ ],
+ "oauthFlowOutputParameters": [["refresh_token"]]
+ }
}
}
diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_client.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_client.py
index 271a7feec390..2b5e6e360fca 100644
--- a/airbyte-integrations/connectors/source-hubspot/unit_tests/test_client.py
+++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/test_client.py
@@ -10,12 +10,12 @@
@pytest.fixture(name="some_credentials")
def some_credentials_fixture():
- return {"api_key": "wrong_key"}
+ return {"credentials_title": "API Key Credentials", "api_key": "wrong_key"}
@pytest.fixture(name="creds_with_wrong_permissions")
def creds_with_wrong_permissions():
- return {"api_key": "THIS-IS-THE-API_KEY"}
+ return {"credentials_title": "API Key Credentials", "api_key": "THIS-IS-THE-API_KEY"}
def test_client_backoff_on_limit_reached(requests_mock, some_credentials):
@@ -90,7 +90,7 @@ def test_wrong_permissions_api_key(requests_mock, creds_with_wrong_permissions):
# Mock the getter method that handles requests.
def get(url=test_stream.url, params=None):
- response = api._session.get(api.BASE_URL + url, params=api._add_auth(params))
+ response = api._session.get(api.BASE_URL + url, params=params)
return api._parse_and_handle_errors(response)
# Define request params value
diff --git a/docs/integrations/sources/hubspot.md b/docs/integrations/sources/hubspot.md
index c7fb4774fc36..8f97a910c438 100644
--- a/docs/integrations/sources/hubspot.md
+++ b/docs/integrations/sources/hubspot.md
@@ -40,6 +40,7 @@ This source is capable of syncing the following tables and their data:
* Hubspot Account
* Api credentials
+* If using Oauth, [scopes](https://legacydocs.hubspot.com/docs/methods/oauth2/initiate-oauth-integration#scopes) enabled for the streams you want to sync
{% hint style="info" %}
Hubspot's API will [rate limit](https://developers.hubspot.com/docs/api/usage-details) the amount of records you can sync daily, so make sure that you are on the appropriate plan if you are planning on syncing more than 250,000 records per day.
@@ -64,10 +65,34 @@ Example of the output message when trying to read `workflows` stream with missin
}
```
+### Required scopes
+
+If you are using Oauth, most of the streams require the appropriate [scopes](https://legacydocs.hubspot.com/docs/methods/oauth2/initiate-oauth-integration#scopes) enabled for the API account.
+
+| Stream | Required Scope |
+| :--- | :---- |
+| `campaigns` | `content` |
+| `companies` | `contacts` |
+| `contact_lists` | `contacts` |
+| `contacts` | `contacts` |
+| `deal_pipelines` | either the `contacts` scope (to fetch deals pipelines) or the `tickets` scope. |
+| `deals` | `contacts` |
+| `email_events` | `content` |
+| `engagements` | `contacts` |
+| `forms` | `forms` |
+| `line_items` | `e-commerce` |
+| `owners` | `contacts` |
+| `products` | `e-commerce` |
+| `quotes` | no scope required |
+| `subscription_changes` | `content` |
+| `tickets` | `tickets` |
+| `workflows` | `automation` |
+
## Changelog
| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
+| 0.1.16 | 2021-09-27 | [6465](https://github.com/airbytehq/airbyte/pull/6465) | Implement OAuth support. Use CDK authenticator instead of connector specific authenticator |
| 0.1.15 | 2021-09-23 | [6374](https://github.com/airbytehq/airbyte/pull/6374) | Use correct schema for `owners` stream |
| 0.1.14 | 2021-09-08 | [5693](https://github.com/airbytehq/airbyte/pull/5693) | Include deal_to_contact association when pulling deal stream and include contact ID in contact stream |
| 0.1.13 | 2021-09-08 | [5834](https://github.com/airbytehq/airbyte/pull/5834) | Fixed array fields without items property in schema |
diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh
index add334226dcd..9aff6efcf648 100755
--- a/tools/bin/ci_credentials.sh
+++ b/tools/bin/ci_credentials.sh
@@ -84,6 +84,7 @@ write_standard_creds source-greenhouse "$GREENHOUSE_TEST_CREDS"
write_standard_creds source-greenhouse "$GREENHOUSE_TEST_CREDS_LIMITED" "config_users_only.json"
write_standard_creds source-harvest "$HARVEST_INTEGRATION_TESTS_CREDS"
write_standard_creds source-hubspot "$HUBSPOT_INTEGRATION_TESTS_CREDS"
+write_standard_creds source-hubspot "$HUBSPOT_INTEGRATION_TESTS_CREDS" "config_oauth.json"
write_standard_creds source-instagram "$INSTAGRAM_INTEGRATION_TESTS_CREDS"
write_standard_creds source-intercom "$INTERCOM_INTEGRATION_TEST_CREDS"
write_standard_creds source-iterable "$ITERABLE_INTEGRATION_TEST_CREDS"