Skip to content

Commit

Permalink
Allow tracking by column number, not just time
Browse files Browse the repository at this point in the history
Test for tracking_column and warn user once per query if it's not there.
Add a test to verify this is working properly

closes #57
  • Loading branch information
untergeek committed Jan 6, 2016
1 parent bc37474 commit 0233050
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.0.0
- [#57](https://github.com/logstash-plugins/logstash-input-jdbc/issues/57) New feature: Allow tracking by a column value rather than by last run time. **This is a breaking change**, as users may be required to change from using `sql_last_start` to use `sql_last_value` in their queries. No other changes are required if you've been using time-based queries. See the documentation if you wish to use an incremental column value to track updates to your tables.

## 2.1.1
- [#44](https://github.com/logstash-plugins/logstash-input-jdbc/issues/44) add option to control the lowercase or not, of the column names.

Expand Down
38 changes: 26 additions & 12 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
#
# ==== State
#
# The plugin will persist the `sql_last_start` parameter in the form of a
# metadata file stored in the configured `last_run_metadata_path`. Upon query execution,
# this file will be updated with the current value of `sql_last_start`. Next time
# The plugin will persist the `sql_last_value` parameter in the form of a
# metadata file stored in the configured `last_run_metadata_path`. Upon query execution,
# this file will be updated with the current value of `sql_last_value`. Next time
# the pipeline starts up, this value will be updated by reading from the file. If
# `clean_run` is set to true, this value will be ignored and `sql_last_start` will be
# set to Jan 1, 1970, as if no query has ever been executed.
# `clean_run` is set to true, this value will be ignored and `sql_last_value` will be
# set to Jan 1, 1970, or 0 if `use_column_value` is true, as if no query has ever been executed.
#
# ==== Dealing With Large Result-sets
#
Expand Down Expand Up @@ -90,8 +90,9 @@
# Here is the list:
#
# |==========================================================
# |sql_last_start | The last time a statement was executed. This is set to Thursday, 1 January 1970
# before any query is run, and updated accordingly after first query is run.
# |sql_last_value | The value used to calculate which rows to query. Before any query is run,
# this is set to Thursday, 1 January 1970, or 0 if `use_column_value` is true and
# `tracking_column` is set. It is updated accordingly after subsequent queries are run.
# |==========================================================
#
class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
Expand Down Expand Up @@ -131,6 +132,12 @@ class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
# Path to file with last run time
config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"

# Use an incremental column value rather than a timestamp
config :use_column_value, :validate => :boolean, :default => false

# If tracking column value rather than timestamp, the column whose value is to be tracked
config :tracking_column, :validate => :string

# Whether the previous run state should be preserved
config :clean_run, :validate => :boolean, :default => false

Expand All @@ -146,11 +153,18 @@ def register
require "rufus/scheduler"
prepare_jdbc_connection

# load sql_last_start from file if exists
# Raise an error if @use_column_value is true, but no @tracking_column is set
if @use_column_value
if @tracking_column.nil?
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
end
end

# load sql_last_value from file if exists
if @clean_run && File.exist?(@last_run_metadata_path)
File.delete(@last_run_metadata_path)
elsif File.exist?(@last_run_metadata_path)
@sql_last_start = YAML.load(File.read(@last_run_metadata_path))
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
end

unless @statement.nil? ^ @statement_filepath.nil?
Expand Down Expand Up @@ -185,17 +199,17 @@ def stop

def execute_query(queue)
# update default parameters
@parameters['sql_last_start'] = @sql_last_start
@parameters['sql_last_value'] = @sql_last_value
execute_statement(@statement, @parameters) do |row|
event = LogStash::Event.new(row)
decorate(event)
queue << event
end
end

def update_state_file
if @record_last_run
File.write(@last_run_metadata_path, YAML.dump(@sql_last_start))
File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
end
end

Expand Down
28 changes: 25 additions & 3 deletions lib/logstash/plugin_mixins/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ def prepare_jdbc_connection
else
@database.identifier_output_method = :to_s
end
@sql_last_start = Time.at(0).utc
if @use_column_value
@sql_last_value = 0
else
@sql_last_value = Time.at(0).utc
end
end # def prepare_jdbc_connection

public
Expand All @@ -175,29 +179,47 @@ def execute_statement(statement, parameters)
begin
parameters = symbolized_params(parameters)
query = @database[statement, parameters]
sql_last_start = Time.now.utc
sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
@tracking_column_warning_sent = false
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)

if @jdbc_paging_enabled
query.each_page(@jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
sql_last_value = get_column_value(row) if @use_column_value
yield extract_values_from(row)
end
end
else
query.each do |row|
sql_last_value = get_column_value(row) if @use_column_value
yield extract_values_from(row)
end
end
success = true
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
@logger.warn("Exception when executing JDBC query", :exception => e)
else
@sql_last_start = sql_last_start
@sql_last_value = sql_last_value
end
return success
end

public
def get_column_value(row)
if !row.has_key?(@tracking_column.to_sym)
if !@tracking_column_warning_sent
@logger.warn("tracking_column not found in dataset.", :tracking_column => @tracking_column)
@tracking_column_warning_sent = true
end
# If we can't find the tracking column, return the current value in the ivar
@sql_last_value
else
# Otherwise send the updated tracking column
row[@tracking_column.to_sym]
end
end

# Symbolize parameters keys to use with Sequel
private
def symbolized_params(parameters)
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-jdbc'
s.version = '2.1.1'
s.version = '3.0.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This example input streams a string at a definable interval."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand Down
Loading

0 comments on commit 0233050

Please sign in to comment.