diff --git a/src/cmd-sign b/src/cmd-sign index b7a230ee82..594669a9d9 100755 --- a/src/cmd-sign +++ b/src/cmd-sign @@ -1,12 +1,8 @@ #!/usr/bin/python3 ''' - Implements signing with RoboSignatory via fedora-messaging. To run this, - one needs credentials to the restricted Fedora broker. In a developer - workflow, one can also run it (and RoboSignatory) against a local rabbitmq - instance. For more details, see: - - https://fedora-messaging.readthedocs.io/en/latest/quick-start.html + Implements signing with RoboSignatory via fedora-messaging from + the cosalib/fedora_messaging_request library. ''' import argparse @@ -17,19 +13,9 @@ import subprocess import sys import tarfile import tempfile -import threading -import uuid - -import multiprocessing as mp import boto3 -from fedora_messaging import message -from fedora_messaging.api import publish, twisted_consume -from fedora_messaging.config import conf - -from twisted.internet import reactor - sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from cosalib.meta import GenericBuildMeta as Meta from cosalib.builds import Builds @@ -37,19 +23,11 @@ from cosalib.cmdlib import ( get_basearch, sha256sum_file, import_ostree_commit) +from cosalib.fedora_messaging_request import send_request_and_wait_for_response gi.require_version('OSTree', '1.0') from gi.repository import Gio, OSTree -# these files are part of fedora-messaging -FEDORA_MESSAGING_PUBLIC_CONF = { - 'prod': '/etc/fedora-messaging/fedora.toml', - 'stg': '/etc/fedora-messaging/fedora.stg.toml', -} -FEDORA_MESSAGING_TOPIC_PREFIX = { - 'prod': 'org.fedoraproject.prod.coreos.build.request', - 'stg': 'org.fedoraproject.stg.coreos.build.request', -} # this is really the worst case scenario, it's usually pretty fast otherwise ROBOSIGNATORY_REQUEST_TIMEOUT_SEC = 60 * 60 @@ -105,26 +83,15 @@ def cmd_robosignatory(args): key, val = keyval.split('=', 1) # will throw exception if there's no = args.extra_keys[key] = val - # Add in a unique request id to the keys to pass - requestid = str(uuid.uuid4()) - args.extra_keys['request_id'] = requestid - - request = 'ostree-sign' if args.ostree else 'artifacts-sign' - - global request_state - request_state = {"status": "pending"} - cond = threading.Condition() - start_consumer_thread(cond, request, requestid) - # these two are different enough that they deserve separate handlers if args.ostree: - robosign_ostree(args, s3, cond) + robosign_ostree(args, s3) else: assert args.images - robosign_images(args, s3, cond) + robosign_images(args, s3) -def robosign_ostree(args, s3, cond): +def robosign_ostree(args, s3): build = Meta(build=args.build) builds = Builds() builddir = builds.get_build_dir(args.build) @@ -143,12 +110,21 @@ def robosign_ostree(args, s3, cond): s3.upload_file(build_dir_commit_obj, args.bucket, commit_key) s3.delete_object(Bucket=args.bucket, Key=commitmeta_key) - send_message(args, 'ostree-sign', { - 'commit_object': f's3://{args.bucket}/{commit_key}', - 'checksum': f'sha256:{checksum}' - }) + response = send_request_and_wait_for_response( + request_type='ostree-sign', + config=args.fedmsg_conf, + request_timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC, + environment=fedenv, + body={ + 'build_id': args.build, + 'basearch': get_basearch(), + 'commit_object': f's3://{args.bucket}/{commit_key}', + 'checksum': f'sha256:{checksum}', + **args.extra_keys + } + ) - validate_response(cond) + validate_response(response) # download back sig and verify it in a throwaway repo print(f"Verifying OSTree signature") @@ -214,7 +190,7 @@ def robosign_ostree(args, s3, cond): import_ostree_commit('tmp/repo', checksum, commit_tarfile, force=True) -def robosign_images(args, s3, cond): +def robosign_images(args, s3): build = Meta(build=args.build) builds = Builds() builddir = builds.get_build_dir(args.build) @@ -228,8 +204,20 @@ def robosign_images(args, s3, cond): 'checksum': f'sha256:{img["sha256"]}' } for img in build['images'].values()] - send_message(args, 'artifacts-sign', {'artifacts': artifacts}) - validate_response(cond) + response = send_request_and_wait_for_response( + request_type='artifacts-sign', + config=args.fedmsg_conf, + request_timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC, + environment=fedenv, + body={ + 'build_id': args.build, + 'basearch': get_basearch(), + 'artifacts': artifacts, + **args.extra_keys + } + ) + + validate_response(response) # download sigs and verify (use /tmp to avoid gpg hitting ENAMETOOLONG) with tempfile.TemporaryDirectory(prefix="cosa-sign-") as d: @@ -271,106 +259,13 @@ def get_bucket_and_prefix(path): return split -def get_request_topic(request): - return f'{FEDORA_MESSAGING_TOPIC_PREFIX[fedenv]}.{request}' - - -def get_request_finished_topic(request): - return get_request_topic(request) + '.finished' - - -def send_message(args, request, body): - print(f"Sending {request} request for build {args.build}") - # This is a bit hacky; we fork to publish the message here so that we can - # load the publishing fedora-messaging config. The TL;DR is: we need auth - # to publish, but we need to use the public endpoint for consuming so we - # can create temporary queues. We use the 'spawn' start method so we don't - # inherit anything by default (like the Twisted state). - ctx = mp.get_context('spawn') - p = ctx.Process(target=send_message_impl, args=(args, request, body)) - p.start() - p.join() - - -def send_message_impl(args, request, body): - if args.fedmsg_conf: - conf.load_config(args.fedmsg_conf) - publish(message.Message(topic=get_request_topic(request), body={ - 'build_id': args.build, - 'basearch': get_basearch(), - **args.extra_keys, - **body - })) - - -def validate_response(cond): - with cond: - print("Waiting for response from RoboSignatory") - cond.wait_for(lambda: request_state['status'] != 'pending', - timeout=ROBOSIGNATORY_REQUEST_TIMEOUT_SEC) - if request_state['status'] == 'pending': - raise Exception("Timed out waiting for RoboSignatory") - if request_state['status'].lower() == 'failure': - # https://pagure.io/robosignatory/pull-request/38 - if 'failure-message' not in request_state: - raise Exception("Signing failed") - raise Exception(f"Signing failed: {request_state['failure-message']}") - assert request_state['status'].lower() == 'success', str(request_state) - - -def start_consumer_thread(cond, request, requestid): - registered = threading.Event() - t = threading.Thread(target=watch_finished_messages, - args=(cond, registered, request, requestid), - daemon=True) - t.start() - registered.wait() - print(f"Successfully started consumer thread") - - -def watch_finished_messages(cond, registered, request, requestid): - def callback(message): - if 'request_id' not in message.body or \ - message.body['request_id'] != requestid: - return - with cond: - global request_state - request_state = message.body - cond.notify() - - queue = str(uuid.uuid4()) - - def registered_cb(consumers): - for consumer in consumers: - if consumer.queue == queue: - registered.set() - break - - def error_cb(failure): - print(f"Consumer hit failure {failure}") - reactor.stop() # pylint: disable=E1101 - - # use the public config for this; see related comment in send_message - conf.load_config(FEDORA_MESSAGING_PUBLIC_CONF[fedenv]) - - bindings = [{ - 'exchange': 'amq.topic', - 'queue': queue, - 'routing_keys': [get_request_finished_topic(request)] - }] - queues = { - queue: { - "durable": False, - "auto_delete": True, - "exclusive": True, - "arguments": {} - } - } - - consumers = twisted_consume(callback, bindings=bindings, queues=queues) - consumers.addCallback(registered_cb) - consumers.addErrback(error_cb) - reactor.run(installSignalHandlers=False) # pylint: disable=E1101 +def validate_response(response): + if response['status'].lower() == 'failure': + # https://pagure.io/robosignatory/pull-request/38 + if 'failure-message' not in response: + raise Exception("Signing failed") + raise Exception(f"Signing failed: {response['failure-message']}") + assert response['status'].lower() == 'success', str(response) if __name__ == '__main__': diff --git a/src/cosalib/fedora_messaging_request.py b/src/cosalib/fedora_messaging_request.py new file mode 100644 index 0000000000..ed69270a99 --- /dev/null +++ b/src/cosalib/fedora_messaging_request.py @@ -0,0 +1,160 @@ +#!/usr/bin/python3 + +''' + Implements sending messages via fedora-messaging. To send messages + one needs credentials to the restricted Fedora broker. In a developer + workflow, one can also run it against a local rabbitmq instance. + For more details, see: + + https://fedora-messaging.readthedocs.io/en/latest/quick-start.html +''' + +import copy +import threading +import uuid + +import multiprocessing as mp + +from fedora_messaging import message +from fedora_messaging.api import publish, twisted_consume +from fedora_messaging.config import conf + +from twisted.internet import reactor + +# these files are part of fedora-messaging +FEDORA_MESSAGING_PUBLIC_CONF = { + 'prod': '/etc/fedora-messaging/fedora.toml', + 'stg': '/etc/fedora-messaging/fedora.stg.toml', +} +# https://apps.fedoraproject.org/datagrepper/raw?topic=org.fedoraproject.prod.coreos.build.request.ostree-sign&delta=100000 +# https://apps.fedoraproject.org/datagrepper/raw?topic=org.fedoraproject.prod.coreos.build.request.artifacts-sign&delta=100000 +FEDORA_MESSAGING_TOPIC_PREFIX = { + 'prod': 'org.fedoraproject.prod.coreos.build.request', + 'stg': 'org.fedoraproject.stg.coreos.build.request', +} + +# Default to timeout after 60 seconds +DEFAULT_REQUEST_TIMEOUT_SEC = 60 + + +def send_request_and_wait_for_response(request_type, + config=None, + environment='prod', + request_timeout=DEFAULT_REQUEST_TIMEOUT_SEC, + body={}): + # Generate a unique id for this request + request_id = str(uuid.uuid4()) + + # We'll watch for the request response in a thread. Here we create a + # request_state variable to pass information back and forth and we + # use threading.Condition() to wake up other threads waiting on + # the condition. + global request_state + request_state = {"status": "pending"} + cond = threading.Condition() + start_consumer_thread(cond, request_type, request_id, environment) + + # Send the message/request + send_message(config=config, + request_type=request_type, + environment=environment, + body={**body, 'request_id': request_id}) + # Wait for the response to come back + return wait_for_response(cond, request_timeout) + + +def get_request_topic(request_type, environment): + return f'{FEDORA_MESSAGING_TOPIC_PREFIX[environment]}.{request_type}' + + +def get_request_finished_topic(request_type, environment): + return get_request_topic(request_type, environment) + '.finished' + + +def send_message(config, request_type, environment, body): + print(f"Sending {request_type} request for build {body['build_id']}") + # This is a bit hacky; we fork to publish the message here so that we can + # load the publishing fedora-messaging config. The TL;DR is: we need auth + # to publish, but we need to use the public endpoint for consuming so we + # can create temporary queues. We use the 'spawn' start method so we don't + # inherit anything by default (like the Twisted state). + ctx = mp.get_context('spawn') + p = ctx.Process(target=send_message_impl, + args=(config, request_type, environment, body)) + p.start() + p.join() + + +def send_message_impl(config, request_type, environment, body): + if config: + conf.load_config(config) + publish( + message.Message(body=body, topic=get_request_topic(request_type, environment)) + ) + + +def wait_for_response(cond, request_timeout): + with cond: + print("Waiting for a response to the sent request") + cond.wait_for(lambda: request_state['status'] != 'pending', + timeout=request_timeout) + # waiting is over now let's make sure it wasn't a timeout + if request_state['status'] == 'pending': + raise Exception("Timed out waiting for request response message") + return copy.deepcopy(request_state) + + +def start_consumer_thread(cond, request_type, request_id, environment): + registered = threading.Event() + t = threading.Thread(target=watch_finished_messages, + args=(cond, registered, + request_type, request_id, environment), + daemon=True) + t.start() + registered.wait() + print(f"Successfully started consumer thread") + + +def watch_finished_messages(cond, registered, + request_type, request_id, environment): + def callback(message): + if 'request_id' not in message.body or message.body['request_id'] != request_id: + return + with cond: + global request_state + request_state = message.body + cond.notify() + + queue = str(uuid.uuid4()) + + def registered_cb(consumers): + for consumer in consumers: + if consumer.queue == queue: + registered.set() + break + + def error_cb(failure): + print(f"Consumer hit failure {failure}") + reactor.stop() # pylint: disable=E1101 + + # use the public config for this; see related comment in send_message() + conf.load_config(FEDORA_MESSAGING_PUBLIC_CONF[environment]) + + bindings = [{ + 'exchange': 'amq.topic', + 'queue': queue, + 'routing_keys': [get_request_finished_topic(request_type, environment)] + }] + queues = { + queue: { + "durable": False, + "auto_delete": True, + "exclusive": True, + "arguments": {} + } + } + + consumers = twisted_consume(callback, bindings=bindings, queues=queues) + consumers.addCallback(registered_cb) + consumers.addErrback(error_cb) + reactor.run(installSignalHandlers=False) # pylint: disable=E1101