Skip to content
This repository has been archived by the owner on Dec 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #20 from thomasjm/master
Browse files Browse the repository at this point in the history
Fix handling of multiple ZMQ identifiers
  • Loading branch information
castarco authored May 20, 2017
2 parents b0f4283 + 9ad9402 commit 1d816d5
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/Actions/Action.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

interface Action
{
public function call(array $header, array $content, $zmqId = null);
public function call(array $header, array $content, $zmqIds = []);
}
4 changes: 2 additions & 2 deletions src/Actions/ExecuteAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public function __construct(
$this->shellSoul = $shellSoul;
}

public function call(array $header, array $content, $zmqId = null)
public function call(array $header, array $content, $zmqIds = [])
{
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);

Expand Down Expand Up @@ -85,7 +85,7 @@ public function call(array $header, array $content, $zmqId = null)
'user_expressions' => new \stdClass
];

$this->broker->send($this->shellSocket, 'execute_reply', $replyContent, $this->header, [], $zmqId);
$this->broker->send($this->shellSocket, 'execute_reply', $replyContent, $this->header, [], $zmqIds);

$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Actions/HistoryAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket)
$this->shellSocket = $shellSocket;
}

public function call(array $header, array $content, $zmqId = null)
public function call(array $header, array $content, $zmqIds = [])
{
$this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header, [], $zmqId);
$this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header, [], $zmqIds);
}
}
4 changes: 2 additions & 2 deletions src/Actions/KernelInfoAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket, S
$this->iopubSocket = $iopubSocket;
}

public function call(array $header, array $content, $zmqId = null)
public function call(array $header, array $content, $zmqIds = [])
{
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);

Expand All @@ -56,7 +56,7 @@ public function call(array $header, array $content, $zmqId = null)
],
$header,
[],
$zmqId
$zmqIds
);

$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header);
Expand Down
4 changes: 2 additions & 2 deletions src/Actions/ShutdownAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ public function __construct(JupyterBroker $broker, SocketWrapper $iopubSocket, S
$this->shellSocket = $shellSocket;
}

public function call(array $header, array $content, $zmqId = null)
public function call(array $header, array $content, $zmqIds = [])
{
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);

$replyContent = ['restart' => $content['restart']];
$this->broker->send($this->shellSocket, 'shutdown_reply', $replyContent, $header, [], $zmqId);
$this->broker->send($this->shellSocket, 'shutdown_reply', $replyContent, $header, [], $zmqIds);

$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header);
}
Expand Down
22 changes: 15 additions & 7 deletions src/Handlers/ShellMessagesHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,23 @@ public function __construct(

public function __invoke(array $msg)
{
list($zmqId, $delim, $hmac, $header, $parentHeader, $metadata, $content) = $msg;
// Read ZMQ IDs until we reach the delimiter
$zmqIds = array();
while (!empty($msg)) {
$item = array_shift($msg);
if ($item === '<IDS|MSG>') break;
else array_push($zmqIds, $item);
}

// Read the remaining items
list($hmac, $header, $parentHeader, $metadata, $content) = $msg;

$header = json_decode($header, true);
$content = json_decode($content, true);

$this->logger->debug('Received message', [
'processId' => getmypid(),
'zmqId' => htmlentities($zmqId, ENT_COMPAT, "UTF-8"),
'delim' => $delim,
'zmqIds' => htmlentities(implode(", ", $zmqIds), ENT_COMPAT, "UTF-8"),
'hmac' => $hmac,
'header' => $header,
'parentHeader' => $parentHeader,
Expand All @@ -84,13 +92,13 @@ public function __invoke(array $msg)
]);

if ('kernel_info_request' === $header['msg_type']) {
$this->kernelInfoAction->call($header, $content, $zmqId);
$this->kernelInfoAction->call($header, $content, $zmqIds);
} elseif ('execute_request' === $header['msg_type']) {
$this->executeAction->call($header, $content, $zmqId);
$this->executeAction->call($header, $content, $zmqIds);
} elseif ('history_request' === $header['msg_type']) {
$this->historyAction->call($header, $content, $zmqId);
$this->historyAction->call($header, $content, $zmqIds);
} elseif ('shutdown_request' === $header['msg_type']) {
$this->shutdownAction->call($header, $content, $zmqId);
$this->shutdownAction->call($header, $content, $zmqIds);
} elseif ('comm_open' === $header['msg_type']) {
// TODO: Research about what should be done.
} else {
Expand Down
8 changes: 2 additions & 6 deletions src/JupyterBroker.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function send(
array $content = [],
array $parentHeader = [],
array $metadata = [],
$zmqId = null
$zmqIds = []
) {
$header = $this->createHeader($msgType);

Expand All @@ -62,11 +62,7 @@ public function send(
json_encode(empty($content) ? new \stdClass : $content),
];

if (null !== $zmqId) {
$finalMsg = [$zmqId];
} else {
$finalMsg = [];
}
$finalMsg = $zmqIds;

$finalMsg = array_merge(
$finalMsg,
Expand Down

0 comments on commit 1d816d5

Please sign in to comment.