Skip to content

Commit

Permalink
Remove uses of at-async in favor of at-spawn (#1039)
Browse files Browse the repository at this point in the history
* Remove uses of at-async in favor of at-spawn. at-async tends to "poison" cooperating
tasks to be sticky to the thread that is running them or the parent's task's thread.

* cleanup

* add docs for logtag
  • Loading branch information
quinnj authored May 10, 2023
1 parent ab4d9b3 commit b64beb7
Show file tree
Hide file tree
Showing 16 changed files with 103 additions and 144 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jobs:
- os: windows-latest
version: '1'
arch: x86
exclude:
- os: windows-latest
version: '1.6'
steps:
- uses: actions/checkout@v3
- uses: julia-actions/setup-julia@v1
Expand All @@ -55,6 +58,7 @@ jobs:
env:
PIE_SOCKET_API_KEY: ${{ secrets.PIE_SOCKET_API_KEY }}
JULIA_VERSION: ${{ matrix.version }}
JULIA_NUM_THREADS: 2
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
Expand Down
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "HTTP"
uuid = "cd3eb016-35fb-5094-929b-558a96fad6f3"
authors = ["Jacob Quinn", "contributors: https://github.com/JuliaWeb/HTTP.jl/graphs/contributors"]
version = "1.8.1"
version = "1.9.0"

[deps]
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Expand All @@ -21,7 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
CodecZlib = "0.7"
ConcurrentUtilities = "2.1"
ConcurrentUtilities = "2.2"
LoggingExtras = "0.4.9,1"
MbedTLS = "0.6.8, 0.7, 1"
OpenSSL = "1.3"
Expand Down
4 changes: 4 additions & 0 deletions docs/src/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ When a non-2XX HTTP status code is received in a response, this is meant to conv

If `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or monitoring requests where there's worry of certain errors happening but ignored because of retries.

### `logtag`

If provided, will be used as the tag for error logging. Useful for debugging or monitoring requests.

### `observelayers`

If `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`.
Expand Down
12 changes: 6 additions & 6 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ end
Used for "hashing" a Connection object on just the key properties necessary for determining
connection re-useability. That is, when a new request calls `newconnection`, we take the
request parameters of host and port, and if ssl verification is required, if keepalive is enabled,
and if an existing Connection was already created with the exact
and if an existing Connection was already created with the exact.
same parameters, we can re-use it (as long as it's not already being used, obviously).
"""
connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection)
Expand Down Expand Up @@ -299,7 +299,7 @@ function IOExtras.closeread(c::Connection)
c.readable = false
@debugv 3 "✉️ Read done: $c"
if c.clientconnection
t = @async monitor_idle_connection(c)
t = Threads.@spawn monitor_idle_connection(c)
@isdefined(errormonitor) && errormonitor(t)
end
return
Expand Down Expand Up @@ -424,7 +424,7 @@ function connection_isvalid(c, idle_timeout)
end

@noinline connection_limit_warning(cl) = cl === nothing ||
@warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` or pass a connection pool like `pool=HTTP.Pool($cl)` instead."
@warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` before any requests are made or construct a shared pool via `POOL = HTTP.Pool($cl)` and pass to each request like `pool=POOL` instead."

"""
newconnection(type, host, port) -> Connection
Expand Down Expand Up @@ -509,7 +509,7 @@ function getconnection(::Type{TCPSocket},
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)
try
Exceptions.try_with_timeout(connect_timeout) do
try_with_timeout(connect_timeout) do _
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
end
Expand All @@ -523,7 +523,7 @@ function getconnection(::Type{TCPSocket},
end
return tcp
catch e
lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
lasterr = e isa ConcurrentUtilities.TimeoutException ? ConnectTimeout(host, port) : e
end
end
# If no connetion could be set up, to any address, throw last error
Expand Down Expand Up @@ -624,7 +624,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
# if the upgrade fails, an error will be thrown and the original c will be closed
# in ConnectionRequest
tls = if readtimeout > 0
Exceptions.try_with_timeout(readtimeout) do
try_with_timeout(readtimeout) do _
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...)
end
else
Expand Down
28 changes: 9 additions & 19 deletions src/Exceptions.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Exceptions

export @try, try_with_timeout, HTTPError, ConnectError, TimeoutError, StatusError, RequestError
export @try, HTTPError, ConnectError, TimeoutError, StatusError, RequestError, current_exceptions_to_string
using LoggingExtras
import ..HTTP # for doc references

Expand All @@ -25,24 +25,6 @@ macro $(:try)(exes...)
end
end # @eval

function try_with_timeout(f, timeout)
ch = Channel(0)
timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
@async begin
try
put!(ch, $f())
catch e
if !(e isa HTTPError)
e = CapturedException(e, catch_backtrace())
end
close(ch, e)
finally
close(timer)
end
end
return take!(ch)
end

abstract type HTTPError <: Exception end

"""
Expand Down Expand Up @@ -97,4 +79,12 @@ struct RequestError <: HTTPError
error::Any
end

function current_exceptions_to_string(curr_exc)
buf = IOBuffer()
println(buf)
println(buf, "\n===========================\nHTTP Error message:\n")
Base.showerror(buf, curr_exc)
return String(take!(buf))
end

end # module Exceptions
5 changes: 3 additions & 2 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const DEBUG_LEVEL = Ref(0)

Base.@deprecate escape escapeuri

using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS
using Base64, Sockets, Dates, URIs, LoggingExtras, MbedTLS, OpenSSL

function access_threaded(f, v::Vector)
tid = Threads.threadid()
Expand All @@ -24,7 +24,7 @@ end

function open end

const SOCKET_TYPE_TLS = Ref{Any}(MbedTLS.SSLContext)
const SOCKET_TYPE_TLS = Ref{Any}(OpenSSL.SSLStream)

This comment has been minimized.

Copy link
@cmcaine

cmcaine Sep 5, 2023

Contributor

This is a breaking change because it changes the accepted type for the sslconfig keyword argument that many functions in the public API of HTTP.jl accept.

Did you intend to change this? It seems unrelated to your other changes.

This comment has been minimized.

Copy link
@nickrobinson251

nickrobinson251 Sep 6, 2023

Collaborator

please can you open an issue so this comment doesn't get lost? thanks!

This comment has been minimized.

Copy link
@cmcaine

cmcaine Sep 6, 2023

Contributor

Sure. Done. I think this line and the import of OpenSSL in this file should just be reverted, tho.


include("Conditions.jl") ;using .Conditions
include("access_log.jl")
Expand Down Expand Up @@ -158,6 +158,7 @@ Supported optional keyword arguments:
- `logerrors = false`, if `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be
logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or
monitoring requests where there's worry of certain errors happening but ignored because of retries.
- `logtag = nothing`, if provided, will be used as the tag for error logging. Useful for debugging or monitoring requests.
- `observelayers = false`, if `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of
time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries
or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored
Expand Down
2 changes: 1 addition & 1 deletion src/IOExtras.jl
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ else
end

tcpsocket(io::SSLContext)::TCPSocket = io.bio
tcpsocket(io::SSLStream)::TCPSocket = io.bio_read_stream.io
tcpsocket(io::SSLStream)::TCPSocket = io.io
tcpsocket(io::TCPSocket)::TCPSocket = io

localport(io) = try !isopen(tcpsocket(io)) ? 0 :
Expand Down
4 changes: 2 additions & 2 deletions src/Streams.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ IOExtras.isopen(http::Stream) = isopen(http.stream)

# Writing HTTP Messages

messagetowrite(http::Stream{<:Response}) = http.message.request
messagetowrite(http::Stream{<:Response}) = http.message.request::Request
messagetowrite(http::Stream{<:Request}) = http.message.response

IOExtras.iswritable(http::Stream) = iswritable(http.stream)
Expand Down Expand Up @@ -372,7 +372,7 @@ function IOExtras.closeread(http::Stream{<:Response})
else

# Discard body bytes that were not read...
while !eof(http)
@try Base.IOError EOFError while !eof(http)
readavailable(http)
end

Expand Down
20 changes: 12 additions & 8 deletions src/clientlayers/ConnectionRequest.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module ConnectionRequest

using URIs, Sockets, Base64, LoggingExtras
using URIs, Sockets, Base64, LoggingExtras, ConcurrentUtilities
using MbedTLS: SSLContext, SSLConfig
using OpenSSL: SSLStream
using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions
Expand Down Expand Up @@ -55,7 +55,7 @@ Close the connection if the request throws an exception.
Otherwise leave it open so that it can be reused.
"""
function connectionlayer(handler)
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, kw...)
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...)
local io, stream
if proxy !== nothing
target_url = req.url
Expand All @@ -79,7 +79,8 @@ function connectionlayer(handler)
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
catch e
if logerrors
@error "HTTP.ConnectError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
@error err type=Symbol("HTTP.ConnectError") method=req.method url=req.url context=req.context logtag=logtag
end
req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1
throw(ConnectError(string(url), e))
Expand All @@ -98,7 +99,7 @@ function connectionlayer(handler)
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
end
r = if readtimeout > 0
try_with_timeout(readtimeout) do
try_with_timeout(readtimeout) do _
connect_tunnel(io, target_url, req)
end
else
Expand All @@ -115,7 +116,7 @@ function connectionlayer(handler)
end

stream = Stream(req.response, io)
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, kw...)
return handler(stream; readtimeout=readtimeout, logerrors=logerrors, logtag=logtag, kw...)
catch e
while true
if e isa CompositeException
Expand All @@ -126,8 +127,11 @@ function connectionlayer(handler)
break
end
end
if logerrors && !(e isa StatusError || e isa TimeoutError)
@error "HTTP.ConnectionRequest" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
root_err = e isa CapturedException ? e.ex : e
# don't log if it's an HTTPError since we should have already logged it
if logerrors && !(root_err isa HTTPError)
err = current_exceptions_to_string(e)
@error err type=Symbol("HTTP.ConnectionRequest") method=req.method url=req.url context=req.context logtag=logtag
end
@debugv 1 "❗️ ConnectionLayer $e. Closing: $io"
shouldreuse = false
Expand All @@ -136,7 +140,7 @@ function connectionlayer(handler)
# idempotency of the request
req.context[:nothingwritten] = true
end
e isa HTTPError || throw(RequestError(req, e))
root_err isa HTTPError || throw(RequestError(req, e))
rethrow(e)
finally
releaseconnection(io, shouldreuse; kw...)
Expand Down
9 changes: 5 additions & 4 deletions src/clientlayers/ExceptionRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ using ..IOExtras, ..Messages, ..Exceptions
Throw a `StatusError` if the request returns an error response status.
"""
function exceptionlayer(handler)
return function exceptions(stream; status_exception::Bool=true, logerrors::Bool=false, kw...)
res = handler(stream; logerrors=logerrors, kw...)
return function exceptions(stream; status_exception::Bool=true, timedout=nothing, logerrors::Bool=false, logtag=nothing, kw...)
res = handler(stream; timedout=timedout, logerrors=logerrors, logtag=logtag, kw...)
if status_exception && iserror(res)
req = res.request
req.context[:status_errors] = get(req.context, :status_errors, 0) + 1
e = StatusError(res.status, req.method, req.target, res)
if logerrors
@error "HTTP.StatusError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
if logerrors && (timedout === nothing || !timedout[])
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
@error err type=Symbol("HTTP.StatusError") method=req.method url=req.url context=req.context logtag=logtag
end
throw(e)
else
Expand Down
44 changes: 23 additions & 21 deletions src/clientlayers/StreamRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ immediately so that the transmission can be aborted if the `Response` status
indicates that the server does not wish to receive the message body.
[RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
"""
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, kw...)::Response
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, logtag=nothing, timedout=nothing, kw...)::Response
response = stream.message
req = response.request
@debugv 1 sprintcompact(req)
Expand All @@ -33,43 +33,45 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi
try
@sync begin
if iofunction === nothing
@async try
Threads.@spawn try
writebody(stream, req)
@debugv 2 "client closewrite"
closewrite(stream)
finally
req.context[:write_duration_ms] = get(req.context, :write_duration_ms, 0.0) + ((time() - write_start) * 1000)
@debugv 2 "client closewrite"
closewrite(stream)
end
read_start = time()
@async try
Threads.@spawn try
@debugv 2 "client startread"
startread(stream)
if isaborted(stream)
# The server may have closed the connection.
# Don't propagate such errors.
@try Base.IOError close(stream.stream)
if !isaborted(stream)
readbody(stream, response, decompress)
end
readbody(stream, response, decompress)
finally
req.context[:read_duration_ms] = get(req.context, :read_duration_ms, 0.0) + ((time() - read_start) * 1000)
@debugv 2 "client closeread"
closeread(stream)
end
else
iofunction(stream)
try
iofunction(stream)
finally
closewrite(stream)
closeread(stream)
end
end
end
catch e
if logerrors
@error "HTTP.IOError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context
if timedout === nothing || !timedout[]
req.context[:io_errors] = get(req.context, :io_errors, 0) + 1
if logerrors
err = current_exceptions_to_string(CapturedException(e, catch_backtrace()))
@error err type=Symbol("HTTP.IOError") method=req.method url=req.url context=req.context logtag=logtag
end
end
req.context[:io_errors] = get(req.context, :io_errors, 0) + 1
rethrow()
end

@debugv 2 "client closewrite"
closewrite(stream)
@debugv 2 "client closeread"
closeread(stream)

@debugv 1 sprintcompact(response)
@debugv 2 sprint(show, response)
return response
Expand Down Expand Up @@ -150,7 +152,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
# so using the default write is fastest because it utilizes
# readavailable under the hood, for which BufferStream is optimized
n = write(body, buf_or_stream)
elseif buf_or_stream isa Stream
elseif buf_or_stream isa Stream{Response}
# for HTTP.Stream, there's already an optimized read method
# that just needs an IOBuffer to write into
n = readall!(buf_or_stream, body)
Expand All @@ -161,7 +163,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
res.body = read(buf_or_stream)
n = length(res.body)
end
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream{Response}
# optimization for IOBuffer response_stream to avoid temporary allocations
n = readall!(buf_or_stream, res.body)
else
Expand Down
Loading

2 comments on commit b64beb7

@quinnj
Copy link
Member Author

@quinnj quinnj commented on b64beb7 May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register()

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/83328

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.9.0 -m "<description of version>" b64beb797470ef6c96e75ef7c77d60f975fd32f1
git push origin v1.9.0

Please sign in to comment.