-
Notifications
You must be signed in to change notification settings - Fork 493
/
evio.go
268 lines (251 loc) · 7.79 KB
/
evio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package evio
import (
"io"
"net"
"os"
"strings"
"time"
)
// Action is an action that occurs after the completion of an event.
type Action int
const (
// None indicates that no action should occur following an event.
None Action = iota
// Detach detaches a connection. Not available for UDP connections.
Detach
// Close closes the connection.
Close
// Shutdown shutdowns the server.
Shutdown
)
// Options are set when the client opens.
type Options struct {
// TCPKeepAlive (SO_KEEPALIVE) socket option.
TCPKeepAlive time.Duration
// ReuseInputBuffer will forces the connection to share and reuse the
// same input packet buffer with all other connections that also use
// this option.
// Default value is false, which means that all input data which is
// passed to the Data event will be a uniquely copied []byte slice.
ReuseInputBuffer bool
}
// Server represents a server context which provides information about the
// running server and has control functions for managing state.
type Server struct {
// The addrs parameter is an array of listening addresses that align
// with the addr strings passed to the Serve function.
Addrs []net.Addr
// NumLoops is the number of loops that the server is using.
NumLoops int
}
// Conn is an evio connection.
type Conn interface {
// Context returns a user-defined context.
Context() interface{}
// SetContext sets a user-defined context.
SetContext(interface{})
// AddrIndex is the index of server address that was passed to the Serve call.
AddrIndex() int
// LocalAddr is the connection's local socket address.
LocalAddr() net.Addr
// RemoteAddr is the connection's remote peer address.
RemoteAddr() net.Addr
// Wake triggers a Data event for this connection.
Wake()
}
// LoadBalance sets the load balancing method.
type LoadBalance int
const (
// Random requests that connections are randomly distributed.
Random LoadBalance = iota
// RoundRobin requests that connections are distributed to a loop in a
// round-robin fashion.
RoundRobin
// LeastConnections assigns the next accepted connection to the loop with
// the least number of active connections.
LeastConnections
)
// Events represents the server events for the Serve call.
// Each event has an Action return value that is used manage the state
// of the connection and server.
type Events struct {
// NumLoops sets the number of loops to use for the server. Setting this
// to a value greater than 1 will effectively make the server
// multithreaded for multi-core machines. Which means you must take care
// with synchonizing memory between all event callbacks. Setting to 0 or 1
// will run the server single-threaded. Setting to -1 will automatically
// assign this value equal to runtime.NumProcs().
NumLoops int
// LoadBalance sets the load balancing method. Load balancing is always a
// best effort to attempt to distribute the incoming connections between
// multiple loops. This option is only works when NumLoops is set.
LoadBalance LoadBalance
// Serving fires when the server can accept connections. The server
// parameter has information and various utilities.
Serving func(server Server) (action Action)
// Opened fires when a new connection has opened.
// The info parameter has information about the connection such as
// it's local and remote address.
// Use the out return value to write data to the connection.
// The opts return value is used to set connection options.
Opened func(c Conn) (out []byte, opts Options, action Action)
// Closed fires when a connection has closed.
// The err parameter is the last known connection error.
Closed func(c Conn, err error) (action Action)
// Detached fires when a connection has been previously detached.
// Once detached it's up to the receiver of this event to manage the
// state of the connection. The Closed event will not be called for
// this connection.
// The conn parameter is a ReadWriteCloser that represents the
// underlying socket connection. It can be freely used in goroutines
// and should be closed when it's no longer needed.
Detached func(c Conn, rwc io.ReadWriteCloser) (action Action)
// PreWrite fires just before any data is written to any client socket.
PreWrite func()
// Data fires when a connection sends the server data.
// The in parameter is the incoming data.
// Use the out return value to write data to the connection.
Data func(c Conn, in []byte) (out []byte, action Action)
// Tick fires immediately after the server starts and will fire again
// following the duration specified by the delay return value.
Tick func() (delay time.Duration, action Action)
}
// Serve starts handling events for the specified addresses.
//
// Addresses should use a scheme prefix and be formatted
// like `tcp://192.168.0.10:9851` or `unix://socket`.
// Valid network schemes:
// tcp - bind to both IPv4 and IPv6
// tcp4 - IPv4
// tcp6 - IPv6
// udp - bind to both IPv4 and IPv6
// udp4 - IPv4
// udp6 - IPv6
// unix - Unix Domain Socket
//
// The "tcp" network scheme is assumed when one is not specified.
func Serve(events Events, addr ...string) error {
var lns []*listener
defer func() {
for _, ln := range lns {
ln.close()
}
}()
var stdlib bool
for _, addr := range addr {
var ln listener
var stdlibt bool
ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr)
if stdlibt {
stdlib = true
}
if ln.network == "unix" {
os.RemoveAll(ln.addr)
}
var err error
if ln.network == "udp" {
if ln.opts.reusePort {
ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)
} else {
ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
}
} else {
if ln.opts.reusePort {
ln.ln, err = reuseportListen(ln.network, ln.addr)
} else {
ln.ln, err = net.Listen(ln.network, ln.addr)
}
}
if err != nil {
return err
}
if ln.pconn != nil {
ln.lnaddr = ln.pconn.LocalAddr()
} else {
ln.lnaddr = ln.ln.Addr()
}
if !stdlib {
if err := ln.system(); err != nil {
return err
}
}
lns = append(lns, &ln)
}
if stdlib {
return stdserve(events, lns)
}
return serve(events, lns)
}
// InputStream is a helper type for managing input streams from inside
// the Data event.
type InputStream struct{ b []byte }
// Begin accepts a new packet and returns a working sequence of
// unprocessed bytes.
func (is *InputStream) Begin(packet []byte) (data []byte) {
data = packet
if len(is.b) > 0 {
is.b = append(is.b, data...)
data = is.b
}
return data
}
// End shifts the stream to match the unprocessed data.
func (is *InputStream) End(data []byte) {
if len(data) > 0 {
if len(data) != len(is.b) {
is.b = append(is.b[:0], data...)
}
} else if len(is.b) > 0 {
is.b = is.b[:0]
}
}
type listener struct {
ln net.Listener
lnaddr net.Addr
pconn net.PacketConn
opts addrOpts
f *os.File
fd int
network string
addr string
}
type addrOpts struct {
reusePort bool
}
func parseAddr(addr string) (network, address string, opts addrOpts, stdlib bool) {
network = "tcp"
address = addr
opts.reusePort = false
if strings.Contains(address, "://") {
network = strings.Split(address, "://")[0]
address = strings.Split(address, "://")[1]
}
if strings.HasSuffix(network, "-net") {
stdlib = true
network = network[:len(network)-4]
}
q := strings.Index(address, "?")
if q != -1 {
for _, part := range strings.Split(address[q+1:], "&") {
kv := strings.Split(part, "=")
if len(kv) == 2 {
switch kv[0] {
case "reuseport":
if len(kv[1]) != 0 {
switch kv[1][0] {
default:
opts.reusePort = kv[1][0] >= '1' && kv[1][0] <= '9'
case 'T', 't', 'Y', 'y':
opts.reusePort = true
}
}
}
}
}
address = address[:q]
}
return
}