
Tutorial 7: Using Measurements for temporal Verification

Artur Mrowca edited this page Apr 19, 2019 · 7 revisions

1. Add Encryption and Decryption full example

This tutorial gives a full example of how to exchange keys, perform asymmetric encryption and then switch to symmetric encryption. It also shows how our timing measurements can be used to estimate the encryption and decryption times. We start with the result obtained in Tutorial 6 and extend it step by step. Also refer to the code provided for this tutorial in the Tutorial 7 folder.

In Tutorial 6 we created a certificate and root certificates for each ECU. Those are stored in self.my_certificate and self.my_root_certificates. In the line

[certificate, root_certificates_to_verify_this_certificate, self.certificate_private_key] = GeneralSpecPreset().certificate_manager.generate_valid_ecu_cert(self._ecu_id, CAEnum.CA_L311, 0, float('inf'))

a certificate is generated whose key pair consists of the public key certificate.pub_key_user and the private key self.certificate_private_key.

With this we can start implementing our simple protocol.

Step 1: First, we transmit our certificate together with a message encrypted with the public key of the certificate (i.e. of the current ECU). In this simple example the matching private key is sent along, so the receiver can decrypt the message with it. We transmit this message with ID 901. So at the sending side we write in the send function:

print("Sending Certificate and my private key : ")
# Encrypt message with the public key
encrypted_msg = encryption_tools.asy_encrypt(message, self.my_certificate.pub_key_user)

# Send the certificate and message and private key
message_to_send = SegData([encrypted_msg, self.my_certificate, self.certificate_private_key], 50)
sec_message_id = 901
# message_to_send.get() returns a list, so it is converted with str() before concatenation
print("\n\nSENDER - \nTime: " + str(
	self.sim_env.now) + "--Communication Layer: \nI am ECU " + str(sender_id) + "\nSending message:\n - ID: " + str(
	sec_message_id) + "\n - Content: " + str(message_to_send.get()))
yield self.sim_env.process(self.transp_lay.send_msg(sender_id, sec_message_id, message_to_send))

Step 2: At the receiver side we now verify this certificate and can decrypt the message with the private key we received. Then we generate a session_key and transmit it back. This session_key is then used on both sides for communication.

So at the receiver side we write:
if message_id == 901:
	''' receive and verify certificate '''
	[encrypted_msg, received_certificate, received_cert_private_key] = message_data.get()
	# verify certificate - which works here as they all have the same CA authority that signed the certificate
	certificate_valid = encryption_tools.certificate_trustworthy(received_certificate, self.my_root_certificates, self.sim_env.now)
	print("Certificate is valid? %s" % str(certificate_valid))

	# Respond with a session key of size 30 and message id 902
	self._session_key = sym_get_key(SymAuthMechEnum.AES, AuKeyLengthEnum.bit_128)
	self._have_session_key = True
	sec_message_id = 902
	yield self.sim_env.process(self.transp_lay.send_msg(self._ecu_id, sec_message_id, SegData(self._session_key, 30)))

Note: yielding the process created from transp_lay.send_msg sends the message down through the transport layer, while return hands the received data up to the application layer.
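
The difference between yield and return can be sketched with plain Python generators, independent of the simulator. This is only an illustration under the assumption that the layers are generator-based processes (the sim_env.process and sim_env.timeout calls suggest a SimPy-style environment). The names transport_send, receive and run are hypothetical stand-ins, and the sketch uses yield from where the tutorial code yields a process event.

```python
def transport_send(log, msg_id):
    """Hypothetical stand-in for a lower-layer send (a generator process)."""
    log.append(("sent", msg_id))
    yield "bus-busy"  # hand control back to the scheduler while the bus is busy


def receive(log, msg_id, data):
    """Hypothetical stand-in for the communication-layer receive function."""
    if msg_id == 901:
        # Delegate to the lower layer: a response goes out on the bus
        # and nothing is passed up to the application layer.
        yield from transport_send(log, 902)
        return None
    # Ordinary message: return it to the caller (the application layer).
    return [msg_id, data]


def run(process):
    """Tiny driver: step a generator to completion, collect events and return value."""
    events = []
    while True:
        try:
            events.append(next(process))
        except StopIteration as stop:
            return events, stop.value


log = []
events_901, result_901 = run(receive(log, 901, "certificate"))
events_data, result_data = run(receive(log, 16, "payload"))
print(events_901, result_901, log)
print(events_data, result_data)
```

In the first call the generator yields once and returns nothing upward; in the second it yields nothing and its return value reaches the caller.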

Step 3: The receivers on the same bus receive and store the session key. So in the receive function we add another branch (this could of course be handled by two separate ECU types; for simplicity, this example puts it in the same ECU type):

if message_id == 902:
	# Read the message data -> which is session key
	self._session_key = message_data.get()
	self._have_session_key = True 
	sec_message_id = 903
	
	# FROM NOW ON communication over the session_key is done (for the ECUS that got the same session_key)

Step 4: Now we can run symmetric encryption by adding at the sender:

# Encrypt message
encrypted_msg = sym_encrypt(message, self._session_key)
print("\n\nSENDER - \nTime: "+str(self.sim_env.now)+"--Communication Layer: \nI am ECU " + str(sender_id) + "\nSending message:\n - ID: " + str(message_id)+"\n - Content: " + str(message.get()))

# Sending message now with the session key I have
print("\nEncrypted the message")

# Assume it takes 0.2 seconds to e.g. encrypt this message
# BEFORE PROCESSING
print("\nECU "+ str(self._ecu_id) +" Time before message sent: "+ str(self.sim_env.now))

# Send message - here send your message with your message_id
encrypted_size = 30 # some size
yield self.sim_env.process(self.transp_lay.send_msg(sender_id, message_id, SegData(encrypted_msg, encrypted_size)))

and at the receiver:

if self._have_session_key and message_id not in [901, 902, 903]:
	decrypted_msg = sym_decrypt(message_data.get(), self._session_key)

	# the helper returns a tuple (decryption_time, decrypted_size), see section 2
	decryption_time, decrypted_size = self._get_time_for_session_decryption(message_data.get())

	if decrypted_msg is None:
		print("--- Decryption was not successful - probably this ECU did not get the session key")
	else:
		message_data = decrypted_msg.get()
		# push to higher layer
		return [message_id, message_data]
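
The message flow of Steps 1 to 4 can be summarized in a small self-contained sketch. It does not use the simulator: ToyEcu and toy_xor are hypothetical stand-ins, the XOR function merely takes the place of sym_encrypt/sym_decrypt and is not secure, and the certificate check is omitted. As in the tutorial code, the session key is sent back unencrypted in message 902.

```python
import secrets

CERT_MSG, KEY_MSG = 901, 902


def toy_xor(data: bytes, key: bytes) -> bytes:
    """XOR stand-in for sym_encrypt/sym_decrypt; NOT secure, illustration only."""
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


class ToyEcu:
    """Hypothetical ECU that only models the receive-side branching."""

    def __init__(self, name):
        self.name = name
        self.session_key = None
        self.inbox = []  # what reaches the application layer

    def receive(self, message_id, payload):
        """Return a (message_id, payload) reply to put on the bus, or None."""
        if message_id == CERT_MSG:
            # Step 2: (certificate check omitted) create and announce a session key.
            self.session_key = secrets.token_bytes(16)
            return KEY_MSG, self.session_key
        if message_id == KEY_MSG:
            # Step 3: store the announced session key.
            self.session_key = payload
            return None
        if self.session_key is not None:
            # Step 4: ordinary traffic is decrypted with the session key.
            self.inbox.append((message_id, toy_xor(payload, self.session_key)))
        return None


sender, receiver = ToyEcu("A"), ToyEcu("B")
reply = receiver.receive(CERT_MSG, "certificate of A")  # Step 1 arrives at B
sender.receive(*reply)                                  # A stores the key from 902
ciphertext = toy_xor(b"speed=42", sender.session_key)   # A encrypts with the key
receiver.receive(16, ciphertext)                        # B decrypts and delivers
print(receiver.inbox)
```

An ECU that never received message 902 has no session key and therefore delivers nothing, which corresponds to the "Decryption was not successful" branch above.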

2. Add timings

Several examples of how timings are computed can be found in ECUSimulation/components/security/ecu/types/impl_sec_mod_lwa.py in StdSecurLwSecModTimingFunctions. There, the measurements database is queried for each message that is to be encrypted, decrypted, hashed, etc., based on the message size. This database maps message sizes to the time it takes to perform an operation on data of that size. If no value is found for a size, interpolation is used. Please check out the database in ECUSimulation/config/data/measurements.db, where you can see which tags you can query with the lookup_interpol method. There you can also add your own measurements. The lookup_interpol method queries all columns based on the given arguments.
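
The idea of mapping sizes to times with interpolation can be sketched without the database. This is not the implementation of lookup_interpol: the table values are made up, lookup_time is a hypothetical helper, and linear interpolation with clamping outside the measured range is an assumption chosen for the sketch.

```python
from bisect import bisect_left

# Hypothetical measurements: data size in bytes -> time in microseconds.
MEASURED = [(16, 100.0), (64, 220.0), (256, 700.0)]


def lookup_time(data_size, table=MEASURED):
    """Return the measured time for data_size, interpolating linearly if needed."""
    sizes = [size for size, _ in table]
    i = bisect_left(sizes, data_size)
    if i < len(sizes) and sizes[i] == data_size:
        return table[i][1]   # exact measurement available
    if i == 0:
        return table[0][1]   # below the measured range: clamp (assumption)
    if i == len(sizes):
        return table[-1][1]  # above the measured range: clamp (assumption)
    (x0, y0), (x1, y1) = table[i - 1], table[i]
    return y0 + (y1 - y0) * (data_size - x0) / (x1 - x0)


print(lookup_time(64))  # exact entry
print(lookup_time(40))  # halfway between the entries for 16 and 64 bytes
```

A size of 40 bytes lies halfway between the 16 and 64 byte entries, so the sketch returns the midpoint of their times.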

E.g. the encryption time and decryption time can be retrieved using:

def _get_time_for_session_encryption(self, encrypted_message):
	# size of the data to encrypt
	size_to_encrypt = encrypted_message._size

	# encryption time
	algorithm = EnumTrafor().to_value(SymAuthMechEnum.AES)
	key_len = EnumTrafor().to_value(AuKeyLengthEnum.bit_128)

	library_tag = "Crypto_Lib_HW" # available tags: ['CyaSSL', 'Crypto_Lib_HW', 'Crypto_Lib_SW']

	db_val = TimingDBMap().lookup_interpol(lib=library_tag, mode='ENCRYPTION',
										   keylen=key_len, alg=algorithm, data_size=size_to_encrypt,
										   description='t_ecu_auth_reg_msg_validate_cert')

	encrypted_size = 24 # something you need to compute yourself or use the examples provided in ECUSimulation/components/security/ecu/types/impl_sec_mod_lwa.py
	encryption_time = db_val
	return encryption_time, encrypted_size

# This can then be applied to the simulation environment
print("%s: Time before encryption %s" % (str(self._ecu_id), str(self.sim_env.now)))
encryption_time, encrypted_size = self._get_time_for_session_encryption(message)
yield self.sim_env.timeout(encryption_time) # apply time for encryption
print("%s: Time after encryption %s"  % (str(self._ecu_id), str(self.sim_env.now)))

Analogously, at the decryption side we can query the decryption time:

def _get_time_for_session_decryption(self, encrypted_message):
        
	# decrypted size
	size_to_decrypt = 24

	# decryption time
	algorithm = EnumTrafor().to_value(SymAuthMechEnum.AES)
	key_len = EnumTrafor().to_value(AuKeyLengthEnum.bit_128)

	library_tag = "Crypto_Lib_HW"  # available tags: ['CyaSSL', 'Crypto_Lib_HW', 'Crypto_Lib_SW']

	db_val = TimingDBMap().lookup_interpol(lib=library_tag, mode='DECRYPTION',
										   keylen=key_len, alg=algorithm, data_size=size_to_decrypt,
										   description='some_description')

	decrypted_size = encrypted_message.msg_unencrpyted._size  # this is stored in the encrypted message

	decryption_time = db_val

	return decryption_time, decrypted_size
	
# And again we can apply it 
print("%s: Time before decryption %s" % (str(self._ecu_id), str(self.sim_env.now)))
decryption_time, decrypted_size = self._get_time_for_session_decryption(message_data.get())
yield self.sim_env.timeout(decryption_time) # time for decryption
print("%s: Time after decryption %s" % (str(self._ecu_id), str(self.sim_env.now)))
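
The effect of yielding a timeout can be seen in isolation with a toy clock. The sketch below is not the simulator: secured_transfer and run are hypothetical, the process yields plain numbers where the tutorial code yields sim_env.timeout(...), and the times 0.25 and 0.5 are made-up values that would normally come from the database lookup.

```python
def secured_transfer(clock, encryption_time, decryption_time):
    """Generator process: yields each delay it wants to wait for."""
    stamps = [("before encryption", clock["now"])]
    yield encryption_time  # plays the role of sim_env.timeout(encryption_time)
    stamps.append(("after encryption", clock["now"]))
    yield decryption_time  # plays the role of sim_env.timeout(decryption_time)
    stamps.append(("after decryption", clock["now"]))
    return stamps


def run(clock, process):
    """Advance the toy clock by every delay the process yields."""
    try:
        while True:
            clock["now"] += next(process)
    except StopIteration as stop:
        return stop.value


clock = {"now": 0.0}
stamps = run(clock, secured_transfer(clock, 0.25, 0.5))
for label, time in stamps:
    print(label, time)
```

The time stamps before and after each step differ by exactly the yielded duration, which is what the printed "Time before" and "Time after" lines show in the simulation.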

Please check out the whole implementation in the GitHub project in the Tutorial 7 folder. There, only the methods send_msg and receive_msg are relevant, as they contain the whole logic. This example only demonstrated the symmetric encryption times; for all other encryption and decryption times, see the examples given in StdSecurLwSecModTimingFunctions. The timings also depend on your protocol, i.e. on the hashing, decryption, encryption or verification operations that you perform.