diff --git a/iso15118/evcc/comm_session_handler.py b/iso15118/evcc/comm_session_handler.py index 0b01a0b8..940c28f0 100644 --- a/iso15118/evcc/comm_session_handler.py +++ b/iso15118/evcc/comm_session_handler.py @@ -53,7 +53,7 @@ SDP_MAX_REQUEST_COUNTER = 50 -class EVCCCommunicationSession(V2GCommunicationSession): +class EVCCCommunicationSession(V2GCommunicationSession, EXI): """ The communication session object for the EVCC, which holds session-specific variables and also implements a pausing mechanism. @@ -65,18 +65,22 @@ def __init__( session_handler_queue: asyncio.Queue, config: Config, ev_controller: EVControllerInterface, + iexi_codec: IEXICodec ): # Need to import here to avoid a circular import error # pylint: disable=import-outside-toplevel from iso15118.evcc.states.sap_states import SupportedAppProtocol + EXI.__init__(self, iexi_codec) + # TODO: There must be another way to do this than to pass the self # itself into the child. There are just a few attributes in these # class. If it is really necessary we can pass them into the child # From what I could see, we just use attributes of the V2GCommunication # Session, so we dont need to do this self injection, since self # is already injected by default on a child - super().__init__(transport, SupportedAppProtocol, session_handler_queue, self) + V2GCommunicationSession.__init__(self, transport, SupportedAppProtocol, + session_handler_queue, self) self.config = config # The EV controller that implements the interface EVControllerInterface @@ -169,7 +173,7 @@ async def send_sap(self): v2gtp_msg = V2GTPMessage( Protocol.UNKNOWN, ISOV2PayloadTypes.EXI_ENCODED, - EXI().to_exi(sap_req, Namespace.SAP), + self.to_exi(sap_req, Namespace.SAP), ) self.current_state.next_msg = sap_req await self.send(v2gtp_msg) @@ -217,13 +221,11 @@ def __init__( self.tcp_client = None self.tls_client = None self.config = config + self.iexi_codec = codec self.ev_controller = ev_controller self.sdp_retries_number = SDP_MAX_REQUEST_COUNTER self._sdp_retry_cycles = self.config.sdp_retry_cycles - # Set the selected EXI codec implementation - EXI().set_exi_codec(codec) - # Receiving queue for UDP client to notify about incoming datagrams self._rcv_queue = asyncio.Queue(0) @@ -369,6 +371,7 @@ async def start_comm_session(self, host: IPv6Address, port: int, is_tls: bool): self._rcv_queue, self.config, self.ev_controller, + self.iexi_codec ) try: diff --git a/iso15118/evcc/states/iso15118_20_states.py b/iso15118/evcc/states/iso15118_20_states.py index c1a5dbfa..50bd8279 100644 --- a/iso15118/evcc/states/iso15118_20_states.py +++ b/iso15118/evcc/states/iso15118_20_states.py @@ -142,10 +142,11 @@ def process_message( # ISO 15118-2 signature try: signature = create_signature( + self.comm_session, [ ( oem_prov_cert_chain.id, - EXI().to_exi( + self.comm_session.to_exi( oem_prov_cert_chain, Namespace.ISO_V20_COMMON_MSG ), ) @@ -204,10 +205,11 @@ def process_message( # TODO Need a signature for ISO 15118-20, not ISO 15118-2 try: signature = create_signature( + self.comm_session, [ ( pnc_params.id, - EXI().to_exi(pnc_params, Namespace.ISO_V20_COMMON_MSG), + self.comm_session.to_exi(pnc_params, Namespace.ISO_V20_COMMON_MSG), ) ], load_priv_key(KeyPath.OEM_LEAF_PEM, KeyEncoding.PEM), diff --git a/iso15118/evcc/states/iso15118_2_states.py b/iso15118/evcc/states/iso15118_2_states.py index ca4de4c3..c71c54e8 100644 --- a/iso15118/evcc/states/iso15118_2_states.py +++ b/iso15118/evcc/states/iso15118_2_states.py @@ -411,10 +411,11 @@ def process_message( try: signature = create_signature( + self.comm_session, [ ( cert_install_req.id, - EXI().to_exi( + self.comm_session.to_exi( cert_install_req, Namespace.ISO_V2_MSG_DEF ), ) @@ -494,29 +495,30 @@ def process_message( ) if not verify_signature( + comm_session=self.comm_session, signature=msg.header.signature, elements_to_sign=[ ( cert_install_res.contract_cert_chain.id, - EXI().to_exi( + self.comm_session.to_exi( cert_install_res.contract_cert_chain, Namespace.ISO_V2_MSG_DEF ), ), ( cert_install_res.encrypted_private_key.id, - EXI().to_exi( + self.comm_session.to_exi( cert_install_res.encrypted_private_key, Namespace.ISO_V2_MSG_DEF ), ), ( cert_install_res.dh_public_key.id, - EXI().to_exi( + self.comm_session.to_exi( cert_install_res.dh_public_key, Namespace.ISO_V2_MSG_DEF ), ), ( cert_install_res.emaid.id, - EXI().to_exi(cert_install_res.emaid, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(cert_install_res.emaid, Namespace.ISO_V2_MSG_DEF), ), ], leaf_cert=cert_install_res.cps_cert_chain.certificate, @@ -599,10 +601,11 @@ def process_message( try: signature = create_signature( + self.comm_session, [ ( authorization_req.id, - EXI().to_exi(authorization_req, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(authorization_req, Namespace.ISO_V2_MSG_DEF), ) ], load_priv_key(KeyPath.CONTRACT_LEAF_PEM, KeyEncoding.PEM), @@ -1027,10 +1030,11 @@ def process_message( try: signature = create_signature( + self.comm_session, [ ( metering_receipt_req.id, - EXI().to_exi( + self.comm_session.to_exi( metering_receipt_req, Namespace.ISO_V2_MSG_DEF ), ) diff --git a/iso15118/secc/comm_session_handler.py b/iso15118/secc/comm_session_handler.py index 49a788fe..2c38abdd 100644 --- a/iso15118/secc/comm_session_handler.py +++ b/iso15118/secc/comm_session_handler.py @@ -55,7 +55,7 @@ logger = logging.getLogger(__name__) -class SECCCommunicationSession(V2GCommunicationSession): +class SECCCommunicationSession(V2GCommunicationSession, EXI): """ The communication session object for the SECC, which holds session-specific variables and also implements a pausing mechanism. @@ -67,12 +67,15 @@ def __init__( session_handler_queue: asyncio.Queue, config: Config, evse_controller: EVSEControllerInterface, + iexi_codec: IEXICodec ): # Need to import here to avoid a circular import error # pylint: disable=import-outside-toplevel from iso15118.secc.states.sap_states import SupportedAppProtocol - super().__init__(transport, SupportedAppProtocol, session_handler_queue, self) + EXI.__init__(self, iexi_codec) + V2GCommunicationSession.__init__(self, transport, SupportedAppProtocol, + session_handler_queue, self) self.config = config # The EVSE controller that implements the interface EVSEControllerInterface @@ -148,11 +151,9 @@ def __init__( self.udp_server = None self.tcp_server = None self.config = config + self.iexi_codec = codec self.evse_controller = evse_controller - # Set the selected EXI codec implementation - EXI().set_exi_codec(codec) - # Receiving queue for UDP or TCP packets and session # triggers (e.g. pause/terminate) self._rcv_queue = asyncio.Queue() @@ -221,6 +222,7 @@ async def get_from_rcv_queue(self, queue: asyncio.Queue): self._rcv_queue, self.config, self.evse_controller, + self.iexi_codec ) task = asyncio.create_task( diff --git a/iso15118/secc/states/iso15118_20_states.py b/iso15118/secc/states/iso15118_20_states.py index 3c4e4ccd..3404b848 100644 --- a/iso15118/secc/states/iso15118_20_states.py +++ b/iso15118/secc/states/iso15118_20_states.py @@ -292,11 +292,12 @@ def process_message( # Verify signature if EVCC sent PnC authorization data if auth_req.pnc_params and not verify_signature( + self.comm_session, auth_req.header.signature, [ ( auth_req.pnc_params.id, - EXI().to_exi(auth_req.pnc_params, Namespace.ISO_V20_COMMON_MSG), + self.comm_session.to_exi(auth_req.pnc_params, Namespace.ISO_V20_COMMON_MSG), ) ], self.comm_session.contract_cert_chain.certificate, diff --git a/iso15118/secc/states/iso15118_2_states.py b/iso15118/secc/states/iso15118_2_states.py index 1b1f533b..8c3bc062 100644 --- a/iso15118/secc/states/iso15118_2_states.py +++ b/iso15118/secc/states/iso15118_2_states.py @@ -566,11 +566,12 @@ def process_message( ) if not verify_signature( + comm_session=self.comm_session, signature=msg.header.signature, elements_to_sign=[ ( cert_install_req.id, - EXI().to_exi(cert_install_req, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(cert_install_req, Namespace.ISO_V2_MSG_DEF), ) ], leaf_cert=cert_install_req.oem_provisioning_cert, @@ -650,17 +651,17 @@ def process_message( # Elements to sign, containing its id and the exi encoded stream contract_cert_tuple = ( contract_cert_chain.id, - EXI().to_exi(contract_cert_chain, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(contract_cert_chain, Namespace.ISO_V2_MSG_DEF), ) encrypted_priv_key_tuple = ( encrypted_priv_key.id, - EXI().to_exi(encrypted_priv_key, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(encrypted_priv_key, Namespace.ISO_V2_MSG_DEF), ) dh_public_key_tuple = ( dh_public_key.id, - EXI().to_exi(dh_public_key, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(dh_public_key, Namespace.ISO_V2_MSG_DEF), ) - emaid_tuple = (emaid.id, EXI().to_exi(emaid, Namespace.ISO_V2_MSG_DEF)) + emaid_tuple = (emaid.id, self.comm_session.to_exi(emaid, Namespace.ISO_V2_MSG_DEF)) elements_to_sign = [ contract_cert_tuple, @@ -671,7 +672,8 @@ def process_message( # The private key to be used for the signature signature_key = load_priv_key(KeyPath.CPS_LEAF_PEM, KeyEncoding.PEM) - signature = create_signature(elements_to_sign, signature_key) + signature = create_signature(self.comm_session, elements_to_sign, + signature_key) self.create_next_message( PaymentDetails, @@ -852,11 +854,12 @@ def process_message( return if not verify_signature( + self.comm_session, msg.header.signature, [ ( authorization_req.id, - EXI().to_exi(authorization_req, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(authorization_req, Namespace.ISO_V2_MSG_DEF), ) ], self.comm_session.contract_cert_chain.certificate, @@ -1002,14 +1005,15 @@ def process_message( try: element_to_sign = ( schedule.sales_tariff.id, - EXI().to_exi( + self.comm_session.to_exi( schedule.sales_tariff, Namespace.ISO_V2_MSG_DEF ), ) signature_key = load_priv_key( KeyPath.MO_SUB_CA2_PEM, KeyEncoding.PEM ) - signature = create_signature([element_to_sign], signature_key) + signature = create_signature(self.comm_session, + [element_to_sign], signature_key) except PrivateKeyReadError as exc: logger.warning( "Can't read private key to needed to create " @@ -1269,11 +1273,12 @@ def process_message( "signature of MeteringReceiptReq" ) elif not verify_signature( + self.comm_session, msg.header.signature, [ ( metering_receipt_req.id, - EXI().to_exi(metering_receipt_req, Namespace.ISO_V2_MSG_DEF), + self.comm_session.to_exi(metering_receipt_req, Namespace.ISO_V2_MSG_DEF), ) ], self.comm_session.contract_cert_chain.certificate, diff --git a/iso15118/shared/comm_session.py b/iso15118/shared/comm_session.py index 16e19358..4c7959fd 100644 --- a/iso15118/shared/comm_session.py +++ b/iso15118/shared/comm_session.py @@ -176,7 +176,8 @@ def process_message(self, message: bytes): None, ] = None try: - decoded_message = EXI().from_exi(v2gtp_msg.payload, self.get_exi_ns()) + decoded_message = self.comm_session.from_exi(v2gtp_msg.payload, + self.get_exi_ns()) except EXIDecodingError as exc: logger.exception(f"{exc}") raise exc diff --git a/iso15118/shared/exi_codec.py b/iso15118/shared/exi_codec.py index 70b2ecab..8fa897b7 100644 --- a/iso15118/shared/exi_codec.py +++ b/iso15118/shared/exi_codec.py @@ -127,13 +127,9 @@ class EXI: The codec to be used will be requested during encode and decode operations. """ - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super(EXI, cls).__new__(cls) - cls._instance.exi_codec = None - return cls._instance + def __init__(self, codec: IEXICodec): + logger.debug(f"EXI Codec version: {codec.get_version()}") + self.exi_codec = codec def set_exi_codec(self, codec: IEXICodec): logger.debug(f"EXI Codec version: {codec.get_version()}") diff --git a/iso15118/shared/security.py b/iso15118/shared/security.py index d65b78cc..12ff8b80 100644 --- a/iso15118/shared/security.py +++ b/iso15118/shared/security.py @@ -67,6 +67,20 @@ Transforms, ) from iso15118.shared.settings import PKI_PATH +from typing_extensions import TYPE_CHECKING + +if TYPE_CHECKING: + # EVCCCommunicationSession and SECCCommunicationSession are used for + # annotation purposes only, as a type hint for the comm_session class + # attribute. But comm_session also imports State. To avoid a circular import + # error, one can use the TYPE_CHECKING boolean from typing, which evaluates + # to True during mypy or other 3rd party type checker but assumes the value + # 'False' during runtime. + # Please check: + # https://stackoverflow.com/questions/61545580/how-does-mypy-use-typing-type-checking-to-resolve-the-circular-import-annotation + # https://docs.python.org/3/library/typing.html#typing.TYPE_CHECKING + from iso15118.evcc.comm_session_handler import EVCCCommunicationSession + from iso15118.secc.comm_session_handler import SECCCommunicationSession logger = logging.getLogger(__name__) @@ -661,6 +675,7 @@ def get_cert_issuer_serial(cert_path: str) -> Tuple[str, int]: def create_signature( + comm_session: Union["EVCCCommunicationSession", "SECCCommunicationSession"], elements_to_sign: List[Tuple[str, bytes]], signature_key: EllipticCurvePrivateKey ) -> Signature: """ @@ -675,6 +690,7 @@ def create_signature( generate a signature) Args: + comm_session: instance of the running session elements_to_sign: A list of tuples [str, bytes], where the first entry of each tuple is the Id field (XML attribute) and the second entry is the EXI encoded bytes representation @@ -721,7 +737,7 @@ def create_signature( ) # 2. Step: Signature generation - exi_encoded_signed_info = EXI().to_exi(signed_info, Namespace.XML_DSIG) + exi_encoded_signed_info = comm_session.to_exi(signed_info, Namespace.XML_DSIG) signature_value = signature_key.sign(exi_encoded_signed_info, ec.ECDSA(SHA256())) signature = Signature( signed_info=signed_info, signature_value=SignatureValue(value=signature_value) @@ -731,6 +747,7 @@ def create_signature( def verify_signature( + comm_session: Union["EVCCCommunicationSession", "SECCCommunicationSession"], signature: Signature, elements_to_sign: List[Tuple[str, bytes]], leaf_cert: bytes, @@ -759,6 +776,7 @@ def verify_signature( certificate and sub-CA certificate(s)). Args: + comm_session: instance of the running session signature: The Signature instance containing the Reference elements and the SignatureValue needed to verify the signature. elements_to_sign: A list of tuples [int, bytes], where the first entry @@ -810,7 +828,7 @@ def verify_signature( # 2. Step: Checking signature value logger.debug("Verifying signature value for SignedInfo element") pub_key = load_der_x509_certificate(leaf_cert).public_key() - exi_encoded_signed_info = EXI().to_exi(signature.signed_info, Namespace.XML_DSIG) + exi_encoded_signed_info = comm_session.to_exi(signature.signed_info, Namespace.XML_DSIG) try: if isinstance(pub_key, EllipticCurvePublicKey): diff --git a/iso15118/shared/states.py b/iso15118/shared/states.py index 71febaf5..673a74e7 100644 --- a/iso15118/shared/states.py +++ b/iso15118/shared/states.py @@ -258,7 +258,7 @@ def create_next_message( # Step 3 exi_payload: bytes = bytes(0) try: - exi_payload = EXI().to_exi(to_be_exi_encoded, namespace) + exi_payload = self.comm_session.to_exi(to_be_exi_encoded, namespace) except EXIEncodingError as exc: logger.error(f"{exc}") self.next_state = Terminate diff --git a/tests/evcc/states/test_iso15118_2_states.py b/tests/evcc/states/test_iso15118_2_states.py index 600673b3..5d6eef5a 100644 --- a/tests/evcc/states/test_iso15118_2_states.py +++ b/tests/evcc/states/test_iso15118_2_states.py @@ -1,7 +1,7 @@ -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest -from evcc.states.test_messages import ( +from tests.evcc.states.test_messages import ( get_v2g_message_current_demand_res, get_v2g_message_current_demand_res_with_stop_charging, get_v2g_message_power_delivery_res, @@ -21,7 +21,6 @@ ) from iso15118.shared.notifications import StopNotification - @pytest.fixture def comm_session_mock(): comm_session_mock = Mock(spec=EVCCCommunicationSession) @@ -31,6 +30,10 @@ def comm_session_mock(): comm_session_mock.protocol = Protocol.UNKNOWN comm_session_mock.selected_schedule = 1 comm_session_mock.selected_energy_mode = EnergyTransferModeEnum.DC_EXTENDED + comm_session_mock.selected_charging_type_is_ac = False + # to exi must return something or the V2GTPMessage creation fails during extraction + # of the payload length + comm_session_mock.to_exi = Mock(return_value=b'\x01') return comm_session_mock @@ -41,7 +44,8 @@ def test_current_demand_to_current_demand(comm_session_mock): assert current_demand.next_state == CurrentDemand -def test_current_demand_to_power_delivery__when__evse_notification_is_stop_charging( + +def test_current_demand_to_power_delivery_when_evse_notification_is_stop_charging( comm_session_mock, ): # according V2G2-679 (EVSENotification = EVSENotification) @@ -54,12 +58,12 @@ def test_current_demand_to_power_delivery__when__evse_notification_is_stop_charg assert current_demand.next_state == PowerDelivery -def test_current_demand_to_power_delivery__when__stopped_by_ev(comm_session_mock): +def test_current_demand_to_power_delivery_when_stopped_by_ev(comm_session_mock): # V2G2-527 pass -def test_power_delivery_to_welding_detection__when__charge_progress_is_stop( +def test_power_delivery_to_welding_detection_when_charge_progress_is_stop( comm_session_mock, ): # V2G2-533 diff --git a/tests/secc/states/test_iso15118_2_states.py b/tests/secc/states/test_iso15118_2_states.py index 2f89174f..98fbc491 100644 --- a/tests/secc/states/test_iso15118_2_states.py +++ b/tests/secc/states/test_iso15118_2_states.py @@ -1,7 +1,7 @@ +import pytest from unittest.mock import Mock -import pytest -from secc.states.test_messages import ( +from tests.secc.states.test_messages import ( get_dummy_v2g_message_welding_detection_req, get_sa_schedule_list, get_v2g_message_power_delivery_req, @@ -25,14 +25,17 @@ def comm_session_mock(): comm_session_mock.session_id = "F9F9EE8505F55838" comm_session_mock.offered_schedules = get_sa_schedule_list() comm_session_mock.selected_energy_mode = EnergyTransferModeEnum.DC_EXTENDED - + comm_session_mock.selected_charging_type_is_ac = False comm_session_mock.stop_reason = StopNotification(False, "pytest") comm_session_mock.evse_controller = SimEVSEController() comm_session_mock.protocol = Protocol.UNKNOWN + # to exi must return something or the V2GTPMessage creation fails during extraction + # of the payload length + comm_session_mock.to_exi = Mock(return_value=b'\x01') return comm_session_mock -def test_current_demand_to_power_delivery__when__power_delivery_received( +def test_current_demand_to_power_delivery_when_power_delivery_received( comm_session_mock, ): current_demand = CurrentDemand(comm_session_mock) @@ -41,7 +44,7 @@ def test_current_demand_to_power_delivery__when__power_delivery_received( assert isinstance(comm_session_mock.current_state, PowerDelivery) -def test_power_delivery_to_welding_detection__when__welding_detection_received( +def test_power_delivery_to_welding_detection_when_welding_detection_received( comm_session_mock, ): # V2G2-601 (to WeldingDetection) @@ -53,7 +56,7 @@ def test_power_delivery_to_welding_detection__when__welding_detection_received( assert isinstance(comm_session_mock.current_state, WeldingDetection) -def test_welding_detection_to_session_stop__when__session_stop_received( +def test_welding_detection_to_session_stop_when_session_stop_received( comm_session_mock, ): pass