-
Notifications
You must be signed in to change notification settings - Fork 198
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a timeout option to htex command client
This is in preparation for an upcoming PR that replaces the current multiprocessing.Queue based report of interchange ports (which has a 120s timeout) with a command client based retrieval of that information (so now the command client needs to implement a 120s timeout to closely replicate that behaviour). That work is itself part of using fork/exec to launch the interchange, rather than multiprocessing.fork (issue #3373). When the command client times out after sending a command, it sets itself to permanently bad: this is because the state of the channel is now unknown. For example: should the next action be to receive a response from a previously timed-out command that was eventually executed? Should the channel be recreated, assuming a previously sent command was never sent? Tagging issue #3376 (command client is not thread safe) because I feel like reworking this timeout behaviour and reworking that thread safety might be a single piece of deeper work.
- Loading branch information
1 parent
b214714
commit 2d62249
Showing
3 changed files
with
115 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
import pytest | ||
import threading | ||
import time | ||
import zmq | ||
from parsl import curvezmq | ||
from parsl.executors.high_throughput.zmq_pipes import CommandClient | ||
from parsl.executors.high_throughput.errors import CommandClientTimeoutError, CommandClientBadError | ||
|
||
|
||
# Time constant used for timeout tests: various delays and
# timeouts will be appropriate multiples of this, but the
# value of T itself should not matter too much as long as
# it is big enough for zmq connections to happen successfully.
# NOTE(review): T is passed as ``timeout_s`` to CommandClient.run in the
# tests in this file, so it is presumably expressed in seconds - confirm.
T = 0.25
|
||
|
||
@pytest.mark.local
def test_command_not_sent() -> None:
    """Check that running a command times out when it cannot be sent.

    A CommandClient is created but nothing is ever connected to its port,
    so running a command with a timeout is expected to raise
    CommandClientTimeoutError.
    """
    client_ctx = curvezmq.ClientContext(None)

    # RFC6335 ephemeral port range
    ephemeral_ports = (49152, 65535)
    client = CommandClient(client_ctx, "127.0.0.1", ephemeral_ports)

    # The client is now waiting for a connection. This test deliberately
    # creates no peer for it, so a command handed to the client has nowhere
    # to go and should time out.
    with pytest.raises(CommandClientTimeoutError):
        client.run("SOMECOMMAND", timeout_s=T)

    client.close()
|
||
|
||
@pytest.mark.local
def test_command_ignored() -> None:
    """Tests timeout while waiting for a command response.

    A REP socket standing in for the interchange connects to the command
    client but never replies. The test checks that:

    * running a command raises CommandClientTimeoutError,
    * the command was nevertheless received on the interchange side,
    * the command client marks itself as bad (``cc.ok`` is false), and
    * any further command raises CommandClientBadError.

    This only tests sequential access to the command client, even though
    htex makes multithreaded use of the command client: see issue #3376 about
    that lack of thread safety.
    """
    ctx = curvezmq.ClientContext(None)

    # RFC6335 ephemeral port range
    cc = CommandClient(ctx, "127.0.0.1", (49152, 65535))

    # Fake interchange side of the channel: it connects so that the command
    # can be delivered, but this test never sends a reply on it.
    ic_ctx = curvezmq.ServerContext(None)
    ic_channel = ic_ctx.socket(zmq.REP)
    ic_channel.connect(f"tcp://127.0.0.1:{cc.port}")

    try:
        with pytest.raises(CommandClientTimeoutError):
            cc.run("SLOW_COMMAND", timeout_s=T)

        # Wait a bounded time for the command to show up, so that a command
        # which was never delivered fails this test instead of blocking
        # forever in recv_pyobj(). poll() takes milliseconds and returns 0
        # if nothing became readable before the timeout.
        assert ic_channel.poll(timeout=int(T * 4 * 1000)), \
            "Should have received command on interchange side"

        req = ic_channel.recv_pyobj()
        assert req == "SLOW_COMMAND", "Should have received command on interchange side"
        assert not cc.ok, "CommandClient should have set itself to bad"

        with pytest.raises(CommandClientBadError):
            cc.run("ANOTHER_COMMAND")
    finally:
        # Release zmq resources even when an assertion above fails, in the
        # same order as the success path always used.
        cc.close()
        ctx.term()

        ic_channel.close()
        ic_ctx.term()