Skip to content

Commit

Permalink
Custom buffer implementation
Browse files Browse the repository at this point in the history
Fork and vendor in `Stud::Buffer` in order to customize its behavior better. This eliminates some of the issues with how `Stud::Buffer` was being (ab)used before, namely unpredictable exceptions causing mutex issues, and non-used threads causing the process to crash.

As a side-effect, the new `autoflush` setting, which defaults to false, allows for log messages to truly be buffered and flushed in the background. By default, there should no longer be any blocking behavior or exceptions bubbling up to the process.

This should fix #88
  • Loading branch information
dwbutler committed Jul 27, 2016
1 parent f71d37c commit 08384b0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 56 deletions.
50 changes: 43 additions & 7 deletions lib/logstash-logger/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ module Buffer
# * :max_items, Max number of items to buffer before flushing. Default 50.
# * :max_interval, Max number of seconds to wait between flushes. Default 5.
# * :logger, A logger to write log messages to. No default. Optional.
# * :autoflush, Whether to immediately flush all inbound messages. Default true.
# * :drop_messages_on_flush_error, Whether to drop messages when there is a flush error. Default false.
# * :drop_messages_on_full_buffer, Whether to drop messages when the buffer is full. Default false.
#
# @param [Hash] options
def buffer_initialize(options={})
Expand All @@ -88,9 +91,17 @@ def buffer_initialize(options={})
:max_items => options[:max_items] || 50,
:max_interval => options[:max_interval] || 5,
:logger => options[:logger] || nil,
:autoflush => options.fetch(:autoflush, true),
:has_on_flush_error => self.class.method_defined?(:on_flush_error),
:has_on_full_buffer_receive => self.class.method_defined?(:on_full_buffer_receive)
:has_on_full_buffer_receive => self.class.method_defined?(:on_full_buffer_receive),
:drop_messages_on_flush_error => options.fetch(:drop_messages_on_flush_error, false),
:drop_messages_on_full_buffer => options.fetch(:drop_messages_on_full_buffer, false)
}

reset_buffer
end

def reset_buffer
@buffer_state = {
# items accepted from including class
:pending_items => {},
Expand All @@ -111,7 +122,10 @@ def buffer_initialize(options={})
:timer => Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(:force => true)
begin
buffer_flush(:force => true)
rescue
end
end
end
}
Expand Down Expand Up @@ -150,15 +164,20 @@ def buffer_receive(event, group=nil)
:pending => @buffer_state[:pending_count],
:outgoing => @buffer_state[:outgoing_count]
) if @buffer_config[:has_on_full_buffer_receive]
sleep 0.1

if @buffer_config[:drop_messages_on_full_buffer]
reset_buffer
else
sleep 0.1
end
end

@buffer_state[:pending_mutex].synchronize do
@buffer_state[:pending_items][group] << event
@buffer_state[:pending_count] += 1
end

buffer_flush
buffer_flush if @buffer_state[:autoflush]
end

# Try to flush events.
Expand Down Expand Up @@ -227,6 +246,7 @@ def buffer_flush(options={})
events_size = events.size
@buffer_state[:outgoing_count] -= events_size
items_flushed += events_size
@buffer_state[:last_flush] = Time.now

rescue => e

Expand All @@ -240,10 +260,13 @@ def buffer_flush(options={})
on_flush_error e
end

sleep 1
retry
if @buffer_config[:drop_messages_on_flush_error]
reset_buffer
else
cancel_flush
end

end
@buffer_state[:last_flush] = Time.now
end

ensure
Expand All @@ -258,5 +281,18 @@ def buffer_clear_pending
@buffer_state[:pending_items] = Hash.new { |h, k| h[k] = [] }
@buffer_state[:pending_count] = 0
end

def buffer_clear_outgoing
@buffer_state[:outgoing_items] = Hash.new { |h, k| h[k] = [] }
@buffer_state[:outgoing_count] = 0
end

def cancel_flush
@buffer_state[:outgoing_items].each do |group, items|
@buffer_state[:pending_items][group].concat items
end
@buffer_state[:pending_count] += @buffer_state[:outgoing_count]
buffer_clear_outgoing
end
end
end
61 changes: 12 additions & 49 deletions lib/logstash-logger/device/connectable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(opts = {})
@buffer_group = nil
@buffer_max_items = opts[:batch_events] || opts[:buffer_max_items]
@buffer_max_interval = opts[:batch_timeout] || opts[:buffer_max_interval]
@drop_messages_on_flush_error =
@drop_messages_on_flush_error =
if opts.key?(:drop_messages_on_flush_error)
opts.delete(:drop_messages_on_flush_error)
else
Expand All @@ -33,13 +33,20 @@ def initialize(opts = {})
true
end

reset_buffer
@autoflush = opts.fetch(:autoflush, false)

buffer_initialize(
max_items: @buffer_max_items,
max_interval: @buffer_max_interval,
autoflush: @autoflush,
drop_messages_on_flush_error: @drop_messages_on_flush_error,
drop_messages_on_full_buffer: @drop_messages_on_full_buffer
)
end

def write(message)
buffer_receive message, @buffer_group
buffer_flush(force: true) if @sync
rescue
end

def flush(*args)
Expand All @@ -49,23 +56,10 @@ def flush(*args)
messages, group = *args
write_batch(messages, group)
end
rescue
if @drop_messages_on_flush_error
reset_buffer
else
cancel_flush
end
raise
end

def on_flush_error(e)
raise e
end

def on_full_buffer_receive(args)
if @drop_messages_on_full_buffer
reset_buffer
end
def on_full_buffer_receive(data)
log_warning("Buffer Full - #{data}")
end

def close(opts = {})
Expand Down Expand Up @@ -117,37 +111,6 @@ def with_connection(&block)
close(flush: false)
raise
end

private

def reset_buffer
buffer_initialize max_items: @buffer_max_items, max_interval: @buffer_max_interval
@buffer_state[:timer] = Thread.new do
loop do
sleep(@buffer_config[:max_interval])
begin
buffer_flush(:force => true)
rescue
end
end
end
end

def buffer_clear_outgoing
@buffer_state[:outgoing_items] = Hash.new { |h, k| h[k] = [] }
@buffer_state[:outgoing_count] = 0
end

def cancel_flush
@buffer_state[:flush_mutex].lock rescue false
@buffer_state[:outgoing_items].each do |group, items|
@buffer_state[:pending_items][group].concat items
end
@buffer_state[:pending_count] += @buffer_state[:outgoing_count]
buffer_clear_outgoing
ensure
@buffer_state[:flush_mutex].unlock rescue false
end
end
end
end

0 comments on commit 08384b0

Please sign in to comment.