From 115661e5a0736891027b224f5f08cb0d3513d406 Mon Sep 17 00:00:00 2001 From: naswierczek Date: Thu, 14 Mar 2024 17:08:00 -0400 Subject: [PATCH 1/5] First step of creating an integrated airbyte engine, targeting gsheets - Establishes that sources and targets can support airbyte independently. - This currently adds a new method at the top level. THEORETICALLY we could DEFAULT to the SupportsAirbyte but I think until we can remove the feature flag, we should keep the method signature as clean as possible --- records_mover/creds/base_creds.py | 29 ++++++++-------- records_mover/records/cli.py | 2 ++ records_mover/records/job/config.py | 6 ++++ records_mover/records/job/mover.py | 15 +++++++- records_mover/records/mover.py | 34 ++++++++++++------- records_mover/records/records.py | 6 +++- records_mover/records/sources/base.py | 7 ++++ .../records/sources/google_sheets.py | 8 +++-- records_mover/records/targets/base.py | 6 ++++ .../records/targets/google_sheets.py | 7 ++-- records_mover/session.py | 2 +- records_mover/utils/structures.py | 7 +++- 12 files changed, 94 insertions(+), 35 deletions(-) diff --git a/records_mover/creds/base_creds.py b/records_mover/creds/base_creds.py index 72d54a102..0a209c7e6 100644 --- a/records_mover/creds/base_creds.py +++ b/records_mover/creds/base_creds.py @@ -33,7 +33,7 @@ # situation, let the session pick which backend we're using for # creds based on out of band information, like how it was constructed # or environment variables). 
-class BaseCreds(): +class BaseCreds: def __init__(self, default_db_creds_name: Optional[str] = None, default_aws_creds_name: Optional[str] = None, @@ -48,15 +48,15 @@ def __init__(self, default_gcs_client: Union[PleaseInfer, 'google.cloud.storage.Client', None] = PleaseInfer.token, + default_airbyte_creds: Union[PleaseInfer, + Dict[str, Any], + None] = PleaseInfer.token, scratch_s3_url: Union[PleaseInfer, str, None] = PleaseInfer.token, scratch_gcs_url: Union[PleaseInfer, str, - None] = PleaseInfer.token, - default_airbyte_creds: Union[PleaseInfer, - Dict[str, Any], - None] = PleaseInfer.token) -> None: + None] = PleaseInfer.token) -> None: self._default_db_creds_name = default_db_creds_name self._default_aws_creds_name = default_aws_creds_name self._default_gcp_creds_name = default_gcp_creds_name @@ -65,12 +65,11 @@ def __init__(self, self.__default_gcs_creds = default_gcp_creds self.__default_gcs_client = default_gcs_client self.__default_boto3_session = default_boto3_session + self._default_airbyte_creds = default_airbyte_creds self._scratch_s3_url = scratch_s3_url self._scratch_gcs_url = scratch_gcs_url - self._default_airbyte_creds = default_airbyte_creds - def google_sheets(self, gcp_creds_name: str) -> 'google.auth.credentials.Credentials': scopes = _GSHEETS_SCOPES return self._gcp_creds(gcp_creds_name, scopes) @@ -193,6 +192,14 @@ def default_db_facts(self) -> DBFacts: self.__default_db_facts = self.db_facts(self._default_db_creds_name) return self.__default_db_facts + def _infer_airbyte_creds(self) -> Dict[str, Any]: + raise NotImplementedError + + def airbyte(self) -> Optional[Dict[str, Any]]: + if self._default_airbyte_creds is PleaseInfer.token: + self._default_airbyte_creds = self._infer_airbyte_creds() + return self._default_airbyte_creds + def _append_aws_username_to_bucket(self, prefix: str, boto3_session: 'boto3.session.Session') -> Optional[str]: @@ -270,11 +277,3 @@ def default_scratch_gcs_url(self) -> Optional[str]: if self._scratch_gcs_url is 
PleaseInfer.token: self._scratch_gcs_url = self._infer_scratch_gcs_url() return self._scratch_gcs_url - - def _infer_airbyte_creds(self) -> Dict[str, Any]: - raise NotImplementedError - - def airbyte(self) -> Optional[Dict[str, Any]]: - if self._default_airbyte_creds is PleaseInfer.token: - self._default_airbyte_creds = self._infer_airbyte_creds() - return self._default_airbyte_creds diff --git a/records_mover/records/cli.py b/records_mover/records/cli.py index 59fa011b0..b52abf0e7 100644 --- a/records_mover/records/cli.py +++ b/records_mover/records/cli.py @@ -90,6 +90,8 @@ def build_parser() -> argparse.ArgumentParser: if airbyte_feature_flag is not None: parser.add_argument('-hc', '--healthcheck', action='store_true', required=False, help='Returns health of the configured airbyte instance') + parser.add_argument('-a', '--airbyte', action='store_true', required=False, + help='Enables usage of the airbyte engine to run data movement') subparsers = parser.add_subparsers(help='subcommand_help') from records_mover import Session diff --git a/records_mover/records/job/config.py b/records_mover/records/job/config.py index 4e7c5eecc..0b7252b02 100644 --- a/records_mover/records/job/config.py +++ b/records_mover/records/job/config.py @@ -139,6 +139,12 @@ def to_args(self) -> Dict[str, Any]: if 'db_name' in kwargs: # Already used to feed different args above del kwargs['db_name'] + # We'll have these and they are meaningless in this context so buhleted! 
+ if 'airbyte' in kwargs: + del kwargs['airbyte'] + if 'healthcheck' in kwargs: + del kwargs['healthcheck'] + # for unit test coverage stability, this needs to be a list # comprehension (which maintains the order of the kwargs keys) # instead of subtracting possible_args from kwargs.keys(), diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py index b58d2da19..6e45a7ca8 100644 --- a/records_mover/records/job/mover.py +++ b/records_mover/records/job/mover.py @@ -1,4 +1,6 @@ """Create and run jobs to convert between different sources and targets""" +import sys + from records_mover import Session from ..processing_instructions import ProcessingInstructions from ..results import MoveResult @@ -34,7 +36,18 @@ def run_records_mover_job(source_method_name: str, records = session.records source = source_method(**source_kwargs) target = target_method(**target_kwargs) - return records.move(source, target, processing_instructions) + # To be fancy, if airbyte fails, we fall back to the old method + try: + if 'airbyte' in config and config['airbyte']: + return records.move_via_airbyte(source, target, processing_instructions) + else: + run_legacy = True + except NotImplementedError as e: + logger.warning(f"""WARNING: This type of move not yet supported in air byte engine. + Falling back to legacy engine. 
Message: {e}""") + run_legacy = True + if run_legacy: + return records.move(source, target, processing_instructions) except Exception: logger.error('', exc_info=True) raise diff --git a/records_mover/records/mover.py b/records_mover/records/mover.py index 8829da9bc..d88d971da 100644 --- a/records_mover/records/mover.py +++ b/records_mover/records/mover.py @@ -15,6 +15,16 @@ logger = logging.getLogger(__name__) +def move_via_airbyte(records_source: RecordsSource, records_target: RecordsTarget, + processing_instructions: ProcessingInstructions = ProcessingInstructions()) \ + -> MoveResult: + if (isinstance(records_source, sources_base.SupportsAirbyte) and + isinstance(records_target, targets_base.SupportsAirbyte)): + raise NotImplementedError("Not yet! But for a different reason") + else: + raise NotImplementedError("Source and target must support airbyte") + + def move(records_source: RecordsSource, records_target: RecordsTarget, processing_instructions: ProcessingInstructions = ProcessingInstructions()) -> MoveResult: @@ -61,17 +71,17 @@ def move(records_source: RecordsSource, # RecordsTarget in targets.py for the semantics of the methods # being called. if (isinstance(records_source, sources_base.SupportsRecordsDirectory) and - isinstance(records_target, SupportsMoveFromRecordsDirectory) and - records_target. - can_move_directly_from_scheme(records_source.records_directory().loc.scheme) and - records_target.can_move_from_format(records_source.records_format)): + isinstance(records_target, SupportsMoveFromRecordsDirectory) and + records_target. + can_move_directly_from_scheme(records_source.records_directory().loc.scheme) and + records_target.can_move_from_format(records_source.records_format)): # Tell the destination to load directly from wherever the # source is, without needing to make any copies of the data or # streaming it through the current box. 
directory = records_source.records_directory() logger.info(f"Mover: copying from {records_source} to {records_target} " f"by moving from records directory in {directory}...") - return records_target.\ + return records_target. \ move_from_records_directory(directory=directory, override_records_format=records_source.records_format, processing_instructions=processing_instructions) @@ -98,7 +108,7 @@ def move(records_source: RecordsSource, records_format = records_source.compatible_format(records_target) assert records_format is not None # we checked compatibility above records_target.pre_load_hook() - out = records_source.\ + out = records_source. \ move_to_records_directory(records_directory=directory, records_format=records_format, processing_instructions=pi) @@ -110,7 +120,7 @@ def move(records_source: RecordsSource, directory = records_source.records_directory() logger.info(f"Mover: copying from {records_source} to {records_target} " "by loading from {directory.scheme} directory...") - return records_target.\ + return records_target. \ move_from_records_directory(directory=directory, override_records_format=records_source.records_format, processing_instructions=processing_instructions) @@ -118,20 +128,20 @@ def move(records_source: RecordsSource, # Incompatible types in assignment (expression has type "Optional[Any]", # variable has type "BaseRecordsFormat") target_records_format: BaseRecordsFormat \ - = getattr(records_target, "records_format", None) # type: ignore + = getattr(records_target, "records_format", None) # type: ignore logger.info(f"Mover: copying from {records_source} to {records_target} " f"by first writing {records_source} to {target_records_format} " "records format (if easy to rewrite)...") - with records_source.\ + with records_source. 
\ to_fileobjs_source(processing_instructions=processing_instructions, - records_format_if_possible=target_records_format)\ + records_format_if_possible=target_records_format) \ as fileobjs_source: return move(fileobjs_source, records_target, processing_instructions) elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and records_source.has_compatible_format(records_target) and records_source. - can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and + can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and records_target.can_move_from_temp_loc_after_filling_it()): logger.info(f"Mover: copying from {records_source} to {records_target} " f"by filling in a temporary location...") @@ -142,7 +152,7 @@ def move(records_source: RecordsSource, logger.info(f"Mover: copying from {records_source} to {records_target} " f"by converting to dataframe...") with records_source.to_dataframes_source(processing_instructions) as dataframes_source: - return records_target.\ + return records_target. 
\ move_from_dataframes_source(dfs_source=dataframes_source, processing_instructions=processing_instructions) elif (isinstance(records_source, SupportsToDataframesSource)): diff --git a/records_mover/records/records.py b/records_mover/records/records.py index 190d03a25..5b420afe5 100644 --- a/records_mover/records/records.py +++ b/records_mover/records/records.py @@ -2,7 +2,7 @@ from ..url.resolver import UrlResolver from .sources import RecordsSources from .targets import RecordsTargets -from .mover import move +from .mover import move, move_via_airbyte from enum import Enum from typing import Callable, Union, Optional, TYPE_CHECKING if TYPE_CHECKING: @@ -59,6 +59,9 @@ class Records: move: Callable "Alias of :meth:`records_mover.records.move`" + move_via_airbyte: Callable + "Alias of :meth:`records_mover.records.move_via_airbyte`" + def __init__(self, db_driver: Union[Callable[[Union['Engine', 'Connection', None], Optional['Connection'], @@ -76,6 +79,7 @@ def __init__(self, if url_resolver is PleaseInfer.token: url_resolver = session.url_resolver self.move = move + self.move_via_airbyte = move_via_airbyte self.sources = RecordsSources(db_driver=db_driver, url_resolver=url_resolver) self.targets = RecordsTargets(url_resolver=url_resolver, diff --git a/records_mover/records/sources/base.py b/records_mover/records/sources/base.py index 6425617e5..aebd71d6f 100644 --- a/records_mover/records/sources/base.py +++ b/records_mover/records/sources/base.py @@ -146,3 +146,10 @@ def to_dataframes_source(self, -> Iterator['DataframesRecordsSource']: """Convert current source to a DataframeSource and present it in a context manager""" pass + + +class SupportsAirbyte(RecordsSource, metaclass=ABCMeta): + @abstractmethod + @contextmanager + def not_sure_yet(self) -> Iterator['FileobjsSource']: + yield FileobjsSource() \ No newline at end of file diff --git a/records_mover/records/sources/google_sheets.py b/records_mover/records/sources/google_sheets.py index 498ed5866..a257c0cdf 
100644 --- a/records_mover/records/sources/google_sheets.py +++ b/records_mover/records/sources/google_sheets.py @@ -1,4 +1,4 @@ -from .base import SupportsToDataframesSource +from .base import SupportsToDataframesSource, SupportsAirbyte from contextlib import contextmanager import itertools import google.auth.credentials @@ -13,7 +13,8 @@ SheetsService = Any -class GoogleSheetsRecordsSource(SupportsToDataframesSource): +class GoogleSheetsRecordsSource(SupportsToDataframesSource, SupportsAirbyte): + def __init__(self, spreadsheet_id: str, sheet_name_or_range: str, @@ -104,3 +105,6 @@ def to_dataframes_source(self, ) yield DataframesRecordsSource(dfs=[df], processing_instructions=processing_instructions) + + def not_sure_yet(self) -> Iterator['FileobjsSource']: + pass diff --git a/records_mover/records/targets/base.py b/records_mover/records/targets/base.py index 200af67f7..6d113fb5d 100644 --- a/records_mover/records/targets/base.py +++ b/records_mover/records/targets/base.py @@ -164,3 +164,9 @@ def move_from_dataframes_source(self, ProcessingInstructions provided. 
""" pass + + +class SupportsAirbyte(RecordsTarget, metaclass=ABCMeta): + @abstractmethod + def not_sure_yet(self) -> bool: + pass diff --git a/records_mover/records/targets/google_sheets.py b/records_mover/records/targets/google_sheets.py index 2bc9acfd3..301b5e59e 100644 --- a/records_mover/records/targets/google_sheets.py +++ b/records_mover/records/targets/google_sheets.py @@ -1,5 +1,5 @@ from ...utils.retry import google_sheets_retry -from .base import SupportsMoveFromDataframes +from .base import SupportsMoveFromDataframes, SupportsAirbyte import math import google.auth.credentials import numpy as np @@ -19,7 +19,7 @@ CellData = Union[int, str, float] -class GoogleSheetsRecordsTarget(SupportsMoveFromDataframes): +class GoogleSheetsRecordsTarget(SupportsMoveFromDataframes, SupportsAirbyte): def __init__(self, spreadsheet_id: str, sheet_name: str, @@ -130,3 +130,6 @@ def move_from_dataframes_source(self, self._naive_load_to_sheet(service, self.spreadsheet_id, self.sheet_name, json_encodable_data) return MoveResult(move_count=len(data), output_urls=None) + + def not_sure_yet(self) -> bool: + return True diff --git a/records_mover/session.py b/records_mover/session.py index f54b8a303..8e46cde55 100644 --- a/records_mover/session.py +++ b/records_mover/session.py @@ -130,7 +130,7 @@ def _infer_creds(session_type: str, f"{session_type}.") -class Session(): +class Session: def __init__(self, default_db_creds_name: Optional[str] = None, default_aws_creds_name: Union[None, str, PleaseInfer] = PleaseInfer.token, diff --git a/records_mover/utils/structures.py b/records_mover/utils/structures.py index 4b242798d..04c1f8550 100644 --- a/records_mover/utils/structures.py +++ b/records_mover/utils/structures.py @@ -22,7 +22,12 @@ def insert_into_dict(msg: Dict[str, Union[V, Dict[str, Any]]], else: if keys[0] not in msg: msg[keys[0]] = {} - assert isinstance(msg[keys[0]], dict) + if not isinstance(msg[keys[0]], dict): + import logging + logger = logging.getLogger(__name__) + 
message = f"Type error; expected dict, got {msg[keys[0]].type()}" + logger.error(message) + raise TypeError(message) insert_into_dict(msg[keys[0]], keys[1:], value) # type: ignore msg: Dict[str, Union[V, Dict[str, Any]]] = {} for key, val in d.items(): From 159a6251fdb4ed50f9d296d7bf6791c26f8be224 Mon Sep 17 00:00:00 2001 From: naswierczek Date: Fri, 15 Mar 2024 12:32:34 -0400 Subject: [PATCH 2/5] Addressing quality --- records_mover/records/job/mover.py | 4 +--- records_mover/records/mover.py | 16 +++++++++------- records_mover/records/sources/base.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py index 6e45a7ca8..0727c6b63 100644 --- a/records_mover/records/job/mover.py +++ b/records_mover/records/job/mover.py @@ -1,6 +1,4 @@ """Create and run jobs to convert between different sources and targets""" -import sys - from records_mover import Session from ..processing_instructions import ProcessingInstructions from ..results import MoveResult @@ -43,7 +41,7 @@ def run_records_mover_job(source_method_name: str, else: run_legacy = True except NotImplementedError as e: - logger.warning(f"""WARNING: This type of move not yet supported in air byte engine. + logger.warning(f"""WARNING: This type of move not yet supported in air byte engine. Falling back to legacy engine. Message: {e}""") run_legacy = True if run_legacy: diff --git a/records_mover/records/mover.py b/records_mover/records/mover.py index d88d971da..c8e17d991 100644 --- a/records_mover/records/mover.py +++ b/records_mover/records/mover.py @@ -70,11 +70,12 @@ def move(records_source: RecordsSource, # See documentation for RecordsSource in sources.py and # RecordsTarget in targets.py for the semantics of the methods # being called. - if (isinstance(records_source, sources_base.SupportsRecordsDirectory) and - isinstance(records_target, SupportsMoveFromRecordsDirectory) and - records_target. 
- can_move_directly_from_scheme(records_source.records_directory().loc.scheme) and - records_target.can_move_from_format(records_source.records_format)): + if (isinstance(records_source, sources_base.SupportsRecordsDirectory) + and isinstance(records_target, SupportsMoveFromRecordsDirectory) + and records_target.can_move_directly_from_scheme( + records_source.records_directory().loc.scheme + ) + and records_target.can_move_from_format(records_source.records_format)): # Tell the destination to load directly from wherever the # source is, without needing to make any copies of the data or # streaming it through the current box. @@ -140,8 +141,9 @@ def move(records_source: RecordsSource, elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and records_source.has_compatible_format(records_target) and - records_source. - can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and + records_source.can_move_to_scheme( + records_target.temporary_loadable_directory_scheme() + ) and records_target.can_move_from_temp_loc_after_filling_it()): logger.info(f"Mover: copying from {records_source} to {records_target} " f"by filling in a temporary location...") diff --git a/records_mover/records/sources/base.py b/records_mover/records/sources/base.py index aebd71d6f..ba6e49fb5 100644 --- a/records_mover/records/sources/base.py +++ b/records_mover/records/sources/base.py @@ -152,4 +152,4 @@ class SupportsAirbyte(RecordsSource, metaclass=ABCMeta): @abstractmethod @contextmanager def not_sure_yet(self) -> Iterator['FileobjsSource']: - yield FileobjsSource() \ No newline at end of file + yield FileobjsSource() From 6101ec664167cececdc06c9b2aaab36fe2f98a43 Mon Sep 17 00:00:00 2001 From: naswierczek Date: Fri, 15 Mar 2024 12:55:31 -0400 Subject: [PATCH 3/5] Fixing tests --- records_mover/records/job/mover.py | 1 + records_mover/records/sources/base.py | 2 +- 
records_mover/utils/structures.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py index 0727c6b63..df9ee3898 100644 --- a/records_mover/records/job/mover.py +++ b/records_mover/records/job/mover.py @@ -46,6 +46,7 @@ def run_records_mover_job(source_method_name: str, run_legacy = True if run_legacy: return records.move(source, target, processing_instructions) + return None except Exception: logger.error('', exc_info=True) raise diff --git a/records_mover/records/sources/base.py b/records_mover/records/sources/base.py index ba6e49fb5..5168d2ffe 100644 --- a/records_mover/records/sources/base.py +++ b/records_mover/records/sources/base.py @@ -152,4 +152,4 @@ class SupportsAirbyte(RecordsSource, metaclass=ABCMeta): @abstractmethod @contextmanager def not_sure_yet(self) -> Iterator['FileobjsSource']: - yield FileobjsSource() + yield None diff --git a/records_mover/utils/structures.py b/records_mover/utils/structures.py index 04c1f8550..e0a9454fa 100644 --- a/records_mover/utils/structures.py +++ b/records_mover/utils/structures.py @@ -25,7 +25,7 @@ def insert_into_dict(msg: Dict[str, Union[V, Dict[str, Any]]], if not isinstance(msg[keys[0]], dict): import logging logger = logging.getLogger(__name__) - message = f"Type error; expected dict, got {msg[keys[0]].type()}" + message = f"Type error; expected dict" logger.error(message) raise TypeError(message) insert_into_dict(msg[keys[0]], keys[1:], value) # type: ignore From 62c64d61fbeca5c3f2a1c4afd6e0f5c35cadf0ed Mon Sep 17 00:00:00 2001 From: naswierczek Date: Fri, 15 Mar 2024 13:05:32 -0400 Subject: [PATCH 4/5] More test fixing :P --- records_mover/records/job/mover.py | 2 +- records_mover/records/sources/base.py | 3 +-- records_mover/records/sources/google_sheets.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py index 
df9ee3898..ea6dc303d 100644 --- a/records_mover/records/job/mover.py +++ b/records_mover/records/job/mover.py @@ -46,7 +46,7 @@ def run_records_mover_job(source_method_name: str, run_legacy = True if run_legacy: return records.move(source, target, processing_instructions) - return None + return MoveResult() except Exception: logger.error('', exc_info=True) raise diff --git a/records_mover/records/sources/base.py b/records_mover/records/sources/base.py index 5168d2ffe..8101f9872 100644 --- a/records_mover/records/sources/base.py +++ b/records_mover/records/sources/base.py @@ -150,6 +150,5 @@ def to_dataframes_source(self, class SupportsAirbyte(RecordsSource, metaclass=ABCMeta): @abstractmethod - @contextmanager - def not_sure_yet(self) -> Iterator['FileobjsSource']: + def not_sure_yet(self): yield None diff --git a/records_mover/records/sources/google_sheets.py b/records_mover/records/sources/google_sheets.py index a257c0cdf..715ce2d31 100644 --- a/records_mover/records/sources/google_sheets.py +++ b/records_mover/records/sources/google_sheets.py @@ -106,5 +106,5 @@ def to_dataframes_source(self, yield DataframesRecordsSource(dfs=[df], processing_instructions=processing_instructions) - def not_sure_yet(self) -> Iterator['FileobjsSource']: + def not_sure_yet(self): pass From b21afb3c7cdddad6be34ad7f129ed247c169ec7b Mon Sep 17 00:00:00 2001 From: naswierczek Date: Fri, 15 Mar 2024 13:10:02 -0400 Subject: [PATCH 5/5] Moar test fix --- records_mover/records/job/mover.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/records_mover/records/job/mover.py b/records_mover/records/job/mover.py index ea6dc303d..deab2dd19 100644 --- a/records_mover/records/job/mover.py +++ b/records_mover/records/job/mover.py @@ -46,7 +46,7 @@ def run_records_mover_job(source_method_name: str, run_legacy = True if run_legacy: return records.move(source, target, processing_instructions) - return MoveResult() + return MoveResult(move_count=0, output_urls=None) except Exception: 
logger.error('', exc_info=True) raise