Skip to content

Commit

Permalink
Merge pull request #136 from NebraLtd/posterzh/fix_diagnostics_error
Browse files Browse the repository at this point in the history
fix: make DiagnosticsCharacteristic return correct protobuf always
  • Loading branch information
posterzh authored Nov 26, 2021
2 parents 75ad4d0 + 3108262 commit 8cc544f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,68 @@
DBUS_LOAD_SLEEP_SECONDS = 0.1
logger = get_logger(__name__)


class DiagnosticsCharacteristic(Characteristic):
# Returns proto of eth, wifi, fw, ip, p2pstatus

def __init__(self, service, eth0_mac_address, wlan0_mac_address):
def __init__(self, service, eth0_mac_address, wlan0_mac_address, firmware_version):
Characteristic.__init__(
self, constants.DIAGNOSTICS_CHARACTERISTIC_UUID,
["read"], service)
self.add_descriptor(DiagnosticsDescriptor(self))
self.add_descriptor(OpaqueStructureDescriptor(self))
self.p2pstatus = False
self.eth0_mac_address = eth0_mac_address
self.wlan0_mac_address = wlan0_mac_address

self.new_diagnostics_proto(eth0_mac_address.replace(":", ""),
wlan0_mac_address.replace(":", ""),
firmware_version)

def ReadValue(self, options):
logger.debug('Read diagnostics')
try:
self.p2pstatus = self.get_p2pstatus()
diagnostics = self.build_diagnostics_proto().SerializeToString()
logger.debug("Diagnostics are %s" % diagnostics)
return string_to_dbus_byte_array(diagnostics)
except Exception:
logger.exception("Unexpected exception while trying to read diagnostics")
p2pstatus = self.get_p2pstatus()

# Update self.diagnostics_proto
self.update_diagnostics_proto(p2pstatus)
except Exception as ex:
logger.exception("Unexpected exception while trying to read diagnostics: %s" % str(ex))

diagnostics = self.diagnostics_proto.SerializeToString()
logger.debug("Diagnostics are %s" % diagnostics)
return string_to_dbus_byte_array(diagnostics)

# Make a new diagnostics_pb2.diagnostics_v1
def new_diagnostics_proto(self,
eth0_mac_address,
wlan0_mac_address,
firmware_version):
logger.debug('New Diagnostics Proto')

self.diagnostics_proto = diagnostics_pb2.diagnostics_v1()
self.diagnostics_proto.diagnostics['connected'] = DBUS_UNAVAILABLE_VALUE
self.diagnostics_proto.diagnostics['dialable'] = DBUS_UNAVAILABLE_VALUE
self.diagnostics_proto.diagnostics['height'] = DBUS_UNAVAILABLE_VALUE
self.diagnostics_proto.diagnostics['nat_type'] = DBUS_UNAVAILABLE_VALUE

self.diagnostics_proto.diagnostics['eth'] = str(eth0_mac_address)
self.diagnostics_proto.diagnostics['wifi'] = str(wlan0_mac_address)
self.diagnostics_proto.diagnostics['fw'] = str(firmware_version)
self.diagnostics_proto.diagnostics['ip'] = ""

# Update diagnostics_proto member variable
def update_diagnostics_proto(self, p2pstatus):
logger.debug('Update Diagnostics Proto')

if p2pstatus:
try:
self.diagnostics_proto.diagnostics['connected'] = str(p2pstatus[0][1])
self.diagnostics_proto.diagnostics['dialable'] = str(p2pstatus[1][1])
self.diagnostics_proto.diagnostics['height'] = str(p2pstatus[3][1])
self.diagnostics_proto.diagnostics['nat_type'] = str(p2pstatus[2][1])
except Exception as ex:
logger.exception("Unexpected exception while trying to read p2pstatus")
raise ex

self.diagnostics_proto.diagnostics['ip'] = self.get_ip()

# Returns the p2pstatus or an empty string if there is a dbus failure
def get_p2pstatus(self):
Expand All @@ -50,39 +90,19 @@ def get_p2pstatus(self):
miner_interface = dbus.Interface(miner_object, 'com.helium.Miner')
sleep(DBUS_LOAD_SLEEP_SECONDS)
logger.debug('Diagnostics p2pstatus')

try:
p2pstatus = miner_interface.P2PStatus()
logger.debug('DBUS P2P SUCCEED')
logger.debug(self.p2pstatus)
except dbus.exceptions.DBusException:
logger.debug(p2pstatus)
except dbus.exceptions.DBusException as ex:
p2pstatus = False
logger.warn('DBUS P2P FAIL')
raise ex

logger.debug("p2pstatus: %s" % p2pstatus)
return p2pstatus

# Returns a diagnostics_pb2.diagnostics_v1
def build_diagnostics_proto(self):
diagnostics_proto = diagnostics_pb2.diagnostics_v1()
if not self.p2pstatus:
diagnostics_proto.diagnostics['connected'] = DBUS_UNAVAILABLE_VALUE
diagnostics_proto.diagnostics['dialable'] = DBUS_UNAVAILABLE_VALUE
diagnostics_proto.diagnostics['height'] = DBUS_UNAVAILABLE_VALUE
diagnostics_proto.diagnostics['nat_type'] = DBUS_UNAVAILABLE_VALUE
else:
diagnostics_proto.diagnostics['connected'] = str(self.p2pstatus[0][1])
diagnostics_proto.diagnostics['dialable'] = str(self.p2pstatus[1][1])
diagnostics_proto.diagnostics['height'] = str(self.p2pstatus[3][1])
diagnostics_proto.diagnostics['nat_type'] = str(self.p2pstatus[2][1])

diagnostics_proto.diagnostics['eth'] = self.eth0_mac_address.replace(":", "")
diagnostics_proto.diagnostics['wifi'] = self.wlan0_mac_address.replace(":", "")
diagnostics_proto.diagnostics['fw'] = os.getenv('FIRMWARE_VERSION')
diagnostics_proto.diagnostics['ip'] = self.get_ip()
logger.debug('All attibutes were added to the diagnostics proto')
return diagnostics_proto

# Return the ETH IP address, or WLAN if it does not exist
# 0.0.0.0 is the default value if neither ETH or WLAN IP available
def get_ip(self):
Expand All @@ -95,7 +115,7 @@ def get_ip(self):
except KeyError:
pass

ip_address = "0.0.0.0" # nosec
ip_address = ""
if('eth_ip' in locals()):
logger.debug("Using ETH IP address %s" % eth_ip)
ip_address = str(eth_ip)
Expand Down
4 changes: 2 additions & 2 deletions gatewayconfig/bluetooth/services/helium_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, index, eth0_mac_address, wlan0_mac_address, firmware_version,
self.add_characteristic(PublicKeyCharacteristic(self, shared_state))
self.add_characteristic(WifiServicesCharacteristic(self, shared_state))
self.add_characteristic(WifiConfiguredServicesCharacteristic(self, shared_state))
self.add_characteristic(DiagnosticsCharacteristic(self, eth0_mac_address, wlan0_mac_address))
self.add_characteristic(DiagnosticsCharacteristic(self, eth0_mac_address, wlan0_mac_address, firmware_version))
self.add_characteristic(MacAddressCharacteristic(self, eth0_mac_address))
self.add_characteristic(LightsCharacteristic(self))
self.add_characteristic(WifiSSIDCharacteristic(self, shared_state))
Expand All @@ -34,4 +34,4 @@ def __init__(self, index, eth0_mac_address, wlan0_mac_address, firmware_version,
self.add_characteristic(WifiConnectCharacteristic(self))
self.add_characteristic(EthernetOnlineCharacteristic(self, ethernet_is_online_filepath))
self.add_characteristic(SoftwareVersionCharacteristic(self, firmware_version))
self.add_characteristic(WifiRemoveCharacteristic(self))
self.add_characteristic(WifiRemoveCharacteristic(self))
62 changes: 40 additions & 22 deletions tests/gatewayconfig/bluetooth/test_diagnostics_characteristic.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,54 @@
import pytest
import sys
import uuid
from io import StringIO
from lib.cputemp.service import Service

from unittest import TestCase
import dbus
import dbus.mainloop.glib
from unittest.mock import patch, mock_open

from gatewayconfig.bluetooth.characteristics.diagnostics_characteristic import DiagnosticsCharacteristic

from unittest.mock import patch, Mock
from gatewayconfig.bluetooth.characteristics.diagnostics_characteristic \
import DiagnosticsCharacteristic

# Should correspond with BluetoothConnectionAdvertisement.ADVERTISEMENT_SERVICE_UUID
DEFAULT_SERVICE_UUID = '0fda92b2-44a2-4af2-84f5-fa682baa2b8d'
VALID_LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
INVALID_LE_ADVERTISEMENT_IFACE = 'org.fake.iface'


class TestDiagnosticCharacteristic(TestCase):

# Prevent error log diff from being trimmed
maxDiff = None

@classmethod
def setUpClass(cls):
cls.service = Service(200, '1111', True)

def test_instantiation(self):
service = Service(200, '1111', True)
diagnostics_characteristic = DiagnosticsCharacteristic(service, 'A1:B2:C3:DD:E5:F6', 'B1:B2:C3:DD:E5:F6')
self.assertEqual(
diagnostics_characteristic.eth0_mac_address,
'A1:B2:C3:DD:E5:F6'
)

self.assertEqual(
diagnostics_characteristic.wlan0_mac_address,
'B1:B2:C3:DD:E5:F6'
)
diagnostics_characteristic = DiagnosticsCharacteristic(self.service,
'A1:B2:C3:DD:E5:F6',
'B1:B2:C3:DD:E5:F6',
'2021.06.26.4')
self.assertIsInstance(diagnostics_characteristic, DiagnosticsCharacteristic)

@patch('dbus.SessionBus')
@patch('dbus.Interface', return_value=Mock(
P2PStatus=Mock(return_value={
'connected': '1',
'dialable': '0',
'height': '100',
'nat_type': 'NAT',
})))
def test_get_p2pstatus(self, mock_dbus_session_bus, mock_dbus_interface):
diagnostics_characteristic = DiagnosticsCharacteristic(self.service,
'A1:B2:C3:DD:E5:F6',
'B1:B2:C3:DD:E5:F6',
'2021.06.26.4')
p2pstatus = diagnostics_characteristic.get_p2pstatus()
print("p2pstatus: %s" % p2pstatus)

expected = {
'connected': '1',
'dialable': '0',
'height': '100',
'nat_type': 'NAT',
}
self.assertDictEqual(p2pstatus, expected)

mock_dbus_session_bus.assert_called()
mock_dbus_interface.assert_called()

0 comments on commit 8cc544f

Please sign in to comment.