From 9b01fe63d94d5ddf42554116143748087483ec91 Mon Sep 17 00:00:00 2001 From: Eric Forgy Date: Wed, 21 Feb 2018 00:14:16 +0800 Subject: [PATCH] Http.jl (#92) * Update README * Graceful close from client + run HTTP server HttpServer server simulataneously. * Test ping, pong. Temp tests. * Add try, catch and close Indent WIP * Fix String mask issue * Minor: Fix an indent * Update WebSockets.jl Add a comment to remove the `copy` for v0.7. --- src/HTTP.jl | 14 ++++++++-- src/HttpServer.jl | 34 ++++++++++++------------- src/WebSockets.jl | 16 ++++++------ test/runtests.jl | 65 +++++++++++++++-------------------------------- 4 files changed, 57 insertions(+), 72 deletions(-) diff --git a/src/HTTP.jl b/src/HTTP.jl index f713c42..470c891 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -29,7 +29,12 @@ function open(f::Function, url; binary=false, verbose=false, kw...) end io = HTTP.ConnectionPool.getrawstream(http) - f(WebSocket(io,false)) + ws = WebSocket(io,false) + try + f(ws) + finally + close(ws) + end end end @@ -49,7 +54,12 @@ function upgrade(f::Function, http::HTTP.Stream; binary=false) HTTP.startwrite(http) io = HTTP.ConnectionPool.getrawstream(http) - f(WebSocket(io, true)) + ws = WebSocket(io, true) + try + f(ws) + finally + close(ws) + end end function check_upgrade(http) diff --git a/src/HttpServer.jl b/src/HttpServer.jl index 6d8523f..440c32c 100644 --- a/src/HttpServer.jl +++ b/src/HttpServer.jl @@ -25,24 +25,24 @@ function websocket_handshake(request,client) Base.write(client.sock, HttpServer.Response(400)) return false end - resp_key = generate_websocket_key(key) + resp_key = generate_websocket_key(key) - response = HttpServer.Response(101) - response.headers["Upgrade"] = "websocket" - response.headers["Connection"] = "Upgrade" - response.headers["Sec-WebSocket-Accept"] = resp_key - - if haskey(request.headers, "Sec-WebSocket-Protocol") - if hasprotocol(request.headers["Sec-WebSocket-Protocol"]) - response.headers["Sec-WebSocket-Protocol"] = request.headers["Sec-WebSocket-Protocol"] - else - Base.write(client.sock, HttpServer.Response(400)) - return false - end - end - - Base.write(client.sock, response) - return true + response = HttpServer.Response(101) + response.headers["Upgrade"] = "websocket" + response.headers["Connection"] = "Upgrade" + response.headers["Sec-WebSocket-Accept"] = resp_key + + if haskey(request.headers, "Sec-WebSocket-Protocol") + if hasprotocol(request.headers["Sec-WebSocket-Protocol"]) + response.headers["Sec-WebSocket-Protocol"] = request.headers["Sec-WebSocket-Protocol"] + else + Base.write(client.sock, HttpServer.Response(400)) + return false + end + end + + Base.write(client.sock, response) + return true end """ Implement the WebSocketInterface, for compatilibility with HttpServer.""" diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 95b21d8..4e8aeab 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -136,7 +136,7 @@ end write_fragment(io, islast, opcode, hasmask, data::Array{UInt8}) Write the raw frame to a bufffer """ -function write_fragment(io::IO, islast::Bool, opcode, hasmask::Bool, data::Array{UInt8}) +function write_fragment(io::IO, islast::Bool, opcode, hasmask::Bool, data::Vector{UInt8}) l = length(data) b1::UInt8 = (islast ? 0b1000_0000 : 0b0000_0000) | opcode @@ -159,10 +159,10 @@ function write_fragment(io::IO, islast::Bool, opcode, hasmask::Bool, data::Array end """ Write without interruptions""" -function locked_write(io::IO, islast::Bool, opcode, hasmask, data) +function locked_write(io::IO, islast::Bool, opcode, hasmask::Bool, data::Vector{UInt8}) isa(io, TCPSock) && lock(io.lock) try - write_fragment(io, islast, opcode, hasmask, Vector{UInt8}(data)) + write_fragment(io, islast, opcode, hasmask, data) finally if isa(io, TCPSock) flush(io) @@ -173,7 +173,7 @@ end """ Write text data; will be sent as one frame.""" function Base.write(ws::WebSocket,data::String) - locked_write(ws.socket, true, OPCODE_TEXT, !ws.server, data) + locked_write(ws.socket, true, OPCODE_TEXT, !ws.server, copy(Vector{UInt8}(data))) # Remove this `copy` after v0.7! end """ Write binary data; will be sent as one frame.""" @@ -182,14 +182,14 @@ function Base.write(ws::WebSocket, data::Array{UInt8}) end -function write_ping(io::IO, hasmask, data = "") +function write_ping(io::IO, hasmask, data = UInt8[]) locked_write(io, true, OPCODE_PING, hasmask, data) end """ Send a ping message, optionally with data.""" send_ping(ws, data...) = write_ping(ws.socket, !ws.server, data...) -function write_pong(io::IO, hasmask, data = "") +function write_pong(io::IO, hasmask, data = UInt8[]) locked_write(io, true, OPCODE_PONG, hasmask, data) end """ Send a pong message, optionally with data.""" @@ -202,7 +202,7 @@ Send a close message. function Base.close(ws::WebSocket) if isopen(ws) ws.state = CLOSING - locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, "") + locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, UInt8[]) # Wait till the other end responds with an OPCODE_CLOSE. This process is # complicated by potential blocking reads on the WebSocket in other Tasks @@ -289,7 +289,7 @@ function handle_control_frame(ws::WebSocket,wsf::WebSocketFragment) if wsf.opcode == OPCODE_CLOSE info("$(ws.server ? "Server" : "Client") received OPCODE_CLOSE") ws.state = CLOSED - locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, "") + locked_write(ws.socket, true, OPCODE_CLOSE, !ws.server, UInt8[]) elseif wsf.opcode == OPCODE_PING info("$(ws.server ? "Server" : "Client") received OPCODE_PING") send_pong(ws,wsf.data) diff --git a/test/runtests.jl b/test/runtests.jl index c0afee6..075bdb9 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -5,25 +5,9 @@ using Base.Test @testset "HTTP" begin -info("Testing ws...") -WebSockets.open("ws://echo.websocket.org") do ws - write(ws, "Foo") - @test String(read(ws)) == "Foo" - - close(ws) -end -sleep(1) - -info("Testing wss...") -WebSockets.open("wss://echo.websocket.org") do ws - write(ws, "Foo") - @test String(read(ws)) == "Foo" - - close(ws) -end -sleep(1) - port_HTTP = 8000 +port_HttpServer = 8081 + info("Start HTTP server on port $(port_HTTP)") @async HTTP.listen("127.0.0.1",UInt16(port_HTTP)) do http if WebSockets.is_upgrade(http.message) @@ -36,7 +20,6 @@ info("Start HTTP server on port $(port_HTTP)") end end -port_HttpServer = 8081 info("Start HttpServer on port $(port_HttpServer)") wsh = WebSocketHandler() do req,ws while !eof(ws) @@ -49,32 +32,24 @@ server = Server(wsh) sleep(2) -info("Testing local HTTP server...") -WebSockets.open("ws://127.0.0.1:$(port_HTTP)") do ws - write(ws, "Foo") - @test String(read(ws)) == "Foo" - - write(ws, "Bar") - @test String(read(ws)) == "Bar" - - send_ping(ws) - read(ws) - - close(ws) -end - -info("Testing local HttpServer...") -WebSockets.open("ws://127.0.0.1:$(port_HttpServer)") do ws - write(ws, "Foo") - @test String(read(ws)) == "Foo" - - write(ws, "Bar") - @test String(read(ws)) == "Bar" - - send_ping(ws) - read(ws) - - close(ws) +servers = [ + ("ws", "ws://echo.websocket.org"), + ("wss", "wss://echo.websocket.org"), + ("HTTP", "ws://127.0.0.1:$(port_HTTP)"), + ("HttpServer", "ws://127.0.0.1:$(port_HttpServer)")] + +for (s, url) in servers + info("Testing local $(s) server at $(url)...") + WebSockets.open(url) do ws + write(ws, "Foo") + @test String(read(ws)) == "Foo" + + write(ws, "Bar") + @test String(read(ws)) == "Bar" + + send_ping(ws) + read(ws) + end end end # testset \ No newline at end of file