forked from gopherdata/gophernotes
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgophernotes.go
211 lines (180 loc) · 6.03 KB
/
gophernotes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
zmq "github.com/alecthomas/gozmq"
"github.com/pkg/errors"
)
var logger *log.Logger
// ConnectionInfo stores the contents of the kernel connection file created by Jupyter.
type ConnectionInfo struct {
SignatureScheme string `json:"signature_scheme"`
Transport string `json:"transport"`
StdinPort int `json:"stdin_port"`
ControlPort int `json:"control_port"`
IOPubPort int `json:"iopub_port"`
HBPort int `json:"hb_port"`
ShellPort int `json:"shell_port"`
Key string `json:"key"`
IP string `json:"ip"`
}
// SocketGroup holds the sockets needed to communicate with the kernel, and
// the key for message signing.
type SocketGroup struct {
ShellSocket *zmq.Socket
ControlSocket *zmq.Socket
StdinSocket *zmq.Socket
IOPubSocket *zmq.Socket
Key []byte
}
// PrepareSockets sets up the ZMQ sockets through which the kernel will communicate.
func PrepareSockets(connInfo ConnectionInfo) (SocketGroup, error) {
// Initialize the Socket Group.
context, sg, err := createSockets()
if err != nil {
return sg, errors.Wrap(err, "Could not initialize context and Socket Group")
}
// Bind the sockets.
address := fmt.Sprintf("%v://%v:%%v", connInfo.Transport, connInfo.IP)
sg.ShellSocket.Bind(fmt.Sprintf(address, connInfo.ShellPort))
sg.ControlSocket.Bind(fmt.Sprintf(address, connInfo.ControlPort))
sg.StdinSocket.Bind(fmt.Sprintf(address, connInfo.StdinPort))
sg.IOPubSocket.Bind(fmt.Sprintf(address, connInfo.IOPubPort))
// Message signing key
sg.Key = []byte(connInfo.Key)
// Start the heartbeat device
HBSocket, err := context.NewSocket(zmq.REP)
if err != nil {
return sg, errors.Wrap(err, "Could not get the Heartbeat device socket")
}
HBSocket.Bind(fmt.Sprintf(address, connInfo.HBPort))
go zmq.Device(zmq.FORWARDER, HBSocket, HBSocket)
return sg, nil
}
// createSockets initializes the sockets for the socket group based on values from zmq.
func createSockets() (*zmq.Context, SocketGroup, error) {
context, err := zmq.NewContext()
if err != nil {
return context, SocketGroup{}, errors.Wrap(err, "Could not create zmq Context")
}
var sg SocketGroup
sg.ShellSocket, err = context.NewSocket(zmq.ROUTER)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get Shell Socket")
}
sg.ControlSocket, err = context.NewSocket(zmq.ROUTER)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get Control Socket")
}
sg.StdinSocket, err = context.NewSocket(zmq.ROUTER)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get Stdin Socket")
}
sg.IOPubSocket, err = context.NewSocket(zmq.PUB)
if err != nil {
return context, sg, errors.Wrap(err, "Could not get IOPub Socket")
}
return context, sg, nil
}
// HandleShellMsg responds to a message on the shell ROUTER socket.
func HandleShellMsg(receipt MsgReceipt) {
switch receipt.Msg.Header.MsgType {
case "kernel_info_request":
SendKernelInfo(receipt)
case "execute_request":
HandleExecuteRequest(receipt)
case "shutdown_request":
HandleShutdownRequest(receipt)
default:
logger.Println("Unhandled shell message:", receipt.Msg.Header.MsgType)
}
}
// KernelInfo holds information about the igo kernel, for kernel_info_reply messages.
type KernelInfo struct {
ProtocolVersion []int `json:"protocol_version"`
Language string `json:"language"`
}
// KernelStatus holds a kernel state, for status broadcast messages.
type KernelStatus struct {
ExecutionState string `json:"execution_state"`
}
// SendKernelInfo sends a kernel_info_reply message.
func SendKernelInfo(receipt MsgReceipt) {
reply := NewMsg("kernel_info_reply", receipt.Msg)
reply.Content = KernelInfo{[]int{4, 0}, "go"}
receipt.SendResponse(receipt.Sockets.ShellSocket, reply)
}
// ShutdownReply encodes a boolean indication of stutdown/restart
type ShutdownReply struct {
Restart bool `json:"restart"`
}
// HandleShutdownRequest sends a "shutdown" message
func HandleShutdownRequest(receipt MsgReceipt) {
reply := NewMsg("shutdown_reply", receipt.Msg)
content := receipt.Msg.Content.(map[string]interface{})
restart := content["restart"].(bool)
reply.Content = ShutdownReply{restart}
receipt.SendResponse(receipt.Sockets.ShellSocket, reply)
logger.Println("Shutting down in response to shutdown_request")
os.Exit(0)
}
// RunKernel is the main entry point to start the kernel.
func RunKernel(connectionFile string, logwriter io.Writer) {
logger = log.New(logwriter, "gophernotes ", log.LstdFlags)
// Set up the "Session" with the replpkg.
SetupExecutionEnvironment()
var connInfo ConnectionInfo
bs, err := ioutil.ReadFile(connectionFile)
if err != nil {
log.Fatalln(err)
}
if err = json.Unmarshal(bs, &connInfo); err != nil {
log.Fatalln(err)
}
logger.Printf("%+v\n", connInfo)
// Set up the ZMQ sockets through which the kernel will communicate.
sockets, err := PrepareSockets(connInfo)
if err != nil {
log.Fatalln(err)
}
pi := zmq.PollItems{
zmq.PollItem{Socket: sockets.ShellSocket, Events: zmq.POLLIN},
zmq.PollItem{Socket: sockets.StdinSocket, Events: zmq.POLLIN},
zmq.PollItem{Socket: sockets.ControlSocket, Events: zmq.POLLIN},
}
// Start a message receiving loop.
var msgparts [][]byte
for {
if _, err = zmq.Poll(pi, -1); err != nil {
log.Fatalln(err)
}
switch {
case pi[0].REvents&zmq.POLLIN != 0: // shell socket
msgparts, _ = pi[0].Socket.RecvMultipart(0)
msg, ids, err := WireMsgToComposedMsg(msgparts, sockets.Key)
if err != nil {
log.Println(err)
return
}
HandleShellMsg(MsgReceipt{msg, ids, sockets})
case pi[1].REvents&zmq.POLLIN != 0: // stdin socket - not implemented.
pi[1].Socket.RecvMultipart(0)
case pi[2].REvents&zmq.POLLIN != 0: // control socket - treat like shell socket.
msgparts, err = pi[2].Socket.RecvMultipart(0)
if err != nil {
log.Println(err)
return
}
msg, ids, err := WireMsgToComposedMsg(msgparts, sockets.Key)
if err != nil {
log.Println(err)
return
}
HandleShellMsg(MsgReceipt{msg, ids, sockets})
}
}
}