-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathServers.jl
423 lines (378 loc) · 13.1 KB
/
Servers.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
"""
The `HTTP.Servers` module provides HTTP server functionality.
The main entry point is `HTTP.listen(f, host, port; kw...)` which takes
a `f(::HTTP.Stream)::Nothing` function argument, a `host`, a `port` and
optional keyword arguments. For full details, see `?HTTP.listen`.
For server functionality operating on full requests, see `?HTTP.Handlers`
module and `?HTTP.serve` function.
"""
module Servers
export listen
using ..IOExtras
using ..Streams
using ..Messages
using ..Parsers
using ..ConnectionPool
using Sockets
using MbedTLS: SSLContext, SSLConfig
import MbedTLS
using Dates
import ..@debug, ..@debugshow, ..DEBUG_LEVEL, ..taskid
# rate limiting
"""
    RateLimit(allowance, lastcheck)

Per-address rate-limiting state: `allowance` is the number of connections that
may currently be accepted and `lastcheck` is the time it was last replenished.
"""
mutable struct RateLimit
    allowance::Float64
    lastcheck::Dates.DateTime
end

"""
    update!(rl::RateLimit, rate_limit)

Credit `rl.allowance` with `rate_limit` units for every second elapsed since
`rl.lastcheck`, and move `rl.lastcheck` to the current time. Returns `nothing`.
"""
function update!(rl::RateLimit, rate_limit)
    rightnow = Dates.now()
    # `DateTime` differences are millisecond periods; convert to seconds.
    elapsed_ms = Dates.value(rightnow - rl.lastcheck)
    elapsed_seconds = float(elapsed_ms) / 1000.0
    rl.lastcheck = rightnow
    rl.allowance = rl.allowance + elapsed_seconds * rate_limit
    return nothing
end
# One table of per-address rate-limit state per thread, indexed with
# `Threads.threadid()`.
const RATE_LIMITS = [Dict{IPAddr, RateLimit}()]

# Pipe endpoints are never rate limited, and `rate_limit = nothing` disables
# rate limiting altogether.
check_rate_limit(tcp::Base.PipeEndpoint, rate_limit::Rational{Int}) = true
check_rate_limit(tcp, ::Nothing) = true

# IP address of the remote end of an accepted connection.
clientip(tcp) = Sockets.getpeername(tcp)[1]
# NOTE(review): assumes the TLS context keeps its transport socket in the `bio`
# field -- confirm against the MbedTLS version in use.
clientip(ssl::SSLContext) = clientip(ssl.bio)

"""
    check_rate_limit(tcp, rate_limit::Rational{Int}) -> Bool

Decide whether the newly accepted connection `tcp` may be served.

Each client IP address has a `RateLimit` entry in the global `RATE_LIMITS`
store. Its allowance is replenished at `rate_limit` connections per second and
capped at `rate_limit.num`. Each accepted connection consumes one unit; when
less than one unit is available the function returns `false` so that the caller
can close and discard the connection. It also returns `false` when the client
address cannot be determined because the socket is no longer connected.
"""
function check_rate_limit(tcp, rate_limit::Rational{Int})
    # Key on the *peer* address: the local socket name is identical for every
    # client, which would make all clients share a single bucket.
    ip = try
        clientip(tcp)
    catch e
        e isa Base.IOError || rethrow()
        # The peer is already gone; reject instead of failing the accept loop.
        return false
    end
    rate = Float64(rate_limit.num)
    rl = get!(RATE_LIMITS[Threads.threadid()], ip, RateLimit(rate, Dates.now()))
    update!(rl, rate_limit)
    if rl.allowance > rate
        @warn "throttling $ip"
        rl.allowance = rate
    end
    if rl.allowance < 1.0
        @warn "discarding connection from $ip due to rate limiting"
        return false
    end
    rl.allowance -= 1.0
    return true
end
"Convenience object for passing around server details"
struct Server{S <: Union{SSLConfig, Nothing}, I <: Base.IOServer}
    ssl::S              # TLS configuration, or `nothing` for plain connections
    server::I           # the listening socket
    hostname::String
    hostport::String
    on_shutdown::Any    # handed to `shutdown` when the `Server` is closed
end

# A `Server` is open exactly as long as its listening socket is open.
function Base.isopen(s::Server)
    return isopen(s.server)
end

# Run the shutdown hook(s) first, then close the listening socket.
function Base.close(s::Server)
    shutdown(s.on_shutdown)
    return close(s.server)
end
"""
shutdown(fns::Vector{<:Function})
shutdown(fn::Function)
shutdown(::Nothing)
Runs function(s) in `on_shutdown` field of `Server` when
`Server` is closed.
"""
function shutdown(fns::Vector{<:Function})
    # Run every hook in order; each one is individually guarded below.
    for fn in fns
        shutdown(fn)
    end
    return nothing
end

function shutdown(::Nothing)
    return nothing
end

function shutdown(fn::Function)
    # A failing hook is logged rather than propagated.
    return try
        fn()
    catch err
        @error "shutdown function $fn failed" exception=(err, catch_backtrace())
    end
end
# Plain server: hand back the accepted TCP socket as-is.
function Sockets.accept(s::Server{Nothing})
    return accept(s.server)::TCPSocket
end

# TLS server: wrap the accepted socket in an SSL context. The result is
# `nothing` when `getsslcontext` fails.
function Sockets.accept(s::Server{SSLConfig})
    tcp = accept(s.server)
    return getsslcontext(tcp, s.ssl)
end
"""
    getsslcontext(tcp, sslconfig)

Wrap the accepted socket `tcp` in a new `MbedTLS.SSLContext` configured with
`sslconfig` and perform the TLS handshake.

Returns the context on success. If setup or the handshake fails, `tcp` is
closed and `nothing` is returned. An `InterruptException` is rethrown (after
closing `tcp`) so that interrupting the server is not silently swallowed.
"""
function getsslcontext(tcp, sslconfig)
    try
        ssl = MbedTLS.SSLContext()
        MbedTLS.setup!(ssl, sslconfig)
        MbedTLS.associate!(ssl, tcp)
        MbedTLS.handshake!(ssl)
        return ssl
    catch e
        # The caller only sees `nothing` and cannot release the socket itself,
        # so close it here to avoid leaking it.
        close(tcp)
        e isa InterruptException && rethrow()
        return nothing
    end
end
"""
HTTP.listen([host=Sockets.localhost[, port=8081]]; kw...) do http::HTTP.Stream
...
end
Listen for HTTP connections and execute the `do` function for each request.
The `do` function should be of the form `f(::HTTP.Stream)::Nothing`.
Optional keyword arguments:
- `sslconfig=nothing`, Provide an `MbedTLS.SSLConfig` object to handle ssl
connections. Pass `sslconfig=MbedTLS.SSLConfig(false)` to disable ssl
verification (useful for testing).
- `reuse_limit = nolimit`, number of times a connection is allowed to be
reused after the first request.
- `tcpisvalid = tcp->true`, function `f(::TCPSocket)::Bool` to check an accepted
  connection before processing requests, e.g. to do source IP filtering.
- `readtimeout::Int=0`, close the connection if no data is received for this
many seconds. Use readtimeout = 0 to disable.
- `reuseaddr::Bool=false`, allow multiple servers to listen on the same port.
- `server::Base.IOServer=nothing`, provide an `IOServer` object to listen on;
allows closing the server.
- `connection_count::Ref{Int}`, reference to track the number of currently
open connections.
- `rate_limit::Rational{Int}=nothing`, number of `connections//second`
  allowed per client IP address; excess connections are immediately closed.
  e.g. 5//1.
- `verbose::Bool=false`, log connection information to `stdout`.
- `on_shutdown::Union{Function, Vector{<:Function}, Nothing}=nothing`, one or
  more functions to be run if the server is closed (for example by an
  `InterruptException`). Note, shutdown function(s) will not run if an
  `IOServer` object is supplied and closed by `close(server)`.
e.g.
```julia
HTTP.listen("127.0.0.1", 8081) do http
HTTP.setheader(http, "Content-Type" => "text/html")
write(http, "target uri: \$(http.message.target)<BR>")
write(http, "request body:<BR><PRE>")
write(http, read(http))
write(http, "</PRE>")
return
end
HTTP.listen("127.0.0.1", 8081) do http
@show http.message
@show HTTP.header(http, "Content-Type")
while !eof(http)
println("body data: ", String(readavailable(http)))
end
HTTP.setstatus(http, 404)
HTTP.setheader(http, "Foo-Header" => "bar")
startwrite(http)
write(http, "response body")
write(http, "more response body")
end
```
The `server=` option can be used to pass an already listening socket to
`HTTP.listen`. This allows manual control of server shutdown.
e.g.
```julia
using Sockets
server = Sockets.listen(Sockets.InetAddr(parse(IPAddr, host), port))
@async HTTP.listen(f, host, port; server=server)
# Closing server will stop HTTP.listen.
close(server)
```
To run the following HTTP chat example, open two Julia REPL windows and paste
the example code into both of them. Then in one window run `chat_server()` and
in the other run `chat_client()`, then type `hello` and press return.
Whatever you type on the client will be displayed on the server and vice versa.
```
using HTTP
function chat(io::HTTP.Stream)
@async while !eof(io)
write(stdout, readavailable(io), "\\n")
end
while isopen(io)
write(io, readline(stdin))
end
end
chat_server() = HTTP.listen("127.0.0.1", 8087) do io
write(io, "HTTP.jl Chat Server. Welcome!")
chat(io)
end
chat_client() = HTTP.open("POST", "http://127.0.0.1:8087") do io
chat(io)
end
```
"""
function listen end  # generic-function stub that carries the docstring above
# Sentinel for "no connection reuse limit".
const nolimit = typemax(Int)

# Build the `Sockets.InetAddr` to listen on from a textual IP address ...
function getinet(host::String, port::Integer)
    address = parse(IPAddr, host)
    return Sockets.InetAddr(address, port)
end

# ... or from an already parsed `IPAddr`.
function getinet(host::IPAddr, port::Integer)
    return Sockets.InetAddr(host, port)
end
function listen(f,
                host::Union{IPAddr, String}=Sockets.localhost,
                port::Integer=8081
                ;
                sslconfig::Union{MbedTLS.SSLConfig, Nothing}=nothing,
                tcpisvalid::Function=tcp->true,
                server::Union{Base.IOServer, Nothing}=nothing,
                reuseaddr::Bool=false,
                connection_count::Ref{Int}=Ref(0),
                rate_limit::Union{Rational{Int}, Nothing}=nothing,
                reuse_limit::Int=nolimit,
                readtimeout::Int=0,
                verbose::Bool=false,
                on_shutdown::Union{Function, Vector{<:Function}, Nothing}=nothing)
    # Obtain the listening socket in one of three ways:
    #   1. use the caller-supplied `server`,
    #   2. bind a fresh socket with address/port reuse enabled (`reuseaddr`),
    #   3. plain `Sockets.listen` on `host:port`.
    # Then hand everything to `listenloop`, which blocks until the server stops.
    inet = getinet(host, port)
    if server !== nothing
        tcpserver = server
        # Report the address the supplied socket is actually bound to.
        host, port = getsockname(server)
    elseif reuseaddr
        tcpserver = Sockets.TCPServer(; delay=false)
        if Sys.isunix()
            if Sys.isapple()
                verbose && @warn "note that `reuseaddr=true` allows multiple processes to bind to the same addr/port, but only one process will accept new connections (if that process exits, another process listening will start accepting)"
            end
            rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Cvoid},), tcpserver.handle)
            # A negative return code means the port-reuse option was not set.
            rc < 0 && @warn "unable to enable port reuse on the listening socket (rc = $rc)"
        else
            @warn "reuseaddr=true may not be supported on this platform: $(Sys.KERNEL)"
        end
        # `Sockets.bind` reports some failures by returning `false`; do not go on
        # to listen on a socket that was never bound to the requested address.
        if !Sockets.bind(tcpserver, inet.host, inet.port; reuseaddr=true)
            close(tcpserver)
            error("cannot bind to port; may already be in use or access denied")
        end
        Sockets.listen(tcpserver)
    else
        tcpserver = Sockets.listen(inet)
    end
    verbose && @info "Listening on: $host:$port"

    # A connection is served only if it passes both the user-supplied check
    # and the rate limiter.
    tcpisvalid = let f=tcpisvalid
        x -> f(x) && check_rate_limit(x, rate_limit)
    end

    s = Server(sslconfig, tcpserver, string(host), string(port), on_shutdown)
    return listenloop(f, s, tcpisvalid, connection_count,
                      reuse_limit, readtimeout, verbose)
end
""""
Main server loop.
Accepts new tcp connections and spawns async tasks to handle them.
"""
function listenloop(f, server, tcpisvalid, connection_count,
                    reuse_limit, readtimeout, verbose)
    # Accept connections until `server` is closed. Every accepted connection
    # that passes `tcpisvalid` is handled in its own task, and
    # `connection_count` tracks how many such tasks are currently running.
    while isopen(server)
        try
            io = accept(server)
            if io === nothing
                verbose && @warn "unable to accept new connection"
                continue
            elseif !tcpisvalid(io)
                verbose && @info "Accept-Reject: $io"
                close(io)
                continue
            end
            connection_count[] += 1
            conn = Connection(io)
            conn.host, conn.port = server.hostname, server.hostport
            @async try
                handle_connection(f, conn, server, reuse_limit, readtimeout)
            catch e
                # Use libuv's platform-specific constant rather than hard-coded
                # errno values, so resets are recognised on every OS.
                if e isa Base.IOError && e.code == Base.UV_ECONNRESET
                    verbose && @warn "connection reset by peer (ECONNRESET)"
                else
                    @error "error handling connection" exception=(e, stacktrace(catch_backtrace()))
                end
            finally
                connection_count[] -= 1
                # handle_connection is in charge of closing the underlying io
            end
        catch e
            # Any failure while accepting stops the server: an interrupt ends
            # the loop quietly, everything else is propagated to the caller.
            close(server)
            if e isa InterruptException
                @warn "Interrupted: listen($server)"
                break
            else
                rethrow()
            end
        end
    end
    return
end
"""
Start a `check_readtimeout` task to close the `Connection` if it is inactive.
Create a `Transaction` object for each HTTP Request received.
After `reuse_limit + 1` transactions, signal `final_transaction` to the
transaction handler.
"""
function handle_connection(f, c::Connection, server, reuse_limit, readtimeout)
    watchdog_enabled = readtimeout > 0
    if watchdog_enabled
        # Shared flag that tells the timeout task when to stop polling.
        keep_watching = Ref{Bool}(true)
        @async check_readtimeout(c, readtimeout, keep_watching)
    end

    try
        served = 0
        while true
            # if the connection socket or original server close, we stop taking requests
            (isopen(c) && isopen(server) && served <= reuse_limit) || break
            is_last = served == reuse_limit
            handle_transaction(f, Transaction(c), server; final_transaction=is_last)
            served += 1
        end
    finally
        watchdog_enabled && (keep_watching[] = false)
    end
    return
end
"""
If `c` is inactive for more than `readtimeout` seconds then close `c`.
"""
function check_readtimeout(c, readtimeout, wait_for_timeout)
    while wait_for_timeout[]
        if !(inactiveseconds(c) > readtimeout)
            # Still active: poll again after a randomised 8-12 second pause.
            sleep(8 + rand() * 4)
            continue
        end

        @warn "Connection Timeout: $c"
        try
            # Tell the client why the connection is going away (408).
            writeheaders(c.io, Response(408, ["Connection" => "close"]))
        finally
            close(c)
        end
        return
    end
    return
end
"""
Create a `HTTP.Stream` and parse the Request headers from a `HTTP.Transaction`
(by calling `startread(::Stream)`).
If there is a parse error, send an error Response.
Otherwise, execute stream processing function `f`.
If `f` throws an exception, send an error Response and close the connection.
"""
function handle_transaction(f, t::Transaction, server; final_transaction::Bool=false)
    req = Request()
    stream = Stream(req, t)

    # Phase 1: read and parse the request headers.
    try
        @debug 2 "server startread"
        startread(stream)
        if !isopen(server)
            close(t)
            return
        end
    catch err
        # EOF before any request method was read: nothing to answer.
        if err isa EOFError && isempty(req.method)
            return
        end
        err isa ParseError || rethrow(err)
        code = err.code == :HEADER_SIZE_EXCEEDS_LIMIT ? 413 : 400
        write(t, Response(code, body = string(err.code)))
        close(t)
        return
    end

    # Phase 2: prepare the response; announce the close if this is the last one.
    req.response.status = 200
    if final_transaction || hasheader(req, "Connection", "close")
        setheader(req.response, "Connection" => "close")
    end

    # Phase 3: run the user handler in its own task.
    @async try
        f(stream)
        @debug 2 "server closeread"
        closeread(stream)
        @debug 2 "server closewrite"
        closewrite(stream)
    catch err
        @error "error handling request" exception=(err, stacktrace(catch_backtrace()))
        if isopen(stream) && !iswritable(stream)
            stream.message.response.status = 500
            startwrite(stream)
            write(stream, sprint(showerror, err))
        end
        # After a handler failure the connection is always closed below.
        final_transaction = true
    finally
        final_transaction && close(t.c.io)
    end
    return
end
# Module initializer: resize the `RATE_LIMITS` store via
# `Threads.resize_nthreads!` when the module is loaded.
function __init__()
    Threads.resize_nthreads!(RATE_LIMITS)
    return nothing
end
end # module