forked from bluepixel00/HomeAssistant_Denon_RS232
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdenon232_receiver.py
65 lines (54 loc) · 2.15 KB
/
denon232_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
Denon RS232 interface to control the receiver.
Based off:
https://github.com/home-assistant/home-assistant/blob/dev/homeassistant/components/media_player/denon.py#L228
https://github.com/joopert/nad_receiver/blob/master/nad_receiver/__init__.py
Not all receivers have all functions.
Functions can be found on in the xls file within this repository
"""
import codecs
import socket
from time import sleep
import serial
import telnetlib
import threading
import logging
DEFAULT_TIMEOUT = 1
DEFAULT_WRITE_TIMEOUT = 1
_LOGGER = logging.getLogger(__name__)
class Denon232Receiver(object):
"""Denon232 receiver."""
def __init__(self, serial_port, timeout=DEFAULT_TIMEOUT,
write_timeout=DEFAULT_WRITE_TIMEOUT):
"""Create RS232 connection."""
self.ser = serial.Serial(serial_port, baudrate=9600, bytesize=8, parity='N', stopbits=1, timeout=timeout, write_timeout=write_timeout)
self.lock = threading.Lock()
def serial_command(self, cmd, response=False, all_lines=False):
_LOGGER.debug('Command: %s ', cmd)
if not self.ser.is_open:
self.ser.open()
try:
self.lock.acquire()
# Denon uses the suffix \r, so add those to the above cmd.
final_command = ''.join([cmd, '\r']).encode('utf-8')
#_LOGGER.debug('Final Command (encoded): %s ', final_command)
#Write data to serial port
#self.ser.reset_output_buffer()
self.ser.write(final_command)
#Read data from serial port
if response:
lines = []
while True:
line = self.ser.read_until(bytes('\r'.encode('utf-8')))
if not line:
break
#_LOGGER.debug('Received (encoded): %s ', line)
lines.append(line.decode().strip())
_LOGGER.debug("Received: %s", line.decode().strip())
if all_lines:
return lines
return lines[0] if lines else ''
else:
return None
finally:
self.lock.release()