From 1061ecc377a053fce0df94e1a19e5260f7c030f5 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Wed, 28 Aug 2024 18:07:38 +0100 Subject: [PATCH 1/3] Fix setting atexit. Fixes trailing download tasks during precompilation (#257) --- src/Curl/Curl.jl | 18 ++++++++++++++---- src/Curl/Multi.jl | 9 --------- src/Downloads.jl | 1 + 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/Curl/Curl.jl b/src/Curl/Curl.jl index cc1b5b1..fd9b4c9 100644 --- a/src/Curl/Curl.jl +++ b/src/Curl/Curl.jl @@ -65,10 +65,6 @@ using Base: OS_HANDLE, preserve_handle, unpreserve_handle include("utils.jl") -function __init__() - @check curl_global_init(CURL_GLOBAL_ALL) -end - const CURL_VERSION_INFO = unsafe_load(curl_version_info(CURLVERSION_NOW)) if CURL_VERSION_INFO.ssl_version == Base.C_NULL const SSL_VERSION = "" @@ -91,6 +87,20 @@ end include("Easy.jl") include("Multi.jl") +function __init__() + @check curl_global_init(CURL_GLOBAL_ALL) + + # Close any Multis and their timers at exit that haven't been finalized by then + Base.atexit() do + while true + w = @lock MULTIS_LOCK (isempty(MULTIS) ? nothing : pop!(MULTIS)) + w === nothing && break + w = w.value + w isa Multi && done!(w) + end + end +end + function with_handle(f, handle::Union{Multi, Easy}) try f(handle) finally diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index c6ed079..33a606b 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -55,15 +55,6 @@ end const MULTIS_LOCK = Base.ReentrantLock() const MULTIS = WeakRef[] -# Close any Multis and their timers at exit that haven't been finalized by then -Base.atexit() do - while true - w = @lock MULTIS_LOCK (isempty(MULTIS) ? nothing : pop!(MULTIS)) - w === nothing && break - w = w.value - w isa Multi && done!(w) - end -end function remove_handle(multi::Multi, easy::Easy) lock(multi.lock) do diff --git a/src/Downloads.jl b/src/Downloads.jl index 5a3c214..ca1710c 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -468,6 +468,7 @@ end # Precompile let + Curl.__init__() d = Downloader() f = mktemp()[1] download("file://" * f; downloader=d) From df33406c5b6d4a4f21a145d12925e5b07c24152b Mon Sep 17 00:00:00 2001 From: Tanmay Mohapatra Date: Tue, 3 Sep 2024 18:13:37 +0530 Subject: [PATCH 2/3] gracefully cancel a request (#256) * gracefully cancel a request Adds a way to gracefully cancel an ongoing request. The `request` method accepts an additional `interrupt` keyword which can be a `Base.Event`. When it is triggered, the [`curl_multi_remove_handle`](https://curl.se/libcurl/c/curl_multi_remove_handle.html) is invoked, which interrupts the easy handle gracefully. It closes the `output` and `progress` channels of the `Easy` handle to unblock the waiting request task, which then terminates with a `RequestError`. --- README.md | 7 +++++++ src/Curl/Multi.jl | 4 ++++ src/Downloads.jl | 40 ++++++++++++++++++++++++++++++++++++++-- test/runtests.jl | 12 ++++++++++++ 4 files changed, 61 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b1cbb3b..d2ab82c 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,7 @@ request(url; [ debug = , ] [ throw = true, ] [ downloader = , ] + [ interrupt = , ] ) -> Union{Response, RequestError} ``` - `url :: AbstractString` @@ -110,6 +111,7 @@ request(url; - `debug :: (type, message) --> Any` - `throw :: Bool` - `downloader :: Downloader` +- `interrupt :: Base.Event` Make a request to the given url, returning a `Response` object capturing the status, headers and other information about the response. The body of the @@ -129,6 +131,11 @@ be downloaded (indicated by non-2xx status code), `request` returns a `Response` object no matter what the status code of the response is. If there is an error with getting a response at all, then a `RequestError` is thrown or returned. +If the `interrupt` keyword argument is provided, it must be a `Base.Event` object. +If the event is triggered while the request is in progress, the request will be +cancelled and an error will be thrown. This can be used to interrupt a long +running request, for example if the user wants to cancel a download. + ### default_downloader! ```jl diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index 33a606b..d2be032 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -192,6 +192,10 @@ function socket_callback( end end @isdefined(errormonitor) && errormonitor(task) + else + lock(multi.lock) do + check_multi_info(multi) + end end @isdefined(old_watcher) && close(old_watcher) return 0 diff --git a/src/Downloads.jl b/src/Downloads.jl index ca1710c..7ad35ab 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -286,6 +286,7 @@ end [ debug = , ] [ throw = true, ] [ downloader = , ] + [ interrupt = , ] ) -> Union{Response, RequestError} url :: AbstractString @@ -299,6 +300,7 @@ end debug :: (type, message) --> Any throw :: Bool downloader :: Downloader + interrupt :: Base.Event Make a request to the given url, returning a `Response` object capturing the status, headers and other information about the response. The body of the @@ -317,6 +319,11 @@ Note that unlike `download` which throws an error if the requested URL could not be downloaded (indicated by non-2xx status code), `request` returns a `Response` object no matter what the status code of the response is. If there is an error with getting a response at all, then a `RequestError` is thrown or returned. + +If the `interrupt` keyword argument is provided, it must be a `Base.Event` object. +If the event is triggered while the request is in progress, the request will be +cancelled and an error will be thrown. This can be used to interrupt a long +running request, for example if the user wants to cancel a download. """ function request( url :: AbstractString; @@ -330,6 +337,7 @@ function request( debug :: Union{Function, Nothing} = nothing, throw :: Bool = true, downloader :: Union{Downloader, Nothing} = nothing, + interrupt :: Union{Nothing, Base.Event} = nothing, ) :: Union{Response, RequestError} if downloader === nothing lock(DOWNLOAD_LOCK) do @@ -388,6 +396,20 @@ function request( # do the request add_handle(downloader.multi, easy) + interrupted = false + if interrupt !== nothing + interrupt_task = @async begin + # wait for the interrupt event + wait(interrupt) + # cancel the request + remove_handle(downloader.multi, easy) + close(easy.output) + close(easy.progress) + interrupted = true + end + else + interrupt_task = nothing + end try # ensure handle is removed @sync begin @async for buf in easy.output @@ -403,14 +425,28 @@ function request( end end finally - remove_handle(downloader.multi, easy) + if !interrupted + if interrupt_task !== nothing + # trigger interrupt + notify(interrupt) + wait(interrupt_task) + else + remove_handle(downloader.multi, easy) + end + end end # return the response or throw an error response = Response(get_response_info(easy)...) easy.code == Curl.CURLE_OK && return response message = get_curl_errstr(easy) - response = RequestError(url, easy.code, message, response) + if easy.code == typemax(Curl.CURLcode) + # uninitialized code, likely a protocol error + code = Int(0) + else + code = Int(easy.code) + end + response = RequestError(url, code, message, response) throw && Base.throw(response) end end diff --git a/test/runtests.jl b/test/runtests.jl index 76db9e8..f4b3ffb 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -468,6 +468,18 @@ include("setup.jl") end end + @testset "interrupt" begin + url = "$server/delay/10" + interrupt = Base.Event() + download_task = @async request(url; interrupt=interrupt) + sleep(0.1) + @test !istaskdone(download_task) + notify(interrupt) + timedwait(()->istaskdone(download_task), 5.0) + @test istaskdone(download_task) + @test download_task.result isa RequestError + end + @testset "progress" begin url = "$server/drip" progress = [] From 89d3c7dded535a77551e763a437a6d31e4d9bf84 Mon Sep 17 00:00:00 2001 From: Tanmay Mohapatra Date: Mon, 9 Sep 2024 15:38:24 +0530 Subject: [PATCH 3/3] fix cancelling upload requests (#259) Cancelling an upload (PUT) request using the mechanism introduced in #256 was not effective. The upload task was not interrupted, which still blocked and the call to `request` did not return. With this change, cancelling also closes the `input` stream of the request to unblock the upload task. Also changed the `interrupted` variable to be an `Atomic{Bool}`. Ref discussion [here](https://github.com/JuliaLang/Downloads.jl/pull/256#discussion_r1742148570). --- src/Curl/Easy.jl | 6 +++++- src/Downloads.jl | 8 +++++--- test/runtests.jl | 11 +++++++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/Curl/Easy.jl b/src/Curl/Easy.jl index cf63f50..726d663 100644 --- a/src/Curl/Easy.jl +++ b/src/Curl/Easy.jl @@ -375,7 +375,11 @@ function upload_data(easy::Easy, input::IO) curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT) wait(easy.ready) easy.input === nothing && break - easy.ready = Threads.Event() + if hasmethod(reset, (Base.Event,)) + reset(easy.ready) + else + easy.ready = Threads.Event() + end end end diff --git a/src/Downloads.jl b/src/Downloads.jl index 7ad35ab..1fe95fa 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -396,7 +396,7 @@ function request( # do the request add_handle(downloader.multi, easy) - interrupted = false + interrupted = Threads.Atomic{Bool}(false) if interrupt !== nothing interrupt_task = @async begin # wait for the interrupt event @@ -405,7 +405,9 @@ function request( remove_handle(downloader.multi, easy) close(easy.output) close(easy.progress) - interrupted = true + interrupted[] = true + close(input) + notify(easy.ready) end else interrupt_task = nothing @@ -425,7 +427,7 @@ function request( end end finally - if !interrupted + if !(interrupted[]) if interrupt_task !== nothing # trigger interrupt notify(interrupt) diff --git a/test/runtests.jl b/test/runtests.jl index f4b3ffb..75526c4 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -478,6 +478,17 @@ include("setup.jl") timedwait(()->istaskdone(download_task), 5.0) @test istaskdone(download_task) @test download_task.result isa RequestError + + interrupt = Base.Event() + url = "$server/put" + input=`sh -c 'sleep 15; echo "hello"'` + download_task = @async request(url; interrupt=interrupt, input=input) + sleep(0.1) + @test !istaskdone(download_task) + notify(interrupt) + timedwait(()->istaskdone(download_task), 5.0) + @test istaskdone(download_task) + @test download_task.result isa RequestError end @testset "progress" begin