Skip to content

Commit

Permalink
refactor fedora message sending into cosalib/fedora_messaging_request.py
Browse files Browse the repository at this point in the history
This commit re-factors out the code that sends requests to fedora
releng/infra via fedora messaging messages into a library. It can
then be called by the code in src/cmd-sign as well as new consumers
in the future.
  • Loading branch information
dustymabe authored and openshift-merge-robot committed Feb 5, 2020
1 parent 1efe78c commit ec5c0ee
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 147 deletions.
189 changes: 42 additions & 147 deletions src/cmd-sign
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
#!/usr/bin/python3

'''
Implements signing with RoboSignatory via fedora-messaging. To run this,
one needs credentials to the restricted Fedora broker. In a developer
workflow, one can also run it (and RoboSignatory) against a local rabbitmq
instance. For more details, see:
https://fedora-messaging.readthedocs.io/en/latest/quick-start.html
Implements signing with RoboSignatory via fedora-messaging from
the cosalib/fedora_messaging_request library.
'''

import argparse
Expand All @@ -17,39 +13,21 @@ import subprocess
import sys
import tarfile
import tempfile
import threading
import uuid

import multiprocessing as mp

import boto3

from fedora_messaging import message
from fedora_messaging.api import publish, twisted_consume
from fedora_messaging.config import conf

from twisted.internet import reactor

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cosalib.meta import GenericBuildMeta as Meta
from cosalib.builds import Builds
from cosalib.cmdlib import (
get_basearch,
sha256sum_file,
import_ostree_commit)
from cosalib.fedora_messaging_request import send_request_and_wait_for_response

gi.require_version('OSTree', '1.0')
from gi.repository import Gio, OSTree

# these files are part of fedora-messaging
FEDORA_MESSAGING_PUBLIC_CONF = {
'prod': '/etc/fedora-messaging/fedora.toml',
'stg': '/etc/fedora-messaging/fedora.stg.toml',
}
FEDORA_MESSAGING_TOPIC_PREFIX = {
'prod': 'org.fedoraproject.prod.coreos.build.request',
'stg': 'org.fedoraproject.stg.coreos.build.request',
}
# this is really the worst case scenario, it's usually pretty fast otherwise
ROBOSIGNATORY_REQUEST_TIMEOUT_SEC = 60 * 60

Expand Down Expand Up @@ -105,26 +83,15 @@ def cmd_robosignatory(args):
key, val = keyval.split('=', 1) # will throw exception if there's no =
args.extra_keys[key] = val

# Add in a unique request id to the keys to pass
requestid = str(uuid.uuid4())
args.extra_keys['request_id'] = requestid

request = 'ostree-sign' if args.ostree else 'artifacts-sign'

global request_state
request_state = {"status": "pending"}
cond = threading.Condition()
start_consumer_thread(cond, request, requestid)

# these two are different enough that they deserve separate handlers
if args.ostree:
robosign_ostree(args, s3, cond)
robosign_ostree(args, s3)
else:
assert args.images
robosign_images(args, s3, cond)
robosign_images(args, s3)


def robosign_ostree(args, s3, cond):
def robosign_ostree(args, s3):
build = Meta(build=args.build)
builds = Builds()
builddir = builds.get_build_dir(args.build)
Expand All @@ -143,12 +110,21 @@ def robosign_ostree(args, s3, cond):
s3.upload_file(build_dir_commit_obj, args.bucket, commit_key)
s3.delete_object(Bucket=args.bucket, Key=commitmeta_key)

send_message(args, 'ostree-sign', {
'commit_object': f's3://{args.bucket}/{commit_key}',
'checksum': f'sha256:{checksum}'
})
response = send_request_and_wait_for_response(
request_type='ostree-sign',
config=args.fedmsg_conf,
request_timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC,
environment=fedenv,
body={
'build_id': args.build,
'basearch': get_basearch(),
'commit_object': f's3://{args.bucket}/{commit_key}',
'checksum': f'sha256:{checksum}',
**args.extra_keys
}
)

validate_response(cond)
validate_response(response)

# download back sig and verify it in a throwaway repo
print(f"Verifying OSTree signature")
Expand Down Expand Up @@ -214,7 +190,7 @@ def robosign_ostree(args, s3, cond):
import_ostree_commit('tmp/repo', checksum, commit_tarfile, force=True)


def robosign_images(args, s3, cond):
def robosign_images(args, s3):
build = Meta(build=args.build)
builds = Builds()
builddir = builds.get_build_dir(args.build)
Expand All @@ -228,8 +204,20 @@ def robosign_images(args, s3, cond):
'checksum': f'sha256:{img["sha256"]}'
} for img in build['images'].values()]

send_message(args, 'artifacts-sign', {'artifacts': artifacts})
validate_response(cond)
response = send_request_and_wait_for_response(
request_type='artifacts-sign',
config=args.fedmsg_conf,
request_timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC,
environment=fedenv,
body={
'build_id': args.build,
'basearch': get_basearch(),
'artifacts': artifacts,
**args.extra_keys
}
)

validate_response(response)

# download sigs and verify (use /tmp to avoid gpg hitting ENAMETOOLONG)
with tempfile.TemporaryDirectory(prefix="cosa-sign-") as d:
Expand Down Expand Up @@ -271,106 +259,13 @@ def get_bucket_and_prefix(path):
return split


def get_request_topic(request):
return f'{FEDORA_MESSAGING_TOPIC_PREFIX[fedenv]}.{request}'


def get_request_finished_topic(request):
return get_request_topic(request) + '.finished'


def send_message(args, request, body):
print(f"Sending {request} request for build {args.build}")
# This is a bit hacky; we fork to publish the message here so that we can
# load the publishing fedora-messaging config. The TL;DR is: we need auth
# to publish, but we need to use the public endpoint for consuming so we
# can create temporary queues. We use the 'spawn' start method so we don't
# inherit anything by default (like the Twisted state).
ctx = mp.get_context('spawn')
p = ctx.Process(target=send_message_impl, args=(args, request, body))
p.start()
p.join()


def send_message_impl(args, request, body):
if args.fedmsg_conf:
conf.load_config(args.fedmsg_conf)
publish(message.Message(topic=get_request_topic(request), body={
'build_id': args.build,
'basearch': get_basearch(),
**args.extra_keys,
**body
}))


def validate_response(cond):
with cond:
print("Waiting for response from RoboSignatory")
cond.wait_for(lambda: request_state['status'] != 'pending',
timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC)
if request_state['status'] == 'pending':
raise Exception("Timed out waiting for RoboSignatory")
if request_state['status'].lower() == 'failure':
# https://pagure.io/robosignatory/pull-request/38
if 'failure-message' not in request_state:
raise Exception("Signing failed")
raise Exception(f"Signing failed: {request_state['failure-message']}")
assert request_state['status'].lower() == 'success', str(request_state)


def start_consumer_thread(cond, request, requestid):
registered = threading.Event()
t = threading.Thread(target=watch_finished_messages,
args=(cond, registered, request, requestid),
daemon=True)
t.start()
registered.wait()
print(f"Successfully started consumer thread")


def watch_finished_messages(cond, registered, request, requestid):
def callback(message):
if 'request_id' not in message.body or \
message.body['request_id'] != requestid:
return
with cond:
global request_state
request_state = message.body
cond.notify()

queue = str(uuid.uuid4())

def registered_cb(consumers):
for consumer in consumers:
if consumer.queue == queue:
registered.set()
break

def error_cb(failure):
print(f"Consumer hit failure {failure}")
reactor.stop() # pylint: disable=E1101

# use the public config for this; see related comment in send_message
conf.load_config(FEDORA_MESSAGING_PUBLIC_CONF[fedenv])

bindings = [{
'exchange': 'amq.topic',
'queue': queue,
'routing_keys': [get_request_finished_topic(request)]
}]
queues = {
queue: {
"durable": False,
"auto_delete": True,
"exclusive": True,
"arguments": {}
}
}

consumers = twisted_consume(callback, bindings=bindings, queues=queues)
consumers.addCallback(registered_cb)
consumers.addErrback(error_cb)
reactor.run(installSignalHandlers=False) # pylint: disable=E1101
def validate_response(response):
if response['status'].lower() == 'failure':
# https://pagure.io/robosignatory/pull-request/38
if 'failure-message' not in response:
raise Exception("Signing failed")
raise Exception(f"Signing failed: {response['failure-message']}")
assert response['status'].lower() == 'success', str(response)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit ec5c0ee

Please sign in to comment.