Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: properly pass (symbolized) sequel opts #37

Merged
merged 5 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 5.0.5
- Fixed user sequel_opts not being passed down properly [#37](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/37)
- Refactored jdbc_streaming to share driver loading, so the fixes from the jdbc plugin also effect jdbc_streaming

## 5.0.4
- Fixed issue where JDBC Drivers that don't correctly register with Java's DriverManager fail to load (such as Sybase) [#34](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/34)

Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/filters/jdbc_streaming.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc/common"
require "logstash/plugin_mixins/jdbc_streaming"
require "logstash/plugin_mixins/jdbc_streaming/cache_payload"
require "logstash/plugin_mixins/jdbc_streaming/statement_handler"
Expand Down Expand Up @@ -46,6 +47,7 @@
# }
#
module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base
include LogStash::PluginMixins::Jdbc::Common
include LogStash::PluginMixins::JdbcStreaming

config_name "jdbc_streaming"
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc/common"
require "logstash/plugin_mixins/jdbc/jdbc"

# this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0
Expand Down Expand Up @@ -126,6 +127,7 @@
# ---------------------------------------------------------------------------------------------------
#
module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
include LogStash::PluginMixins::Jdbc::Common
include LogStash::PluginMixins::Jdbc::Jdbc
config_name "jdbc"

Expand Down
66 changes: 66 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/common.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@

module LogStash module PluginMixins module Jdbc
module Common

private

def complete_sequel_opts(defaults = {})
sequel_opts = @sequel_opts.
map { |key,val| [key.is_a?(String) ? key.to_sym : key, val] }.
map { |key,val| [key, val.eql?('true') ? true : (val.eql?('false') ? false : val)] }
sequel_opts = defaults.merge Hash[sequel_opts]
sequel_opts[:user] = @jdbc_user unless @jdbc_user.nil? || @jdbc_user.empty?
sequel_opts[:password] = @jdbc_password.value unless @jdbc_password.nil?
sequel_opts[:driver] = @driver_impl # Sequel uses this as a fallback, if given URI doesn't auto-load the driver correctly
sequel_opts
end

def load_driver
return @driver_impl if @driver_impl ||= nil

require "java"
require "sequel"
require "sequel/adapters/jdbc"

load_driver_jars
begin
@driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class)
rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded"
# fix this !!!
message = if jdbc_driver_library_set?
"Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
else
":jdbc_driver_library is not set, are you sure you included " +
"the proper driver client libraries in your classpath?"
end
raise LogStash::PluginLoadingError, "#{e}. #{message}"
end
end

def load_driver_jars
if jdbc_driver_library_set?
@jdbc_driver_library.split(",").each do |driver_jar|
@logger.debug("loading #{driver_jar}")
# load 'driver.jar' is different than load 'some.rb' as it only causes the file to be added to
# JRuby's class-loader lookup (class) path - won't raise a LoadError when file is not readable
unless FileTest.readable?(driver_jar)
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, " +
"file not readable (please check user and group permissions for the path)"
end
begin
require driver_jar
rescue LoadError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}"
rescue StandardError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e}"
end
end
end
end

def jdbc_driver_library_set?
!@jdbc_driver_library.nil? && !@jdbc_driver_library.empty?
end

end
end end end
63 changes: 3 additions & 60 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,12 @@ def setup_jdbc_config

private
def jdbc_connect
opts = {
:user => @jdbc_user,
:password => @jdbc_password.nil? ? nil : @jdbc_password.value,
:pool_timeout => @jdbc_pool_timeout,
:driver => @driver_impl, # Sequel uses this as a fallback, if given URI doesn't auto-load the driver correctly
:keep_reference => false
}.merge(@sequel_opts)
sequel_opts = complete_sequel_opts(:pool_timeout => @jdbc_pool_timeout, :keep_reference => false)
retry_attempts = @connection_retry_attempts
loop do
retry_attempts -= 1
begin
return Sequel.connect(@jdbc_connection_string, opts)
return Sequel.connect(@jdbc_connection_string, sequel_opts)
rescue Sequel::PoolTimeout => e
if retry_attempts <= 0
@logger.error("Failed to connect to database. #{@jdbc_pool_timeout} second timeout exceeded. Tried #{@connection_retry_attempts} times.")
Expand All @@ -128,7 +122,7 @@ def jdbc_connect
# rescue Java::JavaSql::SQLException, ::Sequel::Error => e
rescue ::Sequel::Error => e
if retry_attempts <= 0
@logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message, )
@logger.error("Unable to connect to database. Tried #{@connection_retry_attempts} times", :error_message => e.message)
raise e
else
@logger.error("Unable to connect to database. Trying again", :error_message => e.message)
Expand All @@ -138,56 +132,6 @@ def jdbc_connect
end
end

private

def load_driver
if @drivers_loaded.false?
require "java"
require "sequel"
require "sequel/adapters/jdbc"

load_driver_jars
begin
@driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class)
rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded"
# fix this !!!
message = if jdbc_driver_library_set?
"Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
else
":jdbc_driver_library is not set, are you sure you included " +
"the proper driver client libraries in your classpath?"
end
raise LogStash::PluginLoadingError, "#{e}. #{message}"
end
@drivers_loaded.make_true
end
end

def load_driver_jars
if jdbc_driver_library_set?
@jdbc_driver_library.split(",").each do |driver_jar|
@logger.debug("loading #{driver_jar}")
# load 'driver.jar' is different than load 'some.rb' as it only causes the file to be added to
# JRuby's class-loader lookup (class) path - won't raise a LoadError when file is not readable
unless FileTest.readable?(driver_jar)
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, " +
"file not readable (please check user and group permissions for the path)"
end
begin
require driver_jar
rescue LoadError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}"
rescue StandardError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e}"
end
end
end
end

def jdbc_driver_library_set?
!@jdbc_driver_library.nil? && !@jdbc_driver_library.empty?
end

def open_jdbc_connection
# at this point driver is already loaded
Sequel.application_timezone = @plugin_timezone.to_sym
Expand Down Expand Up @@ -230,7 +174,6 @@ def open_jdbc_connection
public
def prepare_jdbc_connection
@connection_lock = ReentrantLock.new
@drivers_loaded = Concurrent::AtomicBoolean.new
end

public
Expand Down
30 changes: 2 additions & 28 deletions lib/logstash/plugin_mixins/jdbc_streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,11 @@ def setup_jdbc_config
config :jdbc_validation_timeout, :validate => :number, :default => 3600
end

private

def load_driver_jars
unless @jdbc_driver_library.nil? || @jdbc_driver_library.empty?
@jdbc_driver_library.split(",").each do |driver_jar|
begin
@logger.debug("loading #{driver_jar}")
# Use https://github.com/jruby/jruby/wiki/CallingJavaFromJRuby#from-jar-files to make classes from jar
# available
require driver_jar
rescue LoadError => e
raise LogStash::PluginLoadingError, "unable to load #{driver_jar} from :jdbc_driver_library, #{e.message}"
end
end
end
end

public
def prepare_jdbc_connection
require "sequel"
require "sequel/adapters/jdbc"
require "java"

load_driver_jars

@sequel_opts_symbols = @sequel_opts.inject({}) {|hash, (k,v)| hash[k.to_sym] = v; hash}
@sequel_opts_symbols[:user] = @jdbc_user unless @jdbc_user.nil? || @jdbc_user.empty?
@sequel_opts_symbols[:password] = @jdbc_password.value unless @jdbc_password.nil?
load_driver

@sequel_opts_symbols[:driver] = Sequel::JDBC.load_driver(@jdbc_driver_class)
@database = Sequel.connect(@jdbc_connection_string, @sequel_opts_symbols)
@database = Sequel.connect(@jdbc_connection_string, complete_sequel_opts)
if @jdbc_validate_connection
@database.extension(:connection_validator)
@database.pool.connection_validation_timeout = @jdbc_validation_timeout
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.0.4'
s.version = '5.0.5'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
59 changes: 57 additions & 2 deletions spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
# We do not need to set TZ env var anymore because we can have 'Sequel.application_timezone' set to utc by default now.

describe LogStash::Inputs::Jdbc do
let(:connection_string) { "jdbc:derby:memory:testdb;create=true" }
let(:mixin_settings) do
{ "jdbc_user" => ENV['USER'], "jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"jdbc_connection_string" => "jdbc:derby:memory:testdb;create=true"}
{
"jdbc_user" => ENV['USER'],
"jdbc_driver_class" => "org.apache.derby.jdbc.EmbeddedDriver",
"jdbc_connection_string" => connection_string
}
end
let(:settings) { {} }
let(:plugin) { LogStash::Inputs::Jdbc.new(mixin_settings.merge(settings)) }
Expand Down Expand Up @@ -112,6 +116,57 @@
end
end

context "when sequel opts has user option" do
let(:settings) do
{
"jdbc_user" => 'system',
"statement" => "SELECT * from test_table",
"sequel_opts" => { "user" => 'from-opts' }
}
end

before do
plugin.register
end

after do
plugin.stop
end

it "should honor set jdbc-user when connecting" do
expect( Sequel ).to receive(:connect).with connection_string, hash_including(:user=>"system")
plugin.send(:jdbc_connect)
end
end

context "with sequel opts" do
let(:settings) do
{
"jdbc_user" => 'system',
"statement" => "SELECT * from test_table",
"sequel_opts" => {
"truthy" => 'true',
"falsey" => 'false',
"foo" => 'bar',
"jdbc_properties" => { "some" => 'true' }
}
}
end

before do
plugin.register
end

after do
plugin.stop
end

it "should symbolize keys" do
expect( Sequel ).to receive(:connect).with connection_string,
hash_including(:truthy => true, :falsey => false, :foo => 'bar', :jdbc_properties => { 'some' => 'true' })
plugin.send(:jdbc_connect)
end
end

context "when neither statement and statement_filepath arguments are passed" do
it "should fail to register" do
Expand Down