This repository has been archived by the owner on Mar 13, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 26
/
rsa_utils.py
152 lines (122 loc) · 4.47 KB
/
rsa_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""In-memory RSA key generation and management utils."""
from __future__ import annotations
import contextlib
import functools
import os
import subprocess
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PublicFormat,
PrivateFormat,
)
class RSAKey:
    """Freshly generated RSA key pair that lives in memory only.

    Constructing an instance generates a new 4096-bit private key,
    pre-renders the public half as text in two formats and hands the
    unencrypted PEM-encoded private half to an ``SSHAgent`` object.
    """

    def __init__(self):
        private_key = generate_private_key(
            backend=default_backend(),
            key_size=4096,
            public_exponent=65537,
        )
        public_key = private_key.public_key()

        # Public half, PEM-armoured PKCS#1 text.
        pem_public_bytes = public_key.public_bytes(
            encoding=Encoding.PEM,
            format=PublicFormat.PKCS1,
        )
        self._public_repr = pem_public_bytes.decode()

        # Public half, OpenSSH text format.
        openssh_public_bytes = public_key.public_bytes(
            encoding=Encoding.OpenSSH,
            format=PublicFormat.OpenSSH,
        )
        self._public_repr_openssh = openssh_public_bytes.decode()

        # Private half stays as bytes and is not passphrase-protected;
        # it is only ever passed on to the SSH agent wrapper.
        pem_private_bytes = private_key.private_bytes(
            encoding=Encoding.PEM,
            format=PrivateFormat.TraditionalOpenSSL,  # A.K.A. PKCS#1
            encryption_algorithm=NoEncryption(),
        )
        self._ssh_agent_cm = SSHAgent(pem_private_bytes)

    @property
    def ssh_agent(self) -> SSHAgent:
        """Context manager wrapping an SSH agent loaded with this key."""
        return self._ssh_agent_cm

    @property
    def public(self) -> str:
        """Public key as PEM-encoded PKCS#1 text."""
        return self._public_repr

    @property
    def public_openssh(self) -> str:
        """Public key as OpenSSH-formatted text."""
        return self._public_repr_openssh
class SSHAgent:
    """SSH agent lifetime manager.

    Only usable as a CM. Only holds one RSA key in memory.

    Entering the context spawns a foreground ``ssh-agent`` process, reads
    the agent socket path from its output and loads the key with
    ``ssh-add``. Leaving the context terminates and reaps the process.
    """

    # Seconds to wait for the agent to exit after SIGTERM before killing it.
    _TERMINATE_TIMEOUT = 5

    def __init__(self, ssh_key: bytes):
        """Remember the private key to load; nothing is spawned yet.

        :param ssh_key: private key material, fed to ``ssh-add`` on stdin.
        """
        self._ssh_key = ssh_key
        self._ssh_agent_proc = None
        self._ssh_agent_socket = None

    def __enter__(self) -> _SubprocessSSHAgentProxy:
        """Start the agent, load the key and return a subprocess proxy.

        :returns: proxy whose subprocess helpers run with the agent
                  socket pre-populated in the environment.
        :raises OSError: if ``ssh-agent`` or ``ssh-add`` cannot be run.
        :raises subprocess.CalledProcessError: if ``ssh-add`` fails.
        """
        ssh_agent_cmd = (
            'ssh-agent',  # man 1 ssh-agent
            '-s',  # generate Bourne shell commands on stdout
            '-D',  # foreground mode
        )
        ssh_add_cmd = 'ssh-add', '-'

        self._ssh_agent_proc = subprocess.Popen(
            ssh_agent_cmd,
            stdout=subprocess.PIPE,  # we need to parse the socket path
            text=True,  # auto-decode the text from bytes
        )

        try:
            # The first output line is expected to look like
            # ``SSH_AUTH_SOCK=<path>; export SSH_AUTH_SOCK;`` -- keep
            # only what sits between the first ``=`` and the first ``; ``.
            first_line = self._ssh_agent_proc.stdout.readline()
            assignment = first_line.partition('; ')[0]
            self._ssh_agent_socket = assignment.partition('=')[-1]

            subprocess_proxy = _SubprocessSSHAgentProxy(
                self._ssh_agent_socket,
            )
            subprocess_proxy.check_output(
                ssh_add_cmd,
                input=self._ssh_key,
                stderr=subprocess.DEVNULL,
            )
        except BaseException:
            # ``with`` does not call ``__exit__`` when ``__enter__``
            # raises, so the freshly spawned agent must be stopped here
            # or it would outlive the failed setup.
            self._teardown()
            raise

        return subprocess_proxy

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the agent; never suppresses the in-flight exception."""
        self._teardown()
        return False

    def _teardown(self):
        """Forget the agent process/socket and shut the process down."""
        ssh_agent_proc = self._ssh_agent_proc
        self._ssh_agent_socket = None
        self._ssh_agent_proc = None
        self._stop_agent(ssh_agent_proc)

    @classmethod
    def _stop_agent(cls, ssh_agent_proc):
        """Terminate, reap and close the pipe of an agent process."""
        if ssh_agent_proc is None:
            # Nothing was started (or it has already been torn down).
            return

        with contextlib.suppress(OSError):
            ssh_agent_proc.terminate()

        try:
            # Reap the child so that it does not linger as a zombie.
            ssh_agent_proc.wait(timeout=cls._TERMINATE_TIMEOUT)
        except subprocess.TimeoutExpired:
            with contextlib.suppress(OSError):
                ssh_agent_proc.kill()
            ssh_agent_proc.wait()
        finally:
            # Release the read end of the stdout pipe.
            if ssh_agent_proc.stdout is not None:
                ssh_agent_proc.stdout.close()
def pre_populate_env_kwarg(meth):
    """Pre-populate the ``env`` kwarg in decorated methods.

    The wrapped method is always called with an ``env`` keyword argument
    that carries ``GIT_SSH_COMMAND`` and ``SSH_AUTH_SOCK`` entries built
    from ``self._sock``. The base environment is the caller-supplied
    ``env`` mapping, or a copy of ``os.environ`` when ``env`` is missing
    or ``None``. The caller's mapping is copied, never modified.

    :param meth: method of an object exposing a ``_sock`` attribute.
    :returns: wrapper with the same metadata as ``meth``.
    """
    @functools.wraps(meth)
    def method_wrapper(self, *args, **kwargs):
        caller_env = kwargs.get('env')
        # subprocess treats ``env=None`` as "inherit the current
        # environment", so handle it like an absent argument.
        base_env = os.environ if caller_env is None else caller_env

        # Work on a private copy so the caller's mapping stays intact.
        env = dict(base_env)
        env['GIT_SSH_COMMAND'] = (
            'ssh -2 '
            '-F /dev/null '
            '-o PreferredAuthentications=publickey '
            '-o IdentityFile=/dev/null '
            f'-o IdentityAgent="{self._sock}"'
        )
        env['SSH_AUTH_SOCK'] = self._sock
        kwargs['env'] = env

        return meth(self, *args, **kwargs)

    return method_wrapper
# pylint: disable=too-few-public-methods
class _SubprocessSSHAgentProxy:
    """Thin stand-in for a few ``subprocess`` helpers.

    Each public method forwards its arguments unchanged to the
    ``subprocess`` function of the same name. The decorator applied to
    each method fills in the ``env`` keyword argument using the socket
    path stored on the instance.
    """

    def __init__(self, sock):
        # Path of the SSH agent socket; read by the ``env`` decorator.
        self._sock = sock

    @pre_populate_env_kwarg
    def run(self, *args, **kwargs):
        """Forward to ``subprocess.run`` with the agent-aware env."""
        return subprocess.run(*args, **kwargs)

    @pre_populate_env_kwarg
    def check_call(self, *args, **kwargs):
        """Forward to ``subprocess.check_call`` with the agent-aware env."""
        return subprocess.check_call(*args, **kwargs)

    @pre_populate_env_kwarg
    def check_output(self, *args, **kwargs):
        """Forward to ``subprocess.check_output`` with the agent-aware env."""
        return subprocess.check_output(*args, **kwargs)