From 5e2efa49e5a92f6c47ffa75c3c2e9792638cc891 Mon Sep 17 00:00:00 2001 From: bstoney Date: Sat, 11 Feb 2017 12:26:52 +0800 Subject: [PATCH 1/5] Added zmq identifier to action calls --- src/Actions/Action.php | 2 +- src/Actions/ExecuteAction.php | 2 +- src/Actions/HistoryAction.php | 4 ++-- src/Actions/KernelInfoAction.php | 2 +- src/Actions/ShutdownAction.php | 2 +- src/Handlers/ShellMessagesHandler.php | 10 ++++----- src/JupyterBroker.php | 31 +++++++++++++++++++-------- 7 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/Actions/Action.php b/src/Actions/Action.php index 3c42ed9..eea1a01 100644 --- a/src/Actions/Action.php +++ b/src/Actions/Action.php @@ -6,5 +6,5 @@ interface Action { - public function call(array $header, array $content); + public function call(array $header, array $content, $zmqId = null); } diff --git a/src/Actions/ExecuteAction.php b/src/Actions/ExecuteAction.php index 5d018b1..3b6f66b 100644 --- a/src/Actions/ExecuteAction.php +++ b/src/Actions/ExecuteAction.php @@ -52,7 +52,7 @@ public function __construct( $this->shellSoul = $shellSoul; } - public function call(array $header, array $content) + public function call(array $header, array $content, $zmqId = null) { $this->broker->send( $this->iopubSocket, 'status', ['execution_state' => 'busy'], $header diff --git a/src/Actions/HistoryAction.php b/src/Actions/HistoryAction.php index 11a6706..0600b60 100644 --- a/src/Actions/HistoryAction.php +++ b/src/Actions/HistoryAction.php @@ -28,8 +28,8 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket) $this->shellSocket = $shellSocket; } - public function call(array $header, array $content) + public function call(array $header, array $content, $zmqId = null) { - $this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header); + $this->broker->send($this->shellSocket, 'history_reply', ['history' => []], $header, [], $zmqId); } } diff --git a/src/Actions/KernelInfoAction.php b/src/Actions/KernelInfoAction.php index d9ef334..70e09db 100644 --- a/src/Actions/KernelInfoAction.php +++ b/src/Actions/KernelInfoAction.php @@ -32,7 +32,7 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket, S $this->iopubSocket = $iopubSocket; } - public function call(array $header, array $content) + public function call(array $header, array $content, $zmqId = null) { // TODO: Implement call() method. diff --git a/src/Actions/ShutdownAction.php b/src/Actions/ShutdownAction.php index 78ee19b..b23c3da 100644 --- a/src/Actions/ShutdownAction.php +++ b/src/Actions/ShutdownAction.php @@ -6,7 +6,7 @@ final class ShutdownAction implements Action { - public function call(array $header, array $content) + public function call(array $header, array $content, $zmqId = null) { // TODO: Implement call() method. } diff --git a/src/Handlers/ShellMessagesHandler.php b/src/Handlers/ShellMessagesHandler.php index 5b0b844..30e345a 100644 --- a/src/Handlers/ShellMessagesHandler.php +++ b/src/Handlers/ShellMessagesHandler.php @@ -75,7 +75,7 @@ public function __invoke(array $msg) $this->logger->debug('Received message', [ 'processId' => getmypid(), - 'zmqId' => $zmqId, + 'zmqId' => htmlentities($zmqId, ENT_COMPAT, "UTF-8"), 'delim' => $delim, 'hmac' => $hmac, 'header' => $header, @@ -85,13 +85,13 @@ public function __invoke(array $msg) ]); if ('kernel_info_request' === $header['msg_type']) { - $this->kernelInfoAction->call($header, $content); + $this->kernelInfoAction->call($header, $content, $zmqId); } elseif ('execute_request' === $header['msg_type']) { - $this->executeAction->call($header, $content); + $this->executeAction->call($header, $content, $zmqId); } elseif ('history_request' === $header['msg_type']) { - $this->historyAction->call($header, $content); + $this->historyAction->call($header, $content, $zmqId); } elseif ('shutdown_request' === $header['msg_type']) { - $this->shutdownAction->call($header, $content); + $this->shutdownAction->call($header, $content, $zmqId); } elseif ('comm_open' === $header['msg_type']) { // TODO: Research about what should be done. } else { diff --git a/src/JupyterBroker.php b/src/JupyterBroker.php index 5392144..d045795 100644 --- a/src/JupyterBroker.php +++ b/src/JupyterBroker.php @@ -57,11 +57,16 @@ public function __construct($key, $signatureScheme, UuidInterface $sessionId, Lo * @param array $content * @param array $parentHeader * @param array $metadata + * @param string $zmqId */ public function send( - SocketWrapper $stream, $msgType, array $content = [], array $parentHeader = [], array $metadata = [] - ) - { + SocketWrapper $stream, + $msgType, + array $content = [], + array $parentHeader = [], + array $metadata = [], + $zmqId = null + ) { $header = $this->createHeader($msgType); $msgDef = [ @@ -71,10 +76,16 @@ public function send( json_encode(empty($content) ? new \stdClass : $content), ]; + if ($zmqId !== null) { + $finalMsg = [$zmqId]; + } else { + $finalMsg = []; + } + $finalMsg = array_merge( + $finalMsg, ['', $this->sign($msgDef)], - $msgDef - ); + $msgDef); if (null !== $this->logger) { $this->logger->debug('Sent message', ['processId' => getmypid(), 'message' => $finalMsg]); @@ -90,15 +101,17 @@ public function send( private function createHeader($msgType) { return [ - 'date' => (new \DateTime('NOW'))->format('c'), - 'msg_id' => Uuid::uuid4()->toString(), + 'date' => (new \DateTime('NOW'))->format('c'), + 'msg_id' => Uuid::uuid4()->toString(), 'username' => "kernel", - 'session' => $this->sesssionId->toString(), + 'session' => $this->sesssionId->toString(), 'msg_type' => $msgType, + 'version' => '5.0', ]; } - private function sign(array $message_list) { + private function sign(array $message_list) + { $hm = hash_init( $this->hashAlgorithm, HASH_HMAC, From 3267898bc1bbd1368341c9df639a2bf3d5e35c59 Mon Sep 17 00:00:00 2001 From: bstoney Date: Sat, 11 Feb 2017 12:38:13 +0800 Subject: [PATCH 2/5] Updated kernel info message format --- src/Actions/KernelInfoAction.php | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/Actions/KernelInfoAction.php b/src/Actions/KernelInfoAction.php index 70e09db..5f17a9f 100644 --- a/src/Actions/KernelInfoAction.php +++ b/src/Actions/KernelInfoAction.php @@ -3,11 +3,9 @@ namespace Litipk\JupyterPHP\Actions; - use Litipk\JupyterPHP\JupyterBroker; use React\ZMQ\SocketWrapper; - final class KernelInfoAction implements Action { /** @var JupyterBroker */ @@ -34,30 +32,28 @@ public function __construct(JupyterBroker $broker, SocketWrapper $shellSocket, S public function call(array $header, array $content, $zmqId = null) { - // TODO: Implement call() method. + $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header); - $this->broker->send( - $this->iopubSocket, 'status', ['execution_state' => 'busy'], $header - ); - $this->broker->send( $this->shellSocket, 'kernel_info_reply', [ - 'protocol_version' => '5.0.0', + 'protocol_version' => '5.0', 'implementation' => 'jupyter-php', 'implementation_version' => '0.1.0', 'banner' => 'Jupyter-PHP Kernel', - 'language' => 'PHP', - 'language_version' => phpversion(), 'language_info' => [ 'name' => 'PHP', 'version' => phpversion(), 'mimetype' => 'text/x-php', 'file_extension' => '.php', - 'pygments_lexer' => 'PHP' - ] - ] + 'pygments_lexer' => 'PHP', + ], + 'status' => 'ok', + ], + $header, + [], + $zmqId ); $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header); From bb7b4ad4afd45d22780dc36619cea3098a16c372 Mon Sep 17 00:00:00 2001 From: bstoney Date: Sat, 11 Feb 2017 12:50:51 +0800 Subject: [PATCH 3/5] Added handling of heartbeat message --- src/Handlers/HbMessagesHandler.php | 18 ++++++++++++++---- src/KernelCore.php | 6 +++--- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/Handlers/HbMessagesHandler.php b/src/Handlers/HbMessagesHandler.php index 3c920fa..b1f5400 100644 --- a/src/Handlers/HbMessagesHandler.php +++ b/src/Handlers/HbMessagesHandler.php @@ -3,22 +3,32 @@ namespace Litipk\JupyterPHP\Handlers; - +use Litipk\JupyterPHP\JupyterBroker; use Monolog\Logger; - +use React\ZMQ\SocketWrapper; final class HbMessagesHandler { /** @var Logger */ private $logger; - public function __construct(Logger $logger) + /** @var SocketWrapper */ + private $hbSocket; + + public function __construct(SocketWrapper $hbSocket, Logger $logger) { $this->logger = $logger; + $this->hbSocket = $hbSocket; } public function __invoke($msg) { $this->logger->debug('Received message', ['processId' => getmypid(), 'msg' => $msg]); + + if (['ping'] == $msg) { + $this->hbSocket->send($msg); + } else { + $this->logger->error('Unknown message', ['processId' => getmypid(), 'msg' => $msg]); + } } -} \ No newline at end of file +} diff --git a/src/KernelCore.php b/src/KernelCore.php index 381fc3d..7e2ecb4 100644 --- a/src/KernelCore.php +++ b/src/KernelCore.php @@ -60,7 +60,7 @@ public function __construct(JupyterBroker $jupyterBroker, array $connUris, Logge { $this->broker = $jupyterBroker; $this->logger = $logger; - + $this->initSockets($connUris); $this->registerHandlers(); } @@ -74,7 +74,7 @@ public function run() } /** - * @param array[string]string $connUris + * @param array [string]string $connUris */ private function initSockets(array $connUris) { @@ -111,7 +111,7 @@ private function registerHandlers() ); $this->hbSocket->on( 'messages', - new HbMessagesHandler($this->logger->withName('HbMessagesHandler')) + new HbMessagesHandler($this->hbSocket, $this->logger->withName('HbMessagesHandler')) ); $this->iopubSocket->on( 'messages', From 6fbac23f3659c612d7dbebcc1c33ad7f441b6ca2 Mon Sep 17 00:00:00 2001 From: bstoney Date: Sat, 11 Feb 2017 13:02:33 +0800 Subject: [PATCH 4/5] Added handling of shutdown request --- src/Actions/ShutdownAction.php | 25 ++++++++++++++++++++- src/Handlers/ShellMessagesHandler.php | 32 +++++++++++++-------------- src/JupyterBroker.php | 2 +- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/Actions/ShutdownAction.php b/src/Actions/ShutdownAction.php index b23c3da..c7a03ba 100644 --- a/src/Actions/ShutdownAction.php +++ b/src/Actions/ShutdownAction.php @@ -3,11 +3,34 @@ namespace Litipk\JupyterPHP\Actions; +use Litipk\JupyterPHP\JupyterBroker; +use React\ZMQ\SocketWrapper; final class ShutdownAction implements Action { + /** @var JupyterBroker */ + private $broker; + + /** @var SocketWrapper */ + private $shellSocket; + + /** @var SocketWrapper */ + private $iopubSocket; + + public function __construct(JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket) + { + $this->broker = $broker; + $this->iopubSocket = $iopubSocket; + $this->shellSocket = $shellSocket; + } + public function call(array $header, array $content, $zmqId = null) { - // TODO: Implement call() method. + $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header); + + $replyContent = ['restart' => $content['restart']]; + $this->broker->send($this->shellSocket, 'shutdown_reply', $replyContent, $header, [], $zmqId); + + $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $header); } } diff --git a/src/Handlers/ShellMessagesHandler.php b/src/Handlers/ShellMessagesHandler.php index 30e345a..27455e9 100644 --- a/src/Handlers/ShellMessagesHandler.php +++ b/src/Handlers/ShellMessagesHandler.php @@ -3,7 +3,6 @@ namespace Litipk\JupyterPHP\Handlers; - use Litipk\JupyterPHP\Actions\ExecuteAction; use Litipk\JupyterPHP\Actions\HistoryAction; use Litipk\JupyterPHP\Actions\KernelInfoAction; @@ -15,7 +14,6 @@ use Psy\Shell; use React\ZMQ\SocketWrapper; - final class ShellMessagesHandler { /** @var ExecuteAction */ @@ -44,23 +42,25 @@ final class ShellMessagesHandler * @param Logger $logger */ public function __construct( - JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket, Logger $logger - ) - { + JupyterBroker $broker, + SocketWrapper $iopubSocket, + SocketWrapper $shellSocket, + Logger $logger + ) { $this->shellSoul = new Shell(); - + $this->executeAction = new ExecuteAction($broker, $iopubSocket, $shellSocket, $this->shellSoul); $this->historyAction = new HistoryAction($broker, $shellSocket); $this->kernelInfoAction = new KernelInfoAction($broker, $shellSocket, $iopubSocket); - $this->shutdownAction = new ShutdownAction($broker, $shellSocket); - + $this->shutdownAction = new ShutdownAction($broker, $iopubSocket, $shellSocket); + $this->logger = $logger; $broker->send( $iopubSocket, 'status', ['execution_state' => 'starting'], [] ); - $this->shellSoul->setOutput( new KernelOutput($this->executeAction, $this->logger->withName('KernelOutput'))); + $this->shellSoul->setOutput(new KernelOutput($this->executeAction, $this->logger->withName('KernelOutput'))); } /** @@ -74,14 +74,14 @@ public function __invoke(array $msg) $content = json_decode($content, true); $this->logger->debug('Received message', [ - 'processId' => getmypid(), - 'zmqId' => htmlentities($zmqId, ENT_COMPAT, "UTF-8"), - 'delim' => $delim, - 'hmac' => $hmac, - 'header' => $header, + 'processId' => getmypid(), + 'zmqId' => htmlentities($zmqId, ENT_COMPAT, "UTF-8"), + 'delim' => $delim, + 'hmac' => $hmac, + 'header' => $header, 'parentHeader' => $parentHeader, - 'metadata' => $metadata, - 'content' => $content + 'metadata' => $metadata, + 'content' => $content ]); if ('kernel_info_request' === $header['msg_type']) { diff --git a/src/JupyterBroker.php b/src/JupyterBroker.php index d045795..08892e5 100644 --- a/src/JupyterBroker.php +++ b/src/JupyterBroker.php @@ -76,7 +76,7 @@ public function send( json_encode(empty($content) ? new \stdClass : $content), ]; - if ($zmqId !== null) { + if (null !== $zmqId) { $finalMsg = [$zmqId]; } else { $finalMsg = []; From ff4f5af76e362477930a382b1c0b94a3d0027b43 Mon Sep 17 00:00:00 2001 From: bstoney Date: Sat, 11 Feb 2017 14:03:56 +0800 Subject: [PATCH 5/5] Added execute reply message --- src/Actions/ExecuteAction.php | 51 ++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/src/Actions/ExecuteAction.php b/src/Actions/ExecuteAction.php index 3b6f66b..063ce93 100644 --- a/src/Actions/ExecuteAction.php +++ b/src/Actions/ExecuteAction.php @@ -3,7 +3,6 @@ namespace Litipk\JupyterPHP\Actions; - use Litipk\JupyterPHP\JupyterBroker; use Psy\Exception\BreakException; use Psy\Exception\ThrowUpException; @@ -11,7 +10,6 @@ use Psy\Shell; use React\ZMQ\SocketWrapper; - final class ExecuteAction implements Action { /** @var JupyterBroker */ @@ -31,9 +29,12 @@ final class ExecuteAction implements Action /** @var string */ private $code; - + + /** @var bool */ + private $silent; + /** @var int */ - private $execCount; + private $execCount = 0; /** * ExecuteAction constructor. @@ -43,9 +44,11 @@ final class ExecuteAction implements Action * @param Shell $shellSoul */ public function __construct( - JupyterBroker $broker, SocketWrapper $iopubSocket, SocketWrapper $shellSocket, Shell $shellSoul - ) - { + JupyterBroker $broker, + SocketWrapper $iopubSocket, + SocketWrapper $shellSocket, + Shell $shellSoul + ) { $this->broker = $broker; $this->iopubSocket = $iopubSocket; $this->shellSocket = $shellSocket; @@ -54,16 +57,36 @@ public function __construct( public function call(array $header, array $content, $zmqId = null) { - $this->broker->send( - $this->iopubSocket, 'status', ['execution_state' => 'busy'], $header - ); + $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'busy'], $header); $this->header = $header; - $this->execCount = isset($content->execution_count) ? $content->execution_count : 0; $this->code = $content['code']; + $this->silent = $content['silent']; + + if (!$this->silent) { + $this->execCount = $this->execCount + 1; + + $this->broker->send( + $this->iopubSocket, + 'execute_input', + ['code' => $this->code, 'execution_count' => $this->execCount], + $this->header + ); + } $closure = $this->getClosure(); $closure(); + + $replyContent = [ + 'status' => 'ok', + 'execution_count' => $this->execCount, + 'payload' => [], + 'user_expressions' => new \stdClass + ]; + + $this->broker->send($this->shellSocket, 'execute_reply', $replyContent, $this->header, [], $zmqId); + + $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header); } /** @@ -71,15 +94,13 @@ public function call(array $header, array $content, $zmqId = null) */ public function notifyMessage($message) { - $this->broker->send($this->shellSocket, 'execute_reply', ['status' => 'ok'], $this->header); - $this->broker->send($this->iopubSocket, 'stream', ['name' => 'stdout', 'data' => $message], $this->header); + $this->broker->send($this->iopubSocket, 'stream', ['name' => 'stdout', 'text' => $message], $this->header); $this->broker->send( $this->iopubSocket, 'execute_result', - ['execution_count' => $this->execCount + 1, 'data' => $message, 'metadata' => new \stdClass], + ['execution_count' => $this->execCount, 'data' => ['text/plain' => $message], 'metadata' => new \stdClass], $this->header ); - $this->broker->send($this->iopubSocket, 'status', ['execution_state' => 'idle'], $this->header); } /**