From f5cbaf1ca65d290319b9f716f71633f7f240559d Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 21 Oct 2015 13:54:38 +0300 Subject: [PATCH] add ability to configure timeout and other opts Sequel supports various vendor-specific and connection-pool-specific options that were not exposed to the plugin. This change introduces a dedicated new configuration option to set a timeout and also a catch-all configuration option for any other options that Sequel supports upon opening a new database connection. Fixes #62. --- lib/logstash/plugin_mixins/jdbc.rb | 33 +++++++++++++++++++++++++++++- spec/inputs/jdbc_spec.rb | 33 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 272d35f..08f273a 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -57,6 +57,37 @@ def setup_jdbc_config # Connection pool configuration. # How often to validate a connection (in seconds) config :jdbc_validation_timeout, :validate => :number, :default => 3600 + + # Connection pool configuration. + # The amount of seconds to wait to acquire a connection before raising a PoolTimeoutError (default 5) + config :jdbc_pool_timeout, :validate => :number, :default => 5 + + # General/Vendor-specific Sequel configuration options. + # + # An example of an optional connection pool configuration + # max_connections - The maximum number of connections the connection pool + # + # examples of vendor-specific options can be found in this + # documentation page: https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc + config :sequel_opts, :validate => :hash, :default => {} + end + + private + def jdbc_connect + opts = { + :user => @jdbc_user, + :password => @jdbc_password, + :pool_timeout => @jdbc_pool_timeout + }.merge(@sequel_opts) + begin + Sequel.connect(@jdbc_connection_string, opts=opts) + rescue Sequel::PoolTimeout => e + @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded.") + raise e + rescue Sequel::Error => e + @logger.error("Unable to connect to database", :error_message => e.message) + raise e + end end public @@ -76,7 +107,7 @@ def prepare_jdbc_connection end raise LogStash::ConfigurationError, "#{e}. #{message}" end - @database = Sequel.connect(@jdbc_connection_string, :user=> @jdbc_user, :password=> @jdbc_password.nil? ? nil : @jdbc_password.value) + @database = jdbc_connect() @database.extension(:pagination) if @jdbc_validate_connection @database.extension(:connection_validator) diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 9eaef34..49cff1f 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -310,4 +310,37 @@ expect { plugin.register }.to raise_error(LogStash::ConfigurationError) end end + + context "when timing out on connection" do + let(:settings) do + { + "statement" => "SELECT * FROM test_table", + "jdbc_pool_timeout" => 0, + "jdbc_connection_string" => 'mock://localhost:1527/db', + "sequel_opts" => { + "max_connections" => 1 + } + } + end + + it "should raise PoolTimeout error" do + plugin.register + db = plugin.instance_variable_get(:@database) + expect(db.pool.instance_variable_get(:@timeout)).to eq(0) + expect(db.pool.instance_variable_get(:@max_size)).to eq(1) + + q, q1 = Queue.new, Queue.new + t = Thread.new{db.pool.hold{|c| q1.push nil; q.pop}} + q1.pop + expect{db.pool.hold {|c|}}.to raise_error(Sequel::PoolTimeout) + q.push nil + t.join + end + + it "should log error message" do + allow(Sequel).to receive(:connect).and_raise(Sequel::PoolTimeout) + expect(plugin.logger).to receive(:error).with("Failed to connect to database. 0 second timeout exceeded.") + expect { plugin.register }.to raise_error(Sequel::PoolTimeout) + end + end end