Skip to content

Commit

Permalink
Added support for sending messages after a certain time.
Browse files Browse the repository at this point in the history
  • Loading branch information
TD22057 committed Sep 15, 2018
1 parent 211dbe9 commit 1593430
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 35 deletions.
52 changes: 43 additions & 9 deletions insteon_mqtt/Protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, link):

# Forward poll() calls from the network link to ourselves.
# That way we can test for write message time outs periodically.
self._linkPoll = self.link.poll
self.link.poll = self._poll

# Connect the link read/write signals to our callback methods.
Expand Down Expand Up @@ -102,7 +103,18 @@ def __init__(self, link):
# this time.
self._read_history = []

# TODO: doc
# List of Msg.Timed objects which store a message and a time at which
# to send the message. It will be sorted by send time. These are
# messages that should be sent after a certain time has passed. The
# _poll() call will this and push them onto the message queue when
# the current time is after the message time.
self._timed_messages = []

# Next time that a message can be written. When a message is read,
# we wait until it's expiration time (which is set by the hop count)
# until we send another message. Sending messages before a message
# could expire w/ Insteon is a good way to cancel previous command so
# we try and avoid that.
self._next_write_time = 0

#-----------------------------------------------------------------------
Expand Down Expand Up @@ -143,7 +155,7 @@ def load_config(self, config):
self.link.load_config(config)

#-----------------------------------------------------------------------
def send(self, msg, msg_handler, high_priority=False):
def send(self, msg, msg_handler, high_priority=False, after=None):
"""Write a message to the PLM modem.
If there are no other messages in the queue, the message gets
Expand All @@ -167,15 +179,30 @@ def send(self, msg, msg_handler, high_priority=False):
the handler returns the message.FINISHED flags.
high_priority: (bool)False to add the message at the end of the
queue. True to insert this message at the start of
the queue.
the queue. This is ignored in timed messages.
after: (float) Unix clock time tag to send the message
after. If None, the message is sent as soon as
possible. Exact time is not guaranteed - the
message will be send no earlier than this.
"""
if not high_priority:
# If the time is input, append the inputs to the timer list and sort
# the list by the times field.
if after is not None:
timed = Msg.Timed(msg, msg_handler, high_priority, after)
self._timed_messages.append(timed)
self._timed_messages.sort(key=lambda i: i.time)
return

# Normal message queue.
elif not high_priority:
self._write_queue.append((msg, msg_handler))

# High priority messages insert at the front of the queue.
else:
self._write_queue.insert(0, (msg, msg_handler))

# If there is an existing msg that we're processing replies
# for then delay sending this until we're done.
# If there are no existing messages that we're processing replies
# for, send the message immediately.
if not self._write_handler:
self._send_next_msg()

Expand All @@ -190,12 +217,19 @@ def _poll(self, t):
Args:
t: (float) Current Unix clock time tag.
"""
if not self._write_handler:
return
# Call the link poll function in case it needs to do something.
self._linkPoll(t)

# See if any timed messages should sent.
while self._timed_messages and self._timed_messages[0].is_active(t):
timed = self._timed_messages.pop(0)
LOG.info("Moving timer based message to queue: %s", timed.msg)
timed.send(self)

# Ask the write handler if it's past the time out in which
# case we'll mark this message as finished and move on.
if self._write_handler.is_expired(self, t):
if self._write_handler and \
self._write_handler.is_expired(self, t):
self._write_finished()

#-----------------------------------------------------------------------
Expand Down
8 changes: 6 additions & 2 deletions insteon_mqtt/device/Base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def type(self):
return self.__class__.__name__

#-----------------------------------------------------------------------
def send(self, msg, msg_handler, high_priority=False):
def send(self, msg, msg_handler, high_priority=False, after=None):
"""Send a message to the device.
This will use the history of messages received from the device to set
Expand All @@ -137,11 +137,15 @@ def send(self, msg, msg_handler, high_priority=False):
high_priority: (bool)False to add the message at the end of the
queue. True to insert this message at the start of
the queue.
after: (float) Unix clock time tag to send the message
after. If None, the message is sent as soon as
possible. Exact time is not guaranteed - the
message will be send no earlier than this.
"""
if isinstance(msg, Msg.OutStandard): # handles OutExtended as well
msg.flags.set_hops(self.history.avg_hops())

self.protocol.send(msg, msg_handler, high_priority)
self.protocol.send(msg, msg_handler, high_priority, after)

#-----------------------------------------------------------------------
def db_path(self):
Expand Down
59 changes: 59 additions & 0 deletions insteon_mqtt/message/Timed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#===========================================================================
#
# Timed message class
#
#===========================================================================


class Timed:
"""Times message container
This stores a message and time at which the message should be sent and is
used by the Protocol class for storing a message that should be sent at
some later time.
"""

#-----------------------------------------------------------------------
def __init__(self, msg, msg_handler, high_priority, after):
"""Constructor
Args:
msg: Output message to write. This should be an
instance of a message in the message directory that
that starts with 'Out'.
msg_handler: Message handler instance to use when replies to the
message are received. Any message received after we
write out the msg are passed to this handler until
the handler returns the message.FINISHED flags.
high_priority: (bool)False to add the message at the end of the
queue. True to insert this message at the start of
the queue. This is ignored in timed messages.
after: (float) Unix clock time tag to send the message
after. If None, the message is sent as soon as
possible. Exact time is not guaranteed - the
message will be send no earlier than this.
"""
self.msg = msg
self.msg_handler = msg_handler
self.high_priority = high_priority
self.time = after

#-----------------------------------------------------------------------
def is_active(self, t):
"""Return True if the message should be sent.
Args:
t: (float) Current Unix clock time.
"""
return t >= self.time

#-----------------------------------------------------------------------
def send(self, protocol):
"""Send the message.
Args:
protocol: (Protocol) The Protocol class to use.
"""
protocol.send(self.msg, self.msg_handler, self.high_priority)

#===========================================================================
4 changes: 3 additions & 1 deletion insteon_mqtt/message/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
results.
"""

from .Base import Base
from .Timed import Timed

# Bit level message flags
from .DbFlags import DbFlags
from .Flags import Flags
from .Base import Base

# Messages from PLM modem to the host (codes >= 0x60)
from .InpAllLinkComplete import InpAllLinkComplete
Expand Down
2 changes: 1 addition & 1 deletion tests/db/test_DeviceModifyManagerI1.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class MockProto:
def __init__(self):
self.msgs = []

def send(self, msg, handler, high_priority=False):
def send(self, msg, handler, high_priority=False, after=None):
self.msgs.append(msg)

class MockModem():
Expand Down
42 changes: 21 additions & 21 deletions tests/db/test_DeviceScanManagerI1.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ def test_start_scan(self):
device = MockDevice()
device_db = IM.db.Device(IM.Address(0x01, 0x02, 0x03))
manager = IM.db.DeviceScanManagerI1(device, device_db)

db_msg = Msg.OutStandard.direct(device_db.addr, 0x28, 0x0F)

manager.start_scan()
assert device.msgs[0].to_bytes() == db_msg.to_bytes()

Expand All @@ -28,14 +28,14 @@ def test_handle_set_msb(self):
manager = IM.db.DeviceScanManagerI1(device, device_db)
on_done = None
manager.msb = 0x0F

# Test bad MSB, should cause resend of set msb
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x28, 0x0E)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x28, 0x0F)
manager.handle_set_msb(msg, on_done)
assert device.msgs[0].to_bytes() == db_msg.to_bytes()

# Test receive correct msb
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x28, 0x0F)
Expand All @@ -54,7 +54,7 @@ def test_handle_get_lsb(self):
calls = []
def callback(success, msg, data):
calls.append(msg)

# Test receiving a record e2 01 3a 29 84 01 0e 43
# Link Flag
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
Expand All @@ -63,63 +63,63 @@ def callback(success, msg, data):
manager.handle_get_lsb(msg, on_done)
assert device.msgs[0].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 1

# Group
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x01)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xFA)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[1].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 2

# Address High
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x3A)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xFB)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[2].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 3

# Address Mid
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x29)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xFC)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[3].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 4

# Address Low
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x84)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xFD)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[4].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 5

# Address D1
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x01)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xFE)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[5].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 6

# Address D2
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x0E)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xFF)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[6].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 7

# Address D3
flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x43)
db_msg = Msg.OutStandard.direct(device_db.addr, 0x2B, 0xF0)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[7].to_bytes() == db_msg.to_bytes()
assert len(manager.record) == 0

db_flags = Msg.DbFlags(in_use=True, is_controller=True,
is_last_rec=False)
raw = [0x00, 0x01,
Expand All @@ -129,29 +129,29 @@ def callback(success, msg, data):
0x3a, 0x29, 0x84,
0x01, 0x0E, 0x43, 0x06]
entry = IM.db.DeviceEntry.from_bytes(bytes(raw))

assert len(device_db.entries) == 1
assert len(device_db.unused) == 0
assert len(device_db.groups) == 1

grp = device_db.find_group(0x01)
assert len(grp) == 1
assert grp[0].to_bytes() == entry.to_bytes()
# test changing MSB

# test changing MSB
manager.record = [0xe2,0x01,3,4,5,6,7]
manager.lsb = 0x07

flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x08)
manager.handle_get_lsb(msg, on_done)
assert device.msgs[8].cmd2 == 0x0E

# test on_done callback on last record
flags = Msg.DbFlags(True, True, True)
manager.record = [flags.to_bytes()[0],0x01,3,4,5,6,7]
manager.lsb = 0xFF

flags = Msg.Flags(Msg.Flags.Type.DIRECT_ACK, False)
msg = Msg.InpStandard(device_db.addr, device_db.addr, flags, 0x2B, 0x08)
manager.handle_get_lsb(msg, callback)
Expand All @@ -165,5 +165,5 @@ class MockDevice:
def __init__(self):
self.msgs = []

def send(self, msg, handler):
self.msgs.append(msg)
def send(self, msg, handler, high_priority=False, after=None):
self.msgs.append(msg)
2 changes: 1 addition & 1 deletion tests/handler/test_ModemDbGet.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def callback(success, msg, done):


class MockProtocol:
def send(self, msg, handler):
def send(self, msg, handler, high_priority=False, after=None):
self.sent = msg
self.handler = handler

Expand Down

0 comments on commit 1593430

Please sign in to comment.