From 135bde7e9b67b61c4badfaf16179ecb69937c38f Mon Sep 17 00:00:00 2001
From: guillaume <guillaume@p9f.fr>
Date: Mon, 13 Nov 2023 12:16:53 +0000
Subject: [PATCH] noop

---
 .github/workflows/test.yml          |  40 +++++
 .gitignore                          |   1 +
 LICENSE                             |  21 +++
 README.md                           |   5 +
 iter_pipes/__init__.py              |   1 +
 iter_pipes/functional.py            | 252 ++++++++++++++++++++++++++++
 iter_pipes/main.py                  | 166 ++++++++++++++++++
 poetry.lock                         | 167 ++++++++++++++++++
 pyproject.toml                      |  25 +++
 tests/docs/test_01_functions.py     |  21 +++
 tests/docs/test_02_classes.py       |  30 ++++
 tests/docs/test_03_batches.py       |  28 ++++
 tests/docs/test_04_split.py         |  36 ++++
 tests/docs/test_05_filters.py       |  36 ++++
 tests/docs/test_06_pipe_overload.py |  33 ++++
 tests/docs/test_07_for.py           |  24 +++
 tests/docs/test_08_subpipeline.py   |  57 +++++++
 17 files changed, 943 insertions(+)
 create mode 100644 .github/workflows/test.yml
 create mode 100644 .gitignore
 create mode 100644 LICENSE
 create mode 100644 README.md
 create mode 100644 iter_pipes/__init__.py
 create mode 100644 iter_pipes/functional.py
 create mode 100644 iter_pipes/main.py
 create mode 100644 poetry.lock
 create mode 100644 pyproject.toml
 create mode 100644 tests/docs/test_01_functions.py
 create mode 100644 tests/docs/test_02_classes.py
 create mode 100644 tests/docs/test_03_batches.py
 create mode 100644 tests/docs/test_04_split.py
 create mode 100644 tests/docs/test_05_filters.py
 create mode 100644 tests/docs/test_06_pipe_overload.py
 create mode 100644 tests/docs/test_07_for.py
 create mode 100644 tests/docs/test_08_subpipeline.py

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..e2ebdb9
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,40 @@
+name: test
+
+permissions:
+  contents: read
+
+on:
+  - pull_request
+  - push
+
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+      - name: Install poetry
+        run: curl -sSL https://install.python-poetry.org | python3 -
+        env:
+          POETRY_VERSION: 1.7.0
+      - name: Add Poetry to path
+        run: echo "${HOME}/.local/bin" >> $GITHUB_PATH
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+          cache: "poetry"
+      - name: Install Poetry Packages
+        run: |
+          poetry env use "3.11"
+          poetry install --only dev
+      - name: Add venv to path
+        run: echo `poetry env info --path`/bin/ >> $GITHUB_PATH
+
+      - run: ruff check --output-format github .
+      - run: ruff format --check .
+      - run: mypy .
+      - run: |
+          pip install .
+          pytest -s
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bee8a64
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..cf4b192
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2023 Bright Network
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d08c467
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+## `iter_pipes`: Iterable Pipes
+
+Functional, Pythonic pipelines for iterables.
+
+[Documentation](./tests/docs/)
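+
+A minimal usage sketch, adapted from `tests/docs/test_01_functions.py`:
+
+```python
+from iter_pipes import PipelineFactory
+
+result = (
+    PipelineFactory[int]()
+    .map(lambda item: -item)  # negate each item
+    .map(str)                 # then stringify it
+    .process(range(3))
+    .to_list()
+)
+
+assert result == ["0", "-1", "-2"]
+```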
diff --git a/iter_pipes/__init__.py b/iter_pipes/__init__.py
new file mode 100644
index 0000000..38f3549
--- /dev/null
+++ b/iter_pipes/__init__.py
@@ -0,0 +1 @@
+from .main import *  # noqa
diff --git a/iter_pipes/functional.py b/iter_pipes/functional.py
new file mode 100644
index 0000000..7736e8e
--- /dev/null
+++ b/iter_pipes/functional.py
@@ -0,0 +1,252 @@
+from __future__ import annotations
+
+import math
+from collections import deque
+from collections.abc import Callable, Iterable, Iterator
+from functools import partial
+from itertools import count, groupby
+from typing import Any, Generic, Literal, TypeGuard, TypeVar, overload
+
+__all__ = [
+    "map",
+    "filter",
+    "for_each",
+    "for_batch",
+    "for_all",
+    "batch",
+    "fork_forget",
+    "not_none",
+    "batch_where",
+    "fork",
+]
+
+T_contra = TypeVar("T_contra", contravariant=True)
+V_co = TypeVar("V_co", covariant=True)
+T = TypeVar("T")
+U = TypeVar("U")
+W = TypeVar("W")
+
+
+raw_filter = filter
+
+
+Step = Callable[[Iterable[T_contra]], Iterable[V_co]]
+
+
+def not_none(item: T | None) -> TypeGuard[T]:
+    return item is not None
+
+
+def map(step: Callable[[V_co], W]) -> Step[V_co, W]:
+    def f(data: Iterable[V_co]) -> Iterable[W]:
+        for item in data:
+            yield step(item)
+
+    return f
+
+
+def for_each(step: Callable[[V_co], Any]) -> Step[V_co, V_co]:
+    def f(data: Iterable[V_co]) -> Iterable[V_co]:
+        for item in data:
+            step(item)
+            yield item
+
+    return f
+
+
+def for_batch(step: Callable[[list[V_co]], Any], batch_size: int) -> Step[V_co, V_co]:
+    def f(data: Iterable[V_co]) -> Iterable[V_co]:
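+        # chunk the stream into consecutive groups of batch_size items,
+        # using the running index from count() as the grouping key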
+        for _, batch_iterator in groupby(
+            zip(data, count()),
+            key=lambda x: math.floor(x[1] / batch_size),
+        ):
+            batch = [x[0] for x in batch_iterator]
+            step(batch)
+            yield from batch
+
+    return f
+
+
+def for_all(f: Step[V_co, Any]) -> Step[V_co, V_co]:
+    return fork_forget(f)
+
+
+def batch(step: Callable[[list[V_co]], Iterable[U]], batch_size: int) -> Step[V_co, U]:
+    def f(data: Iterable[V_co]) -> Iterable[U]:
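+        # same chunking as for_batch, but yield the step's output instead of
+        # passing the original items through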
+        for _, batch_iterator in groupby(
+            zip(data, count()),
+            key=lambda x: math.floor(x[1] / batch_size),
+        ):
+            yield from step([x[0] for x in batch_iterator])
+
+    return f
+
+
+@overload
+def filter(step: Callable[[V_co], TypeGuard[W]]) -> Step[V_co, W]:
+    ...
+
+
+@overload
+def filter(step: Callable[[V_co], bool]) -> Step[V_co, V_co]:
+    ...
+
+
+def filter(step: Callable[[V_co], bool]) -> Step[V_co, V_co]:  # type: ignore
+    return partial(raw_filter, step)  # type: ignore
+
+
+def batch_where(
+    step: Callable[[list[V_co]], Iterable[U]],
+    where: Callable[[V_co], bool],
+    batch_size: int,
+) -> Step[V_co, U | V_co]:
+    def f(data: Iterable[V_co]) -> Iterable[U | V_co]:
+        buffer: deque[V_co] = deque()
+
+        for item in data:
+            if not where(item):
+                yield item
+            else:
+                buffer.append(item)
+
+            if len(buffer) >= batch_size:
+                yield from step(list(buffer))
+                buffer.clear()
+
+        # flush any remaining buffered items once the input is exhausted
+        if buffer:
+            yield from step(list(buffer))
+
+    return f
+
+
+def flatten(iterable: Iterable[Iterable[T_contra]]) -> Iterable[T_contra]:
+    for item in iterable:
+        yield from item
+
+
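+# Iterator view over a per-branch deque used by fork(): when the queue is
+# empty, __next__ calls consume_next() to pull another item from the shared
+# upstream iterator before popping.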
+class IteratorWrapper(Generic[W]):
+    def __init__(
+        self,
+        queue: deque[W],
+        consume_next: Callable[[], Any],
+    ):
+        self._queue = queue
+        self._consume_next = consume_next
+
+    def __iter__(self) -> Iterator[W]:
+        return self
+
+    def __next__(self) -> W:
+        if not self._queue:
+            self._consume_next()
+        return self._queue.popleft()
+
+
+@overload
+def fork(
+    step1: Step[T_contra, U] | None,
+    step2: Step[T_contra, V_co],
+    max_inflight: int | None,
+    pick_first: Literal[True],
+) -> Step[T_contra, U]:
+    ...
+
+
+@overload
+def fork(
+    step1: Step[T_contra, U] | None,
+    step2: Step[T_contra, V_co],
+    pick_first: Literal[True],
+) -> Step[T_contra, U]:
+    ...
+
+
+@overload
+def fork(
+    step1: Step[T_contra, U],
+    step2: Step[T_contra, V_co],
+    max_inflight: int | None,
+    pick_first: Literal[False] | None,
+) -> Step[T_contra, V_co | U]:
+    ...
+
+
+@overload
+def fork(
+    step1: Step[T_contra, U],
+    step2: Step[T_contra, V_co],
+    step3: Step[T_contra, W],
+    max_inflight: int | None,
+    pick_first: Literal[False] | None,
+) -> Step[T_contra, V_co | U | W]:
+    ...
+
+
+@overload
+def fork(
+    *steps: Step[T_contra, Any] | None,
+    max_inflight: int | None,
+    pick_first: Literal[False] | None,
+) -> Step[T_contra, Any]:
+    ...
+
+
+def fork(  # type: ignore
+    *steps: Step[T_contra, Any] | None,
+    max_inflight: int = 1000,
+    pick_first: bool = False,
+) -> Step[T_contra, Any]:
+    def f(iterable: Iterable[T_contra]) -> Iterable[Any]:
+        queues: list[deque] = [deque() for _ in steps]
+        it = iter(iterable)
+        paused_iterators: set[int] = set()
+
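+        # consume_next(i) pulls one item from the shared input and fans it out
+        # to every branch's queue. If the total number of buffered items across
+        # all queues exceeds max_inflight, it raises StopIteration to pause
+        # branch i; the main loop below rebuilds that branch over its remaining
+        # queue and keeps serving the branch with the largest backlog first.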
+        def consume_next(i: int) -> Callable[[], None]:
+            def wrapper() -> None:
+                val = next(it)
+                for d in queues:
+                    d.append(val)
+                nb_inflights = sum(len(q) for q in queues)
+                if nb_inflights > max_inflight:
+                    paused_iterators.add(i)
+                    raise StopIteration
+
+            return wrapper
+
+        iterators = [
+            iter((steps[i] or identity)(IteratorWrapper(queues[i], consume_next(i))))
+            for i in range(len(steps))
+        ]
+
+        pending_iterators = set(range(len(iterators)))
+
+        while len(pending_iterators):
+            i = max(  # the index of the iterator with the most inflight items
+                pending_iterators,
+                key=lambda i: len(queues[i]),
+            )
+            try:
+                val = next(iterators[i])
+                if not pick_first or i == 0:
+                    yield val
+            except StopIteration:
+                if i in paused_iterators:  # resume the iterator
+                    iterators[i] = iter(
+                        (steps[i] or identity)(
+                            IteratorWrapper(queues[i], consume_next(i))
+                        )
+                    )
+                    paused_iterators.remove(i)
+                else:
+                    pending_iterators.remove(i)
+
+    return f
+
+
+def identity(item: W) -> W:
+    return item
+
+
+def fork_forget(step: Step[U, Any], max_inflight: int = 3) -> Step[U, U]:
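+    # run step purely for its side effects: branch 0 is the identity branch
+    # (None), and pick_first=True yields only its output, i.e. the original
+    # items, unchanged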
+    def f(data: Iterable[U]) -> Iterable[U]:
+        yield from fork(None, step, pick_first=True, max_inflight=max_inflight)(data)
+
+    return f
diff --git a/iter_pipes/main.py b/iter_pipes/main.py
new file mode 100644
index 0000000..8ee6d95
--- /dev/null
+++ b/iter_pipes/main.py
@@ -0,0 +1,166 @@
+from __future__ import annotations
+
+from collections import deque
+from collections.abc import Callable, Iterator
+from typing import Any, Generic, Iterable, TypeGuard, TypeVar, overload
+
+from iter_pipes.functional import (
+    batch,
+    filter,
+    for_batch,
+    for_each,
+    fork,
+    fork_forget,
+    identity,
+    map,
+)
+
+T_contra = TypeVar("T_contra", contravariant=True)
+V_co = TypeVar("V_co", covariant=True)
+U = TypeVar("U")
+W = TypeVar("W")
+X = TypeVar("X")
+Y = TypeVar("Y")
+
+__all__ = ["Pipeline", "PipelineFactory"]
+
+
+raw_filter = filter
+
+
+Step = Callable[[Iterable[T_contra]], Iterable[V_co]]
+
+
+def compose_steps(
+    step1: Step[T_contra, V_co] | None, step2: Step[V_co, U]
+) -> Step[T_contra, U]:
+    if step1 is None:
+        return step2  # type: ignore
+
+    def composed(items: Iterable[T_contra]) -> Iterable[U]:
+        return step2(step1(items))
+
+    return composed
+
+
+class IterableWrapper(Generic[T_contra]):
+    def __init__(self, iterable: Iterable[T_contra]):
+        self._iterable = iterable
+
+    def __iter__(self) -> Iterator[T_contra]:
+        return iter(self._iterable)
+
+    def consume(self) -> None:
+        deque(self._iterable)
+
+    def to_list(self) -> list[T_contra]:
+        return list(self._iterable)
+
+
+class Pipeline(Generic[T_contra, V_co]):
+    step: Step[T_contra, V_co] | None
+    items: Iterable[T_contra] | None
+
+    def __init__(
+        self,
+        step: Step[T_contra, V_co] | None = None,
+        items: Iterable[T_contra] | None = None,
+    ):
+        self.step = step
+        self.items = items
+
+    def for_each(self, step: Callable[[V_co], Any]) -> Pipeline[T_contra, V_co]:
+        return self | for_each(step)
+
+    def map(self, step: Callable[[V_co], W]) -> Pipeline[T_contra, W]:
+        return self | map(step)
+
+    def pipe(self, step: Step[V_co, U]) -> Pipeline[T_contra, U]:
+        return Pipeline(compose_steps(self.step, step), self.items)
+
+    def for_batch(
+        self, step: Callable[[list[V_co]], Any], batch_size: int
+    ) -> Pipeline[T_contra, V_co]:
+        return self | for_batch(step, batch_size)
+
+    def for_all(self, f: Step[V_co, Any]) -> Pipeline[T_contra, V_co]:
+        return self | fork_forget(f)
+
+    def batch(
+        self, step: Callable[[list[V_co]], Iterable[U]], batch_size: int
+    ) -> Pipeline[T_contra, U]:
+        return self | batch(step, batch_size)
+
+    @overload
+    def filter(self, step: Callable[[V_co], TypeGuard[W]]) -> Pipeline[T_contra, W]:
+        ...
+
+    @overload
+    def filter(self, step: Callable[[V_co], bool]) -> Pipeline[T_contra, V_co]:
+        ...
+
+    def filter(self, step):  # type: ignore
+        return self | filter(step)  # type: ignore
+
+    @overload
+    def split(
+        self,
+        f1: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, W]],
+        max_inflight: int = ...,
+    ) -> Pipeline[V_co, W]:
+        ...
+
+    @overload
+    def split(
+        self,
+        f1: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, U]],
+        f2: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, W]],
+        max_inflight: int = ...,
+    ) -> Pipeline[V_co, W | U]:
+        ...
+
+    @overload
+    def split(
+        self,
+        f1: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, U]],
+        f2: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, W]],
+        f3: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, X]],
+        max_inflight: int = ...,
+    ) -> Pipeline[V_co, W | U | X]:
+        ...
+
+    def split(  # type: ignore
+        self,
+        *functions: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, Any]],
+        max_inflight: int = 1000,
+    ) -> Pipeline[V_co, Any]:
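+        # build one step per branch and fan the stream out across all of them,
+        # interleaving their outputs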
+        steps = [f(Pipeline()).step or identity for f in functions]
+        return self | fork(*steps, max_inflight=max_inflight, pick_first=False)  # type: ignore
+
+    def subpipeline(
+        self,
+        *functions: Callable[[Pipeline[V_co, V_co]], Pipeline[V_co, Any]],
+        max_inflight: int = 1000,
+    ) -> Pipeline[T_contra, V_co]:
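+        # identity is branch 0 and pick_first=True keeps only its output, so
+        # the original items flow through while the sub-pipelines run for
+        # their side effects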
+        steps = [f(Pipeline()).step or identity for f in functions]
+        return self | fork(identity, *steps, max_inflight=max_inflight, pick_first=True)  # type: ignore
+
+    def process(self, items: Iterable[T_contra] | None = None) -> IterableWrapper[V_co]:
+        input_ = items if items is not None else self.items
+        if input_ is None:
+            raise Exception("input is None")
+        if self.step is None:
+            raise Exception("step is None")
+        return IterableWrapper(self.step(input_))
+
+    def __call__(
+        self, items: Iterable[T_contra] | None = None
+    ) -> IterableWrapper[V_co]:
+        return self.process(items)
+
+    def __or__(self, step: Step[V_co, U]) -> Pipeline[T_contra, U]:
+        return self.pipe(step)
+
+
+class PipelineFactory(Generic[U], Pipeline[U, U]):
+    pass
diff --git a/poetry.lock b/poetry.lock
new file mode 100644
index 0000000..b117770
--- /dev/null
+++ b/poetry.lock
@@ -0,0 +1,167 @@
+# This file is automatically @generated by Poetry 1.7.0 and should not be changed by hand.
+
+[[package]]
+name = "colorama"
+version = "0.4.6"
+description = "Cross-platform colored terminal text."
+optional = false
+python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
+files = [
+    {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
+    {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
+]
+
+[[package]]
+name = "iniconfig"
+version = "2.0.0"
+description = "brain-dead simple config-ini parsing"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"},
+    {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
+]
+
+[[package]]
+name = "mypy"
+version = "1.6.1"
+description = "Optional static typing for Python"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "mypy-1.6.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e5012e5cc2ac628177eaac0e83d622b2dd499e28253d4107a08ecc59ede3fc2c"},
+    {file = "mypy-1.6.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d8fbb68711905f8912e5af474ca8b78d077447d8f3918997fecbf26943ff3cbb"},
+    {file = "mypy-1.6.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:21a1ad938fee7d2d96ca666c77b7c494c3c5bd88dff792220e1afbebb2925b5e"},
+    {file = "mypy-1.6.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b96ae2c1279d1065413965c607712006205a9ac541895004a1e0d4f281f2ff9f"},
+    {file = "mypy-1.6.1-cp310-cp310-win_amd64.whl", hash = "sha256:40b1844d2e8b232ed92e50a4bd11c48d2daa351f9deee6c194b83bf03e418b0c"},
+    {file = "mypy-1.6.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:81af8adaa5e3099469e7623436881eff6b3b06db5ef75e6f5b6d4871263547e5"},
+    {file = "mypy-1.6.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8c223fa57cb154c7eab5156856c231c3f5eace1e0bed9b32a24696b7ba3c3245"},
+    {file = "mypy-1.6.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8032e00ce71c3ceb93eeba63963b864bf635a18f6c0c12da6c13c450eedb183"},
+    {file = "mypy-1.6.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4c46b51de523817a0045b150ed11b56f9fff55f12b9edd0f3ed35b15a2809de0"},
+    {file = "mypy-1.6.1-cp311-cp311-win_amd64.whl", hash = "sha256:19f905bcfd9e167159b3d63ecd8cb5e696151c3e59a1742e79bc3bcb540c42c7"},
+    {file = "mypy-1.6.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:82e469518d3e9a321912955cc702d418773a2fd1e91c651280a1bda10622f02f"},
+    {file = "mypy-1.6.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:d4473c22cc296425bbbce7e9429588e76e05bc7342da359d6520b6427bf76660"},
+    {file = "mypy-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:59a0d7d24dfb26729e0a068639a6ce3500e31d6655df8557156c51c1cb874ce7"},
+    {file = "mypy-1.6.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:cfd13d47b29ed3bbaafaff7d8b21e90d827631afda134836962011acb5904b71"},
+    {file = "mypy-1.6.1-cp312-cp312-win_amd64.whl", hash = "sha256:eb4f18589d196a4cbe5290b435d135dee96567e07c2b2d43b5c4621b6501531a"},
+    {file = "mypy-1.6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:41697773aa0bf53ff917aa077e2cde7aa50254f28750f9b88884acea38a16169"},
+    {file = "mypy-1.6.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7274b0c57737bd3476d2229c6389b2ec9eefeb090bbaf77777e9d6b1b5a9d143"},
+    {file = "mypy-1.6.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bbaf4662e498c8c2e352da5f5bca5ab29d378895fa2d980630656178bd607c46"},
+    {file = "mypy-1.6.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:bb8ccb4724f7d8601938571bf3f24da0da791fe2db7be3d9e79849cb64e0ae85"},
+    {file = "mypy-1.6.1-cp38-cp38-win_amd64.whl", hash = "sha256:68351911e85145f582b5aa6cd9ad666c8958bcae897a1bfda8f4940472463c45"},
+    {file = "mypy-1.6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:49ae115da099dcc0922a7a895c1eec82c1518109ea5c162ed50e3b3594c71208"},
+    {file = "mypy-1.6.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:8b27958f8c76bed8edaa63da0739d76e4e9ad4ed325c814f9b3851425582a3cd"},
+    {file = "mypy-1.6.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:925cd6a3b7b55dfba252b7c4561892311c5358c6b5a601847015a1ad4eb7d332"},
+    {file = "mypy-1.6.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8f57e6b6927a49550da3d122f0cb983d400f843a8a82e65b3b380d3d7259468f"},
+    {file = "mypy-1.6.1-cp39-cp39-win_amd64.whl", hash = "sha256:a43ef1c8ddfdb9575691720b6352761f3f53d85f1b57d7745701041053deff30"},
+    {file = "mypy-1.6.1-py3-none-any.whl", hash = "sha256:4cbe68ef919c28ea561165206a2dcb68591c50f3bcf777932323bc208d949cf1"},
+    {file = "mypy-1.6.1.tar.gz", hash = "sha256:4d01c00d09a0be62a4ca3f933e315455bde83f37f892ba4b08ce92f3cf44bcc1"},
+]
+
+[package.dependencies]
+mypy-extensions = ">=1.0.0"
+typing-extensions = ">=4.1.0"
+
+[package.extras]
+dmypy = ["psutil (>=4.0)"]
+install-types = ["pip"]
+reports = ["lxml"]
+
+[[package]]
+name = "mypy-extensions"
+version = "1.0.0"
+description = "Type system extensions for programs checked with the mypy type checker."
+optional = false
+python-versions = ">=3.5"
+files = [
+    {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"},
+    {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"},
+]
+
+[[package]]
+name = "packaging"
+version = "23.2"
+description = "Core utilities for Python packages"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "packaging-23.2-py3-none-any.whl", hash = "sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7"},
+    {file = "packaging-23.2.tar.gz", hash = "sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5"},
+]
+
+[[package]]
+name = "pluggy"
+version = "1.3.0"
+description = "plugin and hook calling mechanisms for python"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "pluggy-1.3.0-py3-none-any.whl", hash = "sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7"},
+    {file = "pluggy-1.3.0.tar.gz", hash = "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12"},
+]
+
+[package.extras]
+dev = ["pre-commit", "tox"]
+testing = ["pytest", "pytest-benchmark"]
+
+[[package]]
+name = "pytest"
+version = "7.4.3"
+description = "pytest: simple powerful testing with Python"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "pytest-7.4.3-py3-none-any.whl", hash = "sha256:0d009c083ea859a71b76adf7c1d502e4bc170b80a8ef002da5806527b9591fac"},
+    {file = "pytest-7.4.3.tar.gz", hash = "sha256:d989d136982de4e3b29dabcc838ad581c64e8ed52c11fbe86ddebd9da0818cd5"},
+]
+
+[package.dependencies]
+colorama = {version = "*", markers = "sys_platform == \"win32\""}
+iniconfig = "*"
+packaging = "*"
+pluggy = ">=0.12,<2.0"
+
+[package.extras]
+testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
+
+[[package]]
+name = "ruff"
+version = "0.1.4"
+description = "An extremely fast Python linter and code formatter, written in Rust."
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "ruff-0.1.4-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:864958706b669cce31d629902175138ad8a069d99ca53514611521f532d91495"},
+    {file = "ruff-0.1.4-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:9fdd61883bb34317c788af87f4cd75dfee3a73f5ded714b77ba928e418d6e39e"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b4eaca8c9cc39aa7f0f0d7b8fe24ecb51232d1bb620fc4441a61161be4a17539"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a9a1301dc43cbf633fb603242bccd0aaa34834750a14a4c1817e2e5c8d60de17"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:78e8db8ab6f100f02e28b3d713270c857d370b8d61871d5c7d1702ae411df683"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:80fea754eaae06335784b8ea053d6eb8e9aac75359ebddd6fee0858e87c8d510"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6bc02a480d4bfffd163a723698da15d1a9aec2fced4c06f2a753f87f4ce6969c"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9862811b403063765b03e716dac0fda8fdbe78b675cd947ed5873506448acea4"},
+    {file = "ruff-0.1.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:58826efb8b3efbb59bb306f4b19640b7e366967a31c049d49311d9eb3a4c60cb"},
+    {file = "ruff-0.1.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:fdfd453fc91d9d86d6aaa33b1bafa69d114cf7421057868f0b79104079d3e66e"},
+    {file = "ruff-0.1.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:e8791482d508bd0b36c76481ad3117987301b86072158bdb69d796503e1c84a8"},
+    {file = "ruff-0.1.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:01206e361021426e3c1b7fba06ddcb20dbc5037d64f6841e5f2b21084dc51800"},
+    {file = "ruff-0.1.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:645591a613a42cb7e5c2b667cbefd3877b21e0252b59272ba7212c3d35a5819f"},
+    {file = "ruff-0.1.4-py3-none-win32.whl", hash = "sha256:99908ca2b3b85bffe7e1414275d004917d1e0dfc99d497ccd2ecd19ad115fd0d"},
+    {file = "ruff-0.1.4-py3-none-win_amd64.whl", hash = "sha256:1dfd6bf8f6ad0a4ac99333f437e0ec168989adc5d837ecd38ddb2cc4a2e3db8a"},
+    {file = "ruff-0.1.4-py3-none-win_arm64.whl", hash = "sha256:d98ae9ebf56444e18a3e3652b3383204748f73e247dea6caaf8b52d37e6b32da"},
+    {file = "ruff-0.1.4.tar.gz", hash = "sha256:21520ecca4cc555162068d87c747b8f95e1e95f8ecfcbbe59e8dd00710586315"},
+]
+
+[[package]]
+name = "typing-extensions"
+version = "4.8.0"
+description = "Backported and Experimental Type Hints for Python 3.8+"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "typing_extensions-4.8.0-py3-none-any.whl", hash = "sha256:8f92fc8806f9a6b641eaa5318da32b44d401efaac0f6678c9bc448ba3605faa0"},
+    {file = "typing_extensions-4.8.0.tar.gz", hash = "sha256:df8e4339e9cb77357558cbdbceca33c303714cf861d1eef15e1070055ae8b7ef"},
+]
+
+[metadata]
+lock-version = "2.0"
+python-versions = "^3.11"
+content-hash = "01b614d20596e6a0f81a4d5e7678a40b4b7ff3d7b663b358cadba87be27d9f5e"
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..4a28284
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,25 @@
+[tool.poetry]
+name = "iter-pipes"
+version = "0.1.0"
+description = ""
+authors = ["guillaume <guillaume@p9f.fr>"]
+readme = "README.md"
+
+[tool.poetry.dependencies]
+python = "^3.11"
+
+[tool.poetry.group.dev.dependencies]
+ruff = "^0.1.4"
+mypy = "^1.6.1"
+pytest = "^7.4.3"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.ruff]
+select = ["ALL"]
+ignore = ["INP001", "D", "COM", "PGH", "ANN101", "ANN204", "A003", "TRY", "EM101", "A001", "ISC001"]
+
+[tool.ruff.lint.extend-per-file-ignores]
+"tests/**" = ["ANN201", "S101", "FA102", "PLR2004"]
diff --git a/tests/docs/test_01_functions.py b/tests/docs/test_01_functions.py
new file mode 100644
index 0000000..fc3ed71
--- /dev/null
+++ b/tests/docs/test_01_functions.py
@@ -0,0 +1,21 @@
+from iter_pipes.main import PipelineFactory
+
+
+def minus(item: int) -> int:
+    return -item
+
+
+def to_str(item: int) -> str:
+    return str(item)
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .map(minus)
+        .map(to_str)
+        .process(range(10))
+        .to_list()
+    )
+
+    assert p == ["0", "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-9"]
diff --git a/tests/docs/test_02_classes.py b/tests/docs/test_02_classes.py
new file mode 100644
index 0000000..f585421
--- /dev/null
+++ b/tests/docs/test_02_classes.py
@@ -0,0 +1,30 @@
+from iter_pipes.main import PipelineFactory
+
+# you can use classes instead of functions
+# this is useful for:
+# - storing state
+# - dependency injection
+# - breaking down complex logic into smaller pieces
+
+
+class ToStr:
+    def __call__(self, item: int) -> str:
+        return str(item)
+
+
+class Multiply:
+    def __init__(self, value: int):
+        self.value = value
+
+    def __call__(self, item: int) -> int:
+        return item * self.value
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .map(Multiply(-1))
+        .map(ToStr())
+    )(range(10)).to_list()
+
+    assert p == ["0", "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-9"]
diff --git a/tests/docs/test_03_batches.py b/tests/docs/test_03_batches.py
new file mode 100644
index 0000000..c8a5fd1
--- /dev/null
+++ b/tests/docs/test_03_batches.py
@@ -0,0 +1,28 @@
+from functools import reduce
+
+from iter_pipes.main import PipelineFactory
+
+# you can process events in batches
+# this is especially useful for batching db requests / network calls
+
+
+class ToStr:
+    def __call__(self, item: int) -> str:
+        return str(item)
+
+
+class MultiplyTogether:
+    def __call__(self, items: list[int]) -> list[int]:
+        return [reduce(lambda x, y: x * y, items, 1) for _ in items]
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .batch(MultiplyTogether(), batch_size=3)
+        .map(ToStr())
+        .process(range(10))
+        .to_list()
+    )
+
+    assert p == ["0", "0", "0", "60", "60", "60", "336", "336", "336", "9"]
diff --git a/tests/docs/test_04_split.py b/tests/docs/test_04_split.py
new file mode 100644
index 0000000..54dba66
--- /dev/null
+++ b/tests/docs/test_04_split.py
@@ -0,0 +1,36 @@
+from iter_pipes.main import PipelineFactory
+
+
+def minus(item: int) -> int:
+    return -item
+
+
+class Add:
+    def __init__(self, value: int):
+        self.value = value
+
+    def __call__(self, item: int) -> int:
+        return item + self.value
+
+
+def join(item: tuple[int | None, int | None]) -> str:
+    return f"{item[0]}:{item[1]}"
+
+
+def to_str(item: int) -> str:
+    return str(item)
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .map(minus)
+        .split(
+            lambda pipeline: pipeline.map(Add(1)),
+            lambda pipeline: pipeline.map(Add(2)).map(minus),
+        )
+        .process(range(10))
+        .to_list()
+    )
+
+    assert p == [1, -2, 0, -1, -1, 0, -2, 1, -3, 2, -4, 3, -5, 4, -6, 5, -7, 6, -8, 7]
diff --git a/tests/docs/test_05_filters.py b/tests/docs/test_05_filters.py
new file mode 100644
index 0000000..02f3df1
--- /dev/null
+++ b/tests/docs/test_05_filters.py
@@ -0,0 +1,36 @@
+from iter_pipes.main import PipelineFactory
+
+
+def minus(item: int) -> int:
+    return -item
+
+
+def lte_4(item: int) -> bool:
+    return item <= 4
+
+
+class Add:
+    def __init__(self, value: int):
+        self.value = value
+
+    def __call__(self, item: int) -> int:
+        return item + self.value
+
+
+def join(item: tuple[int | None, int | None]) -> str:
+    return f"{item[0]}:{item[1]}"
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .map(minus)
+        .split(
+            lambda pipeline: pipeline.map(Add(1)),
+            lambda pipeline: pipeline.map(Add(2)).map(minus).filter(lte_4),
+        )
+        .process(range(9))
+        .to_list()
+    )
+
+    assert p == [1, -2, 0, -1, -1, 0, -2, 1, -3, 2, -4, 3, -5, 4, -6, -7]
diff --git a/tests/docs/test_06_pipe_overload.py b/tests/docs/test_06_pipe_overload.py
new file mode 100644
index 0000000..811e776
--- /dev/null
+++ b/tests/docs/test_06_pipe_overload.py
@@ -0,0 +1,33 @@
+import iter_pipes.functional as itp
+from iter_pipes.main import PipelineFactory
+
+# pipelines also support the pipe operator (|) overload
+# in particular, the auto-formatting of subpipelines reads much better
+
+
+def minus(item: int) -> int:
+    return -item
+
+
+def lte_4(item: int) -> bool:
+    return item <= 4
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .map(minus)
+        .split(
+            lambda pipeline: pipeline  #
+            | itp.map(minus)
+            | itp.filter(lte_4),
+            lambda pipeline: pipeline  #
+            | itp.map(minus)
+            | itp.map(minus)
+            | itp.filter(lte_4),
+        )
+        .process(range(9))
+        .to_list()
+    )
+
+    assert p == [0, 0, 1, -1, 2, -2, 3, -3, 4, -4, -5, -6, -7, -8]
diff --git a/tests/docs/test_07_for.py b/tests/docs/test_07_for.py
new file mode 100644
index 0000000..45dfb17
--- /dev/null
+++ b/tests/docs/test_07_for.py
@@ -0,0 +1,24 @@
+from iter_pipes.main import PipelineFactory
+
+# `for_each` / `for_batch` / `for_all` let you run functions at certain points
+# of the pipeline while ignoring their return values
+
+
+def minus(item: int) -> int:
+    return -item
+
+
+def to_str(item: int) -> str:
+    return str(item)
+
+
+def test_main():
+    p = (
+        PipelineFactory[int]()  #
+        .map(minus)
+        .for_each(to_str)
+        .map(minus)  # still an iterable of int
+        .map(to_str)
+    )(range(10)).to_list()
+
+    assert p == ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"]
diff --git a/tests/docs/test_08_subpipeline.py b/tests/docs/test_08_subpipeline.py
new file mode 100644
index 0000000..5892926
--- /dev/null
+++ b/tests/docs/test_08_subpipeline.py
@@ -0,0 +1,57 @@
+from iter_pipes.functional import filter
+from iter_pipes.main import PipelineFactory
+
+# subpipeline allows you to run a pipeline inside another pipeline
+# its return value is ignored
+
+
+def minus(item: int) -> int:
+    return -item
+
+
+class Add:
+    def __init__(self, value: int):
+        self.value = value
+
+    def __call__(self, item: int) -> int:
+        return item + self.value
+
+
+def join(item: tuple[int | None, int | None]) -> str:
+    return f"{item[0]}:{item[1]}"
+
+
+def counter():
+    memory = set()
+
+    def inc(x: int) -> None:
+        memory.add(x)
+
+    def dec(x: int) -> None:
+        memory.remove(x)
+
+    def get() -> int:
+        return len(memory)
+
+    return inc, dec, get
+
+
+def test_main():
+    max_inflight = 30
+    inc, dec, get = counter()
+    p = (
+        PipelineFactory[int]()  #
+        .for_each(inc)
+        .subpipeline(
+            lambda pipeline: pipeline  #
+            | filter(lambda _: False),
+            max_inflight=max_inflight,
+        )
+        .for_each(dec)
+        .process(range(10**3))
+    )
+    result = []
+    for x in p:
+        assert get() <= max_inflight
+        result.append(x)
+    assert result == list(range(10**3))