From 2a4e512a20a63290829031c15b117006d13c3046 Mon Sep 17 00:00:00 2001 From: guillaume Date: Mon, 27 Nov 2023 14:15:23 +0000 Subject: [PATCH] chore: add flatten and improve types variance --- iter_pipes/functional.py | 8 ++++ iter_pipes/main.py | 98 ++++++++++++++++++++++------------------ tests/test_flatten.py | 12 +++++ 3 files changed, 73 insertions(+), 45 deletions(-) create mode 100644 tests/test_flatten.py diff --git a/iter_pipes/functional.py b/iter_pipes/functional.py index f74081a..b49b06f 100644 --- a/iter_pipes/functional.py +++ b/iter_pipes/functional.py @@ -29,6 +29,14 @@ Step = Callable[[Iterable[T]], Iterable[V]] +def flatten() -> Step[Iterable[T], T]: + def f(data: Iterable[Iterable[T]]) -> Iterable[T]: + for x in data: + yield from x + + return f + + def map(step: Callable[[V], W]) -> Step[V, W]: def f(data: Iterable[V]) -> Iterable[W]: for item in data: diff --git a/iter_pipes/main.py b/iter_pipes/main.py index bd3db8f..b03e893 100644 --- a/iter_pipes/main.py +++ b/iter_pipes/main.py @@ -8,14 +8,15 @@ batch, branch, filter, + flatten, for_batch, for_each, identity, map, ) -T = TypeVar("T") -U = TypeVar("U") +T_contra = TypeVar("T_contra", contravariant=True) +U_co = TypeVar("U_co", covariant=True) V = TypeVar("V") W = TypeVar("W") X = TypeVar("X") @@ -27,135 +28,140 @@ raw_filter = filter -Step = Callable[[Iterable[T]], Iterable[U]] +Step = Callable[[Iterable[T_contra]], Iterable[U_co]] -def compose_steps(step1: Step[T, U] | None, step2: Step[U, V]) -> Step[T, V]: +def compose_steps( + step1: Step[T_contra, U_co] | None, step2: Step[U_co, V] +) -> Step[T_contra, V]: if step1 is None: return step2 # type: ignore - def composed(items: Iterable[T]) -> Iterable[V]: + def composed(items: Iterable[T_contra]) -> Iterable[V]: return step2(step1(items)) return composed -class IterableWrapper(Generic[T]): - def __init__(self, iterable: Iterable[T]): +class IterableWrapper(Generic[T_contra]): + def __init__(self, iterable: Iterable[T_contra]): self._iterable = iterable - def __iter__(self) -> Iterator[T]: + def __iter__(self) -> Iterator[T_contra]: return iter(self._iterable) def consume(self) -> None: deque(self._iterable) - def to_list(self) -> list[T]: + def to_list(self) -> list[T_contra]: return list(self._iterable) -class Pipeline(Generic[T, U]): - step: Step[T, U] | None - items: Iterable[T] | None +class Pipeline(Generic[T_contra, U_co]): + step: Step[T_contra, U_co] | None + items: Iterable[T_contra] | None def __init__( self, - step: Step[T, U] | None = None, - items: Iterable[T] | None = None, + step: Step[T_contra, U_co] | None = None, + items: Iterable[T_contra] | None = None, ): self.step = step self.items = items - def for_each(self, step: Callable[[U], Any]) -> Pipeline[T, U]: + def for_each(self, step: Callable[[U_co], Any]) -> Pipeline[T_contra, U_co]: return self | for_each(step) - def map(self, step: Callable[[U], W]) -> Pipeline[T, W]: + def map(self, step: Callable[[U_co], W]) -> Pipeline[T_contra, W]: return self | map(step) - def pipe(self, step: Step[U, V]) -> Pipeline[T, V]: + def pipe(self, step: Step[U_co, V]) -> Pipeline[T_contra, V]: return Pipeline(compose_steps(self.step, step), self.items) def for_batch( - self, step: Callable[[list[U]], Any], batch_size: int - ) -> Pipeline[T, U]: + self, step: Callable[[list[U_co]], Any], batch_size: int + ) -> Pipeline[T_contra, U_co]: return self | for_batch(step, batch_size) def batch( - self, step: Callable[[list[U]], Iterable[V]], batch_size: int - ) -> Pipeline[T, V]: + self, step: Callable[[list[U_co]], Iterable[V]], batch_size: int + ) -> Pipeline[T_contra, V]: return self | batch(step, batch_size) @overload - def filter(self, step: Callable[[U], TypeGuard[W]]) -> Pipeline[T, W]: + def filter(self, step: Callable[[U_co], TypeGuard[W]]) -> Pipeline[T_contra, W]: ... @overload - def filter(self, step: Callable[[U], bool]) -> Pipeline[T, U]: + def filter(self, step: Callable[[U_co], bool]) -> Pipeline[T_contra, U_co]: ... def filter(self, step): # type: ignore return self | filter(step) # type: ignore - def filter_not_none(self: Pipeline[T, X | None]) -> Pipeline[T, X]: + def filter_not_none(self: Pipeline[T_contra, X | None]) -> Pipeline[T_contra, X]: return self | filter(lambda item: item is not None) # type: ignore + def flatten(self: Pipeline[T_contra, Iterable[W]]) -> Pipeline[T_contra, W]: + return self | flatten() + @overload def branch( self, - f1: Callable[[Pipeline[U, U]], Pipeline[U, W]], + f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]], max_inflight: int = ..., - ) -> Pipeline[U, W]: + ) -> Pipeline[U_co, W]: ... @overload def branch( self, - f1: Callable[[Pipeline[U, U]], Pipeline[U, V]], - f2: Callable[[Pipeline[U, U]], Pipeline[U, W]], + f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, V]], + f2: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]], max_inflight: int = ..., - ) -> Pipeline[U, W | V]: + ) -> Pipeline[U_co, W | V]: ... @overload def branch( self, - f1: Callable[[Pipeline[U, U]], Pipeline[U, V]], - f2: Callable[[Pipeline[U, U]], Pipeline[U, W]], - f3: Callable[[Pipeline[U, U]], Pipeline[U, X]], + f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, V]], + f2: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]], + f3: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, X]], max_inflight: int = ..., - ) -> Pipeline[U, W | V | X]: + ) -> Pipeline[U_co, W | V | X]: ... @overload def branch( # noqa W291 self, - f1: Callable[[Pipeline[U, U]], Pipeline[U, V]], - f2: Callable[[Pipeline[U, U]], Pipeline[U, W]], - f3: Callable[[Pipeline[U, U]], Pipeline[U, X]], - f4: Callable[[Pipeline[U, U]], Pipeline[U, Y]], + f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, V]], + f2: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]], + f3: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, X]], + f4: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, Y]], max_inflight: int = ..., - ) -> Pipeline[U, W | V | X | Y]: + ) -> Pipeline[U_co, W | V | X | Y]: ... def branch( # type: ignore self, - *functions: Callable[[Pipeline[U, U]], Pipeline[U, Any]], + *functions: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, Any]], max_inflight: int = 1000, - ) -> Pipeline[U, Any]: + ) -> Pipeline[U_co, Any]: steps = [f(Pipeline()).step or identity for f in functions] return self | branch(*steps, max_inflight=max_inflight, pick_first=False) # type: ignore def branch_off( self, - *functions: Callable[[Pipeline[U, U]], Pipeline[U, Any]], + *functions: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, Any]], max_inflight: int = 1000, - ) -> Pipeline[T, U]: + ) -> Pipeline[T_contra, U_co]: steps = [f(Pipeline()).step or identity for f in functions] return self | branch( identity, *steps, max_inflight=max_inflight, pick_first=True ) # type: ignore - def process(self, items: Iterable[T] | None = None) -> IterableWrapper[U]: + def process(self, items: Iterable[T_contra] | None = None) -> IterableWrapper[U_co]: input_ = items or self.items if not input_: raise ValueError("input is None") @@ -163,10 +169,12 @@ def process(self, items: Iterable[T] | None = None) -> IterableWrapper[U]: raise ValueError("step is None") return IterableWrapper(self.step(input_)) - def __call__(self, items: Iterable[T] | None = None) -> IterableWrapper[U]: + def __call__( + self, items: Iterable[T_contra] | None = None + ) -> IterableWrapper[U_co]: return self.process(items) - def __or__(self, step: Step[U, V]) -> Pipeline[T, V]: + def __or__(self, step: Step[U_co, V]) -> Pipeline[T_contra, V]: return self.pipe(step) diff --git a/tests/test_flatten.py b/tests/test_flatten.py new file mode 100644 index 0000000..9ee8320 --- /dev/null +++ b/tests/test_flatten.py @@ -0,0 +1,12 @@ +from iter_pipes import PipelineFactory + + +def test_consume(): + result = ( + PipelineFactory[int]() # + .map(lambda x: range(x)) + .flatten() + .process(range(5)) + .to_list() + ) + assert result == [0, 0, 1, 0, 1, 2, 0, 1, 2, 3]