Skip to content

Commit

Permalink
adds _impl_cls to dlt.resource and dynamic config section to standalo…
Browse files Browse the repository at this point in the history
…ne resources with dynamic names (#1324)

* allows custom DltResource impl, standalone resource returning resource, standalone resource config name may be dynamic

* better cloning and awareness of config injection in incremental

* bumps mypy to 1.10
  • Loading branch information
rudolfix authored May 6, 2024
1 parent b4a736c commit e329ab9
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 99 deletions.
5 changes: 4 additions & 1 deletion dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,10 @@ def should_truncate_table_before_load_on_staging_destination(self, table: TTable
return True


TDestinationReferenceArg = Union[str, "Destination", Callable[..., "Destination"], None]
# TODO: type Destination properly
TDestinationReferenceArg = Union[
str, "Destination[Any, Any]", Callable[..., "Destination[Any, Any]"], None
]


class Destination(ABC, Generic[TDestinationConfig, TDestinationClient]):
Expand Down
4 changes: 3 additions & 1 deletion dlt/common/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@
from typing import _TypedDict

REPattern = _REPattern[str]
PathLike = os.PathLike[str]
else:
StrOrBytesPath = Any
from typing import _TypedDictMeta as _TypedDict

REPattern = _REPattern
PathLike = os.PathLike

AnyType: TypeAlias = Any
NoneType = type(None)
Expand Down Expand Up @@ -92,7 +94,7 @@
TVariantBase = TypeVar("TVariantBase", covariant=True)
TVariantRV = Tuple[str, Any]
VARIANT_FIELD_FORMAT = "v_%s"
TFileOrPath = Union[str, os.PathLike, IO[Any]]
TFileOrPath = Union[str, PathLike, IO[Any]]
TSortOrder = Literal["asc", "desc"]


Expand Down
2 changes: 1 addition & 1 deletion dlt/destinations/impl/dremio/pydremio.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def __init__(self, factory: CookieMiddlewareFactory, *args: Any, **kwargs: Any):
def received_headers(self, headers: Mapping[str, str]) -> None:
for key in headers:
if key.lower() == "set-cookie":
cookie = SimpleCookie() # type: ignore
cookie = SimpleCookie()
for item in headers.get(key):
cookie.load(item)

Expand Down
103 changes: 69 additions & 34 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from dlt.extract.exceptions import (
CurrentSourceNotAvailable,
DynamicNameNotStandaloneResource,
InvalidResourceDataTypeFunctionNotAGenerator,
InvalidTransformerDataTypeGeneratorFunctionRequired,
ResourceFunctionExpected,
ResourceInnerCallableConfigWrapDisallowed,
Expand All @@ -65,7 +66,7 @@

from dlt.extract.items import TTableHintTemplate
from dlt.extract.source import DltSource
from dlt.extract.resource import DltResource, TUnboundDltResource
from dlt.extract.resource import DltResource, TUnboundDltResource, TDltResourceImpl


@configspec
Expand Down Expand Up @@ -103,7 +104,7 @@ def source(
schema_contract: TSchemaContract = None,
spec: Type[BaseConfiguration] = None,
_impl_cls: Type[TDltSourceImpl] = DltSource, # type: ignore[assignment]
) -> Callable[TSourceFunParams, DltSource]: ...
) -> Callable[TSourceFunParams, TDltSourceImpl]: ...


@overload
Expand Down Expand Up @@ -264,7 +265,7 @@ async def _wrap_coro(*args: Any, **kwargs: Any) -> TDltSourceImpl:
# get spec for wrapped function
SPEC = get_fun_spec(conf_f)
# get correct wrapper
wrapper = _wrap_coro if inspect.iscoroutinefunction(inspect.unwrap(f)) else _wrap
wrapper: AnyFun = _wrap_coro if inspect.iscoroutinefunction(inspect.unwrap(f)) else _wrap # type: ignore[assignment]
# store the source information
_SOURCES[_wrap.__qualname__] = SourceInfo(SPEC, wrapper, func_module)
if inspect.iscoroutinefunction(inspect.unwrap(f)):
Expand Down Expand Up @@ -296,7 +297,8 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> TDltResourceImpl: ...


@overload
Expand All @@ -315,7 +317,8 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ...
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]: ...


@overload
Expand All @@ -334,8 +337,11 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
standalone: Literal[True] = True,
) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, DltResource]]: ...
) -> Callable[
[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, TDltResourceImpl]
]: ...


@overload
Expand All @@ -354,7 +360,8 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
) -> DltResource: ...
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> TDltResourceImpl: ...


def resource(
Expand All @@ -372,6 +379,7 @@ def resource(
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
standalone: bool = False,
data_from: TUnboundDltResource = None,
) -> Any:
Expand Down Expand Up @@ -435,17 +443,19 @@ def resource(
parallelized (bool, optional): If `True`, the resource generator will be extracted in parallel with other resources. Defaults to `False`.
_impl_cls (Type[TDltResourceImpl], optional): A custom implementation of DltResource, may be also used to providing just a typing stub
Raises:
ResourceNameMissing: indicates that name of the resource cannot be inferred from the `data` being passed.
InvalidResourceDataType: indicates that the `data` argument cannot be converted into `dlt resource`
Returns:
DltResource instance which may be loaded, iterated or combined with other resources into a pipeline.
TDltResourceImpl instance which may be loaded, iterated or combined with other resources into a pipeline.
"""

def make_resource(
_name: str, _section: str, _data: Any, incremental: IncrementalResourceWrapper = None
) -> DltResource:
) -> TDltResourceImpl:
table_template = make_hints(
table_name,
write_disposition=write_disposition or DEFAULT_WRITE_DISPOSITION,
Expand All @@ -464,7 +474,7 @@ def make_resource(
table_template.setdefault("x-normalizer", {}) # type: ignore[typeddict-item]
table_template["x-normalizer"]["max_nesting"] = max_table_nesting # type: ignore[typeddict-item]

resource = DltResource.from_data(
resource = _impl_cls.from_data(
_data,
_name,
_section,
Expand All @@ -479,7 +489,7 @@ def make_resource(

def decorator(
f: Callable[TResourceFunParams, Any]
) -> Callable[TResourceFunParams, DltResource]:
) -> Callable[TResourceFunParams, TDltResourceImpl]:
if not callable(f):
if data_from:
# raise more descriptive exception if we construct transformer
Expand All @@ -490,7 +500,6 @@ def decorator(
if not standalone and callable(name):
raise DynamicNameNotStandaloneResource(get_callable_name(f))

# resource_section = name if name and not callable(name) else get_callable_name(f)
resource_name = name if name and not callable(name) else get_callable_name(f)

# do not inject config values for inner functions, we assume that they are part of the source
Expand All @@ -506,15 +515,18 @@ def decorator(
incr_f = incremental.wrap(sig, f) if incremental else f

resource_sections = (known_sections.SOURCES, source_section, resource_name)

# standalone resource will prefer existing section context when resolving config values
# this lets the source to override those values and provide common section for all config values for resources present in that source
# for autogenerated spec do not include defaults
# NOTE: allow full config for standalone, currently some edge cases for incremental does not work
# (removing it via apply hints or explicit call)
conf_f = with_config(
incr_f,
spec=spec,
sections=resource_sections,
sections_merge_style=ConfigSectionContext.resource_merge_style,
include_defaults=spec is not None,
include_defaults=spec is not None, # or standalone,
)
is_inner_resource = is_inner_callable(f)
if conf_f != incr_f and is_inner_resource and not standalone:
Expand All @@ -526,33 +538,52 @@ def decorator(
if not is_inner_resource:
_SOURCES[f.__qualname__] = SourceInfo(SPEC, f, func_module)

if standalone:
if data_from:
compat_wrapper, skip_args = wrap_compat_transformer, 1
else:
compat_wrapper, skip_args = wrap_resource_gen, 0

@wraps(conf_f)
def _wrap(*args: Any, **kwargs: Any) -> DltResource:
_, mod_sig, bound_args = simulate_func_call(conf_f, skip_args, *args, **kwargs)
actual_resource_name = (
name(bound_args.arguments) if callable(name) else resource_name
)
if not standalone:
# we return a DltResource that is callable and returns dlt resource when called
# so it should match the signature
return make_resource(resource_name, source_section, conf_f, incremental) # type: ignore[return-value]

# wrap the standalone resource
if data_from:
compat_wrapper, skip_args = wrap_compat_transformer, 1
else:
compat_wrapper, skip_args = wrap_resource_gen, 0

@wraps(incr_f)
def _wrap(*args: Any, **kwargs: Any) -> TDltResourceImpl:
_, mod_sig, bound_args = simulate_func_call(incr_f, skip_args, *args, **kwargs)
actual_resource_name = name(bound_args.arguments) if callable(name) else resource_name
# wrap again with an actual resource name
conf_f = with_config(
incr_f,
spec=SPEC,
sections=resource_sections[:-1] + (actual_resource_name,),
sections_merge_style=ConfigSectionContext.resource_merge_style,
)
try:
r = make_resource(
actual_resource_name,
source_section,
compat_wrapper(actual_resource_name, conf_f, sig, *args, **kwargs),
incremental,
)
# consider transformer arguments bound
r._args_bound = True
# keep explicit args passed
r._set_explicit_args(conf_f, mod_sig, *args, **kwargs)
return r

return _wrap
else:
return make_resource(resource_name, source_section, conf_f, incremental)
except InvalidResourceDataTypeFunctionNotAGenerator as gen_ex:
# we allow an edge case: resource can return another resource
try:
# actually call the function to see if it contains DltResource
data_ = conf_f(*args, **kwargs)
if not isinstance(data_, DltResource):
raise
r = data_ # type: ignore[assignment]
except Exception:
raise gen_ex from None
# consider transformer arguments bound
r._args_bound = True
# keep explicit args passed
r._set_explicit_args(conf_f, mod_sig, *args, **kwargs)
return r

return _wrap

# if data is callable or none use decorator
if data is None:
Expand Down Expand Up @@ -659,6 +690,7 @@ def transformer(
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
standalone: bool = False,
_impl_cls: Type[TDltResourceImpl] = DltResource, # type: ignore[assignment]
) -> Any:
"""A form of `dlt resource` that takes input from other resources via `data_from` argument in order to enrich or transform the data.
Expand Down Expand Up @@ -713,6 +745,8 @@ def transformer(
spec (Type[BaseConfiguration], optional): A specification of configuration and secret values required by the source.
standalone (bool, optional): Returns a wrapped decorated function that creates DltResource instance. Must be called before use. Cannot be part of a source.
_impl_cls (Type[TDltResourceImpl], optional): A custom implementation of DltResource, may be also used to providing just a typing stub
"""
if isinstance(f, DltResource):
raise ValueError(
Expand All @@ -733,6 +767,7 @@ def transformer(
standalone=standalone,
data_from=data_from,
parallelized=parallelized,
_impl_cls=_impl_cls,
)


Expand Down
16 changes: 13 additions & 3 deletions dlt/extract/incremental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def merge(self, other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue
merged.resource_name = self.resource_name
if other.resource_name:
merged.resource_name = other.resource_name
# also pass if resolved
merged.__is_resolved__ = other.__is_resolved__
merged.__exception__ = other.__exception__
return merged # type: ignore

def copy(self) -> "Incremental[TCursorValue]":
Expand Down Expand Up @@ -438,7 +441,8 @@ def can_close(self) -> bool:
def __str__(self) -> str:
return (
f"Incremental at {id(self)} for resource {self.resource_name} with cursor path:"
f" {self.cursor_path} initial {self.initial_value} lv_func {self.last_value_func}"
f" {self.cursor_path} initial {self.initial_value} - {self.end_value} lv_func"
f" {self.last_value_func}"
)

def _get_transformer(self, items: TDataItems) -> IncrementalTransform:
Expand Down Expand Up @@ -569,10 +573,16 @@ def _wrap(*args: Any, **kwargs: Any) -> Any:
new_incremental.__orig_class__ = p.annotation # type: ignore

# set the incremental only if not yet set or if it was passed explicitly
# NOTE: if new incremental is resolved, it was passed via config injection
# NOTE: the _incremental may be also set by applying hints to the resource see `set_template` in `DltResource`
if (new_incremental and p.name in bound_args.arguments) or not self._incremental:
if (
new_incremental
and p.name in bound_args.arguments
and not new_incremental.is_resolved()
) or not self._incremental:
self._incremental = new_incremental
self._incremental.resolve()
if not self._incremental.is_resolved():
self._incremental.resolve()
# in case of transformers the bind will be called before this wrapper is set: because transformer is called for a first time late in the pipe
if self._resource_name:
# rebind internal _incremental from wrapper that already holds
Expand Down
Loading

0 comments on commit e329ab9

Please sign in to comment.