Skip to content

Commit

Permalink
Add support for making a request and receiving the response as a stre…
Browse files Browse the repository at this point in the history
…am. (#983)
  • Loading branch information
dcr-stripe authored Jun 24, 2021
1 parent 28e6d19 commit 59eb8d0
Show file tree
Hide file tree
Showing 9 changed files with 979 additions and 659 deletions.
37 changes: 35 additions & 2 deletions lib/stripe/api_operations/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@ module Request
module ClassMethods
def execute_resource_request(method, url,
params = {}, opts = {})
execute_resource_request_internal(
:execute_request, method, url, params, opts
)
end

def execute_resource_request_stream(method, url,
params = {}, opts = {},
&read_body_chunk_block)
execute_resource_request_internal(
:execute_request_stream,
method,
url,
params,
opts,
&read_body_chunk_block
)
end

private def execute_resource_request_internal(client_request_method_sym,
method, url,
params, opts,
&read_body_chunk_block)
params ||= {}

error_on_invalid_params(params)
Expand All @@ -22,10 +44,12 @@ def execute_resource_request(method, url,
client = headers.delete(:client)
# Assume all remaining opts must be headers

resp, opts[:api_key] = client.execute_request(
resp, opts[:api_key] = client.send(
client_request_method_sym,
method, url,
api_base: api_base, api_key: api_key,
headers: headers, params: params
headers: headers, params: params,
&read_body_chunk_block
)

# Hash#select returns an array before 1.9
Expand Down Expand Up @@ -89,6 +113,15 @@ def self.included(base)
self.class.execute_resource_request(method, url, params, opts)
end

protected def execute_resource_request_stream(method, url,
params = {}, opts = {},
&read_body_chunk_block)
opts = @opts.merge(Util.normalize_opts(opts))
self.class.execute_resource_request_stream(
method, url, params, opts, &read_body_chunk_block
)
end

# See notes on `alias` above.
alias request execute_resource_request
end
Expand Down
8 changes: 8 additions & 0 deletions lib/stripe/api_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,13 @@ def self.retrieve(id, opts = {})
Util.convert_to_stripe_object(resp.data, opts)
end
end

protected def request_stream(method:, path:, params:, opts: {},
&read_body_chunk_block)
resp, = execute_resource_request_stream(
method, path, params, opts, &read_body_chunk_block
)
resp
end
end
end
19 changes: 17 additions & 2 deletions lib/stripe/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def connection_for(uri)

# Executes an HTTP request to the given URI with the given method. Also
# allows a request body, headers, and query string to be specified.
def execute_request(method, uri, body: nil, headers: nil, query: nil)
def execute_request(method, uri, body: nil, headers: nil, query: nil,
&block)
# Perform some basic argument validation because it's easy to get
# confused between strings and hashes for things like body and query
# parameters.
Expand All @@ -92,8 +93,22 @@ def execute_request(method, uri, body: nil, headers: nil, query: nil)
u.path
end

method_name = method.to_s.upcase
has_response_body = method_name != "HEAD"
request = Net::HTTPGenericRequest.new(
method_name,
(body ? true : false),
has_response_body,
path,
headers
)

@mutex.synchronize do
connection.send_request(method.to_s.upcase, path, body, headers)
# The block parameter is special here. If a block is provided, the block
# is invoked with the Net::HTTPResponse. However, the body will not have
# been read yet in the block, and can be streamed by calling
# HTTPResponse#read_body.
connection.request(request, body, &block)
end
end

Expand Down
177 changes: 120 additions & 57 deletions lib/stripe/stripe_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -213,62 +213,9 @@ def request

def execute_request(method, path,
api_base: nil, api_key: nil, headers: {}, params: {})
raise ArgumentError, "method should be a symbol" \
unless method.is_a?(Symbol)
raise ArgumentError, "path should be a string" \
unless path.is_a?(String)

api_base ||= config.api_base
api_key ||= config.api_key
params = Util.objects_to_ids(params)

check_api_key!(api_key)

body_params = nil
query_params = nil
case method
when :get, :head, :delete
query_params = params
else
body_params = params
end

query_params, path = merge_query_params(query_params, path)

headers = request_headers(api_key, method)
.update(Util.normalize_headers(headers))
url = api_url(path, api_base)

# Merge given query parameters with any already encoded in the path.
query = query_params ? Util.encode_parameters(query_params) : nil

# Encoding body parameters is a little more complex because we may have
# to send a multipart-encoded body. `body_log` is produced separately as
# a log-friendly variant of the encoded form. File objects are displayed
# as such instead of as their file contents.
body, body_log =
body_params ? encode_body(body_params, headers) : [nil, nil]

# stores information on the request we're about to make so that we don't
# have to pass as many parameters around for logging.
context = RequestLogContext.new
context.account = headers["Stripe-Account"]
context.api_key = api_key
context.api_version = headers["Stripe-Version"]
context.body = body_log
context.idempotency_key = headers["Idempotency-Key"]
context.method = method
context.path = path
context.query = query

http_resp = execute_request_with_rescues(method, api_base, context) do
self.class
.default_connection_manager(config)
.execute_request(method, url,
body: body,
headers: headers,
query: query)
end
http_resp, api_key = execute_request_internal(
method, path, api_base, api_key, headers, params
)

begin
resp = StripeResponse.from_net_http(http_resp)
Expand All @@ -284,6 +231,38 @@ def execute_request(method, path,
[resp, api_key]
end

# Executes a request and returns the body as a stream instead of converting
# it to a StripeObject. This should be used for any request where we expect
# an arbitrary binary response.
#
# A `read_body_chunk` block can be passed, which will be called repeatedly
# with the body chunks read from the socket.
#
# If a block is passed, a StripeHeadersOnlyResponse is returned as the
# block is expected to do all the necessary body processing. If no block is
# passed, then a StripeStreamResponse is returned containing an IO stream
# with the response body.
def execute_request_stream(method, path,
api_base: nil, api_key: nil,
headers: {}, params: {},
&read_body_chunk_block)
unless block_given?
raise ArgumentError,
"execute_request_stream requires a read_body_chunk_block"
end

http_resp, api_key = execute_request_internal(
method, path, api_base, api_key, headers, params, &read_body_chunk_block
)

# When the read_body_chunk_block is given, we no longer have access to the
# response body at this point and so return a response object containing
# only the headers. This is because the body was consumed by the block.
resp = StripeHeadersOnlyResponse.from_net_http(http_resp)

[resp, api_key]
end

def store_last_response(object_id, resp)
return unless last_response_has_key?(object_id)

Expand Down Expand Up @@ -451,6 +430,83 @@ def self.maybe_gc_connection_managers
pruned_contexts.count
end

private def execute_request_internal(method, path,
api_base, api_key, headers, params,
&read_body_chunk_block)
raise ArgumentError, "method should be a symbol" \
unless method.is_a?(Symbol)
raise ArgumentError, "path should be a string" \
unless path.is_a?(String)

api_base ||= config.api_base
api_key ||= config.api_key
params = Util.objects_to_ids(params)

check_api_key!(api_key)

body_params = nil
query_params = nil
case method
when :get, :head, :delete
query_params = params
else
body_params = params
end

query_params, path = merge_query_params(query_params, path)

headers = request_headers(api_key, method)
.update(Util.normalize_headers(headers))
url = api_url(path, api_base)

# Merge given query parameters with any already encoded in the path.
query = query_params ? Util.encode_parameters(query_params) : nil

# Encoding body parameters is a little more complex because we may have
# to send a multipart-encoded body. `body_log` is produced separately as
# a log-friendly variant of the encoded form. File objects are displayed
# as such instead of as their file contents.
body, body_log =
body_params ? encode_body(body_params, headers) : [nil, nil]

# stores information on the request we're about to make so that we don't
# have to pass as many parameters around for logging.
context = RequestLogContext.new
context.account = headers["Stripe-Account"]
context.api_key = api_key
context.api_version = headers["Stripe-Version"]
context.body = body_log
context.idempotency_key = headers["Idempotency-Key"]
context.method = method
context.path = path
context.query = query

# A block can be passed in to read the content directly from the response.
# We want to execute this block only when the response was actually
# successful. When it wasn't, we defer to the standard error handling as
# we have to read the body and parse the error JSON.
response_block =
if block_given?
lambda do |response|
unless should_handle_as_error(response.code.to_i)
response.read_body(&read_body_chunk_block)
end
end
end

http_resp = execute_request_with_rescues(method, api_base, context) do
self.class
.default_connection_manager(config)
.execute_request(method, url,
body: body,
headers: headers,
query: query,
&response_block)
end

[http_resp, api_key]
end

private def api_url(url = "", api_base = nil)
(api_base || config.api_base) + url
end
Expand Down Expand Up @@ -490,6 +546,7 @@ def self.maybe_gc_connection_managers
# that's more condusive to logging.
flattened_params =
flattened_params.map { |k, v| [k, v.is_a?(String) ? v : v.to_s] }.to_h

else
body = Util.encode_parameters(body_params)
end
Expand All @@ -503,6 +560,10 @@ def self.maybe_gc_connection_managers
[body, body_log]
end

private def should_handle_as_error(http_status)
http_status >= 400
end

private def execute_request_with_rescues(method, api_base, context)
num_retries = 0

Expand All @@ -520,7 +581,9 @@ def self.maybe_gc_connection_managers
http_status = resp.code.to_i
context = context.dup_from_response_headers(resp)

handle_error_response(resp, context) if http_status >= 400
if should_handle_as_error(http_status)
handle_error_response(resp, context)
end

log_response(context, request_start, http_status, resp.body)
notify_request_end(context, request_duration, http_status,
Expand Down
Loading

0 comments on commit 59eb8d0

Please sign in to comment.