Commit

Merge branch 'devel' into feat/1484-integrate-rest-api-generic-source-into-dlt-core
rudolfix committed Sep 8, 2024
2 parents a0d90ac + 68b0bb8 commit ea1ce2c
Showing 20 changed files with 547 additions and 210 deletions.
3 changes: 2 additions & 1 deletion dlt/common/libs/sql_alchemy.py
@@ -12,7 +12,8 @@
raise MissingDependencyException(
"dlt sql_database helpers ",
[f"{version.DLT_PKG_NAME}[sql_database]"],
"Install the sql_database helpers for loading from sql_database sources. Note that you may need to install additional SQLAlchemy dialects for your source database.",
"Install the sql_database helpers for loading from sql_database sources. Note that you may"
" need to install additional SQLAlchemy dialects for your source database.",
)

# TODO: maybe use sa.__version__?
11 changes: 11 additions & 0 deletions dlt/common/storages/fsspec_filesystem.py
@@ -84,6 +84,16 @@ class FileItem(TypedDict, total=False):
CREDENTIALS_DISPATCH["abfss"] = CREDENTIALS_DISPATCH["az"]
CREDENTIALS_DISPATCH["gcs"] = CREDENTIALS_DISPATCH["gs"]

# Default kwargs for protocol
DEFAULT_KWARGS = {
# disable concurrent
"az": {"max_concurrency": 1}
}
DEFAULT_KWARGS["adl"] = DEFAULT_KWARGS["az"]
DEFAULT_KWARGS["abfs"] = DEFAULT_KWARGS["az"]
DEFAULT_KWARGS["azure"] = DEFAULT_KWARGS["az"]
DEFAULT_KWARGS["abfss"] = DEFAULT_KWARGS["az"]


def fsspec_filesystem(
protocol: str,
@@ -125,6 +135,7 @@ def prepare_fsspec_args(config: FilesystemConfiguration) -> DictStrAny:

register_implementation("gdrive", GoogleDriveFileSystem, "GoogleDriveFileSystem")

fs_kwargs.update(DEFAULT_KWARGS.get(protocol, {}))
if config.kwargs is not None:
fs_kwargs.update(config.kwargs)
if config.client_kwargs is not None:
11 changes: 6 additions & 5 deletions dlt/extract/incremental/transform.py
@@ -433,11 +433,12 @@ def __call__(
tbl = pyarrow.remove_columns(tbl, ["_dlt_index"])

if self.on_cursor_value_missing == "include":
if isinstance(tbl, pa.RecordBatch):
assert isinstance(tbl_with_null, pa.RecordBatch)
tbl = pa.Table.from_batches([tbl, tbl_with_null])
else:
tbl = pa.concat_tables([tbl, tbl_with_null])
if tbl.schema.field(cursor_path).nullable:
if isinstance(tbl, pa.RecordBatch):
assert isinstance(tbl_with_null, pa.RecordBatch)
tbl = pa.Table.from_batches([tbl, tbl_with_null])
else:
tbl = pa.concat_tables([tbl, tbl_with_null])

if len(tbl) == 0:
return None, start_out_of_range, end_out_of_range
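
A self-contained sketch of the condition this hunk adds, using toy tables instead of dlt's actual batches: rows whose cursor column is NULL are appended back only when the cursor field is declared nullable in the Arrow schema.

```py
import pyarrow as pa

# toy stand-ins for the incoming batch and the rows with a missing cursor value
tbl = pa.table({"updated_at": [1, 2, 3], "value": ["a", "b", "c"]})
tbl_with_null = pa.table({"updated_at": pa.array([None], type=pa.int64()), "value": ["d"]})

on_cursor_value_missing = "include"
cursor_path = "updated_at"

if on_cursor_value_missing == "include":
    # the new guard: only re-attach NULL-cursor rows if the field can hold NULLs
    if tbl.schema.field(cursor_path).nullable:
        tbl = pa.concat_tables([tbl, tbl_with_null])

print(tbl.num_rows)  # 4 -- the NULL-cursor row is kept
```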
@@ -1,6 +1,6 @@
"""
---
title: Custom Destination with LanceDB
title: Custom destination with LanceDB
description: Learn how to use the custom destination to load to LanceDB.
keywords: [destination, credentials, example, lancedb, custom destination, vectorstore, AI, LLM]
---
6 changes: 6 additions & 0 deletions docs/website/.gitignore
@@ -23,3 +23,9 @@ jaffle_shop
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# ignore all versions, they are generated dynamically
versions.json
versioned_docs
versioned_sidebars
.dlt-repo
27 changes: 26 additions & 1 deletion docs/website/README.md
@@ -64,4 +64,29 @@ The site is deployed using `netlify`. The `netlify` build command is:
npm run build:netlify
```

It will place the build in the `build/docs` folder. The `netlify.toml` redirects from the root path `/` into `/docs`.

## Docs versions

We keep a few additional versions of our docs so that users can read about how former and future versions of dlt work. We use docusaurus versions for this, but we do not check the historical versions into the repo; instead, a script builds the former versions on deployment. To build the versions locally, run:

```
npm run update-versions
```

This will execute the script at tools/update_versions.js. This tool will do the following (a Python sketch of the selection logic follows the list):

* Find the highest version for each minor release among the tags of the repo (e.g. 0.4.13, 0.5.22, 1.1.3).
* It will create a version for each of these tags that is larger than the minimum version defined in MINIMUM_SEMVER_VERSION in the script.
* It will NOT create a version for the highest version; we assume the most up-to-date docs for the highest version are at the tip of master.
* It will NOT create any docs versions for pre-releases.
* It will create a future version called "devel" from the current commit of this repo.
* It will set up docusaurus to display all of these versions correctly.
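
A minimal Python sketch of the selection rules above (the real tool is `tools/update_versions.js`; the helper name, the regex, and the `MINIMUM_SEMVER_VERSION` value here are illustrative assumptions):

```py
import re

# placeholder for the value defined in tools/update_versions.js
MINIMUM_SEMVER_VERSION = (0, 4, 0)

def select_doc_versions(tags):
    releases = []
    for tag in tags:
        # keep plain releases only; pre-releases such as "1.2.0a1" are skipped
        m = re.fullmatch(r"(\d+)\.(\d+)\.(\d+)", tag)
        if m:
            releases.append(tuple(int(p) for p in m.groups()))
    # keep only the highest patch release per (major, minor)
    highest_per_minor = {}
    for v in releases:
        key = v[:2]
        if key not in highest_per_minor or v > highest_per_minor[key]:
            highest_per_minor[key] = v
    selected = sorted(v for v in highest_per_minor.values() if v >= MINIMUM_SEMVER_VERSION)
    # the highest version is documented by the tip of master, so it gets no docs version
    return selected[:-1]

print(select_doc_versions(["0.4.13", "0.5.22", "1.1.3", "1.2.0a1"]))
# -> [(0, 4, 13), (0, 5, 22)]
```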

You can clear these versions with:

```
npm run clear-versions
```

The netlify deployment of these docs needs to happen from the master branch so that the current version gets properly selected.
9 changes: 9 additions & 0 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -186,6 +186,15 @@ azure_client_secret = "client_secret"
azure_tenant_id = "tenant_id" # please set me up!
```

:::caution
**Concurrent blob uploads**
`dlt` limits the number of concurrent connections for a single uploaded blob to 1. By default, `adlfs` (which we use) splits blobs into 4 MB chunks and uploads them concurrently, which leads to gigabytes of used memory and thousands of connections for larger load packages. You can increase the maximum concurrency as follows:
```toml
[destination.filesystem.kwargs]
max_concurrency=3
```
:::
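
If you build the filesystem yourself rather than through `dlt` configuration, a sketch of passing the same kwarg directly to `fsspec`/`adlfs` (the account credentials below are placeholders):

```py
import fsspec

# placeholders for real Azure credentials
fs = fsspec.filesystem(
    "az",
    account_name="my_account",
    account_key="my_account_key",
    max_concurrency=3,  # allow 3 concurrent chunk uploads per blob
)
```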

### Local file system
If for any reason you want to have those files in a local folder, set up the `bucket_url` as follows (you are free to use `config.toml` for that as there are no secrets required)

@@ -351,7 +351,7 @@ certain range.
```py
source = sql_database().with_resources("family")
# using the "updated" field as an incremental cursor with an initial value of January 1, 2022, at midnight
source.family.apply_hints(incremental=dlt.sources.incremental("updated"),initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0))
source.family.apply_hints(incremental=dlt.sources.incremental("updated", initial_value=pendulum.DateTime(2022, 1, 1, 0, 0, 0)))
#running the pipeline
info = pipeline.run(source, write_disposition="merge")
print(info)
13 changes: 7 additions & 6 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -50,9 +50,10 @@ dataset with the merge write disposition.
## Merge incremental loading

The `merge` write disposition can be used with three different strategies:
1) `delete-insert` (default strategy)
2) `scd2`
3) `upsert`

1. `delete-insert` (default strategy)
2. `scd2`
3. `upsert`

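A minimal sketch of picking a strategy, assuming the dictionary form of `write_disposition` (the resource name and data below are illustrative):

```py
import dlt

@dlt.resource(
    primary_key="id",
    # pick one of: "delete-insert" (default), "scd2", "upsert"
    write_disposition={"disposition": "merge", "strategy": "upsert"},
)
def items():
    yield [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
```
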
### `delete-insert` strategy

Expand Down Expand Up @@ -1064,7 +1065,7 @@ def tweets():
data = get_data(start_from=last_val)
yield data
# change the state to the new value
dlt.current.state()["last_updated"] = data["last_timestamp"]
dlt.current.resource_state()["last_updated"] = data["last_timestamp"]
```

If we keep a list or a dictionary in the state, we can modify the underlying values in the objects,
@@ -1140,7 +1141,7 @@ def search_tweets(twitter_bearer_token=dlt.secrets.value, search_terms=None, sta
headers = _headers(twitter_bearer_token)
for search_term in search_terms:
# make cache for each term
last_value_cache = dlt.current.state().setdefault(f"last_value_{search_term}", None)
last_value_cache = dlt.current.resource_state().setdefault(f"last_value_{search_term}", None)
print(f'last_value_cache: {last_value_cache}')
params = {...}
url = "https://api.twitter.com/2/tweets/search/recent"
@@ -1149,7 +1150,7 @@
page['search_term'] = search_term
last_id = page.get('meta', {}).get('newest_id', 0)
#set it back - not needed if we
dlt.current.state()[f"last_value_{search_term}"] = max(last_value_cache or 0, int(last_id))
dlt.current.resource_state()[f"last_value_{search_term}"] = max(last_value_cache or 0, int(last_id))
# print the value for each search term
print(f'new_last_value_cache for term {search_term}: {last_value_cache}')

1 change: 1 addition & 0 deletions docs/website/docs/reference/performance.md
@@ -148,6 +148,7 @@ As before, **if you have just a single table with millions of records you should

<!--@@@DLT_SNIPPET ./performance_snippets/toml-snippets.toml::normalize_workers_2_toml-->

Since the normalize stage uses a process pool to create load packages concurrently, adjusting the `file_max_items` and `file_max_bytes` settings can significantly impact load behavior. By setting a lower value for `file_max_items`, you reduce the size of each data chunk sent to the destination database, which can be particularly useful for managing memory constraints on the database server. Without an explicit `file_max_items` configuration, `dlt` writes all data rows into one large intermediary file and attempts to insert all data from this single file. Configuring `file_max_items` ensures data is inserted in manageable chunks, enhancing performance and preventing potential memory issues.
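
As a hedged illustration (assuming the stage-scoped `data_writer` section; the exact value is arbitrary):

```toml
[normalize.data_writer]
# rotate intermediary files after 100k rows, so data is inserted in chunks
file_max_items=100000
```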

### Parallel pipeline config example
The example below simulates loading of a large database table with 1 000 000 records. The **config.toml** below sets the parallelization as follows: