Sqlalchemy destination (#1734)
* Implement sqlalchemy loader

Begin implementing sqlalchemy loader

SQLA load job, factory, schema storage, POC

sqlalchemy tests attempt

Implement SqlJobClient interface

Parquet load, some tests running on mysql

update lockfile

Limit bulk insert chunk size, sqlite create/drop schema, fixes

Generate schema update

Get more tests running with mysql

More tests passing

Fix state, schema restore

* Support destination name in tests

* Some job client/sql client tests running on sqlite

* Fix more tests

* All sqlite tests passing

* Add sqlalchemy tests in ci

* Type errors

* Test sqlalchemy in own workflow

* Fix tests, type errors

* Fix config

* CI fix

* Add alembic to handle ALTER TABLE

* Fix workflow

* Install mysqlclient in venv

* Mysql service version

* Single fail

* mysql healthcheck

* No localhost

* Remove weaviate

* Change ubuntu version

* Debug sqlite version

* Revert

* Use py datetime in tests

* Test on sqlalchemy 1.4 and 2

remove secrets toml

remove secrets toml

Revert "remove secrets toml"

This reverts commit 7dd189c.

Fix default pipeline name test

* Lint, no cli tests

* Update lockfile

* Fix test, complex -> json

* Refactor type mapper

* Update tests destination config

* Fix tests

* Ignore sources tests

* Fix overriding destination in test pipeline

* Fix time precision in arrow test

* Lint

* Fix destination setup in test

* Fix

* Use nullpool, lazy create engine, close current connection
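Below is a minimal sketch of the engine handling described in the last bullet: an engine created lazily on first use, with `NullPool` so nothing is pooled, and an explicit close of the current connections. The class and attribute names are illustrative, not the loader's actual implementation.

```python
from typing import Optional

import sqlalchemy as sa
from sqlalchemy.pool import NullPool


class LazyEngineHolder:
    """Create the SQLAlchemy engine on first use and pool nothing."""

    def __init__(self, credentials_url: str) -> None:
        self.credentials_url = credentials_url
        self._engine: Optional[sa.engine.Engine] = None

    @property
    def engine(self) -> sa.engine.Engine:
        if self._engine is None:
            # NullPool: every connection is opened and closed per use,
            # so nothing lingers between load jobs
            self._engine = sa.create_engine(self.credentials_url, poolclass=NullPool)
        return self._engine

    def close(self) -> None:
        # dispose() closes the connections currently held by the engine
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None
```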
steinitzu authored Sep 14, 2024
1 parent 325d927 commit 9580baf
Showing 40 changed files with 1,915 additions and 287 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_local_destinations.yml
@@ -104,6 +104,7 @@ jobs:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
DESTINATION__QDRANT__CREDENTIALS__location: http://localhost:6333


- name: Stop weaviate
if: always()
run: docker compose -f ".github/weaviate-compose.yml" down -v
99 changes: 99 additions & 0 deletions .github/workflows/test_sqlalchemy_destinations.yml
@@ -0,0 +1,99 @@
# Tests destinations that can run without credentials.
# i.e. local postgres, duckdb, filesystem (with local fs/memory bucket)

name: dest | sqlalchemy mysql and sqlite

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
# NOTE: this workflow can't use github secrets!
# DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"sqlalchemy\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

jobs:
get_docs_changes:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml

run_loader:
name: dest | sqlalchemy mysql and sqlite
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
# Run on sqlalchemy 1.4 and 2.0
matrix:
sqlalchemy: [1.4, 2]
defaults:
run:
shell: bash
runs-on: "ubuntu-latest"

# Service containers to run with `container-job`
services:
# Label used to access the service container
mysql:
image: mysql:8
env:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: dlt_data
MYSQL_USER: loader
MYSQL_PASSWORD: loader
ports:
- 3306:3306
# Wait for the service to be ready before completing the job
options: >-
--health-cmd="mysqladmin ping -h localhost -u root -proot"
--health-interval=10s
--health-timeout=5s
--health-retries=5
steps:
- name: Check out
uses: actions/checkout@master

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E parquet -E filesystem -E sqlalchemy -E cli --with sentry-sdk --with pipeline && poetry run pip install mysqlclient && poetry run pip install "sqlalchemy==${{ matrix.sqlalchemy }}"

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

# always run full suite, also on branches
- run: poetry run pytest tests/load -x --ignore tests/load/sources
name: Run tests Linux
env:
DESTINATION__SQLALCHEMY_MYSQL__CREDENTIALS: mysql://root:[email protected]:3306/dlt_data # Use root cause we need to create databases
DESTINATION__SQLALCHEMY_SQLITE__CREDENTIALS: sqlite:///_storage/dl_data.sqlite
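As a rough illustration of how such credential URLs are consumed (not part of the diff), a local smoke test against the SQLite variant could look like the sketch below. The pipeline and dataset names are made up, and the env-var key assumes the unnamed `sqlalchemy` destination rather than the named `sqlalchemy_mysql`/`sqlalchemy_sqlite` destinations used in the workflow above.

```python
import os

import dlt

# same style of credentials as the workflow, but for the plain "sqlalchemy"
# destination; sqlite needs no server, so this runs anywhere
os.environ["DESTINATION__SQLALCHEMY__CREDENTIALS"] = "sqlite:///_storage/dl_data.sqlite"

pipeline = dlt.pipeline(
    pipeline_name="sqlalchemy_smoke_test",
    destination="sqlalchemy",
    dataset_name="smoke",
)
load_info = pipeline.run([{"id": 1}, {"id": 2}], table_name="items")
print(load_info)
```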
10 changes: 8 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -176,6 +176,12 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
"""The destination can override the parallelism strategy"""

max_query_parameters: Optional[int] = None
"""The maximum number of parameters that can be supplied in a single parametrized query"""

supports_native_boolean: bool = True
"""The destination supports a native boolean type, otherwise bool columns are usually stored as integers"""

def generates_case_sensitive_identifiers(self) -> bool:
"""Tells if capabilities as currently adjusted, will generate case sensitive identifiers"""
# must have case sensitive support and folding function must preserve casing
@@ -220,8 +226,8 @@ def generic_capabilities(
caps.merge_strategies_selector = merge_strategies_selector
return caps

def get_type_mapper(self) -> DataTypeMapper:
return self.type_mapper(self)
def get_type_mapper(self, *args: Any, **kwargs: Any) -> DataTypeMapper:
return self.type_mapper(self, *args, **kwargs)


def merge_caps_file_formats(
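The new `max_query_parameters` capability exists so that bulk inserts can be chunked to stay under a dialect's bound-parameter limit (compare the "Limit bulk insert chunk size" commit above). A standalone sketch of that arithmetic follows, using an illustrative SQLite-style limit of 999; it is not the destination's actual code.

```python
from typing import Any, Iterable, Iterator, List, Sequence


def chunk_rows(
    rows: Iterable[Sequence[Any]], column_count: int, max_query_parameters: int
) -> Iterator[List[Sequence[Any]]]:
    # each row contributes one bound parameter per column, so cap rows per statement
    max_rows = max(1, max_query_parameters // column_count)
    chunk: List[Sequence[Any]] = []
    for row in rows:
        chunk.append(row)
        if len(chunk) >= max_rows:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


# with 10 columns and a 999-parameter cap, at most 99 rows fit per INSERT
first = next(chunk_rows([(0,) * 10] * 500, column_count=10, max_query_parameters=999))
assert len(first) == 99
```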
16 changes: 15 additions & 1 deletion dlt/common/destination/reference.py
@@ -86,6 +86,20 @@ def from_normalized_mapping(
schema=normalized_doc[naming_convention.normalize_identifier("schema")],
)

def to_normalized_mapping(self, naming_convention: NamingConvention) -> Dict[str, Any]:
"""Convert this instance to mapping where keys are normalized according to given naming convention
Args:
naming_convention: Naming convention that should be used to normalize keys
Returns:
Dict[str, Any]: Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
"""
return {
naming_convention.normalize_identifier(key): value
for key, value in self._asdict().items()
}


@dataclasses.dataclass
class StateInfo:
@@ -439,7 +453,7 @@ def __init__(
self.capabilities = capabilities

@abstractmethod
def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
def initialize_storage(self, truncate_tables: Optional[Iterable[str]] = None) -> None:
"""Prepares storage to be used ie. creates database schema or file system folder. Truncates requested tables."""
pass

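For context on the `to_normalized_mapping` method added above: it is the inverse of `from_normalized_mapping`, running the record's field names through the destination's naming convention. A tiny sketch of the same key normalization, assuming dlt's snake_case naming convention lives at the import path shown and using a made-up record dict:

```python
from dlt.common.normalizers.naming.snake_case import NamingConvention

naming = NamingConvention()

# a made-up record standing in for the dataclass fields
record = {"Version": 1, "SchemaName": "my_schema"}
normalized = {naming.normalize_identifier(key): value for key, value in record.items()}
assert normalized == {"version": 1, "schema_name": "my_schema"}
```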
83 changes: 48 additions & 35 deletions dlt/common/json/__init__.py
@@ -19,35 +19,7 @@
from dlt.common.utils import map_nested_in_place


class SupportsJson(Protocol):
"""Minimum adapter for different json parser implementations"""

_impl_name: str
"""Implementation name"""

def dump(
self, obj: Any, fp: IO[bytes], sort_keys: bool = False, pretty: bool = False
) -> None: ...

def typed_dump(self, obj: Any, fp: IO[bytes], pretty: bool = False) -> None: ...

def typed_dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def typed_loads(self, s: str) -> Any: ...

def typed_dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def typed_loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ...

def dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def load(self, fp: Union[IO[bytes], IO[str]]) -> Any: ...

def loads(self, s: str) -> Any: ...

def loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ...
TPuaDecoders = List[Callable[[Any], Any]]


def custom_encode(obj: Any) -> str:
@@ -104,7 +76,7 @@ def _datetime_decoder(obj: str) -> datetime:


# define decoder for each prefix
DECODERS: List[Callable[[Any], Any]] = [
DECODERS: TPuaDecoders = [
Decimal,
_datetime_decoder,
pendulum.Date.fromisoformat,
@@ -114,6 +86,11 @@ def _datetime_decoder(obj: str) -> datetime:
Wei,
pendulum.Time.fromisoformat,
]
# Alternate decoders that decode date/time/datetime to stdlib types instead of pendulum
PY_DATETIME_DECODERS = list(DECODERS)
PY_DATETIME_DECODERS[1] = datetime.fromisoformat
PY_DATETIME_DECODERS[2] = date.fromisoformat
PY_DATETIME_DECODERS[7] = time.fromisoformat
# how many decoders?
PUA_CHARACTER_MAX = len(DECODERS)

@@ -151,13 +128,13 @@ def custom_pua_encode(obj: Any) -> str:
raise TypeError(repr(obj) + " is not JSON serializable")


def custom_pua_decode(obj: Any) -> Any:
def custom_pua_decode(obj: Any, decoders: TPuaDecoders = DECODERS) -> Any:
if isinstance(obj, str) and len(obj) > 1:
c = ord(obj[0]) - PUA_START
# decode only the PUA space defined in DECODERS
if c >= 0 and c <= PUA_CHARACTER_MAX:
try:
return DECODERS[c](obj[1:])
return decoders[c](obj[1:])
except Exception:
# return strings that cannot be parsed
# this may be due
@@ -167,11 +144,11 @@ def custom_pua_decode(obj: Any) -> Any:
return obj


def custom_pua_decode_nested(obj: Any) -> Any:
def custom_pua_decode_nested(obj: Any, decoders: TPuaDecoders = DECODERS) -> Any:
if isinstance(obj, str):
return custom_pua_decode(obj)
return custom_pua_decode(obj, decoders)
elif isinstance(obj, (list, dict)):
return map_nested_in_place(custom_pua_decode, obj)
return map_nested_in_place(custom_pua_decode, obj, decoders=decoders)
return obj


@@ -190,6 +167,39 @@ def may_have_pua(line: bytes) -> bool:
return PUA_START_UTF8_MAGIC in line


class SupportsJson(Protocol):
"""Minimum adapter for different json parser implementations"""

_impl_name: str
"""Implementation name"""

def dump(
self, obj: Any, fp: IO[bytes], sort_keys: bool = False, pretty: bool = False
) -> None: ...

def typed_dump(self, obj: Any, fp: IO[bytes], pretty: bool = False) -> None: ...

def typed_dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def typed_loads(self, s: str) -> Any: ...

def typed_dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def typed_loadb(
self, s: Union[bytes, bytearray, memoryview], decoders: TPuaDecoders = DECODERS
) -> Any: ...

def dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def load(self, fp: Union[IO[bytes], IO[str]]) -> Any: ...

def loads(self, s: str) -> Any: ...

def loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ...


# pick the right impl
json: SupportsJson = None
if os.environ.get(known_env.DLT_USE_JSON) == "simplejson":
@@ -216,4 +226,7 @@ def may_have_pua(line: bytes) -> bool:
"custom_pua_remove",
"SupportsJson",
"may_have_pua",
"TPuaDecoders",
"DECODERS",
"PY_DATETIME_DECODERS",
]
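A quick round-trip sketch (not from the diff) of the alternate decoders: dump a typed payload with the module-level `json` implementation and load it back with `PY_DATETIME_DECODERS`, so date/time values come back as stdlib objects rather than pendulum ones. A naive datetime is used to keep the example independent of timezone formatting.

```python
from datetime import datetime

from dlt.common.json import PY_DATETIME_DECODERS, json

payload = {"created_at": datetime(2024, 9, 14, 12, 30, 0)}
blob = json.typed_dumpb(payload)

restored = json.typed_loadb(blob, decoders=PY_DATETIME_DECODERS)
# exactly a stdlib datetime, not a pendulum.DateTime subclass
assert type(restored["created_at"]) is datetime
```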
12 changes: 9 additions & 3 deletions dlt/common/json/_orjson.py
@@ -1,7 +1,13 @@
from typing import IO, Any, Union
import orjson

from dlt.common.json import custom_pua_encode, custom_pua_decode_nested, custom_encode
from dlt.common.json import (
custom_pua_encode,
custom_pua_decode_nested,
custom_encode,
TPuaDecoders,
DECODERS,
)
from dlt.common.typing import AnyFun

_impl_name = "orjson"
@@ -38,8 +44,8 @@ def typed_loads(s: str) -> Any:
return custom_pua_decode_nested(loads(s))


def typed_loadb(s: Union[bytes, bytearray, memoryview]) -> Any:
return custom_pua_decode_nested(loadb(s))
def typed_loadb(s: Union[bytes, bytearray, memoryview], decoders: TPuaDecoders = DECODERS) -> Any:
return custom_pua_decode_nested(loadb(s), decoders)


def dumps(obj: Any, sort_keys: bool = False, pretty: bool = False) -> str:
12 changes: 9 additions & 3 deletions dlt/common/json/_simplejson.py
@@ -4,7 +4,13 @@
import simplejson
import platform

from dlt.common.json import custom_pua_encode, custom_pua_decode_nested, custom_encode
from dlt.common.json import (
custom_pua_encode,
custom_pua_decode_nested,
custom_encode,
TPuaDecoders,
DECODERS,
)

if platform.python_implementation() == "PyPy":
# disable speedups on PyPy, it can be actually faster than Python C
@@ -73,8 +79,8 @@ def typed_dumpb(obj: Any, sort_keys: bool = False, pretty: bool = False) -> byte
return typed_dumps(obj, sort_keys, pretty).encode("utf-8")


def typed_loadb(s: Union[bytes, bytearray, memoryview]) -> Any:
return custom_pua_decode_nested(loadb(s))
def typed_loadb(s: Union[bytes, bytearray, memoryview], decoders: TPuaDecoders = DECODERS) -> Any:
return custom_pua_decode_nested(loadb(s), decoders)


def dumps(obj: Any, sort_keys: bool = False, pretty: bool = False) -> str:
1 change: 1 addition & 0 deletions dlt/common/libs/pyarrow.py
@@ -30,6 +30,7 @@
import pyarrow.parquet
import pyarrow.compute
import pyarrow.dataset
from pyarrow.parquet import ParquetFile
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt pyarrow helpers",
8 changes: 8 additions & 0 deletions dlt/common/time.py
@@ -143,6 +143,14 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
return result
else:
raise ValueError(f"{value} is not a valid ISO time string.")
elif isinstance(value, timedelta):
# Assume timedelta is seconds passed since midnight. Some drivers (mysqlclient) return time in this format
return pendulum.time(
value.seconds // 3600,
(value.seconds // 60) % 60,
value.seconds % 60,
value.microseconds,
)
raise TypeError(f"Cannot coerce {value} to a pendulum.Time object.")


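A short illustration of the new branch (hedged, not taken from the test suite): mysqlclient returns `TIME` columns as a `timedelta` measured from midnight, which now coerces cleanly to a `pendulum.Time`.

```python
from datetime import timedelta

from dlt.common.time import ensure_pendulum_time

# 13:45:07 expressed the way mysqlclient would return it
t = ensure_pendulum_time(timedelta(hours=13, minutes=45, seconds=7))
assert (t.hour, t.minute, t.second) == (13, 45, 7)
```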
