diff --git a/records_mover/creds/base_creds.py b/records_mover/creds/base_creds.py
index 72d54a102..0a209c7e6 100644
--- a/records_mover/creds/base_creds.py
+++ b/records_mover/creds/base_creds.py
@@ -33,7 +33,7 @@
 # situation, let the session pick which backend we're using for
 # creds based on out of band information, like how it was constructed
 # or environment variables).
-class BaseCreds():
+class BaseCreds:
     def __init__(self,
                  default_db_creds_name: Optional[str] = None,
                  default_aws_creds_name: Optional[str] = None,
@@ -48,15 +48,15 @@ def __init__(self,
                  default_gcs_client: Union[PleaseInfer,
                                            'google.cloud.storage.Client',
                                            None] = PleaseInfer.token,
+                 default_airbyte_creds: Union[PleaseInfer,
+                                              Dict[str, Any],
+                                              None] = PleaseInfer.token,
                  scratch_s3_url: Union[PleaseInfer,
                                        str,
                                        None] = PleaseInfer.token,
                  scratch_gcs_url: Union[PleaseInfer,
                                         str,
-                                        None] = PleaseInfer.token,
-                 default_airbyte_creds: Union[PleaseInfer,
-                                              Dict[str, Any],
-                                              None] = PleaseInfer.token) -> None:
+                                        None] = PleaseInfer.token) -> None:
         self._default_db_creds_name = default_db_creds_name
         self._default_aws_creds_name = default_aws_creds_name
         self._default_gcp_creds_name = default_gcp_creds_name
@@ -65,12 +65,11 @@ def __init__(self,
         self.__default_gcs_creds = default_gcp_creds
         self.__default_gcs_client = default_gcs_client
         self.__default_boto3_session = default_boto3_session
+        self._default_airbyte_creds = default_airbyte_creds
         self._scratch_s3_url = scratch_s3_url
         self._scratch_gcs_url = scratch_gcs_url
-        self._default_airbyte_creds = default_airbyte_creds
-
 
     def google_sheets(self, gcp_creds_name: str) -> 'google.auth.credentials.Credentials':
         scopes = _GSHEETS_SCOPES
         return self._gcp_creds(gcp_creds_name, scopes)
@@ -193,6 +192,14 @@ def default_db_facts(self) -> DBFacts:
             self.__default_db_facts = self.db_facts(self._default_db_creds_name)
         return self.__default_db_facts
 
+    def _infer_airbyte_creds(self) -> Dict[str, Any]:
+        raise NotImplementedError
+
+    def airbyte(self) -> Optional[Dict[str, Any]]:
+        if self._default_airbyte_creds is PleaseInfer.token:
+            self._default_airbyte_creds = self._infer_airbyte_creds()
+        return self._default_airbyte_creds
+
     def _append_aws_username_to_bucket(self,
                                        prefix: str,
                                        boto3_session: 'boto3.session.Session') -> Optional[str]:
@@ -270,11 +277,3 @@ def default_scratch_gcs_url(self) -> Optional[str]:
         if self._scratch_gcs_url is PleaseInfer.token:
             self._scratch_gcs_url = self._infer_scratch_gcs_url()
         return self._scratch_gcs_url
-
-    def _infer_airbyte_creds(self) -> Dict[str, Any]:
-        raise NotImplementedError
-
-    def airbyte(self) -> Optional[Dict[str, Any]]:
-        if self._default_airbyte_creds is PleaseInfer.token:
-            self._default_airbyte_creds = self._infer_airbyte_creds()
-        return self._default_airbyte_creds
diff --git a/records_mover/records/cli.py b/records_mover/records/cli.py
index 59fa011b0..b52abf0e7 100644
--- a/records_mover/records/cli.py
+++ b/records_mover/records/cli.py
@@ -90,6 +90,8 @@ def build_parser() -> argparse.ArgumentParser:
     if airbyte_feature_flag is not None:
         parser.add_argument('-hc', '--healthcheck', action='store_true', required=False,
                             help='Returns health of the configured airbyte instance')
+        parser.add_argument('-a', '--airbyte', action='store_true', required=False,
+                            help='Enables usage of the airbyte engine to run data movement')
 
     subparsers = parser.add_subparsers(help='subcommand_help')
     from records_mover import Session
diff --git a/records_mover/records/job/config.py b/records_mover/records/job/config.py
index 4e7c5eecc..0b7252b02 100644
--- a/records_mover/records/job/config.py
+++ b/records_mover/records/job/config.py
@@ -139,6 +139,12 @@ def to_args(self) -> Dict[str, Any]:
         if 'db_name' in kwargs:
             # Already used to feed different args above
            del kwargs['db_name']
+        # These flags come in from the CLI but are meaningless in this context, so drop them
+        if 'airbyte' in kwargs:
+            del kwargs['airbyte']
+        if 'healthcheck' in kwargs:
+            del kwargs['healthcheck']
+
         # for unit test coverage stability, this needs to be a list
         # comprehension (which maintains the order of the kwargs keys)
         # instead of subtracting possible_args from kwargs.keys(),
diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py
index b58d2da19..deab2dd19 100644
--- a/records_mover/records/job/mover.py
+++ b/records_mover/records/job/mover.py
@@ -34,7 +34,19 @@ def run_records_mover_job(source_method_name: str,
         records = session.records
         source = source_method(**source_kwargs)
         target = target_method(**target_kwargs)
-        return records.move(source, target, processing_instructions)
+        # If the airbyte engine can't handle this move, fall back to the legacy engine
+        try:
+            if 'airbyte' in config and config['airbyte']:
+                return records.move_via_airbyte(source, target, processing_instructions)
+            else:
+                run_legacy = True
+        except NotImplementedError as e:
+            logger.warning(f"""WARNING: This type of move is not yet supported by the airbyte engine.
+            Falling back to legacy engine. Message: {e}""")
+            run_legacy = True
+        if run_legacy:
+            return records.move(source, target, processing_instructions)
+        return MoveResult(move_count=0, output_urls=None)
     except Exception:
         logger.error('', exc_info=True)
         raise
diff --git a/records_mover/records/mover.py b/records_mover/records/mover.py
index 8829da9bc..c8e17d991 100644
--- a/records_mover/records/mover.py
+++ b/records_mover/records/mover.py
@@ -15,6 +15,16 @@
 logger = logging.getLogger(__name__)
 
 
+def move_via_airbyte(records_source: RecordsSource, records_target: RecordsTarget,
+                     processing_instructions: ProcessingInstructions = ProcessingInstructions()) \
+        -> MoveResult:
+    if (isinstance(records_source, sources_base.SupportsAirbyte) and
+            isinstance(records_target, targets_base.SupportsAirbyte)):
+        raise NotImplementedError("Not yet! But for a different reason")
+    else:
+        raise NotImplementedError("Source and target must support airbyte")
+
+
 def move(records_source: RecordsSource,
          records_target: RecordsTarget,
          processing_instructions: ProcessingInstructions = ProcessingInstructions()) -> MoveResult:
@@ -60,18 +70,19 @@ def move(records_source: RecordsSource,
     # See documentation for RecordsSource in sources.py and
     # RecordsTarget in targets.py for the semantics of the methods
     # being called.
-    if (isinstance(records_source, sources_base.SupportsRecordsDirectory) and
-        isinstance(records_target, SupportsMoveFromRecordsDirectory) and
-        records_target.
-        can_move_directly_from_scheme(records_source.records_directory().loc.scheme) and
-        records_target.can_move_from_format(records_source.records_format)):
+    if (isinstance(records_source, sources_base.SupportsRecordsDirectory)
+            and isinstance(records_target, SupportsMoveFromRecordsDirectory)
+            and records_target.can_move_directly_from_scheme(
+                records_source.records_directory().loc.scheme
+            )
+            and records_target.can_move_from_format(records_source.records_format)):
         # Tell the destination to load directly from wherever the
         # source is, without needing to make any copies of the data or
         # streaming it through the current box.
         directory = records_source.records_directory()
         logger.info(f"Mover: copying from {records_source} to {records_target} "
                     f"by moving from records directory in {directory}...")
-        return records_target.\
+        return records_target. \
             move_from_records_directory(directory=directory,
                                         override_records_format=records_source.records_format,
                                         processing_instructions=processing_instructions)
@@ -98,7 +109,7 @@ def move(records_source: RecordsSource,
             records_format = records_source.compatible_format(records_target)
             assert records_format is not None  # we checked compatibility above
             records_target.pre_load_hook()
-            out = records_source.\
+            out = records_source. \
                 move_to_records_directory(records_directory=directory,
                                           records_format=records_format,
                                           processing_instructions=pi)
@@ -110,7 +121,7 @@ def move(records_source: RecordsSource,
         directory = records_source.records_directory()
         logger.info(f"Mover: copying from {records_source} to {records_target} "
                     "by loading from {directory.scheme} directory...")
-        return records_target.\
+        return records_target. \
             move_from_records_directory(directory=directory,
                                         override_records_format=records_source.records_format,
                                         processing_instructions=processing_instructions)
@@ -118,20 +129,21 @@ def move(records_source: RecordsSource,
         # Incompatible types in assignment (expression has type "Optional[Any]",
         # variable has type "BaseRecordsFormat")
         target_records_format: BaseRecordsFormat \
-            = getattr(records_target, "records_format", None) # type: ignore
+            = getattr(records_target, "records_format", None)  # type: ignore
         logger.info(f"Mover: copying from {records_source} to {records_target} "
                     f"by first writing {records_source} to {target_records_format} "
                     "records format (if easy to rewrite)...")
-        with records_source.\
+        with records_source. \
                 to_fileobjs_source(processing_instructions=processing_instructions,
                                    records_format_if_possible=target_records_format)\
                 as fileobjs_source:
             return move(fileobjs_source, records_target, processing_instructions)
     elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
           isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
          records_source.has_compatible_format(records_target) and
-          records_source.
-          can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and
+          records_source.can_move_to_scheme(
+              records_target.temporary_loadable_directory_scheme()
+          ) and
          records_target.can_move_from_temp_loc_after_filling_it()):
         logger.info(f"Mover: copying from {records_source} to {records_target} "
                     f"by filling in a temporary location...")
@@ -142,7 +154,7 @@ def move(records_source: RecordsSource,
         logger.info(f"Mover: copying from {records_source} to {records_target} "
                     f"by converting to dataframe...")
         with records_source.to_dataframes_source(processing_instructions) as dataframes_source:
-            return records_target.\
+            return records_target. \
                 move_from_dataframes_source(dfs_source=dataframes_source,
                                             processing_instructions=processing_instructions)
     elif (isinstance(records_source, SupportsToDataframesSource)):
diff --git a/records_mover/records/records.py b/records_mover/records/records.py
index 190d03a25..5b420afe5 100644
--- a/records_mover/records/records.py
+++ b/records_mover/records/records.py
@@ -2,7 +2,7 @@
 from ..url.resolver import UrlResolver
 from .sources import RecordsSources
 from .targets import RecordsTargets
-from .mover import move
+from .mover import move, move_via_airbyte
 from enum import Enum
 from typing import Callable, Union, Optional, TYPE_CHECKING
 if TYPE_CHECKING:
@@ -59,6 +59,9 @@ class Records:
     move: Callable
     "Alias of :meth:`records_mover.records.move`"
 
+    move_via_airbyte: Callable
+    "Alias of :meth:`records_mover.records.move_via_airbyte`"
+
     def __init__(self,
                  db_driver: Union[Callable[[Union['Engine', 'Connection', None],
                                             Optional['Connection'],
@@ -76,6 +79,7 @@ def __init__(self,
         if url_resolver is PleaseInfer.token:
             url_resolver = session.url_resolver
         self.move = move
+        self.move_via_airbyte = move_via_airbyte
         self.sources = RecordsSources(db_driver=db_driver,
                                       url_resolver=url_resolver)
         self.targets = RecordsTargets(url_resolver=url_resolver,
diff --git a/records_mover/records/sources/base.py b/records_mover/records/sources/base.py
index 6425617e5..8101f9872 100644
--- a/records_mover/records/sources/base.py
+++ b/records_mover/records/sources/base.py
@@ -146,3 +146,9 @@ def to_dataframes_source(self,
             -> Iterator['DataframesRecordsSource']:
         """Convert current source to a DataframeSource and present it in a context manager"""
         pass
+
+
+class SupportsAirbyte(RecordsSource, metaclass=ABCMeta):
+    @abstractmethod
+    def not_sure_yet(self):
+        yield None
diff --git a/records_mover/records/sources/google_sheets.py b/records_mover/records/sources/google_sheets.py
index 498ed5866..715ce2d31 100644
--- a/records_mover/records/sources/google_sheets.py
+++ b/records_mover/records/sources/google_sheets.py
@@ -1,4 +1,4 @@
-from .base import SupportsToDataframesSource
+from .base import SupportsToDataframesSource, SupportsAirbyte
 from contextlib import contextmanager
 import itertools
 import google.auth.credentials
@@ -13,7 +13,8 @@
 SheetsService = Any
 
 
-class GoogleSheetsRecordsSource(SupportsToDataframesSource):
+class GoogleSheetsRecordsSource(SupportsToDataframesSource, SupportsAirbyte):
+
     def __init__(self,
                  spreadsheet_id: str,
                  sheet_name_or_range: str,
@@ -104,3 +105,6 @@ def to_dataframes_source(self,
             )
             yield DataframesRecordsSource(dfs=[df],
                                           processing_instructions=processing_instructions)
+
+    def not_sure_yet(self):
+        pass
diff --git a/records_mover/records/targets/base.py b/records_mover/records/targets/base.py
index 200af67f7..6d113fb5d 100644
--- a/records_mover/records/targets/base.py
+++ b/records_mover/records/targets/base.py
@@ -164,3 +164,9 @@ def move_from_dataframes_source(self,
         ProcessingInstructions provided.
         """
         pass
+
+
+class SupportsAirbyte(RecordsTarget, metaclass=ABCMeta):
+    @abstractmethod
+    def not_sure_yet(self) -> bool:
+        pass
diff --git a/records_mover/records/targets/google_sheets.py b/records_mover/records/targets/google_sheets.py
index 2bc9acfd3..301b5e59e 100644
--- a/records_mover/records/targets/google_sheets.py
+++ b/records_mover/records/targets/google_sheets.py
@@ -1,5 +1,5 @@
 from ...utils.retry import google_sheets_retry
-from .base import SupportsMoveFromDataframes
+from .base import SupportsMoveFromDataframes, SupportsAirbyte
 import math
 import google.auth.credentials
 import numpy as np
@@ -19,7 +19,7 @@
 CellData = Union[int, str, float]
 
 
-class GoogleSheetsRecordsTarget(SupportsMoveFromDataframes):
+class GoogleSheetsRecordsTarget(SupportsMoveFromDataframes, SupportsAirbyte):
     def __init__(self,
                  spreadsheet_id: str,
                  sheet_name: str,
@@ -130,3 +130,6 @@ def move_from_dataframes_source(self,
         self._naive_load_to_sheet(service, self.spreadsheet_id, self.sheet_name,
                                   json_encodable_data)
         return MoveResult(move_count=len(data), output_urls=None)
+
+    def not_sure_yet(self) -> bool:
+        return True
diff --git a/records_mover/session.py b/records_mover/session.py
index f54b8a303..8e46cde55 100644
--- a/records_mover/session.py
+++ b/records_mover/session.py
@@ -130,7 +130,7 @@ def _infer_creds(session_type: str,
                          f"{session_type}.")
 
 
-class Session():
+class Session:
     def __init__(self,
                  default_db_creds_name: Optional[str] = None,
                  default_aws_creds_name: Union[None, str, PleaseInfer] = PleaseInfer.token,
diff --git a/records_mover/utils/structures.py b/records_mover/utils/structures.py
index 4b242798d..e0a9454fa 100644
--- a/records_mover/utils/structures.py
+++ b/records_mover/utils/structures.py
@@ -22,7 +22,12 @@ def insert_into_dict(msg: Dict[str, Union[V, Dict[str, Any]]],
         else:
             if keys[0] not in msg:
                 msg[keys[0]] = {}
-            assert isinstance(msg[keys[0]], dict)
+            if not isinstance(msg[keys[0]], dict):
+                import logging
+                logger = logging.getLogger(__name__)
+                message = f"Type error; expected dict at key {keys[0]!r}"
+                logger.error(message)
+                raise TypeError(message)
             insert_into_dict(msg[keys[0]], keys[1:], value)  # type: ignore
     msg: Dict[str, Union[V, Dict[str, Any]]] = {}
     for key, val in d.items():