Skip to content

Commit

Permalink
[capture] Enable uJSOn for KMAC
Browse files Browse the repository at this point in the history
Similar to lowRISC#218, this PR enables uJSON support for capturing KMAC
traces. The device command handler code can be found in #20563.

Signed-off-by: Pascal Nasahl <[email protected]>
  • Loading branch information
nasahlpa committed Dec 5, 2023
1 parent 19a084e commit c9ef663
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 46 deletions.
45 changes: 29 additions & 16 deletions capture/capture_kmac.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import Optional

import lib.helpers as helpers
import numpy as np
import yaml
from Crypto.Hash import KMAC128
from lib.ot_communication import OTKMAC
from lib.ot_communication import OTKMAC, OTPRNG, OTUART
from project_library.project import ProjectConfig, SCAProject
from scopes.cycle_converter import convert_num_cycles, convert_offset_cycles
from scopes.scope import Scope, ScopeConfig, determine_sampling_rate
Expand Down Expand Up @@ -61,6 +62,8 @@ class CaptureConfig:
key_fixed: bytearray
key_len_bytes: int
text_len_bytes: int
protocol: str
port: Optional[str] = "None"


def setup(cfg: dict, project: Path):
Expand All @@ -87,6 +90,7 @@ def setup(cfg: dict, project: Path):
pll_frequency = cfg["target"]["pll_frequency"],
baudrate = cfg["target"]["baudrate"],
output_len = cfg["target"]["output_len_bytes"],
protocol = cfg["target"]["protocol"]
)

# Init scope.
Expand Down Expand Up @@ -142,8 +146,16 @@ def configure_cipher(cfg, target, capture_cfg) -> OTKMAC:
Returns:
The communication interface to the KMAC cipher.
"""
# Establish UART for uJSON command interface. Returns None for simpleserial.
ot_uart = OTUART(protocol=capture_cfg.protocol, port=capture_cfg.port)

# Create communication interface to OT KMAC.
ot_kmac = OTKMAC(target.target)
ot_kmac = OTKMAC(target=target.target, protocol=capture_cfg.protocol,
port=ot_uart.uart)

# Create communication interface to OT PRNG.
ot_prng = OTPRNG(target=target.target, protocol=capture_cfg.protocol,
port=ot_uart.uart)

# If batch mode, configure PRNGs.
if capture_cfg.batch_mode:
Expand All @@ -153,7 +165,7 @@ def configure_cipher(cfg, target, capture_cfg) -> OTKMAC:
# Seed the target's PRNGs for initial key masking, and additionally
# turn off masking when '0'.
ot_kmac.write_lfsr_seed(cfg["test"]["lfsr_seed"].to_bytes(4, "little"))
ot_kmac.write_batch_prng_seed(cfg["test"]["batch_prng_seed"].to_bytes(4, "little"))
ot_prng.seed_prng(cfg["test"]["batch_prng_seed"].to_bytes(4, "little"))

return ot_kmac

Expand Down Expand Up @@ -196,28 +208,27 @@ def generate_ref_crypto(sample_fixed, mode, batch, key, key_fixed, plaintext,
# returns pt, ct, key, needs key and pt as arguments
mac = KMAC128.new(key=bytes(key), mac_len=32)
mac.update(bytes(plaintext))
ciphertext = bytearray(mac.digest())
ciphertext = mac.digest()
else:
# returns random pt, ct, key, needs no arguments
if sample_fixed:
# Use fixed_key as this key.
key = np.asarray(key_fixed)
key = key_fixed
else:
# Generate this key from the PRNG.
key = bytearray(key_length)
key = []
for i in range(0, key_length):
key[i] = random.randint(0, 255)
key.append(random.randint(0, 255))
# Always generate this plaintext from PRNG (including very first one).
plaintext = bytearray(16)
plaintext = []
for i in range(0, 16):
plaintext[i] = random.randint(0, 255)
plaintext.append(random.randint(0, 255))
# Compute ciphertext for this key and plaintext.
mac = KMAC128.new(key=bytes(key), mac_len=32)
mac.update(bytes(plaintext))
ciphertext = bytearray(mac.digest())
# Determine if next iteration uses fixed_key.
sample_fixed = plaintext[0] & 0x1

return plaintext, key, ciphertext, sample_fixed


Expand Down Expand Up @@ -258,10 +269,10 @@ def capture(scope: Scope, ot_kmac: OTKMAC, capture_cfg: CaptureConfig,
cwtarget: The CW FPGA target.
"""
# Initial plaintext.
text_fixed = bytearray(capture_cfg.text_fixed)
text_fixed = capture_cfg.text_fixed
text = text_fixed
# Load fixed key.
key_fixed = bytearray(capture_cfg.key_fixed)
key_fixed = capture_cfg.key_fixed
key = key_fixed

# FVSR setup.
Expand Down Expand Up @@ -329,9 +340,9 @@ def capture(scope: Scope, ot_kmac: OTKMAC, capture_cfg: CaptureConfig,
assert len(waves[i, :]) >= 1
# Store trace into database.
project.append_trace(wave = waves[i, :],
plaintext = text,
ciphertext = ciphertext,
key = key)
plaintext = bytearray(text),
ciphertext = bytearray(ciphertext),
key = bytearray(key))

if capture_cfg.capture_mode == "kmac_random":
plaintext = bytearray(16)
Expand Down Expand Up @@ -417,7 +428,9 @@ def main(argv=None):
text_fixed = cfg["test"]["text_fixed"],
key_fixed = cfg["test"]["key_fixed"],
key_len_bytes = cfg["test"]["key_len_bytes"],
text_len_bytes = cfg["test"]["text_len_bytes"])
text_len_bytes = cfg["test"]["text_len_bytes"],
protocol = cfg["target"]["protocol"],
port = cfg["target"].get("port"))
logger.info(f"Setting up capture {capture_cfg.capture_mode} batch={capture_cfg.batch_mode}...")

# Configure cipher.
Expand Down
11 changes: 8 additions & 3 deletions capture/configs/kmac_cw310.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ target:
fpga_bitstream: ../objs/lowrisc_systems_chip_earlgrey_cw310_0.1_kmac_dom.bit
force_program_bitstream: False
fw_bin: ../objs/kmac_serial_fpga_cw310.bin
# fw_bin: "../objs/kmac_ujson_fpga_cw310.bin"
# target_clk_mult is a hardcoded value in the bitstream. Do not change.
target_clk_mult: 0.24
target_freq: 24000000
baudrate: 115200
output_len_bytes: 32
protocol: "simpleserial"
# protocol: "ujson"
# port: "/dev/ttyACM1"
husky:
sampling_rate: 200000000
num_segments: 20
Expand Down Expand Up @@ -35,9 +40,9 @@ capture:
trace_db: ot_trace_library
trace_threshold: 10000
test:
#which_test: kmac_random
#which_test: kmac_fvsr_key
which_test: kmac_fvsr_key_batch
# which_test: kmac_random
# which_test: kmac_fvsr_key
which_test: kmac_random
key_len_bytes: 16
key_fixed: [0x81, 0x1E, 0x37, 0x31, 0xB0, 0x12, 0x0A, 0x78, 0x42, 0x78, 0x1E, 0x22, 0xB2, 0x5C, 0xDD, 0xF9]
text_len_bytes: 16
Expand Down
104 changes: 87 additions & 17 deletions capture/lib/ot_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,50 +276,109 @@ def seed_prng(self, seed, seed_length: Optional[int] = 4):


class OTKMAC:
def __init__(self, target) -> None:
def __init__(self, target, protocol: str,
port: Optional[OTUART] = None) -> None:
self.target = target
self.simple_serial = True
self.port = port
if protocol == "ujson":
self.simple_serial = False
# Init the KMAC core.
self.port.write(json.dumps("KmacSca").encode("ascii"))
self.port.write(json.dumps("Init").encode("ascii"))

def write_key(self, key):
def _ujson_kmac_sca_cmd(self):
# TODO: without the delay, the device uJSON command handler program
# does not recognize the commands.
time.sleep(0.01)
self.port.write(json.dumps("KmacSca").encode("ascii"))

def write_key(self, key: list[int]):
""" Write the key to KMAC.
Args:
key: Bytearray containing the key.
"""
self.target.simpleserial_write("k", key)
if self.simple_serial:
self.target.simpleserial_write("k", bytearray(key))
else:
# KmacSca command.
self._ujson_kmac_sca_cmd()
# SetKey command.
self.port.write(json.dumps("SetKey").encode("ascii"))
# Key payload.
time.sleep(0.01)
key_data = {"key": key, "key_length": 16}
self.port.write(json.dumps(key_data).encode("ascii"))

def fvsr_key_set(self, key):
def fvsr_key_set(self, key: list[int], key_length: Optional[int] = 16):
""" Write the fixed key to KMAC.
Args:
key: Bytearray containing the key.
"""
self.target.simpleserial_write("f", key)
if self.simple_serial:
self.target.simpleserial_write("f", bytearray(key))
else:
# KmacSca command.
self._ujson_kmac_sca_cmd()
# SetKey command.
self.port.write(json.dumps("FixedKeySet").encode("ascii"))
# FixedKeySet payload.
time.sleep(0.01)
key_data = {"key": key, "key_length": key_length}
self.port.write(json.dumps(key_data).encode("ascii"))

def write_lfsr_seed(self, seed):
""" Seed the LFSR.
Args:
seed: The 4-byte seed.
"""
self.target.simpleserial_write("l", seed)

def write_batch_prng_seed(self, seed):
""" Seed the PRNG.
Args:
seed: The 4-byte seed.
"""
self.target.simpleserial_write("s", seed)
if self.simple_serial:
self.target.simpleserial_write("l", seed)
else:
# KmacSca command.
self._ujson_kmac_sca_cmd()
# SeedLfsr command.
self.port.write(json.dumps("SeedLfsr").encode("ascii"))
# Seed payload.
time.sleep(0.01)
seed_int = [x for x in seed]
seed_data = {"seed": seed_int}
self.port.write(json.dumps(seed_data).encode("ascii"))

def absorb_batch(self, num_segments):
""" Start absorb for batch.
Args:
num_segments: Number of encryptions to perform.
"""
self.target.simpleserial_write("b", num_segments)
if self.simple_serial:
self.target.simpleserial_write("b", num_segments)
else:
# KmacSca command.
self._ujson_kmac_sca_cmd()
# Batch command.
self.port.write(json.dumps("Batch").encode("ascii"))
# Num_segments payload.
time.sleep(0.01)
num_segments_data = {"data": [x for x in num_segments]}
self.port.write(json.dumps(num_segments_data).encode("ascii"))

def absorb(self, text):
def absorb(self, text, text_length: Optional[int] = 16):
""" Write plaintext to OpenTitan KMAC & start absorb.
Args:
text: The plaintext bytearray.
"""
self.target.simpleserial_write("p", text)
if self.simple_serial:
self.target.simpleserial_write("p", text)
else:
# KmacSca command.
self._ujson_kmac_sca_cmd()
# SingleAbsorb command.
self.port.write(json.dumps("SingleAbsorb").encode("ascii"))
# Msg payload.
time.sleep(0.01)
text_int = [x for x in text]
text_data = {"msg": text_int, "msg_length": text_length}
self.port.write(json.dumps(text_data).encode("ascii"))

def read_ciphertext(self, len_bytes):
""" Read ciphertext from OpenTitan KMAC.
Expand All @@ -329,4 +388,15 @@ def read_ciphertext(self, len_bytes):
Returns:
The received ciphertext.
"""
return self.target.simpleserial_read("r", len_bytes, ack=False)
if self.simple_serial:
return self.target.simpleserial_read("r", len_bytes, ack=False)
else:
while True:
read_line = str(self.port.readline())
if "RESP_OK" in read_line:
json_string = read_line.split("RESP_OK:")[1].split(" CRC:")[0]
try:
batch_digest = json.loads(json_string)["batch_digest"]
return bytearray(batch_digest[0:len_bytes])
except Exception:
pass # noqa: E302
25 changes: 15 additions & 10 deletions util/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

import numpy as np
from Crypto.Cipher import AES
from Crypto.Hash import KMAC128

Expand Down Expand Up @@ -37,11 +36,17 @@ def set_start(self):
0x53, 0x53, 0x53, 0x53, 0x53, 0x53, 0x53, 0x53]

def advance_fixed(self):
self.text_fixed = self.cipher_gen.encrypt(bytes(self.text_fixed))
text_fixed_bytes = self.cipher_gen.encrypt(bytes(self.text_fixed))
# Convert bytearray into int array.
self.text_fixed = [x for x in text_fixed_bytes]

def advance_random(self):
self.text_random = self.cipher_gen.encrypt(bytes(self.text_random))
self.key_random = self.cipher_gen.encrypt(bytes(self.key_random))
text_random_bytes = self.cipher_gen.encrypt(bytes(self.text_random))
# Convert bytearray into int array.
self.text_random = [x for x in text_random_bytes]
key_random_bytes = self.cipher_gen.encrypt(bytes(self.key_random))
# Convert bytearray into int array.
self.key_random = [x for x in key_random_bytes]

def get_fixed(self):
pt = self.text_fixed
Expand All @@ -62,21 +67,21 @@ def get_random(self):
return pt, ct, key

def get_kmac_fixed(self):
pt = np.asarray(self.text_fixed)
key = np.asarray(self.key_fixed)
pt = self.text_fixed
key = self.key_fixed
mac_fixed = KMAC128.new(key=bytes(self.key_fixed), mac_len=32)
mac_fixed.update(bytes(self.text_fixed))
ct = np.asarray(bytearray(mac_fixed.digest()))
ct = mac_fixed.digest()
del (mac_fixed)
self.advance_fixed()
return pt, ct, key

def get_kmac_random(self):
pt = np.asarray(self.text_random)
key = np.asarray(self.key_random)
pt = self.text_random
key = self.key_random
mac_random = KMAC128.new(key=bytes(self.key_random), mac_len=32)
mac_random.update(bytes(self.text_random))
ct = np.asarray(bytearray(mac_random.digest()))
ct = mac_random.digest()

del (mac_random)
self.advance_random()
Expand Down

0 comments on commit c9ef663

Please sign in to comment.