Skip to content

Commit

Permalink
qubes: reorganise API protocols
Browse files Browse the repository at this point in the history
Now instantiating API servers is handled by common function. This is,
among other reasons, for creating ad-hoc sockets for tests.
  • Loading branch information
woju committed Jun 6, 2017
1 parent d9f5192 commit 65a7326
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 182 deletions.
177 changes: 176 additions & 1 deletion qubes/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

import asyncio
import functools

import os
import shutil
import struct

class ProtocolError(AssertionError):
'''Raised when something is wrong with data received'''
Expand Down Expand Up @@ -175,3 +178,175 @@ def fire_event_for_filter(self, iterable, **kwargs):
'''Fire an event on the source qube to filter for permission'''
return apply_filters(iterable,
self.fire_event_for_permission(**kwargs))


class QubesDaemonProtocol(asyncio.Protocol):
buffer_size = 65536
header = struct.Struct('Bx')

def __init__(self, handler, *args, app, debug=False, **kwargs):
super().__init__(*args, **kwargs)
self.handler = handler
self.app = app
self.untrusted_buffer = io.BytesIO()
self.len_untrusted_buffer = 0
self.transport = None
self.debug = debug
self.event_sent = False
self.mgmt = None

def connection_made(self, transport):
self.transport = transport

def connection_lost(self, exc):
self.untrusted_buffer.close()
# for cancellable operation, interrupt it, otherwise it will do nothing
if self.mgmt is not None:
self.mgmt.cancel()
self.transport = None

def data_received(self, untrusted_data): # pylint: disable=arguments-differ
if self.len_untrusted_buffer + len(untrusted_data) > self.buffer_size:
self.app.log.warning('request too long')
self.transport.abort()
self.untrusted_buffer.close()
return

self.len_untrusted_buffer += \
self.untrusted_buffer.write(untrusted_data)

def eof_received(self):
try:
src, method, dest, arg, untrusted_payload = \
self.untrusted_buffer.getvalue().split(b'\0', 4)
except ValueError:
self.app.log.warning('framing error')
self.transport.abort()
return
finally:
self.untrusted_buffer.close()

asyncio.ensure_future(self.respond(
src, method, dest, arg, untrusted_payload=untrusted_payload))

return True

@asyncio.coroutine
def respond(self, src, method, dest, arg, *, untrusted_payload):
try:
self.mgmt = self.handler(self.app, src, method, dest, arg,
self.send_event)
response = yield from self.mgmt.execute(
untrusted_payload=untrusted_payload)
assert not (self.event_sent and response)
if self.transport is None:
return

# except clauses will fall through to transport.abort() below

except qubes.api.PermissionDenied:
self.app.log.warning(
'permission denied for call %s+%s (%s → %s) '
'with payload of %d bytes',
method, arg, src, dest, len(untrusted_payload))

except qubes.api.ProtocolError:
self.app.log.warning(
'protocol error for call %s+%s (%s → %s) '
'with payload of %d bytes',
method, arg, src, dest, len(untrusted_payload))

except qubes.exc.QubesException as err:
msg = ('%r while calling '
'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d')

if self.debug:
self.app.log.exception(msg,
err, src, method, dest, arg, len(untrusted_payload))
else:
self.app.log.info(msg,
err, src, method, dest, arg, len(untrusted_payload))
if self.transport is not None:
self.send_exception(err)
self.transport.write_eof()
self.transport.close()
return

except Exception: # pylint: disable=broad-except
self.app.log.exception(
'unhandled exception while calling '
'src=%r method=%r dest=%r arg=%r len(untrusted_payload)=%d',
src, method, dest, arg, len(untrusted_payload))

else:
if not self.event_sent:
self.send_response(response)
try:
self.transport.write_eof()
except NotImplementedError:
pass
self.transport.close()
return

# this is reached if from except: blocks; do not put it in finally:,
# because this will prevent the good case from sending the reply
self.transport.abort()

def send_header(self, *args):
self.transport.write(self.header.pack(*args))

def send_response(self, content):
assert not self.event_sent
self.send_header(0x30)
if content is not None:
self.transport.write(content.encode('utf-8'))

def send_event(self, subject, event, **kwargs):
self.event_sent = True
self.send_header(0x31)

if subject is not self.app:
self.transport.write(subject.name.encode('ascii'))
self.transport.write(b'\0')

self.transport.write(event.encode('ascii') + b'\0')

for k, v in kwargs.items():
self.transport.write('{}\0{}\0'.format(k, str(v)).encode('ascii'))
self.transport.write(b'\0')

def send_exception(self, exc):
self.send_header(0x32)

self.transport.write(type(exc).__name__.encode() + b'\0')

if self.debug:
self.transport.write(''.join(traceback.format_exception(
type(exc), exc, exc.__traceback__)).encode('utf-8'))
self.transport.write(b'\0')

self.transport.write(str(exc).encode('utf-8') + b'\0')


_umask_lock = asyncio.Lock()

@asyncio.coroutine
def create_server(sockpath, handler, app, debug=False, *, loop=None):
loop = loop or asyncio.get_event_loop()
try:
os.unlink(sockpath)
except FileNotFoundError:
pass

with (yield from _umask_lock):
old_umask = os.umask(0o007)
try:
server = yield from loop.create_unix_server(
functools.partial(QubesDaemonProtocol,
handler, app=app, debug=debug),
sockpath)
finally:
os.umask(old_umask)

shutil.chown(sockpath, group='qubes')
return server
2 changes: 2 additions & 0 deletions qubes/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import qubes.vm
import qubes.vm.qubesvm

QUBESD_ADMIN_SOCK = '/var/run/qubesd.sock'


class QubesMgmtEventsDispatcher(object):
def __init__(self, filters, send_event):
Expand Down
2 changes: 2 additions & 0 deletions qubes/api/internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import qubes.api.admin
import qubes.vm.dispvm

QUBESD_INTERNAL_SOCK = '/var/run/qubesd.internal.sock'


class QubesInternalAPI(qubes.api.AbstractQubesAPI):
''' Communication interface for dom0 components,
Expand Down
Loading

0 comments on commit 65a7326

Please sign in to comment.