diff --git a/src/ostorlab/runtimes/local/runtime.py b/src/ostorlab/runtimes/local/runtime.py index dece11a37..b0c8dc809 100644 --- a/src/ostorlab/runtimes/local/runtime.py +++ b/src/ostorlab/runtimes/local/runtime.py @@ -309,13 +309,15 @@ def stop(self, scan_id: str) -> None: networks = self._docker_client.networks.list() for network in networks: network_labels = network.attrs["Labels"] - if ( - network_labels is not None - and int(network_labels.get("ostorlab.universe")) == scan_id - ): - logger.info("removing network %s", network_labels) - stopped_network.append(network) - network.remove() + if network_labels is None: + logger.debug("Skipping network with no labels") + continue + if isinstance(network_labels, dict): + universe = network_labels.get("ostorlab.universe") + if universe is not None and int(universe) == scan_id: + logger.info("removing network %s", network_labels) + stopped_network.append(network) + network.remove() configs = self._docker_client.configs.list() for config in configs: diff --git a/tests/runtimes/local/runtime_test.py b/tests/runtimes/local/runtime_test.py index e3e2f1519..c29cb77f8 100644 --- a/tests/runtimes/local/runtime_test.py +++ b/tests/runtimes/local/runtime_test.py @@ -6,6 +6,7 @@ import docker import pytest from docker.models import services as services_model +from docker.models import networks as networks_model from pytest_mock import plugin import ostorlab @@ -364,3 +365,68 @@ def testCheckServicesMethod_whenServicesAreStopped_shouldExit( assert exit_mock.call_count == 1 exit_with = exit_mock.call_args_list[0][0][0] assert exit_with == 0 + + +@pytest.mark.docker +def testRuntimeScanStop_whenUnrelatedNetworks_removesScanServiceWithoutCrash( + mocker: plugin.MockerFixture, db_engine_path: str +): + """Unittest for the scan stop method when there are networks not related to the scan, the process shouldn't crash""" + mocker.patch.object(models, "ENGINE_URL", db_engine_path) + create_scan_db = models.Scan.create("test") + + def docker_services() -> list[services_model.Service]: + """Method for mocking the services list response.""" + with models.Database() as session: + scan = session.query(models.Scan).first() + services = [ + { + "ID": "0099i5n1y3gycuekvksyqyxav", + "CreatedAt": "2021-12-27T13:37:02.795789947Z", + "Spec": {"Labels": {"ostorlab.universe": scan.id}}, + }, + { + "ID": "0099i5n1y3gycuekvksyqyxav", + "CreatedAt": "2021-12-27T13:37:02.795789947Z", + "Spec": {"Labels": {"ostorlab.universe": 9999}}, + }, + ] + + return [services_model.Service(attrs=service) for service in services] + + def docker_networks() -> list[networks_model.Network]: + """Method for mocking the services list response.""" + with models.Database() as session: + scan = session.query(models.Scan).first() + networks = [ + { + "ID": "0099i5n1y3gycuekvksyqyxav", + "CreatedAt": "2021-12-27T13:37:02.795789947Z", + "Labels": {}, + }, + { + "ID": "0099i5n1y3gycuekvksyqyxav", + "CreatedAt": "2021-12-27T13:37:02.795789947Z", + "Labels": {"ostorlab.universe": scan.id}, + }, + ] + + return [networks_model.Network(attrs=network) for network in networks] + + mocker.patch( + "docker.DockerClient.services", return_value=services_model.ServiceCollection() + ) + mocker.patch("docker.DockerClient.services.list", side_effect=docker_services) + mocker.patch( + "docker.models.networks.NetworkCollection.list", return_value=docker_networks() + ) + mocker.patch("docker.models.configs.ConfigCollection.list", return_value=[]) + docker_network_remove = mocker.patch("docker.models.networks.Network.remove") + docker_service_remove = mocker.patch( + "docker.models.services.Service.remove", return_value=None + ) + + local_runtime.LocalRuntime().stop(scan_id=create_scan_db.id) + + docker_service_remove.assert_called_once() + docker_network_remove.assert_called_once()