Skip to content
This repository has been archived by the owner on Dec 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #16 from bstoney/master
Browse files Browse the repository at this point in the history
Merge "Fixed zmq reply messaging", from @bstoney
  • Loading branch information
castarco authored Feb 12, 2017
2 parents c229d47 + 38163c6 commit bd5095b
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 67 deletions.
2 changes: 1 addition & 1 deletion src/Actions/Action.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

interface Action
{
public function call(array $header, array $content);
public function call(array $header, array $content, $zmqId = null);
}
53 changes: 37 additions & 16 deletions src/Actions/ExecuteAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@

namespace Litipk\JupyterPHP\Actions;


use Litipk\JupyterPHP\JupyterBroker;
use Psy\Exception\BreakException;
use Psy\Exception\ThrowUpException;
use Psy\ExecutionLoop\Loop;
use Psy\Shell;
use React\ZMQ\SocketWrapper;


final class ExecuteAction implements Action
{
/** @var JupyterBroker */
Expand All @@ -39,45 +37,68 @@ final class ExecuteAction implements Action

/** @var string */
private $code;


/** @var bool */
private $silent;

/** @var int */
private $execCount;
private $execCount = 0;


public function __construct(
JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket, Shell $shellSoul
)
{
JupyterBroker $broker,
SocketWrapper $iopubSocket,
SocketWrapper $shellSocket,
Shell $shellSoul
) {
$this->broker = $broker;
$this->iopubSocket = $iopubSocket;
$this->shellSocket = $shellSocket;
$this->shellSoul = $shellSoul;
}

public function call(array $header, array $content)
public function call(array $header, array $content, $zmqId = null)
{
$this->broker->send(
$this->iopubSocket, 'status', ['execution_state' => 'busy'], $header
);
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);

$this->header = $header;
$this->execCount = isset($content->execution_count) ? $content->execution_count : 0;
$this->code = $content['code'];
$this->silent = $content['silent'];

if (!$this->silent) {
$this->execCount = $this->execCount + 1;

$this->broker->send(
$this->iopubSocket,
'execute_input',
['code' => $this->code, 'execution_count' => $this->execCount],
$this->header
);
}

($this->getClosure())();

$replyContent = [
'status' => 'ok',
'execution_count' => $this->execCount,
'payload' => [],
'user_expressions' => new \stdClass
];

$this->broker->send($this->shellSocket, 'execute_reply', $replyContent, $this->header, [], $zmqId);

$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header);
}

public function notifyMessage(string $message)
{
$this->broker->send($this->shellSocket, 'execute_reply', ['status' => 'ok'], $this->header);
$this->broker->send($this->iopubSocket, 'stream', ['name' => 'stdout', 'data' => $message], $this->header);
$this->broker->send($this->iopubSocket, 'stream', ['name' => 'stdout', 'text' => $message], $this->header);
$this->broker->send(
$this->iopubSocket,
'execute_result',
['execution_count' => $this->execCount + 1, 'data' => $message, 'metadata' => new \stdClass],
['execution_count' => $this->execCount, 'data' => ['text/plain' => $message], 'metadata' => new \stdClass],
$this->header
);
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header);
}

private function getClosure(): callable
Expand Down
4 changes: 2 additions & 2 deletions src/Actions/HistoryAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket)
$this->shellSocket = $shellSocket;
}

public function call(array $header, array $content)
public function call(array $header, array $content, $zmqId = null)
{
$this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header);
$this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header, [], $zmqId);
}
}
24 changes: 10 additions & 14 deletions src/Actions/KernelInfoAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@

namespace Litipk\JupyterPHP\Actions;


use Litipk\JupyterPHP\JupyterBroker;
use React\ZMQ\SocketWrapper;


final class KernelInfoAction implements Action
{
/** @var JupyterBroker */
Expand All @@ -35,32 +33,30 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket, S
$this->iopubSocket = $iopubSocket;
}

public function call(array $header, array $content)
public function call(array $header, array $content, $zmqId = null)
{
// TODO: Implement call() method.
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);

$this->broker->send(
$this->iopubSocket, 'status', ['execution_state' => 'busy'], $header
);

$this->broker->send(
$this->shellSocket,
'kernel_info_reply',
[
'protocol_version' => '5.0.0',
'protocol_version' => '5.0',
'implementation' => 'jupyter-php',
'implementation_version' => '0.1.0',
'banner' => 'Jupyter-PHP Kernel',
'language' => 'PHP',
'language_version' => phpversion(),
'language_info' => [
'name' => 'PHP',
'version' => phpversion(),
'mimetype' => 'text/x-php',
'file_extension' => '.php',
'pygments_lexer' => 'PHP'
]
]
'pygments_lexer' => 'PHP',
],
'status' => 'ok',
],
$header,
[],
$zmqId
);

$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header);
Expand Down
27 changes: 25 additions & 2 deletions src/Actions/ShutdownAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,34 @@

namespace Litipk\JupyterPHP\Actions;

use Litipk\JupyterPHP\JupyterBroker;
use React\ZMQ\SocketWrapper;

final class ShutdownAction implements Action
{
public function call(array $header, array $content)
/** @var JupyterBroker */
private $broker;

/** @var SocketWrapper */
private $shellSocket;

/** @var SocketWrapper */
private $iopubSocket;

public function __construct(JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket)
{
$this->broker = $broker;
$this->iopubSocket = $iopubSocket;
$this->shellSocket = $shellSocket;
}

public function call(array $header, array $content, $zmqId = null)
{
// TODO: Implement call() method.
$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header);

$replyContent = ['restart' => $content['restart']];
$this->broker->send($this->shellSocket, 'shutdown_reply', $replyContent, $header, [], $zmqId);

$this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header);
}
}
18 changes: 14 additions & 4 deletions src/Handlers/HbMessagesHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,32 @@

namespace Litipk\JupyterPHP\Handlers;


use Litipk\JupyterPHP\JupyterBroker;
use Monolog\Logger;

use React\ZMQ\SocketWrapper;

final class HbMessagesHandler
{
/** @var Logger */
private $logger;

public function __construct(Logger $logger)
/** @var SocketWrapper */
private $hbSocket;

public function __construct(SocketWrapper $hbSocket, Logger $logger)
{
$this->logger = $logger;
$this->hbSocket = $hbSocket;
}

public function __invoke($msg)
{
$this->logger->debug('Received message', ['processId' => getmypid(), 'msg' => $msg]);

if (['ping'] == $msg) {
$this->hbSocket->send($msg);
} else {
$this->logger->error('Unknown message', ['processId' => getmypid(), 'msg' => $msg]);
}
}
}
}
40 changes: 20 additions & 20 deletions src/Handlers/ShellMessagesHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

namespace Litipk\JupyterPHP\Handlers;


use Litipk\JupyterPHP\Actions\ExecuteAction;
use Litipk\JupyterPHP\Actions\HistoryAction;
use Litipk\JupyterPHP\Actions\KernelInfoAction;
Expand All @@ -23,7 +22,6 @@
use Psy\Shell;
use React\ZMQ\SocketWrapper;


final class ShellMessagesHandler
{
/** @var ExecuteAction */
Expand All @@ -46,23 +44,25 @@ final class ShellMessagesHandler


public function __construct(
JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket, Logger $logger
)
{
JupyterBroker $broker,
SocketWrapper $iopubSocket,
SocketWrapper $shellSocket,
Logger $logger
) {
$this->shellSoul = new Shell();

$this->executeAction = new ExecuteAction($broker, $iopubSocket, $shellSocket, $this->shellSoul);
$this->historyAction = new HistoryAction($broker, $shellSocket);
$this->kernelInfoAction = new KernelInfoAction($broker, $shellSocket, $iopubSocket);
$this->shutdownAction = new ShutdownAction($broker, $shellSocket);
$this->shutdownAction = new ShutdownAction($broker, $iopubSocket, $shellSocket);

$this->logger = $logger;

$broker->send(
$iopubSocket, 'status', ['execution_state' => 'starting'], []
);

$this->shellSoul->setOutput( new KernelOutput($this->executeAction, $this->logger->withName('KernelOutput')));
$this->shellSoul->setOutput(new KernelOutput($this->executeAction, $this->logger->withName('KernelOutput')));
}

public function __invoke(array $msg)
Expand All @@ -73,24 +73,24 @@ public function __invoke(array $msg)
$content = json_decode($content, true);

$this->logger->debug('Received message', [
'processId' => getmypid(),
'zmqId' => $zmqId,
'delim' => $delim,
'hmac' => $hmac,
'header' => $header,
'processId' => getmypid(),
'zmqId' => htmlentities($zmqId, ENT_COMPAT, "UTF-8"),
'delim' => $delim,
'hmac' => $hmac,
'header' => $header,
'parentHeader' => $parentHeader,
'metadata' => $metadata,
'content' => $content
'metadata' => $metadata,
'content' => $content
]);

if ('kernel_info_request' === $header['msg_type']) {
$this->kernelInfoAction->call($header, $content);
$this->kernelInfoAction->call($header, $content, $zmqId);
} elseif ('execute_request' === $header['msg_type']) {
$this->executeAction->call($header, $content);
$this->executeAction->call($header, $content, $zmqId);
} elseif ('history_request' === $header['msg_type']) {
$this->historyAction->call($header, $content);
$this->historyAction->call($header, $content, $zmqId);
} elseif ('shutdown_request' === $header['msg_type']) {
$this->shutdownAction->call($header, $content);
$this->shutdownAction->call($header, $content, $zmqId);
} elseif ('comm_open' === $header['msg_type']) {
// TODO: Research about what should be done.
} else {
Expand Down
21 changes: 16 additions & 5 deletions src/JupyterBroker.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ public function __construct($key, $signatureScheme, UuidInterface $sessionId, Lo
}

public function send(
SocketWrapper $stream, $msgType, array $content = [], array $parentHeader = [], array $metadata = []
)
{
SocketWrapper $stream,
$msgType,
array $content = [],
array $parentHeader = [],
array $metadata = [],
$zmqId = null
) {
$header = $this->createHeader($msgType);

$msgDef = [
Expand All @@ -58,10 +62,16 @@ public function send(
json_encode(empty($content) ? new \stdClass : $content),
];

if (null !== $zmqId) {
$finalMsg = [$zmqId];
} else {
$finalMsg = [];
}

$finalMsg = array_merge(
$finalMsg,
['<IDS|MSG>', $this->sign($msgDef)],
$msgDef
);
$msgDef);

if (null !== $this->logger) {
$this->logger->debug('Sent message', ['processId' => getmypid(), 'message' => $finalMsg]);
Expand All @@ -78,6 +88,7 @@ private function createHeader(string $msgType): array
'username' => "kernel",
'session' => $this->sessionId->toString(),
'msg_type' => $msgType,
'version' => '5.0',
];
}

Expand Down
Loading

0 comments on commit bd5095b

Please sign in to comment.