From d350f677b94ec2fd44e57e305aaf0330b6585a8d Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 19 Sep 2019 16:24:37 -0700 Subject: [PATCH] Introduce system for garbage collecting connection managers Introduces a system for garbage collecting connection managers in an attempt to solve #850. Previously, the number of connection managers (and by extension the number of connections that they were holding) would stay stable if a program used a stable number of threads. However, if threads were used disposably, the number of active connection managers out there could continue to grow unchecked, and each of those could be holding one or more dead connections which are no longer open, but still holding a file descriptor waiting to be unlinked in disposed of by Ruby's GC. This PR introduces a connection manager garbage collector that runs periodically whenever a new connection manager is created. Connection managers get a timestamp to indicate when they were last used, and the GC runs through each one and prunes any that haven't seen use within a certain threshold (currently, 120 seconds). This should have the effect of removing connection managers as they're not needed anymore, and thus resolving the socket leakage seen in #850. I had to make a couple implementation tweaks to get this working correctly. Namely: * The `StripeClient` class now tracks thread contexts instead of connection managers. This is so that when we're disposing of a connection manager, we can set `default_connection_manager` on its parent thread context to `nil` so that it's not still tracking a connection manager that we're trying to get rid of. * `StripeClient` instances can still be instantiated as before, but no longer internalize a reference to their own connection manager, instead falling back to the one in the current thread context. The rationale is that when trying to dispose of a connection manager, we'd also have to dispose of its reference in any outstanding `StripeClient` instances that might still be tracking it, and that starts to get a little unwieldy. I've left `#connection_manager` in place for backwards compatibility, but marked it as deprecated. --- lib/stripe/connection_manager.rb | 53 +++++++----- lib/stripe/stripe_client.rb | 108 ++++++++++++++++++++----- test/stripe/connection_manager_test.rb | 28 +++++++ test/stripe/stripe_client_test.rb | 84 +++++++++++++++---- 4 files changed, 219 insertions(+), 54 deletions(-) diff --git a/lib/stripe/connection_manager.rb b/lib/stripe/connection_manager.rb index 237d5018d..c7dfff905 100644 --- a/lib/stripe/connection_manager.rb +++ b/lib/stripe/connection_manager.rb @@ -9,24 +9,35 @@ module Stripe # # Note that this class in itself is *not* thread safe. We expect it to be # instantiated once per thread. - # - # Note also that this class doesn't currently clean up after itself because - # it expects to only ever have a few connections (unless `.clear` is called - # manually). It'd be possible to tank memory by constantly changing the value - # of `Stripe.api_base` or the like. A possible improvement might be to detect - # and prune old connections whenever a request is executed. class ConnectionManager + # Timestamp indicating when the connection manager last made a request. + # This is used by `StripeClient` to determine whether a connection manager + # should be garbage collected or not. + attr_reader :last_used + def initialize @active_connections = {} + @last_used = Time.now + + # A connection manager may be accessed across threads as one thread makes + # requests on it while another is trying to clear it (either because it's + # trying to garbage collect the manager or trying to clear it because a + # configuration setting has changed). That's probably thread-safe already + # because of Ruby's GIL, but just in case the library's running on JRuby + # or the like, use a mutex to synchronize access in this connection + # manager. + @mutex = Mutex.new end # Finishes any active connections by closing their TCP connection and # clears them from internal tracking. def clear - @active_connections.each do |_, connection| - connection.finish + @mutex.synchronize do + @active_connections.each do |_, connection| + connection.finish + end + @active_connections = {} end - @active_connections = {} end # Gets a connection for a given URI. This is for internal use only as it's @@ -35,17 +46,19 @@ def clear # # `uri` is expected to be a string. def connection_for(uri) - u = URI.parse(uri) - connection = @active_connections[[u.host, u.port]] + @mutex.synchronize do + u = URI.parse(uri) + connection = @active_connections[[u.host, u.port]] - if connection.nil? - connection = create_connection(u) - connection.start + if connection.nil? + connection = create_connection(u) + connection.start - @active_connections[[u.host, u.port]] = connection - end + @active_connections[[u.host, u.port]] = connection + end - connection + connection + end end # Executes an HTTP request to the given URI with the given method. Also @@ -65,6 +78,8 @@ def execute_request(method, uri, body: nil, headers: nil, query: nil) raise ArgumentError, "query should be a string" \ if query && !query.is_a?(String) + @last_used = Time.now + connection = connection_for(uri) u = URI.parse(uri) @@ -74,7 +89,9 @@ def execute_request(method, uri, body: nil, headers: nil, query: nil) u.path end - connection.send_request(method.to_s.upcase, path, body, headers) + @mutex.synchronize do + connection.send_request(method.to_s.upcase, path, body, headers) + end end # diff --git a/lib/stripe/stripe_client.rb b/lib/stripe/stripe_client.rb index a373f395c..8a68e6459 100644 --- a/lib/stripe/stripe_client.rb +++ b/lib/stripe/stripe_client.rb @@ -5,18 +5,17 @@ module Stripe # recover both a resource a call returns as well as a response object that # contains information on the HTTP call. class StripeClient - # A set of all known connection managers across all threads and a mutex to + # A set of all known thread contexts across all threads and a mutex to # synchronize global access to them. - @all_connection_managers = [] - @all_connection_managers_mutex = Mutex.new + @thread_contexts_with_connection_managers = [] + @thread_contexts_with_connection_managers_mutex = Mutex.new + @last_connection_manager_gc = Time.now - attr_accessor :connection_manager - - # Initializes a new `StripeClient`. Expects a `ConnectionManager` object, - # and uses a default connection manager unless one is passed. - def initialize(connection_manager = nil) - self.connection_manager = connection_manager || - self.class.default_connection_manager + # Initializes a new `StripeClient`. + # + # Takes a connection manager object for backwards compatibility only, and + # that use is DEPRECATED. + def initialize(_connection_manager = nil) @system_profiler = SystemProfiler.new @last_request_metrics = nil end @@ -42,17 +41,24 @@ def self.clear_all_connection_managers # Just a quick path for when configuration is being set for the first # time before any connections have been opened. There is technically some # potential for thread raciness here, but not in a practical sense. - return if @all_connection_managers.empty? - - @all_connection_managers_mutex.synchronize do - @all_connection_managers.each(&:clear) + return if @thread_contexts_with_connection_managers.empty? + + @thread_contexts_with_connection_managers_mutex.synchronize do + @thread_contexts_with_connection_managers.each do |thread_context| + # Note that the thread context itself is not destroyed, but we clear + # its connection manager and remove our reference to it. If it ever + # makes a new request we'll give it a new connection manager and + # it'll go back into `@thread_contexts_with_connection_managers`. + thread_context.default_connection_manager.clear + thread_context.default_connection_manager = nil + end + @thread_contexts_with_connection_managers.clear end end # A default client for the current thread. def self.default_client - current_thread_context.default_client ||= - StripeClient.new(default_connection_manager) + current_thread_context.default_client ||= StripeClient.new end # A default connection manager for the current thread. @@ -60,8 +66,9 @@ def self.default_connection_manager current_thread_context.default_connection_manager ||= begin connection_manager = ConnectionManager.new - @all_connection_managers_mutex.synchronize do - @all_connection_managers << connection_manager + @thread_contexts_with_connection_managers_mutex.synchronize do + maybe_gc_connection_managers + @thread_contexts_with_connection_managers << current_thread_context end connection_manager @@ -131,6 +138,15 @@ def self.sleep_time(num_retries) sleep_seconds end + # Gets the connection manager in use for the current `StripeClient`. + # + # This method is DEPRECATED and for backwards compatibility only. + def connection_manager + self.class.default_connection_manager + end + extend Gem::Deprecate + deprecate :connection_manager, :none, 2020, 9 + # Executes the API call within the given block. Usage looks like: # # client = StripeClient.new @@ -207,10 +223,10 @@ def execute_request(method, path, context.query = query http_resp = execute_request_with_rescues(method, api_base, context) do - connection_manager.execute_request(method, url, - body: body, - headers: headers, - query: query) + self.class.default_connection_manager.execute_request(method, url, + body: body, + headers: headers, + query: query) end begin @@ -233,6 +249,14 @@ def execute_request(method, path, # private # + # How often to check (in seconds) for connection managers that haven't been + # used in a long time and which should be garbage collected. + CONNECTION_MANAGER_GC_PERIOD = 60 + + # Time (in seconds) that a connection manager has not been used before it's + # eligible for garbage collection. + CONNECTION_MANAGER_GC_LAST_USED_EXPIRY = 120 + ERROR_MESSAGE_CONNECTION = "Unexpected error communicating when trying to connect to " \ "Stripe (%s). You may be seeing this message because your DNS is not " \ @@ -320,6 +344,46 @@ def self.current_thread_context Thread.current[:stripe_client__internal_use_only] ||= ThreadContext.new end + # Garbage collects connection managers that haven't been used in some time, + # with the idea being that we want to remove old connection managers that + # belong to dead threads and the like. + # + # Prefixed with `maybe_` because garbage collection will only run + # periodically so that we're not constantly engaged in busy work. If + # connection managers live a little passed their useful age it's not + # harmful, so it's not necessary to get them right away. + # + # For testability, returns `nil` if it didn't run and the number of + # connection managers that were garbage collected otherwise. + # + # IMPORTANT: This method is not thread-safe and expects to be called inside + # a lock on `@thread_contexts_with_connection_managers_mutex`. + # + # For internal use only. Does not provide a stable API and may be broken + # with future non-major changes. + def self.maybe_gc_connection_managers + if @last_connection_manager_gc + CONNECTION_MANAGER_GC_PERIOD > Time.now + return nil + end + + last_used_threshold = Time.now - CONNECTION_MANAGER_GC_LAST_USED_EXPIRY + + pruned_thread_contexts = [] + @thread_contexts_with_connection_managers.each do |thread_context| + connection_manager = thread_context.default_connection_manager + next if connection_manager.last_used > last_used_threshold + + connection_manager.clear + thread_context.default_connection_manager = nil + pruned_thread_contexts << thread_context + end + + @thread_contexts_with_connection_managers -= pruned_thread_contexts + @last_connection_manager_gc = Time.now + + pruned_thread_contexts.count + end + private def api_url(url = "", api_base = nil) (api_base || Stripe.api_base) + url end diff --git a/test/stripe/connection_manager_test.rb b/test/stripe/connection_manager_test.rb index c74af59e3..13ac85b2b 100644 --- a/test/stripe/connection_manager_test.rb +++ b/test/stripe/connection_manager_test.rb @@ -8,6 +8,15 @@ class ConnectionManagerTest < Test::Unit::TestCase @manager = Stripe::ConnectionManager.new end + context "#initialize" do + should "set #last_used to current time" do + t = Time.new(2019) + Timecop.freeze(t) do + assert_equal t, Stripe::ConnectionManager.new.last_used + end + end + end + context "#clear" do should "clear any active connections" do stub_request(:post, "#{Stripe.api_base}/path") @@ -133,6 +142,25 @@ class ConnectionManagerTest < Test::Unit::TestCase end assert_equal e.message, "query should be a string" end + + should "set #last_used to current time" do + stub_request(:post, "#{Stripe.api_base}/path") + .to_return(body: JSON.generate(object: "account")) + + t = Time.new(2019) + + # Make sure the connection manager is initialized at a different time + # than the one we're going to measure at because `#last_used` is also + # set by the constructor. + manager = Timecop.freeze(t) do + Stripe::ConnectionManager.new + end + + Timecop.freeze(t + 1) do + manager.execute_request(:post, "#{Stripe.api_base}/path") + assert_equal t + 1, manager.last_used + end + end end end end diff --git a/test/stripe/stripe_client_test.rb b/test/stripe/stripe_client_test.rb index a951a5c72..202951648 100644 --- a/test/stripe/stripe_client_test.rb +++ b/test/stripe/stripe_client_test.rb @@ -17,6 +17,60 @@ class StripeClientTest < Test::Unit::TestCase end end + context ".maybe_gc_connection_managers" do + should "garbage collect old connection managers when appropriate" do + stub_request(:post, "#{Stripe.api_base}/v1/path") + .to_return(body: JSON.generate(object: "account")) + + # Make sure we start with a blank slate (state may have been left in + # place by other tests). + StripeClient.clear_all_connection_managers + + # Establish a base time. + t = Time.new(2019) + + # And pretend that `StripeClient` was just initialized for the first + # time. (Don't access instance variables like this, but it's tricky to + # test properly otherwise.) + StripeClient.instance_variable_set(:@last_connection_manager_gc, t) + + Timecop.freeze(t) do + # Execute an initial request to ensure that a connection manager was + # created. + client = StripeClient.new + client.execute_request(:post, "/v1/path") + + # The GC shouldn't run yet (a `nil` return indicates that GC didn't run). + assert_equal nil, StripeClient.maybe_gc_connection_managers + end + + # Move time to just *before* garbage collection is eligible to run. + # Nothing should happen. + Timecop.freeze(t + StripeClient::CONNECTION_MANAGER_GC_PERIOD - 1) do + assert_equal nil, StripeClient.maybe_gc_connection_managers + end + + # Move time to just *after* garbage collection is eligible to run. + # Garbage collection will run, but because the connection manager is + # not passed its expiry age, it will not be collected. Zero is returned + # to indicate so. + Timecop.freeze(t + StripeClient::CONNECTION_MANAGER_GC_PERIOD + 1) do + assert_equal 0, StripeClient.maybe_gc_connection_managers + end + + # Move us far enough into the future that we're passed the horizons for + # both a GC run as well as well as the expiry age of a connection + # manager. That means the GC will run and collect the connection + # manager that we created above. + Timecop.freeze(t + StripeClient::CONNECTION_MANAGER_GC_LAST_USED_EXPIRY + 1) do + assert_equal 1, StripeClient.maybe_gc_connection_managers + + # And as an additional check, the connection manager of the current thread context should have been set to `nil` as it was GCed. + assert_nil StripeClient.current_thread_context.default_connection_manager + end + end + end + context ".clear_all_connection_managers" do should "clear connection managers across all threads" do stub_request(:post, "#{Stripe.api_base}/path") @@ -199,20 +253,6 @@ class StripeClientTest < Test::Unit::TestCase end end - context "#initialize" do - should "set Stripe.default_connection_manager" do - client = StripeClient.new - assert_equal StripeClient.default_connection_manager, - client.connection_manager - end - - should "set a different connection if one was specified" do - connection_manager = ConnectionManager.new - client = StripeClient.new(connection_manager) - assert_equal connection_manager, client.connection_manager - end - end - context "#execute_request" do context "headers" do should "support literal headers" do @@ -842,6 +882,22 @@ class StripeClientTest < Test::Unit::TestCase end end + context "#connection_manager" do + should "warn that #connection_manager is deprecated" do + old_stderr = $stderr + $stderr = StringIO.new + begin + client = StripeClient.new + client.connection_manager + message = "NOTE: Stripe::StripeClient#connection_manager is " \ + "deprecated" + assert_match Regexp.new(message), $stderr.string + ensure + $stderr = old_stderr + end + end + end + context "#request" do should "return a result and response object" do stub_request(:post, "#{Stripe.api_base}/v1/charges")