Skip to content

Commit

Permalink
Merge pull request #212 from kurtmckee/test-observer
Browse files Browse the repository at this point in the history
Test and update the `Observer.py` code
  • Loading branch information
LudovicRousseau authored Oct 15, 2024
2 parents 4452941 + e0f2060 commit 042ab5f
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 24 deletions.
47 changes: 23 additions & 24 deletions src/smartcard/Observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
for all smartcard package observers.
"""

from smartcard.Synchronization import *
from __future__ import annotations

import typing

from smartcard.Synchronization import Synchronization, synchronize

class Observer:

def update(self, observable, arg):
class Observer:
def update(self, observable: Observable, arg: typing.Any) -> None:
"""Called when the observed object is
modified. You call an Observable object's
notifyObservers method to notify all the
Expand All @@ -24,53 +27,49 @@ def update(self, observable, arg):


class Observable(Synchronization):

def __init__(self):
self.obs = []
def __init__(self) -> None:
super().__init__()
self.obs: list[Observer] = []
self.changed = 0
Synchronization.__init__(self)

def addObserver(self, observer):
def addObserver(self, observer: Observer) -> None:
if observer not in self.obs:
self.obs.append(observer)

def deleteObserver(self, observer):
def deleteObserver(self, observer: Observer) -> None:
self.obs.remove(observer)

def notifyObservers(self, arg=None):
def notifyObservers(self, arg: typing.Any = None) -> None:
"""If 'changed' indicates that this object
has changed, notify all its observers, then
call clearChanged(). Each observer has its
update() called with two arguments: this
observable object and the generic 'arg'."""

self.mutex.acquire()
try:
with self.mutex:
if not self.changed:
return
# Make a local copy in case of synchronous
# additions of observers:
localArray = self.obs[:]
self.clearChanged()
finally:
self.mutex.release()
# Make a copy of the observer list.
observers = self.obs.copy()
self.changed = 0

# Update observers
for observer in localArray:
for observer in observers:
observer.update(self, arg)

def deleteObservers(self):
def deleteObservers(self) -> None:
self.obs = []

def setChanged(self):
def setChanged(self) -> None:
self.changed = 1

def clearChanged(self):
def clearChanged(self) -> None:
self.changed = 0

def hasChanged(self):
def hasChanged(self) -> int:
return self.changed

def countObservers(self):
def countObservers(self) -> int:
return len(self.obs)


Expand Down
152 changes: 152 additions & 0 deletions test/test_observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import unittest.mock

import pytest

import smartcard.Observer


def test_state_changes():
"""Verify that the "changed" state can be set and cleared."""

# Base state
observable = smartcard.Observer.Observable()
assert not observable.hasChanged(), "Observable must not start in a changed state"

# Set
observable.setChanged()
assert observable.hasChanged(), ".setChanged() must *set* the state"
observable.setChanged()
assert observable.hasChanged(), ".setChanged() must not *toggle* the state"

# Clear
observable.clearChanged()
assert not observable.hasChanged(), ".clearChanged() must *clear* the state"
observable.clearChanged()
assert not observable.hasChanged(), ".clearChanged() must not *toggle* the state"


@pytest.mark.parametrize(
"unbound_method",
(
# Observer management methods
smartcard.Observer.Observable.addObserver,
smartcard.Observer.Observable.countObservers,
smartcard.Observer.Observable.deleteObserver,
smartcard.Observer.Observable.deleteObservers,
smartcard.Observer.Observable.notifyObservers,
# State management methods
smartcard.Observer.Observable.clearChanged,
smartcard.Observer.Observable.hasChanged,
smartcard.Observer.Observable.setChanged,
),
)
def test_synchronization(unbound_method):
"""Verify that all Observable methods are synchronized.
The `.mutex` attribute is mocked and is confirmed to be acquired and released.
The test implementation here doesn't necessarily make valid method calls;
it relies on the knowledge that most Observable methods are decorated
and that the lock is acquired before the underlying method is called
(and thus, before Python realizes that required method parameters are unfilled
and raises a TypeError).
"""

mutex = unittest.mock.MagicMock()
# Ensure the context manager protocol is functional.
mutex.__enter__.return_value = mutex

observable = smartcard.Observer.Observable()
observable.mutex = mutex

try:
# Call the unbound Observable method, passing *observable* as `self`.
unbound_method(observable) # noqa
except TypeError:
# Ignore TypeErrors caused by missing mandatory method arguments.
pass

mutex.__enter__.assert_called_once()
mutex.__exit__.assert_called_once()


def test_registered_observers_are_always_notified():
"""Verify all observers are notified, even if the observer list changes."""

class JealousObserver(smartcard.Observer.Observer):
update_count = 0

def update(self, observable, _):
# Try to prevent all other observers from receiving an update.
for other_observer in observers - {self}:
try:
observable.deleteObserver(other_observer)
except ValueError:
pass

# Track the number of times an update was received.
JealousObserver.update_count += 1

jealous_observer_count = 10
observers = {JealousObserver() for _ in range(jealous_observer_count)}

# Create a fair observable and notify the observers of a change.
fair_observable = smartcard.Observer.Observable()
[fair_observable.addObserver(observer) for observer in observers]
fair_observable.setChanged()
fair_observable.notifyObservers()

# Confirm that the observers have all erased each other from the list of observers.
assert fair_observable.countObservers() == 0

# Confirm that the observers were all updated, despite their jealous efforts.
assert JealousObserver.update_count == jealous_observer_count


def test_double_observer_additions():
"""Verify that an observer cannot be added twice."""

observer = smartcard.Observer.Observer()
observable = smartcard.Observer.Observable()
observable.addObserver(observer)
observable.addObserver(observer)

assert observable.countObservers() == 1


def test_double_observer_removals():
"""Verify that an observer cannot be removed twice."""

observer = smartcard.Observer.Observer()
observable = smartcard.Observer.Observable()
observable.addObserver(observer)
observable.deleteObserver(observer)

with pytest.raises(ValueError, match="x not in list"):
observable.deleteObserver(observer)


def test_no_notifications_when_no_changes():
"""Verify that no notifications are sent when no changes are detected."""

class Observer(smartcard.Observer.Observer):
def update(self, *_, **__):
raise RuntimeError("no updates were expected")

observable = smartcard.Observer.Observable()
observable.addObserver(Observer())
# Attempt to send a notification when no changes have been made.
# Nothing needs to be asserted here;
# the lack of a RuntimeError during test execution demonstrates the test passed.
observable.notifyObservers()


def test_default_observer_updates():
"""Call the default Observer.update() method.
This test exists solely for the purposes of reaching 100% test coverage.
It can be removed if Observer becomes an abstract base class.
"""

observable = smartcard.Observer.Observable()
assert smartcard.Observer.Observer().update(observable, "arg") is None

0 comments on commit 042ab5f

Please sign in to comment.