Skip to content

Commit

Permalink
Add clamp filter.
Browse files Browse the repository at this point in the history
This filter can be useful for filtering out corrupted values when boundaries are known.
  • Loading branch information
denpamusic committed Jan 5, 2025
1 parent 2ed6a86 commit 5703bab
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
13 changes: 12 additions & 1 deletion docs/source/callbacks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Subscribing to events
.. autofunction:: pyplumio.devices.Device.subscribe_once

To remove the previously registered callback, use the
``unsubcribe()`` method.
``unsubscribe()`` method.

.. autofunction:: pyplumio.devices.Device.unsubscribe

Expand All @@ -48,6 +48,17 @@ then calls the callback with the sum of values collected.
# Await the callback with the fuel burned during 30 seconds.
ecomax.subscribe("fuel_burned", aggregate(my_callback, seconds=30))
.. autofunction:: pyplumio.filters.clamp

This filter clamps value between specified boundaries.
It can be used to filter out outliers and corrupted data.

.. code-block:: python
from pyplumio.filters import clamp
# Await the callback with value clamped between 0 and 100.
ecomax.subscribe("load", clamp(my_callback, min_value=0, max_value=100))
.. autofunction:: pyplumio.filters.on_change

Expand Down
46 changes: 46 additions & 0 deletions pyplumio/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,52 @@ async def __call__(self, new_value: Any) -> Any:
"""Set a new value for the callback."""


class _Clamp(Filter):
"""Represents a clamp filter.
Calls callback with a value clamped between specified boundaries.
"""

__slots__ = ("_min_value", "_max_value")

_min_value: float
_max_value: float

def __init__(self, callback: Callback, min_value: float, max_value: float) -> None:
"""Initialize a new clamp filter."""
super().__init__(callback)
self._min_value = min_value
self._max_value = max_value

async def __call__(self, new_value: Any) -> Any:
"""Set a new value for the callback."""
if new_value < self._min_value:
return await self._callback(self._min_value)

if new_value > self._max_value:
return await self._callback(self._max_value)

return await self._callback(new_value)


def clamp(callback: Callback, min_value: float, max_value: float) -> _Clamp:
"""Return a clamp filter.
A callback function will be called with value clamped between
specified boundaries.
:param callback: A callback function to be awaited on new value
:type callback: Callback
:param min_value: A lower boundary
:type min_value: float
:param max_value: An upper boundary
:type max_value: float
:return: An instance of callable filter
:rtype: _Clamp
"""
return _Clamp(callback, min_value, max_value)


class _OnChange(Filter):
"""Represents a value changed filter.
Expand Down
26 changes: 25 additions & 1 deletion tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,35 @@

import pytest

from pyplumio.filters import aggregate, custom, debounce, delta, on_change, throttle
from pyplumio.filters import (
aggregate,
clamp,
custom,
debounce,
delta,
on_change,
throttle,
)
from pyplumio.helpers.parameter import Parameter, ParameterValues
from pyplumio.structures.alerts import Alert


async def test_clamp() -> None:
"""Test clamp filter."""
test_callback = AsyncMock()
wrapped_callback = clamp(test_callback, min_value=10, max_value=15)
await wrapped_callback(1)
test_callback.assert_awaited_once_with(10)
test_callback.reset_mock()

await wrapped_callback(50)
test_callback.assert_awaited_once_with(15)
test_callback.reset_mock()

await wrapped_callback(11)
test_callback.assert_awaited_once_with(11)


async def test_on_change() -> None:
"""Test on change filter."""
test_callback = AsyncMock()
Expand Down

0 comments on commit 5703bab

Please sign in to comment.