New "refresh" mode and "dev_mode" #1063

Merged
merged 51 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
dba576b  Rename full_refresh -> dev_mode, add deprecation warning (Mar 1, 2024)
432d1d7  Replace some full_refresh usage in code and docs (May 10, 2024)
c7e86ef  Replace full_refresh usage in tests (May 10, 2024)
8c13683  Init experimental refresh = full with drop command (Apr 9, 2024)
613257e  Refresh modes with dropped_tables file (Apr 24, 2024)
2d46305  Init separate local/write drop command (Apr 9, 2024)
9b63cff  Use load_package_state instead of drop tables file (Apr 18, 2024)
3b6d809  Use drop schema in init_client (TODO: error) (Apr 6, 2024)
4334553  Separate cli drop command instructions/execute (Apr 7, 2024)
b6a21aa  drop tables in init_client (Apr 8, 2024)
e9543e0  dropped_tables field on load package state (Apr 9, 2024)
7f95d9e  Fix import (Apr 9, 2024)
3e0af96  test/fix truncate mode (Apr 9, 2024)
e9528b1  Save truncated tables in load package state (Apr 15, 2024)
10fe6f3  Remove load package state copying (Apr 16, 2024)
44cc1cc  cleanup (Apr 17, 2024)
54de570  Drop cmd use package state, refactoring (Apr 18, 2024)
6aae866  Don't drop tables without data (Apr 17, 2024)
29b6c96  Validate literals in configspec (Apr 19, 2024)
8a2276b  Match stored schema by version+version_hash (Apr 19, 2024)
2546021  Cleanup (May 10, 2024)
c749eae  Fix dlt version test (May 11, 2024)
7802e85  Cleanup (May 11, 2024)
ce41fb0  Remove dropped_tables_filename (May 14, 2024)
0f9358d  Fix snippet (May 14, 2024)
7a161a8  Pipeline refresh docs (May 18, 2024)
113d577  refresh argument docstring (May 18, 2024)
73aab0e  Restore and update commented drop cmd tests (May 18, 2024)
f18509e  Cleanup refresh tests and test whether table dropped vs truncated (May 18, 2024)
b662381  Test existing schema hash (May 18, 2024)
ed43445  Revert "Match stored schema by version+version_hash" (May 22, 2024)
fbfb97c  Use replace_schema flag (May 22, 2024)
2ed7d39  Change drop_tables replace_schema to delete_schema (May 22, 2024)
b8d98f6  Refresh drop only selected sources (May 22, 2024)
1a28168  Rename refresh modes, update docs (May 22, 2024)
360e2c4  pipeline.run/extract refresh argument (May 22, 2024)
6f5a853  Don't modify schema when refresh='drop_data' (May 22, 2024)
a122834  Move refresh tests to load, add filesystem truncate test (May 22, 2024)
bbf56a0  Fix duck import (May 22, 2024)
d9bfe90  Remove generated doc sections (May 27, 2024)
0abffa5  Default full_refresh=None (May 27, 2024)
f096596  Cleanup unused imports (May 27, 2024)
be7e0d0  Close caution blocks (May 27, 2024)
8b5aa2d  Update config field docstring (May 27, 2024)
aaaf519  Add filesystem drop_tables method (May 27, 2024)
e3b5b35  Run all refresh tests on local filesystem destination (May 27, 2024)
3dcd1ab  Fix test drop (May 29, 2024)
8580ea4  Fix iter filesystem schemas (May 29, 2024)
6a06449  Fix drop_resources (May 29, 2024)
87cfd16  Default config.full_refresh also None (May 29, 2024)
879e4e5  Fix filesystem + test (May 29, 2024)
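Taken together, these commits rename `full_refresh` to `dev_mode` and add a `refresh` argument to `pipeline.run`/`extract`. A minimal usage sketch, assuming the public API as it appears in this diff; the `items` resource and `duckdb` destination are illustrative, not part of the PR:

```python
import dlt

@dlt.resource
def items():  # illustrative resource for the example
    yield from [{"id": 1}, {"id": 2}]

# dev_mode replaces the deprecated full_refresh flag
pipeline = dlt.pipeline(
    pipeline_name="refresh_demo",
    destination="duckdb",
    dev_mode=False,
)

# refresh accepts one of the TRefreshMode literals introduced below:
# "drop_sources", "drop_resources" or "drop_data"
pipeline.run(items(), refresh="drop_sources")
```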
13 changes: 8 additions & 5 deletions dlt/cli/deploy_command_helpers.py

@@ -263,22 +263,25 @@ def parse_pipeline_info(visitor: PipelineScriptVisitor) -> List[Tuple[str, Optio
     if n.PIPELINE in visitor.known_calls:
         for call_args in visitor.known_calls[n.PIPELINE]:
             pipeline_name, pipelines_dir = None, None
-            f_r_node = call_args.arguments.get("full_refresh")
+            # Check both full_refresh/dev_mode until full_refresh option is removed from dlt
+            f_r_node = call_args.arguments.get("full_refresh") or call_args.arguments.get(
+                "dev_mode"
+            )
             if f_r_node:
                 f_r_value = evaluate_node_literal(f_r_node)
                 if f_r_value is None:
                     fmt.warning(
-                        "The value of `full_refresh` in call to `dlt.pipeline` cannot be"
+                        "The value of `dev_mode` in call to `dlt.pipeline` cannot be"
                         f" determined from {unparse(f_r_node).strip()}. We assume that you know"
                         " what you are doing :)"
                     )
                 if f_r_value is True:
                     if fmt.confirm(
-                        "The value of 'full_refresh' is set to True. Do you want to abort to set it"
-                        " to False?",
+                        "The value of 'dev_mode' or 'full_refresh' is set to True. Do you want to"
+                        " abort to set it to False?",
                         default=True,
                     ):
-                        raise CliCommandException("deploy", "Please set the full_refresh to False")
+                        raise CliCommandException("deploy", "Please set the dev_mode to False")

             p_d_node = call_args.arguments.get("pipelines_dir")
             if p_d_node:
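For context, the deploy helper reads these arguments from the user's script AST rather than at runtime. A standalone sketch of that idea using only the standard library; `evaluate_node_literal` in dlt is more defensive, so this is not the actual implementation:

```python
import ast

# parse a pipeline script and pull out the dev_mode / full_refresh keyword
tree = ast.parse("dlt.pipeline(pipeline_name='p', dev_mode=True)")
call = tree.body[0].value  # the dlt.pipeline(...) Call node
kwargs = {kw.arg: kw.value for kw in call.keywords}
node = kwargs.get("full_refresh") or kwargs.get("dev_mode")
value = ast.literal_eval(node) if node is not None else None
assert value is True
```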
2 changes: 1 addition & 1 deletion dlt/common/configuration/resolve.py

@@ -286,7 +286,7 @@ def _resolve_config_field(
     embedded_sections: Tuple[str, ...],
     accept_partial: bool,
 ) -> Tuple[Any, List[LookupTrace]]:
-    inner_hint = extract_inner_hint(hint)
+    inner_hint = extract_inner_hint(hint, preserve_literal=True)

     if explicit_value is not None:
         value = explicit_value
12 changes: 9 additions & 3 deletions dlt/common/configuration/specs/base_configuration.py

@@ -19,6 +19,7 @@
     overload,
     ClassVar,
     TypeVar,
+    Literal,
 )
 from typing_extensions import get_args, get_origin, dataclass_transform, Annotated, TypeAlias
 from functools import wraps
@@ -120,13 +121,18 @@ def is_valid_hint(hint: Type[Any]) -> bool:
     return False


-def extract_inner_hint(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]:
+def extract_inner_hint(
+    hint: Type[Any], preserve_new_types: bool = False, preserve_literal: bool = False
+) -> Type[Any]:
     # extract hint from Optional / Literal / NewType hints
-    inner_hint = extract_inner_type(hint, preserve_new_types)
+    inner_hint = extract_inner_type(hint, preserve_new_types, preserve_literal)
     # get base configuration from union type
     inner_hint = get_config_if_union_hint(inner_hint) or inner_hint
     # extract origin from generic types (ie List[str] -> List)
-    return get_origin(inner_hint) or inner_hint
+    origin = get_origin(inner_hint) or inner_hint
+    if preserve_literal and origin is Literal:
+        return inner_hint
+    return origin or inner_hint


 def is_secret_hint(hint: Type[Any]) -> bool:
31 changes: 27 additions & 4 deletions dlt/common/configuration/utils.py

@@ -2,7 +2,20 @@
 import ast
 import contextlib
 import tomlkit
-from typing import Any, Dict, Mapping, NamedTuple, Optional, Tuple, Type, Sequence
+from typing import (
+    Any,
+    Dict,
+    Mapping,
+    NamedTuple,
+    Optional,
+    Tuple,
+    Type,
+    Sequence,
+    get_args,
+    Literal,
+    get_origin,
+    List,
+)
 from collections.abc import Mapping as C_Mapping

 from dlt.common.json import json
@@ -51,25 +64,35 @@ def deserialize_value(key: str, value: Any, hint: Type[TAny]) -> TAny:
                 raise
            return c  # type: ignore

+        literal_values: Tuple[Any, ...] = ()
+        if get_origin(hint) is Literal:
+            # Literal fields are validated against the literal values
+            literal_values = get_args(hint)
+            hint_origin = type(literal_values[0])
+        else:
+            hint_origin = hint
+
         # coerce value
-        hint_dt = py_type_to_sc_type(hint)
+        hint_dt = py_type_to_sc_type(hint_origin)
         value_dt = py_type_to_sc_type(type(value))

         # eval only if value is string and hint is "complex"
         if value_dt == "text" and hint_dt == "complex":
-            if hint is tuple:
+            if hint_origin is tuple:
                 # use literal eval for tuples
                 value = ast.literal_eval(value)
             else:
                 # use json for sequences and mappings
                 value = json.loads(value)
             # exact types must match
-            if not isinstance(value, hint):
+            if not isinstance(value, hint_origin):
                 raise ValueError(value)
         else:
             # for types that are not complex, reuse schema coercion rules
             if value_dt != hint_dt:
                 value = coerce_value(hint_dt, value_dt, value)
+            if literal_values and value not in literal_values:
+                raise ConfigValueCannotBeCoercedException(key, value, hint)
         return value  # type: ignore
     except ConfigValueCannotBeCoercedException:
         raise
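The effect of the `deserialize_value` change is that a config value typed as a `Literal` is validated against the allowed values after coercion. A reduced sketch of just that check; the function name and error type here are illustrative, not the dlt internals:

```python
from typing import Literal, get_args, get_origin

TRefreshMode = Literal["drop_sources", "drop_resources", "drop_data"]

def check_literal(key: str, value: str, hint: object) -> str:
    # mirror the new validation: collect the literal values and reject anything else
    if get_origin(hint) is Literal:
        allowed = get_args(hint)
        if value not in allowed:
            raise ValueError(f"{key}={value!r} is not one of {allowed}")
    return value

check_literal("refresh", "drop_data", TRefreshMode)   # passes
# check_literal("refresh", "full", TRefreshMode)      # raises ValueError
```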
13 changes: 13 additions & 0 deletions dlt/common/pipeline.py

@@ -21,6 +21,7 @@
     TypeVar,
     TypedDict,
     Mapping,
+    Literal,
 )
 from typing_extensions import NotRequired

@@ -52,6 +53,10 @@
 from dlt.common.versioned_state import TVersionedState


+# TRefreshMode = Literal["full", "replace"]
+TRefreshMode = Literal["drop_sources", "drop_resources", "drop_data"]
+
+
 class _StepInfo(NamedTuple):
     pipeline: "SupportsPipeline"
     loads_ids: List[str]
@@ -762,6 +767,14 @@ def reset_resource_state(resource_name: str, source_state_: Optional[DictStrAny]
     state_["resources"].pop(resource_name)


+def _get_matching_sources(
+    pattern: REPattern, pipeline_state: Optional[TPipelineState] = None, /
+) -> List[str]:
+    """Get all source names in state matching the regex pattern"""
+    state_ = _sources_state(pipeline_state)
+    return [key for key in state_ if pattern.match(key)]
+
+
 def _get_matching_resources(
     pattern: REPattern, source_state_: Optional[DictStrAny] = None, /
 ) -> List[str]:
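`_get_matching_sources` mirrors the existing `_get_matching_resources` helper but walks source names in the pipeline state. At its core it is plain regex filtering over a dict, roughly like this (the state shape is simplified for illustration):

```python
import re

# simplified stand-in for the pipeline's sources state
sources_state = {"github": {}, "github_issues": {}, "zendesk": {}}

pattern = re.compile("github.*")
matching = [key for key in sources_state if pattern.match(key)]
assert matching == ["github", "github_issues"]
```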
11 changes: 11 additions & 0 deletions dlt/common/schema/schema.py

@@ -438,6 +438,17 @@ def update_schema(self, schema: "Schema") -> None:
         self._settings = deepcopy(schema.settings)
         self._compile_settings()

+    def drop_tables(
+        self, table_names: Sequence[str], seen_data_only: bool = False
+    ) -> List[TTableSchema]:
+        """Drops tables from the schema and returns the dropped tables"""
+        result = []
+        for table_name in table_names:
+            table = self.tables.get(table_name)
+            if table and (not seen_data_only or utils.has_table_seen_data(table)):
+                result.append(self._schema_tables.pop(table_name))
+        return result
+
     def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: StrAny) -> StrAny:
         rv_row: DictStrAny = {}
         column_prop: TColumnProp = utils.hint_to_column_prop(hint_type)
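A hedged usage sketch of the new `Schema.drop_tables`, assuming `schema` is an existing `dlt.Schema` with loaded tables; the table names are illustrative:

```python
# drop only tables that have actually seen data; child tables must be
# listed explicitly, since drop_tables does not cascade by itself
dropped = schema.drop_tables(["items", "items__details"], seen_data_only=True)
for table in dropped:
    print(table["name"])  # each entry is a TTableSchema dict
```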
7 changes: 6 additions & 1 deletion dlt/common/storages/load_package.py

@@ -35,7 +35,7 @@
 from dlt.common.destination import TLoaderFileFormat
 from dlt.common.exceptions import TerminalValueError
 from dlt.common.schema import Schema, TSchemaTables
-from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns
+from dlt.common.schema.typing import TStoredSchema, TTableSchemaColumns, TTableSchema
 from dlt.common.storages import FileStorage
 from dlt.common.storages.exceptions import LoadPackageNotFound, CurrentLoadPackageStateNotAvailable
 from dlt.common.typing import DictStrAny, SupportsHumanize
@@ -76,6 +76,11 @@ class TLoadPackageState(TVersionedState, total=False):
     destination_state: NotRequired[Dict[str, Any]]
     """private space for destinations to store state relevant only to the load package"""

+    dropped_tables: NotRequired[List[TTableSchema]]
+    """List of tables that are to be dropped from the schema and destination (i.e. when `refresh` mode is used)"""
+    truncated_tables: NotRequired[List[TTableSchema]]
+    """List of tables that are to be truncated in the destination (i.e. when `refresh='drop_data'` mode is used)"""
+

 class TLoadPackage(TypedDict, total=False):
     load_id: str
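Illustratively, a load package state carrying the new fields would look like this; the values are made up, and only the two new keys are the point:

```python
package_state = {
    "dropped_tables": [{"name": "items", "columns": {}}],     # dropped from schema and destination
    "truncated_tables": [{"name": "events", "columns": {}}],  # data deleted, schema kept
}
```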
12 changes: 7 additions & 5 deletions dlt/common/typing.py

@@ -245,7 +245,9 @@ def is_dict_generic_type(t: Type[Any]) -> bool:
     return False


-def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Type[Any]:
+def extract_inner_type(
+    hint: Type[Any], preserve_new_types: bool = False, preserve_literal: bool = False
+) -> Type[Any]:
     """Gets the inner type from Literal, Optional, Final and NewType

     Args:
@@ -256,15 +258,15 @@ def extract_inner_type(hint: Type[Any], preserve_new_types: bool = False) -> Typ
         Type[Any]: Inner type if hint was Literal, Optional or NewType, otherwise hint
     """
     if maybe_modified := extract_type_if_modifier(hint):
-        return extract_inner_type(maybe_modified, preserve_new_types)
+        return extract_inner_type(maybe_modified, preserve_new_types, preserve_literal)
     if is_optional_type(hint):
-        return extract_inner_type(get_args(hint)[0], preserve_new_types)
-    if is_literal_type(hint):
+        return extract_inner_type(get_args(hint)[0], preserve_new_types, preserve_literal)
+    if is_literal_type(hint) and not preserve_literal:
         # assume that all literals are of the same type
         return type(get_args(hint)[0])
     if is_newtype_type(hint) and not preserve_new_types:
         # descend into supertypes of NewType
-        return extract_inner_type(hint.__supertype__, preserve_new_types)
+        return extract_inner_type(hint.__supertype__, preserve_new_types, preserve_literal)
     return hint

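The behavioral difference is easiest to see on an `Optional[Literal[...]]` hint. A quick sketch, assuming a dlt version that includes this change:

```python
from typing import Literal, Optional

from dlt.common.typing import extract_inner_type

TMode = Literal["drop_sources", "drop_resources", "drop_data"]

# without the flag, the Literal collapses to the type of its first value
assert extract_inner_type(Optional[TMode]) is str
# with preserve_literal=True the full Literal survives, so callers like
# deserialize_value can validate a config value against its members
assert extract_inner_type(Optional[TMode], preserve_literal=True) == TMode
```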
34 changes: 26 additions & 8 deletions dlt/destinations/impl/filesystem/filesystem.py

@@ -171,6 +171,15 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
         self.fs_client.makedirs(self.dataset_path, exist_ok=True)
         self.fs_client.touch(self.pathlib.join(self.dataset_path, INIT_FILE_NAME))

+    def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
+        self.truncate_tables(list(tables))
+        if not delete_schema:
+            return
+        # Delete all stored schemas
+        for filename, fileparts in self._iter_stored_schema_files():
+            if fileparts[0] == self.schema.name:
+                self._delete_file(filename)
+
     def truncate_tables(self, table_names: List[str]) -> None:
         """Truncate table with given name"""
         table_dirs = set(self.get_table_dirs(table_names))
@@ -180,19 +189,23 @@ def truncate_tables(self, table_names: List[str]) -> None:
                 # NOTE: deleting in chunks on s3 does not raise on access denied, file non existing and probably other errors
                 # print(f"DEL {table_file}")
                 try:
-                    # NOTE: must use rm_file to get errors on delete
-                    self.fs_client.rm_file(table_file)
-                except NotImplementedError:
-                    # not all filesystem implement the above
-                    self.fs_client.rm(table_file)
-                    if self.fs_client.exists(table_file):
-                        raise FileExistsError(table_file)
+                    self._delete_file(table_file)
                 except FileNotFoundError:
                     logger.info(
                         f"Directory or path to truncate tables {table_names} does not exist but"
                         " it should have been created previously!"
                     )

+    def _delete_file(self, file_path: str) -> None:
+        try:
+            # NOTE: must use rm_file to get errors on delete
+            self.fs_client.rm_file(file_path)
+        except NotImplementedError:
+            # not all filesystems implement the above
+            self.fs_client.rm(file_path)
+            if self.fs_client.exists(file_path):
+                raise FileExistsError(file_path)
+
     def update_stored_schema(
         self,
         only_tables: Iterable[str] = None,
@@ -401,6 +414,11 @@ def _get_schema_file_name(self, version_hash: str, load_id: str) -> str:
             f"{self.schema.name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
         )

+    def _iter_stored_schema_files(self) -> Iterator[Tuple[str, List[str]]]:
+        """Iterator over all stored schema files"""
+        for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name):
+            yield filepath, fileparts
+
     def _get_stored_schema_by_hash_or_newest(
         self, version_hash: str = None
     ) -> Optional[StorageSchemaInfo]:
@@ -409,7 +427,7 @@ def _get_stored_schema_by_hash_or_newest(
         # find newest schema for pipeline or by version hash
         selected_path = None
         newest_load_id = "0"
-        for filepath, fileparts in self._list_dlt_table_files(self.schema.version_table_name):
+        for filepath, fileparts in self._iter_stored_schema_files():
             if (
                 not version_hash
                 and fileparts[0] == self.schema.name
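The `_delete_file` helper works around `fsspec` implementations that lack `rm_file`. The same fallback in isolation, against a local filesystem (the function name and paths are illustrative):

```python
import fsspec

fs = fsspec.filesystem("file")

def delete_file(path: str) -> None:
    try:
        # rm_file raises on failure where the implementation supports it
        fs.rm_file(path)
    except NotImplementedError:
        # some filesystems only offer bulk rm(); verify the delete happened
        fs.rm(path)
        if fs.exists(path):
            raise FileExistsError(path)
```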
18 changes: 12 additions & 6 deletions dlt/destinations/job_client_impl.py

@@ -202,11 +202,18 @@ def update_stored_schema(
         )
         return applied_update

-    def drop_tables(self, *tables: str, replace_schema: bool = True) -> None:
+    def drop_tables(self, *tables: str, delete_schema: bool = True) -> None:
+        """Drop tables in destination database and optionally delete the stored schema as well.
+        Clients that support ddl transactions will have both operations performed in a single transaction.
+
+        Args:
+            tables: Names of tables to drop.
+            delete_schema: If True, also delete all versions of the current schema from storage
+        """
         with self.maybe_ddl_transaction():
             self.sql_client.drop_tables(*tables)
-            if replace_schema:
-                self._replace_schema_in_storage(self.schema)
+            if delete_schema:
+                self._delete_schema_in_storage(self.schema)

     @contextlib.contextmanager
     def maybe_ddl_transaction(self) -> Iterator[None]:
@@ -520,13 +527,12 @@ def _row_to_schema_info(self, query: str, *args: Any) -> StorageSchemaInfo:

         return StorageSchemaInfo(row[0], row[1], row[2], row[3], inserted_at, schema_str)

-    def _replace_schema_in_storage(self, schema: Schema) -> None:
+    def _delete_schema_in_storage(self, schema: Schema) -> None:
         """
-        Save the given schema in storage and remove all previous versions with the same name
+        Delete all stored versions with the same name as given schema
         """
         name = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
         self.sql_client.execute_sql(f"DELETE FROM {name} WHERE schema_name = %s;", schema.name)
-        self._update_schema_in_storage(schema)

     def _update_schema_in_storage(self, schema: Schema) -> None:
         # get schema string or zip
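From a caller's perspective, the rename makes the intent explicit. A hedged sketch of the call and its effect, assuming `client` is a connected SQL job client; the table names are illustrative:

```python
# drops the tables and deletes every stored version of the current schema,
# in a single transaction when the destination supports DDL transactions
client.drop_tables("items", "events", delete_schema=True)

# with delete_schema=False only the DROP TABLE statements are issued
client.drop_tables("staging_items", delete_schema=False)
```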