Skip to content

Commit

Permalink
Merge pull request #1304 from dlt-hub/devel
Browse files Browse the repository at this point in the history
master merge for `0.4.10` release
  • Loading branch information
rudolfix authored Apr 30, 2024
2 parents efaedc2 + 15686ab commit 048839d
Show file tree
Hide file tree
Showing 89 changed files with 3,942 additions and 524 deletions.
78 changes: 78 additions & 0 deletions .github/workflows/test_destination_clickhouse.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@

name: test | clickhouse

on:
pull_request:
branches:
- master
- devel
workflow_dispatch:
schedule:
- cron: '0 2 * * *'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
RUNTIME__SENTRY_DSN: https://[email protected]/4504819859914752
RUNTIME__LOG_LEVEL: ERROR
DLT_SECRETS_TOML: ${{ secrets.DLT_SECRETS_TOML }}

ACTIVE_DESTINATIONS: "[\"clickhouse\"]"
ALL_FILESYSTEM_DRIVERS: "[\"memory\"]"

jobs:
get_docs_changes:
name: docs changes
uses: ./.github/workflows/get_docs_changes.yml
if: ${{ !github.event.pull_request.head.repo.fork || contains(github.event.pull_request.labels.*.name, 'ci from fork')}}

run_loader:
name: test | clickhouse tests
needs: get_docs_changes
if: needs.get_docs_changes.outputs.changes_outside_docs == 'true'
defaults:
run:
shell: bash
runs-on: "ubuntu-latest"

steps:

- name: Check out
uses: actions/checkout@master

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: "3.10.x"

- name: Install Poetry
uses: snok/[email protected]
with:
virtualenvs-create: true
virtualenvs-in-project: true
installer-parallel: true

- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-gcp

- name: Install dependencies
run: poetry install --no-interaction -E clickhouse --with providers -E parquet --with sentry-sdk --with pipeline

- name: create secrets.toml
run: pwd && echo "$DLT_SECRETS_TOML" > tests/.dlt/secrets.toml

- run: |
poetry run pytest tests/load -m "essential"
name: Run essential tests Linux
if: ${{ ! (contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule')}}
- run: |
poetry run pytest tests/load
name: Run all tests Linux
if: ${{ contains(github.event.pull_request.labels.*.name, 'ci full') || github.event_name == 'schedule'}}
2 changes: 1 addition & 1 deletion dlt/cli/telemetry_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dlt.cli.utils import get_telemetry_status
from dlt.cli.config_toml_writer import WritableConfigValue, write_values
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.common.runtime.segment import get_anonymous_id
from dlt.common.runtime.anon_tracker import get_anonymous_id

DLT_TELEMETRY_DOCS_URL = "https://dlthub.com/docs/reference/telemetry"

Expand Down
1 change: 0 additions & 1 deletion dlt/common/configuration/providers/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def name(self) -> str:

def _look_vault(self, full_key: str, hint: type) -> str:
"""Get Airflow Variable with given `full_key`, return None if not found"""

from airflow.models import Variable

with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
Expand Down
2 changes: 1 addition & 1 deletion dlt/common/configuration/specs/base_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def default_factory(att_value=att_value): # type: ignore[no-untyped-def]
synth_init = init and ((not base_params or base_params.init) and has_default_init)
if synth_init != init and has_default_init:
warnings.warn(
f"__init__ method will not be generated on {cls.__name__} because bas class didn't"
f"__init__ method will not be generated on {cls.__name__} because base class didn't"
" synthesize __init__. Please correct `init` flag in confispec decorator. You are"
" probably receiving incorrect __init__ signature for type checking"
)
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/configuration/specs/run_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class RunConfiguration(BaseConfiguration):
sentry_dsn: Optional[str] = None # keep None to disable Sentry
slack_incoming_hook: Optional[TSecretStrValue] = None
dlthub_telemetry: bool = True # enable or disable dlthub telemetry
dlthub_telemetry_segment_write_key: str = "a1F2gc6cNYw2plyAt02sZouZcsRjG7TD"
dlthub_telemetry_endpoint: str = "https://api.segment.io/v1/track"
dlthub_telemetry_endpoint: Optional[str] = "https://telemetry.scalevector.ai"
dlthub_telemetry_segment_write_key: Optional[str] = None
log_format: str = "{asctime}|[{levelname:<21}]|{process}|{thread}|{name}|{filename}|{funcName}:{lineno}|{message}"
log_level: str = "WARNING"
request_timeout: float = 60
Expand Down
51 changes: 48 additions & 3 deletions dlt/common/data_writers/escape.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,47 @@ def escape_databricks_literal(v: Any) -> Any:
return _escape_extended(json.dumps(v), prefix="'", escape_dict=DATABRICKS_ESCAPE_DICT)
if isinstance(v, bytes):
return f"X'{v.hex()}'"
if v is None:
return "NULL"
return "NULL" if v is None else str(v)

return str(v)

# https://github.com/ClickHouse/ClickHouse/blob/master/docs/en/sql-reference/syntax.md#string
CLICKHOUSE_ESCAPE_DICT = {
"'": "''",
"\\": "\\\\",
"\n": "\\n",
"\t": "\\t",
"\b": "\\b",
"\f": "\\f",
"\r": "\\r",
"\0": "\\0",
"\a": "\\a",
"\v": "\\v",
}

CLICKHOUSE_ESCAPE_RE = _make_sql_escape_re(CLICKHOUSE_ESCAPE_DICT)


def escape_clickhouse_literal(v: Any) -> Any:
if isinstance(v, str):
return _escape_extended(
v, prefix="'", escape_dict=CLICKHOUSE_ESCAPE_DICT, escape_re=CLICKHOUSE_ESCAPE_RE
)
if isinstance(v, (datetime, date, time)):
return f"'{v.isoformat()}'"
if isinstance(v, (list, dict)):
return _escape_extended(
json.dumps(v),
prefix="'",
escape_dict=CLICKHOUSE_ESCAPE_DICT,
escape_re=CLICKHOUSE_ESCAPE_RE,
)
if isinstance(v, bytes):
return f"'{v.hex()}'"
return "NULL" if v is None else str(v)


def escape_clickhouse_identifier(v: str) -> str:
return "`" + v.replace("`", "``").replace("\\", "\\\\") + "`"


def format_datetime_literal(v: pendulum.DateTime, precision: int = 6, no_tz: bool = False) -> str:
Expand All @@ -176,3 +213,11 @@ def format_bigquery_datetime_literal(
"""Returns BigQuery-adjusted datetime literal by prefixing required `TIMESTAMP` indicator."""
# https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#timestamp_literals
return "TIMESTAMP " + format_datetime_literal(v, precision, no_tz)


def format_clickhouse_datetime_literal(
v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
) -> str:
"""Returns clickhouse compatibel function"""
datetime = format_datetime_literal(v, precision, True)
return f"toDateTime64({datetime}, {precision}, '{v.tzinfo}')"
1 change: 1 addition & 0 deletions dlt/common/destination/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):
insert_values_writer_type: str = "default"
supports_multiple_statements: bool = True
supports_clone_table: bool = False

"""Destination supports CREATE TABLE ... CLONE ... statements"""
max_table_nesting: Optional[int] = None # destination can overwrite max table nesting

Expand Down
17 changes: 11 additions & 6 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
TDestinationClient = TypeVar("TDestinationClient", bound="JobClientBase")
TDestinationDwhClient = TypeVar("TDestinationDwhClient", bound="DestinationClientDwhConfiguration")

DEFAULT_FILE_LAYOUT = "{table_name}/{load_id}.{file_id}.{ext}"


class StorageSchemaInfo(NamedTuple):
version_hash: str
Expand Down Expand Up @@ -155,7 +157,7 @@ class DestinationClientStagingConfiguration(DestinationClientDwhConfiguration):
as_staging: bool = False
bucket_url: str = None
# layout of the destination files
layout: str = "{table_name}/{load_id}.{file_id}.{ext}"
layout: str = DEFAULT_FILE_LAYOUT


@configspec
Expand Down Expand Up @@ -353,13 +355,16 @@ def _verify_schema(self) -> None:
f'"{table["x-merge-strategy"]}" is not a valid merge strategy. ' # type: ignore[typeddict-item]
f"""Allowed values: {', '.join(['"' + s + '"' for s in MERGE_STRATEGIES])}."""
)
if not has_column_with_prop(table, "primary_key") and not has_column_with_prop(
table, "merge_key"
if (
table.get("x-merge-strategy") == "delete-insert"
and not has_column_with_prop(table, "primary_key")
and not has_column_with_prop(table, "merge_key")
):
logger.warning(
f"Table {table_name} has write_disposition set to merge, but no primary or"
" merge keys defined. "
+ "dlt will fall back to append for this table."
f"Table {table_name} has `write_disposition` set to `merge`"
" and `merge_strategy` set to `delete-insert`, but no primary or"
" merge keys defined."
" dlt will fall back to `append` for this table."
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
Expand Down
Loading

0 comments on commit 048839d

Please sign in to comment.