diff --git a/dlt/common/libs/pyarrow.py b/dlt/common/libs/pyarrow.py index c8034d8d18..37268c0d2f 100644 --- a/dlt/common/libs/pyarrow.py +++ b/dlt/common/libs/pyarrow.py @@ -620,7 +620,7 @@ def row_tuples_to_arrow( ) float_array = pa.array(columnar_known_types[field.name], type=pa.float64()) columnar_known_types[field.name] = float_array.cast(field.type, safe=False) - if issubclass(py_type, (dict, list)): + if issubclass(py_type, (dict, list, set)): logger.warning( f"Field {field.name} was reflected as JSON type and needs to be serialized back to" " string to be placed in arrow table. This will slow data extraction down. You" @@ -628,7 +628,7 @@ def row_tuples_to_arrow( " extracting an SQL VIEW that selects with cast." ) json_str_array = pa.array( - [None if s is None else json.dumps(s) for s in columnar_known_types[field.name]] + [None if s is None else json.dumps(list(s) if isinstance(s, set) else s) for s in columnar_known_types[field.name]] ) columnar_known_types[field.name] = json_str_array diff --git a/dlt/common/runtime/collector.py b/dlt/common/runtime/collector.py index be5453cdd3..8504334281 100644 --- a/dlt/common/runtime/collector.py +++ b/dlt/common/runtime/collector.py @@ -37,7 +37,13 @@ class Collector(ABC): @abstractmethod def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: """Creates or updates a counter @@ -48,6 +54,7 @@ def update( name (str): An unique name of a counter, displayable. inc (int, optional): Increase amount. Defaults to 1. total (int, optional): Maximum value of a counter. Defaults to None which means unbound counter. + inc_total (int, optional): Increases the maximum value of the counter. Does nothing if the counter does not exist yet. Defaults to None. message (str, optional): Additional message attached to a counter. Defaults to None. label (str, optional): Creates nested counter for counter `name`. Defaults to None. 
""" @@ -80,7 +87,13 @@ class NullCollector(Collector): """A default counter that does not count anything.""" def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: pass @@ -98,7 +111,13 @@ def __init__(self) -> None: self.counters: DefaultDict[str, int] = None def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: assert not label, "labels not supported in dict collector" self.counters[name] += inc @@ -158,7 +177,13 @@ def __init__( self.last_log_time: float = None def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = None + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = None, ) -> None: counter_key = f"{name}_{label}" if label else name @@ -171,6 +196,14 @@ def update( ) self.messages[counter_key] = None self.last_log_time = None + else: + counter_info = self.counter_info[counter_key] + if inc_total: + self.counter_info[counter_key] = LogCollector.CounterInfo( + description=counter_info.description, + start_time=counter_info.start_time, + total=counter_info.total + inc_total, + ) self.counters[counter_key] += inc if message is not None: @@ -264,7 +297,13 @@ def __init__(self, single_bar: bool = False, **tqdm_kwargs: Any) -> None: self.tqdm_kwargs = tqdm_kwargs or {} def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = "" + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = "", ) -> None: key = f"{name}_{label}" bar = self._bars.get(key) @@ -281,6 +320,10 @@ def update( bar = tqdm(desc=desc, total=total, leave=False, **self.tqdm_kwargs) bar.refresh() self._bars[key] = bar + else: + if inc_total: + bar.total += inc_total + bar.refresh() if message: bar.set_postfix_str(message) bar.update(inc) @@ -312,11 +355,18 @@ def __init__(self, single_bar: bool = True, **alive_kwargs: Any) -> None: ) self.single_bar = single_bar self._bars: Dict[str, Any] = {} + self._bars_counts: Dict[str, int] = {} self._bars_contexts: Dict[str, ContextManager[Any]] = {} self.alive_kwargs = alive_kwargs or {} def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = "" + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = "", ) -> None: key = f"{name}_{label}" bar = self._bars.get(key) @@ -333,19 +383,28 @@ def update( bar = alive_bar(total=total, title=desc, **self.alive_kwargs) self._bars_contexts[key] = bar bar = self._bars[key] = bar.__enter__() + self._bars_counts[key] = 0 + else: + # TODO: implement once total change is supported + pass + # if message: # bar.set_postfix_str(message) - bar(inc) + if inc > 0: + bar(inc) + self._bars_counts[key] += inc def _start(self, step: str) -> None: self._bars = {} self._bars_contexts = {} + self._bars_counts = {} def _stop(self) -> None: for bar in self._bars_contexts.values(): bar.__exit__(None, None, None) self._bars.clear() self._bars_contexts.clear() + self._bars_counts.clear() class EnlightenCollector(Collector): @@ -376,7 +435,13 @@ def __init__(self, single_bar: bool = False, 
**enlighten_kwargs: Any) -> None: self.enlighten_kwargs = enlighten_kwargs def update( - self, name: str, inc: int = 1, total: int = None, message: str = None, label: str = "" + self, + name: str, + inc: int = 1, + total: int = None, + inc_total: int = None, + message: str = None, + label: str = "", ) -> None: key = f"{name}_{label}" bar = self._bars.get(key) @@ -391,6 +456,9 @@ def update( ) bar.refresh() self._bars[key] = bar + else: + if inc_total: + bar.total = bar.total + inc_total bar.update(inc) def _start(self, step: str) -> None: diff --git a/dlt/destinations/impl/lancedb/configuration.py b/dlt/destinations/impl/lancedb/configuration.py index 8f6a192bb0..33642268c1 100644 --- a/dlt/destinations/impl/lancedb/configuration.py +++ b/dlt/destinations/impl/lancedb/configuration.py @@ -82,6 +82,8 @@ class LanceDBClientConfiguration(DestinationClientDwhConfiguration): """Embedding provider used for generating embeddings. Default is "cohere". You can find the full list of providers at https://github.com/lancedb/lancedb/tree/main/python/python/lancedb/embeddings as well as https://lancedb.github.io/lancedb/embeddings/default_embedding_functions/.""" + embedding_model_provider_host: Optional[str] = None + """Full host URL with protocol and port (e.g. 'http://localhost:11434'). Uses LanceDB's default if not specified, assuming the provider accepts this parameter.""" embedding_model: str = "embed-english-v3.0" """The model used by the embedding provider for generating embeddings. Check with the embedding provider which options are available. diff --git a/dlt/destinations/impl/lancedb/lancedb_client.py b/dlt/destinations/impl/lancedb/lancedb_client.py index 1a3e1a7d34..bb0e12f8ec 100644 --- a/dlt/destinations/impl/lancedb/lancedb_client.py +++ b/dlt/destinations/impl/lancedb/lancedb_client.py @@ -251,6 +251,7 @@ def __init__( self.dataset_name = self.config.normalize_dataset_name(self.schema) embedding_model_provider = self.config.embedding_model_provider + embedding_model_host = self.config.embedding_model_provider_host # LanceDB doesn't provide a standardized way to set API keys across providers. # Some use ENV variables and others allow passing api key as an argument. @@ -259,12 +260,13 @@ def __init__( embedding_model_provider, self.config.credentials.embedding_model_provider_api_key, ) + self.model_func = self.registry.get(embedding_model_provider).create( name=self.config.embedding_model, max_retries=self.config.options.max_retries, api_key=self.config.credentials.api_key, + **({"host": embedding_model_host} if embedding_model_host else {}), ) - self.vector_field_name = self.config.vector_field_name @property diff --git a/dlt/load/load.py b/dlt/load/load.py index 060b2c5d8e..ddbc7193ed 100644 --- a/dlt/load/load.py +++ b/dlt/load/load.py @@ -370,6 +370,7 @@ def create_followup_jobs( f"Job {starting_job.job_id()} CREATED a new FOLLOWUP JOB" f" {followup_job.new_file_path()} placed in new_jobs" ) + self.collector.update("Jobs", inc=0, inc_total=len(jobs)) def complete_jobs( self, load_id: str, jobs: Sequence[LoadJob], schema: Schema diff --git a/dlt/sources/helpers/rest_client/client.py b/dlt/sources/helpers/rest_client/client.py index 6d04373d8d..a619a05a00 100644 --- a/dlt/sources/helpers/rest_client/client.py +++ b/dlt/sources/helpers/rest_client/client.py @@ -58,6 +58,7 @@ class RESTClient: auth (Optional[AuthBase]): Authentication configuration for all requests. paginator (Optional[BasePaginator]): Default paginator for handling paginated responses. 
data_selector (Optional[jsonpath.TJsonPath]): JSONPath selector for extracting data from responses. + Only used in `paginate`. session (BaseSession): HTTP session for making requests. paginator_factory (Optional[PaginatorFactory]): Factory for creating paginator instances, used for detecting paginators. @@ -96,18 +97,18 @@ def __init__( def _create_request( self, - path: str, + path_or_url: str, method: HTTPMethod, params: Optional[Dict[str, Any]] = None, json: Optional[Dict[str, Any]] = None, auth: Optional[AuthBase] = None, hooks: Optional[Hooks] = None, ) -> Request: - parsed_url = urlparse(path) + parsed_url = urlparse(path_or_url) if parsed_url.scheme in ("http", "https"): - url = path + url = path_or_url else: - url = join_url(self.base_url, path) + url = join_url(self.base_url, path_or_url) return Request( method=method, @@ -140,7 +141,7 @@ def _send_request(self, request: Request, **kwargs: Any) -> Response: def request(self, path: str = "", method: HTTPMethod = "GET", **kwargs: Any) -> Response: prepared_request = self._create_request( - path=path, + path_or_url=path, method=method, params=kwargs.pop("params", None), json=kwargs.pop("json", None), @@ -171,6 +172,8 @@ def paginate( Args: path (str): Endpoint path for the request, relative to `base_url`. + Can also be a fully qualified URL; if starting with http(s) it will + be used instead of the base_url + path. method (HTTPMethodBasic): HTTP method for the request, defaults to 'get'. params (Optional[Dict[str, Any]]): URL parameters for the request. json (Optional[Dict[str, Any]]): JSON payload for the request. @@ -210,7 +213,7 @@ def paginate( hooks["response"] = [raise_for_status] request = self._create_request( - path=path, method=method, params=params, json=json, auth=auth, hooks=hooks + path_or_url=path, method=method, params=params, json=json, auth=auth, hooks=hooks ) if paginator: diff --git a/dlt/sources/rest_api/__init__.py b/dlt/sources/rest_api/__init__.py index ed55f71e10..966d9e8b6c 100644 --- a/dlt/sources/rest_api/__init__.py +++ b/dlt/sources/rest_api/__init__.py @@ -266,7 +266,7 @@ def create_resources( client = RESTClient( base_url=client_config["base_url"], headers=client_config.get("headers"), - auth=create_auth(client_config.get("auth")), + auth=create_auth(endpoint_config.get("auth", client_config.get("auth"))), paginator=create_paginator(client_config.get("paginator")), session=client_config.get("session"), ) @@ -409,7 +409,16 @@ def _validate_config(config: RESTAPIConfig) -> None: if client_config: auth = client_config.get("auth") if auth: - auth = _mask_secrets(auth) + _mask_secrets(auth) + resources = c.get("resources", []) + for resource in resources: + if isinstance(resource, (str, DltResource)): + continue + if endpoint := resource.get("endpoint"): + if not isinstance(endpoint, str): + auth = endpoint.get("auth") + if auth: + _mask_secrets(auth) validate_dict(RESTAPIConfig, c, path=".") diff --git a/dlt/sources/rest_api/typing.py b/dlt/sources/rest_api/typing.py index d4cea892a3..ccef828b1a 100644 --- a/dlt/sources/rest_api/typing.py +++ b/dlt/sources/rest_api/typing.py @@ -263,6 +263,7 @@ class Endpoint(TypedDict, total=False): data_selector: Optional[jsonpath.TJsonPath] response_actions: Optional[List[ResponseAction]] incremental: Optional[IncrementalConfig] + auth: Optional[AuthConfig] class ProcessingSteps(TypedDict): diff --git a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md index 5ca25af55c..3bd1ae8e15 100644 --- 
a/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md +++ b/docs/website/docs/dlt-ecosystem/destinations/clickhouse.md @@ -126,7 +126,7 @@ clickhouse destination. :::tip -`dataset_name` is optional for Clikchouse. When skipped `dlt` will create all tables without prefix. Note that staging dataset +`dataset_name` is optional for ClickHouse. When skipped `dlt` will create all tables without prefix. Note that staging dataset tables will still be prefixed with `_staging` (or other name that you configure). ::: diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 3f4efe1db2..9b243b9429 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -530,7 +530,7 @@ layout="{table_name}/{load_id}.{file_id}.{ext}" # current preconfigured naming s # layout = "{table_name}/{load_package_timestamp}/{load_id}.{file_id}.{ext}" # Parquet-like layout (note: it is not compatible with the internal datetime of the parquet file) -# layout = "{table_name}/year={year}/month={month}/day={day}/{load_id}.{file_id}.{ext}" +# layout = "{table_name}/year={YYYY}/month={MM}/day={DD}/{load_id}.{file_id}.{ext}" # Custom placeholders # extra_placeholders = { "owner" = "admin", "department" = "finance" } diff --git a/docs/website/docs/dlt-ecosystem/destinations/lancedb.md b/docs/website/docs/dlt-ecosystem/destinations/lancedb.md index b2aec665ab..035f27fe32 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/lancedb.md +++ b/docs/website/docs/dlt-ecosystem/destinations/lancedb.md @@ -33,8 +33,10 @@ Configure the destination in the dlt secrets file located at `~/.dlt/secrets.tom ```toml [destination.lancedb] -embedding_model_provider = "cohere" -embedding_model = "embed-english-v3.0" +embedding_model_provider = "ollama" +embedding_model = "mxbai-embed-large" +embedding_model_provider_host = "http://localhost:11434" # Optional: custom endpoint for providers that support it + [destination.lancedb.credentials] uri = ".lancedb" api_key = "api_key" # API key to connect to LanceDB Cloud. Leave out if you are using LanceDB OSS. @@ -47,6 +49,7 @@ embedding_model_provider_api_key = "embedding_model_provider_api_key" # Not need - The `embedding_model` specifies the model used by the embedding provider for generating embeddings. Check with the embedding provider which options are available. Reference https://lancedb.github.io/lancedb/embeddings/default_embedding_functions/. +- The `embedding_model_provider_host` specifies the full host URL with protocol and port for providers that support custom endpoints (like Ollama). If not specified, the provider's default endpoint will be used. - The `embedding_model_provider_api_key` is the API key for the embedding model provider used to generate embeddings. If you're using a provider that doesn't need authentication, such as Ollama, you don't need to supply this key. 
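+
+The same configuration can also be supplied through environment variables instead of `secrets.toml` (a minimal sketch; the variable names mirror the TOML keys above, and the Ollama model and local endpoint are only examples):
+
+```py
+import os
+
+# equivalent to the [destination.lancedb] section shown above
+os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "ollama"
+os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "mxbai-embed-large"
+# optional: only relevant for providers that accept a custom host, e.g. a local Ollama server
+os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER_HOST"] = "http://localhost:11434"
+```
+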
:::info Available model providers @@ -61,6 +64,7 @@ embedding_model_provider_api_key = "embedding_model_provider_api_key" # Not need - "sentence-transformers" - "huggingface" - "colbert" +- "ollama" ::: ### Define your data source diff --git a/docs/website/docs/dlt-ecosystem/destinations/redshift.md b/docs/website/docs/dlt-ecosystem/destinations/redshift.md index 3108004712..4b57877f00 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/redshift.md +++ b/docs/website/docs/dlt-ecosystem/destinations/redshift.md @@ -67,6 +67,10 @@ You can also pass a database connection string similar to the one used by the `p destination.redshift.credentials="redshift://loader:@localhost/dlt_data?connect_timeout=15" ``` +:::note +Use the PostgreSQL driver for PostgreSQL-based setups or the Amazon Redshift driver for native Redshift; [see documentation](https://docs.aws.amazon.com/redshift/latest/dg/c_redshift-postgres-jdbc.html). +::: + ## Write disposition All [write dispositions](../../general-usage/incremental-loading#choosing-a-write-disposition) are supported. diff --git a/docs/website/docs/dlt-ecosystem/notebooks.md b/docs/website/docs/dlt-ecosystem/notebooks.md deleted file mode 100644 index 4486b81b68..0000000000 --- a/docs/website/docs/dlt-ecosystem/notebooks.md +++ /dev/null @@ -1,27 +0,0 @@ ---- -title: dlt in notebooks -description: Run dlt in notebooks like Colab, Databricks or Jupyter -keywords: [notebook, jupyter] ---- -# dlt in notebooks - -## Colab -You'll need to install `dlt` like any other dependency: -```sh -!pip install dlt -``` - -You can configure secrets using **Secrets** sidebar. Just create a variable with the name `secrets.toml` and paste -the content of the **toml** file from your `.dlt` folder into it. We support `config.toml` variable as well. - -:::note -`dlt` will not reload the secrets automatically. Please restart your interpreter in Colab options when you add/change -content of the variables above. -::: - -## Streamlit -`dlt` will look for `secrets.toml` and `config.toml` in the `.dlt` folder. If `secrets.toml` are not found, it will use -`secrets.toml` from `.streamlit` folder. -If you run locally, maintain your usual `.dlt` folder. When running on streamlit cloud, paste the content of `dlt` -`secrets.toml` into the `streamlit` secrets. - diff --git a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md index d23f3f139e..14d9ecb04b 100644 --- a/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md +++ b/docs/website/docs/dlt-ecosystem/verified-sources/rest_api/basic.md @@ -24,7 +24,7 @@ source = rest_api_source({ "token": dlt.secrets["your_api_token"], }, "paginator": { - "type": "json_response", + "type": "json_link", "next_url_path": "paging.next", }, }, @@ -308,6 +308,32 @@ A resource configuration is used to define a [dlt resource](../../../general-usa - `include_from_parent`: A list of fields from the parent resource to be included in the resource output. See the [resource relationships](#include-fields-from-the-parent-resource) section for more details. - `processing_steps`: A list of [processing steps](#processing-steps-filter-and-transform-data) to filter and transform the data. - `selected`: A flag to indicate if the resource is selected for loading. This could be useful when you want to load data only from child resources and not from the parent resource. +- `auth`: An optional `AuthConfig` instance. 
If passed, it is used instead of the one defined in the [client](#client) definition. Example: +```py +from dlt.sources.helpers.rest_client.auth import HttpBasicAuth + +config = { + "client": { + "auth": { + "type": "bearer", + "token": dlt.secrets["your_api_token"], + } + }, + "resources": [ + "resource-using-bearer-auth", + { + "name": "my-resource-with-special-auth", + "endpoint": { + # ... + "auth": HttpBasicAuth("user", dlt.secrets["your_basic_auth_password"]) + }, + # ... + } + ] + # ... +} +``` +This would use `Bearer` auth as defined in the `client` for `resource-using-bearer-auth` and `HTTP Basic` auth for `my-resource-with-special-auth`. You can also pass additional resource parameters that will be used to configure the dlt resource. See [dlt resource API reference](../../../api_reference/extract/decorators#resource) for more details. @@ -335,7 +361,8 @@ The endpoint configuration defines how to query the API endpoint. Quick example: The fields in the endpoint configuration are: -- `path`: The path to the API endpoint. +- `path`: The path to the API endpoint. By default, this path is appended to the given `base_url`. If this is a fully qualified URL starting with `http:` or `https:`, it will be +used as-is and `base_url` will be ignored. - `method`: The HTTP method to be used. The default is `GET`. - `params`: Query parameters to be sent with each request. For example, `sort` to order the results or `since` to specify [incremental loading](#incremental-loading). This is also used to define [resource relationships](#define-resource-relationships). - `json`: The JSON payload to be sent with the request (for POST and PUT requests). diff --git a/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md b/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md deleted file mode 100644 index 79ac7b89ad..0000000000 --- a/docs/website/docs/dlt-ecosystem/visualizations/exploring-the-data.md +++ /dev/null @@ -1,128 +0,0 @@ ---- -title: Explore the loaded data -description: How to explore the data that has been loaded -keywords: [exploring, loaded data, data quality] ---- - -# Explore the loaded data - -Once you have run a pipeline locally, you can launch a web app that displays the loaded data. - -To do so, run the [cli command](../../reference/command-line-interface.md#show-tables-and-data-in-the-destination) -below with your pipeline name. The pipeline name is the name of the -Python file where your pipeline is defined and also displayed in your terminal when loading: - -```sh -dlt pipeline {pipeline_name} show -``` - -This will open a streamlit app with: - -- Information about the loads. -- Tables and sample data. -- A SQL client that you can use to run queries. - -## Exploring the data in Python - -You can quickly fetch loaded data from a destination using SQL. The data will be available as a -stream of rows or a data frame. Both methods use the same credentials that you set up for your -pipeline and hide many intricacies of correctly setting up the connection to your destination. - -### Querying the data using the `dlt` SQL client - -Execute any SQL query and get results following the Python -[dbapi](https://peps.python.org/pep-0249/) spec. 
Below, we fetch data from the customers table: - -```py -pipeline = dlt.pipeline(destination="bigquery", dataset_name="crm") -with pipeline.sql_client() as client: - with client.execute_query( - "SELECT id, name, email FROM customers WHERE id = %s", - 10 - ) as cursor: - # get all data from the cursor as a list of rows - print(cursor.fetchall()) -``` - -In the above, we used `dbapi` parameter placeholders and fetched the data using the `fetchall` method -that reads all the rows from the cursor. - -### Querying data into a data frame - -You can fetch the results of any SQL query as a data frame. If the destination supports that -natively (i.e., BigQuery and DuckDB), `dlt` uses the native method. Thanks to that, reading data -frames may be really fast! The example below reads GitHub reactions data from the `issues` table and -counts reaction types. - -```py -pipeline = dlt.pipeline( - pipeline_name="github_pipeline", - destination="duckdb", - dataset_name="github_reactions", - dev_mode=True -) -with pipeline.sql_client() as client: - with client.execute_query( - 'SELECT "reactions__+1", "reactions__-1", reactions__laugh, reactions__hooray, reactions__rocket FROM issues' - ) as cursor: - # calling `df` on a cursor, returns the data as a pandas DataFrame - reactions = cursor.df() -counts = reactions.sum(0).sort_values(0, ascending=False) -``` - -The `df` method above returns all the data in the cursor as a data frame. You can also fetch data in -chunks by passing the `chunk_size` argument to the `df` method. - -### Access destination native connection - -The native connection to your destination like BigQuery `Client` or DuckDB `DuckDBPyConnection` is -available in case you want to do anything special. Below, we take the native connection to `duckdb` -to get `DuckDBPyRelation` from a query: - -```py -import dlt -import duckdb - -pipeline = dlt.pipeline(destination="duckdb", dataset_name="github_reactions") -with pipeline.sql_client() as client: - conn = client.native_connection - rel = conn.sql('SELECT * FROM issues') - rel.limit(3).show() -``` - -## Data quality dashboards - -After deploying a `dlt` pipeline, you might ask yourself: How can we know if the data is and remains -high quality? - -There are two ways to catch errors: - -1. Tests. -1. People [monitoring.](../../running-in-production/monitoring.md) - -## Tests - -The first time you load data from a pipeline you have built, you will likely want to test it. Plot -the data on time series line charts and look for any interruptions or spikes, which will highlight -any gaps or loading issues. - -### Data usage as monitoring - -Setting up monitoring is a good idea. However, in practice, often by the time you notice something is wrong through reviewing charts, someone in the business has likely already noticed something is wrong. That is, if there is usage of the data, then that usage will act as a sort of monitoring. - -### Plotting main metrics on line charts - -In cases where data is not being used much (e.g., only one marketing analyst is using some data alone), then it is a good idea to have them plot their main metrics on "last 7 days" line charts, so it's visible to them that something may be off when they check their metrics. - -It's important to think about granularity here. A daily line chart, for example, would not catch hourly issues well. Typically, you will want to match the granularity of the time dimension (day/hour/etc.) 
of the line chart with the things that could go wrong, either in the loading process or in the tracked process. - -If a dashboard is the main product of an analyst, they will generally watch it closely. Therefore, it's probably not necessary for a data engineer to include monitoring in their daily activities in these situations. - -## Tools to create dashboards - -[Metabase](https://www.metabase.com/), [Looker Studio](https://lookerstudio.google.com/u/0/), and [Streamlit](https://streamlit.io/) are some common tools that you might use to set up dashboards to explore data. It's worth noting that while many tools are suitable for exploration, different tools enable your organization to achieve different things. Some organizations use multiple tools for different scopes: - -- Tools like [Metabase](https://www.metabase.com/) are intended for data democratization, where the business user can change the dimension or granularity to answer follow-up questions. -- Tools like [Looker Studio](https://lookerstudio.google.com/u/0/) and [Tableau](https://www.tableau.com/) are intended for minimal interaction curated dashboards that business users can filter and read as-is with limited training. -- Tools like [Streamlit](https://streamlit.io/) enable powerful customizations and the building of complex apps by Python-first developers, but they generally do not support self-service out of the box. - diff --git a/docs/website/docs/general-usage/dataset-access/data-quality-dashboard.md b/docs/website/docs/general-usage/dataset-access/data-quality-dashboard.md new file mode 100644 index 0000000000..01e51993c6 --- /dev/null +++ b/docs/website/docs/general-usage/dataset-access/data-quality-dashboard.md @@ -0,0 +1,39 @@ +--- +title: Ensuring data quality +description: Monitoring and testing data quality +keywords: [destination, schema, data, monitoring, testing, quality] +--- + +# Data quality dashboards + +After deploying a `dlt` pipeline, you might ask yourself: How can we know if the data is and remains high quality? + +There are two ways to catch errors: + +1. Tests. +1. People [monitoring.](../../running-in-production/monitoring.md) + +## Tests + +The first time you load data from a pipeline you have built, you will likely want to test it. Plot the data on time series line charts and look for any interruptions or spikes, which will highlight any gaps or loading issues. + +### Data usage as monitoring + +Setting up monitoring is a good idea. However, in practice, often by the time you notice something is wrong through reviewing charts, someone in the business has likely already noticed something is wrong. That is, if there is usage of the data, then that usage will act as a sort of monitoring. + +### Plotting main metrics on line charts + +In cases where data is not being used much (e.g., only one marketing analyst is using some data alone), then it is a good idea to have them plot their main metrics on "last 7 days" line charts, so it's visible to them that something may be off when they check their metrics. + +It's important to think about granularity here. A daily line chart, for example, would not catch hourly issues well. Typically, you will want to match the granularity of the time dimension (day/hour/etc.) of the line chart with the things that could go wrong, either in the loading process or in the tracked process. + +If a dashboard is the main product of an analyst, they will generally watch it closely. 
Therefore, it's probably not necessary for a data engineer to include monitoring in their daily activities in these situations. + +## Tools to create dashboards + +[Metabase](https://www.metabase.com/), [Looker Studio](https://lookerstudio.google.com/u/0/), and [Streamlit](https://streamlit.io/) are some common tools that you might use to set up dashboards to explore data. It's worth noting that while many tools are suitable for exploration, different tools enable your organization to achieve different things. Some organizations use multiple tools for different scopes: + +- Tools like [Metabase](https://www.metabase.com/) are intended for data democratization, where the business user can change the dimension or granularity to answer follow-up questions. +- Tools like [Looker Studio](https://lookerstudio.google.com/u/0/) and [Tableau](https://www.tableau.com/) are intended for minimal interaction curated dashboards that business users can filter and read as-is with limited training. +- Tools like [Streamlit](https://streamlit.io/) enable powerful customizations and the building of complex apps by Python-first developers, but they generally do not support self-service out of the box. + diff --git a/docs/website/docs/general-usage/dataset-access/dataset.md b/docs/website/docs/general-usage/dataset-access/dataset.md new file mode 100644 index 0000000000..68635383c5 --- /dev/null +++ b/docs/website/docs/general-usage/dataset-access/dataset.md @@ -0,0 +1,240 @@ +--- +title: Accessing loaded data in Python +description: Conveniently accessing the data loaded to any destination in python +keywords: [destination, schema, data, access, retrieval] +--- + +# Accessing loaded data in Python + +This guide explains how to access and manipulate data that has been loaded into your destination using the `dlt` Python library. After running your pipelines and loading data, you can use the `ReadableDataset` and `ReadableRelation` classes to interact with your data programmatically. + +**Note:** The `ReadableDataset` and `ReadableRelation` objects are **lazy-loading**. They will only query and retrieve data when you perform an action that requires it, such as fetching data into a DataFrame or iterating over the data. This means that simply creating these objects does not load data into memory, making your code more efficient. + +## Quick start example + +Here's a full example of how to retrieve data from a pipeline and load it into a Pandas DataFrame or a PyArrow Table. + +```py +# Assuming you have a Pipeline object named 'pipeline' +# and you have loaded data to a table named 'items' in the destination + +# Step 1: Get the readable dataset from the pipeline +dataset = pipeline._dataset() + +# Step 2: Access a table as a ReadableRelation +items_relation = dataset.items # Or dataset["items"] + +# Step 3: Fetch the entire table as a Pandas DataFrame +df = items_relation.df() + +# Alternatively, fetch as a PyArrow Table +arrow_table = items_relation.arrow() +``` + +## Getting started + +Assuming you have a `Pipeline` object (let's call it `pipeline`), you can obtain a `ReadableDataset` and access your tables as `ReadableRelation` objects. + +### Access the `ReadableDataset` + +```py +# Get the readable dataset from the pipeline +dataset = pipeline._dataset() +``` + +### Access tables as `ReadableRelation` + +You can access tables in your dataset using either attribute access or item access. 
+ +```py +# Using attribute access +items_relation = dataset.items + +# Using item access +items_relation = dataset["items"] +``` + +## Reading data + +Once you have a `ReadableRelation`, you can read data in various formats and sizes. + +### Fetch the entire table + +:::caution +Loading full tables into memory without limiting or iterating over them can consume a large amount of memory and may cause your program to crash if the table is too large. It's recommended to use chunked iteration or apply limits when dealing with large datasets. +::: + +#### As a Pandas DataFrame + +```py +df = items_relation.df() +``` + +#### As a PyArrow Table + +```py +arrow_table = items_relation.arrow() +``` + +#### As a list of Python tuples + +```py +items_list = items_relation.fetchall() +``` + +## Lazy loading behavior + +The `ReadableDataset` and `ReadableRelation` objects are **lazy-loading**. This means that they do not immediately fetch data when you create them. Data is only retrieved when you perform an action that requires it, such as calling `.df()`, `.arrow()`, or iterating over the data. This approach optimizes performance and reduces unnecessary data loading. + +## Iterating over data in chunks + +To handle large datasets efficiently, you can process data in smaller chunks. + +### Iterate as Pandas DataFrames + +```py +for df_chunk in items_relation.iter_df(chunk_size=500): + # Process each DataFrame chunk + pass +``` + +### Iterate as PyArrow Tables + +```py +for arrow_chunk in items_relation.iter_arrow(chunk_size=500): + # Process each PyArrow chunk + pass +``` + +### Iterate as lists of tuples + +```py +for items_chunk in items_relation.iter_fetch(chunk_size=500): + # Process each chunk of tuples + pass +``` + +The methods available on the ReadableRelation correspond to the methods available on the cursor returned by the SQL client. Please refer to the [SQL client](./sql-client.md#supported-methods-on-the-cursor) guide for more information. + +## Modifying queries + +You can refine your data retrieval by limiting the number of records, selecting specific columns, or chaining these operations. + +### Limit the number of records + +```py +# Get the first 50 items as a PyArrow table +arrow_table = items_relation.limit(50).arrow() +``` + +#### Using `head()` to get the first 5 records + +```py +df = items_relation.head().df() +``` + +### Select specific columns + +```py +# Select only 'col1' and 'col2' columns +items_list = items_relation.select("col1", "col2").fetchall() + +# Alternate notation with brackets +items_list = items_relation[["col1", "col2"]].fetchall() + +# Only get one column +items_list = items_relation["col1"].fetchall() + +``` + +### Chain operations + +You can combine `select`, `limit`, and other methods. + +```py +# Select columns and limit the number of records +arrow_table = items_relation.select("col1", "col2").limit(50).arrow() +``` + +## Supported destinations + +All SQL and filesystem destinations supported by `dlt` can utilize this data access interface. For filesystem destinations, `dlt` [uses **DuckDB** under the hood](./sql-client.md#the-filesystem-sql-client) to create views from Parquet or JSONL files dynamically. This allows you to query data stored in files using the same interface as you would with SQL databases. If you plan on accessing data in buckets or the filesystem a lot this way, it is advised to load data as Parquet instead of JSONL, as **DuckDB** is able to only load the parts of the data actually needed for the query to work. 
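+
+As a rough sketch, reading from a filesystem destination then looks exactly like reading from a SQL destination (this assumes an `items` table was already loaded as Parquet into the `my_dataset` dataset):
+
+```py
+import dlt
+
+# pipeline that previously loaded data to a local folder or bucket
+pipeline = dlt.pipeline(destination="filesystem", dataset_name="my_dataset")
+
+# dlt creates DuckDB views over the parquet/jsonl files behind the scenes
+dataset = pipeline._dataset()
+df = dataset.items.limit(100).df()
+```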
+ +## Examples + +### Fetch one record as a tuple + +```py +record = items_relation.fetchone() +``` + +### Fetch many records as tuples + +```py +records = items_relation.fetchmany(chunk_size=10) +``` + +### Iterate over data with limit and column selection + +**Note:** When iterating over filesystem tables, the underlying DuckDB may give you a different chunk size depending on the size of the parquet files the table is based on. + +```py + +# Dataframes +for df_chunk in items_relation.select("col1", "col2").limit(100).iter_df(chunk_size=20): + ... + +# Arrow tables +for arrow_table in items_relation.select("col1", "col2").limit(100).iter_arrow(chunk_size=20): + ... + +# Python tuples +for records in items_relation.select("col1", "col2").limit(100).iter_fetch(chunk_size=20): + # Process each modified DataFrame chunk + ... +``` + +## Advanced usage + +### Using custom SQL queries to create `ReadableRelations` + +You can use custom SQL queries directly on the dataset to create a `ReadableRelation`: + +```py +# Join 'items' and 'other_items' tables +custom_relation = dataset("SELECT * FROM items JOIN other_items ON items.id = other_items.id") +arrow_table = custom_relation.arrow() +``` + +:::note +When using custom SQL queries with `dataset()`, methods like `limit` and `select` won't work. Include any filtering or column selection directly in your SQL query. +::: + + +### Loading a `ReadableRelation` into a pipeline table + +Since the `iter_arrow` and `iter_df` methods are generators that iterate over the full `ReadableRelation` in chunks, you can use them as a resource for another (or even the same) `dlt` pipeline: + +```py +# Create a readable relation with a limit of 1m rows +limited_items_relation = dataset.items.limit(1_000_000) + +# Create a new pipeline +other_pipeline = dlt.pipeline(pipeline_name="other_pipeline", destination="duckdb") + +# We can now load these 1m rows into this pipeline in 10k chunks +other_pipeline.run(limited_items_relation.iter_arrow(chunk_size=10_000), table_name="limited_items") +``` + +### Using `ibis` to query the data + +Visit the [Native Ibis integration](./ibis-backend.md) guide to learn more. + +## Important considerations + +- **Memory usage:** Loading full tables into memory without iterating or limiting can consume significant memory, potentially leading to crashes if the dataset is large. Always consider using limits or chunked iteration. + +- **Lazy evaluation:** `ReadableDataset` and `ReadableRelation` objects delay data retrieval until necessary. This design improves performance and resource utilization. + +- **Custom SQL queries:** When executing custom SQL queries, remember that additional methods like `limit()` or `select()` won't modify the query. Include all necessary clauses directly in your SQL statement. + diff --git a/docs/website/docs/general-usage/dataset-access/ibis-backend.md b/docs/website/docs/general-usage/dataset-access/ibis-backend.md new file mode 100644 index 0000000000..8f4b0fb6b6 --- /dev/null +++ b/docs/website/docs/general-usage/dataset-access/ibis-backend.md @@ -0,0 +1,49 @@ +--- +title: Native Ibis integration +description: Accessing your data with native Ibis backends +keywords: [data, dataset, ibis] +--- + +# Ibis + +Ibis is a powerful portable Python dataframe library. Learn more about what it is and how to use it in the [official documentation](https://ibis-project.org/). + +`dlt` provides an easy way to hand over your loaded dataset to an Ibis backend connection. 
+ +:::tip +Not all destinations supported by `dlt` have an equivalent Ibis backend. Natively supported destinations include DuckDB (including Motherduck), Postgres, Redshift, Snowflake, Clickhouse, MSSQL (including Synapse), and BigQuery. The filesystem destination is supported via the [Filesystem SQL client](./sql-client#the-filesystem-sql-client); please install the DuckDB backend for Ibis to use it. Mutating data with Ibis on the filesystem will not result in any actual changes to the persisted files. +::: + +## Prerequisites + +To use the Ibis backend, you will need to have the `ibis-framework` package with the correct Ibis extra installed. The following example will install the DuckDB backend: + +```sh +pip install ibis-framework[duckdb] +``` + +## Get an Ibis connection from your dataset + +`dlt` datasets have a helper method to return an Ibis connection to the destination they live on. The returned object is a native Ibis connection to the destination, which you can use to read and even transform data. Please consult the [Ibis documentation](https://ibis-project.org/docs/backends/) to learn more about what you can do with Ibis. + +```py +# get the dataset from the pipeline +dataset = pipeline._dataset() +dataset_name = pipeline.dataset_name + +# get the native ibis connection from the dataset +ibis_connection = dataset.ibis() + +# list all tables in the dataset +# NOTE: You need to provide the dataset name to ibis, in ibis datasets are named databases +print(ibis_connection.list_tables(database=dataset_name)) + +# get the items table +table = ibis_connection.table("items", database=dataset_name) + +# print the first 10 rows +print(table.limit(10).execute()) + +# Visit the ibis docs to learn more about the available methods +``` + diff --git a/docs/website/docs/general-usage/dataset-access/index.md b/docs/website/docs/general-usage/dataset-access/index.md new file mode 100644 index 0000000000..d51ca09804 --- /dev/null +++ b/docs/website/docs/general-usage/dataset-access/index.md @@ -0,0 +1,19 @@ +--- +title: Accessing loaded data +description: How to access your loaded datasets +keywords: [datasets, data, access] +--- +import DocCardList from '@theme/DocCardList'; + +# Accessing loaded data + +After one or more successful runs of your pipeline, you can inspect or access the loaded data in various ways: + +* We have a simple [`streamlit` app](./streamlit.md) that you can use to view your data locally in your webapp. +* We have a [Python interface](./dataset.md) that allows you to access your data in Python as Python tuples, `arrow` tables, or `pandas` dataframes with a simple dataset object or an SQL interface. You can even run SQL commands on the filesystem destination via `DuckDB` or forward data from any table into another pipeline. +* We have an [`ibis` interface](./ibis-backend.md) that allows you to hand over your loaded data to the powerful [ibis-framework](https://ibis-project.org/) library. +* Lastly, we have some advice for [monitoring and ensuring the quality of your data](./data-quality-dashboard.md). 
+ +# Learn more + + diff --git a/docs/website/docs/general-usage/dataset-access/sql-client.md b/docs/website/docs/general-usage/dataset-access/sql-client.md new file mode 100644 index 0000000000..bcd23cfd32 --- /dev/null +++ b/docs/website/docs/general-usage/dataset-access/sql-client.md @@ -0,0 +1,84 @@ +--- +title: The SQL Client +description: Technical details about the destination sql client +keywords: [data, dataset, sql] +--- + +# The SQL client + +:::note +This page contains technical details about the implementation of the SQL client as well as information on how to use low-level APIs. If you simply want to query your data, it's advised to read the pages in this section on accessing data via `dlt` datasets, Streamlit, or Ibis. +::: + +Most `dlt` destinations use an implementation of the `SqlClientBase` class to connect to the physical destination to which your data is loaded. DDL statements, data insert or update commands, as well as SQL merge and replace queries, are executed via a connection on this client. It also is used for reading data for the [Streamlit app](./streamlit.md) and [data access via `dlt` datasets](./dataset.md). + +All SQL destinations make use of an SQL client; additionally, the filesystem has a special implementation of the SQL client which you can read about [below](#the-filesystem-sql-client). + +## Executing a query on the SQL client + +You can access the SQL client of your destination via the `sql_client` method on your pipeline. The code below shows how to use the SQL client to execute a query. + +```py +pipeline = dlt.pipeline(destination="bigquery", dataset_name="crm") +with pipeline.sql_client() as client: + with client.execute_query( + "SELECT id, name, email FROM customers WHERE id = %s", + 10 + ) as cursor: + # get all data from the cursor as a list of tuples + print(cursor.fetchall()) +``` + +## Retrieving the data in different formats + +The cursor returned by `execute_query` has several methods for retrieving the data. The supported formats are Python tuples, Pandas DataFrame, and Arrow table. + +The code below shows how to retrieve the data as a Pandas DataFrame and then manipulate it in memory: + +```py +pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb") +with pipeline.sql_client() as client: + with client.execute_query( + 'SELECT "reactions__+1", "reactions__-1", reactions__laugh, reactions__hooray, reactions__rocket FROM issues' + ) as cursor: + # calling `df` on a cursor, returns the data as a pandas DataFrame + reactions = cursor.df() +counts = reactions.sum(0).sort_values(0, ascending=False) +``` + +## Supported methods on the cursor + +- `fetchall()`: returns all rows as a list of tuples; +- `fetchone()`: returns a single row as a tuple; +- `fetchmany(size=None)`: returns a number of rows as a list of tuples; if no size is provided, all rows are returned; +- `df(chunk_size=None, **kwargs)`: returns the data as a Pandas DataFrame; if `chunk_size` is provided, the data is retrieved in chunks of the given size; +- `arrow(chunk_size=None, **kwargs)`: returns the data as an Arrow table; if `chunk_size` is provided, the data is retrieved in chunks of the given size; +- `iter_fetch(chunk_size: int)`: iterates over the data in chunks of the given size as lists of tuples; +- `iter_df(chunk_size: int)`: iterates over the data in chunks of the given size as Pandas DataFrames; +- `iter_arrow(chunk_size: int)`: iterates over the data in chunks of the given size as Arrow tables. 
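+
+For example, chunked retrieval on the cursor could look like the following minimal sketch (it reuses the DuckDB pipeline and `issues` table from the example above; the chunk size is arbitrary):
+
+```py
+import dlt
+
+pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
+with pipeline.sql_client() as client:
+    with client.execute_query("SELECT * FROM issues") as cursor:
+        # each chunk arrives as a pandas DataFrame with up to 10000 rows
+        for chunk in cursor.iter_df(chunk_size=10000):
+            print(chunk.shape)
+```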
+ +:::info +Which retrieval method you should use very much depends on your use case and the destination you are using. Some drivers for our destinations provided by their vendors natively support Arrow or Pandas DataFrames; in these cases, we will use that interface. If they do not, `dlt` will convert lists of tuples into these formats. +::: + +## The filesystem SQL client + +The filesystem destination implements a special but extremely useful version of the SQL client. While during a normal pipeline run, the filesystem does not make use of an SQL client but rather copies the files resulting from a load into the folder or bucket you have specified, it is possible to query this data using SQL via this client. For this to work, `dlt` uses an in-memory `DuckDB` database instance and makes your filesystem tables available as views on this database. For the most part, you can use the filesystem SQL client just like any other SQL client. `dlt` uses sqlglot to discover which tables you are trying to access and, as mentioned above, `DuckDB` to make them queryable. + +The code below shows how to use the filesystem SQL client to query the data: + +```py +pipeline = dlt.pipeline(destination="filesystem", dataset_name="my_dataset") +with pipeline.sql_client() as client: + with client.execute_query("SELECT * FROM my_table") as cursor: + print(cursor.fetchall()) +``` + +A few things to know or keep in mind when using the filesystem SQL client: + +- The SQL database you are actually querying is an in-memory database, so if you do any kind of mutating queries, these will not be persisted to your folder or bucket. +- You must have loaded your data as `JSONL` or `Parquet` files for this SQL client to work. For optimal performance, you should use `Parquet` files, as `DuckDB` is able to only read the bytes needed to execute your query from a folder or bucket in this case. +- Keep in mind that if you do any filtering, sorting, or full table loading with the SQL client, the in-memory `DuckDB` instance will have to download and query a lot of data from your bucket or folder if you have a large table. +- If you are accessing data on a bucket, `dlt` will temporarily store your credentials in `DuckDB` to let it connect to the bucket. +- Some combinations of buckets and table formats may not be fully supported at this time. + diff --git a/docs/website/docs/general-usage/dataset-access/streamlit.md b/docs/website/docs/general-usage/dataset-access/streamlit.md new file mode 100644 index 0000000000..2d76aac660 --- /dev/null +++ b/docs/website/docs/general-usage/dataset-access/streamlit.md @@ -0,0 +1,57 @@ +--- +title: Viewing your data with Streamlit +description: Viewing your data with streamlit +keywords: [data, dataset, streamlit] +--- + +# Viewing your data with Streamlit + +Once you have run a pipeline locally, you can launch a web app that displays the loaded data. For this to work, you will need to have the `streamlit` package installed. + +:::tip +The Streamlit app does not work with all destinations supported by `dlt`. Only destinations that provide a SQL client will work. The filesystem destination has support via the [Filesystem SQL client](./sql-client#the-filesystem-sql-client) and will work in most cases. Vector databases generally are unsupported. 
+::: + +## Prerequisites + +To install Streamlit, run the following command: + +```sh +pip install streamlit +``` + +## Launching the Streamlit app + +You can use the `show` [CLI command](../../reference/command-line-interface.md#show-tables-and-data-in-the-destination) +with your pipeline name: + +```sh +dlt pipeline {pipeline_name} show +``` + +Use the pipeline name you defined in your Python code with the `pipeline_name` argument. If you are unsure, you can use the `dlt pipeline --list` command to list all pipelines. + +## Credentials + +`dlt` will look for `secrets.toml` and `config.toml` in the `.dlt` folder. + +If `secrets.toml` are not found, it will use +`secrets.toml` from `.streamlit` folder. + +If you run locally, maintain your usual `.dlt` folder. + +When running on streamlit cloud, paste the content of `dlt` +`secrets.toml` into the `streamlit` secrets. + +## Inspecting your data + +You can now inspect the schema and your data. Use the left sidebar to switch between: + +* Exploring your data (default); +* Information about your loads. + + +## Further reading + +If you are running `dlt` in Python interactively or in a notebook, read the [Accessing loaded data in Python](./dataset.md) guide. + diff --git a/docs/website/docs/general-usage/destination-tables.md b/docs/website/docs/general-usage/destination-tables.md index a49ed8578e..bc42618b77 100644 --- a/docs/website/docs/general-usage/destination-tables.md +++ b/docs/website/docs/general-usage/destination-tables.md @@ -35,7 +35,7 @@ will behave similarly and have similar concepts. ::: -Running this pipeline will create a database schema in the destination database (DuckDB) along with a table named `users`. Quick tip: you can use the `show` command of the `dlt pipeline` CLI [to see the tables](../dlt-ecosystem/visualizations/exploring-the-data.md#exploring-the-data) in the destination database. +Running this pipeline will create a database schema in the destination database (DuckDB) along with a table named `users`. Quick tip: you can use the `show` command of the `dlt pipeline` CLI [to see the tables](../general-usage/dataset-access/streamlit) in the destination database. ## Database schema @@ -190,7 +190,7 @@ The `_dlt_loads` table will look like this: The `_dlt_loads` table tracks complete loads and allows chaining transformations on top of them. Many destinations do not support distributed and long-running transactions (e.g., Amazon Redshift). In that case, the user may see the partially loaded data. It is possible to filter such data out: any row with a `load_id` that does not exist in `_dlt_loads` is not yet completed. The same procedure may be used to identify and delete data for packages that never got completed. -For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g., no data, too much loaded to a table). There are also some useful load stats in the `Load info` tab of the [Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md#exploring-the-data) mentioned above. +For each load, you can test and [alert](../running-in-production/alerting.md) on anomalies (e.g., no data, too much loaded to a table). There are also some useful load stats in the `Load info` tab of the [Streamlit app](../general-usage/dataset-access/streamlit) mentioned above. You can add [transformations](../dlt-ecosystem/transformations/) and chain them together using the `status` column. 
You start the transformation for all the data with a particular `load_id` with a status of 0 and then update it to 1. The next transformation starts with the status of 1 and is then updated to 2. This can be repeated for every additional transformation. diff --git a/docs/website/docs/intro.md b/docs/website/docs/intro.md index 76e3a34736..b20d41c494 100644 --- a/docs/website/docs/intro.md +++ b/docs/website/docs/intro.md @@ -56,7 +56,7 @@ source = rest_api_source({ "token": dlt.secrets["your_api_token"], }, "paginator": { - "type": "json_response", + "type": "json_link", "next_url_path": "paging.next", }, }, diff --git a/docs/website/docs/walkthroughs/adjust-a-schema.md b/docs/website/docs/walkthroughs/adjust-a-schema.md index d76bdad229..ce547c5b09 100644 --- a/docs/website/docs/walkthroughs/adjust-a-schema.md +++ b/docs/website/docs/walkthroughs/adjust-a-schema.md @@ -113,7 +113,7 @@ players_games: ``` Run the pipeline script again and make sure that the change is visible in the export schema. Then, -[launch the Streamlit app](../dlt-ecosystem/visualizations/exploring-the-data.md) to see the changed data. +[launch the Streamlit app](../general-usage/dataset-access/streamlit) to see the changed data. :::note Do not rename the tables or columns in the YAML file. `dlt` infers those from the data, so the schema will be recreated. diff --git a/docs/website/docs/walkthroughs/run-a-pipeline.md b/docs/website/docs/walkthroughs/run-a-pipeline.md index 49abe8675f..3c0e30ccf3 100644 --- a/docs/website/docs/walkthroughs/run-a-pipeline.md +++ b/docs/website/docs/walkthroughs/run-a-pipeline.md @@ -140,7 +140,24 @@ destination, etc. Please refer to [Running in production](../running-in-production/running.md#inspect-and-save-the-load-info-and-trace) for more details. -## 5. Detect and handle problems +## Run dlt in Notebooks + +### Colab +You'll need to install `dlt` like any other dependency: +```sh +!pip install dlt +``` + +You can configure secrets using **Secrets** sidebar. Just create a variable with the name `secrets.toml` and paste +the content of the **toml** file from your `.dlt` folder into it. We support `config.toml` variable as well. + +:::note +`dlt` will not reload the secrets automatically. Please restart your interpreter in Colab options when you add/change +content of the variables above. +::: + + +## Troubleshooting What happens if something goes wrong? In most cases, the `dlt` `run` command raises exceptions. We put a lot of effort into making the exception messages easy to understand. 
Reading them is the first step diff --git a/docs/website/netlify.toml b/docs/website/netlify.toml index 51cc4ee21f..4cf2a06234 100644 --- a/docs/website/netlify.toml +++ b/docs/website/netlify.toml @@ -42,3 +42,7 @@ to = "/docs/reference/telemetry" [[redirects]] from = "/docs/walkthroughs" to = "/docs/intro" + +[[redirects]] +from = "/docs/visualizations" +to = "/docs/general-usage/dataset-access" \ No newline at end of file diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 1edee20c81..274f3e82b3 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -193,7 +193,21 @@ const sidebars = { items: [ 'walkthroughs/create-a-pipeline', 'walkthroughs/run-a-pipeline', - 'dlt-ecosystem/visualizations/exploring-the-data', + { + type: 'category', + label: 'Accessing loaded data', + link: { + type: 'doc', + id: 'general-usage/dataset-access/index', + }, + items: [ + 'general-usage/dataset-access/streamlit', + 'general-usage/dataset-access/dataset', + 'general-usage/dataset-access/ibis-backend', + 'general-usage/dataset-access/sql-client', + 'general-usage/dataset-access/data-quality-dashboard', + ] + }, { type: 'category', label: 'Transform the data', @@ -257,7 +271,6 @@ const sidebars = { 'general-usage/full-loading', ] }, - 'dlt-ecosystem/notebooks' ] }, { diff --git a/tests/load/lancedb/test_model_providers.py b/tests/load/lancedb/test_model_providers.py new file mode 100644 index 0000000000..7ad5464fe5 --- /dev/null +++ b/tests/load/lancedb/test_model_providers.py @@ -0,0 +1,44 @@ +""" +Test intricacies and configuration related to each provider. +""" + +import os +from typing import Iterator, Any, Generator + +import pytest +from lancedb import DBConnection # type: ignore +from lancedb.embeddings import EmbeddingFunctionRegistry # type: ignore +from lancedb.table import Table # type: ignore + +import dlt +from dlt.common.configuration import resolve_configuration +from dlt.common.typing import DictStrStr +from dlt.common.utils import uniq_id +from dlt.destinations.impl.lancedb import lancedb_adapter +from dlt.destinations.impl.lancedb.configuration import LanceDBClientConfiguration +from dlt.destinations.impl.lancedb.lancedb_client import LanceDBClient +from tests.load.utils import drop_active_pipeline_data, sequence_generator +from tests.pipeline.utils import assert_load_info + +# Mark all tests as essential, don't remove. 
+pytestmark = pytest.mark.essential + + +@pytest.fixture(autouse=True) +def drop_lancedb_data() -> Iterator[Any]: + yield + drop_active_pipeline_data() + + +def test_lancedb_ollama_endpoint_configuration() -> None: + os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "ollama" + os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "nomic-embed-text" + os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER_HOST"] = "http://198.163.194.3:24233" + + config = resolve_configuration( + LanceDBClientConfiguration()._bind_dataset_name(dataset_name="dataset"), + sections=("destination", "lancedb"), + ) + assert config.embedding_model_provider == "ollama" + assert config.embedding_model == "nomic-embed-text" + assert config.embedding_model_provider_host == "http://198.163.194.3:24233" diff --git a/tests/sources/helpers/rest_client/test_client.py b/tests/sources/helpers/rest_client/test_client.py index 488d7ef525..36fe009b93 100644 --- a/tests/sources/helpers/rest_client/test_client.py +++ b/tests/sources/helpers/rest_client/test_client.py @@ -7,6 +7,7 @@ from requests import PreparedRequest, Request, Response from requests.auth import AuthBase from requests.exceptions import HTTPError +import requests_mock from dlt.common import logger from dlt.common.typing import TSecretStrValue @@ -512,3 +513,24 @@ def test_request_kwargs(self, mocker) -> None: "timeout": 432, "allow_redirects": False, } + + @requests_mock.Mocker(kw="mock") + def test_overwrite_path(self, mocker, **kwargs) -> None: + expected = {"foo": "bar"} + kwargs["mock"].get("https://completely.different/endpoint", json=expected) + rest_client = RESTClient( + base_url="https://api.example.com", + ) + response = rest_client.get("https://completely.different/endpoint") + assert response.json() == expected + + @requests_mock.Mocker(kw="mock") + def test_overwrite_path_ignores_different_protocol(self, mocker, **kwargs) -> None: + expected = {"foo": "bar"} + base_url = "https://api.example.com" + kwargs["mock"].get(f"{base_url}/my://protocol", json=expected) + rest_client = RESTClient( + base_url=base_url, + ) + response = rest_client.get("my://protocol") + assert response.json() == expected diff --git a/tests/sources/rest_api/configurations/source_configs.py b/tests/sources/rest_api/configurations/source_configs.py index 705a42637c..ff58fee0fb 100644 --- a/tests/sources/rest_api/configurations/source_configs.py +++ b/tests/sources/rest_api/configurations/source_configs.py @@ -395,6 +395,18 @@ def repositories(): repositories(), ], }, + { + "client": {"base_url": "https://github.com/api/v2"}, + "resources": [ + { + "name": "issues", + "endpoint": { + "path": "user/repos", + "auth": HttpBasicAuth("", "BASIC_AUTH_TOKEN"), + }, + } + ], + }, ] diff --git a/tests/sources/rest_api/integration/test_response_actions.py b/tests/sources/rest_api/integration/test_response_actions.py index 1ec8058a86..e4fcc32f8f 100644 --- a/tests/sources/rest_api/integration/test_response_actions.py +++ b/tests/sources/rest_api/integration/test_response_actions.py @@ -1,3 +1,4 @@ +import base64 import pytest from dlt.common import json from dlt.sources.helpers.requests import Response @@ -316,3 +317,43 @@ def add_field(response: Response, *args, **kwargs) -> Response: mock_response_hook_2.assert_called_once() assert all(record["custom_field"] == "foobar" for record in data) + + +def test_auth_overwrites_for_specific_endpoints(mock_api_server, mocker): + def custom_hook(response: Response, *args, **kwargs) -> Response: + assert ( + 
response.request.headers["Authorization"] + == f"Basic {base64.b64encode(b'U:P').decode('ascii')}" + ) + return response + + mock_response_hook = mocker.Mock(side_effect=custom_hook) + mock_source = rest_api_source( + { + "client": { + "base_url": "https://api.example.com", + "auth": { + "type": "bearer", + "token": "T", + }, + }, + "resources": [ + { + "name": "posts", + "endpoint": { + "auth": { + "type": "http_basic", + "username": "U", + "password": "P", + }, + "response_actions": [ + mock_response_hook, + ], + }, + }, + ], + } + ) + + list(mock_source.with_resources("posts").add_limit(1)) + mock_response_hook.assert_called_once()