Skip to content

Commit

Permalink
Replace auto_keyring_sync by source_keyring_dir
Browse files Browse the repository at this point in the history
The latter defaults to the GnuPG home directory, and is used as a source
of secret subkeys.
  • Loading branch information
DemiMarie committed Jan 28, 2023
1 parent 6dc661d commit 5e94eef
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
38 changes: 22 additions & 16 deletions splitgpg2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class GpgServer:
agent_socket_path: Optional[str]
agent_reader: Optional[asyncio.StreamReader]
agent_writer: Optional[asyncio.StreamWriter]
auto_keyring_sync: bool
source_keyring_dir: Optional[str]

cache_nonce_regex: re.Pattern[bytes] = re.compile(rb'\A[0-9A-F]{24}\Z')

Expand All @@ -234,12 +234,7 @@ def __init__(self, reader: asyncio.StreamReader,
#: signal those Futures when connection is terminated
self.notify_on_disconnect = set()
self.log_io_enable = False
try:
gnupghome = os.environ['GNUPGHOME']
except KeyError:
gnupghome = os.path.join(pwd.getpwuid(os.getuid()).pw_name,
'.gnupg')
self.gnupghome = gnupghome
self.gnupghome = xdg.BaseDirectory.xdg_config_home + '/qubes-split-gpg2/gnupg'

self.client_reader = reader
self.client_writer = writer
Expand All @@ -256,7 +251,6 @@ def __init__(self, reader: asyncio.StreamReader,

self.seen_data = False
self.config_loaded = False
self.auto_keyring_sync = True

if debug_log:
handler = logging.FileHandler(debug_log)
Expand Down Expand Up @@ -306,8 +300,19 @@ def load_config(self, config) -> None:

self.allow_keygen = self._parse_bool_val(
config.get('allow_keygen', 'no'), 'allow_keygen')
self.auto_keyring_sync = self._parse_bool_val(
config.get('auto_keyring_sync', 'yes'), 'auto_keyring_sync')
source_keyring_dir = config.get('source_keyring_dir', None)
if source_keyring_dir != 'no':
if source_keyring_dir is None:
source_keyring_dir = os.getenv('GNUPGHOME')
if source_keyring_dir is None:
source_keyring_dir = (pwd.getpwuid(os.getuid()).pw_dir +
'/.gnupg')
if not source_keyring_dir.startswith('/'):
raise ValueError('Source keyring directory {!r} is not '
'absolute!'.format(source_keyring_dir))
self.source_keyring_dir = source_keyring_dir
else:
self.source_keyring_dir = None

if 'isolated_gnupghome_dirs' in config:
self.gnupghome = os.path.join(
Expand All @@ -325,7 +330,7 @@ def load_config(self, config) -> None:
'verbose_notifications',
'allow_keygen',
'gnupghome',
'auto_keyring_sync',
'source_keyring_dir',
'isolated_gnupghome_dirs',
# handled in main()
'debug_log',
Expand All @@ -335,15 +340,16 @@ def load_config(self, config) -> None:
self.log.warning('Unsupported config option: %s', option)
self.log.info('Using GnuPG home directory %s', self.gnupghome)
os.makedirs(self.gnupghome, 0o700, exist_ok=True)
if self.auto_keyring_sync:
new_home_directory = os.path.join(self.gnupghome, 'qubes-auto-keyring')
if self.source_keyring_dir is not None:
new_home_directory = self.gnupghome + '/qubes-auto-keyring'
try:
os.mkdir(new_home_directory, 0o700)
except FileExistsError:
pass
self.gnupghome = new_home_directory
default_gnupg_home = os.path.join(pwd.getpwuid(os.getuid()).pw_dir, '.gnupg')
stat1 = os.stat(default_gnupg_home)
if source_keyring_dir == new_home_directory:
return
stat1 = os.stat(source_keyring_dir)
stat2 = os.stat(new_home_directory)
if stat1.st_mtime <= stat2.st_mtime:
return
Expand All @@ -356,7 +362,7 @@ def load_config(self, config) -> None:
xferflags = ('gpg', '--no-armor', '--batch', '--with-colons',
'--no-tty', '--disable-dirmngr')
export_cmd = xferflags + ('--export-secret-subkeys', '--homedir',
default_gnupg_home)
self.source_keyring_dir)
import_cmd = xferflags + ('--import', '--homedir', self.gnupghome,)
with subprocess.Popen(export_cmd,
stdout=subprocess.PIPE,
Expand Down
15 changes: 6 additions & 9 deletions splitgpg2/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,13 +627,12 @@ def test_000_basic(self):
pksign_autoaccept = 300
verbose_notifications = yes
allow_keygen = yes
auto_keyring_sync = no
""")
gpg_server = GpgServer(reader, writer, 'testvm')
gpg_server.load_config(config['client:testvm'])
self.assertTrue(gpg_server.allow_keygen)
self.assertTrue(gpg_server.verbose_notifications)
self.assertEqual(gpg_server.gnupghome, self.server_gpghome)
self.assertEqual(gpg_server.gnupghome, self.server_gpghome + '/qubes-auto-keyring')
self.assertEqual(gpg_server.timer_delay['PKSIGN'], 300)
self.assertEqual(gpg_server.timer_delay['PKDECRYPT'], -1)

Expand All @@ -646,11 +645,10 @@ def test_001_per_client_gpghome(self):
f"""
[DEFAULT]
isolated_gnupghome_dirs = {self.gpg_dir.name}
auto_keyring_sync = no
""")
gpg_server = GpgServer(reader, writer, 'server')
gpg_server.load_config(config['DEFAULT'])
self.assertEqual(gpg_server.gnupghome, self.server_gpghome)
self.assertEqual(gpg_server.gnupghome, self.server_gpghome + '/qubes-auto-keyring')

def test_002_invalid(self):
reader = mock.Mock()
Expand Down Expand Up @@ -683,14 +681,14 @@ def test_003_option_typo(self):
config = configparser.ConfigParser()
config.read_string("""[DEFAULT]
autoaccept = no
auto_keyring_sync = no
no_such_option = 1
""")
gpg_server.load_config(config['DEFAULT'])
# warns about unsupported option only
self.assertEquals(gpg_server.log.mock_calls, [
mock.call.warning('Unsupported config option: %s', 'no_such_option'),
mock.call.info('Using GnuPG home directory %s', gpg_server.gnupghome),
mock.call.info('Using GnuPG home directory %s',
gpg_server.gnupghome[:-len('/qubes-auto-keyring')]),
])

def test_010_gpghome(self):
Expand All @@ -702,7 +700,6 @@ def test_010_gpghome(self):
f"""
[DEFAULT]
gnupghome = {self.server_gpghome}
auto_keyring_sync = no
[client:testvm]
""")
self.server = self.loop.run_until_complete(
Expand All @@ -711,8 +708,8 @@ def test_010_gpghome(self):
self.socket_path))

p = self.loop.run_until_complete(asyncio.create_subprocess_exec(
'gpg', '--with-colons', '-K', self.key_uid,
env=self.test_environ,
'gpg', '--with-colons', f'--homedir={self.server_gpghome}',
'-K', '--', self.key_uid,
stderr=subprocess.PIPE, stdout=subprocess.PIPE))
stdout, stderr = self.loop.run_until_complete(p.communicate())
if p.returncode:
Expand Down

0 comments on commit 5e94eef

Please sign in to comment.