-
Notifications
You must be signed in to change notification settings - Fork 181
/
StreamRequest.jl
136 lines (110 loc) · 3.46 KB
/
StreamRequest.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
module StreamRequest
import ..Layer, ..request
using ..IOExtras
using ..Messages
using ..Streams
import ..ConnectionPool
using ..MessageRequest
import ..@debug, ..DEBUG_LEVEL, ..printlncompact
"""
request(StreamLayer, ::IO, ::Request, body) -> HTTP.Response
Create a [`Stream`](@ref) to send a `Request` and `body` to an `IO`
stream and read the response.
Send the `Request` body in a background task and begins reading the response
immediately so that the transmission can be aborted if the `Response` status
indicates that the server does not wish to receive the message body.
[RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
"""
abstract type StreamLayer <: Layer end
export StreamLayer
function request(::Type{StreamLayer}, io::IO, request::Request, body;
response_stream=nothing,
iofunction=nothing,
verbose::Int=0,
kw...)::Response
verbose == 1 && printlncompact(request)
response = request.response
http = Stream(response, io)
startwrite(http)
if verbose == 2
println(request)
if iofunction == nothing && request.body === body_is_a_stream
println("$(typeof(request)).body: $(sprint(showcompact, body))")
end
end
if !isidempotent(request)
# Wait for pipelined reads to complete
# before sending non-idempotent request body.
startread(io)
end
aborted = false
try
@sync begin
if iofunction == nothing
@async writebody(http, request, body)
startread(http)
readbody(http, response, response_stream)
else
iofunction(http)
end
if isaborted(http)
close(io)
aborted = true
end
end
catch e
if e isa CompositeException
e = first(e.exceptions).ex
end
if aborted && isioerror(e)
@debug 1 "⚠️ $(response.status) abort exception excpeted: $ex"
else
rethrow(e)
end
end
closewrite(http)
closeread(http)
verbose == 1 && printlncompact(response)
verbose == 2 && println(response)
return response
end
function writebody(http::Stream, req::Request, body)
if req.body === body_is_a_stream
writebodystream(http, req, body)
closebody(http)
else
write(http, req.body)
end
req.txcount += 1
if isidempotent(req)
closewrite(http)
else
@debug 2 "🔒 $(req.method) non-idempotent, " *
"holding write lock: $(http.stream)"
# "A user agent SHOULD NOT pipeline requests after a
# non-idempotent method, until the final response
# status code for that method has been received"
# https://tools.ietf.org/html/rfc7230#section-6.3.2
end
end
function writebodystream(http, req, body)
for chunk in body
writechunk(http, req, chunk)
end
end
function writebodystream(http, req, body::IO)
req.body = body_was_streamed
write(http, body)
end
writechunk(http, req, body::IO) = writebodystream(http, req, body)
writechunk(http, req, body) = write(http, body)
function readbody(http::Stream, res::Response, response_stream)
if response_stream == nothing
res.body = read(http)
else
res.body = body_was_streamed
write(response_stream, http)
close(response_stream)
end
end
end # module StreamRequest