Skip to content

Commit

Permalink
Add the unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xumia committed Jul 6, 2023
1 parent 4dff827 commit 20148ba
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 27 deletions.
29 changes: 12 additions & 17 deletions scripts/hostcfgd
Original file line number Diff line number Diff line change
Expand Up @@ -1801,8 +1801,8 @@ class FipsCfg(object):
"""

def __init__(self, state_db_conn):
self.enabled = False
self.enforced = False
self.enable = False
self.enforce = False
self.restart_services = DEFAULT_FIPS_RESTART_SERVICES
self.state_db_conn = state_db_conn

Expand All @@ -1822,21 +1822,15 @@ class FipsCfg(object):
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped the FIPS config, the FIPS setting is empty.')
return
self.read_config()
mode = common_config.get('mode', '').lower()
self.enabled = False
self.enforced = False
if mode == 'enforce':
self.enabled = True
self.enforced = True
elif mode == 'noneenforce':
self.enabled = True
self.enforce = is_true(common_config.get('enforce', 'false'))
self.enable = self.enforce or is_true(common_config.get('enable', 'false'))
self.update()

def fips_handler(self, data):
self.load(data)

def update(self):
syslog.syslog(syslog.LOG_DEBUG, f'FipsCfg: update fips option enabled: {self.enabled}, enforced: {self.enforced}.')
syslog.syslog(syslog.LOG_DEBUG, f'FipsCfg: update fips option enable: {self.enable}, enforce: {self.enforce}.')
self.update_enforce_config()
self.update_noneenforce_config()
self.state_db_conn.hset('FIPS_STATE|state', 'config_datetime', datetime.utcnow().isoformat())
Expand All @@ -1849,7 +1843,7 @@ class FipsCfg(object):
cur_fips_enabled = f.read().strip()

expected_fips_enabled = '0'
if self.enabled:
if self.enable:
expected_fips_enabled = '1'

# If the runtime config is not as expected, change the config
Expand All @@ -1875,22 +1869,23 @@ class FipsCfg(object):

# Restart the services required and in the running state
output = run_cmd_output(['sudo', 'systemctl', '-t', 'service', '--state=running', '--no-pager', '-o', 'json'])
services = [s for s in json.dumps(output)]
services = [s['unit'] for s in json.loads(output)]
for service in self.restart_services:
if service in services or service + '.service' in services:
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: restart service {service}.')
run_cmd(['sudo', 'systemctl', 'restart', service])


def update_enforce_config(self):
fips_state = run_cmd_output(['sudo', 'sonic-installer', 'get-fips'])
next_enforced = 'enabled' in fips_state
if next_enforced != self.enforced:
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped to configure the enforce option {self.enforced}, since the config has already been set.')
if next_enforced == self.enforce:
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped to configure the enforce option {self.enforce}, since the config has already been set.')
return
fips_option = '--disable-fips'
if self.enforced:
if self.enforce:
fips_option = '--enable-fips'
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: update the FIPS enforce option {self.enforced}.')
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: update the FIPS enforce option {self.enforce}.')
run_cmd(['sudo', 'sonic-installer', 'set-fips', fips_option])

class HostConfigDaemon:
Expand Down
15 changes: 8 additions & 7 deletions scripts/procdockerstatsd
Original file line number Diff line number Diff line change
Expand Up @@ -177,23 +177,24 @@ class ProcDockerStats(daemon_base.DaemonBase):
# Check if FIPS enforced in the current kernel cmdline
with open('/proc/cmdline') as f:
kernel_cmdline = f.read().strip().split(' ')
cur_enforced = 'sonic_fips=1' in kernel_cmdline or 'fips=1' in kernel_cmdline
current_enforced = 'sonic_fips=1' in kernel_cmdline or 'fips=1' in kernel_cmdline

# Check if FIPS enforced in the next kernel cmdline
cmd = ['sonic-installer', 'get-fips']
exitcode, _ = getstatusoutput_noshell(['sudo', 'sonic-installer', 'get-fips'], ['grep', '-i', 'enabled'])
exitcode, _ = getstatusoutput_noshell_pipe(['sudo', 'sonic-installer', 'get-fips'], ['grep', '-i', 'enabled'])
if any(exitcode):
self.log_error("Error running command 'sudo sonic-installer get-fips'")
next_enforced = exitcode == 0
else:
next_enforced = True

# Check if FIPS runtime status
exitcode, _ = getstatusoutput_noshell(['sudo', 'openssl', 'engine', '-vv'], ['grep', '-i', 'symcryp'])
exitcode, _ = getstatusoutput_noshell_pipe(['sudo', 'openssl', 'engine', '-vv'], ['grep', '-i', 'symcryp'])
if any(exitcode):
self.log_error("Error running command 'sudo openssl engine -vv'")
enabled = exitcode == 0
else:
enabled = True

self.update_state_db(fips_db_key, 'timestamp', datetime.utcnow().isoformat())
self.update_state_db(fips_db_key, 'enforced', str(current_enforce))
self.update_state_db(fips_db_key, 'enforced', str(current_enforced))
self.update_state_db(fips_db_key, 'enforced_next', str(next_enforced))
self.update_state_db(fips_db_key, 'enabled', str(enabled))

Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
'scripts/procdockerstatsd',
'scripts/determine-reboot-cause',
'scripts/process-reboot-cause',
'scripts/sonic-host-server',
'scripts/update-fips-config'
'scripts/sonic-host-server'
],
install_requires = [
'dbus-python',
Expand Down
14 changes: 13 additions & 1 deletion tests/common/mock_configdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,16 @@ def pop(self):

class MockDBConnector():
def __init__(self, db, val, tcpFlag=False, name=None):
pass
self.data = {}

def hget(self, key, field):
if key not in self.data:
return None
if field not in self.data[key]:
return None
return self.data[key][field]

def hset(self, key, field, value):
if key not in self.data:
self.data[key] = {}
self.data[key][field] = value
128 changes: 128 additions & 0 deletions tests/hostcfgd/hostcfgd_fips_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import importlib.machinery
import importlib.util
import filecmp
import json
import shutil
import os
import sys
from swsscommon import swsscommon

from parameterized import parameterized
from unittest import TestCase, mock
from tests.common.mock_configdb import MockConfigDb, MockDBConnector
from sonic_py_common.general import getstatusoutput_noshell

test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
modules_path = os.path.dirname(test_path)
scripts_path = os.path.join(modules_path, "scripts")
sys.path.insert(0, modules_path)

# Load the file under test
hostcfgd_path = os.path.join(scripts_path, 'hostcfgd')
loader = importlib.machinery.SourceFileLoader('hostcfgd', hostcfgd_path)
spec = importlib.util.spec_from_loader(loader.name, loader)
hostcfgd = importlib.util.module_from_spec(spec)
loader.exec_module(hostcfgd)
sys.modules['hostcfgd'] = hostcfgd
original_syslog = hostcfgd.syslog

# Mock swsscommon classes
hostcfgd.ConfigDBConnector = MockConfigDb
hostcfgd.DBConnector = MockDBConnector
hostcfgd.Table = mock.Mock()
running_services = [{"unit":"ssh.service","load":"loaded","active":"active","sub":"running","description":"OpenBSD Secure Shell server"},
{"unit":"restapi.service","load":"loaded","active":"active","sub":"running","description":"SONiC Restful API Service"}]

class TestHostcfgdFIPS(TestCase):
"""
Test hostcfd daemon - FIPS
"""
def run_diff(self, file1, file2):
_, output = getstatusoutput_noshell(['diff', '-uR', file1, file2])
return output

def setUp(self):
self._workPath =os.path.join('/tmp/test_fips/', self._testMethodName)
self.test_data = {'DEVICE_METADATA':{},'FIPS': {}}
self.test_data['DEVICE_METADATA'] = {'localhost': {'hostname': 'fips'}}
self.test_data['FIPS']['global'] = {'enable': 'false', 'enforce': 'false'}
hostcfgd.FIPS_CONFIG_FILE = os.path.join(self._workPath + 'eips.json')
hostcfgd.OPENSSL_FIPS_CONFIG_FILE = os.path.join(self._workPath, 'fips_enabled')
hostcfgd.PROC_CMDLINE = os.path.join(self._workPath, 'cmdline')
os.makedirs(self._workPath, exist_ok=True)
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE, 'w') as f:
f.write('0')

def tearDown(self):
shutil.rmtree(self._workPath, ignore_errors=True)

def assert_fips_runtime_config(self, result='1'):
with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE) as f:
assert f.read() == result

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_enable(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
self.test_data['FIPS']['global']['enable'] = 'true'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option False, since the config has already been set.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config()

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_disable(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE, 'w') as f:
f.write('1')
self.test_data['FIPS']['global']['enable'] = 'false'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option False, since the config has already been set.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config('0')

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['FIPS is disabled', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_enforce(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
self.test_data['FIPS']['global']['enforce'] = 'true'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: update the FIPS enforce option True.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config()

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['FIPS is enabled', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_enforce_reconf(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=1')
self.test_data['FIPS']['global']['enforce'] = 'true'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to restart services, since FIPS enforced.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option True, since the config has already been set.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config()
5 changes: 5 additions & 0 deletions tests/mock_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ def connect(self, db_id):
def get(self, db_id, key, field):
return MockConnector.data[key][field]

def set(self, db_id, key, field, value):
if key not in MockConnector.data:
MockConnector.data[key] = {}
MockConnector.data[key][field] = value

def keys(self, db_id, pattern):
match = pattern.split('*')[0]
ret = []
Expand Down
7 changes: 7 additions & 0 deletions tests/procdockerstatsd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,10 @@ def test_update_processstats_command(self):
pdstatsd.update_processstats_command()
mock_cmd.assert_has_calls(expected_calls)

@patch('procdockerstatsd.getstatusoutput_noshell_pipe', return_value=([0, 0], ''))
def test_update_procfipsstats_command(self, mock_cmd):
pdstatsd = procdockerstatsd.ProcDockerStats(procdockerstatsd.SYSLOG_IDENTIFIER)
pdstatsd.update_procfipsstats_command()
assert pdstatsd.state_db.get('STATE_DB', 'FIPS_STATS|state', 'enforced') == "False"
assert pdstatsd.state_db.get('STATE_DB', 'FIPS_STATS|state', 'enforced_next') == "True"
assert pdstatsd.state_db.get('STATE_DB', 'FIPS_STATS|state', 'enabled') == "True"

0 comments on commit 20148ba

Please sign in to comment.