Commit: Fix snippets
VioletM committed Sep 11, 2024
1 parent 8336d7d commit ded43a6
Showing 2 changed files with 6 additions and 3 deletions.
@@ -140,7 +140,8 @@ print(load_info)
You can get an fsspec client from the filesystem resource after it has been extracted, for example to delete processed files. The filesystem module contains a convenient helper, `fsspec_from_resource`, that can be used as follows:

```diff
-from dlt.sources.filesystem import filesystem, fsspec_from_resource, read_csv
+from dlt.sources.filesystem import filesystem, read_csv
+from dlt.sources.filesystem.helpers import fsspec_from_resource

 # get filesystem source
 gs_resource = filesystem("gs://ci-test-bucket/")
```
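The prose above mentions using the fsspec client to delete processed files. Below is a minimal, dlt-free sketch of that cleanup loop using an in-memory stand-in for the client (`FakeFS` and the bucket paths are hypothetical; real code would obtain the client via `fsspec_from_resource(gs_resource)` and use its `ls`/`rm` methods the same way):

```python
class FakeFS:
    """In-memory stand-in for an fsspec filesystem client."""

    def __init__(self, files):
        self.files = set(files)

    def ls(self, path):
        # Return the objects under a prefix, like fsspec's ls.
        return sorted(f for f in self.files if f.startswith(path))

    def rm(self, path):
        # Remove a single object, like fsspec's rm.
        self.files.discard(path)


fs = FakeFS({"bucket/a.csv", "bucket/b.csv", "bucket/keep.txt"})

# Delete every processed CSV, leaving other objects intact.
for name in fs.ls("bucket/"):
    if name.endswith(".csv"):
        fs.rm(name)

assert fs.files == {"bucket/keep.txt"}
```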
@@ -380,7 +380,8 @@ from dlt.sources.filesystem import filesystem, read_csv
```diff
 all_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv")

 # But filter out only updated records
-filesystem_pipe = (all_files | read_csv()).apply_hints(incremental=dlt.sources.incremental("updated_at"))
+filesystem_pipe = (all_files | read_csv())
+filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
 pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
 load_info = pipeline.run(filesystem_pipe)
 print(load_info)
```
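The split above avoids relying on the return value of `apply_hints`. As a general Python caution (not a claim about dlt's exact signature): when a setter mutates its object in place and returns `None`, chaining it into an assignment stores `None` instead of the configured object. A self-contained sketch with a hypothetical `Pipe` class:

```python
class Pipe:
    """Hypothetical stand-in for a transformed resource."""

    def __init__(self):
        self.hints = {}

    def apply_hints(self, **hints):
        # Mutates in place and returns None, like many builder-style setters.
        self.hints.update(hints)


# Buggy: the assignment captures apply_hints' return value, which is None.
broken = Pipe().apply_hints(incremental="updated_at")
assert broken is None

# Fixed: keep the object, then apply hints on a separate line.
pipe = Pipe()
pipe.apply_hints(incremental="updated_at")
assert pipe.hints == {"incremental": "updated_at"}
```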
@@ -397,7 +398,8 @@ new_files = filesystem(bucket_url="s3://bucket_name", file_glob="directory/*.csv
```diff
 new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))

 # And in each modified file we filter out only updated records
-filesystem_pipe = (new_files | read_csv()).apply_hints(incremental=dlt.sources.incremental("updated_at"))
+filesystem_pipe = (new_files | read_csv())
+filesystem_pipe.apply_hints(incremental=dlt.sources.incremental("updated_at"))
 pipeline = dlt.pipeline(pipeline_name="my_pipeline", destination="duckdb")
 load_info = pipeline.run(filesystem_pipe)
 print(load_info)
```
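The two cursors above (first filtering new files by `modification_date`, then filtering rows inside them by `updated_at`) can be sketched without dlt as plain list filtering; all data and cursor values below are illustrative, not dlt internals:

```python
# Illustrative files: a modification timestamp plus rows with updated_at.
files = [
    {"modification_date": 1, "rows": [{"updated_at": 1}, {"updated_at": 5}]},
    {"modification_date": 4, "rows": [{"updated_at": 2}, {"updated_at": 6}]},
]

last_file_cursor = 2  # only consider files modified after this point
last_row_cursor = 3   # within those files, only rows updated after this point

new_rows = [
    row
    for f in files
    if f["modification_date"] > last_file_cursor
    for row in f["rows"]
    if row["updated_at"] > last_row_cursor
]

assert new_rows == [{"updated_at": 6}]
```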