From 2fc05b21536ac95ac08cbb36822232239923e08a Mon Sep 17 00:00:00 2001 From: Sujin Kang Date: Thu, 17 Jun 2021 14:03:37 -0700 Subject: [PATCH] Refactor Pcied and add unittest (#189) Description Refactor the pcied and add the unit test Motivation and Context Added unit test to increase the pmon unit test coverage. How Has This Been Tested? Build with unit test enabled and run manually on a dut to verify the pcied. --- sonic-pcied/pytest.ini | 2 + sonic-pcied/scripts/pcied | 265 ++++++++++-------- sonic-pcied/setup.cfg | 2 + sonic-pcied/setup.py | 12 + sonic-pcied/tests/__init__.py | 0 sonic-pcied/tests/mock_platform.py | 48 ++++ .../tests/mocked_libs/swsscommon/__init__.py | 5 + .../mocked_libs/swsscommon/swsscommon.py | 54 ++++ sonic-pcied/tests/test_DaemonPcied.py | 226 +++++++++++++++ sonic-pcied/tests/test_pcied.py | 43 +++ 10 files changed, 542 insertions(+), 115 deletions(-) create mode 100644 sonic-pcied/pytest.ini create mode 100644 sonic-pcied/setup.cfg create mode 100644 sonic-pcied/tests/__init__.py create mode 100644 sonic-pcied/tests/mock_platform.py create mode 100644 sonic-pcied/tests/mocked_libs/swsscommon/__init__.py create mode 100644 sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py create mode 100644 sonic-pcied/tests/test_DaemonPcied.py create mode 100644 sonic-pcied/tests/test_pcied.py diff --git a/sonic-pcied/pytest.ini b/sonic-pcied/pytest.ini new file mode 100644 index 000000000000..d90ee9ed9e12 --- /dev/null +++ b/sonic-pcied/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +addopts = --cov=scripts --cov-report html --cov-report term --cov-report xml --junitxml=test-results.xml -vv diff --git a/sonic-pcied/scripts/pcied b/sonic-pcied/scripts/pcied index 0f2a5208c68b..7265063ff894 100644 --- a/sonic-pcied/scripts/pcied +++ b/sonic-pcied/scripts/pcied @@ -5,30 +5,62 @@ PCIe device monitoring daemon for SONiC """ -try: - import os - import signal - import sys - import threading +import os +import signal +import sys +import threading - from sonic_py_common import daemon_base, device_info - from swsscommon import swsscommon -except ImportError as e: - raise ImportError(str(e) + " - required module not found") +from sonic_py_common import daemon_base, device_info +from swsscommon import swsscommon # # Constants ==================================================================== # + +# TODO: Once we no longer support Python 2, we can eliminate this and get the +# name using the 'name' field (e.g., `signal.SIGINT.name`) starting with Python 3.5 +SIGNALS_TO_NAMES_DICT = dict((getattr(signal, n), n) + for n in dir(signal) if n.startswith('SIG') and '_' not in n) + SYSLOG_IDENTIFIER = "pcied" PCIE_RESULT_REGEX = "PCIe Device Checking All Test" -PCIE_TABLE_NAME = "PCIE_STATUS" PCIE_DEVICE_TABLE_NAME = "PCIE_DEVICE" - -PCIE_CONF_FILE = 'pcie.yaml' +PCIE_STATUS_TABLE_NAME = "PCIE_DEVICES" PCIED_MAIN_THREAD_SLEEP_SECS = 60 -REDIS_HOSTIP = "127.0.0.1" + +PCIEUTIL_CONF_FILE_ERROR = 1 +PCIEUTIL_LOAD_ERROR = 2 + +platform_pcieutil = None + +exit_code = 0 + +# wrapper functions to call the platform api +def load_platform_pcieutil(): + _platform_pcieutil = None + (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() + try: + from sonic_platform.pcie import Pcie + _platform_pcieutil = Pcie(platform_path) + except ImportError as e: + self.log_error("Failed to load platform Pcie module. Error : {}".format(str(e)), True) + try: + from sonic_platform_base.sonic_pcie.pcie_common import PcieUtil + _platform_pcieutil = PcieUtil(platform_path) + except ImportError as e: + self.log_error("Failed to load default PcieUtil module. Error : {}".format(str(e)), True) + return _platform_pcieutil + +def read_id_file(device_name): + id = None + dev_id_path = '/sys/bus/pci/devices/0000:%s/device' % device_name + + if os.path.exists(dev_id_path): + with open(dev_id_path, 'r') as fd: + id = fd.read().strip() + return id # # Daemon ======================================================================= @@ -39,134 +71,129 @@ class DaemonPcied(daemon_base.DaemonBase): def __init__(self, log_identifier): super(DaemonPcied, self).__init__(log_identifier) - (platform_path, _) = device_info.get_paths_to_platform_and_hwsku_dirs() - pciefilePath = os.path.join(platform_path, PCIE_CONF_FILE) - if not os.path.exists(pciefilePath): - self.log_error("Platform pcie configuration file doesn't exist! Exiting ...") - sys.exit("Platform PCIe Configuration file doesn't exist!") - self.timeout = PCIED_MAIN_THREAD_SLEEP_SECS self.stop_event = threading.Event() - - self.state_db = swsscommon.SonicV2Connector(host=REDIS_HOSTIP) - self.state_db.connect("STATE_DB") - state_db = daemon_base.db_connect("STATE_DB") - self.device_table = swsscommon.Table(state_db, PCIE_DEVICE_TABLE_NAME) - - # Load AER-fields into STATEDB - def update_aer_to_statedb(self, device_name, aer_stats): + self.state_db = None + self.device_table = None + self.table = None + self.resultInfo = [] + self.device_name = None + self.aer_stats = {} + + global platform_pcieutil + + platform_pcieutil = load_platform_pcieutil() + if platform_pcieutil is None: + sys.exit(PCIEUTIL_LOAD_ERROR) + + # Connect to STATE_DB and create pcie device table + self.state_db = daemon_base.db_connect("STATE_DB") + self.device_table = swsscommon.Table(self.state_db, PCIE_DEVICE_TABLE_NAME) + self.status_table = swsscommon.Table(self.state_db, PCIE_STATUS_TABLE_NAME) + + def __del__(self): + if self.device_table: + table_keys = self.device_table.getKeys() + for tk in table_keys: + self.device_table._del(tk) + if self.status_table: + stable_keys = self.status_table.getKeys() + for stk in stable_keys: + self.status_table._del(stk) + + # load aer-fields into statedb + def update_aer_to_statedb(self): + if self.aer_stats is None: + self.log_debug("PCIe device {} has no AER Stats".format(device_name)) + return aer_fields = {} - for field, value in aer_stats['correctable'].items(): - correctable_field = "correctable|" + field - aer_fields[correctable_field] = value - - for field, value in aer_stats['fatal'].items(): - fatal_field = "fatal|" + field - aer_fields[fatal_field] = value - - for field, value in aer_stats['non_fatal'].items(): - non_fatal_field = "non_fatal|" + field - aer_fields[non_fatal_field] = value + for key, fv in self.aer_stats.items(): + for field, value in fv.items(): + key_field = "{}|{}".format(key,field) + aer_fields[key_field] = value if aer_fields: formatted_fields = swsscommon.FieldValuePairs(list(aer_fields.items())) - self.device_table.set(device_name, formatted_fields) + self.device_table.set(self.device_name, formatted_fields) else: - self.log_debug("PCIe device {} has no AER attriutes".format(device_name)) + self.log_debug("PCIe device {} has no AER attriutes".format(self.device_name)) - # Check the PCIe devices - def check_pcie_devices(self): - try: - platform_path, _ = device_info.get_paths_to_platform_and_hwsku_dirs() - from sonic_platform_base.sonic_pcie.pcie_common import PcieUtil - platform_pcieutil = PcieUtil(platform_path) - except ImportError as e: - self.log_error("Failed to load default PcieUtil module. Error : {}".format(str(e)), True) - raise e - resultInfo = platform_pcieutil.get_pcie_check() - err = 0 + # Check the PCIe AER Stats + def check_n_update_pcie_aer_stats(self, Bus, Dev, Fn): + self.device_name = "%02x:%02x.%d" % (Bus, Dev, Fn) - for item in resultInfo: - if item["result"] == "Failed": - self.log_warning("PCIe Device: " + item["name"] + " Not Found") - err += 1 + Id = read_id_file(self.device_name) + self.aer_stats = {} + if Id is not None: + self.device_table.set(self.device_name, [('id', Id)]) + self.aer_stats = platform_pcieutil.get_pcie_aer_stats(bus=Bus, dev=Dev, func=Fn) + self.update_aer_to_statedb() + + + # Update the PCIe devices status to DB + def update_pcie_devices_status_db(self, err): if err: - self.update_state_db("PCIE_DEVICES", "status", "FAILED") - self.log_error("PCIe device status check : FAILED") + pcie_status = "FAILED" + self.log_error("PCIe device status check : {}".format(pcie_status)) else: - self.update_state_db("PCIE_DEVICES", "status", "PASSED") - self.log_info("PCIe device status check : PASSED") + pcie_status = "PASSED" + self.log_info("PCIe device status check : {}".format(pcie_status)) + fvs = swsscommon.FieldValuePairs([ + ('status', pcie_status) + ]) - # update AER-attributes to DB - for item in resultInfo: - if item["result"] == "Failed": - continue + self.status_table.set("status", fvs) - Bus = int(item["bus"], 16) - Dev = int(item["dev"], 16) - Fn = int(item["fn"], 16) + # Check the PCIe devices + def check_pcie_devices(self): + self.resultInfo = platform_pcieutil.get_pcie_check() + err = 0 + if self.resultInfo is None: + return - device_name = "%02x:%02x.%d" % (Bus, Dev, Fn) - dev_id_path = '/sys/bus/pci/devices/0000:%s/device' % device_name - with open(dev_id_path, 'r') as fd: - Id = fd.read().strip() + for result in self.resultInfo: + if result["result"] == "Failed": + self.log_warning("PCIe Device: " + result["name"] + " Not Found") + err += 1 + else: + Bus = int(result["bus"], 16) + Dev = int(result["dev"], 16) + Fn = int(result["fn"], 16) + # update AER-attributes to DB + self.check_n_update_pcie_aer_stats(Bus, Dev, Fn) - self.device_table.set(device_name, [('id', Id)]) - aer_stats = platform_pcieutil.get_pcie_aer_stats(bus=Bus, device=Dev, func=Fn) - self.update_aer_to_statedb(device_name, aer_stats) + # update PCIe Device Status to DB + self.update_pcie_devices_status_db(err) - def read_state_db(self, key1, key2): - return self.state_db.get('STATE_DB', key1, key2) + # Override signal handler from DaemonBase + def signal_handler(self, sig, frame): + FATAL_SIGNALS = [signal.SIGINT, signal.SIGTERM] + NONFATAL_SIGNALS = [signal.SIGHUP] - def update_state_db(self, key1, key2, value): - self.state_db.set('STATE_DB', key1, key2, value) + global exit_code - # Signal handler - def signal_handler(self, sig, frame): - if sig == signal.SIGHUP: - self.log_info("Caught SIGHUP - ignoring...") - elif sig == signal.SIGINT: - self.log_info("Caught SIGINT - exiting...") - self.stop_event.set() - elif sig == signal.SIGTERM: - self.log_info("Caught SIGTERM - exiting...") + if sig in FATAL_SIGNALS: + self.log_info("Caught signal '{}' - exiting...".format(SIGNALS_TO_NAMES_DICT[sig])) + exit_code = 128 + sig # Make sure we exit with a non-zero code so that supervisor will try to restart us self.stop_event.set() + elif sig in NONFATAL_SIGNALS: + self.log_info("Caught signal '{}' - ignoring...".format(SIGNALS_TO_NAMES_DICT[sig])) else: - self.log_warning("Caught unhandled signal '" + sig + "'") + self.log_warning("Caught unhandled signal '{}' - ignoring...".format(SIGNALS_TO_NAMES_DICT[sig])) - # Initialize daemon - def init(self): - self.log_info("Start daemon init...") - - # Deinitialize daemon - def deinit(self): - self.log_info("Start daemon deinit...") - - # Run daemon + # Main daemon logic def run(self): - self.log_info("Starting up...") - - # Start daemon initialization sequence - self.init() - - # Start main loop - self.log_info("Start daemon main loop") - - while not self.stop_event.wait(self.timeout): - # Check the Pcie device status - self.check_pcie_devices() - - self.log_info("Stop daemon main loop") + if self.stop_event.wait(self.timeout): + # We received a fatal signal + return False - # Start daemon deinitialization sequence - self.deinit() - - self.log_info("Shutting down...") + self.check_pcie_devices() + return True # # Main ========================================================================= # @@ -174,7 +201,15 @@ class DaemonPcied(daemon_base.DaemonBase): def main(): pcied = DaemonPcied(SYSLOG_IDENTIFIER) - pcied.run() + + pcied.log_info("Starting up...") + + while pcied.run(): + pass + + pcied.log_info("Shutting down...") + + return exit_code if __name__ == '__main__': - main() + sys.exit(main()) diff --git a/sonic-pcied/setup.cfg b/sonic-pcied/setup.cfg new file mode 100644 index 000000000000..b7e478982ccf --- /dev/null +++ b/sonic-pcied/setup.cfg @@ -0,0 +1,2 @@ +[aliases] +test=pytest diff --git a/sonic-pcied/setup.py b/sonic-pcied/setup.py index 0d7d32118422..7c7cbc1464b1 100644 --- a/sonic-pcied/setup.py +++ b/sonic-pcied/setup.py @@ -14,8 +14,19 @@ 'scripts/pcied', ], setup_requires=[ + 'pytest-runner', 'wheel' ], + install_requires=[ + 'enum34; python_version < "3.4"', + 'sonic-py-common', + ], + tests_requires=[ + 'mock>=2.0.0; python_version < "3.3"', + 'pytest', + 'pytest-cov', + 'sonic-platform-common' + ], classifiers=[ 'Development Status :: 4 - Beta', 'Environment :: No Input/Output (Daemon)', @@ -29,4 +40,5 @@ 'Topic :: System :: Hardware', ], keywords='sonic SONiC PCIe pcie PCIED pcied', + test_suite='setup.get_test_suite' ) diff --git a/sonic-pcied/tests/__init__.py b/sonic-pcied/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sonic-pcied/tests/mock_platform.py b/sonic-pcied/tests/mock_platform.py new file mode 100644 index 000000000000..7d8a32a90852 --- /dev/null +++ b/sonic-pcied/tests/mock_platform.py @@ -0,0 +1,48 @@ + +""" + Mock implementation of sonic_platform package for unit testing +""" + +# TODO: Clean this up once we no longer need to support Python 2 +import sys +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + + + +pcie_device_list = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A'}] +""" + +pcie_check_result = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A', 'result': 'Passed'}] +""" + +pcie_aer_stats = \ +""" +{'correctable': {}, 'fatal': {}, 'non_fatal': {}} +""" + +#class MockPcieUtil(PcieUtil): +class MockPcieUtil(): + def __init__(self, + pciList=pcie_device_list, + result=pcie_check_result, + aer_stats=pcie_aer_stats): + super(MockPcieUtil, self).__init__() + self._pciList = pciList + self._result = result + self._aer_stats = aer_stats + + def get_pcie_device(self): + return self._pciList + + def get_pcie_check(self): + return self._result + + def get_pcie_aer_stats(self, domain, bus, dev, fn): + return self._aer_stats diff --git a/sonic-pcied/tests/mocked_libs/swsscommon/__init__.py b/sonic-pcied/tests/mocked_libs/swsscommon/__init__.py new file mode 100644 index 000000000000..012af621e5f0 --- /dev/null +++ b/sonic-pcied/tests/mocked_libs/swsscommon/__init__.py @@ -0,0 +1,5 @@ +''' + Mock implementation of swsscommon package for unit testing +''' + +from . import swsscommon diff --git a/sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py b/sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py new file mode 100644 index 000000000000..6947a8601819 --- /dev/null +++ b/sonic-pcied/tests/mocked_libs/swsscommon/swsscommon.py @@ -0,0 +1,54 @@ +''' + Mock implementation of swsscommon package for unit testing +''' + +STATE_DB = '' + + +class Table: + def __init__(self, db, table_name): + self.table_name = table_name + self.mock_dict = {} + + def _del(self, key): + del self.mock_dict[key] + pass + + def set(self, key, fvs): + self.mock_dict[key] = fvs.fv_dict + pass + + def get(self, key): + if key in self.mock_dict: + return self.mock_dict[key] + return None + + def get_size(self): + return (len(self.mock_dict)) + + +class FieldValuePairs: + fv_dict = {} + + def __init__(self, tuple_list): + if isinstance(tuple_list, list) and isinstance(tuple_list[0], tuple): + self.fv_dict = dict(tuple_list) + + def __setitem__(self, key, kv_tuple): + self.fv_dict[kv_tuple[0]] = kv_tuple[1] + + def __getitem__(self, key): + return self.fv_dict[key] + + def __eq__(self, other): + if not isinstance(other, FieldValuePairs): + # don't attempt to compare against unrelated types + return NotImplemented + + return self.fv_dict == other.fv_dict + + def __repr__(self): + return repr(self.fv_dict) + + def __str__(self): + return repr(self.fv_dict) diff --git a/sonic-pcied/tests/test_DaemonPcied.py b/sonic-pcied/tests/test_DaemonPcied.py new file mode 100644 index 000000000000..24044767e888 --- /dev/null +++ b/sonic-pcied/tests/test_DaemonPcied.py @@ -0,0 +1,226 @@ +import datetime +import os +import sys +from imp import load_source # Replace with importlib once we no longer need to support Python 2 + +import pytest + +# TODO: Clean this up once we no longer need to support Python 2 +if sys.version_info.major == 3: + from unittest import mock +else: + import mock + +from sonic_py_common import daemon_base + +from .mock_platform import MockPcieUtil + +SYSLOG_IDENTIFIER = 'pcied_test' +NOT_AVAILABLE = 'N/A' + +daemon_base.db_connect = mock.MagicMock() + +tests_path = os.path.dirname(os.path.abspath(__file__)) + +# Add mocked_libs path so that the file under test can load mocked modules from there +mocked_libs_path = os.path.join(tests_path, "mocked_libs") +sys.path.insert(0, mocked_libs_path) + +# Add path to the file under test so that we can load it +modules_path = os.path.dirname(tests_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) +load_source('pcied', os.path.join(scripts_path, 'pcied')) +import pcied + +pcie_no_aer_stats = \ +""" +{'correctable': {}, 'fatal': {}, 'non_fatal': {}} +""" + +pcie_aer_stats_no_err = \ +""" +{'correctable': {'field1': '0', 'field2': '0'}, + 'fatal': {'field3': '0', 'field4': '0'}, + 'non_fatal': {'field5': '0', 'field6': '0'}} +""" + +pcie_aer_stats_err = \ +""" +{'correctable': {'field1': '1', 'field2': '0'}, + 'fatal': {'field3': '0', 'field4': '1'}, + 'non_fatal': {'field5': '0', 'field6': '1'}} +""" + +pcie_device_list = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A'}, + {'bus': '00', 'dev': '02', 'fn': '0', 'id': '1f11', 'name': 'PCI B'}, + {'bus': '00', 'dev': '03', 'fn': '0', 'id': '1f13', 'name': 'PCI C'}] +""" + +pcie_check_result_no = [] + +pcie_check_result_pass = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A', 'result': 'Passed'}, + {'bus': '00', 'dev': '02', 'fn': '0', 'id': '1f11', 'name': 'PCI B', 'result': 'Passed'}, + {'bus': '00', 'dev': '03', 'fn': '0', 'id': '1f12', 'name': 'PCI C', 'result': 'Passed'}] +""" + +pcie_check_result_fail = \ +""" +[{'bus': '00', 'dev': '01', 'fn': '0', 'id': '1f10', 'name': 'PCI A', 'result': 'Passed'}, + {'bus': '00', 'dev': '02', 'fn': '0', 'id': '1f11', 'name': 'PCI B', 'result': 'Passed'}, + {'bus': '00', 'dev': '03', 'fn': '0', 'id': '1f12', 'name': 'PCI C', 'result': 'Failed'}] +""" + +class TestDaemonPcied(object): + """ + Test cases to cover functionality in DaemonPcied class + """ + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_signal_handler(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.stop_event.set = mock.MagicMock() + daemon_pcied.log_info = mock.MagicMock() + daemon_pcied.log_warning = mock.MagicMock() + + # Test SIGHUP + daemon_pcied.signal_handler(pcied.signal.SIGHUP, None) + assert daemon_pcied.log_info.call_count == 1 + daemon_pcied.log_info.assert_called_with("Caught signal 'SIGHUP' - ignoring...") + assert daemon_pcied.log_warning.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 0 + assert pcied.exit_code == 0 + + # Reset + daemon_pcied.log_info.reset_mock() + daemon_pcied.log_warning.reset_mock() + daemon_pcied.stop_event.set.reset_mock() + + # Test SIGINT + test_signal = pcied.signal.SIGINT + daemon_pcied.signal_handler(test_signal, None) + assert daemon_pcied.log_info.call_count == 1 + daemon_pcied.log_info.assert_called_with("Caught signal 'SIGINT' - exiting...") + assert daemon_pcied.log_warning.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 1 + assert pcied.exit_code == (128 + test_signal) + + # Reset + daemon_pcied.log_info.reset_mock() + daemon_pcied.log_warning.reset_mock() + daemon_pcied.stop_event.set.reset_mock() + + # Test SIGTERM + test_signal = pcied.signal.SIGTERM + daemon_pcied.signal_handler(test_signal, None) + assert daemon_pcied.log_info.call_count == 1 + daemon_pcied.log_info.assert_called_with("Caught signal 'SIGTERM' - exiting...") + assert daemon_pcied.log_warning.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 1 + assert pcied.exit_code == (128 + test_signal) + + # Reset + daemon_pcied.log_info.reset_mock() + daemon_pcied.log_warning.reset_mock() + daemon_pcied.stop_event.set.reset_mock() + pcied.exit_code = 0 + + # Test an unhandled signal + daemon_pcied.signal_handler(pcied.signal.SIGUSR1, None) + assert daemon_pcied.log_warning.call_count == 1 + daemon_pcied.log_warning.assert_called_with("Caught unhandled signal 'SIGUSR1' - ignoring...") + assert daemon_pcied.log_info.call_count == 0 + assert daemon_pcied.stop_event.set.call_count == 0 + assert pcied.exit_code == 0 + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_run(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.check_pcie_devices = mock.MagicMock() + + daemon_pcied.run() + assert daemon_pcied.check_pcie_devices.call_count == 1 + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_check_pcie_devices(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.update_pcie_devices_status_db = mock.MagicMock() + daemon_pcied.check_n_update_pcie_aer_stats = mock.MagicMock() + pcied.platform_pcieutil.get_pcie_check = mock.MagicMock() + + daemon_pcied.check_pcie_devices() + assert daemon_pcied.update_pcie_devices_status_db.call_count == 1 + assert daemon_pcied.check_n_update_pcie_aer_stats.call_count == 0 + + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_update_pcie_devices_status_db(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.status_table = mock.MagicMock() + daemon_pcied.log_info = mock.MagicMock() + daemon_pcied.log_error = mock.MagicMock() + + # test for pass resultInfo + daemon_pcied.update_pcie_devices_status_db(0) + assert daemon_pcied.status_table.set.call_count == 1 + assert daemon_pcied.log_info.call_count == 1 + assert daemon_pcied.log_error.call_count == 0 + + daemon_pcied.status_table.set.reset_mock() + daemon_pcied.log_info.reset_mock() + + # test for resultInfo with 1 device failed to detect + daemon_pcied.update_pcie_devices_status_db(1) + assert daemon_pcied.status_table.set.call_count == 1 + assert daemon_pcied.log_info.call_count == 0 + assert daemon_pcied.log_error.call_count == 1 + + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + @mock.patch('pcied.read_id_file') + def test_check_n_update_pcie_aer_stats(self, mock_read): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.device_table = mock.MagicMock() + daemon_pcied.update_aer_to_statedb = mock.MagicMock() + pcied.platform_pcieutil.get_pcie_aer_stats = mock.MagicMock() + + mock_read.return_value = None + daemon_pcied.check_n_update_pcie_aer_stats(0,1,0) + assert daemon_pcied.update_aer_to_statedb.call_count == 0 + assert daemon_pcied.device_table.set.call_count == 0 + assert pcied.platform_pcieutil.get_pcie_aer_stats.call_count == 0 + + mock_read.return_value = '1714' + daemon_pcied.check_n_update_pcie_aer_stats(0,1,0) + assert daemon_pcied.update_aer_to_statedb.call_count == 1 + assert daemon_pcied.device_table.set.call_count == 1 + assert pcied.platform_pcieutil.get_pcie_aer_stats.call_count == 1 + + + @mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) + def test_update_aer_to_statedb(self): + daemon_pcied = pcied.DaemonPcied(SYSLOG_IDENTIFIER) + daemon_pcied.log_debug = mock.MagicMock() + daemon_pcied.device_table = mock.MagicMock() + daemon_pcied.device_name = mock.MagicMock() + daemon_pcied.aer_stats = mock.MagicMock() + + + mocked_expected_fvp = pcied.swsscommon.FieldValuePairs( + [("correctable|field1", '0'), + ("correctable|field2", '0'), + ("fatal|field3", '0'), + ("fatal|field4", '0'), + ("non_fatal|field5", '0'), + ("non_fatal|field6", '0'), + ]) + + daemon_pcied.update_aer_to_statedb() + assert daemon_pcied.log_debug.call_count == 1 + assert daemon_pcied.device_table.set.call_count == 0 + + daemon_pcied.device_table.set.reset_mock() diff --git a/sonic-pcied/tests/test_pcied.py b/sonic-pcied/tests/test_pcied.py new file mode 100644 index 000000000000..f3b3e78e9471 --- /dev/null +++ b/sonic-pcied/tests/test_pcied.py @@ -0,0 +1,43 @@ +import os +import sys +from imp import load_source # Replace with importlib once we no longer need to support Python 2 + +import pytest + +# TODO: Clean this up once we no longer need to support Python 2 +if sys.version_info.major == 3: + from unittest import mock +else: + import mock +from sonic_py_common import daemon_base, device_info + +from .mock_platform import MockPcieUtil + +tests_path = os.path.dirname(os.path.abspath(__file__)) + +# Add mocked_libs path so that the file under test can load mocked modules from there +mocked_libs_path = os.path.join(tests_path, "mocked_libs") +sys.path.insert(0, mocked_libs_path) + +# Add path to the file under test so that we can load it +modules_path = os.path.dirname(tests_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) +load_source('pcied', os.path.join(scripts_path, 'pcied')) +import pcied + + +daemon_base.db_connect = mock.MagicMock() + + +SYSLOG_IDENTIFIER = 'pcied_test' +NOT_AVAILABLE = 'N/A' + + +@mock.patch('pcied.load_platform_pcieutil', mock.MagicMock()) +@mock.patch('pcied.DaemonPcied.run') +def test_main(mock_run): + mock_run.return_value = False + + pcied.main() + assert mock_run.call_count == 1