Skip to content

Commit

Permalink
Merge branch 'main' into add-return-immediately-argument-to-pubsubpul…
Browse files Browse the repository at this point in the history
…lsensor-class
  • Loading branch information
arnaubadia authored Aug 28, 2024
2 parents 6bd1a2f + c2d5ef4 commit e4d83a2
Show file tree
Hide file tree
Showing 15 changed files with 356 additions and 387 deletions.
15 changes: 15 additions & 0 deletions airflow/providers/airbyte/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@
Changelog
---------

4.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

.. note::
This version introduce a new way to handle the connection to Airbyte using ``client_id`` and ``client_secret`` instead of ``login`` and ``password``.
You can get them accessing the Airbyte UI and creating a new Application in the Settings page.

There is a large refactor to create a connection.
You must specify the Full Qualified Domain Name in the ``host`` parameter, eg: ``https://my.company:8000/airbyte/v1/``.
The ``token_url`` parameter is optional and it is used to create the access token, the default value is ``v1/applications/token`` used by Airbyte Cloud.
You must remove the ``api_type`` parameter from your DAG it isn't required anymore.

3.9.0
.....

Expand Down
241 changes: 87 additions & 154 deletions airflow/providers/airbyte/hooks/airbyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,126 +17,105 @@
# under the License.
from __future__ import annotations

import base64
import json
import time
from typing import TYPE_CHECKING, Any, Literal, TypeVar
from typing import Any, TypeVar

import aiohttp
from aiohttp import ClientResponseError
from asgiref.sync import sync_to_async
from airbyte_api import AirbyteAPI
from airbyte_api.api import CancelJobRequest, GetJobRequest
from airbyte_api.models import JobCreateRequest, JobStatusEnum, JobTypeEnum, SchemeClientCredentials, Security

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook

if TYPE_CHECKING:
from airflow.models import Connection
from airflow.hooks.base import BaseHook

T = TypeVar("T", bound=Any)


class AirbyteHook(HttpHook):
class AirbyteHook(BaseHook):
"""
Hook for Airbyte API.
:param airbyte_conn_id: Optional. The name of the Airflow connection to get
connection information for Airbyte. Defaults to "airbyte_default".
:param api_version: Optional. Airbyte API version. Defaults to "v1".
:param api_type: Optional. The type of Airbyte API to use. Either "config" or "cloud". Defaults to "config".
"""

conn_name_attr = "airbyte_conn_id"
default_conn_name = "airbyte_default"
conn_type = "airbyte"
hook_name = "Airbyte"

RUNNING = "running"
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
PENDING = "pending"
FAILED = "failed"
ERROR = "error"
INCOMPLETE = "incomplete"

def __init__(
self,
airbyte_conn_id: str = "airbyte_default",
api_version: str = "v1",
api_type: Literal["config", "cloud"] = "config",
) -> None:
super().__init__(http_conn_id=airbyte_conn_id)
super().__init__()
self.api_version: str = api_version
self.api_type: str = api_type

async def get_headers_tenants_from_connection(self) -> tuple[dict[str, Any], str]:
"""Get Headers, tenants from the connection details."""
connection: Connection = await sync_to_async(self.get_connection)(self.http_conn_id)
# schema defaults to HTTP
schema = connection.schema if connection.schema else "http"
base_url = f"{schema}://{connection.host}"

if connection.port:
base_url += f":{connection.port}"

if self.api_type == "config":
credentials = f"{connection.login}:{connection.password}"
credentials_base64 = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")
authorized_headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Basic {credentials_base64}",
}
else:
authorized_headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {connection.password}",
}

return authorized_headers, base_url

async def get_job_details(self, job_id: int) -> Any:
self.airbyte_conn_id = airbyte_conn_id
self.conn = self.get_conn_params(self.airbyte_conn_id)
self.airbyte_api = self.create_api_session()

def get_conn_params(self, conn_id: str) -> Any:
conn = self.get_connection(conn_id)

conn_params: dict = {}
conn_params["host"] = conn.host
conn_params["client_id"] = conn.login
conn_params["client_secret"] = conn.password
conn_params["token_url"] = conn.schema or "v1/applications/token"

return conn_params

def create_api_session(self) -> AirbyteAPI:
"""Create Airbyte API session."""
credentials = SchemeClientCredentials(
client_id=self.conn["client_id"],
client_secret=self.conn["client_secret"],
TOKEN_URL=self.conn["token_url"],
)

return AirbyteAPI(
server_url=self.conn["host"],
security=Security(client_credentials=credentials),
)

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Return custom field behaviour."""
return {
"hidden_fields": [
"extra",
"port",
],
"relabeling": {"login": "Client ID", "password": "Client Secret", "schema": "Token URL"},
"placeholders": {},
}

def get_job_details(self, job_id: int) -> Any:
"""
Use Http async call to retrieve metadata for a specific job of an Airbyte Sync.
:param job_id: The ID of an Airbyte Sync Job.
"""
headers, base_url = await self.get_headers_tenants_from_connection()
if self.api_type == "config":
url = f"{base_url}/api/{self.api_version}/jobs/get"
self.log.info("URL for api request: %s", url)
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(url=url, data=json.dumps({"id": job_id})) as response:
try:
response.raise_for_status()
return await response.json()
except ClientResponseError as e:
msg = f"{e.status}: {e.message} - {e.request_info}"
raise AirflowException(msg)
else:
url = f"{base_url}/{self.api_version}/jobs/{job_id}"
self.log.info("URL for api request: %s", url)
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(url=url) as response:
try:
response.raise_for_status()
return await response.json()
except ClientResponseError as e:
msg = f"{e.status}: {e.message} - {e.request_info}"
raise AirflowException(msg)
try:
get_job_res = self.airbyte_api.jobs.get_job(
request=GetJobRequest(
job_id=job_id,
)
)
return get_job_res.job_response
except Exception as e:
raise AirflowException(e)

async def get_job_status(self, job_id: int) -> str:
def get_job_status(self, job_id: int) -> str:
"""
Retrieve the status for a specific job of an Airbyte Sync.
:param job_id: The ID of an Airbyte Sync Job.
"""
self.log.info("Getting the status of job run %s.", job_id)
response = await self.get_job_details(job_id=job_id)
if self.api_type == "config":
return str(response["job"]["status"])
else:
return str(response["status"])
response = self.get_job_details(job_id=job_id)
return response.status

def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: float | None = 3600) -> None:
"""
Expand All @@ -155,105 +134,59 @@ def wait_for_job(self, job_id: str | int, wait_seconds: float = 3, timeout: floa
raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
time.sleep(wait_seconds)
try:
job = self.get_job(job_id=(int(job_id)))
if self.api_type == "config":
state = job.json()["job"]["status"]
else:
state = job.json()["status"]
job = self.get_job_details(job_id=(int(job_id)))
state = job.status

except AirflowException as err:
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
continue

if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
continue
if state == self.SUCCEEDED:
if state == JobStatusEnum.SUCCEEDED:
break
if state == self.ERROR:
if state == JobStatusEnum.FAILED:
raise AirflowException(f"Job failed:\n{job}")
elif state == self.CANCELLED:
elif state == JobStatusEnum.CANCELLED:
raise AirflowException(f"Job was cancelled:\n{job}")
else:
raise AirflowException(f"Encountered unexpected state `{state}` for job_id `{job_id}`")

def submit_sync_connection(self, connection_id: str) -> Any:
"""
Submit a job to a Airbyte server.
:param connection_id: Required. The ConnectionId of the Airbyte Connection.
"""
if self.api_type == "config":
return self.run(
endpoint=f"api/{self.api_version}/connections/sync",
json={"connectionId": connection_id},
headers={"accept": "application/json"},
)
else:
conn = self.get_connection(self.http_conn_id)
self.method = "POST"
return self.run(
endpoint=f"{self.api_version}/jobs",
headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
json={
"jobType": "sync",
"connectionId": connection_id,
}, # TODO: add an option to pass jobType = reset
)

def get_job(self, job_id: int) -> Any:
"""
Get the resource representation for a job in Airbyte.
:param job_id: Required. Id of the Airbyte job
"""
if self.api_type == "config":
return self.run(
endpoint=f"api/{self.api_version}/jobs/get",
json={"id": job_id},
headers={"accept": "application/json"},
)
else:
self.method = "GET"
conn = self.get_connection(self.http_conn_id)
return self.run(
endpoint=f"{self.api_version}/jobs/{job_id}",
headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
try:
res = self.airbyte_api.jobs.create_job(
request=JobCreateRequest(
connection_id=connection_id,
job_type=JobTypeEnum.SYNC,
)
)
return res.job_response
except Exception as e:
raise AirflowException(e)

def cancel_job(self, job_id: int) -> Any:
"""
Cancel the job when task is cancelled.
:param job_id: Required. Id of the Airbyte job
"""
if self.api_type == "config":
return self.run(
endpoint=f"api/{self.api_version}/jobs/cancel",
json={"id": job_id},
headers={"accept": "application/json"},
)
else:
self.method = "DELETE"
conn = self.get_connection(self.http_conn_id)
return self.run(
endpoint=f"{self.api_version}/jobs/{job_id}",
headers={"accept": "application/json", "authorization": f"Bearer {conn.password}"},
try:
cancel_job_res = self.airbyte_api.jobs.cancel_job(
request=CancelJobRequest(
job_id=job_id,
)
)
return cancel_job_res.job_response
except Exception as e:
raise AirflowException(e)

def test_connection(self):
"""Tests the Airbyte connection by hitting the health API."""
self.method = "GET"
try:
res = self.run(
endpoint=f"api/{self.api_version}/health",
headers={"accept": "application/json"},
extra_options={"check_response": False},
)

if res.status_code == 200:
health_check = self.airbyte_api.health.get_health_check()
if health_check.status_code == 200:
return True, "Connection successfully tested"
else:
return False, res.text
return False, str(health_check.raw_response)
except Exception as e:
return False, str(e)
finally:
self.method = "POST"
Loading

0 comments on commit e4d83a2

Please sign in to comment.