Skip to content

Commit

Permalink
[system-health] Remove subprocess with shell=True (sonic-net#12572)
Browse files Browse the repository at this point in the history
Signed-off-by: maipbui <[email protected]>
#### Why I did it
`subprocess` is used with `shell=True`, which is very dangerous for shell injection.
#### How I did it
remove `shell=True`, use `shell=False`
#### How to verify it
Pass UT
Manual test
  • Loading branch information
maipbui authored Nov 2, 2022
1 parent e1440f0 commit b3a8167
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
11 changes: 6 additions & 5 deletions src/system-health/health_checker/service_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ class ServiceChecker(HealthChecker):
CRITICAL_PROCESSES_PATH = 'etc/supervisor/critical_processes'

# Command to get merged directory of a container
GET_CONTAINER_FOLDER_CMD = 'docker inspect {} --format "{{{{.GraphDriver.Data.MergedDir}}}}"'
GET_CONTAINER_FOLDER_CMD = ['docker', 'inspect', '', '--format', "{{.GraphDriver.Data.MergedDir}}"]

# Command to query the status of monit service.
CHECK_MONIT_SERVICE_CMD = 'systemctl is-active monit.service'
CHECK_MONIT_SERVICE_CMD = ['systemctl', 'is-active', 'monit.service']

# Command to get summary of critical system service.
CHECK_CMD = 'monit summary -B'
CHECK_CMD = ['monit', 'summary', '-B']
MIN_CHECK_CMD_LINES = 3

# Expect status for different system service category.
Expand Down Expand Up @@ -168,7 +168,8 @@ def _update_container_critical_processes(self, container, critical_process_list)
self.need_save_cache = True

def _get_container_folder(self, container):
container_folder = utils.run_command(ServiceChecker.GET_CONTAINER_FOLDER_CMD.format(container))
ServiceChecker.GET_CONTAINER_FOLDER_CMD[2] = str(container)
container_folder = utils.run_command(ServiceChecker.GET_CONTAINER_FOLDER_CMD)
if container_folder is None:
return container_folder

Expand Down Expand Up @@ -327,7 +328,7 @@ def check_process_existence(self, container_name, critical_process_list, config,
# We are using supervisorctl status to check the critical process status. We cannot leverage psutil here because
# it not always possible to get process cmdline in supervisor.conf. E.g, cmdline of orchagent is "/usr/bin/orchagent",
# however, in supervisor.conf it is "/usr/bin/orchagent.sh"
cmd = 'docker exec {} bash -c "supervisorctl status"'.format(container_name)
cmd = ['docker', 'exec', str(container_name), 'bash', '-c', "supervisorctl status"]
process_status = utils.run_command(cmd)
if process_status is None:
for process_name in critical_process_list:
Expand Down
2 changes: 1 addition & 1 deletion src/system-health/health_checker/sysmonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def get_app_ready_status(self, service):

#Gets the service properties
def run_systemctl_show(self, service):
command = ('systemctl show {} --property=Id,LoadState,UnitFileState,Type,ActiveState,SubState,Result'.format(service))
command = ['systemctl', 'show', str(service), '--property=Id,LoadState,UnitFileState,Type,ActiveState,SubState,Result']
output = utils.run_command(command)
srv_properties = output.split('\n')
prop_dict = {}
Expand Down
2 changes: 1 addition & 1 deletion src/system-health/health_checker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def run_command(command):
:return: Output of the shell command.
"""
try:
process = subprocess.Popen(command, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.communicate()[0]
except Exception:
return None
Expand Down
4 changes: 2 additions & 2 deletions src/system-health/tests/test_system_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,10 @@ def test_manager(mock_hw_info, mock_service_info, mock_udc_info):
manager._set_system_led(chassis, manager.config, 'normal')

def test_utils():
output = utils.run_command('some invalid command')
output = utils.run_command(['some', 'invalid', 'command'])
assert not output

output = utils.run_command('ls')
output = utils.run_command(['ls'])
assert output


Expand Down

0 comments on commit b3a8167

Please sign in to comment.