From cf368c5c562a44e34af4e12bb123dee91e2a943f Mon Sep 17 00:00:00 2001 From: James Corbett Date: Wed, 10 Jul 2024 10:25:01 -0700 Subject: [PATCH] flux: cleanup zmq context and socket Problem: flux_instance_manager.py runs as a script and creates a ZMQ context and socket but never cleans them up. Use the context and socket as Python context managers so that they are cleaned up properly. --- parsl/executors/flux/flux_instance_manager.py | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/parsl/executors/flux/flux_instance_manager.py b/parsl/executors/flux/flux_instance_manager.py index 3d760bb5c8..e6111796b5 100644 --- a/parsl/executors/flux/flux_instance_manager.py +++ b/parsl/executors/flux/flux_instance_manager.py @@ -27,30 +27,29 @@ def main(): parser.add_argument("hostname", help="hostname of the parent executor's socket") parser.add_argument("port", help="Port of the parent executor's socket") args = parser.parse_args() - context = zmq.Context() - socket = context.socket(zmq.REQ) - socket.connect( - args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port - ) - # send the path to the ``flux.job`` package - socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode()) - logging.debug("Flux package path sent.") - # collect the encapsulating Flux instance's URI - local_uri = flux.Flux().attr_get("local-uri") - hostname = gethostname() - if args.hostname == hostname: - flux_uri = local_uri - else: - flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "") - logging.debug("Flux URI is %s", flux_uri) - response = socket.recv() # get acknowledgment - logging.debug("Received acknowledgment %s", response) - socket.send(flux_uri.encode()) # send URI - logging.debug("URI sent. Blocking for response...") - response = socket.recv() # wait for shutdown message - logging.debug("Response %s received, draining flux jobs...", response) - flux.Flux().rpc("job-manager.drain").get() - logging.debug("Flux jobs drained, exiting.") + with zmq.Context() as context, context.socket(zmq.REQ) as socket: + socket.connect( + args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port + ) + # send the path to the ``flux.job`` package + socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode()) + logging.debug("Flux package path sent.") + # collect the encapsulating Flux instance's URI + local_uri = flux.Flux().attr_get("local-uri") + hostname = gethostname() + if args.hostname == hostname: + flux_uri = local_uri + else: + flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "") + logging.debug("Flux URI is %s", flux_uri) + response = socket.recv() # get acknowledgment + logging.debug("Received acknowledgment %s", response) + socket.send(flux_uri.encode()) # send URI + logging.debug("URI sent. Blocking for response...") + response = socket.recv() # wait for shutdown message + logging.debug("Response %s received, draining flux jobs...", response) + flux.Flux().rpc("job-manager.drain").get() + logging.debug("Flux jobs drained, exiting.") if __name__ == "__main__":