Skip to content

Commit

Permalink
Fix: properly pass (symbolized) sequel opts (#37)
Browse files Browse the repository at this point in the history
* [refactor] share driver jars loading logic
* [refactor] also share load_driver between plugins
* [fix] share and always symbolize sequel opts
* need to :nailcare: true/false values as booleans
  • Loading branch information
kares authored Jun 16, 2020
1 parent 94821e5 commit 0c17231
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 91 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 5.0.5
- Fixed user sequel_opts not being passed down properly [#37](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/37)
- Refactored jdbc_streaming to share driver loading, so the fixes from the jdbc plugin also effect jdbc_streaming

## 5.0.4
- Fixed issue where JDBC Drivers that don't correctly register with Java's DriverManager fail to load (such as Sybase) [#34](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/34)

Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/filters/jdbc_streaming.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc/common"
require "logstash/plugin_mixins/jdbc_streaming"
require "logstash/plugin_mixins/jdbc_streaming/cache_payload"
require "logstash/plugin_mixins/jdbc_streaming/statement_handler"
Expand Down Expand Up @@ -46,6 +47,7 @@
# }
#
module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base
include LogStash::PluginMixins::Jdbc::Common
include LogStash::PluginMixins::JdbcStreaming

config_name "jdbc_streaming"
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc/common"
require "logstash/plugin_mixins/jdbc/jdbc"

# this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0
Expand Down Expand Up @@ -126,6 +127,7 @@
# ---------------------------------------------------------------------------------------------------
#
module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
include LogStash::PluginMixins::Jdbc::Common
include LogStash::PluginMixins::Jdbc::Jdbc
config_name "jdbc"

Expand Down
66 changes: 66 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/common.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@

module LogStash module PluginMixins module Jdbc
module Common

private

def complete_sequel_opts(defaults = {})
sequel_opts = @sequel_opts.
map { |key,val| [key.is_a?(String) ? key.to_sym : key, val] }.
map { |key,val| [key, val.eql?('true') ? true : (val.eql?('false') ? false : val)] }
sequel_opts = defaults.merge Hash[sequel_opts]
sequel_opts[:user] = @jdbc_user unless @jdbc_user.nil? || @jdbc_user.empty?
sequel_opts[:password] = @jdbc_password.value unless @jdbc_password.nil?
sequel_opts[:driver] = @driver_impl # Sequel uses this as a fallback, if given URI doesn't auto-load the driver correctly
sequel_opts
end

def load_driver
return @driver_impl if @driver_impl ||= nil

require "java"
require "sequel"
require "sequel/adapters/jdbc"

load_driver_jars
begin
@driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class)
rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded"
# fix this !!!
message = if jdbc_driver_library_set?
"Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
else
":jdbc_driver_library is not set, are you sure you included " +
"the proper driver client libraries in your classpath?"
end
raise LogStash::PluginLoadingError, "#{e}. #{message}"
end
end

def load_driver_jars
if jdbc_driver_library_set?
@jdbc_driver_library.split(",").each do |driver_jar|
@logger.debug("loading #{driver_jar}")
# load 'driver.jar' is different than load 'some.rb' as it only causes the file to be added to
# JRuby's class-loader lookup (class) path - won't raise a LoadError when file is not readable
unless FileTest.readable?(driver_jar)
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, " +
"file not readable (please check user and group permissions for the path)"
end
begin
require driver_jar
rescue LoadError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}"
rescue StandardError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e}"
end
end
end
end

def jdbc_driver_library_set?
!@jdbc_driver_library.nil? && !@jdbc_driver_library.empty?
end

end
end end end
63 changes: 3 additions & 60 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,12 @@ def setup_jdbc_config

private
def jdbc_connect
opts = {
:user => @jdbc_user,
:password => @jdbc_password.nil? ? nil : @jdbc_password.value,
:pool_timeout => @jdbc_pool_timeout,
:driver => @driver_impl, # Sequel uses this as a fallback, if given URI doesn't auto-load the driver correctly
:keep_reference => false
}.merge(@sequel_opts)
sequel_opts = complete_sequel_opts(:pool_timeout => @jdbc_pool_timeout, :keep_reference => false)
retry_attempts = @connection_retry_attempts
loop do
retry_attempts -= 1
begin
return Sequel.connect(@jdbc_connection_string, opts)
return Sequel.connect(@jdbc_connection_string, sequel_opts)
rescue Sequel::PoolTimeout => e
if retry_attempts <= 0
@logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.")
Expand All @@ -128,7 +122,7 @@ def jdbc_connect
# rescue Java::JavaSql::SQLException, ::Sequel::Error => e
rescue ::Sequel::Error => e
if retry_attempts <= 0
@logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, )
@logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message)
raise e
else
@logger.error("Unable to connect to database. Trying again", :error_message => e.message)
Expand All @@ -138,56 +132,6 @@ def jdbc_connect
end
end

private

def load_driver
if @drivers_loaded.false?
require "java"
require "sequel"
require "sequel/adapters/jdbc"

load_driver_jars
begin
@driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class)
rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded"
# fix this !!!
message = if jdbc_driver_library_set?
"Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
else
":jdbc_driver_library is not set, are you sure you included " +
"the proper driver client libraries in your classpath?"
end
raise LogStash::PluginLoadingError, "#{e}. #{message}"
end
@drivers_loaded.make_true
end
end

def load_driver_jars
if jdbc_driver_library_set?
@jdbc_driver_library.split(",").each do |driver_jar|
@logger.debug("loading #{driver_jar}")
# load 'driver.jar' is different than load 'some.rb' as it only causes the file to be added to
# JRuby's class-loader lookup (class) path - won't raise a LoadError when file is not readable
unless FileTest.readable?(driver_jar)
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, " +
"file not readable (please check user and group permissions for the path)"
end
begin
require driver_jar
rescue LoadError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}"
rescue StandardError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e}"
end
end
end
end

def jdbc_driver_library_set?
!@jdbc_driver_library.nil? && !@jdbc_driver_library.empty?
end

def open_jdbc_connection
# at this point driver is already loaded
Sequel.application_timezone = @plugin_timezone.to_sym
Expand Down Expand Up @@ -230,7 +174,6 @@ def open_jdbc_connection
public
def prepare_jdbc_connection
@connection_lock = ReentrantLock.new
@drivers_loaded = Concurrent::AtomicBoolean.new
end

public
Expand Down
30 changes: 2 additions & 28 deletions lib/logstash/plugin_mixins/jdbc_streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,11 @@ def setup_jdbc_config
config :jdbc_validation_timeout, :validate => :number, :default => 3600
end

private

def load_driver_jars
unless @jdbc_driver_library.nil? || @jdbc_driver_library.empty?
@jdbc_driver_library.split(",").each do |driver_jar|
begin
@logger.debug("loading #{driver_jar}")
# Use https://github.com/jruby/jruby/wiki/CallingJavaFromJRuby#from-jar-files to make classes from jar
# available
require driver_jar
rescue LoadError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}"
end
end
end
end

public
def prepare_jdbc_connection
require "sequel"
require "sequel/adapters/jdbc"
require "java"

load_driver_jars

@sequel_opts_symbols = @sequel_opts.inject({}) {|hash, (k,v)| hash[k.to_sym] = v; hash}
@sequel_opts_symbols[:user] = @jdbc_user unless @jdbc_user.nil? || @jdbc_user.empty?
@sequel_opts_symbols[:password] = @jdbc_password.value unless @jdbc_password.nil?
load_driver

@sequel_opts_symbols[:driver] = Sequel::JDBC.load_driver(@jdbc_driver_class)
@database = Sequel.connect(@jdbc_connection_string, @sequel_opts_symbols)
@database = Sequel.connect(@jdbc_connection_string, complete_sequel_opts)
if @jdbc_validate_connection
@database.extension(:connection_validator)
@database.pool.connection_validation_timeout = @jdbc_validation_timeout
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.0.4'
s.version = '5.0.5'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
59 changes: 57 additions & 2 deletions spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
# We do not need to set TZ env var anymore because we can have 'Sequel.application_timezone' set to utc by default now.

describe LogStash::Inputs::Jdbc do
let(:connection_string) { "jdbc:derby:memory:testdb;create=true" }
let(:mixin_settings) do
{ "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"}
{
"jdbc_user" => ENV['USER'],
"jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"jdbc_connection_string" => connection_string
}
end
let(:settings) { {} }
let(:plugin) { LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) }
Expand Down Expand Up @@ -112,6 +116,57 @@
end
end

context "when sequel opts has user option" do
let(:settings) do
{
"jdbc_user" => 'system',
"statement" => "SELECT * from test_table",
"sequel_opts" => { "user" => 'from-opts' }
}
end

before do
plugin.register
end

after do
plugin.stop
end

it "should honor set jdbc-user when connecting" do
expect( Sequel ).to receive(:connect).with connection_string, hash_including(:user=>"system")
plugin.send(:jdbc_connect)
end
end

context "with sequel opts" do
let(:settings) do
{
"jdbc_user" => 'system',
"statement" => "SELECT * from test_table",
"sequel_opts" => {
"truthy" => 'true',
"falsey" => 'false',
"foo" => 'bar',
"jdbc_properties" => { "some" => 'true' }
}
}
end

before do
plugin.register
end

after do
plugin.stop
end

it "should symbolize keys" do
expect( Sequel ).to receive(:connect).with connection_string,
hash_including(:truthy => true, :falsey => false, :foo => 'bar', :jdbc_properties => { 'some' => 'true' })
plugin.send(:jdbc_connect)
end
end

context "when neither statement and statement_filepath arguments are passed" do
it "should fail to register" do
Expand Down

0 comments on commit 0c17231

Please sign in to comment.