Skip to content

Commit

Permalink
Http.jl (#92)
Browse files Browse the repository at this point in the history
* Update README

* Graceful close from client + run HTTP server HttpServer server simulataneously.

* Test ping, pong. Temp tests.

* Add try, catch and close

Indent

WIP

* Fix String mask issue

* Minor: Fix an indent

* Update WebSockets.jl

Add a comment to remove the `copy` for v0.7.
  • Loading branch information
Eric Forgy authored and hustf committed Feb 20, 2018
1 parent 4f40a50 commit 9b01fe6
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 72 deletions.
14 changes: 12 additions & 2 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ function open(f::Function, url; binary=false, verbose=false, kw...)
end

io = HTTP.ConnectionPool.getrawstream(http)
f(WebSocket(io,false))
ws = WebSocket(io,false)
try
f(ws)
finally
close(ws)
end
end
end

Expand All @@ -49,7 +54,12 @@ function upgrade(f::Function, http::HTTP.Stream; binary=false)
HTTP.startwrite(http)

io = HTTP.ConnectionPool.getrawstream(http)
f(WebSocket(io, true))
ws = WebSocket(io, true)
try
f(ws)
finally
close(ws)
end
end

function check_upgrade(http)
Expand Down
34 changes: 17 additions & 17 deletions src/HttpServer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ function websocket_handshake(request,client)
Base.write(client.sock, HttpServer.Response(400))
return false
end
resp_key = generate_websocket_key(key)
resp_key = generate_websocket_key(key)

response = HttpServer.Response(101)
response.headers["Upgrade"] = "websocket"
response.headers["Connection"] = "Upgrade"
response.headers["Sec-WebSocket-Accept"] = resp_key

if haskey(request.headers, "Sec-WebSocket-Protocol")
if hasprotocol(request.headers["Sec-WebSocket-Protocol"])
response.headers["Sec-WebSocket-Protocol"] = request.headers["Sec-WebSocket-Protocol"]
else
Base.write(client.sock, HttpServer.Response(400))
return false
end
end

Base.write(client.sock, response)
return true
response = HttpServer.Response(101)
response.headers["Upgrade"] = "websocket"
response.headers["Connection"] = "Upgrade"
response.headers["Sec-WebSocket-Accept"] = resp_key
if haskey(request.headers, "Sec-WebSocket-Protocol")
if hasprotocol(request.headers["Sec-WebSocket-Protocol"])
response.headers["Sec-WebSocket-Protocol"] = request.headers["Sec-WebSocket-Protocol"]
else
Base.write(client.sock, HttpServer.Response(400))
return false
end
end
Base.write(client.sock, response)
return true
end

""" Implement the WebSocketInterface, for compatilibility with HttpServer."""
Expand Down
16 changes: 8 additions & 8 deletions src/WebSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ end
write_fragment(io, islast, opcode, hasmask, data::Array{UInt8})
Write the raw frame to a bufffer
"""
function write_fragment(io::IO, islast::Bool, opcode, hasmask::Bool, data::Array{UInt8})
function write_fragment(io::IO, islast::Bool, opcode, hasmask::Bool, data::Vector{UInt8})
l = length(data)
b1::UInt8 = (islast ? 0b1000_0000 : 0b0000_0000) | opcode

Expand All @@ -159,10 +159,10 @@ function write_fragment(io::IO, islast::Bool, opcode, hasmask::Bool, data::Array
end

""" Write without interruptions"""
function locked_write(io::IO, islast::Bool, opcode, hasmask, data)
function locked_write(io::IO, islast::Bool, opcode, hasmask::Bool, data::Vector{UInt8})
isa(io, TCPSock) && lock(io.lock)
try
write_fragment(io, islast, opcode, hasmask, Vector{UInt8}(data))
write_fragment(io, islast, opcode, hasmask, data)
finally
if isa(io, TCPSock)
flush(io)
Expand All @@ -173,7 +173,7 @@ end

""" Write text data; will be sent as one frame."""
function Base.write(ws::WebSocket,data::String)
locked_write(ws.socket, true, OPCODE_TEXT, !ws.server, data)
locked_write(ws.socket, true, OPCODE_TEXT, !ws.server, copy(Vector{UInt8}(data))) # Remove this `copy` after v0.7!
end

""" Write binary data; will be sent as one frame."""
Expand All @@ -182,14 +182,14 @@ function Base.write(ws::WebSocket, data::Array{UInt8})
end


function write_ping(io::IO, hasmask, data = "")
function write_ping(io::IO, hasmask, data = UInt8[])
locked_write(io, true, OPCODE_PING, hasmask, data)
end
""" Send a ping message, optionally with data."""
send_ping(ws, data...) = write_ping(ws.socket, !ws.server, data...)


function write_pong(io::IO, hasmask, data = "")
function write_pong(io::IO, hasmask, data = UInt8[])
locked_write(io, true, OPCODE_PONG, hasmask, data)
end
""" Send a pong message, optionally with data."""
Expand All @@ -202,7 +202,7 @@ Send a close message.
function Base.close(ws::WebSocket)
if isopen(ws)
ws.state = CLOSING
locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, "")
locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, UInt8[])

# Wait till the other end responds with an OPCODE_CLOSE. This process is
# complicated by potential blocking reads on the WebSocket in other Tasks
Expand Down Expand Up @@ -289,7 +289,7 @@ function handle_control_frame(ws::WebSocket,wsf::WebSocketFragment)
if wsf.opcode == OPCODE_CLOSE
info("$(ws.server ? "Server" : "Client") received OPCODE_CLOSE")
ws.state = CLOSED
locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, "")
locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, UInt8[])
elseif wsf.opcode == OPCODE_PING
info("$(ws.server ? "Server" : "Client") received OPCODE_PING")
send_pong(ws,wsf.data)
Expand Down
65 changes: 20 additions & 45 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,9 @@ using Base.Test

@testset "HTTP" begin

info("Testing ws...")
WebSockets.open("ws://echo.websocket.org") do ws
write(ws, "Foo")
@test String(read(ws)) == "Foo"

close(ws)
end
sleep(1)

info("Testing wss...")
WebSockets.open("wss://echo.websocket.org") do ws
write(ws, "Foo")
@test String(read(ws)) == "Foo"

close(ws)
end
sleep(1)

port_HTTP = 8000
port_HttpServer = 8081

info("Start HTTP server on port $(port_HTTP)")
@async HTTP.listen("127.0.0.1",UInt16(port_HTTP)) do http
if WebSockets.is_upgrade(http.message)
Expand All @@ -36,7 +20,6 @@ info("Start HTTP server on port $(port_HTTP)")
end
end

port_HttpServer = 8081
info("Start HttpServer on port $(port_HttpServer)")
wsh = WebSocketHandler() do req,ws
while !eof(ws)
Expand All @@ -49,32 +32,24 @@ server = Server(wsh)

sleep(2)

info("Testing local HTTP server...")
WebSockets.open("ws://127.0.0.1:$(port_HTTP)") do ws
write(ws, "Foo")
@test String(read(ws)) == "Foo"

write(ws, "Bar")
@test String(read(ws)) == "Bar"

send_ping(ws)
read(ws)

close(ws)
end

info("Testing local HttpServer...")
WebSockets.open("ws://127.0.0.1:$(port_HttpServer)") do ws
write(ws, "Foo")
@test String(read(ws)) == "Foo"

write(ws, "Bar")
@test String(read(ws)) == "Bar"

send_ping(ws)
read(ws)

close(ws)
servers = [
("ws", "ws://echo.websocket.org"),
("wss", "wss://echo.websocket.org"),
("HTTP", "ws://127.0.0.1:$(port_HTTP)"),
("HttpServer", "ws://127.0.0.1:$(port_HttpServer)")]

for (s, url) in servers
info("Testing local $(s) server at $(url)...")
WebSockets.open(url) do ws
write(ws, "Foo")
@test String(read(ws)) == "Foo"

write(ws, "Bar")
@test String(read(ws)) == "Bar"

send_ping(ws)
read(ws)
end
end

end # testset

0 comments on commit 9b01fe6

Please sign in to comment.