Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Adds config_change_callback to Destinations and Sources #440

Merged
merged 11 commits into from
Nov 8, 2024
15 changes: 13 additions & 2 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

if TYPE_CHECKING:
import logging
from collections.abc import Generator
from collections.abc import Callable, Generator
from typing import IO

from airbyte._executors.base import Executor
Expand All @@ -56,13 +56,15 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any], int], None] | None = None,
*,
validate: bool = False,
) -> None:
"""Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
"""
self.config_change_callback = config_change_callback
self.executor = executor
self._name = name
self._config_dict: dict[str, Any] | None = None
Expand Down Expand Up @@ -361,7 +363,8 @@ def _peek_airbyte_message(

This method handles reading Airbyte messages and taking action, if needed, based on the
message type. For instance, log messages are logged, records are tallied, and errors are
raised as exceptions if `raise_on_error` is True.
raised as exceptions if `raise_on_error` is True. If a config change message is received,
the config change callback is called.

Raises:
AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
Expand All @@ -380,6 +383,14 @@ def _peek_airbyte_message(
)
return

if (
message.type == "CONTROL"
and message.control.type == "CONNECTOR_CONFIG"
and self.config_change_callback is not None
):
self.config_change_callback(message.control.config, message.control.emitted_at)
tcboles marked this conversation as resolved.
Show resolved Hide resolved
return

def _execute(
self,
args: list[str],
Expand Down
4 changes: 4 additions & 0 deletions airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@


if TYPE_CHECKING:
from collections.abc import Callable

from airbyte._executors.base import Executor
from airbyte.caches.base import CacheBase
from airbyte.shared.state_writers import StateWriterBase
Expand All @@ -48,6 +50,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any], int], None] | None = None,
*,
validate: bool = False,
) -> None:
Expand All @@ -59,6 +62,7 @@ def __init__(
executor=executor,
name=name,
config=config,
config_change_callback=config_change_callback,
validate=validate,
)

Expand Down
3 changes: 3 additions & 0 deletions airbyte/destinations/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@


if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path


def get_destination(
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any], int], None] | None = None,
tcboles marked this conversation as resolved.
Show resolved Hide resolved
*,
version: str | None = None,
pip_url: str | None = None,
Expand Down Expand Up @@ -58,6 +60,7 @@ def get_destination(
return Destination(
name=name,
config=config,
config_change_callback=config_change_callback,
executor=get_connector_executor(
name=name,
version=version,
Expand Down
4 changes: 3 additions & 1 deletion airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@


if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
from collections.abc import Callable, Generator, Iterable, Iterator

from airbyte_cdk import ConnectorSpecification
from airbyte_protocol.models import AirbyteStream
Expand All @@ -58,6 +58,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any], int], None] | None = None,
tcboles marked this conversation as resolved.
Show resolved Hide resolved
streams: str | list[str] | None = None,
*,
validate: bool = False,
Expand All @@ -73,6 +74,7 @@ def __init__(
executor=executor,
name=name,
config=config,
config_change_callback=config_change_callback,
validate=validate,
)
self._config_dict: dict[str, Any] | None = None
Expand Down
3 changes: 3 additions & 0 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@


if TYPE_CHECKING:
from collections.abc import Callable
from pathlib import Path


Expand Down Expand Up @@ -45,6 +46,7 @@ def get_connector(
def get_source( # noqa: PLR0913 # Too many arguments
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any], int], None] | None = None,
tcboles marked this conversation as resolved.
Show resolved Hide resolved
*,
streams: str | list[str] | None = None,
version: str | None = None,
Expand Down Expand Up @@ -103,6 +105,7 @@ def get_source( # noqa: PLR0913 # Too many arguments
return Source(
name=name,
config=config,
config_change_callback=config_change_callback,
streams=streams,
executor=get_connector_executor(
name=name,
Expand Down
Loading