Skip to content

Commit

Permalink
Merge branch 'master' into ci-actions
Browse files Browse the repository at this point in the history
  • Loading branch information
DilumAluthge authored Oct 7, 2024
2 parents 0501d02 + 89d3c7d commit eae0eef
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 16 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ request(url;
[ debug = <none>, ]
[ throw = true, ]
[ downloader = <default>, ]
[ interrupt = <none>, ]
) -> Union{Response, RequestError}
```
- `url :: AbstractString`
Expand All @@ -110,6 +111,7 @@ request(url;
- `debug :: (type, message) --> Any`
- `throw :: Bool`
- `downloader :: Downloader`
- `interrupt :: Base.Event`

Make a request to the given url, returning a `Response` object capturing the
status, headers and other information about the response. The body of the
Expand All @@ -129,6 +131,11 @@ be downloaded (indicated by non-2xx status code), `request` returns a `Response`
object no matter what the status code of the response is. If there is an error
with getting a response at all, then a `RequestError` is thrown or returned.

If the `interrupt` keyword argument is provided, it must be a `Base.Event` object.
If the event is triggered while the request is in progress, the request will be
cancelled and an error will be thrown. This can be used to interrupt a long
running request, for example if the user wants to cancel a download.

### default_downloader!

```jl
Expand Down
18 changes: 14 additions & 4 deletions src/Curl/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ using Base: OS_HANDLE, preserve_handle, unpreserve_handle

include("utils.jl")

function __init__()
@check curl_global_init(CURL_GLOBAL_ALL)
end

const CURL_VERSION_INFO = unsafe_load(curl_version_info(CURLVERSION_NOW))
if CURL_VERSION_INFO.ssl_version == Base.C_NULL
const SSL_VERSION = ""
Expand All @@ -91,6 +87,20 @@ end
include("Easy.jl")
include("Multi.jl")

function __init__()
@check curl_global_init(CURL_GLOBAL_ALL)

# Close any Multis and their timers at exit that haven't been finalized by then
Base.atexit() do
while true
w = @lock MULTIS_LOCK (isempty(MULTIS) ? nothing : pop!(MULTIS))
w === nothing && break
w = w.value
w isa Multi && done!(w)
end
end
end

function with_handle(f, handle::Union{Multi, Easy})
try f(handle)
finally
Expand Down
6 changes: 5 additions & 1 deletion src/Curl/Easy.jl
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,11 @@ function upload_data(easy::Easy, input::IO)
curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT)
wait(easy.ready)
easy.input === nothing && break
easy.ready = Threads.Event()
if hasmethod(reset, (Base.Event,))
reset(easy.ready)
else
easy.ready = Threads.Event()
end
end
end

Expand Down
13 changes: 4 additions & 9 deletions src/Curl/Multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ end

const MULTIS_LOCK = Base.ReentrantLock()
const MULTIS = WeakRef[]
# Close any Multis and their timers at exit that haven't been finalized by then
Base.atexit() do
while true
w = @lock MULTIS_LOCK (isempty(MULTIS) ? nothing : pop!(MULTIS))
w === nothing && break
w = w.value
w isa Multi && done!(w)
end
end

function remove_handle(multi::Multi, easy::Easy)
lock(multi.lock) do
Expand Down Expand Up @@ -201,6 +192,10 @@ function socket_callback(
end
end
@isdefined(errormonitor) && errormonitor(task)
else
lock(multi.lock) do
check_multi_info(multi)
end
end
@isdefined(old_watcher) && close(old_watcher)
return 0
Expand Down
43 changes: 41 additions & 2 deletions src/Downloads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ end
[ debug = <none>, ]
[ throw = true, ]
[ downloader = <default>, ]
[ interrupt = <none>, ]
) -> Union{Response, RequestError}
url :: AbstractString
Expand All @@ -299,6 +300,7 @@ end
debug :: (type, message) --> Any
throw :: Bool
downloader :: Downloader
interrupt :: Base.Event
Make a request to the given url, returning a `Response` object capturing the
status, headers and other information about the response. The body of the
Expand All @@ -317,6 +319,11 @@ Note that unlike `download` which throws an error if the requested URL could not
be downloaded (indicated by non-2xx status code), `request` returns a `Response`
object no matter what the status code of the response is. If there is an error
with getting a response at all, then a `RequestError` is thrown or returned.
If the `interrupt` keyword argument is provided, it must be a `Base.Event` object.
If the event is triggered while the request is in progress, the request will be
cancelled and an error will be thrown. This can be used to interrupt a long
running request, for example if the user wants to cancel a download.
"""
function request(
url :: AbstractString;
Expand All @@ -330,6 +337,7 @@ function request(
debug :: Union{Function, Nothing} = nothing,
throw :: Bool = true,
downloader :: Union{Downloader, Nothing} = nothing,
interrupt :: Union{Nothing, Base.Event} = nothing,
) :: Union{Response, RequestError}
if downloader === nothing
lock(DOWNLOAD_LOCK) do
Expand Down Expand Up @@ -388,6 +396,22 @@ function request(

# do the request
add_handle(downloader.multi, easy)
interrupted = Threads.Atomic{Bool}(false)
if interrupt !== nothing
interrupt_task = @async begin
# wait for the interrupt event
wait(interrupt)
# cancel the request
remove_handle(downloader.multi, easy)
close(easy.output)
close(easy.progress)
interrupted[] = true
close(input)
notify(easy.ready)
end
else
interrupt_task = nothing
end
try # ensure handle is removed
@sync begin
@async for buf in easy.output
Expand All @@ -403,14 +427,28 @@ function request(
end
end
finally
remove_handle(downloader.multi, easy)
if !(interrupted[])
if interrupt_task !== nothing
# trigger interrupt
notify(interrupt)
wait(interrupt_task)
else
remove_handle(downloader.multi, easy)
end
end
end

# return the response or throw an error
response = Response(get_response_info(easy)...)
easy.code == Curl.CURLE_OK && return response
message = get_curl_errstr(easy)
response = RequestError(url, easy.code, message, response)
if easy.code == typemax(Curl.CURLcode)
# uninitialized code, likely a protocol error
code = Int(0)
else
code = Int(easy.code)
end
response = RequestError(url, code, message, response)
throw && Base.throw(response)
end
end
Expand Down Expand Up @@ -468,6 +506,7 @@ end

# Precompile
let
Curl.__init__()
d = Downloader()
f = mktemp()[1]
download("file://" * f; downloader=d)
Expand Down
23 changes: 23 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,29 @@ include("setup.jl")
end
end

@testset "interrupt" begin
url = "$server/delay/10"
interrupt = Base.Event()
download_task = @async request(url; interrupt=interrupt)
sleep(0.1)
@test !istaskdone(download_task)
notify(interrupt)
timedwait(()->istaskdone(download_task), 5.0)
@test istaskdone(download_task)
@test download_task.result isa RequestError

interrupt = Base.Event()
url = "$server/put"
input=`sh -c 'sleep 15; echo "hello"'`
download_task = @async request(url; interrupt=interrupt, input=input)
sleep(0.1)
@test !istaskdone(download_task)
notify(interrupt)
timedwait(()->istaskdone(download_task), 5.0)
@test istaskdone(download_task)
@test download_task.result isa RequestError
end

@testset "progress" begin
url = "$server/drip"
progress = []
Expand Down

0 comments on commit eae0eef

Please sign in to comment.