import os, sys os.chdir(os.path.dirname(__file__)) import uModBusFunctions as functions import uModBusConst as Const import serial import RPi.GPIO as GPIO import struct import time class uModBusSerial: def __init__(self, uart_id, baudrate=9600, data_bits=serial.EIGHTBITS, stop_bits=serial.STOPBITS_ONE, PARITY = serial.PARITY_NONE, ctrl_pin=None): self._uart = serial.Serial(port=uart_id, baudrate=baudrate, bytesize=data_bits, parity=PARITY, stopbits=stop_bits, timeout=None) if ctrl_pin is not None: self._ctrlPin = ctrl_pin GPIO.setwarnings(False) GPIO.setmode(GPIO.BOARD) GPIO.setup(self._ctrlPin, GPIO.OUT) else: self._ctrlPin = None self.char_time_ms = (1000 * (data_bits + stop_bits + 2)) // baudrate def _calculate_crc16(self, data): crc = 0xFFFF for char in data: crc = (crc >> 8) ^ Const.CRC16_TABLE[((crc) ^ char) & 0xFF] return struct.pack('<H',crc) def _bytes_to_bool(self, byte_list): bool_list = [] for index, byte in enumerate(byte_list): bool_list.extend([bool(byte & (1 << n)) for n in range(8)]) return bool_list def _to_short(self, byte_array, signed=True): response_quantity = int(len(byte_array) / 2) fmt = '>' + (('h' if signed else 'H') * response_quantity) return struct.unpack(fmt, byte_array) def _exit_read(self, response): if response[1] >= Const.ERROR_BIAS: if len(response) < Const.ERROR_RESP_LEN: return False elif (Const.READ_COILS <= response[1] <= Const.READ_INPUT_REGISTER): expected_len = Const.RESPONSE_HDR_LENGTH + 1 + response[2] + Const.CRC_LENGTH if len(response) < expected_len: return False elif len(response) < Const.FIXED_RESP_LEN: return False return True def _uart_read(self): response = bytearray() for x in range(1, 40): response.extend(self._uart.read_all()) # variable length function codes may require multiple reads if len(response)>=1 and self._exit_read(response): break time.sleep(0.05) return response def _send_receive(self, modbus_pdu, slave_addr, count): serial_pdu = bytearray() serial_pdu.append(slave_addr) serial_pdu.extend(modbus_pdu) crc = self._calculate_crc16(serial_pdu) serial_pdu.extend(crc) # flush the Rx FIFO self._uart.read_all() if self._ctrlPin: GPIO.output(self._ctrlPin, 1) self._uart.write(serial_pdu) if self._ctrlPin: time.sleep((1 + self.char_time_ms*len(serial_pdu))/1000) #time.sleep(1) GPIO.output(self._ctrlPin, 0) return self._validate_resp_hdr(self._uart_read(), slave_addr, modbus_pdu[0], count) def _validate_resp_hdr(self, response, slave_addr, function_code, count): resp_crc = response[-Const.CRC_LENGTH:] expected_crc = self._calculate_crc16(response[0:len(response) - Const.CRC_LENGTH]) if (resp_crc[0] != expected_crc[0]) or (resp_crc[1] != expected_crc[1]): print('invalid response CRC') if (response[0] != slave_addr): print('wrong slave address') if (response[1] == (function_code + Const.ERROR_BIAS)): print('slave returned exception code: {:d}'.format(response[2])) hdr_length = (Const.RESPONSE_HDR_LENGTH + 1) if count else Const.RESPONSE_HDR_LENGTH return response[hdr_length : len(response) - Const.CRC_LENGTH] def read_coils(self, slave_addr, starting_addr, coil_qty): modbus_pdu = functions.read_coils(starting_addr, coil_qty) resp_data = self._send_receive(modbus_pdu, slave_addr, True) status_pdu = self._bytes_to_bool(resp_data) return status_pdu def read_discrete_inputs(self, slave_addr, starting_addr, input_qty): modbus_pdu = functions.read_discrete_inputs(starting_addr, input_qty) resp_data = self._send_receive(modbus_pdu, slave_addr, True) status_pdu = self._bytes_to_bool(resp_data) return status_pdu def read_holding_registers(self, slave_addr, starting_addr, register_qty, signed=True): modbus_pdu = functions.read_holding_registers(starting_addr, register_qty) resp_data = self._send_receive(modbus_pdu, slave_addr, True) register_value = self._to_short(resp_data, signed) return register_value def read_input_registers(self, slave_addr, starting_address, register_quantity, signed=True): modbus_pdu = functions.read_input_registers(starting_address, register_quantity) resp_data = self._send_receive(modbus_pdu, slave_addr, True) register_value = self._to_short(resp_data, signed) return register_value def write_single_coil(self, slave_addr, output_address, output_value): modbus_pdu = functions.write_single_coil(output_address, output_value) resp_data = self._send_receive(modbus_pdu, slave_addr, False) operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_COIL, output_address, value=output_value, signed=False) return operation_status def write_single_register(self, slave_addr, register_address, register_value, signed=True): modbus_pdu = functions.write_single_register(register_address, register_value, signed) resp_data = self._send_receive(modbus_pdu, slave_addr, False) operation_status = functions.validate_resp_data(resp_data, Const.WRITE_SINGLE_REGISTER, register_address, value=register_value, signed=signed) return operation_status def write_multiple_coils(self, slave_addr, starting_address, output_values): modbus_pdu = functions.write_multiple_coils(starting_address, output_values) resp_data = self._send_receive(modbus_pdu, slave_addr, False) operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_COILS, starting_address, quantity=len(output_values)) return operation_status def write_multiple_registers(self, slave_addr, starting_address, register_values, signed=True): modbus_pdu = functions.write_multiple_registers(starting_address, register_values, signed) resp_data = self._send_receive(modbus_pdu, slave_addr, False) operation_status = functions.validate_resp_data(resp_data, Const.WRITE_MULTIPLE_REGISTERS, starting_address, quantity=len(register_values)) return operation_status def close(self): if self._uart == None: return try: self._uart.deinit() except Exception: pass