-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 3bac8d3
Showing
22 changed files
with
1,391 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
name: test | ||
|
||
permissions: | ||
contents: read | ||
|
||
on: | ||
- pull_request | ||
- push | ||
|
||
|
||
jobs: | ||
test: | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- uses: actions/checkout@v4 | ||
- name: Install poetry | ||
run: curl -sSL https://install.python-poetry.org | python3 - | ||
env: | ||
POETRY_VERSION: 1.7.0 | ||
- name: Add Poetry to path | ||
run: echo "${HOME}/.poetry/bin" >> $GITHUB_PATH | ||
- name: Set up Python 3.11 | ||
uses: actions/setup-python@v4 | ||
with: | ||
python-version: "3.11" | ||
cache: "poetry" | ||
- name: Install Poetry Packages | ||
run: | | ||
poetry env use "3.11" | ||
poetry install --only dev | ||
- name: Add venv to path | ||
run: echo `poetry env info --path`/bin/ >> $GITHUB_PATH | ||
|
||
- run: ruff check --output-format github . | ||
- run: ruff format --check . | ||
- run: mypy --check-untyped-defs . | ||
- run: pyright --warnings . | ||
- run: pytest -s --cov=iter_pipes --cov-report=html | ||
- uses: actions/upload-artifact@v3 | ||
with: | ||
name: code-coverage | ||
path: htmlcov/ | ||
- name: Upload coverage to codecov | ||
uses: codecov/codecov-action@v3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
__pycache__ | ||
.coverage |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
MIT License | ||
|
||
Copyright (c) 2023 Bright Network | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is | ||
furnished to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,235 @@ | ||
[![Code Coverage](https://img.shields.io/codecov/c/github/brightnetwork/iter-pipes)](https://app.codecov.io/gh/brightnetwork/iter-pipes) | ||
|
||
## `iter_pipes`: Iterable Pipes | ||
|
||
Functional pythonic pipelines for iterables. | ||
|
||
|
||
```bash | ||
pip install git+https://github.com/brightnetwork/iter-pipes | ||
``` | ||
|
||
### Examples | ||
|
||
#### map / filter: | ||
|
||
```python | ||
import math | ||
|
||
from iter_pipes import PipelineFactory | ||
|
||
pipeline = ( | ||
PipelineFactory[int]() | ||
.map(math.exp) | ||
.filter(lambda x: x > math.exp(2)) | ||
.map(math.log) | ||
.map(str) | ||
) | ||
|
||
assert pipeline(range(5)).to_list() == ["3.0", "4.0"] | ||
``` | ||
|
||
#### Batch operations | ||
|
||
```python | ||
def get_user_names_from_db(user_ids: list[int]) -> list[str]: | ||
# typical batch operation: | ||
# - duration is roughly constant for a batch | ||
# - batch size has to be below a fixed threshold | ||
print("processing batch", user_ids) | ||
return [f"user_{user_id}" for user_id in user_ids] | ||
|
||
|
||
pipeline = ( | ||
PipelineFactory[int]() | ||
.batch(get_user_names_from_db, batch_size=3) | ||
.for_each(lambda user_name: print("Hello ", user_name)) | ||
) | ||
|
||
pipeline(range(5)).to_list() | ||
# returns | ||
# ["user_0", "user_1", "user_2", "user_3", "user_4"] | ||
# prints | ||
# processing batch [0, 1, 2] | ||
# Hello user_0 | ||
# Hello user_1 | ||
# Hello user_2 | ||
# processing batch [3, 4] | ||
# Hello user_3 | ||
# Hello user_4 | ||
``` | ||
|
||
|
||
#### Storing state | ||
|
||
Class with a `__call__` method provide a easy way to store a state during the processing. | ||
|
||
```python | ||
class CountUsers: | ||
def __init__(self): | ||
self._count = 0 | ||
|
||
def __call__(self, item: str) -> str: | ||
self._count += 1 | ||
return f"{item} (position {self._count})" | ||
|
||
|
||
pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(CountUsers()) | ||
|
||
pipeline.process(range(5)).to_list() | ||
# return | ||
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)'] | ||
``` | ||
|
||
```python | ||
def count_users: | ||
self._ | ||
def __init__(self): | ||
self._count = 0 | ||
|
||
def __call__(self, item: str) -> str: | ||
self._count += 1 | ||
return f"{item} (position {self._count})" | ||
|
||
|
||
pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(CountUsers()) | ||
|
||
pipeline.process(range(5)).to_list() | ||
# return | ||
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)'] | ||
``` | ||
|
||
One could also use a closure: | ||
|
||
``` | ||
def count_users(): | ||
count = 0 | ||
def wrapper(item: str) -> str: | ||
nonlocal count | ||
count += 1 | ||
return f"{item} (position {count})" | ||
return wrapper | ||
pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(count_users()) | ||
pipeline.process(range(5)).to_list() | ||
# return | ||
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)'] | ||
``` | ||
|
||
#### Split | ||
|
||
```python | ||
pipeline = ( | ||
PipelineFactory[int]() | ||
.split( | ||
lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2), | ||
lambda x: x.map(lambda x: -x), | ||
) | ||
.map(str) | ||
) | ||
|
||
expected = ["0", "0", "4", "-1", "-2", "16", "-3", "-4", "36", "-5", "-6", "-7"] | ||
assert pipeline(range(8)).to_list() == expected | ||
``` | ||
|
||
Each split "branch" order will be preserved, but there is not guarantee in term of how the two are merged. | ||
|
||
#### Pipe operator overload | ||
|
||
```python | ||
import iter_pipes.functional as itp | ||
|
||
pipeline = ( | ||
PipelineFactory[int]() | ||
| itp.map(math.exp) | ||
| itp.filter(lambda x: x > math.exp(2)) # type checker might complain | ||
| itp.map(math.log) | ||
| itp.map(str) | ||
) | ||
|
||
assert pipeline(range(6)).to_list() == ["3.0", "4.0", "5.0"] | ||
``` | ||
|
||
note that typing of lambda function inside functional map is not as good as the one from the `Pipeline.XXX` methods. To work around this, one should either use the non functional style, either use fully typed function instead of lambda. | ||
|
||
|
||
#### Resumability | ||
|
||
```python | ||
pipeline = PipelineFactory[int]().split( | ||
lambda x: x.filter(lambda x: x % 3 == 0).map(str), | ||
lambda x: x, | ||
) | ||
|
||
print(pipeline.process(range(12)).to_list()) | ||
# return | ||
# ['0', 0, '3', 1, 2, 3, '6', 4, 5, 6, '9', 7, 8, 9, 10, 11] | ||
# note that between each yield from the first branch, the pipeline will yield everything | ||
# from the second branch so that we don't store too many messages in the inflight buffer. | ||
|
||
|
||
def filter_out_everything(items: Iterable[int]) -> Iterable[int]: | ||
print("starting") | ||
for item in items: | ||
if False: | ||
yield item | ||
|
||
|
||
pipeline = PipelineFactory[int]().split( | ||
lambda x: x.pipe(filter_out_everything).map(str), | ||
lambda x: x, | ||
max_inflight=5, | ||
) | ||
|
||
print(pipeline.process(range(9)).to_list()) | ||
# return | ||
# [0, 1, 2, 3, 4, 5, 6, 7, 8] | ||
# starting | ||
# starting | ||
# starting | ||
``` | ||
|
||
### Motivations | ||
|
||
Goal of the library is to provide a structure to work with [collection pipelines](https://martinfowler.com/articles/collection-pipeline/). | ||
|
||
> Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next. | ||
In this library, each "operation" is called a "step". We differentiate different subtype for steps: | ||
- `map` steps will operate on each item of the collection, one by one | ||
- `filter` steps will reduce the number of item in the collection, without changing their values | ||
- `for_each` steps will do some processing, but without impacting the following steps (they won't change the input) | ||
- `batch` steps will operate by batch of a fixed size - can be useful for example to batch database calls. | ||
|
||
In addition to that, we also define pipeline `split`, which allow to run several steps after a single one. | ||
|
||
Library goal: | ||
- declarative, expressive syntax for the steps above | ||
- memory efficiency: | ||
- pure python, so it's not optimal at all | ||
- but what we care about is ensuring that the memory used by the pipeline does not scale with the number of items in the collection. | ||
- performant: | ||
- pure python, so the code itself is not really performant | ||
- but the library allow for an optimal usage of the "slow" operations (network calls mainly) that are computed in the pipeline. This is what is meant by "performant" | ||
- lightweight usage, as in existing function can be used as a step without the need for a wrapper | ||
- provide as good of a type experience as possible | ||
|
||
|
||
|
||
### Documentation | ||
|
||
Have a look at the [`docs`](./tests/docs/) part of the test suites for examples. | ||
|
||
### Contributing | ||
|
||
```bash | ||
ruff check --output-format github . | ||
ruff format --check . | ||
mypy --check-untyped-defs . | ||
pytest -s | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
comment: false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .main import * # noqa |
Oops, something went wrong.