Skip to content

Commit

Permalink
chore: add flatten and improve types variance
Browse files Browse the repository at this point in the history
  • Loading branch information
p9f committed Nov 27, 2023
1 parent c812946 commit 2a4e512
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 45 deletions.
8 changes: 8 additions & 0 deletions iter_pipes/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
Step = Callable[[Iterable[T]], Iterable[V]]


def flatten() -> Step[Iterable[T], T]:
def f(data: Iterable[Iterable[T]]) -> Iterable[T]:
for x in data:
yield from x

return f


def map(step: Callable[[V], W]) -> Step[V, W]:
def f(data: Iterable[V]) -> Iterable[W]:
for item in data:
Expand Down
98 changes: 53 additions & 45 deletions iter_pipes/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
batch,
branch,
filter,
flatten,
for_batch,
for_each,
identity,
map,
)

T = TypeVar("T")
U = TypeVar("U")
T_contra = TypeVar("T_contra", contravariant=True)
U_co = TypeVar("U_co", covariant=True)
V = TypeVar("V")
W = TypeVar("W")
X = TypeVar("X")
Expand All @@ -27,146 +28,153 @@
raw_filter = filter


Step = Callable[[Iterable[T]], Iterable[U]]
Step = Callable[[Iterable[T_contra]], Iterable[U_co]]


def compose_steps(step1: Step[T, U] | None, step2: Step[U, V]) -> Step[T, V]:
def compose_steps(
step1: Step[T_contra, U_co] | None, step2: Step[U_co, V]
) -> Step[T_contra, V]:
if step1 is None:
return step2 # type: ignore

def composed(items: Iterable[T]) -> Iterable[V]:
def composed(items: Iterable[T_contra]) -> Iterable[V]:
return step2(step1(items))

return composed


class IterableWrapper(Generic[T]):
def __init__(self, iterable: Iterable[T]):
class IterableWrapper(Generic[T_contra]):
def __init__(self, iterable: Iterable[T_contra]):
self._iterable = iterable

def __iter__(self) -> Iterator[T]:
def __iter__(self) -> Iterator[T_contra]:
return iter(self._iterable)

def consume(self) -> None:
deque(self._iterable)

def to_list(self) -> list[T]:
def to_list(self) -> list[T_contra]:
return list(self._iterable)


class Pipeline(Generic[T, U]):
step: Step[T, U] | None
items: Iterable[T] | None
class Pipeline(Generic[T_contra, U_co]):
step: Step[T_contra, U_co] | None
items: Iterable[T_contra] | None

def __init__(
self,
step: Step[T, U] | None = None,
items: Iterable[T] | None = None,
step: Step[T_contra, U_co] | None = None,
items: Iterable[T_contra] | None = None,
):
self.step = step
self.items = items

def for_each(self, step: Callable[[U], Any]) -> Pipeline[T, U]:
def for_each(self, step: Callable[[U_co], Any]) -> Pipeline[T_contra, U_co]:
return self | for_each(step)

def map(self, step: Callable[[U], W]) -> Pipeline[T, W]:
def map(self, step: Callable[[U_co], W]) -> Pipeline[T_contra, W]:
return self | map(step)

def pipe(self, step: Step[U, V]) -> Pipeline[T, V]:
def pipe(self, step: Step[U_co, V]) -> Pipeline[T_contra, V]:
return Pipeline(compose_steps(self.step, step), self.items)

def for_batch(
self, step: Callable[[list[U]], Any], batch_size: int
) -> Pipeline[T, U]:
self, step: Callable[[list[U_co]], Any], batch_size: int
) -> Pipeline[T_contra, U_co]:
return self | for_batch(step, batch_size)

def batch(
self, step: Callable[[list[U]], Iterable[V]], batch_size: int
) -> Pipeline[T, V]:
self, step: Callable[[list[U_co]], Iterable[V]], batch_size: int
) -> Pipeline[T_contra, V]:
return self | batch(step, batch_size)

@overload
def filter(self, step: Callable[[U], TypeGuard[W]]) -> Pipeline[T, W]:
def filter(self, step: Callable[[U_co], TypeGuard[W]]) -> Pipeline[T_contra, W]:
...

@overload
def filter(self, step: Callable[[U], bool]) -> Pipeline[T, U]:
def filter(self, step: Callable[[U_co], bool]) -> Pipeline[T_contra, U_co]:
...

def filter(self, step): # type: ignore
return self | filter(step) # type: ignore

def filter_not_none(self: Pipeline[T, X | None]) -> Pipeline[T, X]:
def filter_not_none(self: Pipeline[T_contra, X | None]) -> Pipeline[T_contra, X]:
return self | filter(lambda item: item is not None) # type: ignore

def flatten(self: Pipeline[T_contra, Iterable[W]]) -> Pipeline[T_contra, W]:
return self | flatten()

@overload
def branch(
self,
f1: Callable[[Pipeline[U, U]], Pipeline[U, W]],
f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]],
max_inflight: int = ...,
) -> Pipeline[U, W]:
) -> Pipeline[U_co, W]:
...

@overload
def branch(
self,
f1: Callable[[Pipeline[U, U]], Pipeline[U, V]],
f2: Callable[[Pipeline[U, U]], Pipeline[U, W]],
f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, V]],
f2: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]],
max_inflight: int = ...,
) -> Pipeline[U, W | V]:
) -> Pipeline[U_co, W | V]:
...

@overload
def branch(
self,
f1: Callable[[Pipeline[U, U]], Pipeline[U, V]],
f2: Callable[[Pipeline[U, U]], Pipeline[U, W]],
f3: Callable[[Pipeline[U, U]], Pipeline[U, X]],
f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, V]],
f2: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]],
f3: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, X]],
max_inflight: int = ...,
) -> Pipeline[U, W | V | X]:
) -> Pipeline[U_co, W | V | X]:
...

@overload
def branch( # noqa W291
self,
f1: Callable[[Pipeline[U, U]], Pipeline[U, V]],
f2: Callable[[Pipeline[U, U]], Pipeline[U, W]],
f3: Callable[[Pipeline[U, U]], Pipeline[U, X]],
f4: Callable[[Pipeline[U, U]], Pipeline[U, Y]],
f1: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, V]],
f2: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, W]],
f3: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, X]],
f4: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, Y]],
max_inflight: int = ...,
) -> Pipeline[U, W | V | X | Y]:
) -> Pipeline[U_co, W | V | X | Y]:
...

def branch( # type: ignore
self,
*functions: Callable[[Pipeline[U, U]], Pipeline[U, Any]],
*functions: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, Any]],
max_inflight: int = 1000,
) -> Pipeline[U, Any]:
) -> Pipeline[U_co, Any]:
steps = [f(Pipeline()).step or identity for f in functions]
return self | branch(*steps, max_inflight=max_inflight, pick_first=False) # type: ignore

def branch_off(
self,
*functions: Callable[[Pipeline[U, U]], Pipeline[U, Any]],
*functions: Callable[[Pipeline[U_co, U_co]], Pipeline[U_co, Any]],
max_inflight: int = 1000,
) -> Pipeline[T, U]:
) -> Pipeline[T_contra, U_co]:
steps = [f(Pipeline()).step or identity for f in functions]
return self | branch(
identity, *steps, max_inflight=max_inflight, pick_first=True
) # type: ignore

def process(self, items: Iterable[T] | None = None) -> IterableWrapper[U]:
def process(self, items: Iterable[T_contra] | None = None) -> IterableWrapper[U_co]:
input_ = items or self.items
if not input_:
raise ValueError("input is None")
if not self.step:
raise ValueError("step is None")
return IterableWrapper(self.step(input_))

def __call__(self, items: Iterable[T] | None = None) -> IterableWrapper[U]:
def __call__(
self, items: Iterable[T_contra] | None = None
) -> IterableWrapper[U_co]:
return self.process(items)

def __or__(self, step: Step[U, V]) -> Pipeline[T, V]:
def __or__(self, step: Step[U_co, V]) -> Pipeline[T_contra, V]:
return self.pipe(step)


Expand Down
12 changes: 12 additions & 0 deletions tests/test_flatten.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from iter_pipes import PipelineFactory


def test_consume():
result = (
PipelineFactory[int]() #
.map(lambda x: range(x))
.flatten()
.process(range(5))
.to_list()
)
assert result == [0, 0, 1, 0, 1, 2, 0, 1, 2, 3]

0 comments on commit 2a4e512

Please sign in to comment.