Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate in_tail to v0.14 API #1059

Merged
merged 38 commits into from
Sep 1, 2016
Merged
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0df10d9
Migrate in_tail to v0.14 API
okkez Jun 20, 2016
5b4bbc8
Remove backward compatibility
okkez Jun 20, 2016
d305b8b
Remove needless tests
okkez Jun 20, 2016
095f058
Force remove
okkez Jun 20, 2016
ff6d47a
Use `assert_equal` instead of `assert`
okkez Jun 20, 2016
f4b8526
Use `expect_emits` parameter instead of plain sleep
okkez Jun 20, 2016
2f656d9
Fix errors on Windows
okkez Jun 27, 2016
9372c2e
Use plugin heleper `:timer`
okkez Jun 27, 2016
794888b
Use 'fluent/test/driver/input' instead of 'fluent/test'
okkez Jul 1, 2016
2cf5792
Remove needless include
okkez Jul 7, 2016
98c0631
Use plugin helper event_loop instead of `@_event_loop`
okkez Jul 7, 2016
5260e57
Use plugin helper timer for `TailWatcher#timer_trigger`
okkez Jul 7, 2016
34b7057
Use plugin helper parser
okkez Jul 8, 2016
65e0043
Add groupt receive_lines
okkez Jul 8, 2016
ec3a646
Indent
okkez Jul 8, 2016
cf163e5
Divide test
okkez Jul 8, 2016
28aa9cb
Fix a typo
okkez Jul 8, 2016
f858095
Use `emit_count`
okkez Jul 29, 2016
c687b76
Use plugin helper `compat_parameters`
okkez Jul 29, 2016
f758a9d
Simplify code
okkez Jul 29, 2016
6d154c2
Use string literal instead of symbol to specify `@type` in `parser_co…
okkez Aug 3, 2016
ccfa213
Use `config_element` instead of string literal
okkez Aug 5, 2016
605fada
Use `expect_emits` parameter instead of sleep
okkez Aug 5, 2016
8b906de
Remove outdated comment
okkez Aug 5, 2016
77cd882
Group test
okkez Aug 5, 2016
cad9b7a
Indent
okkez Aug 5, 2016
bd83a59
Add test got using parse section
okkez Aug 5, 2016
db798b9
Increment `expect_emits`
okkez Aug 8, 2016
7759587
Use `expect_records` instead of `expect_emits`
okkez Aug 8, 2016
f87bdf4
Remove redandunt assignment
okkez Aug 10, 2016
7ffd09e
Add prefix to timer title
okkez Aug 10, 2016
88f3860
Remove redandunt assignment
okkez Aug 10, 2016
5663292
Raise Fluent::ConfigError
okkez Aug 10, 2016
cd16f06
Check <parse> section existence before use it
okkez Aug 17, 2016
fc49465
Add test group about configure
okkez Aug 24, 2016
f63f3de
Indent
okkez Aug 24, 2016
e1da436
Organize code to migrate to v0.14 API
okkez Aug 24, 2016
3b39f6d
Call `parser_create` in `#configure`
okkez Aug 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 41 additions & 85 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
@@ -16,10 +16,9 @@

require 'cool.io'

require 'fluent/input'
require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/event'
require 'fluent/system_config'
require 'fluent/plugin/buffer'

if Fluent.windows?
@@ -28,11 +27,11 @@
Fluent::FileWrapper = File
end

module Fluent
class TailInput < Input
include SystemConfig::Mixin
module Fluent::Plugin
class TailInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('tail', self)

Plugin.register_input('tail', self)
helpers :timer, :event_loop, :parser, :compat_parameters

FILE_PERMISSION = 0644

@@ -77,34 +76,42 @@ def initialize
attr_reader :paths

def configure(conf)
compat_parameters_convert(conf, :parser)
parser_config = conf.elements('parse').first
unless parser_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this check is here?
This check should be moved to before if parser_config at line 82?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. I misunderstand that super at L88 may add <parse> section.

raise Fluent::ConfigError, "<parse> section is required."
end
unless parser_config["@type"]
raise Fluent::ConfigError, "parse/@type is required."
end

(1..Fluent::Plugin::MultilineParser::FORMAT_MAX_NUM).each do |n|
parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
end

super

@paths = @path.split(',').map {|path| path.strip }
if @paths.empty?
raise ConfigError, "tail: 'path' parameter is required on tail input"
raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
end

unless @pos_file
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end

configure_parser(conf)
configure_tag
configure_encoding

@multiline_mode = conf['format'] =~ /multiline/
@multiline_mode = parser_config["@type"] =~ /multiline/
@receive_handler = if @multiline_mode
method(:parse_multilines)
else
method(:parse_singleline)
end
@file_perm = system_config.file_permission || FILE_PERMISSION
end

def configure_parser(conf)
@parser = Plugin.new_parser(conf['format'])
@parser.configure(conf)
@parser = parser_create(conf: parser_config)
end

def configure_tag
@@ -120,7 +127,7 @@ def configure_tag
def configure_encoding
unless @encoding
if @from_encoding
raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
raise Fluent::ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
end
end

@@ -132,7 +139,7 @@ def parse_encoding_param(encoding_name)
begin
Encoding.find(encoding_name) if encoding_name
rescue ArgumentError => e
raise ConfigError, e.message
raise Fluent::ConfigError, e.message
end
end

@@ -145,20 +152,12 @@ def start
@pf = PositionFile.parse(@pf_file)
end

@loop = Coolio::Loop.new
refresh_watchers

@refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers))
@refresh_trigger.attach(@loop)
@thread = Thread.new(&method(:run))
timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end

def shutdown
@refresh_trigger.detach if @refresh_trigger && @refresh_trigger.attached?

stop_watchers(@tails.keys, true)
@loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
@thread.join
@pf_file.close if @pf_file

super
@@ -206,8 +205,11 @@ def refresh_watchers

def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines))
tw.attach(@loop)
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines))
tw.attach do |watcher|
timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger)
end
tw
end

@@ -218,7 +220,7 @@ def start_watchers(paths)
pe = @pf[path]
if @read_from_head && pe.read_inode.zero?
begin
pe.update(FileWrapper.stat(path).ino, 0)
pe.update(Fluent::FileWrapper.stat(path).ino, 0)
rescue Errno::ENOENT
$log.warn "#{path} not found. Continuing without tailing it."
end
@@ -263,8 +265,9 @@ def close_watcher(tw, close_io = true)
end

def close_watcher_after_rotate_wait(tw)
closer = TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher))
closer.attach(@loop)
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
Copy link
Member

@repeatedly repeatedly Aug 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closer calls detach inside ensure.
Does this code detach one-shot timer properly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this code calls close_watcher(tw) at once.
Because repeat: false is specified, see also ev(3).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing event is only once but don't call detach.
So a watcher is still attached to event loop. It is resource leak.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've sent PR for this resource leak. #1178

close_watcher(tw)
end
end

def flush_buffer(tw)
@@ -293,13 +296,6 @@ def flush_buffer(tw)
end
end

def run
@loop.run
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
end

# @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError
def receive_lines(lines, tail_watcher)
es = @receive_handler.call(lines, tail_watcher)
@@ -347,7 +343,7 @@ def convert_line_to_event(line, es, tail_watcher)
end

def parse_singleline(lines, tail_watcher)
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
lines.each { |line|
convert_line_to_event(line, es, tail_watcher)
}
@@ -356,7 +352,7 @@ def parse_singleline(lines, tail_watcher)

def parse_multilines(lines, tail_watcher)
lb = tail_watcher.line_buffer
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
if @parser.has_firstline?
tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher
lines.each { |line|
@@ -400,8 +396,6 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r
@receive_lines = receive_lines
@update_watcher = update_watcher

@timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify)) if @enable_watch_timer

@stat_trigger = StatWatcher.new(path, log, &method(:on_notify))

@rotate_handler = RotateHandler.new(path, log, &method(:on_rotate))
@@ -412,6 +406,8 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r
end

attr_reader :path
attr_reader :stat_trigger, :enable_watch_timer
attr_accessor :timer_trigger
attr_accessor :line_buffer, :line_buffer_timer_flusher
attr_accessor :unwatched # This is used for removing position entry from PositionFile

@@ -423,14 +419,12 @@ def wrap_receive_lines(lines)
@receive_lines.call(lines, self)
end

def attach(loop)
@timer_trigger.attach(loop) if @enable_watch_timer
@stat_trigger.attach(loop)
def attach
yield self
on_notify
end

def detach
@timer_trigger.detach if @enable_watch_timer && @timer_trigger.attached?
@stat_trigger.detach if @stat_trigger.attached?
end

@@ -523,22 +517,6 @@ def swap_state(pe)
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, repeat, log, &callback)
@callback = callback
@log = log
super(interval, repeat)
end

def on_timer
@callback.call
rescue
# TODO log?
@log.error $!.to_s
@log.error_backtrace
end
end

class StatWatcher < Coolio::StatWatcher
def initialize(path, log, &callback)
@callback = callback
@@ -555,24 +533,6 @@ def on_change(prev, cur)
end
end

class Closer < Coolio::TimerWatcher
def initialize(interval, tw, log, &callback)
@callback = callback
@tw = tw
@log = log
super(interval, false)
end

def on_timer
@callback.call(@tw)
rescue => e
@log.error e.to_s
@log.error_backtrace(e.backtrace)
ensure
detach
end
end

class IOHandler
def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines)
@log = log
@@ -660,7 +620,7 @@ def initialize(path, log, &on_rotate)

def on_notify
begin
stat = FileWrapper.stat(@path)
stat = Fluent::FileWrapper.stat(@path)
inode = stat.ino
fsize = stat.size
rescue Errno::ENOENT
@@ -673,7 +633,7 @@ def on_notify
if @inode != inode || fsize < @fsize
# rotated or truncated
begin
io = FileWrapper.open(@path)
io = Fluent::FileWrapper.open(@path)
rescue Errno::ENOENT
end
@on_rotate.call(io)
@@ -688,7 +648,6 @@ def on_notify
end
end


class LineBufferTimerFlusher
def initialize(log, flush_interval, &flush_method)
@log = log
@@ -713,7 +672,6 @@ def reset_timer
end
end


class PositionFile
UNWATCHED_POSITION = 0xffffffffffffffff

@@ -833,6 +791,4 @@ def read_inode
end
end
end

NewTailInput = TailInput # for backward compatibility
end
1,344 changes: 681 additions & 663 deletions test/plugin/test_in_tail.rb

Large diffs are not rendered by default.