Skip to content

Commit

Permalink
noop
Browse files Browse the repository at this point in the history
  • Loading branch information
p9f committed Nov 16, 2023
0 parents commit 0627084
Show file tree
Hide file tree
Showing 22 changed files with 1,368 additions and 0 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: test

permissions:
contents: read

on:
- pull_request
- push


jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Install poetry
run: curl -sSL https://install.python-poetry.org | python3 -
env:
POETRY_VERSION: 1.7.0
- name: Add Poetry to path
run: echo "${HOME}/.poetry/bin" >> $GITHUB_PATH
- name: Set up Python 3.11
uses: actions/setup-python@v4
with:
python-version: "3.11"
cache: "poetry"
- name: Install Poetry Packages
run: |
poetry env use "3.11"
poetry install --only dev
- name: Add venv to path
run: echo `poetry env info --path`/bin/ >> $GITHUB_PATH

- run: ruff check --output-format github .
- run: ruff format --check .
- run: mypy --check-untyped-defs .
- run: pyright --warnings .
- run: pytest -s --cov=iter_pipes --cov-report=html
- uses: actions/upload-artifact@v3
with:
name: code-coverage
path: htmlcov/
- name: Upload coverage to codecov
uses: codecov/codecov-action@v3
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__
.coverage
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2023 Bright Network

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
212 changes: 212 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
[![Code Coverage](https://img.shields.io/codecov/c/github/brightnetwork/iter-pipes)](https://app.codecov.io/gh/brightnetwork/iter-pipes)

## `iter_pipes`: Iterable Pipes

Functional pythonic pipelines for iterables.


```bash
pip install git+https://github.com/brightnetwork/iter-pipes
```

### Examples

#### map / filter:

```python
import math

from iter_pipes import PipelineFactory

pipeline = (
PipelineFactory[int]()
.map(math.exp)
.filter(lambda x: x > math.exp(2))
.map(math.log)
.map(str)
)

assert pipeline(range(5)).to_list() == ["3.0", "4.0"]
```

#### Batch operations

```python
def get_user_names_from_db(user_ids: list[int]) -> list[str]:
# typical batch operation:
# - duration is roughly constant for a batch
# - batch size has to be below a fixed threshold
print("processing batch", user_ids)
return [f"user_{user_id}" for user_id in user_ids]


pipeline = (
PipelineFactory[int]()
.batch(get_user_names_from_db, batch_size=3)
.for_each(lambda user_name: print("Hello ", user_name))
)

pipeline(range(5)).to_list()
# returns
# ["user_0", "user_1", "user_2", "user_3", "user_4"]
# prints
# processing batch [0, 1, 2]
# Hello user_0
# Hello user_1
# Hello user_2
# processing batch [3, 4]
# Hello user_3
# Hello user_4
```


#### Storing state

Class with a `__call__` method provide a easy way to store a state during the processing.

```python
class CountUsers:
def __init__(self):
self._count = 0

def __call__(self, item: str) -> str:
self._count += 1
return f"{item} (position {self._count})"


pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(CountUsers())

pipeline.process(range(5)).to_list()
# return
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']
```

One could also use a closure:

```python
def count_users():
count = 0

def wrapper(item: str) -> str:
nonlocal count
count += 1
return f"{item} (position {count})"

return wrapper


pipeline = PipelineFactory[int]().map(lambda x: f"user {x}").map(count_users())

pipeline.process(range(5)).to_list()
# return
# ['user 0 (position 1)', 'user 1 (position 2)', 'user 2 (position 3)', 'user 3 (position 4)', 'user 4 (position 5)']
```

#### Split

```python
pipeline = (
PipelineFactory[int]()
.split(
lambda x: x.filter(lambda x: x % 2 == 0).map(lambda x: x**2),
lambda x: x.map(lambda x: -x),
)
.map(str)
)

expected = ["0", "0", "4", "-1", "-2", "16", "-3", "-4", "36", "-5", "-6", "-7"]
assert pipeline(range(8)).to_list() == expected
```

Each split "branch" order will be preserved, but there is not guarantee in term of how the two are merged.

#### Pipe operator overload

```python
import iter_pipes.functional as itp

pipeline = (
PipelineFactory[int]()
| itp.map(math.exp)
| itp.filter(lambda x: x > math.exp(2)) # type checker might complain
| itp.map(math.log)
| itp.map(str)
)

assert pipeline(range(6)).to_list() == ["3.0", "4.0", "5.0"]
```

note that typing of lambda function inside functional map is not as good as the one from the `Pipeline.XXX` methods. To work around this, one should either use the non functional style, either use fully typed function instead of lambda.


#### Resumability

```python
pipeline = PipelineFactory[int]().split(
lambda x: x.filter(lambda x: x % 3 == 0).map(str),
lambda x: x,
)

print(pipeline.process(range(12)).to_list())
# return
# ['0', 0, '3', 1, 2, 3, '6', 4, 5, 6, '9', 7, 8, 9, 10, 11]
# note that between each yield from the first branch, the pipeline will yield everything
# from the second branch so that we don't store too many messages in the inflight buffer.


def filter_out_everything(items: Iterable[int]) -> Iterable[int]:
print("starting")
for item in items:
if False:
yield item


pipeline = PipelineFactory[int]().split(
lambda x: x.pipe(filter_out_everything).map(str),
lambda x: x,
max_inflight=5,
)

print(pipeline.process(range(9)).to_list())
# return
# [0, 1, 2, 3, 4, 5, 6, 7, 8]
# print
# starting
# starting
# starting
```

### Motivations

Goal of the library is to provide a structure to work with [collection pipelines](https://martinfowler.com/articles/collection-pipeline/).

> Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next.
In this library, each "operation" is called a "step". We differentiate different subtype for steps:
- `map` steps will operate on each item of the collection, one by one
- `filter` steps will reduce the number of item in the collection, without changing their values
- `for_each` steps will do some processing, but without impacting the following steps (they won't change the input)
- `batch` steps will operate by batch of a fixed size - can be useful for example to batch database calls.

In addition to that, we also define pipeline `split`, which allow to run several steps after a single one.

Library goal:
- declarative, expressive syntax for the steps above
- memory efficiency:
- pure python, so it's not optimal at all
- but what we care about is ensuring that the memory used by the pipeline does not scale with the number of items in the collection.
- performant:
- pure python, so the code itself is not really performant
- but the library allow for an optimal usage of the "slow" operations (network calls mainly) that are computed in the pipeline. This is what is meant by "performant"
- lightweight usage, as in existing function can be used as a step without the need for a wrapper
- provide as good of a type experience as possible



### Documentation

Have a look at the [`docs`](./tests/docs/) part of the test suites for examples.

### Contributing

Please refer to the [`test`](./.github/workflows/test.yml) actions. 100% test coverage is a start.
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
comment: false
1 change: 1 addition & 0 deletions iter_pipes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .main import * # noqa
Loading

0 comments on commit 0627084

Please sign in to comment.