From 87184bb92dcd1b877f93511cc69d21d396beb392 Mon Sep 17 00:00:00 2001 From: Paul Berg Date: Tue, 12 Jul 2022 22:23:41 +0200 Subject: [PATCH] Update to HTTP.jl 1.0 (#2185) * Update to HTTP.jl 1.0 [HTTP.jl](https://github.com/JuliaWeb/HTTP.jl) is nearing its 1.0 release! This PR is a first draft at updating Pluto to use it. * Use Accept-Encoding: identity in test/compiletimes.jl * use https://github.com/JuliaWeb/HTTP.jl/pull/857 for testing * Allow using IO instead of stream for tests * soften HTTP.jl constraint for now * update HTTP.jl to 1.0.2 * remove test install * close serversocket --- Project.toml | 2 +- src/webserver/PutUpdates.jl | 29 ++++-- src/webserver/Static.jl | 32 +++--- src/webserver/WebServer.jl | 180 ++++++++++++++++------------------ src/webserver/WebSocketFix.jl | 80 --------------- test/compiletimes.jl | 2 +- 6 files changed, 125 insertions(+), 200 deletions(-) delete mode 100644 src/webserver/WebSocketFix.jl diff --git a/Project.toml b/Project.toml index f6c6abd129..efcdff0eb7 100644 --- a/Project.toml +++ b/Project.toml @@ -31,7 +31,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] Configurations = "0.15, 0.16, 0.17" FuzzyCompletions = "0.3, 0.4, 0.5" -HTTP = "^0.9.1" +HTTP = "^1.0.2" HypertextLiteral = "0.7, 0.8, 0.9" MIMEs = "0.1" MsgPack = "1.1" diff --git a/src/webserver/PutUpdates.jl b/src/webserver/PutUpdates.jl index ea541777d0..2fd44f074b 100644 --- a/src/webserver/PutUpdates.jl +++ b/src/webserver/PutUpdates.jl @@ -16,7 +16,9 @@ function serialize_message_to_stream(io::IO, message::UpdateMessage) end function serialize_message(message::UpdateMessage) - sprint(serialize_message_to_stream, message) + io = IOBuffer() + serialize_message_to_stream(io, message) + take!(io) end "Send `messages` to all clients connected to the `notebook`." @@ -65,18 +67,29 @@ end # https://github.com/JuliaWeb/HTTP.jl/issues/382 const flushtoken = Token() +function send_message(stream::HTTP.WebSocket, msg) + HTTP.send(stream, serialize_message(msg)) +end +function send_message(stream::IO, msg) + write(stream, serialize_message(msg)) +end + +function is_stream_open(stream::HTTP.WebSocket) + !HTTP.WebSockets.isclosed(stream) +end +function is_stream_open(io::IO) + isopen(io) +end + function flushclient(client::ClientSession) take!(flushtoken) while isready(client.pendingupdates) next_to_send = take!(client.pendingupdates) - + try if client.stream !== nothing - if isopen(client.stream) - if client.stream isa HTTP.WebSockets.WebSocket - client.stream.frame_type = HTTP.WebSockets.WS_BINARY - end - write(client.stream, serialize_message(next_to_send)) + if is_stream_open(client.stream) + send_message(client.stream, next_to_send) else put!(flushtoken) return false @@ -112,4 +125,4 @@ end function flushallclients(session::ServerSession) flushallclients(session, values(session.connected_clients)) -end \ No newline at end of file +end diff --git a/src/webserver/Static.jl b/src/webserver/Static.jl index 0aa511d027..e4aecb908e 100644 --- a/src/webserver/Static.jl +++ b/src/webserver/Static.jl @@ -158,11 +158,11 @@ function http_router_for(session::ServerSession) # require_secret_for_access == false # Access to all 'risky' endpoints is still restricted to requests that have the secret cookie, but visiting `/` is allowed, and it will set the cookie. From then on the security situation is identical to # secret_for_access == true - HTTP.@register(router, "GET", "/", with_authentication( + HTTP.register!(router, "GET", "/", with_authentication( create_serve_onefile(project_relative_path(frontend_directory(), "index.html")); required=security.require_secret_for_access )) - HTTP.@register(router, "GET", "/edit", with_authentication( + HTTP.register!(router, "GET", "/edit", with_authentication( create_serve_onefile(project_relative_path(frontend_directory(), "editor.html")); required=security.require_secret_for_access || security.require_secret_for_open_links, @@ -170,8 +170,8 @@ function http_router_for(session::ServerSession) # the /edit page also uses with_authentication, but this is not how access to notebooks is secured: this is done by requiring the WS connection to be authenticated. # we still use it for /edit to do the cookie stuff, and show a more helpful error, instead of the WS never connecting. - HTTP.@register(router, "GET", "/ping", r -> HTTP.Response(200, "OK!")) - HTTP.@register(router, "GET", "/possible_binder_token_please", r -> session.binder_token === nothing ? HTTP.Response(200,"") : HTTP.Response(200, session.binder_token)) + HTTP.register!(router, "GET", "/ping", r -> HTTP.Response(200, "OK!")) + HTTP.register!(router, "GET", "/possible_binder_token_please", r -> session.binder_token === nothing ? HTTP.Response(200,"") : HTTP.Response(200, session.binder_token)) function try_launch_notebook_response(action::Function, path_or_url::AbstractString; title="", advice="", home_url="./", as_redirect=true, action_kwargs...) try @@ -192,8 +192,8 @@ function http_router_for(session::ServerSession) ) do request::HTTP.Request notebook_response(SessionActions.new(session); as_redirect=(request.method == "GET")) end - HTTP.@register(router, "GET", "/new", serve_newfile) - HTTP.@register(router, "POST", "/new", serve_newfile) + HTTP.register!(router, "GET", "/new", serve_newfile) + HTTP.register!(router, "POST", "/new", serve_newfile) # This is not in Dynamic.jl because of bookmarks, how HTML works, # real loading bars and the rest; Same for CustomLaunchEvent @@ -242,8 +242,8 @@ function http_router_for(session::ServerSession) end end - HTTP.@register(router, "GET", "/open", serve_openfile) - HTTP.@register(router, "POST", "/open", serve_openfile) + HTTP.register!(router, "GET", "/open", serve_openfile) + HTTP.register!(router, "POST", "/open", serve_openfile) serve_sample = with_authentication(; required=security.require_secret_for_access || @@ -262,8 +262,8 @@ function http_router_for(session::ServerSession) advice="Please report this error!" ) end - HTTP.@register(router, "GET", "/sample/*", serve_sample) - HTTP.@register(router, "POST", "/sample/*", serve_sample) + HTTP.register!(router, "GET", "/sample/*", serve_sample) + HTTP.register!(router, "POST", "/sample/*", serve_sample) notebook_from_uri(request) = let uri = HTTP.URI(request.target) @@ -285,7 +285,7 @@ function http_router_for(session::ServerSession) return error_response(400, "Bad query", "Please report this error!", sprint(showerror, e, stacktrace(catch_backtrace()))) end end - HTTP.@register(router, "GET", "/notebookfile", serve_notebookfile) + HTTP.register!(router, "GET", "/notebookfile", serve_notebookfile) serve_statefile = with_authentication(; required=security.require_secret_for_access || @@ -301,7 +301,7 @@ function http_router_for(session::ServerSession) return error_response(400, "Bad query", "Please report this error!", sprint(showerror, e, stacktrace(catch_backtrace()))) end end - HTTP.@register(router, "GET", "/statefile", serve_statefile) + HTTP.register!(router, "GET", "/statefile", serve_statefile) serve_notebookexport = with_authentication(; required=security.require_secret_for_access || @@ -317,7 +317,7 @@ function http_router_for(session::ServerSession) return error_response(400, "Bad query", "Please report this error!", sprint(showerror, e, stacktrace(catch_backtrace()))) end end - HTTP.@register(router, "GET", "/notebookexport", serve_notebookexport) + HTTP.register!(router, "GET", "/notebookexport", serve_notebookexport) serve_notebookupload = with_authentication(; required=security.require_secret_for_access || @@ -338,15 +338,15 @@ function http_router_for(session::ServerSession) advice="The contents could not be read as a Pluto notebook file. When copying contents from somewhere else, make sure that you copy the entire notebook file. You can also report this error!" ) end - HTTP.@register(router, "POST", "/notebookupload", serve_notebookupload) + HTTP.register!(router, "POST", "/notebookupload", serve_notebookupload) function serve_asset(request::HTTP.Request) uri = HTTP.URI(request.target) filepath = project_relative_path(frontend_directory(), relpath(HTTP.unescapeuri(uri.path), "/")) asset_response(filepath; cacheable=should_cache(filepath)) end - HTTP.@register(router, "GET", "/*", serve_asset) - HTTP.@register(router, "GET", "/favicon.ico", create_serve_onefile(project_relative_path(frontend_directory(allow_bundled=false), "img", "favicon.ico"))) + HTTP.register!(router, "GET", "/**", serve_asset) + HTTP.register!(router, "GET", "/favicon.ico", create_serve_onefile(project_relative_path(frontend_directory(allow_bundled=false), "img", "favicon.ico"))) return router end diff --git a/src/webserver/WebServer.jl b/src/webserver/WebServer.jl index 5f967ba4bc..4fef488f25 100644 --- a/src/webserver/WebServer.jl +++ b/src/webserver/WebServer.jl @@ -4,8 +4,6 @@ import HTTP import Sockets import .PkgCompat -include("./WebSocketFix.jl") - function open_in_default_browser(url::AbstractString)::Bool try if Sys.isapple() @@ -27,11 +25,13 @@ end isurl(s::String) = startswith(s, "http://") || startswith(s, "https://") -swallow_exception(f, exception_type::Type{T}) where T = - try f() +function swallow_exception(f, exception_type::Type{T}) where {T} + try + f() catch e isa(e, T) || rethrow(e) end +end """ Pluto.run() @@ -67,19 +67,19 @@ end # Deprecation errors -function run(host::String, port::Union{Nothing,Integer}=nothing; kwargs...) +function run(host::String, port::Union{Nothing,Integer} = nothing; kwargs...) @error """run(host, port) is deprecated in favor of: - + run(;host="$host", port=$port) - + """ end function run(port::Integer; kwargs...) @error "Oopsie! This is the old command to launch Pluto. The new command is: - + Pluto.run() - + without the port as argument - it will choose one automatically. If you need to specify the port, use: Pluto.run(port=$port) @@ -88,27 +88,17 @@ end # open notebook(s) on startup -open_notebook!(session:: ServerSession, notebook:: Nothing) = Nothing +open_notebook!(session::ServerSession, notebook::Nothing) = Nothing -open_notebook!(session:: ServerSession, notebook:: AbstractString) = SessionActions.open(session, notebook) +open_notebook!(session::ServerSession, notebook::AbstractString) = SessionActions.open(session, notebook) -function open_notebook!(session:: ServerSession, notebook:: AbstractVector{<: AbstractString}) +function open_notebook!(session::ServerSession, notebook::AbstractVector{<:AbstractString}) for nb in notebook SessionActions.open(session, nb) end end -""" - run(session::ServerSession) - -Specifiy the [`Pluto.ServerSession`](@ref) to run the web server on, which includes the configuration. Passing a session as argument allows you to start the web server with some notebooks already running. See [`SessionActions`](@ref) to learn more about manipulating a `ServerSession`. -""" -function run(session::ServerSession) - pluto_router = http_router_for(session) - Base.invokelatest(run, session, pluto_router) -end - const is_first_run = Ref(true) "Return a port and serversocket to use while taking into account the `favourite_port`." @@ -127,16 +117,23 @@ function port_serversocket(hostIP::Sockets.IPAddr, favourite_port, port_hint) return port, serversocket end -function run(session::ServerSession, pluto_router) +""" + run(session::ServerSession) + +Specifiy the [`Pluto.ServerSession`](@ref) to run the web server on, which includes the configuration. Passing a session as argument allows you to start the web server with some notebooks already running. See [`SessionActions`](@ref) to learn more about manipulating a `ServerSession`. +""" +function run(session::ServerSession) if is_first_run[] is_first_run[] = false @info "Loading..." end - + if VERSION < v"1.6.2" @warn("\nPluto is running on an old version of Julia ($(VERSION)) that is no longer supported. Visit https://julialang.org/downloads/ for more information about upgrading Julia.") end + pluto_router = http_router_for(session) + notebook_at_startup = session.options.server.notebook open_notebook!(session, notebook_at_startup) @@ -147,11 +144,23 @@ function run(session::ServerSession, pluto_router) local port, serversocket = port_serversocket(hostIP, favourite_port, port_hint) - shutdown_server = Ref{Function}(() -> ()) + on_shutdown() = @sync begin + # Triggered by HTTP.jl + @info("\n\nClosing Pluto... Restart Julia for a fresh session. \n\nHave a nice day! 🎈\n\n") + # TODO: put do_work tokens back + @async swallow_exception(() -> close(serversocket), Base.IOError) + for client in values(session.connected_clients) + @async swallow_exception(() -> close(client.stream), Base.IOError) + end + empty!(session.connected_clients) + for nb in values(session.notebooks) + @asynclog SessionActions.shutdown(session, nb; keep_in_session = false, async = false, verbose = false) + end + end - servertask = @async HTTP.serve(hostIP, port; stream=true, server=serversocket) do http::HTTP.Stream + server = HTTP.listen!(hostIP, port; stream = true, server = serversocket, on_shutdown) do http::HTTP.Stream # messy messy code so that we can use the websocket on the same port as the HTTP server - if HTTP.WebSockets.is_upgrade(http.message) + if HTTP.WebSockets.isupgrade(http.message) secret_required = let s = session.options.security s.require_secret_for_access || s.require_secret_for_open_links @@ -160,39 +169,34 @@ function run(session::ServerSession, pluto_router) try HTTP.WebSockets.upgrade(http) do clientstream - if !isopen(clientstream) + if HTTP.WebSockets.isclosed(clientstream) return end try - while !eof(clientstream) - # This stream contains data received over the WebSocket. - # It is formatted and MsgPack-encoded by send(...) in PlutoConnection.js - local parentbody = nothing - try - message = collect(WebsocketFix.readmessage(clientstream)) - parentbody = unpack(message) - - let - lag = session.options.server.simulated_lag - (lag > 0) && sleep(lag * (0.5 + rand())) # sleep(0) would yield to the process manager which we dont want - end - - process_ws_message(session, parentbody, clientstream) - catch ex - if ex isa InterruptException - shutdown_server[]() - elseif ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError - # that's fine! - else - bt = stacktrace(catch_backtrace()) - @warn "Reading WebSocket client stream failed for unknown reason:" parentbody exception = (ex, bt) + for message in clientstream + # This stream contains data received over the WebSocket. + # It is formatted and MsgPack-encoded by send(...) in PlutoConnection.js + local parentbody = nothing + try + parentbody = unpack(message) + + let + lag = session.options.server.simulated_lag + (lag > 0) && sleep(lag * (0.5 + rand())) # sleep(0) would yield to the process manager which we dont want + end + + process_ws_message(session, parentbody, clientstream) + catch ex + if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError + # that's fine! + else + bt = stacktrace(catch_backtrace()) + @warn "Reading WebSocket client stream failed for unknown reason:" parentbody exception = (ex, bt) + end end end - end catch ex - if ex isa InterruptException - shutdown_server[]() - elseif ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError || (ex isa Base.IOError && occursin("connection reset", ex.msg)) + if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError || (ex isa Base.IOError && occursin("connection reset", ex.msg)) # that's fine! else bt = stacktrace(catch_backtrace()) @@ -202,7 +206,7 @@ function run(session::ServerSession, pluto_router) end catch ex if ex isa InterruptException - shutdown_server[]() + # that's fine! elseif ex isa Base.IOError # that's fine! elseif ex isa ArgumentError && occursin("stream is closed", ex.msg) @@ -225,10 +229,10 @@ function run(session::ServerSession, pluto_router) end else # then it's a regular HTTP request, not a WS upgrade - + request::HTTP.Request = http.message request.body = read(http) - HTTP.closeread(http) + # HTTP.closeread(http) # If a "token" url parameter is passed in from binder, then we store it to add to every URL (so that you can share the URL to collaborate). params = HTTP.queryparams(HTTP.URI(request.target)) @@ -236,9 +240,8 @@ function run(session::ServerSession, pluto_router) session.binder_token = params["token"] end - request_body = IOBuffer(HTTP.payload(request)) - response_body = HTTP.handle(pluto_router, request) - + response_body = pluto_router(request) + request.response::HTTP.Response = response_body request.response.request = request try @@ -249,7 +252,6 @@ function run(session::ServerSession, pluto_router) HTTP.setheader(http, "Server" => "Pluto.jl/$(PLUTO_VERSION_STR[2:end]) Julia/$(JULIA_VERSION_STR[2:end])") HTTP.startwrite(http) write(http, request.response.body) - HTTP.closewrite(http) catch e if isa(e, Base.IOError) || isa(e, ArgumentError) # @warn "Attempted to write to a closed stream at $(request.target)" @@ -259,15 +261,16 @@ function run(session::ServerSession, pluto_router) end end end - - server_running() = try - HTTP.get("http://$(hostIP):$(port)/ping"; status_exception=false, retry=false, connect_timeout=10, readtimeout=10).status == 200 - catch - false - end + + server_running() = + try + HTTP.get("http://$(hostIP):$(port)/ping"; status_exception = false, retry = false, connect_timeout = 10, readtimeout = 10).status == 200 + catch + false + end # Wait for the server to start up before opening the browser. We have a 5 second grace period for allowing the connection, and then 10 seconds for the server to write data. WorkspaceManager.poll(server_running, 5.0, 1.0) - + address = pretty_address(session, hostIP, port) if session.options.server.launch_browser && open_in_default_browser(address) @info("\nOpening $address in your default browser... ~ have fun!") @@ -275,7 +278,7 @@ function run(session::ServerSession, pluto_router) @info("\nGo to $address in your browser to start writing ~ have fun!") end @info("\nPress Ctrl+C in this terminal to stop Pluto\n\n") - + # Trigger ServerStartEvent with server details try_event_call(session, ServerStartEvent(address, port)) @@ -286,31 +289,18 @@ function run(session::ServerSession, pluto_router) # Start this in the background, so that the first notebook launch (which will trigger registry update) will be faster @asynclog withtoken(pkg_token) do will_update = !PkgCompat.check_registry_age() - PkgCompat.update_registries(; force=false) + PkgCompat.update_registries(; force = false) will_update && println(" Updating registry done ✓") end - shutdown_server[] = () -> @sync begin - @info("\n\nClosing Pluto... Restart Julia for a fresh session. \n\nHave a nice day! 🎈\n\n") - @async swallow_exception(() -> close(serversocket), Base.IOError) - # TODO: HTTP has a kill signal? - # TODO: put do_work tokens back - for client in values(session.connected_clients) - @async swallow_exception(() -> close(client.stream), Base.IOError) - end - empty!(session.connected_clients) - for nb in values(session.notebooks) - @asynclog SessionActions.shutdown(session, nb; keep_in_session=false, async=false, verbose=false) - end - end - try # create blocking call and switch the scheduler back to the server task, so that interrupts land there - wait(servertask) + wait(server) catch e if e isa InterruptException - shutdown_server[]() + close(server) elseif e isa TaskFailedException + @debug "Error is " exception = e stacktrace = catch_backtrace() # nice! else rethrow(e) @@ -319,9 +309,9 @@ function run(session::ServerSession, pluto_router) end precompile(run, (ServerSession, HTTP.Handlers.Router{Symbol("##001")})) -get_favorite_notebook(notebook:: Nothing) = nothing -get_favorite_notebook(notebook:: String) = notebook -get_favorite_notebook(notebook:: AbstractVector) = first(notebook) +get_favorite_notebook(notebook::Nothing) = nothing +get_favorite_notebook(notebook::String) = notebook +get_favorite_notebook(notebook::AbstractVector) = first(notebook) function pretty_address(session::ServerSession, hostIP, port) root = if session.options.server.root_url !== nothing @@ -359,15 +349,17 @@ function pretty_address(session::ServerSession, hostIP, port) else root end - string(HTTP.URI(HTTP.URI(new_root); query=url_params)) + string(HTTP.URI(HTTP.URI(new_root); query = url_params)) end "All messages sent over the WebSocket get decoded+deserialized and end up here." -function process_ws_message(session::ServerSession, parentbody::Dict, clientstream::IO) +function process_ws_message(session::ServerSession, parentbody::Dict, clientstream) client_id = Symbol(parentbody["client_id"]) - client = get!(session.connected_clients, client_id, ClientSession(client_id, clientstream)) + client = get!(session.connected_clients, client_id ) do + ClientSession(client_id, clientstream) + end client.stream = clientstream # it might change when the same client reconnects - + messagetype = Symbol(parentbody["type"]) request_id = Symbol(parentbody["request_id"]) @@ -384,7 +376,7 @@ function process_ws_message(session::ServerSession, parentbody::Dict, clientstre client.connected_notebook = notebook end end - + notebook else nothing diff --git a/src/webserver/WebSocketFix.jl b/src/webserver/WebSocketFix.jl deleted file mode 100644 index a7ad726938..0000000000 --- a/src/webserver/WebSocketFix.jl +++ /dev/null @@ -1,80 +0,0 @@ -"Things that will hopefully go into HTTP.jl someday." -module WebsocketFix - -import HTTP.WebSockets - -function readframe(ws::WebSockets.WebSocket) - header = WebSockets.readheader(ws.io) - # @debug 1 "WebSocket ➡️ $header" - - if header.length > 0 - if length(ws.rxpayload) < header.length - resize!(ws.rxpayload, header.length) - end - unsafe_read(ws.io, pointer(ws.rxpayload), header.length) - # @debug 2 " ➡️ \"$(String(ws.rxpayload[1:header.length]))\"" - end - l = Int(header.length) - if header.hasmask - WebSockets.mask!(ws.rxpayload, ws.rxpayload, l, reinterpret(UInt8, [header.mask])) - end - - return header, view(ws.rxpayload, 1:l) -end - -""" - readmessage(ws::WebSocket) - -HTTP.jl's default `readframe` (or `readavailable`) doesn't look at the FINAL field of frames. -This means that it will return a frame no matter what, even though most people expect to get a full message. -This method fixes that and gives you what you expect. -""" -function readmessage(ws::WebSockets.WebSocket) - # this code is based on HTTP.jl source code: https://github.com/JuliaWeb/HTTP.jl/blob/master/src/WebSockets.jl - - header, data = readframe(ws) - l = Int(header.length) - - if header.opcode == WebSockets.WS_CLOSE - ws.rxclosed = true - if l >= 2 - status = UInt16(ws.rxpayload[1]) << 8 | ws.rxpayload[2] - if status != 1000 - message = String(ws.rxpayload[3:l]) - status_descr = get(WebSockets.STATUS_CODE_DESCRIPTION, Int(status), "") - msg = "Status: $(status_descr), Internal Code: $(message)" - throw(WebSockets.WebSocketError(status, msg)) - end - end - return UInt8[] - elseif header.opcode == WebSockets.WS_PING - WebSockets.wswrite(ws, WebSockets.WS_FINAL | WebSockets.WS_PONG, ws.rxpayload[1:l]) - header2, data2 = readframe(ws) - return readmessage(ws) - elseif header.opcode == WebSockets.WS_CONTINUATION - error("WS continuation gone wrong") - else - if header.final == true - return view(ws.rxpayload, 1:l) - else - multi_message_data = UInt8[] - append!(multi_message_data, data) - while true - header2, data2 = readframe(ws) - if header2.opcode != WebSockets.WS_CONTINUATION - println("header2.opcode:", header2.opcode) - println("header2:", header2) - throw("Should be a continuation") - end - append!(multi_message_data, data2) - if header2.final - break - end - end - - multi_message_data - end - end -end - -end \ No newline at end of file diff --git a/test/compiletimes.jl b/test/compiletimes.jl index 123f01ca3c..c1eef7d165 100644 --- a/test/compiletimes.jl +++ b/test/compiletimes.jl @@ -40,7 +40,7 @@ wait_for_ready(nb) Pluto.SessionActions.shutdown(🍭, nb; async=false) -# Compile HTTP get. +# Compile HTTP get. Use no encoding since there seem to be an issue with Accept-Encoding: gzip HTTP.get("http://github.com") @timeit TOUT "Pluto.run" server_task = @eval let