Skip to content

Commit

Permalink
Do not return control frames like ping.
Browse files Browse the repository at this point in the history
Recurse to the next frame like in master.

modified:   src/WebSockets.jl

 Reintroduce recursion on control frames:376. Returning control codes
 would crash String(msg). Multi-frame messages are also possible.
 Add WebSocketClosedError messages :47, for underlying exceptions.
 Propagate unrecognized errors.

 Remove INFO message when closing. Readguarded() informs better.

 Added functions:
  readguarded          return tuple: data and success indication
                       incomplete messages are always empty
  writeguarded         return success indication
  subprotocol(request) common for Httpserver and HTTP
  target(request)      common for Httpserver and HTTP
  origin(request)      common for Httpserver and HTTP

modified:   src/HTTP.jl

 make upgrade(ws) call websocket handler with full request.
     Origin can now be determined, as per recommendations.
 call showerror with catch_stacktrace(). backtrace was less interesting.
 add method target(request)
 add method origin(request)
 add method subprotocol(request)
 add ServerWS
     WebsocketHandler
     serve(::ServerWS, etc..)
     The intententions is user code brevity, access to HTTP
     keyword options, more similar interface with HttpServer,
     can save the user from distinguishing Stream and Message.

modified:   src/HttpServer.jl

 add method target(request)
 add method origin(request)
 add method subprotocol(request)
 improve inline doc for WebSocketHandler

modified:   examples/chat_explore.jl
 demonstrate new functions, improve readability

modified:   benchmark/functions_benchmark.jl
 capture WebSocketClosedError with readguarded(ws)

modified:   logutils/log_httpserver.jl
 explicitly import Request and Response from HttpServer

modified:   test/functions_server.jl
 rephrase inline docs, use subprotocol(request)

modified:   test/runtests.jl
 not rely on pong returning read(ws) call
  • Loading branch information
hustf committed May 24, 2018
1 parent 404b3b6 commit 8edb459
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 335 deletions.
4 changes: 2 additions & 2 deletions benchmark/functions_benchmark.jl
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ function HTS_JCE(n, messagesize)
sendtime = time_ns()
write(hts, msg)
# throw away replies
read(hts)
readguarded(hts)
receivereplytime = time_ns()
push!(receivereplytimes, Int64(receivereplytime < typemax(Int64) ? receivereplytime : 0 ))
push!(sendtimes, Int64(sendtime < typemax(Int64) ? sendtime : 0 ))
Expand All @@ -132,7 +132,7 @@ function HTS_JCE(n, messagesize)
# We must read from the websocket in order for it to respond to
# a closing message from JCE. It's also nice to yield to the async server
# so it can exit from it's handler and release the websocket reference.
isopen(hts) && read(hts)
isopen(hts) && readguarded(hts)
yield()
serverlatencies = receivetimes - sendtimes
clientlatencies = receivereplytimes - replytimes
Expand Down
315 changes: 111 additions & 204 deletions examples/chat_explore.jl
Original file line number Diff line number Diff line change
@@ -1,46 +1,89 @@
#=
Difference to chat-client:
This example declares global variables and use duck typing on
them. Their types will change.
The aim is that you can examine types in the REPL while running the
example. The aim is NOT that you can expect clean exits. We don't
release all the references after you close connections.
Function containers are explicitly defined with names. Although
anonymous functions may be more commonly used in the web domain,
named functions may improve error message readability.
A chat application using both HttpServer and HTTP to do
the same thing: Start a new task for each browser (tab) that connects.
=#
To use:
- include("chat_explore.jl") in REPL
- start a browser on address 127.0.0.1:8000, and another on 127.0.0.1:8080
- inspect global variables starting with 'last' while the chat is running asyncronously
To call in from other devices, figure out your IP address on the network and change the 'gatekeeper' code.
Note that type of 'lastreq' changes depending on whether the last call was made through HttpServer or HTTP.
# TODO fix errors and style
Functions used as arguments are explicitly defined with names instead of anonymous functions (do..end constructs).
This may improve debugging readability at the cost of increased verbosity.
# Globals, where used in functions will change the type
=#
global lastreq = 0
global lastreqheadersdict = 0
global lastws= 0
global lastwsHTTP = 0
global lastdata= 0
global lastmsg= 0
global lasthttp= 0
global lastws= 0
global laste= 0
global lasthttp= 0

using HttpServer
using HTTP
using WebSockets
const CLOSEAFTER = Base.Dates.Second(1800)
const HTTPPORT = 8080
const PORT_OLDTYPE = 8000
const HTTPSERVERPORT = 8000
const URL = "127.0.0.1"
const USERNAMES = Dict{String, WebSocket}()
const HTMLSTRING = readstring(Pkg.dir("WebSockets","examples","chat_explore.html"));

# If we are to access a websocket from outside
# it's websocket handler function, we need some kind of
# mutable container for storing references:
const WEBSOCKETS = Dict{WebSocket, Int}()

"""
Called by 'gatekeeper', this function stays active while the
particular websocket is open. The argument is an open websocket.
Other instances of the function run in other tasks. The tasks
are generated by either HTTP or HttpServer.
"""
function usews(thisws)
global lastws = thisws
push!(WEBSOCKETS, thisws => length(WEBSOCKETS) +1 )
t1 = now() + CLOSEAFTER
username = ""
while now() < t1
# This next call waits for a message to
# appear on the socket. If there is none,
# this task yields to other tasks.
data, success = readguarded(thisws)
!success && break
global lastmsg = msg = String(data)
print("Received: $msg ")
if username == ""
username = approvedusername(msg, thisws)
if username != ""
println("from new user $username ")
!writeguarded(thisws, username) && break
println("Tell everybody about $username")
foreach(keys(WEBSOCKETS)) do ws
writeguarded(ws, username * " enters chat")
end
else
println(", username taken!")
!writeguarded(thisws, "Username taken!") && break
end
else
println("from $username ")
distributemsg(msg, thisws)
startswith(msg, "exit") && break
end
end
exitmsg = username == "" ? "unknown" : username * " has left"
distributemsg(exitmsg, thisws)
println(exitmsg)
# No need to close the websocket. Just clean up external references:
removereferences(thisws)
nothing
end

#=
low level functions, works on old and new type.
=#
function removereferences(ws)
ws in keys(WEBSOCKETS) && pop!(WEBSOCKETS, ws)
haskey(WEBSOCKETS, ws) && pop!(WEBSOCKETS, ws)
for (discardname, wsref) in USERNAMES
if wsref === ws
pop!(USERNAMES, discardname)
Expand All @@ -50,55 +93,8 @@ function removereferences(ws)
nothing
end

function process_error(id, e)
if typeof(e) == InterruptException
info(id, "Received exit order.")
elseif typeof(e) == ArgumentError
info(id, typeof(e), "\t", e.msg)
elseif typeof(e) == ErrorException
info(id, typeof(e), "\t", e.msg)
else
if :msg in fieldnames(e) && e.msg == "Attempt to read from closed WebSocket"
warn(id, typeof(e), "\t", e.msg)
else
warn(id, e, "\nStacktrace:", stacktrace(true))
end
end
end

function protectedwrite(ws, msg)
global laste
try
write(ws, msg)
catch e
laste = e
process_error("chat_explore.protectedwrite: ", e)
removereferences(ws)
return false
end
true
end

function protectedread(ws)
global laste
global lastdata
data = Vector{UInt8}()
contflag = true
try
data = read(ws)
lastdata = data
catch e
laste = e
contflag = false
process_error("chat_explore.protectedread: ", e)
finally
return data, contflag
end
end



function findusername(msg, ws)
function approvedusername(msg, ws)
!startswith(msg, "userName:") && return ""
newname = msg[length("userName:") + 1:end]
newname =="" && return ""
Expand All @@ -111,152 +107,63 @@ end
function distributemsg(msgout, not_to_ws)
foreach(keys(WEBSOCKETS)) do ws
if ws !== not_to_ws
protectedwrite(ws, msgout)
writeguarded(ws, msgout)
end
end
nothing
end

function wsfunc(thisws)
global lastws
global lastmsg
lastws = thisws
push!(WEBSOCKETS, thisws => length(WEBSOCKETS) +1 )
contflag = true
t0 = now()
data = Vector{UInt8}()
msg = ""
username = ""
changedname = false
while now()-t0 < CLOSEAFTER && contflag
data, contflag = protectedread(thisws)
if contflag
msg = String(data)
lastmsg = msg
println("Received: $msg")
if username == ""
username = findusername(msg, thisws)
if username != ""
if !protectedwrite(thisws, username)
contflag = false
end
println("Tell everybody about $username")
foreach(keys(WEBSOCKETS)) do ws
protectedwrite(ws, username * " enters chat")
end
else
println("Username taken!")
if !protectedwrite(thisws, "Username taken!")
contflag = false
end
end
else
contflag = !startswith(msg, "exit")
contflag || println("Received exit message. Closing.")
end
end
end
exitusername = username == "" ? "unknown" : username
distributemsg(exitusername * " has left", thisws)
removereferences(thisws)
# It's not this functions responsibility to close the websocket. Just to forget about it.
nothing
end



#=
Functions for old type i.e. HttpServer based connections.
This function is called after handshake, and after
subprotocol is checked against a list of user supported subprotocols.
=#
function gatekeeper_oldtype(req, ws)
global lastreq
global lastws
lastreq = req
lastws = ws
# Here we can pick between functions
# based on e.g.
# if haskey(req.headers,"Sec-WebSocket-Protocol")
#
wsfunc(ws)
end


# Just for easy REPL inspection, we'll declare the handler object explicitly.
# With handler we mean an instance of a structure with at least one function reference.
handler_ws_oldtype = WebSocketHandler(gatekeeper_oldtype)
# explicit http server handlers
httpfunc_oldtype(req, res) = readstring(Pkg.dir("WebSockets","examples","chat_explore.html")) |> Response
handler_http_oldtype = HttpHandler(httpfunc_oldtype)
# define both in one server. We could call this a handler, too, since it's just a
# bigger function structure. Or we may call it an object.
server_def_oldtype = Server(handler_http_oldtype, handler_ws_oldtype )

#=
Now we'll run an external program which starts
the necessary tasks on Julia.
We can run this async, which might be considered
bad pracice and leads to more bad connections.
For debugging and building programs, it's gold to run this async.
You can close a server that's running in a task using
@schedule Base.throwto(listentask, InterruptException())
=#
litas_oldtype = @schedule run(server_def_oldtype, PORT_OLDTYPE)

info("Chat server listening on $PORT_OLDTYPE")

#=
Now open another port using HTTP instead of HttpServer
We'll start by defining the input functions for HTTP's listen method
=#


function gatekeeper_newtype(reqheadersdict, ws)
global lastreqheadersdict
lastreqheadersdict = reqheadersdict
global lastwsHTTP
lastwsHTTP = ws
# Inspect header Sec-WebSocket-Protocol to pick the right function.
wsfunc(ws)
end

httpfunc_newtype(req::HTTP.Request) = readstring(Pkg.dir("WebSockets","examples","chat_explore.html")) |> HTTP.Response
"""
`Server => gatekeeper(Request, WebSocket) => usews(WebSocket)`
function server_def_newtype(http)
global lasthttp
lasthttp = http
if WebSockets.is_upgrade(http.message)
WebSockets.upgrade(gatekeeper_newtype, http)
The gatekeeper makes it a little harder to connect with
malicious code. It inspects the request that was upgraded
to a a websocket.
"""
function gatekeeper(req, ws)
global lastreq = req
global lastws = ws
orig = WebSockets.origin(req)
if startswith(orig, "http://localhost") || startswith(orig, "http://127.0.0.1")
usews(ws)
else
HTTP.Servers.handle_request(httpfunc_newtype, http)
warn("Unauthorized websocket connection, $orig not approved by gatekeeper")
end
nothing
end

info("Start HTTP server on port $(HTTPPORT)")
litas_newtype = @schedule HTTP.listen(server_def_newtype, "127.0.0.1", UInt16(HTTPPORT))
"Request to response. Response is the predefined HTML page with some javascript"
req2resp(req::HttpServer.Request, resp) = HTMLSTRING |> Response
req2resp(req::HTTP.Request) = HTMLSTRING |> HTTP.Response

# Both server definitions need two function wrappers; one handler function for page requests,
# one for opening websockets (which the javascript in the HTML page will try to do)
server_httpserver = Server(HttpHandler(req2resp), WebSocketHandler(gatekeeper))
server_HTTP = WebSockets.ServerWS(HTTP.HandlerFunction(req2resp), WebSockets.WebsocketHandler(gatekeeper))

# Start the HTTP server asyncronously, and stop it later
litas_HTTP = @schedule WebSockets.serve(server_HTTP, URL, HTTPPORT, false)
@schedule begin
println("HTTP server listening on $URL:$HTTPPORT for $CLOSEAFTER")
sleep(CLOSEAFTER.value)
println("Time out, closing down $HTTPPORT")
Base.throwto(litas_HTTP, InterruptException())
end

"""
This stops the servers using InterruptExceptions.
"""
function closefromoutside()
# Throwing exceptions can be slow. This function also
# starts a task which seems to not exit and free up
# its memory properly. HTTP.listen offers an alternative
# method. See HTTP.listen > tcpref
if isdefined(:litas_newtype)
@schedule Base.throwto(litas_newtype, InterruptException())
end
if isdefined(:litas_oldtype)
try
@schedule Base.throwto(litas_oldtype, InterruptException())
catch e
info("closefromoutside: ", e)
end
end
# Start the HttpServer asyncronously, stop it later
litas_httpserver = @schedule run(server_httpserver, HTTPSERVERPORT)
@schedule begin
println("HttpServer listening on $URL:$HTTPSERVERPORT for $CLOSEAFTER")
sleep(CLOSEAFTER.value + 2)
println("Time out, closing down $HTTPSERVERPORT")
Base.throwto(litas_httpserver, InterruptException())
end

# Note that stopping the HttpServer in a while will send an error messages to the
# console. We could get rid of the messages by binding the task to a Channel.
# However, we can't get rid of ECONNRESET messages in that way. This is
# because the errors are triggered in tasks generated by litas_httpserver again,
# and those aren't channeled anywhere.

nothing
Loading

0 comments on commit 8edb459

Please sign in to comment.