Skip to content

Commit

Permalink
qubes/api: refactor creating multiple qubesd sockets
Browse files Browse the repository at this point in the history
Now there is a single function to do this, shared with tests.
  • Loading branch information
woju authored and marmarek committed Jun 20, 2017
1 parent bec58fc commit 96a66ac
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 55 deletions.
72 changes: 56 additions & 16 deletions qubes/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
# with this program; if not, see <http://www.gnu.org/licenses/>.

import asyncio
import errno
import functools
import io
import os
import shutil
import socket
import struct
import traceback

Expand Down Expand Up @@ -105,6 +107,10 @@ class AbstractQubesAPI(object):
There are also two helper functions for firing events associated with API
calls.
'''

#: the preferred socket location (to be overridden in child's class)
SOCKNAME = None

def __init__(self, app, src, method_name, dest, arg, send_event=None):
#: :py:class:`qubes.Qubes` object
self.app = app
Expand Down Expand Up @@ -332,27 +338,61 @@ def send_exception(self, exc):
self.transport.write(str(exc).encode('utf-8') + b'\0')


_umask_lock = asyncio.Lock()

@asyncio.coroutine
def create_server(sockpath, handler, app, debug=False, *, loop=None):
def create_servers(*args, force=False, loop=None, **kwargs):
'''Create multiple Qubes API servers
:param qubes.Qubes app: the app that is a backend of the servers
:param bool force: if :py:obj:`True`, unconditionaly remove existing \
sockets; if :py:obj:`False`, raise an error if there is some process \
listening to such socket
:param asyncio.Loop loop: loop
*args* are supposed to be classess inheriting from
:py:class:`AbstractQubesAPI`
*kwargs* (like *app* or *debug* for example) are passed to
:py:class:`QubesDaemonProtocol` constructor
'''
loop = loop or asyncio.get_event_loop()

servers = []
old_umask = os.umask(0o007)
try:
os.unlink(sockpath)
except FileNotFoundError:
pass
# XXX this can be optimised with asyncio.wait() to start servers in
# parallel, but I currently don't see the need
for handler in args:
sockpath = handler.SOCKNAME
assert sockpath is not None, \
'SOCKNAME needs to be overloaded in {}'.format(
type(handler).__name__)

if os.path.exists(sockpath):
if force:
os.unlink(sockpath)
else:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
sock.connect(sockpath)
except ConnectionRefusedError:
# dead socket, remove it anyway
os.unlink(sockpath)
else:
# woops, someone is listening
sock.close()
raise FileExistsError(errno.EEXIST,
'socket already exists: {!r}'.format(sockpath))

with (yield from _umask_lock):
old_umask = os.umask(0o007)
try:
server = yield from loop.create_unix_server(
functools.partial(QubesDaemonProtocol,
handler, app=app, debug=debug),
functools.partial(QubesDaemonProtocol, handler, **kwargs),
sockpath)
finally:
os.umask(old_umask)

for sock in server.sockets:
shutil.chown(sock.getsockname(), group='qubes')
for sock in server.sockets:
shutil.chown(sock.getsockname(), group='qubes')

servers.append(server)

finally:
os.umask(old_umask)

return server
return servers
4 changes: 2 additions & 2 deletions qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import qubes.vm
import qubes.vm.qubesvm

QUBESD_ADMIN_SOCK = '/var/run/qubesd.sock'


class QubesMgmtEventsDispatcher(object):
def __init__(self, filters, send_event):
Expand Down Expand Up @@ -75,6 +73,8 @@ class QubesAdminAPI(qubes.api.AbstractQubesAPI):
https://www.qubes-os.org/doc/mgmt1/
'''

SOCKNAME = '/var/run/qubesd.sock'

@qubes.api.method('admin.vmclass.List', no_payload=True)
@asyncio.coroutine
def vmclass_list(self):
Expand Down
9 changes: 1 addition & 8 deletions qubes/api/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,12 @@
import qubes.vm.adminvm
import qubes.vm.dispvm

QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'


class QubesInternalAPI(qubes.api.AbstractQubesAPI):
''' Communication interface for dom0 components,
by design the input here is trusted.'''
#
# PRIVATE METHODS, not to be called via RPC
#

#
# ACTUAL RPC CALLS
#
SOCKNAME = '/var/run/qubesd.internal.sock'

@qubes.api.method('internal.GetSystemInfo', no_payload=True)
@asyncio.coroutine
Expand Down
4 changes: 2 additions & 2 deletions qubes/api/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import qubes.api.admin
import qubes.vm.dispvm

QUBESD_MISC_SOCK = '/var/run/qubesd.misc.sock'


class QubesMiscAPI(qubes.api.AbstractQubesAPI):
SOCKNAME = '/var/run/qubesd.misc.sock'

@qubes.api.method('qubes.FeaturesRequest', no_payload=True)
@asyncio.coroutine
def qubes_features_request(self):
Expand Down
22 changes: 11 additions & 11 deletions qubes/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
# command not found; let's assume we're outside
pass


def skipUnlessDom0(test_item):
'''Decorator that skips test outside dom0.
Expand Down Expand Up @@ -591,12 +590,11 @@ def setUp(self):
)
os.environ['QUBES_XML_PATH'] = XMLPATH

self.qrexec_policy_server = self.loop.run_until_complete(
qubes.api.create_server(
qubes.api.internal.QUBESD_INTERNAL_SOCK,
self.qubesd = self.loop.run_until_complete(
qubes.api.create_servers(
qubes.api.admin.QubesAdminAPI,
qubes.api.internal.QubesInternalAPI,
app=self.app,
debug=True))
app=self.app, debug=True))

def init_default_template(self, template=None):
if template is None:
Expand Down Expand Up @@ -680,11 +678,13 @@ def save_and_reload_db(self):
self.reload_db()

def tearDown(self):
# close the server before super(), because that might close the loop
for sock in self.qrexec_policy_server.sockets:
os.unlink(sock.getsockname())
self.qrexec_policy_server.close()
self.loop.run_until_complete(self.qrexec_policy_server.wait_closed())
# close the servers before super(), because that might close the loop
for server in self.qubesd:
for sock in server.sockets:
os.unlink(sock.getsockname())
server.close()
self.loop.run_until_complete(asyncio.wait([
server.wait_closed() for server in self.qubesd]))

super(SystemTestsMixin, self).tearDown()
self.remove_test_vms()
Expand Down
2 changes: 1 addition & 1 deletion qubes/tests/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def main():
logging.root.addHandler(ha_kmsg)

if not args.allow_running_along_qubesd \
and os.path.exists(qubes.api.admin.QUBESD_ADMIN_SOCK):
and os.path.exists(qubes.api.admin.QubesAdminAPI.SOCKNAME):
parser.error('refusing to run until qubesd is disabled')

runner = unittest.TextTestRunner(stream=sys.stdout,
Expand Down
20 changes: 5 additions & 15 deletions qubes/tools/qubesd.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,11 @@ def main(args=None):

args.app.vmm.register_event_handlers(args.app)

servers = []
servers.append(loop.run_until_complete(qubes.api.create_server(
qubes.api.admin.QUBESD_ADMIN_SOCK,
servers = loop.run_until_complete(qubes.api.create_servers(
qubes.api.admin.QubesAdminAPI,
app=args.app, debug=args.debug)))
servers.append(loop.run_until_complete(qubes.api.create_server(
qubes.api.internal.QUBESD_INTERNAL_SOCK,
qubes.api.internal.QubesInternalAPI,
app=args.app, debug=args.debug)))
servers.append(loop.run_until_complete(qubes.api.create_server(
qubes.api.misc.QUBESD_MISC_SOCK,
qubes.api.misc.QubesMiscAPI,
app=args.app, debug=args.debug)))
app=args.app, debug=args.debug))

socknames = []
for server in servers:
Expand All @@ -71,11 +63,9 @@ def main(args=None):
try:
os.unlink(sockname)
except FileNotFoundError:
# XXX
# We had our socket unlinked by somebody else, possibly other
# qubesd instance. That also means we probably unlinked their
# socket when creating our server...
pass
args.app.log.warning(
'socket {} got unlinked sometime before shutdown'.format(
sockname))
finally:
loop.close()

Expand Down

0 comments on commit 96a66ac

Please sign in to comment.