From 8aa0ec51e35c852d8f860987d61a8671eef56661 Mon Sep 17 00:00:00 2001 From: Paul Date: Sun, 19 Jun 2022 14:20:46 +0200 Subject: [PATCH] Update to HTTP.jl 1.0 [HTTP.jl](https://github.com/JuliaWeb/HTTP.jl) is nearing its 1.0 release! This PR is a first draft at updating Pluto to use it. --- Project.toml | 2 +- src/webserver/PutUpdates.jl | 15 ++-- src/webserver/Static.jl | 32 ++++---- src/webserver/WebServer.jl | 146 ++++++++++++++++------------------ src/webserver/WebSocketFix.jl | 80 ------------------- 5 files changed, 92 insertions(+), 183 deletions(-) delete mode 100644 src/webserver/WebSocketFix.jl diff --git a/Project.toml b/Project.toml index f6c6abd129..e106a071a5 100644 --- a/Project.toml +++ b/Project.toml @@ -31,7 +31,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] Configurations = "0.15, 0.16, 0.17" FuzzyCompletions = "0.3, 0.4, 0.5" -HTTP = "^0.9.1" +HTTP = "^1.0.0" HypertextLiteral = "0.7, 0.8, 0.9" MIMEs = "0.1" MsgPack = "1.1" diff --git a/src/webserver/PutUpdates.jl b/src/webserver/PutUpdates.jl index ea541777d0..c566067252 100644 --- a/src/webserver/PutUpdates.jl +++ b/src/webserver/PutUpdates.jl @@ -16,7 +16,9 @@ function serialize_message_to_stream(io::IO, message::UpdateMessage) end function serialize_message(message::UpdateMessage) - sprint(serialize_message_to_stream, message) + io = IOBuffer() + serialize_message_to_stream(io, message) + take!(io) end "Send `messages` to all clients connected to the `notebook`." @@ -69,14 +71,11 @@ function flushclient(client::ClientSession) take!(flushtoken) while isready(client.pendingupdates) next_to_send = take!(client.pendingupdates) - + try if client.stream !== nothing - if isopen(client.stream) - if client.stream isa HTTP.WebSockets.WebSocket - client.stream.frame_type = HTTP.WebSockets.WS_BINARY - end - write(client.stream, serialize_message(next_to_send)) + if !HTTP.WebSockets.isclosed(client.stream) + HTTP.send(client.stream, serialize_message(next_to_send)) else put!(flushtoken) return false @@ -112,4 +111,4 @@ end function flushallclients(session::ServerSession) flushallclients(session, values(session.connected_clients)) -end \ No newline at end of file +end diff --git a/src/webserver/Static.jl b/src/webserver/Static.jl index 0aa511d027..e4aecb908e 100644 --- a/src/webserver/Static.jl +++ b/src/webserver/Static.jl @@ -158,11 +158,11 @@ function http_router_for(session::ServerSession) # require_secret_for_access == false # Access to all 'risky' endpoints is still restricted to requests that have the secret cookie, but visiting `/` is allowed, and it will set the cookie. From then on the security situation is identical to # secret_for_access == true - HTTP.@register(router, "GET", "/", with_authentication( + HTTP.register!(router, "GET", "/", with_authentication( create_serve_onefile(project_relative_path(frontend_directory(), "index.html")); required=security.require_secret_for_access )) - HTTP.@register(router, "GET", "/edit", with_authentication( + HTTP.register!(router, "GET", "/edit", with_authentication( create_serve_onefile(project_relative_path(frontend_directory(), "editor.html")); required=security.require_secret_for_access || security.require_secret_for_open_links, @@ -170,8 +170,8 @@ function http_router_for(session::ServerSession) # the /edit page also uses with_authentication, but this is not how access to notebooks is secured: this is done by requiring the WS connection to be authenticated. # we still use it for /edit to do the cookie stuff, and show a more helpful error, instead of the WS never connecting. - HTTP.@register(router, "GET", "/ping", r -> HTTP.Response(200, "OK!")) - HTTP.@register(router, "GET", "/possible_binder_token_please", r -> session.binder_token === nothing ? HTTP.Response(200,"") : HTTP.Response(200, session.binder_token)) + HTTP.register!(router, "GET", "/ping", r -> HTTP.Response(200, "OK!")) + HTTP.register!(router, "GET", "/possible_binder_token_please", r -> session.binder_token === nothing ? HTTP.Response(200,"") : HTTP.Response(200, session.binder_token)) function try_launch_notebook_response(action::Function, path_or_url::AbstractString; title="", advice="", home_url="./", as_redirect=true, action_kwargs...) try @@ -192,8 +192,8 @@ function http_router_for(session::ServerSession) ) do request::HTTP.Request notebook_response(SessionActions.new(session); as_redirect=(request.method == "GET")) end - HTTP.@register(router, "GET", "/new", serve_newfile) - HTTP.@register(router, "POST", "/new", serve_newfile) + HTTP.register!(router, "GET", "/new", serve_newfile) + HTTP.register!(router, "POST", "/new", serve_newfile) # This is not in Dynamic.jl because of bookmarks, how HTML works, # real loading bars and the rest; Same for CustomLaunchEvent @@ -242,8 +242,8 @@ function http_router_for(session::ServerSession) end end - HTTP.@register(router, "GET", "/open", serve_openfile) - HTTP.@register(router, "POST", "/open", serve_openfile) + HTTP.register!(router, "GET", "/open", serve_openfile) + HTTP.register!(router, "POST", "/open", serve_openfile) serve_sample = with_authentication(; required=security.require_secret_for_access || @@ -262,8 +262,8 @@ function http_router_for(session::ServerSession) advice="Please report this error!" ) end - HTTP.@register(router, "GET", "/sample/*", serve_sample) - HTTP.@register(router, "POST", "/sample/*", serve_sample) + HTTP.register!(router, "GET", "/sample/*", serve_sample) + HTTP.register!(router, "POST", "/sample/*", serve_sample) notebook_from_uri(request) = let uri = HTTP.URI(request.target) @@ -285,7 +285,7 @@ function http_router_for(session::ServerSession) return error_response(400, "Bad query", "Please report this error!", sprint(showerror, e, stacktrace(catch_backtrace()))) end end - HTTP.@register(router, "GET", "/notebookfile", serve_notebookfile) + HTTP.register!(router, "GET", "/notebookfile", serve_notebookfile) serve_statefile = with_authentication(; required=security.require_secret_for_access || @@ -301,7 +301,7 @@ function http_router_for(session::ServerSession) return error_response(400, "Bad query", "Please report this error!", sprint(showerror, e, stacktrace(catch_backtrace()))) end end - HTTP.@register(router, "GET", "/statefile", serve_statefile) + HTTP.register!(router, "GET", "/statefile", serve_statefile) serve_notebookexport = with_authentication(; required=security.require_secret_for_access || @@ -317,7 +317,7 @@ function http_router_for(session::ServerSession) return error_response(400, "Bad query", "Please report this error!", sprint(showerror, e, stacktrace(catch_backtrace()))) end end - HTTP.@register(router, "GET", "/notebookexport", serve_notebookexport) + HTTP.register!(router, "GET", "/notebookexport", serve_notebookexport) serve_notebookupload = with_authentication(; required=security.require_secret_for_access || @@ -338,15 +338,15 @@ function http_router_for(session::ServerSession) advice="The contents could not be read as a Pluto notebook file. When copying contents from somewhere else, make sure that you copy the entire notebook file. You can also report this error!" ) end - HTTP.@register(router, "POST", "/notebookupload", serve_notebookupload) + HTTP.register!(router, "POST", "/notebookupload", serve_notebookupload) function serve_asset(request::HTTP.Request) uri = HTTP.URI(request.target) filepath = project_relative_path(frontend_directory(), relpath(HTTP.unescapeuri(uri.path), "/")) asset_response(filepath; cacheable=should_cache(filepath)) end - HTTP.@register(router, "GET", "/*", serve_asset) - HTTP.@register(router, "GET", "/favicon.ico", create_serve_onefile(project_relative_path(frontend_directory(allow_bundled=false), "img", "favicon.ico"))) + HTTP.register!(router, "GET", "/**", serve_asset) + HTTP.register!(router, "GET", "/favicon.ico", create_serve_onefile(project_relative_path(frontend_directory(allow_bundled=false), "img", "favicon.ico"))) return router end diff --git a/src/webserver/WebServer.jl b/src/webserver/WebServer.jl index 5f967ba4bc..88320b48a5 100644 --- a/src/webserver/WebServer.jl +++ b/src/webserver/WebServer.jl @@ -4,8 +4,6 @@ import HTTP import Sockets import .PkgCompat -include("./WebSocketFix.jl") - function open_in_default_browser(url::AbstractString)::Bool try if Sys.isapple() @@ -27,8 +25,9 @@ end isurl(s::String) = startswith(s, "http://") || startswith(s, "https://") -swallow_exception(f, exception_type::Type{T}) where T = - try f() +swallow_exception(f, exception_type::Type{T}) where {T} = + try + f() catch e isa(e, T) || rethrow(e) end @@ -67,19 +66,19 @@ end # Deprecation errors -function run(host::String, port::Union{Nothing,Integer}=nothing; kwargs...) +function run(host::String, port::Union{Nothing,Integer} = nothing; kwargs...) @error """run(host, port) is deprecated in favor of: - + run(;host="$host", port=$port) - + """ end function run(port::Integer; kwargs...) @error "Oopsie! This is the old command to launch Pluto. The new command is: - + Pluto.run() - + without the port as argument - it will choose one automatically. If you need to specify the port, use: Pluto.run(port=$port) @@ -88,27 +87,17 @@ end # open notebook(s) on startup -open_notebook!(session:: ServerSession, notebook:: Nothing) = Nothing +open_notebook!(session::ServerSession, notebook::Nothing) = Nothing -open_notebook!(session:: ServerSession, notebook:: AbstractString) = SessionActions.open(session, notebook) +open_notebook!(session::ServerSession, notebook::AbstractString) = SessionActions.open(session, notebook) -function open_notebook!(session:: ServerSession, notebook:: AbstractVector{<: AbstractString}) +function open_notebook!(session::ServerSession, notebook::AbstractVector{<:AbstractString}) for nb in notebook SessionActions.open(session, nb) end end -""" - run(session::ServerSession) - -Specifiy the [`Pluto.ServerSession`](@ref) to run the web server on, which includes the configuration. Passing a session as argument allows you to start the web server with some notebooks already running. See [`SessionActions`](@ref) to learn more about manipulating a `ServerSession`. -""" -function run(session::ServerSession) - pluto_router = http_router_for(session) - Base.invokelatest(run, session, pluto_router) -end - const is_first_run = Ref(true) "Return a port and serversocket to use while taking into account the `favourite_port`." @@ -127,16 +116,23 @@ function port_serversocket(hostIP::Sockets.IPAddr, favourite_port, port_hint) return port, serversocket end -function run(session::ServerSession, pluto_router) +""" + run(session::ServerSession) + +Specifiy the [`Pluto.ServerSession`](@ref) to run the web server on, which includes the configuration. Passing a session as argument allows you to start the web server with some notebooks already running. See [`SessionActions`](@ref) to learn more about manipulating a `ServerSession`. +""" +function run(session::ServerSession) if is_first_run[] is_first_run[] = false @info "Loading..." end - + if VERSION < v"1.6.2" @warn("\nPluto is running on an old version of Julia ($(VERSION)) that is no longer supported. Visit https://julialang.org/downloads/ for more information about upgrading Julia.") end + pluto_router = http_router_for(session) + notebook_at_startup = session.options.server.notebook open_notebook!(session, notebook_at_startup) @@ -149,9 +145,9 @@ function run(session::ServerSession, pluto_router) shutdown_server = Ref{Function}(() -> ()) - servertask = @async HTTP.serve(hostIP, port; stream=true, server=serversocket) do http::HTTP.Stream + servertask = HTTP.listen!(hostIP, port; stream = true, server = serversocket, on_shutdown = () -> shutdown_server[]()) do http::HTTP.Stream # messy messy code so that we can use the websocket on the same port as the HTTP server - if HTTP.WebSockets.is_upgrade(http.message) + if HTTP.WebSockets.isupgrade(http.message) secret_required = let s = session.options.security s.require_secret_for_access || s.require_secret_for_open_links @@ -160,39 +156,34 @@ function run(session::ServerSession, pluto_router) try HTTP.WebSockets.upgrade(http) do clientstream - if !isopen(clientstream) + if HTTP.WebSockets.isclosed(clientstream) return end try - while !eof(clientstream) - # This stream contains data received over the WebSocket. - # It is formatted and MsgPack-encoded by send(...) in PlutoConnection.js - local parentbody = nothing - try - message = collect(WebsocketFix.readmessage(clientstream)) - parentbody = unpack(message) - - let - lag = session.options.server.simulated_lag - (lag > 0) && sleep(lag * (0.5 + rand())) # sleep(0) would yield to the process manager which we dont want - end - - process_ws_message(session, parentbody, clientstream) - catch ex - if ex isa InterruptException - shutdown_server[]() - elseif ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError - # that's fine! - else - bt = stacktrace(catch_backtrace()) - @warn "Reading WebSocket client stream failed for unknown reason:" parentbody exception = (ex, bt) + for message in clientstream + # This stream contains data received over the WebSocket. + # It is formatted and MsgPack-encoded by send(...) in PlutoConnection.js + local parentbody = nothing + try + parentbody = unpack(message) + + let + lag = session.options.server.simulated_lag + (lag > 0) && sleep(lag * (0.5 + rand())) # sleep(0) would yield to the process manager which we dont want + end + + process_ws_message(session, parentbody, clientstream) + catch ex + if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError + # that's fine! + else + bt = stacktrace(catch_backtrace()) + @warn "Reading WebSocket client stream failed for unknown reason:" parentbody exception = (ex, bt) + end end end - end catch ex - if ex isa InterruptException - shutdown_server[]() - elseif ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError || (ex isa Base.IOError && occursin("connection reset", ex.msg)) + if ex isa InterruptException || ex isa HTTP.WebSockets.WebSocketError || ex isa EOFError || (ex isa Base.IOError && occursin("connection reset", ex.msg)) # that's fine! else bt = stacktrace(catch_backtrace()) @@ -202,7 +193,7 @@ function run(session::ServerSession, pluto_router) end catch ex if ex isa InterruptException - shutdown_server[]() + # that's fine! elseif ex isa Base.IOError # that's fine! elseif ex isa ArgumentError && occursin("stream is closed", ex.msg) @@ -225,10 +216,10 @@ function run(session::ServerSession, pluto_router) end else # then it's a regular HTTP request, not a WS upgrade - + request::HTTP.Request = http.message request.body = read(http) - HTTP.closeread(http) + # HTTP.closeread(http) # If a "token" url parameter is passed in from binder, then we store it to add to every URL (so that you can share the URL to collaborate). params = HTTP.queryparams(HTTP.URI(request.target)) @@ -236,9 +227,8 @@ function run(session::ServerSession, pluto_router) session.binder_token = params["token"] end - request_body = IOBuffer(HTTP.payload(request)) - response_body = HTTP.handle(pluto_router, request) - + response_body = pluto_router(request) + request.response::HTTP.Response = response_body request.response.request = request try @@ -249,7 +239,6 @@ function run(session::ServerSession, pluto_router) HTTP.setheader(http, "Server" => "Pluto.jl/$(PLUTO_VERSION_STR[2:end]) Julia/$(JULIA_VERSION_STR[2:end])") HTTP.startwrite(http) write(http, request.response.body) - HTTP.closewrite(http) catch e if isa(e, Base.IOError) || isa(e, ArgumentError) # @warn "Attempted to write to a closed stream at $(request.target)" @@ -259,15 +248,16 @@ function run(session::ServerSession, pluto_router) end end end - - server_running() = try - HTTP.get("http://$(hostIP):$(port)/ping"; status_exception=false, retry=false, connect_timeout=10, readtimeout=10).status == 200 - catch - false - end + + server_running() = + try + HTTP.get("http://$(hostIP):$(port)/ping"; status_exception = false, retry = false, connect_timeout = 10, readtimeout = 10).status == 200 + catch + false + end # Wait for the server to start up before opening the browser. We have a 5 second grace period for allowing the connection, and then 10 seconds for the server to write data. WorkspaceManager.poll(server_running, 5.0, 1.0) - + address = pretty_address(session, hostIP, port) if session.options.server.launch_browser && open_in_default_browser(address) @info("\nOpening $address in your default browser... ~ have fun!") @@ -275,7 +265,7 @@ function run(session::ServerSession, pluto_router) @info("\nGo to $address in your browser to start writing ~ have fun!") end @info("\nPress Ctrl+C in this terminal to stop Pluto\n\n") - + # Trigger ServerStartEvent with server details try_event_call(session, ServerStartEvent(address, port)) @@ -286,13 +276,12 @@ function run(session::ServerSession, pluto_router) # Start this in the background, so that the first notebook launch (which will trigger registry update) will be faster @asynclog withtoken(pkg_token) do will_update = !PkgCompat.check_registry_age() - PkgCompat.update_registries(; force=false) + PkgCompat.update_registries(; force = false) will_update && println(" Updating registry done ✓") end shutdown_server[] = () -> @sync begin @info("\n\nClosing Pluto... Restart Julia for a fresh session. \n\nHave a nice day! 🎈\n\n") - @async swallow_exception(() -> close(serversocket), Base.IOError) # TODO: HTTP has a kill signal? # TODO: put do_work tokens back for client in values(session.connected_clients) @@ -300,7 +289,7 @@ function run(session::ServerSession, pluto_router) end empty!(session.connected_clients) for nb in values(session.notebooks) - @asynclog SessionActions.shutdown(session, nb; keep_in_session=false, async=false, verbose=false) + @asynclog SessionActions.shutdown(session, nb; keep_in_session = false, async = false, verbose = false) end end @@ -311,6 +300,7 @@ function run(session::ServerSession, pluto_router) if e isa InterruptException shutdown_server[]() elseif e isa TaskFailedException + @debug "Error is " exception = e stacktrace = catch_backtrace() # nice! else rethrow(e) @@ -319,9 +309,9 @@ function run(session::ServerSession, pluto_router) end precompile(run, (ServerSession, HTTP.Handlers.Router{Symbol("##001")})) -get_favorite_notebook(notebook:: Nothing) = nothing -get_favorite_notebook(notebook:: String) = notebook -get_favorite_notebook(notebook:: AbstractVector) = first(notebook) +get_favorite_notebook(notebook::Nothing) = nothing +get_favorite_notebook(notebook::String) = notebook +get_favorite_notebook(notebook::AbstractVector) = first(notebook) function pretty_address(session::ServerSession, hostIP, port) root = if session.options.server.root_url !== nothing @@ -359,15 +349,15 @@ function pretty_address(session::ServerSession, hostIP, port) else root end - string(HTTP.URI(HTTP.URI(new_root); query=url_params)) + string(HTTP.URI(HTTP.URI(new_root); query = url_params)) end "All messages sent over the WebSocket get decoded+deserialized and end up here." -function process_ws_message(session::ServerSession, parentbody::Dict, clientstream::IO) +function process_ws_message(session::ServerSession, parentbody::Dict, clientstream::HTTP.WebSocket) client_id = Symbol(parentbody["client_id"]) client = get!(session.connected_clients, client_id, ClientSession(client_id, clientstream)) client.stream = clientstream # it might change when the same client reconnects - + messagetype = Symbol(parentbody["type"]) request_id = Symbol(parentbody["request_id"]) @@ -384,7 +374,7 @@ function process_ws_message(session::ServerSession, parentbody::Dict, clientstre client.connected_notebook = notebook end end - + notebook else nothing diff --git a/src/webserver/WebSocketFix.jl b/src/webserver/WebSocketFix.jl deleted file mode 100644 index a7ad726938..0000000000 --- a/src/webserver/WebSocketFix.jl +++ /dev/null @@ -1,80 +0,0 @@ -"Things that will hopefully go into HTTP.jl someday." -module WebsocketFix - -import HTTP.WebSockets - -function readframe(ws::WebSockets.WebSocket) - header = WebSockets.readheader(ws.io) - # @debug 1 "WebSocket ➡️ $header" - - if header.length > 0 - if length(ws.rxpayload) < header.length - resize!(ws.rxpayload, header.length) - end - unsafe_read(ws.io, pointer(ws.rxpayload), header.length) - # @debug 2 " ➡️ \"$(String(ws.rxpayload[1:header.length]))\"" - end - l = Int(header.length) - if header.hasmask - WebSockets.mask!(ws.rxpayload, ws.rxpayload, l, reinterpret(UInt8, [header.mask])) - end - - return header, view(ws.rxpayload, 1:l) -end - -""" - readmessage(ws::WebSocket) - -HTTP.jl's default `readframe` (or `readavailable`) doesn't look at the FINAL field of frames. -This means that it will return a frame no matter what, even though most people expect to get a full message. -This method fixes that and gives you what you expect. -""" -function readmessage(ws::WebSockets.WebSocket) - # this code is based on HTTP.jl source code: https://github.com/JuliaWeb/HTTP.jl/blob/master/src/WebSockets.jl - - header, data = readframe(ws) - l = Int(header.length) - - if header.opcode == WebSockets.WS_CLOSE - ws.rxclosed = true - if l >= 2 - status = UInt16(ws.rxpayload[1]) << 8 | ws.rxpayload[2] - if status != 1000 - message = String(ws.rxpayload[3:l]) - status_descr = get(WebSockets.STATUS_CODE_DESCRIPTION, Int(status), "") - msg = "Status: $(status_descr), Internal Code: $(message)" - throw(WebSockets.WebSocketError(status, msg)) - end - end - return UInt8[] - elseif header.opcode == WebSockets.WS_PING - WebSockets.wswrite(ws, WebSockets.WS_FINAL | WebSockets.WS_PONG, ws.rxpayload[1:l]) - header2, data2 = readframe(ws) - return readmessage(ws) - elseif header.opcode == WebSockets.WS_CONTINUATION - error("WS continuation gone wrong") - else - if header.final == true - return view(ws.rxpayload, 1:l) - else - multi_message_data = UInt8[] - append!(multi_message_data, data) - while true - header2, data2 = readframe(ws) - if header2.opcode != WebSockets.WS_CONTINUATION - println("header2.opcode:", header2.opcode) - println("header2:", header2) - throw("Should be a continuation") - end - append!(multi_message_data, data2) - if header2.final - break - end - end - - multi_message_data - end - end -end - -end \ No newline at end of file