Skip to content

Commit

Permalink
Added support for Sidekiq 5
Browse files Browse the repository at this point in the history
  • Loading branch information
silviusimeria committed Jul 7, 2017
1 parent 52997f0 commit e7f2e71
Show file tree
Hide file tree
Showing 13 changed files with 129 additions and 72 deletions.
1 change: 1 addition & 0 deletions .ruby-gemset
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sidekiq-logstash
1 change: 1 addition & 0 deletions .ruby-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2.3.0
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
language: ruby
rvm:
- 2.2.2
- 2.3.0
- 2.4.0
before_install: gem install bundler -v 1.11.2

gemfile:
- gemfiles/sidekiq4.gemfile
- gemfiles/sidekiq5.gemfile
3 changes: 2 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
source 'https://rubygems.org'

# Specify your gem's dependencies in sidekiq-logstash.gemspec
# Specify your gem's dependencies in sidekiq-logstash.gemspec

gemspec
5 changes: 5 additions & 0 deletions gemfiles/sidekiq4.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
source 'https://rubygems.org'

gem sidekiq '~> 4.0'

gemspec :path => ''../'
5 changes: 5 additions & 0 deletions gemfiles/sidekiq5.gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
source 'https://rubygems.org'

gem sidekiq '~> 5.0'

gemspec :path => ''../'
16 changes: 8 additions & 8 deletions lib/sidekiq/logging/argument_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ def self.compile(filters)
strings, regexps, blocks = [], [], []
filters.each do |item|
case item
when Proc
blocks << item
when Regexp
regexps << item
else
strings << Regexp.escape(item.to_s)
when Proc
blocks << item
when Regexp
regexps << item
else
strings << Regexp.escape(item.to_s)
end
end
deep_regexps, regexps = regexps.partition { |r| r.to_s.include?("\\.".freeze) }
Expand All @@ -44,9 +44,9 @@ def self.compile(filters)
attr_reader :regexps, :deep_regexps, :blocks

def initialize(regexps, deep_regexps, blocks)
@regexps = regexps
@regexps = regexps
@deep_regexps = deep_regexps.any? ? deep_regexps : nil
@blocks = blocks
@blocks = blocks
end

def call(original_args, parents = [])
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq/logging/logstash_formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def call(severity, time, progname, data)
else
json_data = {
severity: severity,
message: data
message: data
}
end

Expand Down
62 changes: 62 additions & 0 deletions lib/sidekiq/logging/shared.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module Sidekiq
module Logging
module Shared
def log_job(payload, started_at, exc = nil)
# Create a copy of the payload using JSON
# This should always be possible since Sidekiq store it in Redis
payload = JSON.parse(JSON.unparse(payload))

# Convert timestamps into Time instances
%w( created_at enqueued_at retried_at failed_at completed_at ).each do |key|
payload[key] = parse_time(payload[key]) if payload[key]
end

# Add process id params
payload['pid'] = ::Process.pid
payload['duration'] = elapsed(started_at)

message = "#{payload['class']} JID-#{payload['jid']}"

if exc
payload['message'] = "#{message}: fail: #{payload['duration']} sec"
payload['job_status'] = 'fail'
payload['error_message'] = exc.message
payload['error'] = exc.class
payload['error_backtrace'] = %('#{exc.backtrace.join("\n")}')
else
payload['message'] = "#{message}: done: #{payload['duration']} sec"
payload['job_status'] = 'done'
payload['completed_at'] = Time.now.utc
end

# Filter sensitive parameters
unless filter_args.empty?
args_filter = Sidekiq::Logging::ArgumentFilter.new(filter_args)
payload['args'] = args_filter.filter({ args: payload['args'] })[:args]
end

# Needs to map all args to strings for ElasticSearch compatibility
payload['args'].map!(&:to_s)

payload
end

def elapsed(start)
(Time.now.utc - start).round(3)
end

def parse_time(timestamp)
return timestamp if timestamp.is_a? Time
timestamp.is_a?(Float) ?
Time.at(timestamp).utc :
Time.parse(timestamp)
rescue
timestamp
end

def filter_args
Sidekiq::Logstash.configuration.filter_args
end
end
end
end
14 changes: 10 additions & 4 deletions lib/sidekiq/logstash.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'sidekiq/middleware/server/logstah_logging'
require 'sidekiq/logging/logstash_formatter'
require 'sidekiq/logging/argument_filter'
require 'sidekiq/logstash_job_logger'

module Sidekiq
module Logstash
Expand All @@ -18,12 +19,17 @@ def self.setup(opts = {})
# Calls Sidekiq.configure_server to inject logics
Sidekiq.configure_server do |config|
# Remove default Sidekiq error_handler that logs errors
config.error_handlers.delete_if {|h| h.is_a?(Sidekiq::ExceptionHandler::Logger) }
config.error_handlers.delete_if { |h| h.is_a?(Sidekiq::ExceptionHandler::Logger) }

# Add logstash support
config.server_middleware do |chain|
chain.add Sidekiq::Middleware::Server::LogstashLogging
chain.remove Sidekiq::Middleware::Server::Logging
# The logging server middleware was removed in Sidekiq 5.0.0, see: https://github.com/mperham/sidekiq/blob/master/Changes.md
if Sidekiq::Middleware::Server.const_defined?(:Logging)
config.server_middleware do |chain|
chain.add Sidekiq::Middleware::Server::LogstashLogging
chain.remove Sidekiq::Middleware::Server::Logging
end
else
Sidekiq.options[:job_logger] = Sidekiq::LogstashJobLogger
end

# Set custom formatter for Sidekiq logger
Expand Down
23 changes: 23 additions & 0 deletions lib/sidekiq/logstash_job_logger.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
require 'sidekiq/logging/shared'

module Sidekiq
class LogstashJobLogger
include Sidekiq::Logging::Shared

def call(job, _queue)
started_at = Time.now.utc
yield
Sidekiq.logger.info log_job(job, started_at)
rescue => exc
begin
Sidekiq.logger.warn log_job(job, started_at, exc)
rescue => ex
Sidekiq.logger.error 'Error logging the job execution!'
Sidekiq.logger.error "Job: #{job}"
Sidekiq.logger.error "Job Exception: #{exc}"
Sidekiq.logger.error "Log Exception: #{ex}"
end
raise
end
end
end
61 changes: 4 additions & 57 deletions lib/sidekiq/middleware/server/logstah_logging.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
require 'sidekiq/logging/shared'

module Sidekiq
module Middleware
module Server
class LogstashLogging
include Sidekiq::Logging::Shared

def call(_, job, _)
started_at = Time.now.utc
yield
Expand All @@ -17,63 +21,6 @@ def call(_, job, _)
end
raise
end

def log_job(payload, started_at, exc = nil)
# Create a copy of the payload using JSON
# This should always be possible since Sidekiq store it in Redis
payload = JSON.parse(JSON.unparse(payload))

# Convert timestamps into Time instances
%w( created_at enqueued_at retried_at failed_at completed_at ).each do |key|
payload[key] = parse_time(payload[key]) if payload[key]
end

# Add process id params
payload['pid'] = ::Process.pid
payload['duration'] = elapsed(started_at)

message = "#{payload['class']} JID-#{payload['jid']}"

if exc
payload['message'] = "#{message}: fail: #{payload['duration']} sec"
payload['job_status'] = 'fail'
payload['error_message'] = exc.message
payload['error'] = exc.class
payload['error_backtrace'] = %('#{exc.backtrace.join("\n")}')
else
payload['message'] = "#{message}: done: #{payload['duration']} sec"
payload['job_status'] = 'done'
payload['completed_at'] = Time.now.utc
end

# Filter sensitive parameters
unless filter_args.empty?
args_filter = Sidekiq::Logging::ArgumentFilter.new(filter_args)
payload['args'] = args_filter.filter({ args: payload['args'] })[:args]
end

# Needs to map all args to strings for ElasticSearch compatibility
payload['args'].map!(&:to_s)

payload
end

def elapsed(start)
(Time.now.utc - start).round(3)
end

def parse_time(timestamp)
return timestamp if timestamp.is_a? Time
timestamp.is_a?(Float) ?
Time.at(timestamp).utc :
Time.parse(timestamp)
rescue
timestamp
end

def filter_args
Sidekiq::Logstash.configuration.filter_args
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion sidekiq-logstash.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ DESC
spec.require_paths = ['lib']

spec.add_dependency 'logstash-event', '~> 1.2'
spec.add_runtime_dependency 'sidekiq', '~> 4.0'
spec.add_runtime_dependency 'sidekiq', '>= 4.0' , '<6'

spec.add_development_dependency 'bundler', '~> 1.11'
spec.add_development_dependency 'rake', '~> 10.0'
Expand Down

0 comments on commit e7f2e71

Please sign in to comment.