Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

First step of creating an integrated airbyte engine, targeting gsheets #288

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions records_mover/creds/base_creds.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# situation, let the session pick which backend we're using for
# creds based on out of band information, like how it was constructed
# or environment variables).
class BaseCreds():
class BaseCreds:
def __init__(self,
default_db_creds_name: Optional[str] = None,
default_aws_creds_name: Optional[str] = None,
Expand All @@ -48,15 +48,15 @@ def __init__(self,
default_gcs_client: Union[PleaseInfer,
'google.cloud.storage.Client',
None] = PleaseInfer.token,
default_airbyte_creds: Union[PleaseInfer,
Dict[str, Any],
None] = PleaseInfer.token,
scratch_s3_url: Union[PleaseInfer,
str,
None] = PleaseInfer.token,
scratch_gcs_url: Union[PleaseInfer,
str,
None] = PleaseInfer.token,
default_airbyte_creds: Union[PleaseInfer,
Dict[str, Any],
None] = PleaseInfer.token) -> None:
None] = PleaseInfer.token) -> None:
self._default_db_creds_name = default_db_creds_name
self._default_aws_creds_name = default_aws_creds_name
self._default_gcp_creds_name = default_gcp_creds_name
Expand All @@ -65,12 +65,11 @@ def __init__(self,
self.__default_gcs_creds = default_gcp_creds
self.__default_gcs_client = default_gcs_client
self.__default_boto3_session = default_boto3_session
self._default_airbyte_creds = default_airbyte_creds

self._scratch_s3_url = scratch_s3_url
self._scratch_gcs_url = scratch_gcs_url

self._default_airbyte_creds = default_airbyte_creds

def google_sheets(self, gcp_creds_name: str) -> 'google.auth.credentials.Credentials':
scopes = _GSHEETS_SCOPES
return self._gcp_creds(gcp_creds_name, scopes)
Expand Down Expand Up @@ -193,6 +192,14 @@ def default_db_facts(self) -> DBFacts:
self.__default_db_facts = self.db_facts(self._default_db_creds_name)
return self.__default_db_facts

def _infer_airbyte_creds(self) -> Dict[str, Any]:
    """Hook for discovering Airbyte credentials out-of-band.

    The base class cannot infer credentials itself; subclasses that know
    a credential backend are presumably expected to override this —
    TODO(review): confirm against concrete creds subclasses.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

def airbyte(self) -> Optional[Dict[str, Any]]:
    """Return Airbyte credentials, inferring and caching them on first use.

    If the configured value is still the ``PleaseInfer`` sentinel, delegate
    to :meth:`_infer_airbyte_creds` and memoize the result; otherwise return
    whatever was configured (which may legitimately be ``None``).
    """
    creds = self._default_airbyte_creds
    if creds is PleaseInfer.token:
        creds = self._infer_airbyte_creds()
        self._default_airbyte_creds = creds
    return creds

def _append_aws_username_to_bucket(self,
prefix: str,
boto3_session: 'boto3.session.Session') -> Optional[str]:
Expand Down Expand Up @@ -270,11 +277,3 @@ def default_scratch_gcs_url(self) -> Optional[str]:
if self._scratch_gcs_url is PleaseInfer.token:
self._scratch_gcs_url = self._infer_scratch_gcs_url()
return self._scratch_gcs_url

def _infer_airbyte_creds(self) -> Dict[str, Any]:
raise NotImplementedError

def airbyte(self) -> Optional[Dict[str, Any]]:
if self._default_airbyte_creds is PleaseInfer.token:
self._default_airbyte_creds = self._infer_airbyte_creds()
return self._default_airbyte_creds
2 changes: 2 additions & 0 deletions records_mover/records/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def build_parser() -> argparse.ArgumentParser:
if airbyte_feature_flag is not None:
parser.add_argument('-hc', '--healthcheck', action='store_true', required=False,
help='Returns health of the configured airbyte instance')
parser.add_argument('-a', '--airbyte', action='store_true', required=False,
help='Enables usage of the airbyte engine to run data movement')

subparsers = parser.add_subparsers(help='subcommand_help')
from records_mover import Session
Expand Down
6 changes: 6 additions & 0 deletions records_mover/records/job/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ def to_args(self) -> Dict[str, Any]:
if 'db_name' in kwargs:
# Already used to feed different args above
del kwargs['db_name']
# We'll have these and they are meaningless in this context so buhleted!
if 'airbyte' in kwargs:
del kwargs['airbyte']
if 'healthcheck' in kwargs:
del kwargs['healthcheck']

# for unit test coverage stability, this needs to be a list
# comprehension (which maintains the order of the kwargs keys)
# instead of subtracting possible_args from kwargs.keys(),
Expand Down
14 changes: 13 additions & 1 deletion records_mover/records/job/mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,19 @@ def run_records_mover_job(source_method_name: str,
records = session.records
source = source_method(**source_kwargs)
target = target_method(**target_kwargs)
return records.move(source, target, processing_instructions)
# To be fancy, if airbyte fails, we fall back to the old method
try:
if 'airbyte' in config and config['airbyte']:
return records.move_via_airbyte(source, target, processing_instructions)
else:
run_legacy = True
except NotImplementedError as e:
logger.warning(f"""WARNING: This type of move not yet supported in air byte engine.
Falling back to legacy engine. Message: {e}""")
run_legacy = True
if run_legacy:
return records.move(source, target, processing_instructions)
return MoveResult(move_count=0, output_urls=None)
except Exception:
logger.error('', exc_info=True)
raise
40 changes: 26 additions & 14 deletions records_mover/records/mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@
logger = logging.getLogger(__name__)


def move_via_airbyte(records_source: RecordsSource, records_target: RecordsTarget,
                     processing_instructions: ProcessingInstructions = ProcessingInstructions()) \
        -> MoveResult:
    """Move records from source to target using the Airbyte engine.

    Both endpoints must implement their respective ``SupportsAirbyte``
    interface.  The engine integration itself is not implemented yet, so
    this currently always raises.

    :raises NotImplementedError: either because one endpoint lacks Airbyte
        support, or — when both support it — because the engine is still
        unimplemented.
    """
    source_supported = isinstance(records_source, sources_base.SupportsAirbyte)
    target_supported = isinstance(records_target, targets_base.SupportsAirbyte)
    if not (source_supported and target_supported):
        raise NotImplementedError("Source and target must support airbyte")
    raise NotImplementedError("Not yet! But for a different reason")


def move(records_source: RecordsSource,
records_target: RecordsTarget,
processing_instructions: ProcessingInstructions = ProcessingInstructions()) -> MoveResult:
Expand Down Expand Up @@ -60,18 +70,19 @@ def move(records_source: RecordsSource,
# See documentation for RecordsSource in sources.py and
# RecordsTarget in targets.py for the semantics of the methods
# being called.
if (isinstance(records_source, sources_base.SupportsRecordsDirectory) and
isinstance(records_target, SupportsMoveFromRecordsDirectory) and
records_target.
can_move_directly_from_scheme(records_source.records_directory().loc.scheme) and
records_target.can_move_from_format(records_source.records_format)):
if (isinstance(records_source, sources_base.SupportsRecordsDirectory)
and isinstance(records_target, SupportsMoveFromRecordsDirectory)
and records_target.can_move_directly_from_scheme(
records_source.records_directory().loc.scheme
)
and records_target.can_move_from_format(records_source.records_format)):
# Tell the destination to load directly from wherever the
# source is, without needing to make any copies of the data or
# streaming it through the current box.
directory = records_source.records_directory()
logger.info(f"Mover: copying from {records_source} to {records_target} "
f"by moving from records directory in {directory}...")
return records_target.\
return records_target. \
move_from_records_directory(directory=directory,
override_records_format=records_source.records_format,
processing_instructions=processing_instructions)
Expand All @@ -98,7 +109,7 @@ def move(records_source: RecordsSource,
records_format = records_source.compatible_format(records_target)
assert records_format is not None # we checked compatibility above
records_target.pre_load_hook()
out = records_source.\
out = records_source. \
move_to_records_directory(records_directory=directory,
records_format=records_format,
processing_instructions=pi)
Expand All @@ -110,28 +121,29 @@ def move(records_source: RecordsSource,
directory = records_source.records_directory()
logger.info(f"Mover: copying from {records_source} to {records_target} "
"by loading from {directory.scheme} directory...")
return records_target.\
return records_target. \
move_from_records_directory(directory=directory,
override_records_format=records_source.records_format,
processing_instructions=processing_instructions)
elif isinstance(records_source, SupportsToFileobjsSource):
# Incompatible types in assignment (expression has type "Optional[Any]",
# variable has type "BaseRecordsFormat")
target_records_format: BaseRecordsFormat \
= getattr(records_target, "records_format", None) # type: ignore
= getattr(records_target, "records_format", None) # type: ignore
logger.info(f"Mover: copying from {records_source} to {records_target} "
f"by first writing {records_source} to {target_records_format} "
"records format (if easy to rewrite)...")
with records_source.\
with records_source. \
to_fileobjs_source(processing_instructions=processing_instructions,
records_format_if_possible=target_records_format)\
records_format_if_possible=target_records_format) \
as fileobjs_source:
return move(fileobjs_source, records_target, processing_instructions)
elif (isinstance(records_source, SupportsMoveToRecordsDirectory) and
isinstance(records_target, MightSupportMoveFromTempLocAfterFillingIt) and
records_source.has_compatible_format(records_target) and
records_source.
can_move_to_scheme(records_target.temporary_loadable_directory_scheme()) and
records_source.can_move_to_scheme(
records_target.temporary_loadable_directory_scheme()
) and
records_target.can_move_from_temp_loc_after_filling_it()):
logger.info(f"Mover: copying from {records_source} to {records_target} "
f"by filling in a temporary location...")
Expand All @@ -142,7 +154,7 @@ def move(records_source: RecordsSource,
logger.info(f"Mover: copying from {records_source} to {records_target} "
f"by converting to dataframe...")
with records_source.to_dataframes_source(processing_instructions) as dataframes_source:
return records_target.\
return records_target. \
move_from_dataframes_source(dfs_source=dataframes_source,
processing_instructions=processing_instructions)
elif (isinstance(records_source, SupportsToDataframesSource)):
Expand Down
6 changes: 5 additions & 1 deletion records_mover/records/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from ..url.resolver import UrlResolver
from .sources import RecordsSources
from .targets import RecordsTargets
from .mover import move
from .mover import move, move_via_airbyte
from enum import Enum
from typing import Callable, Union, Optional, TYPE_CHECKING
if TYPE_CHECKING:
Expand Down Expand Up @@ -59,6 +59,9 @@ class Records:
move: Callable
"Alias of :meth:`records_mover.records.move`"

move_via_airbyte: Callable
"Alias of :meth:`records_mover.records.move_via_airbyte`"

def __init__(self,
db_driver: Union[Callable[[Union['Engine', 'Connection', None],
Optional['Connection'],
Expand All @@ -76,6 +79,7 @@ def __init__(self,
if url_resolver is PleaseInfer.token:
url_resolver = session.url_resolver
self.move = move
self.move_via_airbyte = move_via_airbyte
self.sources = RecordsSources(db_driver=db_driver,
url_resolver=url_resolver)
self.targets = RecordsTargets(url_resolver=url_resolver,
Expand Down
6 changes: 6 additions & 0 deletions records_mover/records/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,9 @@ def to_dataframes_source(self,
-> Iterator['DataframesRecordsSource']:
"""Convert current source to a DataframeSource and present it in a context manager"""
pass


class SupportsAirbyte(RecordsSource, metaclass=ABCMeta):
    """Interface for records sources that can be moved via the Airbyte engine.

    NOTE(review): the contract is an acknowledged draft — the single
    abstract method is a placeholder until the Airbyte integration design
    settles.
    """

    @abstractmethod
    def not_sure_yet(self):
        """Placeholder abstract hook for the future Airbyte source contract.

        The previous body was ``yield None``, which made this stub a
        generator function and misleadingly implied an iterator contract;
        it was also inconsistent with the target-side ``SupportsAirbyte``
        stub, which uses a plain empty body.  An abstract placeholder
        should carry no behavior at all.
        """
        ...
8 changes: 6 additions & 2 deletions records_mover/records/sources/google_sheets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .base import SupportsToDataframesSource
from .base import SupportsToDataframesSource, SupportsAirbyte
from contextlib import contextmanager
import itertools
import google.auth.credentials
Expand All @@ -13,7 +13,8 @@
SheetsService = Any


class GoogleSheetsRecordsSource(SupportsToDataframesSource):
class GoogleSheetsRecordsSource(SupportsToDataframesSource, SupportsAirbyte):

def __init__(self,
spreadsheet_id: str,
sheet_name_or_range: str,
Expand Down Expand Up @@ -104,3 +105,6 @@ def to_dataframes_source(self,
)
yield DataframesRecordsSource(dfs=[df],
processing_instructions=processing_instructions)

def not_sure_yet(self):
    """Placeholder implementation of the draft SupportsAirbyte source hook.

    Intentionally a no-op until the Airbyte contract is defined.
    """
    return None
6 changes: 6 additions & 0 deletions records_mover/records/targets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,9 @@ def move_from_dataframes_source(self,
ProcessingInstructions provided.
"""
pass


class SupportsAirbyte(RecordsTarget, metaclass=ABCMeta):
    """Interface for records targets that can be moved via the Airbyte engine.

    NOTE(review): draft contract — the single abstract method is a
    placeholder until the Airbyte integration design settles.
    """

    @abstractmethod
    def not_sure_yet(self) -> bool:
        # Placeholder abstract hook; annotated to return bool, but the
        # semantics are not yet defined — confirm once the design lands.
        pass
7 changes: 5 additions & 2 deletions records_mover/records/targets/google_sheets.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ...utils.retry import google_sheets_retry
from .base import SupportsMoveFromDataframes
from .base import SupportsMoveFromDataframes, SupportsAirbyte
import math
import google.auth.credentials
import numpy as np
Expand All @@ -19,7 +19,7 @@
CellData = Union[int, str, float]


class GoogleSheetsRecordsTarget(SupportsMoveFromDataframes):
class GoogleSheetsRecordsTarget(SupportsMoveFromDataframes, SupportsAirbyte):
def __init__(self,
spreadsheet_id: str,
sheet_name: str,
Expand Down Expand Up @@ -130,3 +130,6 @@ def move_from_dataframes_source(self,
self._naive_load_to_sheet(service, self.spreadsheet_id, self.sheet_name,
json_encodable_data)
return MoveResult(move_count=len(data), output_urls=None)

def not_sure_yet(self) -> bool:
    """Placeholder implementation of the draft SupportsAirbyte target hook.

    Always reports support; the real contract is still to be defined.
    """
    airbyte_capable = True
    return airbyte_capable
2 changes: 1 addition & 1 deletion records_mover/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _infer_creds(session_type: str,
f"{session_type}.")


class Session():
class Session:
def __init__(self,
default_db_creds_name: Optional[str] = None,
default_aws_creds_name: Union[None, str, PleaseInfer] = PleaseInfer.token,
Expand Down
7 changes: 6 additions & 1 deletion records_mover/utils/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ def insert_into_dict(msg: Dict[str, Union[V, Dict[str, Any]]],
else:
if keys[0] not in msg:
msg[keys[0]] = {}
assert isinstance(msg[keys[0]], dict)
if not isinstance(msg[keys[0]], dict):
import logging
logger = logging.getLogger(__name__)
message = f"Type error; expected dict"
logger.error(message)
raise TypeError(message)
insert_into_dict(msg[keys[0]], keys[1:], value) # type: ignore
msg: Dict[str, Union[V, Dict[str, Any]]] = {}
for key, val in d.items():
Expand Down
Loading