-
Notifications
You must be signed in to change notification settings - Fork 49
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add testing of rs232.SerialData (#164)
A mock serial device that can be used for testing purposes * Testing of serial attached devices, but with fake devices. * Allows a handler to be used based on the url of the serial "device". * Demonstrating how to hook (or mock) PySerial.
- Loading branch information
1 parent
fd8ae2a
commit b4e5b8d
Showing
7 changed files
with
503 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
# The protocol_*.py files in this package are based on PySerial's file | ||
# test/handlers/protocol_test.py, modified for different behaviors. | ||
# The call serial.serial_for_url("XYZ://") looks for a class Serial in a | ||
# file named protocol_XYZ.py in this package (i.e. directory). | ||
|
||
print("Importing serial_handlers __init__.py") | ||
print("__name__:", __name__) | ||
print("__file__:", __file__) | ||
|
||
from serial import serialutil | ||
|
||
|
||
class NoOpSerial(serialutil.SerialBase): | ||
"""No-op implementation of PySerial's SerialBase. | ||
Provides no-op implementation of various methods that SerialBase expects | ||
to have implemented by the sub-class. Can be used as is for a /dev/null | ||
type of behavior. | ||
""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
@property | ||
def in_waiting(self): | ||
"""The number of input bytes available to read immediately.""" | ||
return 0 | ||
|
||
def open(self): | ||
"""Open port. | ||
Raises: | ||
SerialException if the port cannot be opened. | ||
""" | ||
self.is_open = True | ||
|
||
def close(self): | ||
"""Close port immediately.""" | ||
self.is_open = False | ||
|
||
def read(self, size=1): | ||
"""Read size bytes. | ||
If a timeout is set it may return fewer characters than requested. | ||
With no timeout it will block until the requested number of bytes | ||
is read. | ||
Args: | ||
size: Number of bytes to read. | ||
Returns: | ||
Bytes read from the port, of type 'bytes'. | ||
Raises: | ||
SerialTimeoutException: In case a write timeout is configured for | ||
the port and the time is exceeded. | ||
""" | ||
if not self.is_open: | ||
raise serialutil.portNotOpenError | ||
return bytes() | ||
|
||
def write(self, data): | ||
""" | ||
Args: | ||
data: The data to write. | ||
Returns: | ||
Number of bytes written. | ||
Raises: | ||
SerialTimeoutException: In case a write timeout is configured for | ||
the port and the time is exceeded. | ||
""" | ||
if not self.is_open: | ||
raise serialutil.portNotOpenError | ||
return 0 | ||
|
||
# -------------------------------------------------------------------------- | ||
# There are a number of methods called by SerialBase that need to be | ||
# implemented by sub-classes, assuming their calls haven't been blocked | ||
# by replacing the calling methods/properties. These are no-op | ||
# implementations. | ||
|
||
def _reconfigure_port(self): | ||
"""Reconfigure the open port after a property has been changed. | ||
If you need to know which property has been changed, override the | ||
setter for the appropriate properties. | ||
""" | ||
pass | ||
|
||
def _update_rts_state(self): | ||
"""Handle rts being set to some value. | ||
"self.rts = value" has been executed, for some value. This may not | ||
have changed the value. | ||
""" | ||
pass | ||
|
||
def _update_dtr_state(self): | ||
"""Handle dtr being set to some value. | ||
"self.dtr = value" has been executed, for some value. This may not | ||
have changed the value. | ||
""" | ||
pass | ||
|
||
def _update_break_state(self): | ||
"""Handle break_condition being set to some value. | ||
"self.break_condition = value" has been executed, for some value. | ||
This may not have changed the value. | ||
Note that break_condition is set and then cleared by send_break(). | ||
""" | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
# This module implements a handler for serial_for_url("buffers://"). | ||
|
||
print("Importing protocol_buffers.py") | ||
print("__name__:", __name__) | ||
print("__file__:", __file__) | ||
|
||
from pocs.tests.serial_handlers import NoOpSerial | ||
|
||
import io | ||
import threading | ||
|
||
# r_buffer and w_buffer are binary I/O buffers. read(size=N) on an instance | ||
# of Serial reads the next N bytes from r_buffer, and write(data) appends the | ||
# bytes of data to w_buffer. | ||
# NOTE: The caller (a test) is responsible for resetting buffers before tests. | ||
_r_buffer = None | ||
_w_buffer = None | ||
|
||
# The above I/O buffers are not thread safe, so we need to lock them during | ||
# access. | ||
_r_lock = threading.Lock() | ||
_w_lock = threading.Lock() | ||
|
||
|
||
def ResetBuffers(read_data=None): | ||
SetRBufferValue(read_data) | ||
with _w_lock: | ||
global _w_buffer | ||
_w_buffer = io.BytesIO() | ||
|
||
|
||
def SetRBufferValue(data): | ||
"""Sets the r buffer to data (a bytes object).""" | ||
if data and not isinstance(data, (bytes, bytearray)): | ||
raise TypeError("data must by a bytes or bytearray object.") | ||
with _r_lock: | ||
global _r_buffer | ||
_r_buffer = io.BytesIO(data) | ||
|
||
|
||
def GetWBufferValue(): | ||
"""Returns an immutable bytes object with the value of the w buffer.""" | ||
with _w_lock: | ||
if _w_buffer: | ||
return _w_buffer.getvalue() | ||
|
||
|
||
class BuffersSerial(NoOpSerial): | ||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
@property | ||
def in_waiting(self): | ||
if not self.is_open: | ||
raise serialutil.portNotOpenError | ||
with _r_lock: | ||
return len(_r_buffer.getbuffer()) - _r_buffer.tell() | ||
|
||
def read(self, size=1): | ||
"""Read size bytes. | ||
If a timeout is set it may return fewer characters than requested. | ||
With no timeout it will block until the requested number of bytes | ||
is read. | ||
Args: | ||
size: Number of bytes to read. | ||
Returns: | ||
Bytes read from the port, of type 'bytes'. | ||
Raises: | ||
SerialTimeoutException: In case a write timeout is configured for | ||
the port and the time is exceeded. | ||
""" | ||
if not self.is_open: | ||
raise serialutil.portNotOpenError | ||
with _r_lock: | ||
# TODO(jamessynge): Figure out whether and how to handle timeout. | ||
# We might choose to generate a timeout if the caller asks for data | ||
# beyond the end of the buffer; or simply return what is left, | ||
# including nothing (i.e. bytes()) if there is nothing left. | ||
return _r_buffer.read(size) | ||
|
||
def write(self, data): | ||
""" | ||
Args: | ||
data: The data to write. | ||
Returns: | ||
Number of bytes written. | ||
Raises: | ||
SerialTimeoutException: In case a write timeout is configured for | ||
the port and the time is exceeded. | ||
""" | ||
if not isinstance(data, (bytes, bytearray)): | ||
raise TypeError("data must by a bytes or bytearray object.") | ||
if not self.is_open: | ||
raise serialutil.portNotOpenError | ||
with _w_lock: | ||
return _w_buffer.write(data) | ||
|
||
|
||
Serial = BuffersSerial |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# This module enables a test to provide a handler for "hooked://..." urls | ||
# passed into serial.serial_for_url. To do so, set the value of | ||
# serial_class_for_url from your test to a function with the same API as | ||
# ExampleSerialClassForUrl. Or assign your class to Serial. | ||
|
||
from pocs.tests.serial_handlers import NoOpSerial | ||
|
||
|
||
def ExampleSerialClassForUrl(url): | ||
"""Implementation of serial_class_for_url called by serial.serial_for_url. | ||
Returns the url, possibly modified, and a factory function to be called to | ||
create an instance of a SerialBase sub-class (or at least behaves like it). | ||
You can return a class as that factory function, as calling a class creates | ||
an instance of that class. | ||
serial.serial_for_url will call that factory function with None as the | ||
port parameter (the first), and after creating the instance will assign | ||
the url to the port property of the instance. | ||
Returns: | ||
A tuple (url, factory). | ||
""" | ||
return url, Serial | ||
|
||
|
||
# Assign to this global variable from a test to override this default behavior. | ||
serial_class_for_url = ExampleSerialClassForUrl | ||
|
||
# Or assign your own class to this global variable. | ||
Serial = NoOpSerial |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
# This module implements a handler for serial_for_url("no_op://"). | ||
|
||
from pocs.tests.serial_handlers import NoOpSerial | ||
|
||
# Export it as Serial so that it will be picked up by PySerial's serial_for_url. | ||
Serial = NoOpSerial |
Oops, something went wrong.