Commit

noop
p9f committed Nov 15, 2023
0 parents commit 30f2456
Showing 21 changed files with 1,118 additions and 0 deletions.
38 changes: 38 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,38 @@
name: test

permissions:
  contents: read

on:
  - pull_request
  - push


jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
      - name: Install poetry
        run: curl -sSL https://install.python-poetry.org | python3 -
        env:
          POETRY_VERSION: 1.7.0
      - name: Add Poetry to path
        run: echo "${HOME}/.poetry/bin" >> $GITHUB_PATH
      - name: Set up Python 3.11
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"
          cache: "poetry"
      - name: Install Poetry Packages
        run: |
          poetry env use "3.11"
          poetry install --only dev
      - name: Add venv to path
        run: echo `poetry env info --path`/bin/ >> $GITHUB_PATH

      - run: ruff check --output-format github .
      - run: ruff format --check .
      - run: mypy --check-untyped-defs .
      - run: pytest -s
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
__pycache__
.coverage
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2023 Bright Network

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
99 changes: 99 additions & 0 deletions README.md
@@ -0,0 +1,99 @@
## `iter_pipes`: Iterable Pipes

Functional pythonic pipelines for iterables.


```bash
pip install git+https://github.com/brightnetwork/iter-pipes
```

### Example

Simple map / filter:

```python
import math

from iter_pipes import PipelineFactory

pipeline = (
    PipelineFactory[int]()
    .map(math.exp)
    .filter(lambda x: x > math.exp(2))
    .map(math.log)
    .map(str)
)

assert pipeline(range(5)).to_list() == ["3.0", "4.0"]
```

Batch operations:

```python
def get_user_names_from_db(user_ids: list[int]) -> list[str]:
    # batch operation:
    # - duration is roughly constant per batch
    # - batch size has to be below 10
    print("processing batch", user_ids)
    return [f"user_{user_id}" for user_id in user_ids]


pipeline = (
    PipelineFactory[int]()  #
    .batch(get_user_names_from_db, batch_size=3)  #
    .for_each(lambda user_name: print("Hello", user_name))
)

pipeline(range(5)).to_list()
# returns
# ["user_0", "user_1", "user_2", "user_3", "user_4"]
# prints
# processing batch [0, 1, 2]
# Hello user_0
# Hello user_1
# Hello user_2
# processing batch [3, 4]
# Hello user_3
# Hello user_4
```
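The functional module also exposes `for_batch`, which runs a side effect per batch while passing the items through unchanged (unlike `batch`, whose output replaces the items). Here is a stdlib-only sketch of that pass-through behaviour, using a hypothetical `for_batch_sketch` helper that mirrors the library's `groupby`-based chunking:

```python
import math
from itertools import count, groupby


def for_batch_sketch(step, batch_size):
    # run `step` on each chunk for its side effect,
    # then forward the original items downstream unchanged
    def f(data):
        for _, group in groupby(
            zip(data, count()),
            key=lambda x: math.floor(x[1] / batch_size),
        ):
            chunk = [item for item, _ in group]
            step(chunk)
            yield from chunk

    return f


seen = []
passthrough = for_batch_sketch(seen.append, 2)
print(list(passthrough(range(5))))  # [0, 1, 2, 3, 4]
print(seen)  # [[0, 1], [2, 3], [4]]
```

Because the chunk is materialised one batch at a time, memory stays bounded by `batch_size` rather than by the length of the input.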


### Motivations

The goal of the library is to provide a structure for working with [collection pipelines](https://martinfowler.com/articles/collection-pipeline/).

> Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next.

In this library, each "operation" is called a "step". We distinguish several subtypes of steps:
- `map` steps operate on each item of the collection, one by one
- `filter` steps reduce the number of items in the collection, without changing their values
- `for_each` steps perform some processing without affecting the following steps (they don't change the input)
- `batch` steps operate on batches of a fixed size, which is useful, for example, for batching database calls

In addition to that, we also define pipeline `split`, which allows several steps to run off the output of a single one.
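In the functional module this shows up as `fork`, whose output is the concatenation of each step's output over a shared input, with buffering bounded by `max_inflight`. A minimal, unbounded sketch of the same idea (the `fork_sketch` name is illustrative, not the library API) using `itertools.tee`:

```python
from itertools import tee


def fork_sketch(*steps):
    # duplicate the input stream once per step and concatenate the outputs;
    # unlike the real implementation, tee buffers without a bound
    def wrapper(iterable):
        copies = tee(iterable, len(steps))
        for step, copy in zip(steps, copies):
            yield from step(copy)

    return wrapper


double = lambda xs: (x * 2 for x in xs)
negate = lambda xs: (-x for x in xs)
print(list(fork_sketch(double, negate)([1, 2, 3])))  # [2, 4, 6, -1, -2, -3]
```

The library's `fork` improves on this sketch by interleaving consumers and pausing whichever branch runs too far ahead, so the shared buffer never exceeds `max_inflight` items.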

Library goals:
- a declarative, expressive syntax for the steps above
- memory efficiency:
  - pure Python, so per-item overhead is not optimal
  - but we do ensure that the memory used by the pipeline does not scale with the number of items in the collection
- performance:
  - pure Python, so the code itself is not especially fast
  - but the library allows optimal usage of the "slow" operations (mainly network calls) computed in the pipeline; this is what "performant" means here
- lightweight usage: existing functions can be used as steps without wrappers
- as good a typing experience as possible
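The constant-memory claim follows from pipelines being generator-based: each step pulls one item at a time, so nothing upstream is materialised. A stdlib-only illustration of this principle, independent of the library:

```python
from itertools import count, islice


def double(xs):
    # a generator step: yields one transformed item per item pulled
    for x in xs:
        yield x * 2


# the input is infinite, but composing generator steps still works:
# items flow through one at a time instead of being collected up front
stream = double(double(count()))
print(list(islice(stream, 3)))  # [0, 4, 8]
```

Only the items actually requested are ever computed, so memory usage is independent of the (here infinite) input length.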



### Documentation

Have a look at the [`docs`](./tests/docs/) part of the test suites for examples.

### Contributing

```bash
ruff check --output-format github .
ruff format --check .
mypy --check-untyped-defs .
pytest -s
```
1 change: 1 addition & 0 deletions iter_pipes/__init__.py
@@ -0,0 +1 @@
from .main import * # noqa
207 changes: 207 additions & 0 deletions iter_pipes/functional.py
@@ -0,0 +1,207 @@
from __future__ import annotations

import math
from collections import deque
from collections.abc import Callable, Iterable, Iterator
from functools import partial
from itertools import count, groupby
from typing import Any, Literal, TypeGuard, TypeVar, overload

__all__ = [
    "map",
    "filter",
    "for_each",
    "for_batch",
    "batch",
    "fork",
]

T = TypeVar("T")
V = TypeVar("V")
U = TypeVar("U")
W = TypeVar("W")
X = TypeVar("X")


raw_filter = filter


Step = Callable[[Iterable[T]], Iterable[V]]


def map(step: Callable[[V], W]) -> Step[V, W]:
    def f(data: Iterable[V]) -> Iterable[W]:
        for item in data:
            yield step(item)

    return f


def for_each(step: Callable[[V], Any]) -> Step[V, V]:
    def f(data: Iterable[V]) -> Iterable[V]:
        for item in data:
            step(item)
            yield item

    return f


def for_batch(step: Callable[[list[V]], Any], batch_size: int) -> Step[V, V]:
    def f(data: Iterable[V]) -> Iterable[V]:
        for _, batch_iterator in groupby(
            zip(data, count()),
            key=lambda x: math.floor(x[1] / batch_size),
        ):
            batch = [x[0] for x in batch_iterator]
            step(batch)
            yield from batch

    return f


def batch(step: Callable[[list[V]], Iterable[U]], batch_size: int) -> Step[V, U]:
    def f(data: Iterable[V]) -> Iterable[U]:
        for _, batch_iterator in groupby(
            zip(data, count()),
            key=lambda x: math.floor(x[1] / batch_size),
        ):
            yield from step([x[0] for x in batch_iterator])

    return f


@overload
def filter(step: Callable[[V], TypeGuard[W]]) -> Step[V, W]:
    ...


@overload
def filter(step: Callable[[V], bool]) -> Step[V, V]:
    ...


def filter(step: Callable[[V], bool]) -> Step[V, V]:  # type: ignore
    return partial(raw_filter, step)  # type: ignore


@overload
def fork(
    step1: Step[T, U] | None,
    step2: Step[T, V],
    pick_first: Literal[True],
    max_inflight: int | None = ...,
) -> Step[T, U]:
    ...


@overload
def fork(
    step1: Step[T, U],
    step2: Step[T, V],
    pick_first: Literal[False] | None,
    max_inflight: int | None = ...,
) -> Step[T, V | U]:
    ...


@overload
def fork(
    step1: Step[T, U],
    step2: Step[T, V],
    step3: Step[T, W],
    pick_first: Literal[False] | None,
    max_inflight: int | None = ...,
) -> Step[T, V | U | W]:
    ...


@overload
def fork(  # noqa: PLR0913
    step1: Step[T, U],
    step2: Step[T, V],
    step3: Step[T, W],
    step4: Step[T, X],
    pick_first: Literal[False] | None,
    max_inflight: int | None = ...,
) -> Step[T, V | U | W | X]:
    ...


@overload
def fork(
    *steps: Step[T, Any] | None,
    pick_first: Literal[False] | None,
    max_inflight: int | None = ...,
) -> Step[T, Any]:
    ...


def fork(  # type: ignore
    *steps: Step[T, Any] | None,
    max_inflight: int = 1000,
    pick_first: bool = False,
) -> Step[T, Any]:
    """
    Returns a step that forks the input iterable into multiple iterables,
    each one being processed by a different step. The output iterable is the
    concatenation of the output of each step.

    If `pick_first` is True, the output iterable is the concatenation of the
    output of the first step only.
    """

    def wrapper(iterable: Iterable[T]) -> Iterable[Any]:
        it = iter(iterable)

        queues: list[deque] = [deque() for _ in steps]
        # could be rewritten with a single deque to be more memory efficient

        # the set of iterators that are paused because we have too many inflight
        # items; they should be resumed when the number of inflight items goes down
        paused_iterators: set[int] = set()

        def gen(i: int) -> Iterator[T]:
            while True:
                mydeque = queues[i]
                if not mydeque:  # when the current deque is empty
                    try:
                        newval = next(it)  # fetch a new value and
                    except StopIteration:
                        return
                    for d in queues:  # load it to all the deques
                        d.append(newval)

                # if there are too many inflight items, pause the iterator
                nb_inflights = sum(len(q) for q in queues)
                if nb_inflights > max_inflight:
                    paused_iterators.add(i)
                    return

                yield mydeque.popleft()

        iterators = [iter((steps[i] or identity)(gen(i))) for i in range(len(steps))]

        # the set of iterators that are not done yet
        pending_iterators = set(range(len(iterators)))

        while len(pending_iterators):
            i = max(  # the index of the iterator with the most inflight items
                pending_iterators,
                key=lambda i: len(queues[i]),
            )
            try:
                val = next(iterators[i])
                if not pick_first or i == 0:
                    yield val
            except StopIteration:
                if i in paused_iterators:  # resume the iterator
                    iterators[i] = iter((steps[i] or identity)(gen(i)))
                    paused_iterators.remove(i)
                else:
                    pending_iterators.remove(i)

    return wrapper


def identity(item: W) -> W:
    return item