Commit

Merge pull request #35 from dwbutler/buffer-connectable

Buffer connectable log devices

dwbutler committed Sep 21, 2015
2 parents 6150eba + fa7c4be commit 368f7fa
Showing 7 changed files with 108 additions and 42 deletions.
46 changes: 43 additions & 3 deletions README.md
@@ -7,12 +7,13 @@ writing to a file or syslog since logstash can receive the structured data direc

## Features

-* Can write directly to logstash over a UDP or TCP/SSL connection.
+* Can write directly to a logstash listener over a UDP or TCP/SSL connection.
 * Can write to a file, Redis, Kafka, a unix socket, syslog, stdout, or stderr.
-* Writes in logstash JSON format, but supports other formats as well.
-* Can write to multiple outputs.
 * Logger can take a string message, a hash, a `LogStash::Event`, an object, or a JSON string as input.
 * Events are automatically populated with message, timestamp, host, and severity.
+* Writes in logstash JSON format, but supports other formats as well.
+* Can write to multiple outputs.
+* Log messages are buffered and automatically re-sent if there is a connection problem.
* Easily integrates with Rails via configuration.

## Installation
@@ -234,6 +235,39 @@ This configuration would result in the following output.
}
```

## Buffering / Automatic Retries

Log messages are buffered internally and automatically re-sent if there is a connection problem.
Outputs that support batch writing (Redis and Kafka) write log messages in bulk from the
buffer. This functionality is implemented using
[Stud::Buffer](https://github.com/jordansissel/ruby-stud/blob/master/lib/stud/buffer.rb).
You can configure its behavior by passing the following options to LogStashLogger:

    :buffer_max_items - Max number of items to buffer before flushing. Defaults to 50.
    :buffer_max_interval - Max number of seconds to wait between flushes. Defaults to 5.

You can turn this behavior off by setting `buffer_max_items` to `1` or `sync` to `true`.

Please be aware of the following caveats to this behavior:

* It's possible for duplicate log messages to be sent when retrying. For outputs like Redis and
Kafka that write in batches, the whole batch could get re-sent. If this is a problem, you
can add a UUID field to each event to uniquely identify it. You can either do this
in a `customize_event` block, or by using logstash's
[UUID filter](https://www.elastic.co/guide/en/logstash/current/plugins-filters-uuid.html).
* It's still possible to lose log messages. Ruby won't detect a TCP/UDP connection problem
immediately. In my testing, it took Ruby about 4 seconds to notice the receiving end was down
and start raising exceptions. Since logstash listeners over TCP/UDP do not acknowledge received
messages, it's not possible to know which log messages to re-send.
* If your output source is unavailable long enough, writing to the log will block until it is
available again. This could make your application unresponsive.
* If your application suddenly terminates (for example, by SIGKILL or a power outage), the whole
buffer will be lost.
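The UUID suggestion in the first caveat can be sketched as follows. This is a hedged illustration: the `add_uuid` lambda is a hypothetical stand-in for the body of a `customize_event` block, not the gem's configuration API.

```ruby
require 'securerandom'

# Tag each event with a unique id before it is written, so duplicates
# produced by a batch retry can be de-duplicated downstream.
add_uuid = ->(event) { event['uuid'] ||= SecureRandom.uuid; event }

a = add_uuid.call('message' => 'hello')
b = add_uuid.call('message' => 'hello')
# Two otherwise identical messages now carry distinct ids.
```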

You can make message loss and blocking less likely by increasing `buffer_max_items`
(so that more events can be held in the buffer), and increasing `buffer_max_interval` (to wait
longer between flushes). This will increase memory pressure on your application as log messages
accumulate in the buffer, so make sure you have allocated enough memory to your process.
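The interplay of `buffer_max_items`, `buffer_max_interval`, and retry-on-failure described above can be sketched in plain Ruby. This is a simplified stand-in for Stud::Buffer, not the gem's actual implementation; the `SketchBuffer` name is hypothetical.

```ruby
class SketchBuffer
  def initialize(max_items: 50, max_interval: 5, &writer)
    @max_items = max_items        # flush when this many items are buffered
    @max_interval = max_interval  # ...or when this many seconds have passed
    @writer = writer              # callable that performs the batch write
    @buffer = []
    @last_flush = Time.now
  end

  # Queue a message; flush when the buffer fills or the interval elapses.
  def receive(message)
    @buffer << message
    flush if @buffer.size >= @max_items || (Time.now - @last_flush) >= @max_interval
  end

  # Attempt a batch write; on failure, keep the messages for a later retry.
  def flush
    return if @buffer.empty?
    @writer.call(@buffer.dup)
    @buffer.clear # drop messages only after a successful write
  rescue StandardError
    # Connection problem: the batch stays buffered and is re-sent on the next
    # flush. This is also where duplicates can arise if a write half-succeeded.
  ensure
    @last_flush = Time.now
  end

  def size
    @buffer.size
  end
end

sent = []
down = true
buffer = SketchBuffer.new(max_items: 3, max_interval: 3600) do |batch|
  raise IOError, 'connection refused' if down
  sent.concat(batch)
end

3.times { |i| buffer.receive("msg#{i}") } # third message triggers a flush, which fails
down = false
buffer.flush # retry succeeds and empties the buffer
```

Raising `max_items`/`max_interval` in this sketch delays flushes in the same way the README describes for `buffer_max_items`/`buffer_max_interval`, trading memory for fewer write attempts.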

## Rails Integration

@@ -269,6 +303,12 @@ config.logstash.uri = ENV['LOGSTASH_URI']
# Optional. Defaults to :json_lines. If there are multiple outputs,
# they will all share the same formatter.
config.logstash.formatter = :json_lines

# Optional. Max number of items to buffer before flushing. Defaults to 50.
config.logstash.buffer_max_items = 50

# Optional. Max number of seconds to wait between flushes. Defaults to 5.
config.logstash.buffer_max_interval = 5
```

#### UDP
52 changes: 42 additions & 10 deletions lib/logstash-logger/device/connectable.rb
@@ -1,19 +1,45 @@
+require 'stud/buffer'
+
 module LogStashLogger
   module Device
     class Connectable < Base
-      def write(message)
-        with_connection do
-          super
-        end
-      end
-
-      def flush
-        return unless connected?
-        with_connection do
-          super
-        end
-      end
-
+      include Stud::Buffer
+
+      def initialize(opts = {})
+        super
+
+        if opts[:batch_events]
+          warn "The :batch_events option is deprecated. Please use :buffer_max_items instead"
+        end
+
+        if opts[:batch_timeout]
+          warn "The :batch_timeout option is deprecated. Please use :buffer_max_interval instead"
+        end
+
+        @buffer_max_items = opts[:batch_events] || opts[:buffer_max_items]
+        @buffer_max_interval = opts[:batch_timeout] || opts[:buffer_max_interval]
+
+        buffer_initialize max_items: @buffer_max_items, max_interval: @buffer_max_interval
+      end
+
+      def write(message)
+        buffer_receive message
+        buffer_flush(force: true) if @sync
+      end
+
+      def flush(*args)
+        if args.empty?
+          buffer_flush
+        else
+          write_batch(args[0])
+        end
+      end
+
+      def close
+        buffer_flush(final: true)
+        super
+      end
+
       def to_io
         with_connection do
           @io
@@ -24,7 +50,13 @@ def connected?
         !!@io
       end
 
-      protected
+      def write_batch(messages)
+        with_connection do
+          messages.each do |message|
+            @io.write(message)
+          end
+        end
+      end
 
       # Implemented by subclasses
       def connect
@@ -38,12 +70,12 @@ def reconnect
 
       # Ensure the block is executed with a valid connection
      def with_connection(&block)
-        connect unless @io
+        connect unless connected?
         yield
       rescue => e
         warn "#{self.class} - #{e.class} - #{e.message}"
         close
         @io = nil
         raise
       end
     end
   end
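The deprecation shim in `Connectable#initialize` maps the old option names onto the new ones while still honoring them. A standalone sketch of that mapping (the `normalize_buffer_opts` helper is hypothetical, extracted here for illustration):

```ruby
# Hypothetical helper mirroring the option handling in Connectable#initialize:
# the deprecated :batch_events / :batch_timeout options still work (they come
# first in the || chains), but a deprecation warning is recorded.
def normalize_buffer_opts(opts)
  warnings = []
  warnings << ':batch_events is deprecated, use :buffer_max_items' if opts[:batch_events]
  warnings << ':batch_timeout is deprecated, use :buffer_max_interval' if opts[:batch_timeout]

  { buffer_max_items: opts[:batch_events] || opts[:buffer_max_items],
    buffer_max_interval: opts[:batch_timeout] || opts[:buffer_max_interval],
    warnings: warnings }
end

legacy = normalize_buffer_opts(batch_events: 100, buffer_max_interval: 10)
modern = normalize_buffer_opts(buffer_max_items: 20)
```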
17 changes: 7 additions & 10 deletions lib/logstash-logger/device/kafka.rb
@@ -1,10 +1,8 @@
 require 'poseidon'
-require 'stud/buffer'
 
 module LogStashLogger
   module Device
     class Kafka < Connectable
-      include Stud::Buffer
 
       DEFAULT_HOST = 'localhost'
       DEFAULT_PORT = 9092
@@ -22,11 +20,6 @@ def initialize(opts)
         @topic = opts[:path] || DEFAULT_TOPIC
         @producer = opts[:producer] || DEFAULT_PRODUCER
         @backoff = opts[:backoff] || DEFAULT_BACKOFF
-
-        @batch_events = opts.fetch(:batch_events, 50)
-        @batch_timeout = opts.fetch(:batch_timeout, 5)
-
-        buffer_initialize max_items: @batch_events, max_interval: @batch_timeout
       end
 
       def connect
@@ -56,6 +49,12 @@ def write(message)
         buffer_flush(force: true) if @sync
       end
 
+      def write_batch(messages)
+        with_connection do
+          @io.send_messages messages
+        end
+      end
+
       def close
         buffer_flush(final: true)
         @io && @io.close
@@ -70,9 +69,7 @@ def flush(*args)
           buffer_flush
         else
           messages = *args.first
-          with_connection do
-            @io.send_messages messages
-          end
+          write_batch(messages)
         end
       end

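`Kafka#flush` accepts either a single message or an array, normalized by the splat in `messages = *args.first`. A minimal demonstration of that idiom in plain Ruby (the `normalize_messages` helper is hypothetical):

```ruby
# The splat normalizes "one message or an array of messages" into an array,
# mirroring `messages = *args.first` in Kafka#flush.
def normalize_messages(*args)
  messages = *args.first
  messages
end

normalize_messages('a')       # a single message becomes a one-element array
normalize_messages(%w[a b c]) # an array passes through unchanged
```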
20 changes: 8 additions & 12 deletions lib/logstash-logger/device/redis.rb
@@ -1,11 +1,8 @@
 require 'redis'
-require 'stud/buffer'
 
 module LogStashLogger
   module Device
     class Redis < Connectable
-      include Stud::Buffer
-
       DEFAULT_LIST = 'logstash'
 
       attr_accessor :list
@@ -17,14 +14,8 @@ def initialize(opts)
         normalize_path(opts)
 
         @redis_options = opts
-
-        @batch_events = opts.fetch(:batch_events, 50)
-        @batch_timeout = opts.fetch(:batch_timeout, 5)
-
-        buffer_initialize max_items: @batch_events, max_interval: @batch_timeout
       end
 
-
       def connect
         @io = ::Redis.new(@redis_options)
       end
@@ -42,13 +33,20 @@ def with_connection
       rescue => e
         warn "#{self.class} - #{e.class} - #{e.message}"
         @io = nil
+        raise
       end
 
       def write(message)
         buffer_receive message, @list
         buffer_flush(force: true) if @sync
       end
 
+      def write_batch(messages, list = nil)
+        with_connection do
+          @io.rpush(list, messages)
+        end
+      end
+
       def close
         buffer_flush(final: true)
         @io && @io.quit
@@ -63,9 +61,7 @@ def flush(*args)
           buffer_flush
         else
           messages, list = *args
-          with_connection do
-            @io.rpush(list, messages)
-          end
+          write_batch(messages, list)
         end
       end

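`Redis#write_batch` relies on RPUSH accepting an array, so a whole batch is appended in a single command (one round trip). A sketch against a minimal in-memory stand-in; `FakeRedis` is hypothetical and not the redis gem:

```ruby
# Minimal in-memory stand-in for the single Redis command write_batch uses.
class FakeRedis
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] }
  end

  # Real RPUSH appends every value in one command and returns the new length.
  def rpush(list, values)
    @lists[list].concat(Array(values))
    @lists[list].size
  end

  def lrange(list, start, stop)
    @lists[list][start..stop] || []
  end
end

redis = FakeRedis.new
batch = ['{"message":"a"}', '{"message":"b"}']
redis.rpush('logstash', batch) # the whole batch lands in one call
```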
4 changes: 2 additions & 2 deletions lib/logstash-logger/device/tcp.rb
@@ -16,8 +16,6 @@ def use_ssl?
         @use_ssl || !@ssl_certificate.nil?
       end
 
-      protected
-
       def connect
         if use_ssl?
           ssl_connect
@@ -28,6 +26,8 @@ def connect
         @io
       end
 
+      protected
+
       def non_ssl_connect
         @io = TCPSocket.new(@host, @port).tap do |socket|
           socket.sync = sync unless sync.nil?
1 change: 1 addition & 0 deletions spec/device/unix_spec.rb
@@ -7,6 +7,7 @@
 
   before(:each) do
     allow(::UNIXSocket).to receive(:new) { unix_socket }
+    allow(unix_socket).to receive(:sync=)
   end
 
   it "writes to a local unix socket" do
10 changes: 5 additions & 5 deletions spec/spec_helper.rb
@@ -40,18 +40,18 @@ def connection_type
   let(:port) { PORT }
 
   # The logstash logger
-  let(:logger) { LogStashLogger.new(host: host, port: port, type: connection_type) }
+  let(:logger) { LogStashLogger.new(host: host, port: port, type: connection_type, sync: true) }
   # The log device that the logger writes to
   let(:logdev) { logger.instance_variable_get(:@logdev) }
 end
 
 RSpec.shared_context 'device' do
   let(:port) { PORT }
   let(:device_with_port) { LogStashLogger::Device.new(port: port) }
-  let(:udp_device) { LogStashLogger::Device.new(type: :udp, port: port) }
-  let(:tcp_device) { LogStashLogger::Device.new(type: :tcp, port: port) }
-  let(:ssl_tcp_device) { LogStashLogger::Device.new(type: :tcp, port: port, ssl_enable: true) }
-  let(:unix_device) { LogStashLogger::Device.new(type: :unix, path: '/tmp/logstash') }
+  let(:udp_device) { LogStashLogger::Device.new(type: :udp, port: port, sync: true) }
+  let(:tcp_device) { LogStashLogger::Device.new(type: :tcp, port: port, sync: true) }
+  let(:ssl_tcp_device) { LogStashLogger::Device.new(type: :tcp, port: port, ssl_enable: true, sync: true) }
+  let(:unix_device) { LogStashLogger::Device.new(type: :unix, path: '/tmp/logstash', sync: true) }
 
   let(:file) { Tempfile.new('test') }
   let(:file_device) { LogStashLogger::Device.new(type: :file, path: file.path)}
