Skip to content

Commit

Permalink
Ref: naming entities in the driver (#283)
Browse files Browse the repository at this point in the history
* Renaming Relay::relays to cache.

* Renaming Relay::get_or_create_relay() to ensure().

* Renaming class Relay to Driver.

* Renaming Driver::relay to handle.

* Fix test suite name.

* Renaming to DriverMock in main test.
  • Loading branch information
RobinTail authored Jul 10, 2024
1 parent d350c83 commit 9d868c1
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 50 deletions.
12 changes: 6 additions & 6 deletions octoprint_octorelay/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
STARTUP, PRINTING_STOPPED, PRINTING_STARTED, PRIORITIES, FALLBACK_PRIORITY, PREEMPTIVE_CANCELLATION_CUTOFF,
CANCEL_TASK_COMMAND, USER_ACTION, TURNED_ON
)
from .driver import Relay
from .driver import Driver
from .task import Task
from .listing import Listing
from .migrations import migrate
Expand Down Expand Up @@ -108,7 +108,7 @@ def handle_list_all_command(self) -> Listing:
settings = self._settings.get([], merged=True) # expensive
for index in RELAY_INDEXES:
if bool(settings[index]["active"]):
relay = Relay.get_or_create_relay(
relay = Driver.ensure(
int(settings[index]["relay_pin"] or 0),
bool(settings[index]["inverted_output"])
)
Expand All @@ -124,7 +124,7 @@ def handle_get_status_command(self, index: str) -> bool:
settings = self._settings.get([index], merged=True) # expensive
if not bool(settings["active"]):
raise HandlingException(400)
return Relay.get_or_create_relay(
return Driver.ensure(
int(settings["relay_pin"] or 0),
bool(settings["inverted_output"])
).is_closed()
Expand Down Expand Up @@ -241,7 +241,7 @@ def toggle_relay(self, index, target: Optional[bool] = None) -> bool:
self._printer.disconnect()
pin = int(settings["relay_pin"] or 0)
inverted = bool(settings["inverted_output"])
relay = Relay.get_or_create_relay(pin, inverted)
relay = Driver.ensure(pin, inverted)
self._logger.debug(
f"Toggling the relay {index} on pin {pin}" if target is None else
f"Turning the relay {index} {'ON' if target else 'OFF'} (pin {pin})"
Expand Down Expand Up @@ -313,7 +313,7 @@ def update_ui(self):
))
for index in RELAY_INDEXES:
active = bool(settings[index]["active"])
relay = Relay.get_or_create_relay(
relay = Driver.ensure(
int(settings[index]["relay_pin"] or 0),
bool(settings[index]["inverted_output"])
)
Expand Down Expand Up @@ -362,7 +362,7 @@ def input_polling(self):
for index in RELAY_INDEXES:
active = self.model[index]["active"]
model_state = self.model[index]["relay_state"] # bool since v3.1
actual_state = Relay.get_or_create_relay(
actual_state = Driver.ensure(
self.model[index]["relay_pin"],
self.model[index]["inverted_output"]
).is_closed() if active else False
Expand Down
16 changes: 8 additions & 8 deletions octoprint_octorelay/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
def xor(left: bool, right: bool) -> bool:
return left is not right

class Relay():
relays: List["Relay"] = []
class Driver():
cache: List["Driver"] = []

def __init__(self, pin: int, inverted: bool, pin_factory=None):
self.pin = pin # GPIO pin
self.inverted = inverted # marks the relay as normally closed
self.relay = LED(pin, pin_factory=pin_factory)
self.handle = LED(pin, pin_factory=pin_factory)

def __repr__(self) -> str:
return f"{type(self).__name__}(pin={self.pin},inverted={self.inverted},closed={self.is_closed()})"
Expand All @@ -26,7 +26,7 @@ def open(self):

def is_closed(self) -> bool:
"""Returns the logical state of the relay."""
return xor(self.inverted, self.relay.is_lit)
return xor(self.inverted, self.handle.is_lit)

def toggle(self, desired_state: Optional[bool] = None) -> bool:
"""
Expand All @@ -36,16 +36,16 @@ def toggle(self, desired_state: Optional[bool] = None) -> bool:
"""
if desired_state is None:
desired_state = not self.is_closed()
(self.relay.on if xor(self.inverted, desired_state) else self.relay.off)()
(self.handle.on if xor(self.inverted, desired_state) else self.handle.off)()
return desired_state

@classmethod
def get_or_create_relay(cls, pin: int, inverted: bool, pin_factory=None):
for relay in cls.relays:
def ensure(cls, pin: int, inverted: bool, pin_factory=None):
for relay in cls.cache:
if relay.pin == pin:
if xor(relay.inverted, inverted):
relay.inverted = inverted
return relay
relay = cls(pin, inverted, pin_factory)
cls.relays.append(relay)
cls.cache.append(relay)
return relay
52 changes: 26 additions & 26 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,42 @@
from gpiozero.pins.mock import MockFactory

# pylint: disable=wrong-import-position
from octoprint_octorelay.driver import Relay
from octoprint_octorelay.driver import Driver

# avoid keeping other modules automatically imported by this test
del sys.modules["octoprint_octorelay"]
del sys.modules["octoprint_octorelay.migrations"]
del sys.modules["octoprint_octorelay.task"]

class TestRelayDriver(unittest.TestCase):
class TestDriver(unittest.TestCase):
def test_constructor(self):
relay = Relay(18, True, MockFactory())
self.assertIsInstance(relay, Relay)
relay = Driver(18, True, MockFactory())
self.assertIsInstance(relay, Driver)
self.assertEqual(relay.pin, 18)
self.assertTrue(relay.inverted)

def test_serialization(self):
relay = Relay(18, True, MockFactory())
relay = Driver(18, True, MockFactory())
serialization = f"{relay}"
self.assertEqual(serialization, "Relay(pin=18,inverted=True,closed=True)")
self.assertEqual(serialization, "Driver(pin=18,inverted=True,closed=True)")

def test_close(self):
cases = [
{ "relay": Relay(18, False, MockFactory()), "expected_pin_state": True },
{ "relay": Relay(18, True, MockFactory()), "expected_pin_state": False }
{ "relay": Driver(18, False, MockFactory()), "expected_pin_state": True },
{ "relay": Driver(18, True, MockFactory()), "expected_pin_state": False }
]
for case in cases:
case["relay"].close()
self.assertEqual(case["relay"].relay.is_lit, case["expected_pin_state"])
self.assertEqual(case["relay"].handle.is_lit, case["expected_pin_state"])

def test_open(self):
cases = [
{ "relay": Relay(18, False, MockFactory()), "expected_pin_state": False },
{ "relay": Relay(18, True, MockFactory()), "expected_pin_state": True }
{ "relay": Driver(18, False, MockFactory()), "expected_pin_state": False },
{ "relay": Driver(18, True, MockFactory()), "expected_pin_state": True }
]
for case in cases:
case["relay"].open()
self.assertEqual(case["relay"].relay.is_lit, case["expected_pin_state"])
self.assertEqual(case["relay"].handle.is_lit, case["expected_pin_state"])

def test_is_closed(self):
cases = [
Expand All @@ -50,8 +50,8 @@ def test_is_closed(self):
{ "mocked_state": 0, "inverted": True, "expected_relay_state": True },
]
for case in cases:
relay = Relay(18, case["inverted"], MockFactory())
relay.relay.value = case["mocked_state"]
relay = Driver(18, case["inverted"], MockFactory())
relay.handle.value = case["mocked_state"]
self.assertEqual(relay.is_closed(), case["expected_relay_state"])

def test_toggle__no_argument(self):
Expand All @@ -62,32 +62,32 @@ def test_toggle__no_argument(self):
{ "mocked_state": 0, "inverted": True, "expected_pin_state": True, "expected_relay_state": False },
]
for case in cases:
relay = Relay(18, case["inverted"], MockFactory())
relay.relay.value = case["mocked_state"]
relay = Driver(18, case["inverted"], MockFactory())
relay.handle.value = case["mocked_state"]
self.assertEqual(relay.toggle(), case["expected_relay_state"])
self.assertEqual(relay.relay.is_lit, case["expected_pin_state"])
self.assertEqual(relay.handle.is_lit, case["expected_pin_state"])

def test_get_or_create_relay(self):
def test_ensure(self):
# Test creating a new relay
relay1 = Relay.get_or_create_relay(17, False, MockFactory())
self.assertEqual(len(Relay.relays), 1)
relay1 = Driver.ensure(17, False, MockFactory())
self.assertEqual(len(Driver.cache), 1)
self.assertEqual(relay1.pin, 17)
self.assertFalse(relay1.inverted)

# Test retrieving the existing relay with the same pin and inversion
relay2 = Relay.get_or_create_relay(17, True, MockFactory())
relay2 = Driver.ensure(17, True, MockFactory())
self.assertIs(relay1, relay2)
self.assertEqual(len(Relay.relays), 1) # Should still be 1
self.assertEqual(len(Driver.cache), 1) # Should still be 1

# Test retrieving the existing relay with the same pin but different inversion
relay3 = Relay.get_or_create_relay(17, True, MockFactory())
self.assertEqual(len(Relay.relays), 1) # Should still be 1
relay3 = Driver.ensure(17, True, MockFactory())
self.assertEqual(len(Driver.cache), 1) # Should still be 1
self.assertIs(relay1, relay3)
self.assertTrue(relay1.inverted) # Inversion should be updated

def tearDown(self):
# Clear relays after each test
Relay.relays = []
# Clear cache after each test
Driver.cache = []

if __name__ == "__main__":
unittest.main()
20 changes: 10 additions & 10 deletions tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
sys.modules["octoprint_octorelay.migrations"] = migrationsMock

relayMock = Mock()
relayConstructorMock = Mock(return_value=relayMock)
relayConstructorMock.get_or_create_relay = Mock(return_value=relayMock)
DriverMock = Mock(return_value=relayMock)
DriverMock.ensure = Mock(return_value=relayMock)

sys.modules["octoprint_octorelay.driver"] = Mock(
Relay=relayConstructorMock
Driver=DriverMock
)

# pylint: disable=wrong-import-position
Expand Down Expand Up @@ -461,17 +461,17 @@ def test_on_shutdown(self):
def test_input_polling(self):
# First active relay having state not equal to the one stored in model should trigger UI update
self.plugin_instance.update_ui = Mock()
relayConstructorMock.reset_mock()
DriverMock.reset_mock()
self.plugin_instance.model = {
"r1": { "active": False, "relay_pin": 4, "inverted_output": False, "relay_state": True },
"r2": { "active": True, "relay_pin": 17, "inverted_output": False, "relay_state": True },
"r3": { "active": True, "relay_pin": 18, "inverted_output": False, "relay_state": False }
}
relayMock.is_closed = Mock(return_value=True)
self.plugin_instance.input_polling()
self.assertEqual(relayConstructorMock.get_or_create_relay.call_count, 2)
relayConstructorMock.get_or_create_relay.assert_any_call(17, False)
relayConstructorMock.get_or_create_relay.assert_any_call(18, False)
self.assertEqual(DriverMock.ensure.call_count, 2)
DriverMock.ensure.assert_any_call(17, False)
DriverMock.ensure.assert_any_call(18, False)
self.plugin_instance.update_ui.assert_called_with()
self.plugin_instance._logger.debug.assert_called_with("relay: r3 has changed its pin state")

Expand Down Expand Up @@ -510,7 +510,7 @@ def test_update_ui(self):
"upcoming": None
}
self.plugin_instance.update_ui()
relayConstructorMock.get_or_create_relay.assert_called_with(17, False)
DriverMock.ensure.assert_called_with(17, False)
for index in RELAY_INDEXES:
self.plugin_instance._settings.get.assert_any_call([], merged=True)
self.plugin_instance._plugin_manager.send_plugin_message.assert_called_with(
Expand Down Expand Up @@ -555,7 +555,7 @@ def test_toggle_relay(self, system_mock):
"cmd_off": "CommandOFF"
})
self.plugin_instance.toggle_relay("r4", case["target"])
relayConstructorMock.get_or_create_relay.assert_called_with(17, case["inverted"])
DriverMock.ensure.assert_called_with(17, case["inverted"])
relayMock.toggle.assert_called_with(case["target"])
system_mock.assert_called_with(case["expectedCommand"])
if case["expectedCommand"] == "CommandON":
Expand Down Expand Up @@ -915,7 +915,7 @@ def test_handle_update_command(self, system_mock):
"target": False,
"closed": True,
"expectedError": False,
"expectedResult": False, # from the !closed returned by mocked Relay::toggle() below
"expectedResult": False, # from the !closed returned by mocked Driver::toggle() below
"expectedToggle": True,
"expectedCommand": "CommandOffMock"
},
Expand Down

0 comments on commit 9d868c1

Please sign in to comment.