Skip to content

Commit

Permalink
flux: cleanup zmq context and socket (#3518)
Browse files Browse the repository at this point in the history
Problem: flux_instance_manager.py runs as a script and creates a
ZMQ context and socket but never cleans them up.

Use the context and socket as Python context managers so that they
are cleaned up properly.
  • Loading branch information
jameshcorbett authored Jul 10, 2024
1 parent 0364bab commit 3d09cad
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions parsl/executors/flux/flux_instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,29 @@ def main():
parser.add_argument("hostname", help="hostname of the parent executor's socket")
parser.add_argument("port", help="Port of the parent executor's socket")
args = parser.parse_args()
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(
args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port
)
# send the path to the ``flux.job`` package
socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode())
logging.debug("Flux package path sent.")
# collect the encapsulating Flux instance's URI
local_uri = flux.Flux().attr_get("local-uri")
hostname = gethostname()
if args.hostname == hostname:
flux_uri = local_uri
else:
flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "")
logging.debug("Flux URI is %s", flux_uri)
response = socket.recv() # get acknowledgment
logging.debug("Received acknowledgment %s", response)
socket.send(flux_uri.encode()) # send URI
logging.debug("URI sent. Blocking for response...")
response = socket.recv() # wait for shutdown message
logging.debug("Response %s received, draining flux jobs...", response)
flux.Flux().rpc("job-manager.drain").get()
logging.debug("Flux jobs drained, exiting.")
with zmq.Context() as context, context.socket(zmq.REQ) as socket:
socket.connect(
args.protocol + "://" + gethostbyname(args.hostname) + ":" + args.port
)
# send the path to the ``flux.job`` package
socket.send(dirname(dirname(os.path.realpath(flux.__file__))).encode())
logging.debug("Flux package path sent.")
# collect the encapsulating Flux instance's URI
local_uri = flux.Flux().attr_get("local-uri")
hostname = gethostname()
if args.hostname == hostname:
flux_uri = local_uri
else:
flux_uri = "ssh://" + gethostname() + local_uri.replace("local://", "")
logging.debug("Flux URI is %s", flux_uri)
response = socket.recv() # get acknowledgment
logging.debug("Received acknowledgment %s", response)
socket.send(flux_uri.encode()) # send URI
logging.debug("URI sent. Blocking for response...")
response = socket.recv() # wait for shutdown message
logging.debug("Response %s received, draining flux jobs...", response)
flux.Flux().rpc("job-manager.drain").get()
logging.debug("Flux jobs drained, exiting.")


if __name__ == "__main__":
Expand Down

0 comments on commit 3d09cad

Please sign in to comment.