-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.go
345 lines (304 loc) · 9.36 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
package stratum
import (
"bytes"
"encoding/json"
"errors"
"io"
"log"
"math"
"net"
"net/rpc"
"os"
"reflect"
"runtime/pprof"
"sync"
"time"
"github.com/powerman/rpc-codec/jsonrpc2"
)
const (
seqNotify = math.MaxUint64
notificationBufferLength = 10
)
var (
null = json.RawMessage([]byte("null"))
// CallTimeout is the amount of time we wait for a response before we return an error
CallTimeout = 30 * time.Second
// ErrCallTimedOut means that call did not succeed within CallTimeout
ErrCallTimedOut = errors.New("rpc call timeout")
)
type clientCodec struct {
dec *json.Decoder // for reading JSON values
enc *json.Encoder // for writing JSON values
c io.ReadWriteCloser
// temporary work space
resp clientResponse
notif Notification
// JSON-RPC responses include the request id but not the request method.
// Package rpc expects both.
// We save the request method in pending when sending a request
// and then look it up by request ID when filling out the rpc Response.
mutex sync.Mutex // protects pending
pending map[uint64]string // map request id to method name
notifications chan Notification
}
// newClientCodec returns a new rpc.ClientCodec using JSON-RPC 2.0 on conn.
func newClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
return &clientCodec{
dec: json.NewDecoder(conn),
enc: json.NewEncoder(conn),
c: conn,
pending: make(map[uint64]string),
// if the buffer gets full, we assume that it's not being consumed and error out
notifications: make(chan Notification, notificationBufferLength),
}
}
type clientRequest struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params interface{} `json:"params,omitempty"`
ID *uint64 `json:"id,omitempty"`
}
type Notification clientRequest
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
if r.Seq == 0 {
// seems many stratum pools don't like seq = 0
return errors.New("skipping first request")
}
// If return error: it will be returned as is for this call.
// Allow param to be only Array, Slice, Map or Struct.
// When param is nil or uninitialized Map or Slice - omit "params".
if param != nil {
switch k := reflect.TypeOf(param).Kind(); k {
case reflect.Map:
if reflect.TypeOf(param).Key().Kind() == reflect.String {
if reflect.ValueOf(param).IsNil() {
param = nil
}
}
case reflect.Slice:
if reflect.ValueOf(param).IsNil() {
param = nil
}
case reflect.Array, reflect.Struct:
case reflect.Ptr:
switch k := reflect.TypeOf(param).Elem().Kind(); k {
case reflect.Map:
if reflect.TypeOf(param).Elem().Key().Kind() == reflect.String {
if reflect.ValueOf(param).Elem().IsNil() {
param = nil
}
}
case reflect.Slice:
if reflect.ValueOf(param).Elem().IsNil() {
param = nil
}
case reflect.Array, reflect.Struct:
default:
return jsonrpc2.NewError(errInternal.Code, "unsupported param type: Ptr to "+k.String())
}
default:
return jsonrpc2.NewError(errInternal.Code, "unsupported param type: "+k.String())
}
}
var req clientRequest
if r.Seq != seqNotify {
c.mutex.Lock()
c.pending[r.Seq] = r.ServiceMethod
c.mutex.Unlock()
req.ID = &r.Seq
}
req.Version = "2.0"
req.Method = r.ServiceMethod
req.Params = param
if err := c.enc.Encode(&req); err != nil {
return jsonrpc2.NewError(errInternal.Code, err.Error())
}
return nil
}
type clientResponse struct {
Version string `json:"jsonrpc"`
ID *uint64 `json:"id"`
Result *json.RawMessage `json:"result,omitempty"`
Error *jsonrpc2.Error `json:"error,omitempty"`
}
func (r *clientResponse) reset() {
r.Version = ""
r.ID = nil
r.Result = nil
r.Error = nil
}
func (r *clientResponse) UnmarshalJSON(raw []byte) error {
r.reset()
type resp *clientResponse
if err := json.Unmarshal(raw, resp(r)); err != nil {
return errors.New("bad response: " + string(raw))
}
var o = make(map[string]*json.RawMessage)
if err := json.Unmarshal(raw, &o); err != nil {
return errors.New("bad response: " + string(raw))
}
_, okVer := o["jsonrpc"]
_, okID := o["id"]
_, okRes := o["result"]
_, okErr := o["error"]
// this has been updated to allow error and result as part of the response
if !okVer || !okID || !(okRes || okErr) || len(o) > 4 {
return errors.New("bad response: " + string(raw))
}
if r.Version != "2.0" {
return errors.New("bad response: " + string(raw))
}
if okRes && r.Result == nil {
r.Result = &null
}
if okErr && o["error"] != nil {
oe := make(map[string]*json.RawMessage)
if err := json.Unmarshal(*o["error"], &oe); err != nil {
return errors.New("bad response: " + string(raw))
}
if oe["code"] == nil || oe["message"] == nil {
return errors.New("bad response: " + string(raw))
}
if _, ok := oe["data"]; (!ok && len(oe) > 2) || len(oe) > 3 {
return errors.New("bad response: " + string(raw))
}
}
if o["id"] == nil && !okErr {
return errors.New("bad response: " + string(raw))
}
return nil
}
func (c *clientCodec) handleNotification(r io.Reader) error {
d := json.NewDecoder(r)
err := d.Decode(&c.notif)
// EOF is already handled by ReadResponseHeader
if err == nil {
c.receiveNotification()
}
return err
}
func (c *clientCodec) receiveNotification() {
// if we fill the buffer, kill the application
if len(c.notifications) >= notificationBufferLength {
out, _ := os.Create("/tmp/goroutine.pprof")
blockOut, _ := os.Create("/tmp/block.pprof")
defer out.Close()
defer blockOut.Close()
pprof.Lookup("goroutine").WriteTo(out, 2)
pprof.Lookup("block").WriteTo(blockOut, 2)
log.Fatal("Stratum client notification buffer is full! Process will be killed!" +
" Read from Client.Notifications to fix this error.")
}
c.notifications <- c.notif
}
// Because the stratum connection is bidirectional, we are going to modify the behavior of the client to accept
// notifications from the server (including jobs). Adding some server functionality (receive Notifs) to Client
// seems easier than multiplexing every connection. Notifications are NOT handled (eg. by RPC svc) by this codec
// This library throws a fatal error if it detects that notifications are not being consumed.
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
// If return err:
// - io.EOF will became ErrShutdown or io.ErrUnexpectedEOF
// - it will be returned as is for all pending calls
// - client will be shutdown
// So, return io.EOF as is, return *Error for all other errors.
b := make([]byte, 0)
backup := bytes.NewBuffer(b)
conn := io.TeeReader(c.c, backup)
d := json.NewDecoder(conn)
if err := d.Decode(&c.resp); err != nil {
if err == io.EOF {
return err
}
return c.handleNotification(backup)
}
if c.resp.Error != nil {
return c.resp.Error
}
if c.resp.ID == nil {
// TODO - this is probably the wrong error
return errInternal
}
c.mutex.Lock()
r.ServiceMethod = c.pending[*c.resp.ID]
delete(c.pending, *c.resp.ID)
c.mutex.Unlock()
r.Error = ""
r.Seq = *c.resp.ID
if c.resp.Error != nil {
r.Error = c.resp.Error.Error()
}
return nil
}
func (c *clientCodec) ReadResponseBody(x interface{}) error {
// If x!=nil and return error e:
// - this call get e.Error() appended to "reading body "
// - other pending calls get error as is XXX actually other calls
// shouldn't be affected by this error at all, so let's at least
// provide different error message for other calls
if x == nil || c.resp.Result == nil {
return nil
}
if err := json.Unmarshal(*c.resp.Result, x); err != nil {
e := jsonrpc2.NewError(errInternal.Code, err.Error())
e.Data = jsonrpc2.NewError(errInternal.Code, "some other Call failed to unmarshal Reply")
return e
}
return nil
}
func (c *clientCodec) Close() error {
return c.c.Close()
}
type Client struct {
*rpc.Client
codec rpc.ClientCodec
}
// Call wraps rpc.Call to provide a timeout - otherwise functionality is the same
func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := c.Go(serviceMethod, args, reply, nil)
select {
case <-call.Done:
if call.Error != nil {
return call.Error
}
return nil
case <-time.After(CallTimeout):
return ErrCallTimedOut
}
}
// Notify tries to invoke the named function. It return error only in case
// it wasn't able to send request.
func (c *Client) Notify(serviceMethod string, args interface{}) error {
req := &rpc.Request{
ServiceMethod: serviceMethod,
Seq: seqNotify,
}
return c.codec.WriteRequest(req, args)
}
func (c *Client) Notifications() chan Notification {
return c.codec.(*clientCodec).notifications
}
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
func NewClient(conn io.ReadWriteCloser) *Client {
codec := newClientCodec(conn)
client := rpc.NewClientWithCodec(codec)
// this is hack around
_ = client.Go("incrementMySequence", nil, nil, nil)
return &Client{client, codec}
}
// Dial connects to a JSON-RPC 2.0 server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), err
}
// DialTimeout is Dial, but with the timeout specified
func DialTimeout(network, address string, timeout time.Duration) (*Client, error) {
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
return nil, err
}
return NewClient(conn), err
}