Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ref: naming entities in the driver #283

Merged
merged 6 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions octoprint_octorelay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
STARTUP, PRINTING_STOPPED, PRINTING_STARTED, PRIORITIES, FALLBACK_PRIORITY, PREEMPTIVE_CANCELLATION_CUTOFF,
CANCEL_TASK_COMMAND, USER_ACTION, TURNED_ON
)
from .driver import Relay
from .driver import Driver
from .task import Task
from .listing import Listing
from .migrations import migrate
Expand Down Expand Up @@ -108,7 +108,7 @@ def handle_list_all_command(self) -> Listing:
settings = self._settings.get([], merged=True) # expensive
for index in RELAY_INDEXES:
if bool(settings[index]["active"]):
relay = Relay.get_or_create_relay(
relay = Driver.ensure(
int(settings[index]["relay_pin"] or 0),
bool(settings[index]["inverted_output"])
)
Expand All @@ -124,7 +124,7 @@ def handle_get_status_command(self, index: str) -> bool:
settings = self._settings.get([index], merged=True) # expensive
if not bool(settings["active"]):
raise HandlingException(400)
return Relay.get_or_create_relay(
return Driver.ensure(
int(settings["relay_pin"] or 0),
bool(settings["inverted_output"])
).is_closed()
Expand Down Expand Up @@ -241,7 +241,7 @@ def toggle_relay(self, index, target: Optional[bool] = None) -> bool:
self._printer.disconnect()
pin = int(settings["relay_pin"] or 0)
inverted = bool(settings["inverted_output"])
relay = Relay.get_or_create_relay(pin, inverted)
relay = Driver.ensure(pin, inverted)
self._logger.debug(
f"Toggling the relay {index} on pin {pin}" if target is None else
f"Turning the relay {index} {'ON' if target else 'OFF'} (pin {pin})"
Expand Down Expand Up @@ -313,7 +313,7 @@ def update_ui(self):
))
for index in RELAY_INDEXES:
active = bool(settings[index]["active"])
relay = Relay.get_or_create_relay(
relay = Driver.ensure(
int(settings[index]["relay_pin"] or 0),
bool(settings[index]["inverted_output"])
)
Expand Down Expand Up @@ -362,7 +362,7 @@ def input_polling(self):
for index in RELAY_INDEXES:
active = self.model[index]["active"]
model_state = self.model[index]["relay_state"] # bool since v3.1
actual_state = Relay.get_or_create_relay(
actual_state = Driver.ensure(
self.model[index]["relay_pin"],
self.model[index]["inverted_output"]
).is_closed() if active else False
Expand Down
16 changes: 8 additions & 8 deletions octoprint_octorelay/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
def xor(left: bool, right: bool) -> bool:
return left is not right

class Relay():
relays: List["Relay"] = []
class Driver():
cache: List["Driver"] = []

def __init__(self, pin: int, inverted: bool, pin_factory=None):
self.pin = pin # GPIO pin
self.inverted = inverted # marks the relay as normally closed
self.relay = LED(pin, pin_factory=pin_factory)
self.handle = LED(pin, pin_factory=pin_factory)

def __repr__(self) -> str:
return f"{type(self).__name__}(pin={self.pin},inverted={self.inverted},closed={self.is_closed()})"
Expand All @@ -26,7 +26,7 @@ def open(self):

def is_closed(self) -> bool:
"""Returns the logical state of the relay."""
return xor(self.inverted, self.relay.is_lit)
return xor(self.inverted, self.handle.is_lit)

def toggle(self, desired_state: Optional[bool] = None) -> bool:
"""
Expand All @@ -36,16 +36,16 @@ def toggle(self, desired_state: Optional[bool] = None) -> bool:
"""
if desired_state is None:
desired_state = not self.is_closed()
(self.relay.on if xor(self.inverted, desired_state) else self.relay.off)()
(self.handle.on if xor(self.inverted, desired_state) else self.handle.off)()
return desired_state

@classmethod
def get_or_create_relay(cls, pin: int, inverted: bool, pin_factory=None):
for relay in cls.relays:
def ensure(cls, pin: int, inverted: bool, pin_factory=None):
for relay in cls.cache:
if relay.pin == pin:
if xor(relay.inverted, inverted):
relay.inverted = inverted
return relay
relay = cls(pin, inverted, pin_factory)
cls.relays.append(relay)
cls.cache.append(relay)
return relay
52 changes: 26 additions & 26 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,42 @@
from gpiozero.pins.mock import MockFactory

# pylint: disable=wrong-import-position
from octoprint_octorelay.driver import Relay
from octoprint_octorelay.driver import Driver

# avoid keeping other modules automatically imported by this test
del sys.modules["octoprint_octorelay"]
del sys.modules["octoprint_octorelay.migrations"]
del sys.modules["octoprint_octorelay.task"]

class TestRelayDriver(unittest.TestCase):
class TestDriver(unittest.TestCase):
def test_constructor(self):
relay = Relay(18, True, MockFactory())
self.assertIsInstance(relay, Relay)
relay = Driver(18, True, MockFactory())
self.assertIsInstance(relay, Driver)
self.assertEqual(relay.pin, 18)
self.assertTrue(relay.inverted)

def test_serialization(self):
relay = Relay(18, True, MockFactory())
relay = Driver(18, True, MockFactory())
serialization = f"{relay}"
self.assertEqual(serialization, "Relay(pin=18,inverted=True,closed=True)")
self.assertEqual(serialization, "Driver(pin=18,inverted=True,closed=True)")

def test_close(self):
cases = [
{ "relay": Relay(18, False, MockFactory()), "expected_pin_state": True },
{ "relay": Relay(18, True, MockFactory()), "expected_pin_state": False }
{ "relay": Driver(18, False, MockFactory()), "expected_pin_state": True },
{ "relay": Driver(18, True, MockFactory()), "expected_pin_state": False }
]
for case in cases:
case["relay"].close()
self.assertEqual(case["relay"].relay.is_lit, case["expected_pin_state"])
self.assertEqual(case["relay"].handle.is_lit, case["expected_pin_state"])

def test_open(self):
cases = [
{ "relay": Relay(18, False, MockFactory()), "expected_pin_state": False },
{ "relay": Relay(18, True, MockFactory()), "expected_pin_state": True }
{ "relay": Driver(18, False, MockFactory()), "expected_pin_state": False },
{ "relay": Driver(18, True, MockFactory()), "expected_pin_state": True }
]
for case in cases:
case["relay"].open()
self.assertEqual(case["relay"].relay.is_lit, case["expected_pin_state"])
self.assertEqual(case["relay"].handle.is_lit, case["expected_pin_state"])

def test_is_closed(self):
cases = [
Expand All @@ -50,8 +50,8 @@ def test_is_closed(self):
{ "mocked_state": 0, "inverted": True, "expected_relay_state": True },
]
for case in cases:
relay = Relay(18, case["inverted"], MockFactory())
relay.relay.value = case["mocked_state"]
relay = Driver(18, case["inverted"], MockFactory())
relay.handle.value = case["mocked_state"]
self.assertEqual(relay.is_closed(), case["expected_relay_state"])

def test_toggle__no_argument(self):
Expand All @@ -62,32 +62,32 @@ def test_toggle__no_argument(self):
{ "mocked_state": 0, "inverted": True, "expected_pin_state": True, "expected_relay_state": False },
]
for case in cases:
relay = Relay(18, case["inverted"], MockFactory())
relay.relay.value = case["mocked_state"]
relay = Driver(18, case["inverted"], MockFactory())
relay.handle.value = case["mocked_state"]
self.assertEqual(relay.toggle(), case["expected_relay_state"])
self.assertEqual(relay.relay.is_lit, case["expected_pin_state"])
self.assertEqual(relay.handle.is_lit, case["expected_pin_state"])

def test_get_or_create_relay(self):
def test_ensure(self):
# Test creating a new relay
relay1 = Relay.get_or_create_relay(17, False, MockFactory())
self.assertEqual(len(Relay.relays), 1)
relay1 = Driver.ensure(17, False, MockFactory())
self.assertEqual(len(Driver.cache), 1)
self.assertEqual(relay1.pin, 17)
self.assertFalse(relay1.inverted)

# Test retrieving the existing relay with the same pin and inversion
relay2 = Relay.get_or_create_relay(17, True, MockFactory())
relay2 = Driver.ensure(17, True, MockFactory())
self.assertIs(relay1, relay2)
self.assertEqual(len(Relay.relays), 1) # Should still be 1
self.assertEqual(len(Driver.cache), 1) # Should still be 1

# Test retrieving the existing relay with the same pin but different inversion
relay3 = Relay.get_or_create_relay(17, True, MockFactory())
self.assertEqual(len(Relay.relays), 1) # Should still be 1
relay3 = Driver.ensure(17, True, MockFactory())
self.assertEqual(len(Driver.cache), 1) # Should still be 1
self.assertIs(relay1, relay3)
self.assertTrue(relay1.inverted) # Inversion should be updated

def tearDown(self):
# Clear relays after each test
Relay.relays = []
# Clear cache after each test
Driver.cache = []

if __name__ == "__main__":
unittest.main()
20 changes: 10 additions & 10 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
sys.modules["octoprint_octorelay.migrations"] = migrationsMock

relayMock = Mock()
relayConstructorMock = Mock(return_value=relayMock)
relayConstructorMock.get_or_create_relay = Mock(return_value=relayMock)
DriverMock = Mock(return_value=relayMock)
DriverMock.ensure = Mock(return_value=relayMock)

sys.modules["octoprint_octorelay.driver"] = Mock(
Relay=relayConstructorMock
Driver=DriverMock
)

# pylint: disable=wrong-import-position
Expand Down Expand Up @@ -461,17 +461,17 @@ def test_on_shutdown(self):
def test_input_polling(self):
# First active relay having state not equal to the one stored in model should trigger UI update
self.plugin_instance.update_ui = Mock()
relayConstructorMock.reset_mock()
DriverMock.reset_mock()
self.plugin_instance.model = {
"r1": { "active": False, "relay_pin": 4, "inverted_output": False, "relay_state": True },
"r2": { "active": True, "relay_pin": 17, "inverted_output": False, "relay_state": True },
"r3": { "active": True, "relay_pin": 18, "inverted_output": False, "relay_state": False }
}
relayMock.is_closed = Mock(return_value=True)
self.plugin_instance.input_polling()
self.assertEqual(relayConstructorMock.get_or_create_relay.call_count, 2)
relayConstructorMock.get_or_create_relay.assert_any_call(17, False)
relayConstructorMock.get_or_create_relay.assert_any_call(18, False)
self.assertEqual(DriverMock.ensure.call_count, 2)
DriverMock.ensure.assert_any_call(17, False)
DriverMock.ensure.assert_any_call(18, False)
self.plugin_instance.update_ui.assert_called_with()
self.plugin_instance._logger.debug.assert_called_with("relay: r3 has changed its pin state")

Expand Down Expand Up @@ -510,7 +510,7 @@ def test_update_ui(self):
"upcoming": None
}
self.plugin_instance.update_ui()
relayConstructorMock.get_or_create_relay.assert_called_with(17, False)
DriverMock.ensure.assert_called_with(17, False)
for index in RELAY_INDEXES:
self.plugin_instance._settings.get.assert_any_call([], merged=True)
self.plugin_instance._plugin_manager.send_plugin_message.assert_called_with(
Expand Down Expand Up @@ -555,7 +555,7 @@ def test_toggle_relay(self, system_mock):
"cmd_off": "CommandOFF"
})
self.plugin_instance.toggle_relay("r4", case["target"])
relayConstructorMock.get_or_create_relay.assert_called_with(17, case["inverted"])
DriverMock.ensure.assert_called_with(17, case["inverted"])
relayMock.toggle.assert_called_with(case["target"])
system_mock.assert_called_with(case["expectedCommand"])
if case["expectedCommand"] == "CommandON":
Expand Down Expand Up @@ -915,7 +915,7 @@ def test_handle_update_command(self, system_mock):
"target": False,
"closed": True,
"expectedError": False,
"expectedResult": False, # from the !closed returned by mocked Relay::toggle() below
"expectedResult": False, # from the !closed returned by mocked Driver::toggle() below
"expectedToggle": True,
"expectedCommand": "CommandOffMock"
},
Expand Down