New "refresh" mode and "dev_mode" #1063

Merged
merged 51 commits into from
Jun 3, 2024
Merged

Conversation

steinitzu (Collaborator)

Description

Still needs some cleanup and testing.

  • full_refresh arg replaced with dev_mode
  • Deprecation warning when full_refresh=True is passed
  • Add refresh: "full" | "replace" arg to pipeline
  • "full" drops all tables, resource states and source state keys
  • "replace" drops only the tables and state of the resources selected for extraction

The new refresh mode introduces a new file to the load package: dropped_tables.json, which gets created during extract.
That was the simplest way I could think of to drop tables at the appropriate stages (local schema and state are updated in extract, and destination tables get dropped in the load step).
Otherwise we'd have to start poking at destination storage before extract, which feels wrong.

Refresh works as follows (a rough usage sketch follows the list):

  1. Before extract -> uses the drop command to wipe all/selected resource state and tables (local state only; destination tables are not deleted and normalize/load doesn't run as part of drop)
  2. Add dropped_tables.json to the load package with the tables found by the drop command
  3. In the load stage -> read dropped_tables.json and drop those tables from the destination schema (non-SQL destinations fall back to truncate on dataset init)
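
A minimal usage sketch of the arguments described above, assuming the pipeline-level API from this description (the mode names were later renamed during review, so treat the values as illustrative):

import dlt

@dlt.resource
def items():
    # placeholder resource for illustration
    yield [{"id": 1}, {"id": 2}]

# dev_mode replaces the deprecated full_refresh flag: each pipeline instance
# starts from scratch and loads into a dataset with a datetime suffix
pipeline = dlt.pipeline(
    pipeline_name="refresh_demo",
    destination="duckdb",
    dev_mode=False,
    refresh="replace",  # "full" would also wipe source-level state keys
)

pipeline.run(items())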

Related Issues

netlify bot commented Mar 7, 2024

Deploy Preview for dlt-hub-docs canceled.

Latest commit: 879e4e5
Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/6657a4adfac32b0008eaee57

@rudolfix (Collaborator) left a comment

two biggest issues:

  1. the drop command must join the extract transaction and should not persist anything
  2. IMO you can merge init_client and _refresh and have way less code. we may need a drop_schema.json which contains the tables that should be dropped

dlt/cli/deploy_command_helpers.py (resolved)
@@ -47,6 +48,9 @@
from dlt.common.utils import RowCounts, merge_row_counts


TRefreshMode = Literal["full", "replace"]
Collaborator:

We need better mode names: they must say which kind of data is destroyed, and the current ones don't convey that too well. My earlier proposed naming wasn't explicit either. This is what I propose (sketched as a Literal type below):

  • drop_dataset - drops the whole storage and completely wipes pipeline state, schemas and local storage
  • full - the same as drop_dataset
  • drop_tables - drops the tables belonging to all selected resources in all loaded sources, resets the corresponding resource schemas and states
  • drop_data - truncates all tables belonging to all selected resources in all loaded sources; keeps the schemas but resets the resource state, source-level state stays intact
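
A hedged sketch of this proposal as a Literal type, mirroring the existing TRefreshMode definition in the diff above (the names that finally landed in this PR are drop_sources, drop_resources and drop_data):

from typing import Literal

# proposed naming from this comment; "full" kept as an alias for drop_dataset
TRefreshMode = Literal["drop_dataset", "full", "drop_tables", "drop_data"]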

steinitzu (Collaborator Author):

  1. To be clear, by drop_dataset you don't mean DROP SCHEMA ..., but just all tables in the dlt schema, right? I.e. we don't want to wipe anything unrelated to the pipeline.
  2. What's the difference between drop_data and write_disposition "replace"? Is it just a case of getting the same behaviour without changing the write_disposition in the schema?

dlt/common/storages/load_package.py (outdated, resolved)
dlt/pipeline/helpers.py (resolved)
@@ -202,6 +237,18 @@ def _drop_state_keys(self) -> None:
with self.pipeline.managed_state(extract_state=True) as state: # type: ignore[assignment]
Collaborator:

this must happen outside of this code - in the CLI command; the same goes for saving the schema

dlt/pipeline/helpers.py (outdated, resolved)
dlt/common/storages/load_package.py (outdated, resolved)
dlt/load/load.py (outdated, resolved)
dlt/load/load.py (outdated, resolved)
dlt/pipeline/configuration.py (resolved)
@steinitzu force-pushed the sthor/full-refresh-adjustment branch from 4270770 to a52b45f (April 6, 2024 11:48)
@rudolfix (Collaborator) commented Apr 8, 2024

@steinitzu this looks pretty cool now. in the meantime we refactored the extract code, so please merge in the current changes. sorry for that!

@steinitzu force-pushed the sthor/full-refresh-adjustment branch from f1df8de to fee5b1e (April 9, 2024 06:02)
@rudolfix (Collaborator) left a comment

@steinitzu I think we can still simplify and restructure the code. see my comments

state_only=self.refresh == "drop_data",
state_paths="*" if self.refresh == "drop_dataset" else [],
)
_state.update(new_state)
Collaborator:

shouldn't we replace _state with new_state? you just update it here. hmmm, it may work because the drop command just changes sources in the state, so they will be overwritten

steinitzu (Collaborator Author):

I was updating it so it would mutate the state instance in context; I wasn't sure how to replace the state dict completely, or if there's a better way to extract the new state?

> hmmm it may work because drop command just changes sources in the state so they will be overwritten

Yeah, exactly because of that it works

Collaborator:

maybe just replace sources? nothing else is changed by the drop command. or maybe I'm wrong?
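
A minimal sketch of that suggestion, assuming the drop command only modifies the "sources" section of the pipeline state (illustrative only, not the code in this PR):

# instead of _state.update(new_state), swap out only the part the drop command touches
_state["sources"] = new_state.get("sources", {})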

dlt/extract/extract.py (outdated, resolved)
]
load_package.state["dropped_tables"] = drop_tables
source.schema.tables.clear()
source.schema.tables.update(new_schema.tables)
Collaborator:

I think we'll have a problem here, because the source schema is used to update the pipeline schema. So if you delete a table, it will be kept anyway. Not sure what we could do here. Maybe this piece of code should be removed from extract and put into a separate function that the pipeline calls just before extract? Then you can return a modified schema.

steinitzu (Collaborator Author):

Where does the pipeline schema get updated? It seems to work like this: after extract, pipeline.default_schema doesn't contain any of the dropped tables.

Collaborator:

hmmm it seems I was wrong :)
so now each time extract starts:

  1. we clone the actual schema
  2. we pass it together with the resource schema, where it gets updated
  3. we set the changes back into the original instance (completely replacing, not merging)
def _extract_source(
        self, extract: Extract, source: DltSource, max_parallel_items: int, workers: int
    ) -> str:
        # discover the existing pipeline schema
        try:
            # all live schemas are initially committed and during the extract will accumulate changes in memory
            # line below may create another live schema if source schema is not a part of storage
            # this will (1) look for import schema if present
            # (2) load import schema an overwrite pipeline schema if import schema modified
            # (3) load pipeline schema if no import schema is present
            pipeline_schema = self.schemas[source.schema.name]
            pipeline_schema = pipeline_schema.clone()  # use clone until extraction complete
            # apply all changes in the source schema to pipeline schema
            # NOTE: we do not apply contracts to changes done programmatically
            pipeline_schema.update_schema(source.schema)
            # replace schema in the source
            source.schema = pipeline_schema
        except FileNotFoundError:
            pass

        # extract into pipeline schema
        load_id = extract.extract(source, max_parallel_items, workers)

        # save import with fully discovered schema
        # NOTE: moved to with_schema_sync, remove this if all test pass
        # self._schema_storage.save_import_schema_if_not_exists(source.schema)

        # update live schema but not update the store yet
        source.schema = self._schema_storage.set_live_schema(source.schema)

that's why it works, so I think it is good.

still, I think you could move the code that modifies state and schema to a separate method. the part that deletes state for "replace" resources could also go there

dlt/load/load.py (outdated, resolved)
) -> TSchemaTables:
    staging_text = "for staging dataset" if staging_info else ""
    logger.info(
        f"Client for {job_client.config.destination_type} will start initialize storage"
        f" {staging_text}"
    )
    if drop_tables:
        old_schema = job_client.schema
Collaborator:

we already dropped the tables from the old_schema, right? so if we drop them again, it means they were added back from the data (or we have a bug and we do not effectively drop these tables).

maybe we do not need to drop anything here? we should have a schema in its final state here

@steinitzu (Collaborator Author) commented Apr 15, 2024

I had to do this because I think at this point the dropped tables are back in old_schema if the same resources are being loaded again.
So I had to do this and replace the stored schema in _dlt_version so that during the schema update it correctly detects that these tables don't exist and they get re-created.
It's a little convoluted for sure; maybe there's a better way.

Collaborator:

a radical solution would be to:

  1. add a flag to update_stored_schema that always forces a re-check of the schema and creates/updates existing tables
  2. remove replace_schema from drop_tables
  3. remove _drop_destination_tables from the drop command and always create an empty package (just the modified state, schema and the tables that need to be dropped and truncated, and do that here)

@steinitzu (Collaborator Author) commented Apr 19, 2024

> add a flag to update_stored_schema that always forces to re-check the schema and create/update existing tables

This would have to run every time, right? We don't know if a table was dropped in some previous load.

steinitzu (Collaborator Author):

Potentially solved it with this: 8990f11
replace_schema is not needed now, but I check both version_hash and version when getting the stored destination schema, so we are not comparing against the "pre-drop" schema. Maybe the version number or previous hashes could be made part of the hash so it's simpler?
Are there any downsides to this?

)
_state.update(new_state)
if drop_info["tables"]:
drop_tables = [
Collaborator:

we need to drop tables from a full table chain - look at what happens in load. also, we have a marker (seen-data) that tells which tables were really created.

same thing for truncated tables: we should truncate table chains.

maybe when we have table chains we do not really need to modify the schema in load.py

steinitzu (Collaborator Author):

@rudolfix I'm not 100% sure what you mean. (aside from checking seen-data) the table chain is already generated in drop as far as I can tell; the dropped_tables list is the result of:

def group_tables_by_resource(
    tables: TSchemaTables, pattern: Optional[REPattern] = None
) -> Dict[str, List[TTableSchema]]:
    """Create a dict of resources and their associated tables and descendant tables
    If `pattern` is supplied, the result is filtered to only resource names matching the pattern.
    """
    result: Dict[str, List[TTableSchema]] = {}
    for table in tables.values():
        resource = table.get("resource")
        if resource and (pattern is None or pattern.match(resource)):
            resource_tables = result.setdefault(resource, [])
            resource_tables.extend(get_child_tables(tables, table["name"]))
    return result

...
data_tables = {t["name"]: t for t in schema.data_tables()}  # could probably include seen_data_only here?
resource_tables = group_tables_by_resource(data_tables, pattern=resource_pattern)

Collaborator:

@steinitzu I overlooked this. we are interested in root tables that have seen data, so:

def _write_empty_files(
        self, source: DltSource, extractors: Dict[TDataItemFormat, Extractor]
    ) -> None:
        schema = source.schema
        json_extractor = extractors["object"]
        resources_with_items = set().union(*[e.resources_with_items for e in extractors.values()])
        # find REPLACE resources that did not yield any pipe items and create empty jobs for them
        # NOTE: do not include tables that have never seen data
        data_tables = {t["name"]: t for t in schema.data_tables(seen_data_only=True)}
        tables_by_resources = utils.group_tables_by_resource(data_tables)

looks like this should also go through the truncated table state, and there are other things in _write_empty_files... better to fix it after this PR is merged
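
A hedged sketch of the adjustment discussed above: build the drop list only from tables that have actually seen data, mirroring _write_empty_files (illustrative only; names follow the snippets quoted in this thread):

# only consider tables that were really created in the destination
data_tables = {t["name"]: t for t in schema.data_tables(seen_data_only=True)}
resource_tables = group_tables_by_resource(data_tables, pattern=resource_pattern)
dropped_tables = [t for tables in resource_tables.values() for t in tables]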

dlt/normalize/normalize.py (outdated, resolved)
dlt/pipeline/helpers.py (outdated, resolved)
@steinitzu force-pushed the sthor/full-refresh-adjustment branch from 61eef7c to 7bbeeed (April 16, 2024 16:30)
dlt/common/storages/load_package.py (outdated, resolved)
@steinitzu force-pushed the sthor/full-refresh-adjustment branch from fe5c0a8 to ac3baa5 (April 18, 2024 23:34)
@rudolfix (Collaborator) left a comment

two important things:

  1. investigate why we need to SELECT by version number as well (not only by hash) when finding the schema in the db
  2. unify how the package state is written by the pipeline

dlt/load/load.py (outdated, resolved)
logger.info(
    f"Client for {job_client.config.destination_type} will drop tables {staging_text}"
)
job_client.drop_tables(*drop_table_names, replace_schema=False)
Collaborator:

we do not need this option at all, so you can remove it from drop_tables. we want to add drop_tables to other destinations (e.g. filesystem) to let them refresh/drop as well

dlt/pipeline/configuration.py (resolved)
dlt/common/storages/load_package.py (outdated, resolved)
@@ -186,7 +187,10 @@ def update_stored_schema(
) -> Optional[TSchemaTables]:
super().update_stored_schema(only_tables, expected_update)
applied_update: TSchemaTables = {}
schema_info = self.get_stored_schema_by_hash(self.schema.stored_version_hash)
self.schema.version
Collaborator:

huh?

Collaborator:

@steinitzu please remove this :)

@@ -375,10 +379,17 @@ def get_stored_state(self, pipeline_name: str) -> StateInfo:
return None
return StateInfo(row[0], row[1], row[2], row[3], pendulum.instance(row[4]))

def get_stored_schema_by_hash(self, version_hash: str) -> StorageSchemaInfo:
def get_stored_schema_by_hash(
self, version_hash: str, version: Optional[int] = None
Collaborator:

why would we need to select by version as well? do we have a problem with "downgrading" a schema when tables are dropped? schemas with the same hash have identical content and we store a new schema only if the hash does not exist, so we should never have duplicate hashes (except for some race conditions).
do we have a test failing without it? then it's better to investigate

there are two cases where we can get a schema with the same content hash but a different version:

  1. we drop the tables and we get a previous hash of the schema, but the version gets bumped anyway
  2. some kind of race where schemas evolve in two different places and differ in the number of steps

tl;dr: I'd rather remove the version parameter and fix the other things

steinitzu (Collaborator Author):

It's a problem when trying to reload a previous schema version after some tables are dropped:

  1. Load some source -> stored schema has tables a, b, c
  2. Drop table "b" -> new stored schema has tables a, c -> new hash
  3. Load resource "b" again -> the new schema is identical to 1. and has the same hash -> update_stored_schema incorrectly decides that the current schema is 1. (where table b already exists) -> no changes are detected, the dropped table is not recreated and the load fails

The version number makes sense to me, because it's bumped only when there are changes. So even if we have a schema identical to some previous version, we need to know there have been changes in between.

Collaborator:

OK, but I still think it is too hacky. We were already dropping tables in the drop command when that was not needed. IMO when you drop any tables and modify the schema, we should remove the stored schema whose hash has become invalid. I think we were doing that in the drop command, no?

you can pass the schema hash to delete in the package state

dlt/pipeline/pipeline.py (outdated, resolved)
dlt/pipeline/pipeline.py (outdated, resolved)
max_parallel_items: int,
workers: int,
with_refresh: bool = False,
load_package_state_update: Optional[Dict[str, Any]] = None,
Collaborator:

@steinitzu @sh-rp IMO we should unify how we generate package state in the pipeline. right now there's a hack we use to save pipeline state into package state at the end of extract:

for source in data_to_sources(
                    data,
                    self,
                    schema,
                    table_name,
                    parent_table_name,
                    write_disposition,
                    columns,
                    primary_key,
                    schema_contract,
                ):
                    if source.exhausted:
                        raise SourceExhausted(source.name)

                    self._extract_source(
                        extract_step, source, max_parallel_items, workers, with_refresh=True
                    )
                # extract state
                state: TPipelineStateDoc = None
                if self.config.restore_from_destination:
                    # this will update state version hash so it will not be extracted again by with_state_sync
                    state = self._bump_version_and_extract_state(
                        self._container[StateInjectableContext].state, True, extract_step
                    )
                # commit load packages with state
                extract_step.commit_packages(state)

here: extract_step.commit_packages(state)

and there's a mechanism from @steinitzu to inject state during extract:

def extract(
....
      if load_package_state_update:
                    load_package.state.update(load_package_state_update)  # type: ignore[typeddict-item]

IMO we do not need both, and what steini did seems more generic. so if we just add the pipeline state to load_package_state_update

if should_extract and extract_state:
            data, doc = state_resource(state)
            extract_ = extract or Extract(
                self._schema_storage, self._normalize_storage_config(), original_data=data
            )
            self._extract_source(
                extract_,
                data_to_sources(data, self, schema or self.default_schema)[0],
                1,
                1,
                load_package_state_update=load_package_state_update,

under the right key, everything will work (and we fix one inconsistency: if we have many sources we add the pipeline state to all packages, not only to the one that contains the state document).
@sh-rp WDYT?

Collaborator:

yes, that sounds good to me. I was thinking we should always put the pipeline state into the load package state and let the destination figure out whether it needs to be stored based on the hash (right now only the filesystem is using this anyway)

steinitzu (Collaborator Author):

Makes sense. Could we then not extract this state_resource? I.e. for the drop command, just create an empty load package with state and schema and no sources.

Collaborator:

@steinitzu sometimes we must update the state. right now, AFAIK, you somehow force the state hash to change so the state is always extracted. you can try an empty package (when there's no state change). I expect it will be ignored by the normalizer, but actually we should push it to load. you can try to fix it

steinitzu (Collaborator Author):

So I tried this; most of the changes are in 9402090.
The state table is by default created like the other _dlt_ tables and the state is inserted in the destination in complete_load().

Loading an "empty" load package works (only the load package state and the new schema), but I had to remove this in normalize:

            # if len(schema_files) == 0:
            #     # delete empty package
            #     self.normalize_storage.extracted_packages.delete_package(load_id)
            #     logger.info(f"Empty package {load_id} processed")
            #     continue

Is this just an optimization, or is there some other reason to skip empty packages? Would it be better to load it conditionally, e.g. with some marker file that marks it as a "state only" package?

Collaborator:

@steinitzu I do not see any changes in normalize so let's keep it like that. we'll have a bigger push to reorganize how we write state

max_parallel_items: int,
workers: int,
with_refresh: bool = False,
load_package_state_update: Optional[Dict[str, Any]] = None,
Collaborator:

btw. this parameter is never passed from outside, so IMO we can just remove it

max_parallel_items: int,
workers: int,
with_refresh: bool = False,
load_package_state_update: Optional[Dict[str, Any]] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that sounds good to me. I was thinking we should put the pipeline state into the load package state always and let the destination figure out wether it needs to be stored based on the hash (right now it's only the filesystem that is using this anyway)

@steinitzu force-pushed the sthor/full-refresh-adjustment branch from 8990f11 to 564c282 (April 24, 2024 21:01)
@rudolfix (Collaborator): @steinitzu do you need a review here? PR is still draft

@steinitzu force-pushed the sthor/full-refresh-adjustment branch 2 times, most recently from ea477fd to 965a87c (May 3, 2024 16:30)
@steinitzu marked this pull request as ready for review (May 3, 2024 17:39)
@steinitzu force-pushed the sthor/full-refresh-adjustment branch 3 times, most recently from 6102224 to 2297f5a (May 14, 2024 16:56)
@rudolfix (Collaborator) left a comment

it seems that the code is complete. there are still a few things to do:

  1. there are comments and non-executable code left, please take a look at this PR and clean it up
  2. we should improve the tests, I've left you a few instructions
  3. docs are missing

also @sh-rp could you look at pipeline.py and review how we update the package state and pass the pipeline state to it? you were the implementer... and we'll have a followup here to fully move state processing to the package

if isinstance(client, WithStagingDataset):
    with contextlib.suppress(DatabaseUndefinedRelation):
        with client.with_staging_dataset():
            client.drop_tables(*table_names, replace_schema=True)
Collaborator:

so here we were replacing the schema

steinitzu (Collaborator Author):

@rudolfix
We were deleting all schema versions with the replace_schema flag before, and I think we need to do that, or at least delete all schemas that contain the tables being dropped, not just the "current version".
I might be wrong there, but I think otherwise we can, on a later run, accidentally re-create a schema that's identical to some previous version, and it will think those tables already exist.

I thought it was kind of drastic to erase history completely like that, but maybe it's a better workaround? Should we work on properly supporting "subtractive" schema migrations? It is basically a purely additive versioning system, so it's all a bit hacky imo.

Collaborator:

OK, you are right. still, I prefer to drop all schemas (with the given name!). we just need to document this and maybe use :::caution::: for that

max_parallel_items: int,
workers: int,
with_refresh: bool = False,
load_package_state_update: Optional[Dict[str, Any]] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steinitzu I do not see any changes in normalize so let's keep it like that. we'll have a bigger push to reorganize how we write state

dlt/pipeline/__init__.py (resolved)
info = pipeline.run(my_source())
assert_load_info(info)

# Second run of pipeline with only selected resources
Collaborator:

it would be good to mutate the schema of one of the resources. this mode should allow replacing e.g. a nullable column with a not-nullable one, or changing the data type (but really, in the database)

this should be tested

def test_refresh_drop_tables():
first_run = True

@dlt.source
Collaborator:

is this code by any chance fully duplicated?

steinitzu (Collaborator Author):

Pretty much. Cleaned this up a bit; the source is defined only once now.

tests/pipeline/test_refresh_modes.py (outdated, resolved)
tests/pipeline/test_refresh_modes.py (outdated, resolved)
from tests.pipeline.utils import assert_load_info


def test_refresh_drop_dataset():
Collaborator:

I think it is possible to also test filesystem? we now have a client that can read table data. it would be cool to test local filesystem here

@rudolfix (Collaborator) left a comment

OK! now the tests are almost in order! still to do:

  1. I have a strong opinion that we should rename our modes to drop_sources, drop_resources and drop_data. WDYT? this corresponds more closely to what happens
  2. is it much work to add refresh to the run and extract methods of Pipeline? (see the sketch after this list)
  3. we need to improve docstrings and docs
  4. let's remove all schemas for a given name from the destination. the old flag was not that bad IMO - it leaves all details to the implementer
  5. a simple test for local filesystem - it really makes sense when I look at how many users use that destination
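
A hedged sketch of what points 1 and 2 could look like once applied (the renamed modes did land in the merged PR; the exact per-call signatures here are assumptions):

import dlt

@dlt.source
def my_source():
    @dlt.resource
    def items():
        yield [{"id": 1}]
    return items

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    refresh="drop_sources",  # pipeline-level default
)

# per-call refresh was added to run and extract in this PR;
# assumed here to override the pipeline-level setting
pipeline.run(my_source(), refresh="drop_resources")
pipeline.extract(my_source(), refresh="drop_data")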

@@ -72,6 +72,8 @@ def pipeline(
dev_mode (bool, optional): When set to True, each instance of the pipeline with the `pipeline_name` starts from scratch when run and loads the data to a separate dataset.
The datasets are identified by `dataset_name_` + datetime suffix. Use this setting whenever you experiment with your data to be sure you start fresh on each run. Defaults to False.

refresh (str | TRefreshMode): One of `drop_dataset`, `drop_tables` or `drop_data`. Set this to fully or partially delete and reset the schema, state and destination dataset when running the pipeline.
Collaborator:

please be more specific :)

  • drop_dataset - drops all tables for any source that will be processed in the run or extract methods. also drops the source state. also warn that schema history is erased
  • drop_tables - in a similar manner. also warn that schema history is erased

the user should know what will happen when a given option is specified

There are three possible refresh modes that can be used. The `refresh` argument should have one of these string values:

* `drop_dataset`
All tables listed in the pipeline's schema will be dropped and all source state is wiped.
Collaborator:

we drop only the tables for the schemas actually processed, right? so if we have two schemas (two different sources) in the pipeline and we use drop and load only source_2, only that source will be wiped? if so, we should add a test for that

heh, maybe we should rename it to drop_schemas or drop_sources?

steinitzu (Collaborator Author):

This was in fact not being done. It was wiping all source state.
Fixed and tested now. drop_sources is good naming imo.

@steinitzu force-pushed the sthor/full-refresh-adjustment branch from d21dae2 to 61cbaf9 (May 22, 2024 19:42)
@steinitzu (Collaborator Author):

> OK! now tests are almost in order! still to do:
> 1. I have a strong opinion to rename our modes to `drop_sources`, `drop_resources` and `drop_data`. WDYT? this corresponds more to what happens
> 2. is it much work to add `refresh` to `run` and `extract` methods of `Pipeline`?
> 3. we need to improve docstrings and docs
> 4. let's remove all schemas for a given name from a destination. Old flag was not that bad IMO - it leaves all details to the implementer
> 5. a simple test for local filesystem - it really makes sense when I look how many users use that destination
  1. Yes, I think that naming is much better. I changed it.
  2. Added this, and tests for each. Nice to have this.
  3. I updated both a bit. Let me know if anything is missing.
  4. Doing this again now, but I renamed the drop_tables replace_schema argument to delete_schema and changed the behaviour.
     This is simpler for initializing the client: no need to do the dance of swapping out the client schema, we just drop the tables and delete the schema history in one transaction, and then "update stored schema" runs as normal afterwards (rough sketch below).
  5. We don't support drop_tables in filesystem. Should we add it? I guess it would basically be the same as truncate (maybe also delete the folders?). But I made a test for refresh=drop_data.
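
A rough illustration of the flow described in point 4; the names below mirror the discussion (drop_tables with delete_schema, then the normal schema update) but are not the exact client API:

def initialize_storage_with_refresh(job_client, dropped_table_names):
    if dropped_table_names:
        # drop the destination tables and delete the stored schema history
        # (done in one transaction inside drop_tables, as described above)
        job_client.drop_tables(*dropped_table_names, delete_schema=True)
    # the regular "update stored schema" step then runs as normal and
    # re-creates any dropped tables that are still part of the schema
    job_client.update_stored_schema()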

@rudolfix (Collaborator) left a comment

LGTM! just a few small things are left.
regarding the filesystem: it has a truncate_tables method which is equivalent to drop_tables. IMO you could just add drop_tables to it and call truncate_tables. dropping schemas is easy: just remove the directory (a sketch follows below):

self.get_table_dir(self.schema.version_table_name)

@sh-rp please review #1063 (comment)

this is a big PR... we'll make sure to split those into smaller ones
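
A hedged sketch of that suggestion for the filesystem destination; truncate_tables and get_table_dir come from the comment above, while the remove-directory call, fs_client attribute and delete_schema flag are assumptions:

def drop_filesystem_tables(client, table_names, delete_schema=True):
    # on a filesystem destination, dropping table data is the same as truncating it
    client.truncate_tables(list(table_names))
    if delete_schema:
        # wipe stored schema versions by removing the version table's directory
        version_dir = client.get_table_dir(client.schema.version_table_name)
        client.fs_client.rm(version_dir, recursive=True)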

dlt/pipeline/configuration.py (outdated, resolved)
docs/website/docs/dlt-ecosystem/destinations/duckdb.md (outdated, resolved)
docs/website/docs/general-usage/pipeline.md (resolved)
docs/website/docs/general-usage/pipeline.md (resolved)
dlt/pipeline/helpers.py (outdated, resolved)
dlt/pipeline/__init__.py (outdated, resolved)
@pytest.mark.parametrize(
"destination_config", destinations_configs(local_filesystem_configs=True), ids=lambda x: x.name
)
def test_refresh_drop_sources_local_filesystem(destination_config: DestinationTestConfiguration):
Collaborator:

imho this does not need to be tested in a special test for local filesystem. we have a bunch of utils where you can query the items for each table regardless of whether the destination is a filesystem or a sql db. or is there something here that will not work this way?

Collaborator:

Generally speaking, it would be cool to have some kind of unified interface to query all items from a table, implemented for all destinations, so we do not need these extra tests for filesystem that are still around in a couple of places. But that is for another PR.

steinitzu (Collaborator Author):

I managed to do this mostly with the load_tables_to_dicts utils. All tests run on filesystem now!


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
Collaborator:

why only duckdb?

Collaborator:

as mentioned below, you should be able to test the filesystem with the same test

steinitzu (Collaborator Author):

The only variance could be the implementation of the "drop/truncate tables" methods, but they're already tested elsewhere, so I didn't think it would be necessary to run other destinations here; I'm just going by the assumption that all sql destinations have their interface tested.
But maybe it doesn't hurt to be thorough in case something changes in the implementation in the future.


@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, subset=["duckdb"]),
Collaborator:

same questions as above

@sh-rp (Collaborator) left a comment

I will have another closer look tomorrow, as there is really a lot of stuff going on here. For now I would suggest merging the filesystem and duckdb tests; I think it is possible with our helpers and it will reduce complexity. Also @rudolfix, is testing on duckdb enough?

@steinitzu force-pushed the sthor/full-refresh-adjustment branch from 533a7f1 to 87cfd16 (May 29, 2024 14:00)
@steinitzu (Collaborator Author):

> @steinitzu plenty of tests are failing. my take is that you should try to run common tests locally so we have less turnover on CI (if you do not do that already)

@rudolfix got it. I don't always run them, but I should for sure before pushing. Usually I try to run the tests I think are relevant; in this case that was a lot more than I thought.

Common + load postgres/duckdb/local filesystem tests are passing locally now.

@steinitzu force-pushed the sthor/full-refresh-adjustment branch from 6eb160a to 87cfd16 (May 29, 2024 21:00)
@rudolfix (Collaborator) left a comment

LGTM!

@sh-rp (Collaborator) commented Jun 3, 2024

The state stuff looks good to me too. The way it is loaded to the destination has not been changed here yet; that is for an upcoming PR (correct me if I'm wrong there @steinitzu). Only the new fields have been added and the state is committed after extraction, so I don't see a problem, or any migration we need to take care of.

@rudolfix merged commit cbed225 into devel (Jun 3, 2024)
48 of 50 checks passed
@rudolfix deleted the sthor/full-refresh-adjustment branch (June 3, 2024 14:18)