diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2cbba1808..9f0d0198c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,6 +34,9 @@ jobs: - os: windows-latest version: '1' arch: x86 + exclude: + - os: windows-latest + version: '1.6' steps: - uses: actions/checkout@v3 - uses: julia-actions/setup-julia@v1 @@ -55,6 +58,7 @@ jobs: env: PIE_SOCKET_API_KEY: ${{ secrets.PIE_SOCKET_API_KEY }} JULIA_VERSION: ${{ matrix.version }} + JULIA_NUM_THREADS: 2 - uses: julia-actions/julia-processcoverage@v1 - uses: codecov/codecov-action@v1 with: diff --git a/Project.toml b/Project.toml index 9a3d9b1d3..e1061566b 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "HTTP" uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3" authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"] -version = "1.8.1" +version = "1.9.0" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" @@ -21,7 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] CodecZlib = "0.7" -ConcurrentUtilities = "2.1" +ConcurrentUtilities = "2.2" LoggingExtras = "0.4.9,1" MbedTLS = "0.6.8, 0.7, 1" OpenSSL = "1.3" diff --git a/docs/src/client.md b/docs/src/client.md index 4190037ab..74efb2ecf 100644 --- a/docs/src/client.md +++ b/docs/src/client.md @@ -107,6 +107,10 @@ When a non-2XX HTTP status code is received in a response, this is meant to conv If `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or monitoring requests where there's worry of certain errors happening but ignored because of retries. +### `logtag` + +If provided, will be used as the tag for error logging. Useful for debugging or monitoring requests. + ### `observelayers` If `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`. diff --git a/src/Connections.jl b/src/Connections.jl index f069568ef..b48214c12 100644 --- a/src/Connections.jl +++ b/src/Connections.jl @@ -95,7 +95,7 @@ end Used for "hashing" a Connection object on just the key properties necessary for determining connection re-useability. That is, when a new request calls `newconnection`, we take the request parameters of host and port, and if ssl verification is required, if keepalive is enabled, -and if an existing Connection was already created with the exact +and if an existing Connection was already created with the exact. same parameters, we can re-use it (as long as it's not already being used, obviously). """ connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection) @@ -299,7 +299,7 @@ function IOExtras.closeread(c::Connection) c.readable = false @debugv 3 "✉️ Read done: $c" if c.clientconnection - t = @async monitor_idle_connection(c) + t = Threads.@spawn monitor_idle_connection(c) @isdefined(errormonitor) && errormonitor(t) end return @@ -424,7 +424,7 @@ function connection_isvalid(c, idle_timeout) end @noinline connection_limit_warning(cl) = cl === nothing || - @warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` or pass a connection pool like `pool=HTTP.Pool($cl)` instead." + @warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` before any requests are made or construct a shared pool via `POOL = HTTP.Pool($cl)` and pass to each request like `pool=POOL` instead." """ newconnection(type, host, port) -> Connection @@ -509,7 +509,7 @@ function getconnection(::Type{TCPSocket}, tcp = Sockets.TCPSocket() Sockets.connect!(tcp, addr, p) try - Exceptions.try_with_timeout(connect_timeout) do + try_with_timeout(connect_timeout) do _ Sockets.wait_connected(tcp) keepalive && keepalive!(tcp) end @@ -523,7 +523,7 @@ function getconnection(::Type{TCPSocket}, end return tcp catch e - lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e + lasterr = e isa ConcurrentUtilities.TimeoutException ? ConnectTimeout(host, port) : e end end # If no connetion could be set up, to any address, throw last error @@ -624,7 +624,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T}, # if the upgrade fails, an error will be thrown and the original c will be closed # in ConnectionRequest tls = if readtimeout > 0 - Exceptions.try_with_timeout(readtimeout) do + try_with_timeout(readtimeout) do _ sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) end else diff --git a/src/Exceptions.jl b/src/Exceptions.jl index 5918e684a..0cf74e30b 100644 --- a/src/Exceptions.jl +++ b/src/Exceptions.jl @@ -1,6 +1,6 @@ module Exceptions -export @try, try_with_timeout, HTTPError, ConnectError, TimeoutError, StatusError, RequestError +export @try, HTTPError, ConnectError, TimeoutError, StatusError, RequestError, current_exceptions_to_string using LoggingExtras import ..HTTP # for doc references @@ -25,24 +25,6 @@ macro $(:try)(exes...) end end # @eval -function try_with_timeout(f, timeout) - ch = Channel(0) - timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout) - @async begin - try - put!(ch, $f()) - catch e - if !(e isa HTTPError) - e = CapturedException(e, catch_backtrace()) - end - close(ch, e) - finally - close(timer) - end - end - return take!(ch) -end - abstract type HTTPError <: Exception end """ @@ -97,4 +79,12 @@ struct RequestError <: HTTPError error::Any end +function current_exceptions_to_string(curr_exc) + buf = IOBuffer() + println(buf) + println(buf, "\n===========================\nHTTP Error message:\n") + Base.showerror(buf, curr_exc) + return String(take!(buf)) +end + end # module Exceptions \ No newline at end of file diff --git a/src/HTTP.jl b/src/HTTP.jl index 68b9fbbdb..9e0633ffc 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -7,7 +7,7 @@ const DEBUG_LEVEL = Ref(0) Base.@deprecate escape escapeuri -using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS +using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS, OpenSSL function access_threaded(f, v::Vector) tid = Threads.threadid() @@ -24,7 +24,7 @@ end function open end -const SOCKET_TYPE_TLS = Ref{Any}(MbedTLS.SSLContext) +const SOCKET_TYPE_TLS = Ref{Any}(OpenSSL.SSLStream) include("Conditions.jl") ;using .Conditions include("access_log.jl") @@ -158,6 +158,7 @@ Supported optional keyword arguments: - `logerrors = false`, if `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or monitoring requests where there's worry of certain errors happening but ignored because of retries. + - `logtag = nothing`, if provided, will be used as the tag for error logging. Useful for debugging or monitoring requests. - `observelayers = false`, if `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored diff --git a/src/IOExtras.jl b/src/IOExtras.jl index 50adfc624..fd81f161c 100644 --- a/src/IOExtras.jl +++ b/src/IOExtras.jl @@ -84,7 +84,7 @@ else end tcpsocket(io::SSLContext)::TCPSocket = io.bio -tcpsocket(io::SSLStream)::TCPSocket = io.bio_read_stream.io +tcpsocket(io::SSLStream)::TCPSocket = io.io tcpsocket(io::TCPSocket)::TCPSocket = io localport(io) = try !isopen(tcpsocket(io)) ? 0 : diff --git a/src/Streams.jl b/src/Streams.jl index 80b92720f..79b4ee188 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -56,7 +56,7 @@ IOExtras.isopen(http::Stream) = isopen(http.stream) # Writing HTTP Messages -messagetowrite(http::Stream{<:Response}) = http.message.request +messagetowrite(http::Stream{<:Response}) = http.message.request::Request messagetowrite(http::Stream{<:Request}) = http.message.response IOExtras.iswritable(http::Stream) = iswritable(http.stream) @@ -372,7 +372,7 @@ function IOExtras.closeread(http::Stream{<:Response}) else # Discard body bytes that were not read... - while !eof(http) + @try Base.IOError EOFError while !eof(http) readavailable(http) end diff --git a/src/clientlayers/ConnectionRequest.jl b/src/clientlayers/ConnectionRequest.jl index 19c324ed4..27923e6eb 100644 --- a/src/clientlayers/ConnectionRequest.jl +++ b/src/clientlayers/ConnectionRequest.jl @@ -1,6 +1,6 @@ module ConnectionRequest -using URIs, Sockets, Base64, LoggingExtras +using URIs, Sockets, Base64, LoggingExtras, ConcurrentUtilities using MbedTLS: SSLContext, SSLConfig using OpenSSL: SSLStream using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions @@ -55,7 +55,7 @@ Close the connection if the request throws an exception. Otherwise leave it open so that it can be reused. """ function connectionlayer(handler) - return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, kw...) + return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...) local io, stream if proxy !== nothing target_url = req.url @@ -79,7 +79,8 @@ function connectionlayer(handler) io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...) catch e if logerrors - @error "HTTP.ConnectError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + err = current_exceptions_to_string(CapturedException(e, catch_backtrace())) + @error err type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag end req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1 throw(ConnectError(string(url), e)) @@ -98,7 +99,7 @@ function connectionlayer(handler) target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail end r = if readtimeout > 0 - try_with_timeout(readtimeout) do + try_with_timeout(readtimeout) do _ connect_tunnel(io, target_url, req) end else @@ -115,7 +116,7 @@ function connectionlayer(handler) end stream = Stream(req.response, io) - return handler(stream; readtimeout=readtimeout, logerrors=logerrors, kw...) + return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...) catch e while true if e isa CompositeException @@ -126,8 +127,11 @@ function connectionlayer(handler) break end end - if logerrors && !(e isa StatusError || e isa TimeoutError) - @error "HTTP.ConnectionRequest" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + root_err = e isa CapturedException ? e.ex : e + # don't log if it's an HTTPError since we should have already logged it + if logerrors && !(root_err isa HTTPError) + err = current_exceptions_to_string(e) + @error err type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag end @debugv 1 "❗️ ConnectionLayer $e. Closing: $io" shouldreuse = false @@ -136,7 +140,7 @@ function connectionlayer(handler) # idempotency of the request req.context[:nothingwritten] = true end - e isa HTTPError || throw(RequestError(req, e)) + root_err isa HTTPError || throw(RequestError(req, e)) rethrow(e) finally releaseconnection(io, shouldreuse; kw...) diff --git a/src/clientlayers/ExceptionRequest.jl b/src/clientlayers/ExceptionRequest.jl index 7c1bb3dab..0900c5771 100644 --- a/src/clientlayers/ExceptionRequest.jl +++ b/src/clientlayers/ExceptionRequest.jl @@ -10,14 +10,15 @@ using ..IOExtras, ..Messages, ..Exceptions Throw a `StatusError` if the request returns an error response status. """ function exceptionlayer(handler) - return function exceptions(stream; status_exception::Bool=true, logerrors::Bool=false, kw...) - res = handler(stream; logerrors=logerrors, kw...) + return function exceptions(stream; status_exception::Bool=true, timedout=nothing, logerrors::Bool=false, logtag=nothing, kw...) + res = handler(stream; timedout=timedout, logerrors=logerrors, logtag=logtag, kw...) if status_exception && iserror(res) req = res.request req.context[:status_errors] = get(req.context, :status_errors, 0) + 1 e = StatusError(res.status, req.method, req.target, res) - if logerrors - @error "HTTP.StatusError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + if logerrors && (timedout === nothing || !timedout[]) + err = current_exceptions_to_string(CapturedException(e, catch_backtrace())) + @error err type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag end throw(e) else diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 8c5e64ec0..b96974349 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -17,7 +17,7 @@ immediately so that the transmission can be aborted if the `Response` status indicates that the server does not wish to receive the message body. [RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5). """ -function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, kw...)::Response +function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, logtag=nothing, timedout=nothing, kw...)::Response response = stream.message req = response.request @debugv 1 sprintcompact(req) @@ -33,43 +33,45 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi try @sync begin if iofunction === nothing - @async try + Threads.@spawn try writebody(stream, req) - @debugv 2 "client closewrite" - closewrite(stream) finally req.context[:write_duration_ms] = get(req.context, :write_duration_ms, 0.0) + ((time() - write_start) * 1000) + @debugv 2 "client closewrite" + closewrite(stream) end read_start = time() - @async try + Threads.@spawn try @debugv 2 "client startread" startread(stream) - if isaborted(stream) - # The server may have closed the connection. - # Don't propagate such errors. - @try Base.IOError close(stream.stream) + if !isaborted(stream) + readbody(stream, response, decompress) end - readbody(stream, response, decompress) finally req.context[:read_duration_ms] = get(req.context, :read_duration_ms, 0.0) + ((time() - read_start) * 1000) + @debugv 2 "client closeread" + closeread(stream) end else - iofunction(stream) + try + iofunction(stream) + finally + closewrite(stream) + closeread(stream) + end end end catch e - if logerrors - @error "HTTP.IOError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + if timedout === nothing || !timedout[] + req.context[:io_errors] = get(req.context, :io_errors, 0) + 1 + if logerrors + err = current_exceptions_to_string(CapturedException(e, catch_backtrace())) + @error err type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag + end end - req.context[:io_errors] = get(req.context, :io_errors, 0) + 1 rethrow() end - @debugv 2 "client closewrite" - closewrite(stream) - @debugv 2 "client closeread" - closeread(stream) - @debugv 1 sprintcompact(response) @debugv 2 sprint(show, response) return response @@ -150,7 +152,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream) # so using the default write is fastest because it utilizes # readavailable under the hood, for which BufferStream is optimized n = write(body, buf_or_stream) - elseif buf_or_stream isa Stream + elseif buf_or_stream isa Stream{Response} # for HTTP.Stream, there's already an optimized read method # that just needs an IOBuffer to write into n = readall!(buf_or_stream, body) @@ -161,7 +163,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream) res.body = read(buf_or_stream) n = length(res.body) end - elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream + elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream{Response} # optimization for IOBuffer response_stream to avoid temporary allocations n = readall!(buf_or_stream, res.body) else diff --git a/src/clientlayers/TimeoutRequest.jl b/src/clientlayers/TimeoutRequest.jl index 770a6319f..4920e2a95 100644 --- a/src/clientlayers/TimeoutRequest.jl +++ b/src/clientlayers/TimeoutRequest.jl @@ -1,7 +1,7 @@ module TimeoutRequest -using ..Connections, ..Streams, ..Exceptions -using LoggingExtras +using ..Connections, ..Streams, ..Exceptions, ..Messages +using LoggingExtras, ConcurrentUtilities export timeoutlayer @@ -11,24 +11,26 @@ export timeoutlayer Close the `HTTP.Stream` if no data has been received for `readtimeout` seconds. """ function timeoutlayer(handler) - return function timeouts(stream::Stream; readtimeout::Int=0, logerrors::Bool=false, kw...) + return function timeouts(stream::Stream; readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...) if readtimeout <= 0 # skip return handler(stream; logerrors=logerrors, kw...) end return try - try_with_timeout(readtimeout) do - handler(stream; logerrors=logerrors, kw...) + try_with_timeout(readtimeout, Response) do timedout + handler(stream; logerrors=logerrors, logtag=logtag, timedout=timedout, kw...) end catch e - if e isa TimeoutError + if e isa ConcurrentUtilities.TimeoutException req = stream.message.request + req.context[:timeout_errors] = get(req.context, :timeout_errors, 0) + 1 if logerrors - @error "HTTP.TimeoutError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + err = current_exceptions_to_string(CapturedException(e, catch_backtrace())) + @error err type=Symbol("HTTP.TimeoutError") method=req.method url=req.url context=req.context timeout=readtimeout logtag=logtag end - req.context[:timeout_errors] = get(req.context, :timeout_errors, 0) + 1 + e = Exceptions.TimeoutError(readtimeout) end - rethrow() + rethrow(e) end end end diff --git a/test/chunking.jl b/test/chunking.jl index 267930b83..7fd1c2117 100644 --- a/test/chunking.jl +++ b/test/chunking.jl @@ -33,31 +33,33 @@ using BufferedStreams flush(tcp) end - r = HTTP.get("http://127.0.0.1:$port") + try + r = HTTP.get("http://127.0.0.1:$port") + @test String(r.body) == decoded_data - @test String(r.body) == decoded_data + for wrap in (identity, BufferedInputStream) + r = "" - for wrap in (identity, BufferedInputStream) - r = "" + # Ignore byte-by-byte read warning + CL = Base.CoreLogging + CL.with_logger(CL.SimpleLogger(stderr, CL.Error)) do + HTTP.open("GET", "http://127.0.0.1:$port") do io + io = wrap(io) + x = split(decoded_data, "\n") - # Ignore byte-by-byte read warning - CL = Base.CoreLogging - CL.with_logger(CL.SimpleLogger(stderr, CL.Error)) do - HTTP.open("GET", "http://127.0.0.1:$port") do io - io = wrap(io) - x = split(decoded_data, "\n") - - for i in 1:6 - l = readline(io) - @test l == x[i] - r *= l * "\n" + for i in 1:6 + l = readline(io) + @test l == x[i] + r *= l * "\n" + end end end - end - @test r == decoded_data + @test r == decoded_data + end + finally + close(server) end - close(server) end end # module diff --git a/test/loopback.jl b/test/loopback.jl index df0edcbda..d8681078b 100644 --- a/test/loopback.jl +++ b/test/loopback.jl @@ -43,7 +43,7 @@ Base.isopen(lb::Loopback) = isopen(lb.io) Base.read(fio::FunctionIO, a...) = (call(fio); read(fio.buf, a...)) Base.readavailable(fio::FunctionIO) = (call(fio); readavailable(fio.buf)) Base.readavailable(lb::Loopback) = readavailable(lb.io) -Base.unsafe_read(lb::Loopback, p::Ptr, n::Integer) = unsafe_read(lb.io, p, n) +Base.unsafe_read(lb::Loopback, p::Ptr{UInt8}, n::UInt) = unsafe_read(lb.io, p, n) HTTP.IOExtras.tcpsocket(::Loopback) = TCPSocket() diff --git a/test/runtests.jl b/test/runtests.jl index af84b736f..aa5e75fef 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -26,7 +26,6 @@ include(joinpath(dir, "resources/TestRequest.jl")) "server.jl", "async.jl", "mwe.jl", - "try_with_timeout.jl", "httpversion.jl", "websockets/autobahn.jl", ] diff --git a/test/try_with_timeout.jl b/test/try_with_timeout.jl deleted file mode 100644 index 6adfbf742..000000000 --- a/test/try_with_timeout.jl +++ /dev/null @@ -1,50 +0,0 @@ -@testset "try_with_timeout $warmup" for warmup in [true, false] - throwerrorexception() = throw(ErrorException("error as expected")) - throwargumenterror() = throw(ArgumentError("unexpected error")) - - @testset "rethrow exceptions" begin - t = @elapsed begin - err = try - HTTP.try_with_timeout(1) do - throwerrorexception() - end - catch e - e - end - @test err.ex isa ErrorException - end - if !warmup - @test t < 1 - end - end - - @testset "TimeoutError is thrown" begin - t = @elapsed begin - err = try - HTTP.try_with_timeout(1) do - sleep(5) - throwargumenterror() - end - catch e - e - end - @test err isa HTTP.TimeoutError - end - if !warmup - @test 1 < t < 2 - end - end - - @testset "value is successfully returned under timeout" begin - t = @elapsed begin - ret = HTTP.try_with_timeout(5) do - sleep(1) - return 1 - end - end - @test ret == 1 - if !warmup - @test 1 < t < 2 - end - end -end