Skip to content

Commit

Permalink
TEST: negotiate hybrid PQ/T keys with OQS, Cloudflare and IBM
Browse files Browse the repository at this point in the history
  • Loading branch information
reneme committed Oct 6, 2023
1 parent 86700fa commit 870e008
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions src/scripts/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

CLI_PATH = None
TEST_DATA_DIR = '.'
ONLINE_TESTS = False
TESTS_RUN = 0
TESTS_FAILED = 0

Expand All @@ -41,6 +42,9 @@ def run_socket_tests():
# Connecting to the server port fails. Possibly a local firewall?
return platform.system().lower() != "freebsd"

def run_online_tests():
return ONLINE_TESTS

class TestLogHandler(logging.StreamHandler):
def emit(self, record):
# Do the default stuff first
Expand Down Expand Up @@ -1047,6 +1051,119 @@ def __init__(self, name, protocol_version, policy, **kwargs):
logging.error("server said (stdout): %s", srv_stdout)
logging.error("server said (stderr): %s", srv_stderr)

def cli_tls_online_pqc_hybrid_tests(tmp_dir):
if not run_socket_tests() or not run_online_tests() or not check_for_command("tls_client"):
return

oqs_test_ca = '\n'.join([
"-----BEGIN CERTIFICATE-----"
"MIIFCzCCAvOgAwIBAgIUCpn6WBGVTKUeNehaBCnHK4AgfrUwDQYJKoZIhvcNAQEL"
"BQAwFTETMBEGA1UEAwwKb3FzdGVzdF9DQTAeFw0yMzA4MDgxMDQwMzRaFw0yNDEy"
"MjAxMDQwMzRaMBUxEzARBgNVBAMMCm9xc3Rlc3RfQ0EwggIiMA0GCSqGSIb3DQEB"
"AQUAA4ICDwAwggIKAoICAQCnUS9KCJuwwGbgdsYoVkU7pp/M5gApTHdURaSx1NN+"
"0f50155cJvk0FZjJibL5wOawGcsDXQ31ujaXvtZPEWDbW8wUNhM66vLUjY8SuWNW"
"AoK2O42EH8jxNBNTethojZxMs+IKijh25Iz8O8nrNXPV1kQAPts9y9XHL8KNAGcS"
"+6xpRb19dln83veoIGvwkLcde/xmkWtkhDKiaT4TkTTNSVMavP4X8nGlzVkWYxiT"
"XjY36G784rPz6bY8G7dyrxDk4awOCktY5Hmw6C1gy6FxVTFezCPqVJMlznO/vu42"
"DtzHis9ztT3Yo3j2vroywHNa8F4E6lQVZtMOnqPm+bo65imWCmDMYToWMuA22a4O"
"CWYy6riOSKhMh2h1bn+b+/F2PxN4m7rfF9EgNAWjDYoQI75bXpGgFswVYbV3NHwM"
"jiwKAt5lW0ZrHELwnXDg3YeozvL/aBxtknhhHv4e9cwr91LbDjPZYbiVrtjlIzLk"
"O8TqdjdanDPsFTvscMd6CGy6ZPuJta60FcsBazQOVLo6avkZlGvosPstmcM1Cmkb"
"uXHprdWjW5eCesf28LXl0zlJzAldcleHva8JFsgv9Qyjhz91n9YPb1pnSb5o9YY8"
"WNYqq6vZgQb9uxna99CmZtlbFKueusn0BWyOYldnbW0J/dhWA2J1f/smC80oRNnP"
"HQIDAQABo1MwUTAdBgNVHQ4EFgQUQuYEXbkQgyG7YNznz3zXpmDo0sowHwYDVR0j"
"BBgwFoAUQuYEXbkQgyG7YNznz3zXpmDo0sowDwYDVR0TAQH/BAUwAwEB/zANBgkq"
"hkiG9w0BAQsFAAOCAgEADkGyktZjsMQcRguJAV6ZS9et35rzRaBUmMqZ4rRnTGqA"
"q/z6gKArC6n2zm0w9DHbBpVrLKqCmC/F1Pyhmm975Y8mPiQl1BzO86ShsMhBtFLZ"
"YBmikWicZtt2bznLSCwyMB6WoaG4OrCgigqFkiPHX18SwblgRaI+6J4BxrKqEMRz"
"Qjo4BzpqBxepDGwe4xJVFA/KoO9QENSj35h+15RHNC33nMPmE068R4jwVFSvKmJe"
"9qVJjbMQqBtsbVW/1jcgYKUIlg2IPMzplbvrZzWX3EZ7vOZx6g3nI9gDEx2g2WKN"
"t4fDXpzsNpdqFOdol9eRzUpHbkg1N9SRZLB1HGZ3+5xgQq0o0alHdEp5I/du0ESE"
"SQASXOdZgrxjIErm3xq/cVms0JBhia1tYAJnW9CjM9I0YdG/zwH6BK/m5RWQzkNV"
"N6n5lsgHrtWjlcUPMgd8OGTR7F1HQW5q8LDMDhI6ebuiRoMc7BJD0OsZKrohpCLz"
"MgXtU+muFVWaRd6q+8KcX6NilSrG88+SMnTJCEyillQ578X3MlkJMSx/XHwDAS14"
"JK8yIcMmTLVxpc4RAMR7milNrCVBZHxLwhgTWvxf5BXw1Fiif4iyemBdS6W/m4h2"
"QiwOZdgXDK7NYwekF9H+vl4EGTYA6AiccaTuNiTVWS9hivRcucNZ92MJaomdypc="
"-----END CERTIFICATE-----"])

class TestConfig:
def __init__(self, host, kex_algo, port=443, ca=None):
self.host = host
self.kex_algo = kex_algo
self.port = port
self.ca = ca

self.policy_file = None
self.ca_file = None

def setup(self, tmp_dir):
self.policy_file = tempfile.NamedTemporaryFile(dir=tmp_dir, mode="w+", encoding="utf-8")
self.policy_file.write('\n'.join(["allow_tls13 = true",
"allow_tls12 = false",
"key_exchange_methods = HYBRID KEM",
f"key_exchange_groups = {self.kex_algo}"]))
self.policy_file.flush()

if self.ca:
self.ca_file = tempfile.NamedTemporaryFile(dir=tmp_dir, mode="w+", encoding="utf-8")
self.ca_file.write(self.ca)
self.ca_file.flush()

def run(self):
cmd_options = []
if self.ca_file:
cmd_options += [f"--trusted-cas={self.ca_file.name}"]
if self.port:
cmd_options += [f"--port={self.port}"]
cmd_options += [f"--policy={self.policy_file.name}"]
cmd_options += [self.host]
return test_cli("tls_client", cmd_options, cmd_input="", timeout=5)

def get_oqs_ports():
try:
conn = HTTPSConnection("test.openquantumsafe.org")
conn.request("GET", "/assignments.json")
resp = conn.getresponse()
if resp.status != 200:
return None
return json.loads(resp.read().decode("utf-8"))['ecdsap256']
except Exception:
return None

test_cfg = [
TestConfig("pq.cloudflareresearch.com", "x25519/Kyber-512-r3/cloudflare"),
TestConfig("pq.cloudflareresearch.com", "x25519/Kyber-768-r3/cloudflare"),

TestConfig("qsc.eu-de.kms.cloud.ibm.com", "secp256r1/Kyber-512-r3"),
TestConfig("qsc.eu-de.kms.cloud.ibm.com", "secp384r1/Kyber-768-r3"),
TestConfig("qsc.eu-de.kms.cloud.ibm.com", "secp521r1/Kyber-1024-r3"),
TestConfig("qsc.eu-de.kms.cloud.ibm.com", "Kyber-512-r3"),
TestConfig("qsc.eu-de.kms.cloud.ibm.com", "Kyber-768-r3"),
TestConfig("qsc.eu-de.kms.cloud.ibm.com", "Kyber-1024-r3"),
]

oqsp = get_oqs_ports()
if oqsp:
test_cfg += [
TestConfig("test.openquantumsafe.org", "x25519/Kyber-512-r3", port=oqsp['x25519_kyber512'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "x25519/Kyber-768-r3", port=oqsp['x25519_kyber768'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "secp256r1/Kyber-512-r3", port=oqsp['p256_kyber512'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "secp384r1/Kyber-768-r3", port=oqsp['p384_kyber768'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "secp521r1/Kyber-1024-r3", port=oqsp['p521_kyber1024'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "Kyber-512-r3", port=oqsp['kyber512'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "Kyber-768-r3", port=oqsp['kyber768'], ca=oqs_test_ca),
TestConfig("test.openquantumsafe.org", "Kyber-1024-r3", port=oqsp['kyber1024'], ca=oqs_test_ca),
]
else:
logging.info("failed to pull OQS port assignment, skipping OQS...")

for cfg in test_cfg:
cfg.setup(tmp_dir)
stdout = cfg.run()
if "Handshake complete" not in stdout:
logging.error('Failed to complete handshake (%s with %s): %s', cfg.host, cfg.kex_algo, stdout)


def cli_tls_http_server_tests(tmp_dir):
if not run_socket_tests() or not check_for_command("tls_http_server"):
return
Expand Down Expand Up @@ -1475,6 +1592,7 @@ def main(args=None):
parser.add_option('--quiet', action='store_true', default=False)
parser.add_option('--threads', action='store', type='int', default=0)
parser.add_option('--run-slow-tests', action='store_true', default=False)
parser.add_option('--run-online-tests', action='store_true', default=False)
parser.add_option('--test-data-dir', default='.')

(options, args) = parser.parse_args(args)
Expand Down Expand Up @@ -1557,6 +1675,7 @@ def main(args=None):
cli_tls_http_server_tests,
cli_tls_proxy_tests,
cli_tls_socket_tests,
cli_tls_online_pqc_hybrid_tests,
cli_trust_root_tests,
cli_tss_tests,
cli_uuid_tests,
Expand All @@ -1571,6 +1690,9 @@ def main(args=None):
else:
test_fns = fast_test_fns

global ONLINE_TESTS
ONLINE_TESTS = options.run_online_tests

tests_to_run = []
for fn in test_fns:
fn_name = fn.__name__
Expand Down

0 comments on commit 870e008

Please sign in to comment.