# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import os
import tempfile
import subprocess
from typing import Tuple
import unittest
from unittest import mock
from proxy.common import pki
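class TestPki(unittest.TestCase):
    """Tests for the openssl-backed helpers in ``proxy.common.pki``.

    Key, certificate and CSR files are written into a per-test scratch
    directory rather than directly into the shared system temp directory.
    """

    def setUp(self) -> None:
        """Create a private scratch directory that is removed after each test."""
        # TemporaryDirectory is built on mkdtemp: the directory gets an
        # unpredictable name and owner-only permissions, so generated private
        # keys never sit under a fixed, guessable path in a world-writable
        # location, and concurrent test runs cannot clobber each other.
        scratch = tempfile.TemporaryDirectory(prefix='test_pki_')
        # addCleanup runs even when an assertion fails, so key material is
        # not left behind by a failing test.
        self.addCleanup(scratch.cleanup)
        self._tempdir = scratch.name
        return super().setUp()

    @mock.patch('subprocess.Popen')
    def test_run_openssl_command(self, mock_popen: mock.Mock) -> None:
        """The command list is passed to Popen as-is with piped output."""
        command = ['my', 'custom', 'command']
        mock_popen.return_value.returncode = 0
        self.assertTrue(pki.run_openssl_command(command, 10))
        # assert_called_with matches the call exactly, so this also pins that
        # the argv list is not joined into a string and no extra keyword
        # (such as shell=True) is passed.
        mock_popen.assert_called_with(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

    def test_get_ext_config(self) -> None:
        """Extension config is empty without inputs and newline-prefixed otherwise."""
        self.assertEqual(pki.get_ext_config(None, None), b'')
        self.assertEqual(pki.get_ext_config([], None), b'')
        self.assertEqual(
            pki.get_ext_config(
                ['proxy.py'],
                None,
            ),
            b'\nsubjectAltName=DNS:proxy.py',
        )
        self.assertEqual(
            pki.get_ext_config(
                None,
                'serverAuth',
            ),
            b'\nextendedKeyUsage=serverAuth',
        )
        self.assertEqual(
            pki.get_ext_config(['proxy.py'], 'serverAuth'),
            b'\nsubjectAltName=DNS:proxy.py\nextendedKeyUsage=serverAuth',
        )
        self.assertEqual(
            pki.get_ext_config(['proxy.py', 'www.proxy.py'], 'serverAuth'),
            b'\nsubjectAltName=DNS:proxy.py,DNS:www.proxy.py\nextendedKeyUsage=serverAuth',
        )

    def test_ssl_config_no_ext(self) -> None:
        """Without alt names the written config is exactly DEFAULT_CONFIG."""
        with pki.ssl_config() as (config_path, has_extension):
            self.assertFalse(has_extension)
            with open(config_path, 'rb') as config:
                self.assertEqual(config.read(), pki.DEFAULT_CONFIG)

    def test_ssl_config(self) -> None:
        """With alt names a [PROXY] section is appended to DEFAULT_CONFIG."""
        with pki.ssl_config(['proxy.py']) as (config_path, has_extension):
            self.assertTrue(has_extension)
            with open(config_path, 'rb') as config:
                self.assertEqual(
                    config.read(),
                    pki.DEFAULT_CONFIG +
                    b'\n[PROXY]\nsubjectAltName=DNS:proxy.py',
                )

    def test_extfile_no_ext(self) -> None:
        """Without alt names the extension file is empty."""
        with pki.ext_file() as config_path:
            with open(config_path, 'rb') as config:
                self.assertEqual(config.read(), b'')

    def test_extfile(self) -> None:
        """With alt names the extension file carries the subjectAltName line."""
        with pki.ext_file(['proxy.py']) as config_path:
            with open(config_path, 'rb') as config:
                self.assertEqual(
                    config.read(),
                    b'\nsubjectAltName=DNS:proxy.py',
                )

    def test_gen_private_key(self) -> None:
        """Both the passphrase-protected and the passphrase-free key are written."""
        key_path, nopass_key_path = self._gen_private_key()
        self.assertTrue(os.path.exists(key_path))
        self.assertTrue(os.path.exists(nopass_key_path))
        # No explicit os.remove: the scratch directory cleanup registered in
        # setUp deletes the files whether or not the assertions pass.

    def test_gen_public_key(self) -> None:
        """A certificate file is written for the generated private key."""
        _key_path, _nopass_key_path, crt_path = self._gen_public_private_key()
        self.assertTrue(os.path.exists(crt_path))
        # TODO: Assert generated public key matches private key

    def test_gen_csr(self) -> None:
        """A CSR file is written for the generated key and certificate."""
        key_path, _nopass_key_path, crt_path = self._gen_public_private_key()
        csr_path = os.path.join(self._tempdir, 'test_gen_public.csr')
        pki.gen_csr(csr_path, key_path, 'password', crt_path)
        self.assertTrue(os.path.exists(csr_path))
        # TODO: Assert CSR is valid for provided crt and key

    def test_sign_csr(self) -> None:
        """Placeholder: CSR signing has no coverage yet."""
        # TODO: exercise pki.sign_csr and verify the issued certificate.
        pass

    def _gen_public_private_key(self) -> Tuple[str, str, str]:
        """Generate a key pair plus a certificate inside the scratch directory.

        Returns:
            ``(key_path, nopass_key_path, crt_path)``.
        """
        key_path, nopass_key_path = self._gen_private_key()
        crt_path = os.path.join(self._tempdir, 'test_gen_public.crt')
        pki.gen_public_key(crt_path, key_path, 'password', '/CN=localhost')
        return (key_path, nopass_key_path, crt_path)

    def _gen_private_key(self) -> Tuple[str, str]:
        """Generate a private key and a passphrase-free copy in the scratch directory.

        Returns:
            ``(key_path, nopass_key_path)``.
        """
        key_path = os.path.join(self._tempdir, 'test_gen_private.key')
        nopass_key_path = os.path.join(
            self._tempdir,
            'test_gen_private_nopass.key',
        )
        # 'password' is a throwaway test passphrase; the unencrypted copy is
        # the reason these files are confined to the private scratch directory.
        pki.gen_private_key(key_path, 'password')
        pki.remove_passphrase(key_path, 'password', nopass_key_path)
        return (key_path, nopass_key_path)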