Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add testing of rs232.SerialData #147

Closed
wants to merge 9 commits into from
115 changes: 115 additions & 0 deletions pocs/tests/serial_handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# The protocol_*.py files in this package are based on PySerial's file
# test/handlers/protocol_test.py, modified for different behaviors.
# The call serial.serial_for_url("XYZ://") looks for a class Serial in a
# file named protocol_XYZ.py in this package (i.e. directory).

print("Importing serial_handlers __init__.py")
print("__name__:", __name__)
print("__file__:", __file__)

from serial import serialutil


class NoOpSerial(serialutil.SerialBase):
"""No-op implementation of PySerial's SerialBase.

Provides no-op implementation of various methods that SerialBase expects
to have implemented by the sub-class. Can be used as is for a /dev/null
type of behavior.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

@property
def in_waiting(self):
"""The number of input bytes available to read immediately."""
return 0

def open(self):
"""Open port.

Raises:
SerialException if the port cannot be opened.
"""
self.is_open = True

def close(self):
"""Close port immediately."""
self.is_open = False

def read(self, size=1):
"""Read size bytes.

If a timeout is set it may return fewer characters than requested.
With no timeout it will block until the requested number of bytes
is read.

Args:
size: Number of bytes to read.

Returns:
Bytes read from the port, of type 'bytes'.

Raises:
SerialTimeoutException: In case a write timeout is configured for
the port and the time is exceeded.
"""
if not self.is_open:
raise serialutil.portNotOpenError
return bytes()

def write(self, data):
"""
Args:
data: The data to write.

Returns:
Number of bytes written.

Raises:
SerialTimeoutException: In case a write timeout is configured for
the port and the time is exceeded.
"""
if not self.is_open:
raise serialutil.portNotOpenError
return 0

# --------------------------------------------------------------------------
# There are a number of methods called by SerialBase that need to be
# implemented by sub-classes, assuming their calls haven't been blocked
# by replacing the calling methods/properties. These are no-op
# implementations.

def _reconfigure_port(self):
"""Reconfigure the open port after a property has been changed.

If you need to know which property has been changed, override the
setter for the appropriate properties.
"""
pass

def _update_rts_state(self):
"""Handle rts being set to some value.

"self.rts = value" has been executed, for some value. This may not
have changed the value.
"""
pass

def _update_dtr_state(self):
"""Handle dtr being set to some value.

"self.dtr = value" has been executed, for some value. This may not
have changed the value.
"""
pass

def _update_break_state(self):
"""Handle break_condition being set to some value.

"self.break_condition = value" has been executed, for some value.
This may not have changed the value.
Note that break_condition is set and then cleared by send_break().
"""
pass
105 changes: 105 additions & 0 deletions pocs/tests/serial_handlers/protocol_buffers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# This module implements a handler for serial_for_url("buffers://").

print("Importing protocol_buffers.py")
print("__name__:", __name__)
print("__file__:", __file__)

from pocs.tests.serial_handlers import NoOpSerial

import io
import threading

# r_buffer and w_buffer are binary I/O buffers. read(size=N) on an instance
# of Serial reads the next N bytes from r_buffer, and write(data) appends the
# bytes of data to w_buffer.
# NOTE: The caller (a test) is responsible for resetting buffers before tests.
_r_buffer = None
_w_buffer = None

# The above I/O buffers are not thread safe, so we need to lock them during
# access.
_r_lock = threading.Lock()
_w_lock = threading.Lock()


def ResetBuffers(read_data=None):
SetRBufferValue(read_data)
with _w_lock:
global _w_buffer
_w_buffer = io.BytesIO()


def SetRBufferValue(data):
"""Sets the r buffer to data (a bytes object)."""
if data and not isinstance(data, (bytes, bytearray)):
raise TypeError("data must by a bytes or bytearray object.")
with _r_lock:
global _r_buffer
_r_buffer = io.BytesIO(data)


def GetWBufferValue():
"""Returns an immutable bytes object with the value of the w buffer."""
with _w_lock:
if _w_buffer:
return _w_buffer.getvalue()


class BuffersSerial(NoOpSerial):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

@property
def in_waiting(self):
if not self.is_open:
raise serialutil.portNotOpenError
with _r_lock:
return len(_r_buffer.getbuffer()) - _r_buffer.tell()

def read(self, size=1):
"""Read size bytes.

If a timeout is set it may return fewer characters than requested.
With no timeout it will block until the requested number of bytes
is read.

Args:
size: Number of bytes to read.

Returns:
Bytes read from the port, of type 'bytes'.

Raises:
SerialTimeoutException: In case a write timeout is configured for
the port and the time is exceeded.
"""
if not self.is_open:
raise serialutil.portNotOpenError
with _r_lock:
# TODO(jamessynge): Figure out whether and how to handle timeout.
# We might choose to generate a timeout if the caller asks for data
# beyond the end of the buffer; or simply return what is left,
# including nothing (i.e. bytes()) if there is nothing left.
return _r_buffer.read(size)

def write(self, data):
"""
Args:
data: The data to write.

Returns:
Number of bytes written.

Raises:
SerialTimeoutException: In case a write timeout is configured for
the port and the time is exceeded.
"""
if not isinstance(data, (bytes, bytearray)):
raise TypeError("data must by a bytes or bytearray object.")
if not self.is_open:
raise serialutil.portNotOpenError
with _w_lock:
return _w_buffer.write(data)


Serial = BuffersSerial
31 changes: 31 additions & 0 deletions pocs/tests/serial_handlers/protocol_hooked.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# This module enables a test to provide a handler for "hooked://..." urls
# passed into serial.serial_for_url. To do so, set the value of
# serial_class_for_url from your test to a function with the same API as
# ExampleSerialClassForUrl. Or assign your class to Serial.

from pocs.tests.serial_handlers import NoOpSerial


def ExampleSerialClassForUrl(url):
"""Implementation of serial_class_for_url called by serial.serial_for_url.

Returns the url, possibly modified, and a factory function to be called to
create an instance of a SerialBase sub-class (or at least behaves like it).
You can return a class as that factory function, as calling a class creates
an instance of that class.

serial.serial_for_url will call that factory function with None as the
port parameter (the first), and after creating the instance will assign
the url to the port property of the instance.

Returns:
A tuple (url, factory).
"""
return url, Serial


# Assign to this global variable from a test to override this default behavior.
serial_class_for_url = ExampleSerialClassForUrl

# Or assign your own class to this global variable.
Serial = NoOpSerial
6 changes: 6 additions & 0 deletions pocs/tests/serial_handlers/protocol_no_op.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# This module implements a handler for serial_for_url("no_op://").

from pocs.tests.serial_handlers import NoOpSerial

# Export it as Serial so that it will be picked up by PySerial's serial_for_url.
Serial = NoOpSerial
Loading