Alternate Downloads.jl-based backend (alternative to HTTP.jl) #396

Merged 24 commits on Aug 13, 2021

Commits
d4c8f91
add downloads backend
ericphanson Jul 19, 2021
cae56c4
test with it
ericphanson Jul 19, 2021
9b917fa
bump CI to 1.3
ericphanson Jul 19, 2021
c8329a9
fix import
ericphanson Jul 19, 2021
e66dca9
Update .github/workflows/CI.yml
ericphanson Jul 27, 2021
96a96ea
Update src/utilities/downloads_backend.jl
ericphanson Jul 27, 2021
2fd84d4
fix syntax for 1.3
ericphanson Jul 27, 2021
8311723
add timeout
ericphanson Jul 27, 2021
31e126d
Seek to start of body on every attempt
christopher-dG Aug 12, 2021
383eced
Set Content-Length
christopher-dG Aug 12, 2021
d979da4
Test with explicit HTTPBackend
christopher-dG Aug 12, 2021
bffb900
More correct Content-Length
christopher-dG Aug 12, 2021
a549d87
Support `response_stream` and `return_stream`
christopher-dG Aug 13, 2021
a370fd1
Support other IOs, hopefully
christopher-dG Aug 13, 2021
93d4a29
Fix for return_stream, make sure test doesn't hang
christopher-dG Aug 13, 2021
be7adb1
oops, it's a function
christopher-dG Aug 13, 2021
3bf0a31
Always set `request.response_stream` to an IO
christopher-dG Aug 13, 2021
10d81ee
Apply suggestions from code review
christopher-dG Aug 13, 2021
73c0bbf
Safer(?) `read_body`
christopher-dG Aug 13, 2021
5d50a61
Run some tests on both backends
christopher-dG Aug 13, 2021
9783836
Replace old user agent after test
christopher-dG Aug 13, 2021
9c2da08
The tests are too fast
christopher-dG Aug 13, 2021
1c6c1bf
Safer user agent test
christopher-dG Aug 13, 2021
8d63853
Update bors config to look for 1.3 status
christopher-dG Aug 13, 2021
5 changes: 3 additions & 2 deletions .github/workflows/CI.yml
@@ -13,6 +13,7 @@ jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30
continue-on-error: ${{ matrix.version == 'nightly' }}
strategy:
fail-fast: false
@@ -25,9 +26,9 @@ jobs:
arch:
- x64
include:
# Add a 1.0 job just to make sure we still support it
# Add a job using the earliest version of Julia supported by this package
- os: ubuntu-latest
version: 1.0.5
version: 1.3
arch: x64
# Add a 1.5 job because that's what Invenia actually uses
- os: ubuntu-latest
3 changes: 2 additions & 1 deletion Project.toml
@@ -7,6 +7,7 @@ version = "1.56.0"
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
GitHub = "bc5e4493-9b4d-5f90-b8aa-2b2bcaad7a26"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
IniFile = "83e8ac13-25f8-5344-8a64-a9f2b223428f"
@@ -32,7 +33,7 @@ OrderedCollections = "1"
Retry = "0.3, 0.4"
URIs = "1"
XMLDict = "0.3, 0.4"
julia = "1"
julia = "1.3"

[extras]
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
2 changes: 1 addition & 1 deletion bors.toml
@@ -1,5 +1,5 @@
status = [
"Julia 1.0.5 - ubuntu-latest - x64",
"Julia 1.3 - ubuntu-latest - x64",
"Julia 1.5 - ubuntu-latest - x64",
"Julia 1 - macOS-latest - x64",
"Julia 1 - ubuntu-latest - x64",
2 changes: 2 additions & 0 deletions src/AWS.jl
@@ -3,6 +3,7 @@ module AWS
using Compat: Compat, @something
using Base64
using Dates
using Downloads: Downloads, Downloader, Curl
using HTTP
using MbedTLS
using Mocking
@@ -33,6 +34,7 @@ include("AWSMetadata.jl")

include(joinpath("utilities", "request.jl"))
include(joinpath("utilities", "sign.jl"))
include(joinpath("utilities", "downloads_backend.jl"))


using ..AWSExceptions
2 changes: 1 addition & 1 deletion src/AWSCredentials.jl
@@ -538,7 +538,7 @@ function credentials_from_webtoken()
role_creds["SessionToken"],
assumed_role_user["Arn"];
expiry=DateTime(rstrip(role_creds["Expiration"], 'Z')),
renew=credentials_from_webtoken
renew=credentials_from_webtoken,
)
end

115 changes: 115 additions & 0 deletions src/utilities/downloads_backend.jl
@@ -0,0 +1,115 @@
struct DownloadsBackend <: AWS.AbstractBackend
downloader::Union{Nothing, Downloads.Downloader}
end

DownloadsBackend() = DownloadsBackend(nothing)

const AWS_DOWNLOADER = Ref{Union{Nothing, Downloader}}(nothing)
const AWS_DOWNLOAD_LOCK = ReentrantLock()

# Here we mimic Downloads.jl's own setup for using a global downloader.
# We do this to have our own downloader (separate from Downloads.jl's global downloader)
# because we add a hook to avoid redirects in order to try to match the HTTPBackend's
# implementation, and we don't want to mutate the global downloader from Downloads.jl.
# https://github.com/JuliaLang/Downloads.jl/blob/84e948c02b8a0625552a764bf90f7d2ee97c949c/src/Downloads.jl#L293-L301
function get_downloader(downloader=nothing)
lock(AWS_DOWNLOAD_LOCK) do
yield() # let other downloads finish
downloader isa Downloader && return
while true
downloader = AWS_DOWNLOADER[]
downloader isa Downloader && return
D = Downloader()
D.easy_hook = (easy, info) -> Curl.setopt(easy, Curl.CURLOPT_FOLLOWLOCATION, false)
AWS_DOWNLOADER[] = D
end
end
return downloader
end
Comment on lines +15 to +28
Member Author

We may want to use the formulation from JuliaLang/Downloads.jl#136 instead

Member

Perhaps we can leave it until it gets merged upstream

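The comment above `get_downloader` describes a lazily created, package-private global guarded by a lock. A minimal sketch of that pattern in plain Julia follows. It uses a stand-in `FakeClient` type and hypothetical names (`CLIENT`, `CLIENT_LOCK`, `get_client`) rather than the real `Downloader`, so it only illustrates the shape of the idea, not the PR's code.

```julia
# Stand-in for Downloads.Downloader (hypothetical); the field mimics the
# "do not follow redirects" hook that the PR installs on its own downloader.
mutable struct FakeClient
    follow_redirects::Bool
end

const CLIENT = Ref{Union{Nothing,FakeClient}}(nothing)
const CLIENT_LOCK = ReentrantLock()

# Return the caller's client if one was given; otherwise return the shared
# client, creating it on first use while holding the lock.
function get_client(client::Union{Nothing,FakeClient}=nothing)
    client isa FakeClient && return client
    return lock(CLIENT_LOCK) do
        if CLIENT[] === nothing
            CLIENT[] = FakeClient(false)
        end
        CLIENT[]
    end
end
```

Keeping a separate global means the configuration applied here never leaks into a client that other code shares.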

# https://github.com/JuliaWeb/HTTP.jl/blob/2a03ca76376162ffc3423ba7f15bd6d966edff9b/src/MessageRequest.jl#L84-L85
body_length(x::AbstractVector{UInt8}) = length(x)
body_length(x::AbstractString) = sizeof(x)

read_body(x::IOBuffer) = take!(x)
function read_body(x::IO)
close(x)
return read(x)
Contributor

Was just reading through this, I don't understand close(x); read(x) — for normal files read(x) will return zero bytes after it's closed. (For IOBuffer it throws an exception).

Member

This was a workaround specifically for Base.BufferStream: without closing it beforehand, read hangs. readavailable works as well and that was my first choice, but @ericphanson pointed out this warning:

  │ Warning
  │
  │  The amount of data returned is implementation-dependent; for example it can depend on the internal choice of buffer size. Other
  │  functions such as read should generally be used instead.

I'm no expert when it comes to the various IO types and how to properly handle them all, so any feedback is good feedback!
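The workaround described in this comment can be reproduced in isolation. The sketch below assumes only `Base.BufferStream`, which is an unexported type whose behavior may differ between Julia versions; `drain_after_close` is a hypothetical helper name. It writes some bytes, closes the stream, and then reads the buffered bytes back; without the `close`, `read` would wait for more data.

```julia
# Hypothetical helper: write `payload` into a BufferStream, close it, and
# read everything that was buffered.
function drain_after_close(payload::AbstractString)
    stream = Base.BufferStream()
    write(stream, payload)
    close(stream)  # marks writing as finished; buffered bytes remain readable
    return String(read(stream))
end
```

Note that closing inside the reader is exactly what the rest of this thread questions: it is usually the writer's job to signal that no more data is coming.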

Contributor
@c42f, Aug 25, 2021

I think perhaps we just need to use read() here and hope for the best? It turns out there's a lot of inconsistency in the blocking behavior of streams in Julia — see the issue here JuliaLang/julia#24526

Perhaps for this case, the newly available — but strangely named — Base.shutdown() is what you want JuliaLang/julia#40783

But even so, I'm not sure that AWS should be unilaterally closing or shutting down the write side of the stream at all; to me that seems to be the business of the code which passes in the stream. For example, if reading from a pipe, presumably you want to keep reading until the writing side decides it's done?

Copy link

Agreed, seems like whoever was filling the buffer needs to call shutdown (or close) to mark it as being done

Member Author
@ericphanson, Aug 25, 2021

"I'm not sure that AWS should be unilaterally closing or shutting down the write side of the stream at all; to me that seems to be the business of the code which passes in the stream."

Well, we only do it if return_stream is false, in which case you are supposed to get back some bytes as far as I understand it, and it seems like we need to close it to get those bytes.

"Agreed, seems like whoever was filling the buffer needs to call shutdown (or close) to mark it as being done"

I’m not sure who that is: Downloads.jl fills it but I don’t think they should be closing it (because of exactly what @c42f just said).

Copy link

Well, somebody was wrong before, and now AWS.jl is possibly also very bad at stream EOF handling here too. Whoever created the stream is generally responsible for marking when it is done, or passing it to another function which will take care of that. I suspect Downloads.jl absolutely should be copying the EOF marker from the underlying stream also. It is out-of-band data, but likely just as relevant as the in-band bytes.

Contributor

"I’m not sure who that is: Downloads.jl fills it but I don’t think they should be closing it"

Perhaps I've misunderstood what's going on here, but is the problem in Downloads.jl then? Shouldn't they close the write side when writing is done?

Member Author
@ericphanson, Aug 25, 2021

I 100% don't understand what's going on. But let me try to summarize the current behavior (including this PR).

Summary

The HTTPBackend only sets a response_stream if both the user has not passed one and they ask for return_stream=true, whereas with the DownloadsBackend we create one whenever the user has not passed one. This is because Downloads.jl wants you to pass an IO object to get the response always, unlike HTTP.jl which will either populate a stream you pass or return a vector of bytes directly.

  • Case 1: user passes response_stream, sets return_stream=true
    • neither backend creates a stream, nor closes it ✔️
  • Case 2: user passes response_stream, sets return_stream=false
    • HTTP backend passes the stream along; HTTP.jl does not return a body, so the caller does not get back a body but does have their stream populated. No one closes the stream. Seems correct ✔️
    • Downloads backend passes the stream along, but closes the stream and collects the bytes to return. Seems wrong ❌
  • Case 3: user does not pass response_stream, sets return_stream=true
    • Both backends create a stream and return it; neither closes it. Not sure if this is correct or not ❓
  • Case 4: user does not pass response_stream, and sets return_stream=false
    • HTTPBackend does not create a stream, just gets the bytes from the body HTTP.jl returns ✔️
    • DownloadsBackend internally creates a stream, then closes it and collects the bytes to return ✔️

If I'm understanding correctly, then I think Case 2 is problematic for the DownloadsBackend, and we should just not try to return bytes in this case, matching the behavior of the HTTP backend. I.e. we should not be closing the stream in this case.
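The four cases can be encoded as a small truth table to make the mismatch explicit. This is only a model of the behavior described in this comment: the backends are represented by the symbols `:http` and `:downloads`, `returns_bytes` is a hypothetical name, and HEAD requests are ignored.

```julia
# `user_stream` is true when the caller passed their own `response_stream`.
function returns_bytes(backend::Symbol, user_stream::Bool, return_stream::Bool)
    if backend === :http
        # HTTP.jl hands back a body only when it was given no stream at all.
        return !user_stream && !return_stream
    elseif backend === :downloads
        # The Downloads backend reads the stream back whenever
        # return_stream is false, whoever created the stream.
        return !return_stream
    end
    throw(ArgumentError("unknown backend: $backend"))
end

# Cases 1 to 4 as (user_stream, return_stream) pairs.
cases = [(true, true), (true, false), (false, true), (false, false)]

# Cases in which the two backends disagree about returning bytes.
mismatches = [c for c in cases if returns_bytes(:http, c...) != returns_bytes(:downloads, c...)]
```

Under this model the only disagreement is `(true, false)`, which is Case 2.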

Case 3: I don't really know if we should close the stream or not. I think ideally this case wouldn't exist, because I don't think there really should be a return_stream option. I think a simpler API is "if you want a stream you should pass one and it will get populated (and not closed); if you don't want a stream, don't pass one and you will get a vector of bytes back".
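A rough sketch of what that simpler API could look like, assuming a hypothetical function `deliver` and a `payload` argument standing in for bytes already received from the network (neither exists in AWS.jl):

```julia
# Hypothetical: no `return_stream` flag, only an optional `response_stream`.
function deliver(payload::Vector{UInt8}; response_stream::Union{IO,Nothing}=nothing)
    if response_stream === nothing
        return payload               # no stream passed: hand back the bytes
    end
    write(response_stream, payload)  # stream passed: populate it, never close it
    return nothing
end
```

With this shape, the caller who supplies a stream keeps full ownership of it, including the decision of when to close it.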

So: if that analysis is right, Downloads.jl is fine, and we have at least one problematic case here. But I'll admit to definitely not fully understanding the semantics of how streams should work (and the manual isn't very helpful in this regard), though the discussion here helps me a bit.

Though possibly, HTTP.jl and Downloads.jl should be closing the streams whenever they are passed and the backend is done populating them? I think maybe that's what @vtjnash is saying in #396 (comment). In which case most/all of these cases are wrong for both backends.

Appendix: Following the path of get_object

To see these cases play out, let's say we are doing an S3 get_object.

  • We start

    AWS.jl/src/services/s3.jl

    Lines 2239 to 2246 in 181c340

    function get_object(
    Bucket,
    Key,
    params::AbstractDict{String};
    aws_config::AbstractAWSConfig=global_aws_config(),
    )
    return s3("GET", "/$(Bucket)/$(Key)", params; aws_config=aws_config)
    end
    and possibly submit params which is a dictionary with additional parameters
  • we use those params to build Request, changing the name from params to args:

    AWS.jl/src/AWS.jl

    Lines 213 to 217 in 181c340

    request = Request(;
    _extract_common_kw_args(service, args)...,
    request_method=request_method,
    content=_pop!(args, "body", ""),
    )
  • we populate with default values:
    function _extract_common_kw_args(service, args)
    return (
    service=service.signing_name,
    api_version=service.api_version,
    return_stream=_pop!(args, "return_stream", false),
    return_raw=_pop!(args, "return_raw", false),
    response_stream=_pop!(args, "response_stream", nothing),
    headers=LittleDict{String,String}(_pop!(args, "headers", [])),
    http_options=_pop!(args, "http_options", LittleDict{Symbol,String}()),
    response_dict_type=_pop!(args, "response_dict_type", LittleDict),
    backend=_pop!(args, "backend", DEFAULT_BACKEND[]),
    )
    end
  • we end up with our Request object:
    Base.@kwdef mutable struct Request
    service::String
    api_version::String
    request_method::String
    headers::AbstractDict{String,String} = LittleDict{String,String}()
    content::Union{String,Vector{UInt8}} = ""
    resource::String = ""
    url::String = ""
    return_stream::Bool = false
    response_stream::Union{<:IO,Nothing} = nothing
    http_options::AbstractDict{Symbol,<:Any} = LittleDict{Symbol,String}()
    return_raw::Bool = false
    response_dict_type::Type{<:AbstractDict} = LittleDict
    backend::AbstractBackend = DEFAULT_BACKEND[]
    end
  • We pass it to submit_request:
    function submit_request(
    aws::AbstractAWSConfig, request::Request; return_headers::Bool=false
    )

Here, what happens depends on which backend we are using. This PR is about the new DownloadsBackend.

Using the DownloadsBackend

  • submit_request calls _http_request and if we are using a DownloadsBackend, we end up at the method in question,
    function _http_request(backend::DownloadsBackend, request)
  • Here, if user has not passed a response_stream, we create a default one:
    if request.response_stream === nothing
    request.response_stream = IOBuffer()
    end
  • And if they have not specified return_stream==true, we arrange to collect the bytes from the response stream, including possibly closing the response_stream:
    body_arg = if request.request_method == "HEAD" || request.return_stream
    () -> NamedTuple()
    else
    () -> (; body=read_body(request.response_stream))
    end
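The `body_arg` construction above relies on two Julia features: splatting a `NamedTuple` into keyword arguments, and wrapping the read in a zero-argument function so it runs only after the stream has been filled. A self-contained sketch, with `make_response` and `build` as hypothetical stand-ins for `HTTP.Response` and the backend method:

```julia
# Hypothetical stand-in for a constructor with an optional `body` keyword.
make_response(status::Int; body::Vector{UInt8}=UInt8[]) = (status=status, body=body)

function build(status::Int, stream::IOBuffer; return_stream::Bool)
    # The thunk defers reading until after the stream has been populated.
    body_arg = if return_stream
        () -> NamedTuple()             # splats to no keyword arguments
    else
        () -> (; body=take!(stream))   # splats to `body=...`
    end
    write(stream, "data")              # simulate the download filling the stream
    return make_response(status; body_arg()...)
end
```

If `body_arg` were evaluated eagerly instead, the stream would be read before any data had arrived.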

Using the HTTPBackend

If the user is using the existing HTTP.jl backend, what happens?

  • submit_request calls _http_request with an HTTPBackend, leading us to this method:
    function _http_request(http_backend::HTTPBackend, request::Request)
    http_options = merge(http_backend.http_options, request.http_options)
    @repeat 4 try
    http_stack = HTTP.stack(; redirect=false, retry=false, aws_authorization=false)
    if request.return_stream && request.response_stream === nothing
    request.response_stream = Base.BufferStream()
    end
    return @mock HTTP.request(
    http_stack,
    request.request_method,
    HTTP.URI(request.url),
    HTTP.mkheaders(request.headers),
    request.content;
    require_ssl_verification=false,
    response_stream=request.response_stream,
    http_options...,
    )
    catch e
    # Base.IOError is needed because HTTP.jl can often have errors that aren't
    # caught and wrapped in an HTTP.IOError
    # https://github.com/JuliaWeb/HTTP.jl/issues/382
    @delay_retry if isa(e, Sockets.DNSError) ||
    isa(e, HTTP.ParseError) ||
    isa(e, HTTP.IOError) ||
    isa(e, Base.IOError) ||
    (isa(e, HTTP.StatusError) && _http_status(e) >= 500)
    end
    end
    end
  • There we create a response_stream if both the user has not passed one and they ask for return_stream=true:
    if request.return_stream && request.response_stream === nothing
    request.response_stream = Base.BufferStream()
    end
  • We pass the possibly-nothing request.response_stream to HTTP.request:
    response_stream=request.response_stream,
  • HTTP.jl will give us back a body if they were not given a response_stream, otherwise they put the bytes in the stream: https://github.com/JuliaWeb/HTTP.jl/blob/a2b467e24c9bbd45691f9d0f57b57ee7463bd15a/src/HTTP.jl#L77-L78

Member Author

I've filed #433 so we don't forget about this

end

function _http_request(backend::DownloadsBackend, request)
# If we pass `output`, older versions of Downloads.jl will
# expect a message body in the response. Specifically, it sets
# <https://curl.se/libcurl/c/CURLOPT_NOBODY.html>
# only when we do not pass the `output` argument.
# See <https://github.com/JuliaLang/Downloads.jl/issues/131>.
#
# When the method is `HEAD`, the response may have a Content-Length
# but not send any content back (which appears to be correct,
# <https://stackoverflow.com/a/18925736/12486544>).
#
# Thus, if we did not set `CURLOPT_NOBODY`, and it gets a Content-Length
# back, it will hang waiting for that body.
#
# Therefore, we do not pass an `output` when the `request_method` is `HEAD`.
# (Note: this is fixed on the latest Downloads.jl, but we include this workaround
# for compatibility).
if request.response_stream === nothing
request.response_stream = IOBuffer()
end
output_arg = if request.request_method == "HEAD"
NamedTuple()
else
(; output=request.response_stream)
end

# If we're going to return the stream, we don't want to read the body into an
# HTTP.Response we're never going to use. If we do that, the returned stream
# will have no data available (and reading from it could hang forever).
body_arg = if request.request_method == "HEAD" || request.return_stream
() -> NamedTuple()
else
() -> (; body = read_body(request.response_stream))
end

# HTTP.jl sets this header automatically.
request.headers["Content-Length"] = string(body_length(request.content))

# We pass an `input` only when we have content we wish to send.
input = IOBuffer()
input_arg = if !isempty(request.content)
write(input, request.content)
(; input=input)
else
NamedTuple()
end

@repeat 4 try
downloader = @something(backend.downloader, get_downloader())
# set the hook so that we don't follow redirects. Only
# need to do this on per-request downloaders, because we
# set our global one with this hook already.
if backend.downloader !== nothing && downloader.easy_hook === nothing
downloader.easy_hook = (easy, info) -> Curl.setopt(easy, Curl.CURLOPT_FOLLOWLOCATION, false)
end

# We seekstart on every attempt, otherwise every attempt
# but the first will send an empty payload.
seekstart(input)

response = Downloads.request(request.url; input_arg..., output_arg...,
method = request.request_method,
headers = request.headers, verbose=false, throw=true,
downloader=downloader)

http_response = HTTP.Response(response.status, response.headers; body_arg()..., request=nothing)

if HTTP.iserror(http_response)
target = HTTP.resource(HTTP.URI(request.url))
throw(HTTP.StatusError(http_response.status, request.request_method, target, http_response))
end
return http_response
catch e
@delay_retry if ((isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) || isa(e, Downloads.RequestError)) end
end
end
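The `seekstart(input)` call inside the retry loop matters because reading an `IOBuffer` advances its position. A small sketch of the effect, using a hypothetical helper `attempt_payloads` that simulates several attempts reading the same buffer:

```julia
# Each simulated attempt consumes `input` from its current position.
function attempt_payloads(content::AbstractString, attempts::Int; rewind::Bool)
    input = IOBuffer()
    write(input, content)
    seekstart(input)                 # position the buffer for the first attempt
    sent = String[]
    for _ in 1:attempts
        rewind && seekstart(input)   # what the backend does on every attempt
        push!(sent, read(input, String))
    end
    return sent
end
```

Without the per-attempt rewind, only the first simulated attempt sees the payload; with it, every attempt does.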
14 changes: 10 additions & 4 deletions test/AWS.jl
@@ -29,11 +29,16 @@ end
end

@testset "set user agent" begin
old_user_agent = AWS.user_agent[]
new_user_agent = "new user agent"

@test AWS.user_agent[] == "AWS.jl/1.0.0"
set_user_agent(new_user_agent)
@test AWS.user_agent[] == new_user_agent
try
@test AWS.user_agent[] == "AWS.jl/1.0.0"
set_user_agent(new_user_agent)
@test AWS.user_agent[] == new_user_agent
finally
set_user_agent(old_user_agent)
end
end

@testset "sign" begin
@@ -365,6 +370,7 @@ end
api_version="api_version",
request_method="GET",
url="https://s3.us-east-1.amazonaws.com/sample-bucket",
backend=AWS.HTTPBackend(),
)
apply(Patches._http_options_patch) do
# No default options
@@ -393,7 +399,7 @@ end
api_version="api_version",
request_method="GET",
url="https://s3.us-east-1.amazonaws.com/sample-bucket",
backend = TestBackend(4)
backend = TestBackend(4),
)
@test AWS._http_request(request.backend, request) == 4

2 changes: 1 addition & 1 deletion test/AWSCredentials.jl
@@ -565,7 +565,7 @@ end
@test result.user_arn == "$(role_arn)/$(session_name)"
@test result.renew == credentials_from_webtoken
expiry = result.expiry

sleep(0.1)
result = check_credentials(result)

@test result.access_key_id == access_key
6 changes: 5 additions & 1 deletion test/issues.jl
@@ -64,7 +64,11 @@ end
# => BUG: header `response_stream` is pushed into the query...
io = Base.BufferStream()
S3.get_object(bucket_name, file_name, Dict("response_stream"=>io, "return_stream"=>true))
@test String(read(io)) == body
if bytesavailable(io) > 0
@test String(readavailable(io)) == body
else
@test "no body data was available" == body
end

finally
S3.delete_object(bucket_name, file_name)
15 changes: 10 additions & 5 deletions test/runtests.jl
@@ -35,15 +35,20 @@ function _now_formatted()
end

@testset "AWS.jl" begin
include("AWS.jl")
include("AWSCredentials.jl")
include("AWSExceptions.jl")
include("AWSMetadataUtilities.jl")
include("issues.jl")
include("test_pkg.jl")
include("utilities.jl")

if haskey(ENV, "TEST_MINIO")
include("minio.jl")
backends = [AWS.HTTPBackend, AWS.DownloadsBackend]
@testset "Backend: $(nameof(backend))" for backend in backends
AWS.DEFAULT_BACKEND[] = backend()
include("AWS.jl")
include("AWSCredentials.jl")
include("issues.jl")

if haskey(ENV, "TEST_MINIO")
include("minio.jl")
end
end
end