diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py index 8f8bd59fc..caafad54d 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/result_publisher.py @@ -149,6 +149,11 @@ def run(self) -> None: except Exception: log.exception("%r Unhandled error; event loop stopped", self) + + finally: + if self._mq_conn and self._mq_conn.ioloop: + self._mq_conn.ioloop.close() + self._stop_event.set() log.debug("%r thread ends.", self) diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py index 8088edacd..c71a53216 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/rabbit_mq/task_queue_subscriber.py @@ -142,10 +142,16 @@ def run(self): self._connection = self._connect() self._event_watcher() self._connection.ioloop.start() + except Exception: logger.exception( "%s Unhandled exception: shutting down connection.", self ) + + finally: + if self._connection and self._connection.ioloop: + self._connection.ioloop.close() + self._stop_event.set() logger.debug("%s Shutdown complete", self) diff --git a/compute_endpoint/tests/integration/test_rabbit_mq/result_queue_subscriber.py b/compute_endpoint/tests/integration/test_rabbit_mq/result_queue_subscriber.py index a2b88ba73..1a361b1ea 100644 --- a/compute_endpoint/tests/integration/test_rabbit_mq/result_queue_subscriber.py +++ b/compute_endpoint/tests/integration/test_rabbit_mq/result_queue_subscriber.py @@ -323,6 +323,9 @@ def run(self): self._connection.ioloop.start() except Exception: logger.exception("Failed to start subscriber") + finally: + if self._connection and self._connection.ioloop: + self._connection.ioloop.close() def stop(self) -> None: """stop() is called by the parent to shutdown the subscriber"""