Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove uses of at-async in favor of at-spawn #1039

Merged
merged 4 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jobs:
- os: windows-latest
version: '1'
arch: x86
exclude:
- os: windows-latest
version: '1.6'
steps:
- uses: actions/checkout@v3
- uses: julia-actions/setup-julia@v1
Expand All @@ -55,6 +58,7 @@ jobs:
env:
PIE_SOCKET_API_KEY: ${{ secrets.PIE_SOCKET_API_KEY }}
JULIA_VERSION: ${{ matrix.version }}
JULIA_NUM_THREADS: 2
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
Expand Down
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "HTTP"
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"]
version = "1.8.1"
version = "1.9.0"

[deps]
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Expand All @@ -21,7 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
CodecZlib = "0.7"
ConcurrentUtilities = "2.1"
ConcurrentUtilities = "2.2"
LoggingExtras = "0.4.9,1"
MbedTLS = "0.6.8, 0.7, 1"
OpenSSL = "1.3"
Expand Down
4 changes: 4 additions & 0 deletions docs/src/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ When a non-2XX HTTP status code is received in a response, this is meant to conv

If `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or monitoring requests where there's worry of certain errors happening but ignored because of retries.

### `logtag`

If provided, will be used as the tag for error logging. Useful for debugging or monitoring requests.

### `observelayers`

If `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`.
Expand Down
12 changes: 6 additions & 6 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ end
Used for "hashing" a Connection object on just the key properties necessary for determining
connection re-useability. That is, when a new request calls `newconnection`, we take the
request parameters of host and port, and if ssl verification is required, if keepalive is enabled,
and if an existing Connection was already created with the exact
and if an existing Connection was already created with the exact.
same parameters, we can re-use it (as long as it's not already being used, obviously).
"""
connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection)
Expand Down Expand Up @@ -299,7 +299,7 @@ function IOExtras.closeread(c::Connection)
c.readable = false
@debugv 3 "✉️ Read done: $c"
if c.clientconnection
t = @async monitor_idle_connection(c)
t = Threads.@spawn monitor_idle_connection(c)
@isdefined(errormonitor) && errormonitor(t)
end
return
Expand Down Expand Up @@ -424,7 +424,7 @@ function connection_isvalid(c, idle_timeout)
end

@noinline connection_limit_warning(cl) = cl === nothing ||
@warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` or pass a connection pool like `pool=HTTP.Pool($cl)` instead."
@warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` before any requests are made or construct a shared pool via `POOL = HTTP.Pool($cl)` and pass to each request like `pool=POOL` instead."

"""
newconnection(type, host, port) -> Connection
Expand Down Expand Up @@ -509,7 +509,7 @@ function getconnection(::Type{TCPSocket},
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)
try
Exceptions.try_with_timeout(connect_timeout) do
try_with_timeout(connect_timeout) do _
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
end
Expand All @@ -523,7 +523,7 @@ function getconnection(::Type{TCPSocket},
end
return tcp
catch e
lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
lasterr = e isa ConcurrentUtilities.TimeoutException ? ConnectTimeout(host, port) : e
end
end
# If no connetion could be set up, to any address, throw last error
Expand Down Expand Up @@ -624,7 +624,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
# if the upgrade fails, an error will be thrown and the original c will be closed
# in ConnectionRequest
tls = if readtimeout > 0
Exceptions.try_with_timeout(readtimeout) do
try_with_timeout(readtimeout) do _
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
end
else
Expand Down
28 changes: 9 additions & 19 deletions src/Exceptions.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Exceptions

export @try, try_with_timeout, HTTPError, ConnectError, TimeoutError, StatusError, RequestError
export @try, HTTPError, ConnectError, TimeoutError, StatusError, RequestError, current_exceptions_to_string
using LoggingExtras
import ..HTTP # for doc references

Expand All @@ -25,24 +25,6 @@ macro $(:try)(exes...)
end
end # @eval

function try_with_timeout(f, timeout)
ch = Channel(0)
timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
@async begin
try
put!(ch, $f())
catch e
if !(e isa HTTPError)
e = CapturedException(e, catch_backtrace())
end
close(ch, e)
finally
close(timer)
end
end
return take!(ch)
end

abstract type HTTPError <: Exception end

"""
Expand Down Expand Up @@ -97,4 +79,12 @@ struct RequestError <: HTTPError
error::Any
end

function current_exceptions_to_string(curr_exc)
buf = IOBuffer()
println(buf)
println(buf, "\n===========================\nHTTP Error message:\n")
Base.showerror(buf, curr_exc)
return String(take!(buf))
end

end # module Exceptions
5 changes: 3 additions & 2 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const DEBUG_LEVEL = Ref(0)

Base.@deprecate escape escapeuri

using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS
using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS, OpenSSL

function access_threaded(f, v::Vector)
tid = Threads.threadid()
Expand All @@ -24,7 +24,7 @@ end

function open end

const SOCKET_TYPE_TLS = Ref{Any}(MbedTLS.SSLContext)
const SOCKET_TYPE_TLS = Ref{Any}(OpenSSL.SSLStream)

include("Conditions.jl") ;using .Conditions
include("access_log.jl")
Expand Down Expand Up @@ -158,6 +158,7 @@ Supported optional keyword arguments:
- `logerrors = false`, if `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be
logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or
monitoring requests where there's worry of certain errors happening but ignored because of retries.
- `logtag = nothing`, if provided, will be used as the tag for error logging. Useful for debugging or monitoring requests.
- `observelayers = false`, if `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of
time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries
or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored
Expand Down
2 changes: 1 addition & 1 deletion src/IOExtras.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ else
end

tcpsocket(io::SSLContext)::TCPSocket = io.bio
tcpsocket(io::SSLStream)::TCPSocket = io.bio_read_stream.io
tcpsocket(io::SSLStream)::TCPSocket = io.io
tcpsocket(io::TCPSocket)::TCPSocket = io

localport(io) = try !isopen(tcpsocket(io)) ? 0 :
Expand Down
4 changes: 2 additions & 2 deletions src/Streams.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ IOExtras.isopen(http::Stream) = isopen(http.stream)

# Writing HTTP Messages

messagetowrite(http::Stream{<:Response}) = http.message.request
messagetowrite(http::Stream{<:Response}) = http.message.request::Request
messagetowrite(http::Stream{<:Request}) = http.message.response

IOExtras.iswritable(http::Stream) = iswritable(http.stream)
Expand Down Expand Up @@ -372,7 +372,7 @@ function IOExtras.closeread(http::Stream{<:Response})
else

# Discard body bytes that were not read...
while !eof(http)
@try Base.IOError EOFError while !eof(http)
readavailable(http)
end

Expand Down
20 changes: 12 additions & 8 deletions src/clientlayers/ConnectionRequest.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module ConnectionRequest

using URIs, Sockets, Base64, LoggingExtras
using URIs, Sockets, Base64, LoggingExtras, ConcurrentUtilities
using MbedTLS: SSLContext, SSLConfig
using OpenSSL: SSLStream
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions
Expand Down Expand Up @@ -55,7 +55,7 @@ Close the connection if the request throws an exception.
Otherwise leave it open so that it can be reused.
"""
function connectionlayer(handler)
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, kw...)
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...)
local io, stream
if proxy !== nothing
target_url = req.url
Expand All @@ -79,7 +79,8 @@ function connectionlayer(handler)
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
catch e
if logerrors
@error "HTTP.ConnectError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
@error err type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag
end
req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1
throw(ConnectError(string(url), e))
Expand All @@ -98,7 +99,7 @@ function connectionlayer(handler)
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
end
r = if readtimeout > 0
try_with_timeout(readtimeout) do
try_with_timeout(readtimeout) do _
connect_tunnel(io, target_url, req)
end
else
Expand All @@ -115,7 +116,7 @@ function connectionlayer(handler)
end

stream = Stream(req.response, io)
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, kw...)
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...)
catch e
while true
if e isa CompositeException
Expand All @@ -126,8 +127,11 @@ function connectionlayer(handler)
break
end
end
if logerrors && !(e isa StatusError || e isa TimeoutError)
@error "HTTP.ConnectionRequest" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
root_err = e isa CapturedException ? e.ex : e
# don't log if it's an HTTPError since we should have already logged it
if logerrors && !(root_err isa HTTPError)
err = current_exceptions_to_string(e)
@error err type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag
end
@debugv 1 "❗️ ConnectionLayer $e. Closing: $io"
shouldreuse = false
Expand All @@ -136,7 +140,7 @@ function connectionlayer(handler)
# idempotency of the request
req.context[:nothingwritten] = true
end
e isa HTTPError || throw(RequestError(req, e))
root_err isa HTTPError || throw(RequestError(req, e))
rethrow(e)
finally
releaseconnection(io, shouldreuse; kw...)
Expand Down
9 changes: 5 additions & 4 deletions src/clientlayers/ExceptionRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ using ..IOExtras, ..Messages, ..Exceptions
Throw a `StatusError` if the request returns an error response status.
"""
function exceptionlayer(handler)
return function exceptions(stream; status_exception::Bool=true, logerrors::Bool=false, kw...)
res = handler(stream; logerrors=logerrors, kw...)
return function exceptions(stream; status_exception::Bool=true, timedout=nothing, logerrors::Bool=false, logtag=nothing, kw...)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, timedout kwarg is strictly internal thing, the user shouldn't provide their own TimedOut struct to a HTTP.get? If that is the case, we might want to call it _timedout just to make it clearer that this is an internal thing.

res = handler(stream; timedout=timedout, logerrors=logerrors, logtag=logtag, kw...)
if status_exception && iserror(res)
req = res.request
req.context[:status_errors] = get(req.context, :status_errors, 0) + 1
e = StatusError(res.status, req.method, req.target, res)
if logerrors
@error "HTTP.StatusError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
if logerrors && (timedout === nothing || !timedout[])
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
@error err type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag
end
throw(e)
else
Expand Down
44 changes: 23 additions & 21 deletions src/clientlayers/StreamRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ immediately so that the transmission can be aborted if the `Response` status
indicates that the server does not wish to receive the message body.
[RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
"""
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, kw...)::Response
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, logtag=nothing, timedout=nothing, kw...)::Response
response = stream.message
req = response.request
@debugv 1 sprintcompact(req)
Expand All @@ -33,43 +33,45 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi
try
@sync begin
if iofunction === nothing
@async try
Threads.@spawn try
writebody(stream, req)
@debugv 2 "client closewrite"
closewrite(stream)
finally
req.context[:write_duration_ms] = get(req.context, :write_duration_ms, 0.0) + ((time() - write_start) * 1000)
@debugv 2 "client closewrite"
closewrite(stream)
end
read_start = time()
@async try
Threads.@spawn try
@debugv 2 "client startread"
startread(stream)
if isaborted(stream)
# The server may have closed the connection.
# Don't propagate such errors.
@try Base.IOError close(stream.stream)
if !isaborted(stream)
readbody(stream, response, decompress)
end
readbody(stream, response, decompress)
finally
req.context[:read_duration_ms] = get(req.context, :read_duration_ms, 0.0) + ((time() - read_start) * 1000)
@debugv 2 "client closeread"
closeread(stream)
end
else
iofunction(stream)
try
iofunction(stream)
finally
closewrite(stream)
closeread(stream)
end
end
end
catch e
if logerrors
@error "HTTP.IOError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
if timedout === nothing || !timedout[]
req.context[:io_errors] = get(req.context, :io_errors, 0) + 1
if logerrors
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
@error err type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag
end
end
req.context[:io_errors] = get(req.context, :io_errors, 0) + 1
rethrow()
end

@debugv 2 "client closewrite"
closewrite(stream)
@debugv 2 "client closeread"
closeread(stream)

@debugv 1 sprintcompact(response)
@debugv 2 sprint(show, response)
return response
Expand Down Expand Up @@ -150,7 +152,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
# so using the default write is fastest because it utilizes
# readavailable under the hood, for which BufferStream is optimized
n = write(body, buf_or_stream)
elseif buf_or_stream isa Stream
elseif buf_or_stream isa Stream{Response}
# for HTTP.Stream, there's already an optimized read method
# that just needs an IOBuffer to write into
n = readall!(buf_or_stream, body)
Expand All @@ -161,7 +163,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
res.body = read(buf_or_stream)
n = length(res.body)
end
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream{Response}
# optimization for IOBuffer response_stream to avoid temporary allocations
n = readall!(buf_or_stream, res.body)
else
Expand Down
Loading