Sqlalchemy destination #1734

Merged
37 commits merged on Sep 14, 2024
Commits
3f06e1e
Implement sqlalchemy loader
steinitzu Sep 11, 2024
31d9033
Support destination name in tests
steinitzu Sep 11, 2024
e3eaa43
Some job client/sql client tests running on sqlite
steinitzu Sep 11, 2024
2973526
Fix more tests
steinitzu Sep 4, 2024
8caf2f3
All sqlite tests passing
steinitzu Sep 5, 2024
2a30b36
Add sqlalchemy tests in ci
steinitzu Sep 5, 2024
e7f56c9
Type errors
steinitzu Sep 5, 2024
11d52db
Test sqlalchemy in own workflow
steinitzu Sep 5, 2024
9d37ea6
Fix tests, type errors
steinitzu Sep 11, 2024
cdeb17d
Fix config
steinitzu Sep 6, 2024
a730a91
CI fix
steinitzu Sep 6, 2024
3326580
Add alembic to handle ALTER TABLE
steinitzu Sep 11, 2024
567359d
Fix workflow
steinitzu Sep 6, 2024
babcd3c
Install mysqlclient in venv
steinitzu Sep 6, 2024
9dec1c5
Mysql service version
steinitzu Sep 6, 2024
3e282ea
Single fail
steinitzu Sep 6, 2024
0439015
mysql healthcheck
steinitzu Sep 6, 2024
61c8355
No localhost
steinitzu Sep 6, 2024
84dc4cf
Remove weaviate
steinitzu Sep 6, 2024
4bcc425
Change ubuntu version
steinitzu Sep 6, 2024
a9b7e49
Debug sqlite version
steinitzu Sep 6, 2024
e0a0781
Revert
steinitzu Sep 6, 2024
98f8de2
Use py datetime in tests
steinitzu Sep 6, 2024
4f8d8f6
Test on sqlalchemy 1.4 and 2
steinitzu Sep 6, 2024
79631b2
Lint, no cli tests
steinitzu Sep 6, 2024
8068595
Update lockfile
steinitzu Sep 11, 2024
a9c89a0
Fix test, complex -> json
steinitzu Sep 11, 2024
874c871
Refactor type mapper
steinitzu Sep 11, 2024
a69d749
Update tests destination config
steinitzu Sep 11, 2024
c25932b
Fix tests
steinitzu Sep 12, 2024
6c426e6
Ignore sources tests
steinitzu Sep 12, 2024
36b585e
Fix overriding destination in test pipeline
steinitzu Sep 12, 2024
0208c64
Fix time precision in arrow test
steinitzu Sep 12, 2024
65f6ef7
Lint
steinitzu Sep 12, 2024
6c29071
Fix destination setup in test
steinitzu Sep 13, 2024
eec4e22
Fix
steinitzu Sep 13, 2024
dc4c29c
Use nullpool, lazy create engine, close current connection
steinitzu Sep 13, 2024
1 change: 1 addition & 0 deletions .github/workflows/test_local_destinations.yml
@@ -104,6 +104,7 @@ jobs:
DESTINATION__POSTGRES__CREDENTIALS: postgresql://loader:loader@localhost:5432/dlt_data
DESTINATION__QDRANT__CREDENTIALS__location: http://localhost:6333


- name: Stop weaviate
if: always()
run: docker compose -f ".github/weaviate-compose.yml" down -v
99 changes: 99 additions & 0 deletions .github/workflows/test_sqlalchemy_destinations.yml
@@ -0,0 +1,99 @@
# Tests the sqlalchemy destination against databases that need no external credentials:
# sqlite (local file) and mysql (running as a service container)

name: dest | sqlalchemy mysql and sqlite

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
# NOTE: this workflow can't use github secrets!
# DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
RUNTIME__DLTHUB_TELEMETRY_ENDPOINT: ${{ secrets.RUNTIME__DLTHUB_TELEMETRY_ENDPOINT }}
ACTIVE_DESTINATIONS: "[\"sqlalchemy\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\", \"file\"]"

jobs:
get_docs_changes:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml

run_loader:
name: dest | sqlalchemy mysql and sqlite
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
strategy:
fail-fast: false
# Run on sqlalchemy 1.4 and 2.0
matrix:
sqlalchemy: [1.4, 2]
defaults:
run:
shell: bash
runs-on: "ubuntu-latest"

# Service containers to run with `container-job`
services:
# Label used to access the service container
mysql:
image: mysql:8
env:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: dlt_data
MYSQL_USER: loader
MYSQL_PASSWORD: loader
ports:
- 3306:3306
# Wait for the service to be ready before completing the job
options: >-
--health-cmd="mysqladmin ping -h localhost -u root -proot"
--health-interval=10s
--health-timeout=5s
--health-retries=5

steps:
- name: Check out
uses: actions/checkout@master

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-local-destinations

- name: Install dependencies
run: poetry install --no-interaction -E parquet -E filesystem -E sqlalchemy -E cli --with sentry-sdk --with pipeline && poetry run pip install mysqlclient && poetry run pip install "sqlalchemy==${{ matrix.sqlalchemy }}"

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

# always run full suite, also on branches
- run: poetry run pytest tests/load -x --ignore tests/load/sources
name: Run tests Linux
env:
DESTINATION__SQLALCHEMY_MYSQL__CREDENTIALS: mysql://root:[email protected]:3306/dlt_data # Use root because we need to create databases
DESTINATION__SQLALCHEMY_SQLITE__CREDENTIALS: sqlite:///_storage/dl_data.sqlite
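For context, a minimal sketch (not part of the PR) of the kind of pipeline run these credentials drive. The workflow configures two named destinations (sqlalchemy_mysql and sqlalchemy_sqlite); the sketch below assumes a plain "sqlalchemy" destination name configured the same way, through an environment variable.

import os
import dlt

# Assumption: a generic "sqlalchemy" destination reads its credentials from
# DESTINATION__SQLALCHEMY__CREDENTIALS, mirroring the named variants above.
os.environ["DESTINATION__SQLALCHEMY__CREDENTIALS"] = "sqlite:///_storage/dl_data.sqlite"

pipeline = dlt.pipeline(
    pipeline_name="sqlalchemy_smoke_test",
    destination="sqlalchemy",
    dataset_name="dlt_data",
)
# Load a tiny illustrative resource into the sqlite database
print(pipeline.run([{"id": 1, "value": "a"}], table_name="items"))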
10 changes: 8 additions & 2 deletions dlt/common/destination/capabilities.py
@@ -176,6 +176,12 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
loader_parallelism_strategy: Optional[TLoaderParallelismStrategy] = None
"""The destination can override the parallelism strategy"""

max_query_parameters: Optional[int] = None
"""The maximum number of parameters that can be supplied in a single parametrized query"""

supports_native_boolean: bool = True
"""The destination supports a native boolean type, otherwise bool columns are usually stored as integers"""

def generates_case_sensitive_identifiers(self) -> bool:
"""Tells if capabilities as currently adjusted, will generate case sensitive identifiers"""
# must have case sensitive support and folding function must preserve casing
@@ -220,8 +226,8 @@ def generic_capabilities(
caps.merge_strategies_selector = merge_strategies_selector
return caps

def get_type_mapper(self) -> DataTypeMapper:
return self.type_mapper(self)
def get_type_mapper(self, *args: Any, **kwargs: Any) -> DataTypeMapper:
return self.type_mapper(self, *args, **kwargs)


def merge_caps_file_formats(
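A hedged sketch of how a destination might advertise the two new capability fields; the values are illustrative (roughly sqlite-like) and not taken from this PR, and it assumes the context can be constructed with its defaults.

from dlt.common.destination.capabilities import DestinationCapabilitiesContext

caps = DestinationCapabilitiesContext()        # assumed: default construction works
caps.max_query_parameters = 999                # assumed cap on bound parameters per statement
caps.supports_native_boolean = False           # bool columns fall back to 0/1 integers

The widened get_type_mapper(*args, **kwargs) then lets such a destination pass dialect-specific arguments through to its type mapper's constructor.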
16 changes: 15 additions & 1 deletion dlt/common/destination/reference.py
@@ -86,6 +86,20 @@ def from_normalized_mapping(
schema=normalized_doc[naming_convention.normalize_identifier("schema")],
)

def to_normalized_mapping(self, naming_convention: NamingConvention) -> Dict[str, Any]:
"""Convert this instance to mapping where keys are normalized according to given naming convention

Args:
naming_convention: Naming convention that should be used to normalize keys

Returns:
Dict[str, Any]: Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
"""
return {
naming_convention.normalize_identifier(key): value
for key, value in self._asdict().items()
}


@dataclasses.dataclass
class StateInfo:
@@ -439,7 +453,7 @@ def __init__(
self.capabilities = capabilities

@abstractmethod
def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
def initialize_storage(self, truncate_tables: Optional[Iterable[str]] = None) -> None:
"""Prepares storage to be used ie. creates database schema or file system folder. Truncates requested tables."""
pass

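A hedged usage sketch of the new to_normalized_mapping: the schema-info record comes out keyed by the destination's naming convention and round-trips through from_normalized_mapping. The snake_case import path and the StorageSchemaInfo field names are assumptions based on the version-table columns, not shown in this excerpt.

from datetime import datetime, timezone
from dlt.common.destination.reference import StorageSchemaInfo
from dlt.common.normalizers.naming.snake_case import NamingConvention  # assumed import path

naming = NamingConvention()
info = StorageSchemaInfo(  # field names assumed from the version-table columns
    version=1,
    engine_version=9,
    schema_name="my_schema",
    version_hash="abc",
    inserted_at=datetime.now(timezone.utc),
    schema="{}",
)
row = info.to_normalized_mapping(naming)
# row keys follow the naming convention, e.g. {"version": ..., "schema_name": ..., "schema": ...}
assert StorageSchemaInfo.from_normalized_mapping(row, naming) == info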
83 changes: 48 additions & 35 deletions dlt/common/json/__init__.py
@@ -19,35 +19,7 @@
from dlt.common.utils import map_nested_in_place


class SupportsJson(Protocol):
"""Minimum adapter for different json parser implementations"""

_impl_name: str
"""Implementation name"""

def dump(
self, obj: Any, fp: IO[bytes], sort_keys: bool = False, pretty: bool = False
) -> None: ...

def typed_dump(self, obj: Any, fp: IO[bytes], pretty: bool = False) -> None: ...

def typed_dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def typed_loads(self, s: str) -> Any: ...

def typed_dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def typed_loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ...

def dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def load(self, fp: Union[IO[bytes], IO[str]]) -> Any: ...

def loads(self, s: str) -> Any: ...

def loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ...
TPuaDecoders = List[Callable[[Any], Any]]


def custom_encode(obj: Any) -> str:
@@ -104,7 +76,7 @@ def _datetime_decoder(obj: str) -> datetime:


# define decoder for each prefix
DECODERS: List[Callable[[Any], Any]] = [
DECODERS: TPuaDecoders = [
Decimal,
_datetime_decoder,
pendulum.Date.fromisoformat,
@@ -114,6 +86,11 @@ def _datetime_decoder(obj: str) -> datetime:
Wei,
pendulum.Time.fromisoformat,
]
# Alternate decoders that decode date/time/datetime to stdlib types instead of pendulum
PY_DATETIME_DECODERS = list(DECODERS)
PY_DATETIME_DECODERS[1] = datetime.fromisoformat
PY_DATETIME_DECODERS[2] = date.fromisoformat
PY_DATETIME_DECODERS[7] = time.fromisoformat
# how many decoders?
PUA_CHARACTER_MAX = len(DECODERS)

@@ -151,13 +128,13 @@ def custom_pua_encode(obj: Any) -> str:
raise TypeError(repr(obj) + " is not JSON serializable")


def custom_pua_decode(obj: Any) -> Any:
def custom_pua_decode(obj: Any, decoders: TPuaDecoders = DECODERS) -> Any:
if isinstance(obj, str) and len(obj) > 1:
c = ord(obj[0]) - PUA_START
# decode only the PUA space defined in DECODERS
if c >= 0 and c <= PUA_CHARACTER_MAX:
try:
return DECODERS[c](obj[1:])
return decoders[c](obj[1:])
except Exception:
# return strings that cannot be parsed
# this may be due
@@ -167,11 +144,11 @@ def custom_pua_decode(obj: Any) -> Any:
return obj


def custom_pua_decode_nested(obj: Any) -> Any:
def custom_pua_decode_nested(obj: Any, decoders: TPuaDecoders = DECODERS) -> Any:
if isinstance(obj, str):
return custom_pua_decode(obj)
return custom_pua_decode(obj, decoders)
elif isinstance(obj, (list, dict)):
return map_nested_in_place(custom_pua_decode, obj)
return map_nested_in_place(custom_pua_decode, obj, decoders=decoders)
return obj


@@ -190,6 +167,39 @@ def may_have_pua(line: bytes) -> bool:
return PUA_START_UTF8_MAGIC in line


class SupportsJson(Protocol):
"""Minimum adapter for different json parser implementations"""

_impl_name: str
"""Implementation name"""

def dump(
self, obj: Any, fp: IO[bytes], sort_keys: bool = False, pretty: bool = False
) -> None: ...

def typed_dump(self, obj: Any, fp: IO[bytes], pretty: bool = False) -> None: ...

def typed_dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def typed_loads(self, s: str) -> Any: ...

def typed_dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def typed_loadb(
self, s: Union[bytes, bytearray, memoryview], decoders: TPuaDecoders = DECODERS
) -> Any: ...

def dumps(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> str: ...

def dumpb(self, obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes: ...

def load(self, fp: Union[IO[bytes], IO[str]]) -> Any: ...

def loads(self, s: str) -> Any: ...

def loadb(self, s: Union[bytes, bytearray, memoryview]) -> Any: ...


# pick the right impl
json: SupportsJson = None
if os.environ.get(known_env.DLT_USE_JSON) == "simplejson":
@@ -216,4 +226,7 @@ def may_have_pua(line: bytes) -> bool:
"custom_pua_remove",
"SupportsJson",
"may_have_pua",
"TPuaDecoders",
"DECODERS",
"PY_DATETIME_DECODERS",
]
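A hedged round-trip sketch of the new decoders parameter: the same typed payload can be decoded back to stdlib date/time types instead of pendulum ones. It assumes the typed encoder accepts stdlib datetimes, as it does for pendulum instances.

from datetime import datetime, timezone
from dlt.common.json import json, PY_DATETIME_DECODERS

payload = {"created_at": datetime(2024, 9, 14, 12, 30, tzinfo=timezone.utc)}
blob = json.typed_dumpb(payload)  # PUA-tags the datetime so its type survives serialization
decoded = json.typed_loadb(blob, decoders=PY_DATETIME_DECODERS)
# decoded["created_at"] is a stdlib datetime rather than a pendulum.DateTime
print(type(decoded["created_at"]))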
12 changes: 9 additions & 3 deletions dlt/common/json/_orjson.py
@@ -1,7 +1,13 @@
from typing import IO, Any, Union
import orjson

from dlt.common.json import custom_pua_encode, custom_pua_decode_nested, custom_encode
from dlt.common.json import (
custom_pua_encode,
custom_pua_decode_nested,
custom_encode,
TPuaDecoders,
DECODERS,
)
from dlt.common.typing import AnyFun

_impl_name = "orjson"
@@ -38,8 +44,8 @@ def typed_loads(s: str) -> Any:
return custom_pua_decode_nested(loads(s))


def typed_loadb(s: Union[bytes, bytearray, memoryview]) -> Any:
return custom_pua_decode_nested(loadb(s))
def typed_loadb(s: Union[bytes, bytearray, memoryview], decoders: TPuaDecoders = DECODERS) -> Any:
return custom_pua_decode_nested(loadb(s), decoders)


def dumps(obj: Any, sort_keys: bool = False, pretty: bool = False) -> str:
12 changes: 9 additions & 3 deletions dlt/common/json/_simplejson.py
@@ -4,7 +4,13 @@
import simplejson
import platform

from dlt.common.json import custom_pua_encode, custom_pua_decode_nested, custom_encode
from dlt.common.json import (
custom_pua_encode,
custom_pua_decode_nested,
custom_encode,
TPuaDecoders,
DECODERS,
)

if platform.python_implementation() == "PyPy":
# disable speedups on PyPy, it can be actually faster than Python C
@@ -73,8 +79,8 @@ def typed_dumpb(obj: Any, sort_keys: bool = False, pretty: bool = False) -> bytes:
return typed_dumps(obj, sort_keys, pretty).encode("utf-8")


def typed_loadb(s: Union[bytes, bytearray, memoryview]) -> Any:
return custom_pua_decode_nested(loadb(s))
def typed_loadb(s: Union[bytes, bytearray, memoryview], decoders: TPuaDecoders = DECODERS) -> Any:
return custom_pua_decode_nested(loadb(s), decoders)


def dumps(obj: Any, sort_keys: bool = False, pretty: bool = False) -> str:
1 change: 1 addition & 0 deletions dlt/common/libs/pyarrow.py
@@ -30,6 +30,7 @@
import pyarrow.parquet
import pyarrow.compute
import pyarrow.dataset
from pyarrow.parquet import ParquetFile
except ModuleNotFoundError:
raise MissingDependencyException(
"dlt pyarrow helpers",
8 changes: 8 additions & 0 deletions dlt/common/time.py
@@ -143,6 +143,14 @@ def ensure_pendulum_time(value: Union[str, datetime.time]) -> pendulum.Time:
return result
else:
raise ValueError(f"{value} is not a valid ISO time string.")
elif isinstance(value, timedelta):
# Assume timedelta is seconds passed since midnight. Some drivers (mysqlclient) return time in this format
return pendulum.time(
value.seconds // 3600,
(value.seconds // 60) % 60,
value.seconds % 60,
value.microseconds,
)
raise TypeError(f"Cannot coerce {value} to a pendulum.Time object.")


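A hedged example of the new branch: mysqlclient hands back TIME columns as a timedelta since midnight, which ensure_pendulum_time now coerces to a pendulum.Time.

from datetime import timedelta
from dlt.common.time import ensure_pendulum_time

# 13:45:30.000123 expressed the way mysqlclient returns TIME values
t = ensure_pendulum_time(timedelta(hours=13, minutes=45, seconds=30, microseconds=123))
print(t)  # 13:45:30.000123 as a pendulum.Time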