Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚀 5.0.0 #280

Merged
merged 11 commits into from
Jul 10, 2024
2 changes: 1 addition & 1 deletion .github/workflows/CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python: [ "3.7", "3.8", "3.9", "3.10", "3.11", "3.12" ]
python: [ "3.9", "3.10", "3.11", "3.12" ]
octoprint: [ "1.5", "1.6", "1.7", "1.8", "1.9", "1.10" ]
exclude:
# These versions are not compatible to each other:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python: [ "3.7", "3.8", "3.9", "3.10", "3.11", "3.12" ]
python: [ "3.9", "3.10", "3.11", "3.12" ]
octoprint: [ "1.5", "1.6", "1.7", "1.8", "1.9", "1.10" ]
exclude:
# These versions are not compatible to each other:
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ persistent=yes

# Minimum Python version to use for version dependent checks. Will default to
# the version used to run pylint.
py-version=3.7
py-version=3.9

# Discover python modules and packages in the file system subtree.
recursive=no
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The plugin allows you to set your own icons and flexibly customize the way the r

## Requirements

- Python: at least `3.7`,
- Python: at least `3.9`,
- OctoPrint: at least `1.5.3`.
- For [AutoConnect feature](https://github.com/borisbu/OctoRelay/blob/master/CHANGELOG.md#330): at least `1.9.0`.

Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ exclude = (?x)(
[mypy-octoprint.*]
ignore_missing_imports = True

[mypy-RPi.*]
[mypy-gpiozero.*]
ignore_missing_imports = True

[mypy-flask.*]
Expand Down
13 changes: 7 additions & 6 deletions octoprint_octorelay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ def handle_list_all_command(self) -> Listing:
settings = self._settings.get([], merged=True) # expensive
for index in RELAY_INDEXES:
if bool(settings[index]["active"]):
relay = Relay(
relay = Relay.get_or_create_relay(
int(settings[index]["relay_pin"] or 0),
bool(settings[index]["inverted_output"])
)

RobinTail marked this conversation as resolved.
Show resolved Hide resolved
active_relays.append({
"id": index,
"name": settings[index]["label_text"],
Expand All @@ -124,7 +125,7 @@ def handle_get_status_command(self, index: str) -> bool:
settings = self._settings.get([index], merged=True) # expensive
if not bool(settings["active"]):
raise HandlingException(400)
return Relay(
return Relay.get_or_create_relay(
int(settings["relay_pin"] or 0),
bool(settings["inverted_output"])
).is_closed()
Expand Down Expand Up @@ -241,7 +242,7 @@ def toggle_relay(self, index, target: Optional[bool] = None) -> bool:
self._printer.disconnect()
pin = int(settings["relay_pin"] or 0)
inverted = bool(settings["inverted_output"])
relay = Relay(pin, inverted)
relay = Relay.get_or_create_relay(pin, inverted)
self._logger.debug(
f"Toggling the relay {index} on pin {pin}" if target is None else
f"Turning the relay {index} {'ON' if target else 'OFF'} (pin {pin})"
Expand Down Expand Up @@ -313,7 +314,7 @@ def update_ui(self):
))
for index in RELAY_INDEXES:
active = bool(settings[index]["active"])
relay = Relay(
relay = Relay.get_or_create_relay(
int(settings[index]["relay_pin"] or 0),
bool(settings[index]["inverted_output"])
)
Expand Down Expand Up @@ -362,7 +363,7 @@ def input_polling(self):
for index in RELAY_INDEXES:
active = self.model[index]["active"]
model_state = self.model[index]["relay_state"] # bool since v3.1
actual_state = Relay(
actual_state = Relay.get_or_create_relay(
self.model[index]["relay_pin"],
self.model[index]["inverted_output"]
).is_closed() if active else False
Expand All @@ -371,7 +372,7 @@ def input_polling(self):
self.update_ui()
break

__plugin_pythoncompat__ = ">=3.7,<4"
__plugin_pythoncompat__ = ">=3.9,<4"
__plugin_implementation__ = OctoRelayPlugin()

__plugin_hooks__ = {
Expand Down
46 changes: 29 additions & 17 deletions octoprint_octorelay/driver.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
# -*- coding: utf-8 -*-
from typing import Optional
from RPi import GPIO
from typing import Optional, List
from gpiozero import LED

# The driver operates BCM mode of pins enumeration
GPIO.setmode(GPIO.BCM)

def xor(left: bool, right: bool) -> bool:
return left is not right

class Relay():
def __init__(self, pin: int, inverted: bool):
relays: List["Relay"] = []

def __init__(self, pin: int, inverted: bool, pin_factory=None):
self.pin = pin # GPIO pin
self.inverted = inverted # marks the relay as normally closed
self.relay = LED(pin, pin_factory=pin_factory)

def __repr__(self) -> str:
return f"{type(self).__name__}(pin={self.pin},inverted={self.inverted},closed={self.is_closed()})"

def __xor(self, left: bool, right: bool) -> bool:
return left is not right

def close(self):
"""Activates the current flow through the relay."""
self.toggle(True)
Expand All @@ -26,11 +27,7 @@ def open(self):

def is_closed(self) -> bool:
"""Returns the logical state of the relay."""
GPIO.setwarnings(False)
GPIO.setup(self.pin, GPIO.OUT)
pin_state = bool(GPIO.input(self.pin))
GPIO.setwarnings(True)
return xor(self.inverted, pin_state)
return self.__xor(self.inverted, self.relay.is_lit)

def toggle(self, desired_state: Optional[bool] = None) -> bool:
"""
Expand All @@ -40,8 +37,23 @@ def toggle(self, desired_state: Optional[bool] = None) -> bool:
"""
if desired_state is None:
desired_state = not self.is_closed()
GPIO.setwarnings(False)
GPIO.setup(self.pin, GPIO.OUT)
GPIO.output(self.pin, xor(self.inverted, desired_state))
GPIO.setwarnings(True)

if self.__xor(self.inverted, desired_state) is True:
self.relay.on()
else:
self.relay.off()

return desired_state

@classmethod
def get_or_create_relay(cls, pin: int, inverted: bool, pin_factory=None):
for relay in cls.relays:
if relay.pin == pin:
if relay.inverted is not inverted:
relay.inverted = inverted

return relay

relay = cls(pin, inverted, pin_factory)
cls.relays.append(relay)
return relay
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def get_version_and_cmdclass(pkg_path):

# Any additional requirements besides OctoPrint should be listed here
# todo after dropping 3.7 remove typing-extensions (used by model.py and listing.py)
plugin_requires = ["RPi.GPIO", "typing-extensions"]
plugin_requires = ["gpiozero", "typing-extensions", "lgpio", "RPi.GPIO"]

# --------------------------------------------------------------------------------------------------------------------
# More advanced options that you usually shouldn't have to touch follow after this point
Expand Down
69 changes: 36 additions & 33 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
# -*- coding: utf-8 -*-
import unittest
import sys
from unittest.mock import Mock

# Mocks used for assertions
GPIO_mock = Mock()
GPIO_mock.BCM = "MockedBCM"
GPIO_mock.OUT = "MockedOUT"
sys.modules["RPi.GPIO"] = GPIO_mock
from gpiozero.pins.mock import MockFactory

# pylint: disable=wrong-import-position
from octoprint_octorelay.driver import Relay
Expand All @@ -19,40 +14,33 @@

class TestRelayDriver(unittest.TestCase):
def test_constructor(self):
GPIO_mock.setmode.assert_called_with("MockedBCM")
relay = Relay(18, True)
relay = Relay(18, True, MockFactory())
self.assertIsInstance(relay, Relay)
self.assertEqual(relay.pin, 18)
self.assertTrue(relay.inverted)

def test_serialization(self):
relay = Relay(18, True)
relay = Relay(18, True, MockFactory())
serialization = f"{relay}"
self.assertEqual(serialization, "Relay(pin=18,inverted=True,closed=True)")

def test_close(self):
cases = [
{ "relay": Relay(18, False), "expected_pin_state": True },
{ "relay": Relay(18, True), "expected_pin_state": False }
{ "relay": Relay(18, False, MockFactory()), "expected_pin_state": True },
{ "relay": Relay(18, True, MockFactory()), "expected_pin_state": False }
]
for case in cases:
case["relay"].close()
GPIO_mock.setup.assert_called_with(18, "MockedOUT")
GPIO_mock.output.assert_called_with(18, case["expected_pin_state"])
GPIO_mock.setwarnings.assert_any_call(False)
GPIO_mock.setwarnings.assert_called_with(True)
self.assertEqual(case["relay"].relay.is_lit, case["expected_pin_state"])

def test_open(self):
cases = [
{ "relay": Relay(18, False), "expected_pin_state": False },
{ "relay": Relay(18, True), "expected_pin_state": True }
{ "relay": Relay(18, False, MockFactory()), "expected_pin_state": False },
{ "relay": Relay(18, True, MockFactory()), "expected_pin_state": True }
]
for case in cases:
case["relay"].open()
GPIO_mock.setup.assert_called_with(18, "MockedOUT")
GPIO_mock.output.assert_called_with(18, case["expected_pin_state"])
GPIO_mock.setwarnings.assert_any_call(False)
GPIO_mock.setwarnings.assert_called_with(True)
self.assertEqual(case["relay"].relay.is_lit, case["expected_pin_state"])

def test_is_closed(self):
cases = [
Expand All @@ -62,12 +50,9 @@ def test_is_closed(self):
{ "mocked_state": 0, "inverted": True, "expected_relay_state": True },
]
for case in cases:
GPIO_mock.input = Mock(return_value=case["mocked_state"])
relay = Relay(18, case["inverted"])
relay = Relay(18, case["inverted"], MockFactory())
relay.relay.value = case["mocked_state"]
self.assertEqual(relay.is_closed(), case["expected_relay_state"])
GPIO_mock.setwarnings.assert_any_call(False)
GPIO_mock.setwarnings.assert_called_with(True)
GPIO_mock.input.assert_called_with(18)

def test_toggle__no_argument(self):
cases = [
Expand All @@ -77,14 +62,32 @@ def test_toggle__no_argument(self):
{ "mocked_state": 0, "inverted": True, "expected_pin_state": True, "expected_relay_state": False },
]
for case in cases:
GPIO_mock.input = Mock(return_value=case["mocked_state"])
relay = Relay(18, case["inverted"])
relay = Relay(18, case["inverted"], MockFactory())
relay.relay.value = case["mocked_state"]
self.assertEqual(relay.toggle(), case["expected_relay_state"])
GPIO_mock.setwarnings.assert_any_call(False)
GPIO_mock.setwarnings.assert_called_with(True)
GPIO_mock.input.assert_called_with(18)
GPIO_mock.setup.assert_called_with(18, "MockedOUT")
GPIO_mock.output.assert_called_with(18, case["expected_pin_state"])
self.assertEqual(relay.relay.is_lit, case["expected_pin_state"])

def test_get_or_create_relay(self):
# Test creating a new relay
relay1 = Relay.get_or_create_relay(17, False, MockFactory())
self.assertEqual(len(Relay.relays), 1)
self.assertEqual(relay1.pin, 17)
self.assertFalse(relay1.inverted)

# Test retrieving the existing relay with the same pin and inversion
relay2 = Relay.get_or_create_relay(17, True, MockFactory())
self.assertIs(relay1, relay2)
self.assertEqual(len(Relay.relays), 1) # Should still be 1

# Test retrieving the existing relay with the same pin but different inversion
relay3 = Relay.get_or_create_relay(17, True, MockFactory())
self.assertEqual(len(Relay.relays), 1) # Should still be 1
self.assertIs(relay1, relay3)
self.assertTrue(relay1.inverted) # Inversion should be updated

def tearDown(self):
# Clear relays after each test
Relay.relays = []

if __name__ == "__main__":
unittest.main()
18 changes: 10 additions & 8 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from octoprint.access import ADMIN_GROUP, USER_GROUP

# Patching required before importing OctoRelayPlugin class
sys.modules["RPi.GPIO"] = Mock()
sys.modules["gpiozero"] = Mock()

# Mocks used for assertions
timerMock = Mock()
Expand All @@ -30,8 +30,10 @@

relayMock = Mock()
relayConstructorMock = Mock(return_value=relayMock)
relayConstructorMock.get_or_create_relay = Mock(return_value=relayMock)

sys.modules["octoprint_octorelay.driver"] = Mock(
Relay=relayConstructorMock
Relay=relayConstructorMock,
RobinTail marked this conversation as resolved.
Show resolved Hide resolved
)

# pylint: disable=wrong-import-position
Expand Down Expand Up @@ -432,7 +434,7 @@ def test_get_update_information(self):

def test_python_compatibility(self):
# Should be the current Python compability string
self.assertEqual(__plugin_pythoncompat__, ">=3.7,<4")
self.assertEqual(__plugin_pythoncompat__, ">=3.9,<4")

def test_exposed_implementation(self):
# Should be an instance of the plugin class
Expand Down Expand Up @@ -467,9 +469,9 @@ def test_input_polling(self):
}
relayMock.is_closed = Mock(return_value=True)
self.plugin_instance.input_polling()
self.assertEqual(relayConstructorMock.call_count, 2)
relayConstructorMock.assert_any_call(17, False)
relayConstructorMock.assert_any_call(18, False)
self.assertEqual(relayConstructorMock.get_or_create_relay.call_count, 2)
relayConstructorMock.get_or_create_relay.assert_any_call(17, False)
relayConstructorMock.get_or_create_relay.assert_any_call(18, False)
self.plugin_instance.update_ui.assert_called_with()
self.plugin_instance._logger.debug.assert_called_with("relay: r3 has changed its pin state")

Expand Down Expand Up @@ -508,7 +510,7 @@ def test_update_ui(self):
"upcoming": None
}
self.plugin_instance.update_ui()
relayConstructorMock.assert_called_with(17, False)
relayConstructorMock.get_or_create_relay.assert_called_with(17, False)
for index in RELAY_INDEXES:
self.plugin_instance._settings.get.assert_any_call([], merged=True)
self.plugin_instance._plugin_manager.send_plugin_message.assert_called_with(
Expand Down Expand Up @@ -553,7 +555,7 @@ def test_toggle_relay(self, system_mock):
"cmd_off": "CommandOFF"
})
self.plugin_instance.toggle_relay("r4", case["target"])
relayConstructorMock.assert_called_with(17, case["inverted"])
relayConstructorMock.get_or_create_relay.assert_called_with(17, case["inverted"])
relayMock.toggle.assert_called_with(case["target"])
system_mock.assert_called_with(case["expectedCommand"])
if case["expectedCommand"] == "CommandON":
Expand Down
Loading