Skip to content

Commit

Permalink
Merge branch 'devel' into feat/removes-unnecessary-deps
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Nov 30, 2024
2 parents 657c086 + eefe77b commit a60a48e
Show file tree
Hide file tree
Showing 31 changed files with 794 additions and 187 deletions.
4 changes: 2 additions & 2 deletions dlt/common/libs/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,15 +620,15 @@ def row_tuples_to_arrow(
)
float_array = pa.array(columnar_known_types[field.name], type=pa.float64())
columnar_known_types[field.name] = float_array.cast(field.type, safe=False)
if issubclass(py_type, (dict, list)):
if issubclass(py_type, (dict, list, set)):
logger.warning(
f"Field {field.name} was reflected as JSON type and needs to be serialized back to"
" string to be placed in arrow table. This will slow data extraction down. You"
" should cast JSON field to STRING in your database system ie. by creating and"
" extracting an SQL VIEW that selects with cast."
)
json_str_array = pa.array(
[None if s is None else json.dumps(s) for s in columnar_known_types[field.name]]
[None if s is None else json.dumps(s) if not issubclass(type(s), set) else json.dumps(list(s)) for s in columnar_known_types[field.name]]
)
columnar_known_types[field.name] = json_str_array

Expand Down
84 changes: 76 additions & 8 deletions dlt/common/runtime/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ class Collector(ABC):

@abstractmethod
def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
"""Creates or updates a counter
Expand All @@ -48,6 +54,7 @@ def update(
name (str): An unique name of a counter, displayable.
inc (int, optional): Increase amount. Defaults to 1.
total (int, optional): Maximum value of a counter. Defaults to None which means unbound counter.
inc_total (int, optional): Increase the maximum value of the counter. Does nothing if the counter does not exist yet.
message (str, optional): Additional message attached to a counter. Defaults to None.
label (str, optional): Creates nested counter for counter `name`. Defaults to None.
"""
Expand Down Expand Up @@ -80,7 +87,13 @@ class NullCollector(Collector):
"""A default counter that does not count anything."""

def update(
    self,
    name: str,
    inc: int = 1,
    total: int = None,
    inc_total: int = None,
    message: str = None,
    label: str = None,
) -> None:
    """Deliberately ignore the progress update (null-object collector)."""

Expand All @@ -98,7 +111,13 @@ def __init__(self) -> None:
self.counters: DefaultDict[str, int] = None

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
assert not label, "labels not supported in dict collector"
self.counters[name] += inc
Expand Down Expand Up @@ -158,7 +177,13 @@ def __init__(
self.last_log_time: float = None

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = None,
) -> None:
counter_key = f"{name}_{label}" if label else name

Expand All @@ -171,6 +196,14 @@ def update(
)
self.messages[counter_key] = None
self.last_log_time = None
else:
counter_info = self.counter_info[counter_key]
if inc_total:
self.counter_info[counter_key] = LogCollector.CounterInfo(
description=counter_info.description,
start_time=counter_info.start_time,
total=counter_info.total + inc_total,
)

self.counters[counter_key] += inc
if message is not None:
Expand Down Expand Up @@ -264,7 +297,13 @@ def __init__(self, single_bar: bool = False, **tqdm_kwargs: Any) -> None:
self.tqdm_kwargs = tqdm_kwargs or {}

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = ""
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = "",
) -> None:
key = f"{name}_{label}"
bar = self._bars.get(key)
Expand All @@ -281,6 +320,10 @@ def update(
bar = tqdm(desc=desc, total=total, leave=False, **self.tqdm_kwargs)
bar.refresh()
self._bars[key] = bar
else:
if inc_total:
bar.total += inc_total
bar.refresh()
if message:
bar.set_postfix_str(message)
bar.update(inc)
Expand Down Expand Up @@ -312,11 +355,18 @@ def __init__(self, single_bar: bool = True, **alive_kwargs: Any) -> None:
)
self.single_bar = single_bar
self._bars: Dict[str, Any] = {}
self._bars_counts: Dict[str, int] = {}
self._bars_contexts: Dict[str, ContextManager[Any]] = {}
self.alive_kwargs = alive_kwargs or {}

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = ""
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = "",
) -> None:
key = f"{name}_{label}"
bar = self._bars.get(key)
Expand All @@ -333,19 +383,28 @@ def update(
bar = alive_bar(total=total, title=desc, **self.alive_kwargs)
self._bars_contexts[key] = bar
bar = self._bars[key] = bar.__enter__()
self._bars_counts[key] = 0
else:
# TODO: implement once total change is supported
pass

# if message:
# bar.set_postfix_str(message)
bar(inc)
if inc > 0:
bar(inc)
self._bars_counts[key] += inc

def _start(self, step: str) -> None:
self._bars = {}
self._bars_contexts = {}
self

def _stop(self) -> None:
for bar in self._bars_contexts.values():
bar.__exit__(None, None, None)
self._bars.clear()
self._bars_contexts.clear()
self._bars_counts.clear()


class EnlightenCollector(Collector):
Expand Down Expand Up @@ -376,7 +435,13 @@ def __init__(self, single_bar: bool = False, **enlighten_kwargs: Any) -> None:
self.enlighten_kwargs = enlighten_kwargs

def update(
self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = ""
self,
name: str,
inc: int = 1,
total: int = None,
inc_total: int = None,
message: str = None,
label: str = "",
) -> None:
key = f"{name}_{label}"
bar = self._bars.get(key)
Expand All @@ -391,6 +456,9 @@ def update(
)
bar.refresh()
self._bars[key] = bar
else:
if inc_total:
bar.total = bar.total + inc_total
bar.update(inc)

def _start(self, step: str) -> None:
Expand Down
2 changes: 2 additions & 0 deletions dlt/destinations/impl/lancedb/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class LanceDBClientConfiguration(DestinationClientDwhConfiguration):
"""Embedding provider used for generating embeddings. Default is "cohere". You can find the full list of
providers at https://github.com/lancedb/lancedb/tree/main/python/python/lancedb/embeddings as well as
https://lancedb.github.io/lancedb/embeddings/default_embedding_functions/."""
embedding_model_provider_host: Optional[str] = None
"""Full host URL with protocol and port (e.g. 'http://localhost:11434'). Uses LanceDB's default if not specified, assuming the provider accepts this parameter."""
embedding_model: str = "embed-english-v3.0"
"""The model used by the embedding provider for generating embeddings.
Check with the embedding provider which options are available.
Expand Down
4 changes: 3 additions & 1 deletion dlt/destinations/impl/lancedb/lancedb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def __init__(
self.dataset_name = self.config.normalize_dataset_name(self.schema)

embedding_model_provider = self.config.embedding_model_provider
embedding_model_host = self.config.embedding_model_provider_host

# LanceDB doesn't provide a standardized way to set API keys across providers.
# Some use ENV variables and others allow passing api key as an argument.
Expand All @@ -259,12 +260,13 @@ def __init__(
embedding_model_provider,
self.config.credentials.embedding_model_provider_api_key,
)

self.model_func = self.registry.get(embedding_model_provider).create(
name=self.config.embedding_model,
max_retries=self.config.options.max_retries,
api_key=self.config.credentials.api_key,
**({"host": embedding_model_host} if embedding_model_host else {}),
)

self.vector_field_name = self.config.vector_field_name

@property
Expand Down
1 change: 1 addition & 0 deletions dlt/load/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def create_followup_jobs(
f"Job {starting_job.job_id()} CREATED a new FOLLOWUP JOB"
f" {followup_job.new_file_path()} placed in new_jobs"
)
self.collector.update("Jobs", inc=0, inc_total=len(jobs))

def complete_jobs(
self, load_id: str, jobs: Sequence[LoadJob], schema: Schema
Expand Down
15 changes: 9 additions & 6 deletions dlt/sources/helpers/rest_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RESTClient:
auth (Optional[AuthBase]): Authentication configuration for all requests.
paginator (Optional[BasePaginator]): Default paginator for handling paginated responses.
data_selector (Optional[jsonpath.TJsonPath]): JSONPath selector for extracting data from responses.
Only used in `paginate`.
session (BaseSession): HTTP session for making requests.
paginator_factory (Optional[PaginatorFactory]): Factory for creating paginator instances,
used for detecting paginators.
Expand Down Expand Up @@ -96,18 +97,18 @@ def __init__(

def _create_request(
self,
path: str,
path_or_url: str,
method: HTTPMethod,
params: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None,
auth: Optional[AuthBase] = None,
hooks: Optional[Hooks] = None,
) -> Request:
parsed_url = urlparse(path)
parsed_url = urlparse(path_or_url)
if parsed_url.scheme in ("http", "https"):
url = path
url = path_or_url
else:
url = join_url(self.base_url, path)
url = join_url(self.base_url, path_or_url)

return Request(
method=method,
Expand Down Expand Up @@ -140,7 +141,7 @@ def _send_request(self, request: Request, **kwargs: Any) -> Response:

def request(self, path: str = "", method: HTTPMethod = "GET", **kwargs: Any) -> Response:
prepared_request = self._create_request(
path=path,
path_or_url=path,
method=method,
params=kwargs.pop("params", None),
json=kwargs.pop("json", None),
Expand Down Expand Up @@ -171,6 +172,8 @@ def paginate(
Args:
path (str): Endpoint path for the request, relative to `base_url`.
Can also be a fully qualified URL; if starting with http(s) it will
be used instead of the base_url + path.
method (HTTPMethodBasic): HTTP method for the request, defaults to 'get'.
params (Optional[Dict[str, Any]]): URL parameters for the request.
json (Optional[Dict[str, Any]]): JSON payload for the request.
Expand Down Expand Up @@ -210,7 +213,7 @@ def paginate(
hooks["response"] = [raise_for_status]

request = self._create_request(
path=path, method=method, params=params, json=json, auth=auth, hooks=hooks
path_or_url=path, method=method, params=params, json=json, auth=auth, hooks=hooks
)

if paginator:
Expand Down
13 changes: 11 additions & 2 deletions dlt/sources/rest_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def create_resources(
client = RESTClient(
base_url=client_config["base_url"],
headers=client_config.get("headers"),
auth=create_auth(client_config.get("auth")),
auth=create_auth(endpoint_config.get("auth", client_config.get("auth"))),
paginator=create_paginator(client_config.get("paginator")),
session=client_config.get("session"),
)
Expand Down Expand Up @@ -409,7 +409,16 @@ def _validate_config(config: RESTAPIConfig) -> None:
if client_config:
auth = client_config.get("auth")
if auth:
auth = _mask_secrets(auth)
_mask_secrets(auth)
resources = c.get("resources", [])
for resource in resources:
if isinstance(resource, (str, DltResource)):
continue
if endpoint := resource.get("endpoint"):
if not isinstance(endpoint, str):
auth = endpoint.get("auth")
if auth:
_mask_secrets(auth)

validate_dict(RESTAPIConfig, c, path=".")

Expand Down
1 change: 1 addition & 0 deletions dlt/sources/rest_api/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class Endpoint(TypedDict, total=False):
data_selector: Optional[jsonpath.TJsonPath]
response_actions: Optional[List[ResponseAction]]
incremental: Optional[IncrementalConfig]
auth: Optional[AuthConfig]


class ProcessingSteps(TypedDict):
Expand Down
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/destinations/clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ clickhouse
destination.

:::tip
`dataset_name` is optional for Clikchouse. When skipped `dlt` will create all tables without prefix. Note that staging dataset
`dataset_name` is optional for ClickHouse. When skipped `dlt` will create all tables without prefix. Note that staging dataset
tables will still be prefixed with `_staging` (or other name that you configure).
:::

Expand Down
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/destinations/filesystem.md
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ layout="{table_name}/{load_id}.{file_id}.{ext}" # current preconfigured naming s
# layout = "{table_name}/{load_package_timestamp}/{load_id}.{file_id}.{ext}"

# Parquet-like layout (note: it is not compatible with the internal datetime of the parquet file)
# layout = "{table_name}/year={year}/month={month}/day={day}/{load_id}.{file_id}.{ext}"
# layout = "{table_name}/year={YYYY}/month={MM}/day={DD}/{load_id}.{file_id}.{ext}"

# Custom placeholders
# extra_placeholders = { "owner" = "admin", "department" = "finance" }
Expand Down
8 changes: 6 additions & 2 deletions docs/website/docs/dlt-ecosystem/destinations/lancedb.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ Configure the destination in the dlt secrets file located at `~/.dlt/secrets.tom

```toml
[destination.lancedb]
embedding_model_provider = "cohere"
embedding_model = "embed-english-v3.0"
embedding_model_provider = "ollama"
embedding_model = "mxbai-embed-large"
embedding_model_provider_host = "http://localhost:11434" # Optional: custom endpoint for providers that support it

[destination.lancedb.credentials]
uri = ".lancedb"
api_key = "api_key" # API key to connect to LanceDB Cloud. Leave out if you are using LanceDB OSS.
Expand All @@ -47,6 +49,7 @@ embedding_model_provider_api_key = "embedding_model_provider_api_key" # Not need
- The `embedding_model` specifies the model used by the embedding provider for generating embeddings.
Check with the embedding provider which options are available.
Reference https://lancedb.github.io/lancedb/embeddings/default_embedding_functions/.
- The `embedding_model_provider_host` specifies the full host URL with protocol and port for providers that support custom endpoints (like Ollama). If not specified, the provider's default endpoint will be used.
- The `embedding_model_provider_api_key` is the API key for the embedding model provider used to generate embeddings. If you're using a provider that doesn't need authentication, such as Ollama, you don't need to supply this key.

:::info Available model providers
Expand All @@ -61,6 +64,7 @@ embedding_model_provider_api_key = "embedding_model_provider_api_key" # Not need
- "sentence-transformers"
- "huggingface"
- "colbert"
- "ollama"
:::

### Define your data source
Expand Down
4 changes: 4 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ You can also pass a database connection string similar to the one used by the `p
destination.redshift.credentials="redshift://loader:<password>@localhost/dlt_data?connect_timeout=15"
```

:::note
Use the PostgreSQL driver for PostgreSQL-based setups or the Amazon Redshift driver for native Redshift; [see documentation](https://docs.aws.amazon.com/redshift/latest/dg/c_redshift-postgres-jdbc.html).
:::

## Write disposition

All [write dispositions](../../general-usage/incremental-loading#choosing-a-write-disposition) are supported.
Expand Down
Loading

0 comments on commit a60a48e

Please sign in to comment.