From 0c172313c193415a259964711ac85231bc67a2f2 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Tue, 16 Jun 2020 18:45:00 +0200 Subject: [PATCH] Fix: properly pass (symbolized) sequel opts (#37) * [refactor] share driver jars loading logic * [refactor] also share load_driver between plugins * [fix] share and always symbolize sequel opts * need to :nailcare: true/false values as booleans --- CHANGELOG.md | 4 ++ lib/logstash/filters/jdbc_streaming.rb | 2 + lib/logstash/inputs/jdbc.rb | 2 + lib/logstash/plugin_mixins/jdbc/common.rb | 66 ++++++++++++++++++++ lib/logstash/plugin_mixins/jdbc/jdbc.rb | 63 +------------------ lib/logstash/plugin_mixins/jdbc_streaming.rb | 30 +-------- logstash-integration-jdbc.gemspec | 2 +- spec/inputs/jdbc_spec.rb | 59 ++++++++++++++++- 8 files changed, 137 insertions(+), 91 deletions(-) create mode 100644 lib/logstash/plugin_mixins/jdbc/common.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f43b72..97fcc96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 5.0.5 + - Fixed user sequel_opts not being passed down properly [#37](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/37) + - Refactored jdbc_streaming to share driver loading, so the fixes from the jdbc plugin also effect jdbc_streaming + ## 5.0.4 - Fixed issue where JDBC Drivers that don't correctly register with Java's DriverManager fail to load (such as Sybase) [#34](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/34) diff --git a/lib/logstash/filters/jdbc_streaming.rb b/lib/logstash/filters/jdbc_streaming.rb index 1960a21..90a4be7 100644 --- a/lib/logstash/filters/jdbc_streaming.rb +++ b/lib/logstash/filters/jdbc_streaming.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/filters/base" require "logstash/namespace" +require "logstash/plugin_mixins/jdbc/common" require "logstash/plugin_mixins/jdbc_streaming" require "logstash/plugin_mixins/jdbc_streaming/cache_payload" require "logstash/plugin_mixins/jdbc_streaming/statement_handler" @@ -46,6 +47,7 @@ # } # module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base + include LogStash::PluginMixins::Jdbc::Common include LogStash::PluginMixins::JdbcStreaming config_name "jdbc_streaming" diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 6b981da..4b138c4 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" +require "logstash/plugin_mixins/jdbc/common" require "logstash/plugin_mixins/jdbc/jdbc" # this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0 @@ -126,6 +127,7 @@ # --------------------------------------------------------------------------------------------------- # module LogStash module Inputs class Jdbc < LogStash::Inputs::Base + include LogStash::PluginMixins::Jdbc::Common include LogStash::PluginMixins::Jdbc::Jdbc config_name "jdbc" diff --git a/lib/logstash/plugin_mixins/jdbc/common.rb b/lib/logstash/plugin_mixins/jdbc/common.rb new file mode 100644 index 0000000..9c30e97 --- /dev/null +++ b/lib/logstash/plugin_mixins/jdbc/common.rb @@ -0,0 +1,66 @@ + +module LogStash module PluginMixins module Jdbc + module Common + + private + + def complete_sequel_opts(defaults = {}) + sequel_opts = @sequel_opts. + map { |key,val| [key.is_a?(String) ? key.to_sym : key, val] }. + map { |key,val| [key, val.eql?('true') ? true : (val.eql?('false') ? false : val)] } + sequel_opts = defaults.merge Hash[sequel_opts] + sequel_opts[:user] = @jdbc_user unless @jdbc_user.nil? || @jdbc_user.empty? + sequel_opts[:password] = @jdbc_password.value unless @jdbc_password.nil? + sequel_opts[:driver] = @driver_impl # Sequel uses this as a fallback, if given URI doesn't auto-load the driver correctly + sequel_opts + end + + def load_driver + return @driver_impl if @driver_impl ||= nil + + require "java" + require "sequel" + require "sequel/adapters/jdbc" + + load_driver_jars + begin + @driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class) + rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded" + # fix this !!! + message = if jdbc_driver_library_set? + "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" + else + ":jdbc_driver_library is not set, are you sure you included " + + "the proper driver client libraries in your classpath?" + end + raise LogStash::PluginLoadingError, "#{e}. #{message}" + end + end + + def load_driver_jars + if jdbc_driver_library_set? + @jdbc_driver_library.split(",").each do |driver_jar| + @logger.debug("loading #{driver_jar}") + # load 'driver.jar' is different than load 'some.rb' as it only causes the file to be added to + # JRuby's class-loader lookup (class) path - won't raise a LoadError when file is not readable + unless FileTest.readable?(driver_jar) + raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, " + + "file not readable (please check user and group permissions for the path)" + end + begin + require driver_jar + rescue LoadError => e + raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}" + rescue StandardError => e + raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e}" + end + end + end + end + + def jdbc_driver_library_set? + !@jdbc_driver_library.nil? && !@jdbc_driver_library.empty? + end + + end +end end end diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index 93ec9ec..e7517b5 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -106,18 +106,12 @@ def setup_jdbc_config private def jdbc_connect - opts = { - :user => @jdbc_user, - :password => @jdbc_password.nil? ? nil : @jdbc_password.value, - :pool_timeout => @jdbc_pool_timeout, - :driver => @driver_impl, # Sequel uses this as a fallback, if given URI doesn't auto-load the driver correctly - :keep_reference => false - }.merge(@sequel_opts) + sequel_opts = complete_sequel_opts(:pool_timeout => @jdbc_pool_timeout, :keep_reference => false) retry_attempts = @connection_retry_attempts loop do retry_attempts -= 1 begin - return Sequel.connect(@jdbc_connection_string, opts) + return Sequel.connect(@jdbc_connection_string, sequel_opts) rescue Sequel::PoolTimeout => e if retry_attempts <= 0 @logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.") @@ -128,7 +122,7 @@ def jdbc_connect # rescue Java::JavaSql::SQLException, ::Sequel::Error => e rescue ::Sequel::Error => e if retry_attempts <= 0 - @logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, ) + @logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message) raise e else @logger.error("Unable to connect to database. Trying again", :error_message => e.message) @@ -138,56 +132,6 @@ def jdbc_connect end end - private - - def load_driver - if @drivers_loaded.false? - require "java" - require "sequel" - require "sequel/adapters/jdbc" - - load_driver_jars - begin - @driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class) - rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded" - # fix this !!! - message = if jdbc_driver_library_set? - "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?" - else - ":jdbc_driver_library is not set, are you sure you included " + - "the proper driver client libraries in your classpath?" - end - raise LogStash::PluginLoadingError, "#{e}. #{message}" - end - @drivers_loaded.make_true - end - end - - def load_driver_jars - if jdbc_driver_library_set? - @jdbc_driver_library.split(",").each do |driver_jar| - @logger.debug("loading #{driver_jar}") - # load 'driver.jar' is different than load 'some.rb' as it only causes the file to be added to - # JRuby's class-loader lookup (class) path - won't raise a LoadError when file is not readable - unless FileTest.readable?(driver_jar) - raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, " + - "file not readable (please check user and group permissions for the path)" - end - begin - require driver_jar - rescue LoadError => e - raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}" - rescue StandardError => e - raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e}" - end - end - end - end - - def jdbc_driver_library_set? - !@jdbc_driver_library.nil? && !@jdbc_driver_library.empty? - end - def open_jdbc_connection # at this point driver is already loaded Sequel.application_timezone = @plugin_timezone.to_sym @@ -230,7 +174,6 @@ def open_jdbc_connection public def prepare_jdbc_connection @connection_lock = ReentrantLock.new - @drivers_loaded = Concurrent::AtomicBoolean.new end public diff --git a/lib/logstash/plugin_mixins/jdbc_streaming.rb b/lib/logstash/plugin_mixins/jdbc_streaming.rb index 601cea9..8788625 100644 --- a/lib/logstash/plugin_mixins/jdbc_streaming.rb +++ b/lib/logstash/plugin_mixins/jdbc_streaming.rb @@ -55,37 +55,11 @@ def setup_jdbc_config config :jdbc_validation_timeout, :validate => :number, :default => 3600 end - private - - def load_driver_jars - unless @jdbc_driver_library.nil? || @jdbc_driver_library.empty? - @jdbc_driver_library.split(",").each do |driver_jar| - begin - @logger.debug("loading #{driver_jar}") - # Use https://github.com/jruby/jruby/wiki/CallingJavaFromJRuby#from-jar-files to make classes from jar - # available - require driver_jar - rescue LoadError => e - raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}" - end - end - end - end - public def prepare_jdbc_connection - require "sequel" - require "sequel/adapters/jdbc" - require "java" - - load_driver_jars - - @sequel_opts_symbols = @sequel_opts.inject({}) {|hash, (k,v)| hash[k.to_sym] = v; hash} - @sequel_opts_symbols[:user] = @jdbc_user unless @jdbc_user.nil? || @jdbc_user.empty? - @sequel_opts_symbols[:password] = @jdbc_password.value unless @jdbc_password.nil? + load_driver - @sequel_opts_symbols[:driver] = Sequel::JDBC.load_driver(@jdbc_driver_class) - @database = Sequel.connect(@jdbc_connection_string, @sequel_opts_symbols) + @database = Sequel.connect(@jdbc_connection_string, complete_sequel_opts) if @jdbc_validate_connection @database.extension(:connection_validator) @database.pool.connection_validation_timeout = @jdbc_validation_timeout diff --git a/logstash-integration-jdbc.gemspec b/logstash-integration-jdbc.gemspec index dd74dd9..6747821 100755 --- a/logstash-integration-jdbc.gemspec +++ b/logstash-integration-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-jdbc' - s.version = '5.0.4' + s.version = '5.0.5' s.licenses = ['Apache License (2.0)'] s.summary = "Integration with JDBC - input and filter plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 7edd95c..7147256 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -13,9 +13,13 @@ # We do not need to set TZ env var anymore because we can have 'Sequel.application_timezone' set to utc by default now. describe LogStash::Inputs::Jdbc do + let(:connection_string) { "jdbc:derby:memory:testdb;create=true" } let(:mixin_settings) do - { "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", - "jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"} + { + "jdbc_user" => ENV['USER'], + "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver", + "jdbc_connection_string" => connection_string + } end let(:settings) { {} } let(:plugin) { LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) } @@ -112,6 +116,57 @@ end end + context "when sequel opts has user option" do + let(:settings) do + { + "jdbc_user" => 'system', + "statement" => "SELECT * from test_table", + "sequel_opts" => { "user" => 'from-opts' } + } + end + + before do + plugin.register + end + + after do + plugin.stop + end + + it "should honor set jdbc-user when connecting" do + expect( Sequel ).to receive(:connect).with connection_string, hash_including(:user=>"system") + plugin.send(:jdbc_connect) + end + end + + context "with sequel opts" do + let(:settings) do + { + "jdbc_user" => 'system', + "statement" => "SELECT * from test_table", + "sequel_opts" => { + "truthy" => 'true', + "falsey" => 'false', + "foo" => 'bar', + "jdbc_properties" => { "some" => 'true' } + } + } + end + + before do + plugin.register + end + + after do + plugin.stop + end + + it "should symbolize keys" do + expect( Sequel ).to receive(:connect).with connection_string, + hash_including(:truthy => true, :falsey => false, :foo => 'bar', :jdbc_properties => { 'some' => 'true' }) + plugin.send(:jdbc_connect) + end + end context "when neither statement and statement_filepath arguments are passed" do it "should fail to register" do