diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index a96c1eee1d..34b7854ef5 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -16,10 +16,9 @@ require 'cool.io' -require 'fluent/input' +require 'fluent/plugin/input' require 'fluent/config/error' require 'fluent/event' -require 'fluent/system_config' require 'fluent/plugin/buffer' if Fluent.windows? @@ -28,11 +27,11 @@ Fluent::FileWrapper = File end -module Fluent - class TailInput < Input - include SystemConfig::Mixin +module Fluent::Plugin + class TailInput < Fluent::Plugin::Input + Fluent::Plugin.register_input('tail', self) - Plugin.register_input('tail', self) + helpers :timer, :event_loop, :parser, :compat_parameters FILE_PERMISSION = 0644 @@ -77,11 +76,24 @@ def initialize attr_reader :paths def configure(conf) + compat_parameters_convert(conf, :parser) + parser_config = conf.elements('parse').first + unless parser_config + raise Fluent::ConfigError, " section is required." + end + unless parser_config["@type"] + raise Fluent::ConfigError, "parse/@type is required." + end + + (1..Fluent::Plugin::MultilineParser::FORMAT_MAX_NUM).each do |n| + parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"] + end + super @paths = @path.split(',').map {|path| path.strip } if @paths.empty? - raise ConfigError, "tail: 'path' parameter is required on tail input" + raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input" end unless @pos_file @@ -89,22 +101,17 @@ def configure(conf) $log.warn "this parameter is highly recommended to save the position to resume tailing." end - configure_parser(conf) configure_tag configure_encoding - @multiline_mode = conf['format'] =~ /multiline/ + @multiline_mode = parser_config["@type"] =~ /multiline/ @receive_handler = if @multiline_mode method(:parse_multilines) else method(:parse_singleline) end @file_perm = system_config.file_permission || FILE_PERMISSION - end - - def configure_parser(conf) - @parser = Plugin.new_parser(conf['format']) - @parser.configure(conf) + @parser = parser_create(conf: parser_config) end def configure_tag @@ -120,7 +127,7 @@ def configure_tag def configure_encoding unless @encoding if @from_encoding - raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter." + raise Fluent::ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter." end end @@ -132,7 +139,7 @@ def parse_encoding_param(encoding_name) begin Encoding.find(encoding_name) if encoding_name rescue ArgumentError => e - raise ConfigError, e.message + raise Fluent::ConfigError, e.message end end @@ -145,20 +152,12 @@ def start @pf = PositionFile.parse(@pf_file) end - @loop = Coolio::Loop.new refresh_watchers - - @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers)) - @refresh_trigger.attach(@loop) - @thread = Thread.new(&method(:run)) + timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers)) end def shutdown - @refresh_trigger.detach if @refresh_trigger && @refresh_trigger.attached? - stop_watchers(@tails.keys, true) - @loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception. - @thread.join @pf_file.close if @pf_file super @@ -206,8 +205,11 @@ def refresh_watchers def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil - tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)) - tw.attach(@loop) + tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)) + tw.attach do |watcher| + timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer + event_loop_attach(watcher.stat_trigger) + end tw end @@ -218,7 +220,7 @@ def start_watchers(paths) pe = @pf[path] if @read_from_head && pe.read_inode.zero? begin - pe.update(FileWrapper.stat(path).ino, 0) + pe.update(Fluent::FileWrapper.stat(path).ino, 0) rescue Errno::ENOENT $log.warn "#{path} not found. Continuing without tailing it." end @@ -263,8 +265,9 @@ def close_watcher(tw, close_io = true) end def close_watcher_after_rotate_wait(tw) - closer = TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher)) - closer.attach(@loop) + timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do + close_watcher(tw) + end end def flush_buffer(tw) @@ -293,13 +296,6 @@ def flush_buffer(tw) end end - def run - @loop.run - rescue - log.error "unexpected error", error: $!.to_s - log.error_backtrace - end - # @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError def receive_lines(lines, tail_watcher) es = @receive_handler.call(lines, tail_watcher) @@ -347,7 +343,7 @@ def convert_line_to_event(line, es, tail_watcher) end def parse_singleline(lines, tail_watcher) - es = MultiEventStream.new + es = Fluent::MultiEventStream.new lines.each { |line| convert_line_to_event(line, es, tail_watcher) } @@ -356,7 +352,7 @@ def parse_singleline(lines, tail_watcher) def parse_multilines(lines, tail_watcher) lb = tail_watcher.line_buffer - es = MultiEventStream.new + es = Fluent::MultiEventStream.new if @parser.has_firstline? tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher lines.each { |line| @@ -400,8 +396,6 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r @receive_lines = receive_lines @update_watcher = update_watcher - @timer_trigger = TimerWatcher.new(1, true, log, &method(:on_notify)) if @enable_watch_timer - @stat_trigger = StatWatcher.new(path, log, &method(:on_notify)) @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate)) @@ -412,6 +406,8 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, r end attr_reader :path + attr_reader :stat_trigger, :enable_watch_timer + attr_accessor :timer_trigger attr_accessor :line_buffer, :line_buffer_timer_flusher attr_accessor :unwatched # This is used for removing position entry from PositionFile @@ -423,14 +419,12 @@ def wrap_receive_lines(lines) @receive_lines.call(lines, self) end - def attach(loop) - @timer_trigger.attach(loop) if @enable_watch_timer - @stat_trigger.attach(loop) + def attach + yield self on_notify end def detach - @timer_trigger.detach if @enable_watch_timer && @timer_trigger.attached? @stat_trigger.detach if @stat_trigger.attached? end @@ -523,22 +517,6 @@ def swap_state(pe) pe # This pe will be updated in on_rotate after TailWatcher is initialized end - class TimerWatcher < Coolio::TimerWatcher - def initialize(interval, repeat, log, &callback) - @callback = callback - @log = log - super(interval, repeat) - end - - def on_timer - @callback.call - rescue - # TODO log? - @log.error $!.to_s - @log.error_backtrace - end - end - class StatWatcher < Coolio::StatWatcher def initialize(path, log, &callback) @callback = callback @@ -555,24 +533,6 @@ def on_change(prev, cur) end end - class Closer < Coolio::TimerWatcher - def initialize(interval, tw, log, &callback) - @callback = callback - @tw = tw - @log = log - super(interval, false) - end - - def on_timer - @callback.call(@tw) - rescue => e - @log.error e.to_s - @log.error_backtrace(e.backtrace) - ensure - detach - end - end - class IOHandler def initialize(io, pe, log, read_lines_limit, first = true, &receive_lines) @log = log @@ -660,7 +620,7 @@ def initialize(path, log, &on_rotate) def on_notify begin - stat = FileWrapper.stat(@path) + stat = Fluent::FileWrapper.stat(@path) inode = stat.ino fsize = stat.size rescue Errno::ENOENT @@ -673,7 +633,7 @@ def on_notify if @inode != inode || fsize < @fsize # rotated or truncated begin - io = FileWrapper.open(@path) + io = Fluent::FileWrapper.open(@path) rescue Errno::ENOENT end @on_rotate.call(io) @@ -688,7 +648,6 @@ def on_notify end end - class LineBufferTimerFlusher def initialize(log, flush_interval, &flush_method) @log = log @@ -713,7 +672,6 @@ def reset_timer end end - class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff @@ -833,6 +791,4 @@ def read_inode end end end - - NewTailInput = TailInput # for backward compatibility end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 5f2cf0375c..e23f764974 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1,5 +1,5 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_tail' require 'fluent/plugin/buffer' require 'fluent/system_config' @@ -16,7 +16,7 @@ def setup # ensure files are closed for Windows, on which deleted files # are still visible from filesystem GC.start(full_mark: true, immediate_mark: true, immediate_sweep: true) - FileUtils.remove_entry_secure(TMP_DIR) + FileUtils.remove_entry_secure(TMP_DIR, true) end FileUtils.mkdir_p(TMP_DIR) end @@ -28,102 +28,206 @@ def teardown TMP_DIR = File.dirname(__FILE__) + "/../tmp/tail#{ENV['TEST_ENV_NUMBER']}" - CONFIG = %[ - path #{TMP_DIR}/tail.txt - tag t1 - rotate_wait 2s - ] - COMMON_CONFIG = CONFIG + %[ - pos_file #{TMP_DIR}/tail.pos - ] - CONFIG_READ_FROM_HEAD = %[ - read_from_head true - ] - CONFIG_ENABLE_WATCH_TIMER = %[ - enable_watch_timer false - ] - SINGLE_LINE_CONFIG = %[ - format /(?.*)/ - ] + CONFIG = config_element("ROOT", "", { + "path" => "#{TMP_DIR}/tail.txt", + "tag" => "t1", + "rotate_wait" => "2s" + }) + COMMON_CONFIG = CONFIG + config_element("", "", { "pos_file" => "#{TMP_DIR}/tail.pos" }) + CONFIG_READ_FROM_HEAD = config_element("", "", { "read_from_head" => true }) + CONFIG_ENABLE_WATCH_TIMER = config_element("", "", { "enable_watch_timer" => false }) + SINGLE_LINE_CONFIG = config_element("", "", { "format" => "/(?.*)/" }) + PARSE_SINGLE_LINE_CONFIG = config_element("", "", {}, [config_element("parse", "", { "@type" => "/(?.*)/" })]) + MULTILINE_CONFIG = config_element( + "", "", { + "format" => "multiline", + "format1" => "/^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/", + "format_firstline" => "/^[s]/" + }) + PARSE_MULTILINE_CONFIG = config_element( + "", "", {}, + [config_element("parse", "", { + "@type" => "multiline", + "format1" => "/^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/", + "format_firstline" => "/^[s]/" + }) + ]) def create_driver(conf = SINGLE_LINE_CONFIG, use_common_conf = true) config = use_common_conf ? COMMON_CONFIG + conf : conf - Fluent::Test::InputTestDriver.new(Fluent::NewTailInput).configure(config) + Fluent::Test::Driver::Input.new(Fluent::Plugin::TailInput).configure(config) end - def test_configure - d = create_driver - assert_equal ["#{TMP_DIR}/tail.txt"], d.instance.paths - assert_equal "t1", d.instance.tag - assert_equal 2, d.instance.rotate_wait - assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file - assert_equal 1000, d.instance.read_lines_limit - end + sub_test_case "configure" do + test "plain single line" do + d = create_driver + assert_equal ["#{TMP_DIR}/tail.txt"], d.instance.paths + assert_equal "t1", d.instance.tag + assert_equal 2, d.instance.rotate_wait + assert_equal "#{TMP_DIR}/tail.pos", d.instance.pos_file + assert_equal 1000, d.instance.read_lines_limit + end + + data("empty" => config_element, + "w/o @type" => config_element("", "", {}, [config_element("parse", "", {})])) + test "w/o parse section" do |conf| + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end - def test_configure_encoding - # valid encoding - d = create_driver(SINGLE_LINE_CONFIG + 'encoding utf-8') - assert_equal Encoding::UTF_8, d.instance.encoding + sub_test_case "encoding" do + test "valid" do + conf = SINGLE_LINE_CONFIG + config_element("", "", { "encoding" => "utf-8" }) + d = create_driver(conf) + assert_equal Encoding::UTF_8, d.instance.encoding + end - # invalid encoding - assert_raise(Fluent::ConfigError) do - create_driver(SINGLE_LINE_CONFIG + 'encoding no-such-encoding') + test "invalid" do + conf = SINGLE_LINE_CONFIG + config_element("", "", { "encoding" => "no-such-encoding" }) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end end - end - def test_configure_from_encoding - # If only specified from_encoding raise ConfigError - assert_raise(Fluent::ConfigError) do - create_driver(SINGLE_LINE_CONFIG + 'from_encoding utf-8') - end + sub_test_case "from_encoding" do + test "only specified from_encoding raise ConfigError" do + conf = SINGLE_LINE_CONFIG + config_element("", "", { "from_encoding" => "utf-8" }) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end - # valid setting - d = create_driver %[ - format /(?.*)/ - read_from_head true - from_encoding utf-8 - encoding utf-8 - ] - assert_equal Encoding::UTF_8, d.instance.from_encoding + test "valid" do + conf = SINGLE_LINE_CONFIG + config_element("", "", { + "from_encoding" => "utf-8", + "encoding" => "utf-8" + }) + d = create_driver(conf) + assert_equal(Encoding::UTF_8, d.instance.from_encoding) + end - # invalid from_encoding - assert_raise(Fluent::ConfigError) do - d = create_driver %[ - format /(?.*)/ - read_from_head true - from_encoding no-such-encoding - encoding utf-8 - ] + test "invalid" do + conf = SINGLE_LINE_CONFIG + config_element("", "", { + "from_encoding" => "no-such-encoding", + "encoding" => "utf-8" + }) + assert_raise(Fluent::ConfigError) do + create_driver(conf) + end + end end end - # TODO: Should using more better approach instead of sleep wait + sub_test_case "singleline" do + data(flat: SINGLE_LINE_CONFIG, + parse: PARSE_SINGLE_LINE_CONFIG) + def test_emit(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + f.puts "test1" + f.puts "test2" + } + + d = create_driver(config) - def test_emit - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| - f.puts "test1" - f.puts "test2" - } + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + f.puts "test3" + f.puts "test4" + } + end - d = create_driver + events = d.events + assert_equal(true, events.length > 0) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) + assert(events[0][1].is_a?(Fluent::EventTime)) + assert(events[1][1].is_a?(Fluent::EventTime)) + assert_equal(1, d.emit_count) + end + + data('flat 1' => [:flat, 1, 2], + 'flat 10' => [:flat, 10, 1], + 'parse 1' => [:parse, 1, 2], + 'parse 10' => [:parse, 10, 1]) + def test_emit_with_read_lines_limit(data) + config_style, limit, num_events = data + case config_style + when :flat + config = CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + config_element("", "", { "read_lines_limit" => limit }) + when :parse + config = CONFIG_READ_FROM_HEAD + config_element("", "", { "read_lines_limit" => limit }) + PARSE_SINGLE_LINE_CONFIG + end + d = create_driver(config) + msg = 'test' * 500 # in_tail reads 2048 bytes at once. - d.run do - sleep 1 + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + f.puts msg + f.puts msg + } + end - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - f.puts "test3" - f.puts "test4" + events = d.events + assert_equal(true, events.length > 0) + assert_equal({"message" => msg}, events[0][2]) + assert_equal({"message" => msg}, events[1][2]) + assert_equal(num_events, d.emit_count) + end + + data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, + parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) + def test_emit_with_read_from_head(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + f.puts "test1" + f.puts "test2" } - sleep 1 + + d = create_driver(config) + + d.run(expect_emits: 2) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + f.puts "test3" + f.puts "test4" + } + end + + events = d.events + assert(events.length > 0) + assert_equal({"message" => "test1"}, events[0][2]) + assert_equal({"message" => "test2"}, events[1][2]) + assert_equal({"message" => "test3"}, events[2][2]) + assert_equal({"message" => "test4"}, events[3][2]) end - emits = d.emits - assert_equal(true, emits.length > 0) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - assert(emits[0][1].is_a?(Fluent::EventTime)) - assert(emits[1][1].is_a?(Fluent::EventTime)) - assert_equal(1, d.emit_streams.size) + data(flat: CONFIG_ENABLE_WATCH_TIMER + SINGLE_LINE_CONFIG, + parse: CONFIG_ENABLE_WATCH_TIMER + PARSE_SINGLE_LINE_CONFIG) + def test_emit_with_enable_watch_timer(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") {|f| + f.puts "test1" + f.puts "test2" + } + + d = create_driver(config) + + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") {|f| + f.puts "test3" + f.puts "test4" + } + # according to cool.io's stat_watcher.c, systems without inotify will use + # an "automatic" value, typically around 5 seconds + end + + events = d.events + assert(events.length > 0) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) + end end class TestWithSystem < self @@ -163,198 +267,133 @@ def test_emit_with_system d = create_driver - d.run do - sleep 1 - + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } - sleep 1 end - emits = d.emits - assert_equal(true, emits.length > 0) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - assert(emits[0][1].is_a?(Fluent::EventTime)) - assert(emits[1][1].is_a?(Fluent::EventTime)) - assert_equal(1, d.emit_streams.size) + events = d.events + assert_equal(true, events.length > 0) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) + assert(events[0][1].is_a?(Fluent::EventTime)) + assert(events[1][1].is_a?(Fluent::EventTime)) + assert_equal(1, d.emit_count) pos = d.instance.instance_variable_get(:@pf_file) mode = "%o" % File.stat(pos).mode assert_equal OVERRIDE_FILE_PERMISSION, mode[-3, 3].to_i end end - data('1' => [1, 2], '10' => [10, 1]) - def test_emit_with_read_lines_limit(data) - limit, num_emits = data - d = create_driver(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG + "read_lines_limit #{limit}") - msg = 'test' * 500 # in_tail reads 2048 bytes at once. - - d.run do - sleep 1 - - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - f.puts msg - f.puts msg - } - sleep 1 - end - - emits = d.emits - assert_equal(true, emits.length > 0) - assert_equal({"message" => msg}, emits[0][2]) - assert_equal({"message" => msg}, emits[1][2]) - assert_equal(num_emits, d.emit_streams.size) - end - - def test_emit_with_read_from_head - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| - f.puts "test1" - f.puts "test2" - } - - d = create_driver(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG) - - d.run do - sleep 1 + sub_test_case "rotate file" do + data(flat: SINGLE_LINE_CONFIG, + parse: PARSE_SINGLE_LINE_CONFIG) + def test_rotate_file(data) + config = data + events = sub_test_rotate_file(config, expect_emits: 2) + assert_equal(4, events.length) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) + assert_equal({"message" => "test5"}, events[2][2]) + assert_equal({"message" => "test6"}, events[3][2]) + end + + data(flat: CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG, + parse: CONFIG_READ_FROM_HEAD + PARSE_SINGLE_LINE_CONFIG) + def test_rotate_file_with_read_from_head(data) + config = data + events = sub_test_rotate_file(config, expect_records: 6) + assert_equal(6, events.length) + assert_equal({"message" => "test1"}, events[0][2]) + assert_equal({"message" => "test2"}, events[1][2]) + assert_equal({"message" => "test3"}, events[2][2]) + assert_equal({"message" => "test4"}, events[3][2]) + assert_equal({"message" => "test5"}, events[4][2]) + assert_equal({"message" => "test6"}, events[5][2]) + end + + data(flat: SINGLE_LINE_CONFIG, + parse: PARSE_SINGLE_LINE_CONFIG) + def test_rotate_file_with_write_old(data) + config = data + events = sub_test_rotate_file(config, expect_emits: 3) { |rotated_file| + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + rotated_file.puts "test7" + rotated_file.puts "test8" + rotated_file.flush - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - f.puts "test3" - f.puts "test4" + sleep 1 + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "test5" + f.puts "test6" + } } - sleep 1 - end - - emits = d.emits - assert(emits.length > 0) - assert_equal({"message" => "test1"}, emits[0][2]) - assert_equal({"message" => "test2"}, emits[1][2]) - assert_equal({"message" => "test3"}, emits[2][2]) - assert_equal({"message" => "test4"}, emits[3][2]) - end - - def test_emit_with_enable_watch_timer - File.open("#{TMP_DIR}/tail.txt", "wb") {|f| - f.puts "test1" - f.puts "test2" - } - - d = create_driver(CONFIG_ENABLE_WATCH_TIMER + SINGLE_LINE_CONFIG) - - d.run do - sleep 1 - - File.open("#{TMP_DIR}/tail.txt", "ab") {|f| - f.puts "test3" - f.puts "test4" + assert_equal(6, events.length) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) + assert_equal({"message" => "test7"}, events[2][2]) + assert_equal({"message" => "test8"}, events[3][2]) + assert_equal({"message" => "test5"}, events[4][2]) + assert_equal({"message" => "test6"}, events[5][2]) + end + + data(flat: SINGLE_LINE_CONFIG, + parse: PARSE_SINGLE_LINE_CONFIG) + def test_rotate_file_with_write_old_and_no_new_file(data) + config = data + events = sub_test_rotate_file(config, expect_emits: 2) { |rotated_file| + rotated_file.puts "test7" + rotated_file.puts "test8" + rotated_file.flush } - # according to cool.io's stat_watcher.c, systems without inotify will use - # an "automatic" value, typically around 5 seconds - sleep 10 + assert_equal(4, events.length) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) + assert_equal({"message" => "test7"}, events[2][2]) + assert_equal({"message" => "test8"}, events[3][2]) end - emits = d.emits - assert(emits.length > 0) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - end - - def test_rotate_file - emits = sub_test_rotate_file(SINGLE_LINE_CONFIG) - assert_equal(4, emits.length) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - assert_equal({"message" => "test5"}, emits[2][2]) - assert_equal({"message" => "test6"}, emits[3][2]) - end - - def test_rotate_file_with_read_from_head - emits = sub_test_rotate_file(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG) - assert_equal(6, emits.length) - assert_equal({"message" => "test1"}, emits[0][2]) - assert_equal({"message" => "test2"}, emits[1][2]) - assert_equal({"message" => "test3"}, emits[2][2]) - assert_equal({"message" => "test4"}, emits[3][2]) - assert_equal({"message" => "test5"}, emits[4][2]) - assert_equal({"message" => "test6"}, emits[5][2]) - end - - def test_rotate_file_with_write_old - emits = sub_test_rotate_file(SINGLE_LINE_CONFIG) { |rotated_file| - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - rotated_file.puts "test7" - rotated_file.puts "test8" - rotated_file.flush - - sleep 1 - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| - f.puts "test5" - f.puts "test6" - } - } - assert_equal(6, emits.length) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - assert_equal({"message" => "test7"}, emits[2][2]) - assert_equal({"message" => "test8"}, emits[3][2]) - assert_equal({"message" => "test5"}, emits[4][2]) - assert_equal({"message" => "test6"}, emits[5][2]) - end - - def test_rotate_file_with_write_old_and_no_new_file - emits = sub_test_rotate_file(SINGLE_LINE_CONFIG) { |rotated_file| - rotated_file.puts "test7" - rotated_file.puts "test8" - rotated_file.flush - } - assert_equal(4, emits.length) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - assert_equal({"message" => "test7"}, emits[2][2]) - assert_equal({"message" => "test8"}, emits[3][2]) - end - - def sub_test_rotate_file(config = nil) - file = Fluent::FileWrapper.open("#{TMP_DIR}/tail.txt", "wb") - file.puts "test1" - file.puts "test2" - file.flush - - d = create_driver(config) - d.run do - sleep 1 - - file.puts "test3" - file.puts "test4" + def sub_test_rotate_file(config = nil, expect_emits: nil, expect_records: nil, timeout: nil) + file = Fluent::FileWrapper.open("#{TMP_DIR}/tail.txt", "wb") + file.puts "test1" + file.puts "test2" file.flush - sleep 1 - FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt") - if block_given? - yield file - sleep 1 - else - sleep 1 - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - sleep 1 + d = create_driver(config) + d.run(expect_emits: expect_emits, expect_records: expect_records, timeout: timeout) do + size = d.emit_count + file.puts "test3" + file.puts "test4" + file.flush + sleep(0.1) until d.emit_count >= size + 1 + size = d.emit_count + + if Fluent.windows? + file.close + FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt", force: true) + file = File.open("#{TMP_DIR}/tail.txt", "ab") + else + FileUtils.mv("#{TMP_DIR}/tail.txt", "#{TMP_DIR}/tail2.txt") + end + if block_given? + yield file + else + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + sleep 1 - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| - f.puts "test5" - f.puts "test6" - } - sleep 1 + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "test5" + f.puts "test6" + } + end end - end - d.run do - sleep 1 + d.events + ensure + file.close if file && !file.closed? end - - d.emits - ensure - file.close if file end def test_lf @@ -374,9 +413,9 @@ def test_lf sleep 1 end - emits = d.emits - assert_equal(true, emits.length > 0) - assert_equal({"message" => "test3test4"}, emits[0][2]) + events = d.events + assert_equal(true, events.length > 0) + assert_equal({"message" => "test3test4"}, events[0][2]) end def test_whitespace @@ -384,9 +423,7 @@ def test_whitespace d = create_driver - d.run do - sleep 1 - + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts " " # 4 spaces f.puts " 4 spaces" @@ -395,328 +432,346 @@ def test_whitespace f.puts " tab" f.puts "tab " } - sleep 1 end - emits = d.emits - assert_equal(true, emits.length > 0) - assert_equal({"message" => " "}, emits[0][2]) - assert_equal({"message" => " 4 spaces"}, emits[1][2]) - assert_equal({"message" => "4 spaces "}, emits[2][2]) - assert_equal({"message" => " "}, emits[3][2]) - assert_equal({"message" => " tab"}, emits[4][2]) - assert_equal({"message" => "tab "}, emits[5][2]) + events = d.events + assert_equal(true, events.length > 0) + assert_equal({"message" => " "}, events[0][2]) + assert_equal({"message" => " 4 spaces"}, events[1][2]) + assert_equal({"message" => "4 spaces "}, events[2][2]) + assert_equal({"message" => " "}, events[3][2]) + assert_equal({"message" => " tab"}, events[4][2]) + assert_equal({"message" => "tab "}, events[5][2]) end data( - 'default encoding' => ['', Encoding::ASCII_8BIT], - 'explicit encoding config' => ['encoding utf-8', Encoding::UTF_8]) + 'flat default encoding' => [SINGLE_LINE_CONFIG, Encoding::ASCII_8BIT], + 'flat explicit encoding config' => [SINGLE_LINE_CONFIG + config_element("", "", { "encoding" => "utf-8" }), Encoding::UTF_8], + 'parse default encoding' => [PARSE_SINGLE_LINE_CONFIG, Encoding::ASCII_8BIT], + 'parse explicit encoding config' => [PARSE_SINGLE_LINE_CONFIG + config_element("", "", { "encoding" => "utf-8" }), Encoding::UTF_8]) def test_encoding(data) encoding_config, encoding = data - d = create_driver(SINGLE_LINE_CONFIG + CONFIG_READ_FROM_HEAD + encoding_config) - - d.run do - sleep 1 + d = create_driver(CONFIG_READ_FROM_HEAD + encoding_config) + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "wb") {|f| f.puts "test" } - sleep 1 end - emits = d.emits - assert_equal(encoding, emits[0][2]['message'].encoding) + events = d.events + assert_equal(encoding, events[0][2]['message'].encoding) end def test_from_encoding - d = create_driver %[ - format /(?.*)/ - read_from_head true - from_encoding cp932 - encoding utf-8 - ] - - d.run do - sleep 1 - + conf = config_element( + "", "", { + "format" => "/(?.*)/", + "read_from_head" => "true", + "from_encoding" => "cp932", + "encoding" => "utf-8" + }) + d = create_driver(conf) + cp932_message = "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) + utf8_message = cp932_message.encode(Encoding::UTF_8) + + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "w:cp932") {|f| - f.puts "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) - } - sleep 1 - end - - emits = d.emits - assert_equal("\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932).encode(Encoding::UTF_8), emits[0][2]['message']) - assert_equal(Encoding::UTF_8, emits[0][2]['message'].encoding) - end - - # multiline mode test - - def test_multiline - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - - d = create_driver %[ - format multiline - format1 /^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/ - format_firstline /^[s]/ - ] - d.run do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| - f.puts "f test1" - f.puts "s test2" - f.puts "f test3" - f.puts "f test4" - f.puts "s test5" - f.puts "s test6" - f.puts "f test7" - f.puts "s test8" + f.puts cp932_message } - sleep 1 - - emits = d.emits - assert(emits.length == 3) - assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, emits[0][2]) - assert_equal({"message1" => "test5"}, emits[1][2]) - assert_equal({"message1" => "test6", "message2" => "test7"}, emits[2][2]) - - sleep 3 - emits = d.emits - assert(emits.length == 3) end - emits = d.emits - assert(emits.length == 4) - assert_equal({"message1" => "test8"}, emits[3][2]) + events = d.events + assert_equal(utf8_message, events[0][2]['message']) + assert_equal(Encoding::UTF_8, events[0][2]['message'].encoding) end - def test_multiline_with_flush_interval - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - - d = create_driver %[ - format multiline - format1 /^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/ - format_firstline /^[s]/ - multiline_flush_interval 2s - ] - - assert_equal 2, d.instance.multiline_flush_interval - - d.run do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| - f.puts "f test1" - f.puts "s test2" - f.puts "f test3" - f.puts "f test4" - f.puts "s test5" - f.puts "s test6" - f.puts "f test7" - f.puts "s test8" - } - sleep 1 + sub_test_case "multiline" do + data(flat: MULTILINE_CONFIG, + parse: PARSE_MULTILINE_CONFIG) + def test_multiline(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - emits = d.emits - assert(emits.length == 3) - assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, emits[0][2]) - assert_equal({"message1" => "test5"}, emits[1][2]) - assert_equal({"message1" => "test6", "message2" => "test7"}, emits[2][2]) + d = create_driver(config) + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "f test1" + f.puts "s test2" + f.puts "f test3" + f.puts "f test4" + f.puts "s test5" + f.puts "s test6" + f.puts "f test7" + f.puts "s test8" + } + end - sleep 3 - emits = d.emits - assert(emits.length == 4) - assert_equal({"message1" => "test8"}, emits[3][2]) + events = d.events + assert_equal(4, events.length) + assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[0][2]) + assert_equal({"message1" => "test5"}, events[1][2]) + assert_equal({"message1" => "test6", "message2" => "test7"}, events[2][2]) + assert_equal({"message1" => "test8"}, events[3][2]) end - end - data( - 'default encoding' => ['', Encoding::ASCII_8BIT], - 'explicit encoding config' => ['encoding utf-8', Encoding::UTF_8]) - def test_multiline_encoding_of_flushed_record(data) - encoding_config, encoding = data + data(flat: MULTILINE_CONFIG, + parse: PARSE_MULTILINE_CONFIG) + def test_multiline_with_flush_interval(data) + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - d = create_driver %[ - format multiline - format1 /^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/ - format_firstline /^[s]/ - multiline_flush_interval 2s - read_from_head true - #{encoding_config} - ] + config = data + config_element("", "", { "multiline_flush_interval" => "2s" }) + d = create_driver(config) - d.run do - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| - f.puts "s test" - } + assert_equal 2, d.instance.multiline_flush_interval - sleep 4 - emits = d.emits - assert_equal(1, emits.length) - assert_equal(encoding, emits[0][2]['message1'].encoding) - end - end + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "f test1" + f.puts "s test2" + f.puts "f test3" + f.puts "f test4" + f.puts "s test5" + f.puts "s test6" + f.puts "f test7" + f.puts "s test8" + } + end - def test_multiline_from_encoding_of_flushed_record - d = create_driver %[ - format multiline - format1 /^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/ - format_firstline /^[s]/ - multiline_flush_interval 2s - read_from_head true - from_encoding cp932 - encoding utf-8 - ] + events = d.events + assert_equal(4, events.length) + assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[0][2]) + assert_equal({"message1" => "test5"}, events[1][2]) + assert_equal({"message1" => "test6", "message2" => "test7"}, events[2][2]) + assert_equal({"message1" => "test8"}, events[3][2]) + end + + data( + 'flat default encoding' => [MULTILINE_CONFIG, Encoding::ASCII_8BIT], + 'flat explicit encoding config' => [MULTILINE_CONFIG + config_element("", "", { "encoding" => "utf-8" }), Encoding::UTF_8], + 'parse default encoding' => [PARSE_MULTILINE_CONFIG, Encoding::ASCII_8BIT], + 'parse explicit encoding config' => [PARSE_MULTILINE_CONFIG + config_element("", "", { "encoding" => "utf-8" }), Encoding::UTF_8]) + def test_multiline_encoding_of_flushed_record(data) + encoding_config, encoding = data + + config = config_element("", "", { + "multiline_flush_interval" => "2s", + "read_from_head" => "true", + }) + d = create_driver(config + encoding_config) + + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| + f.puts "s test" + } + end + events = d.events + assert_equal(1, events.length) + assert_equal(encoding, events[0][2]['message1'].encoding) + end + + def test_multiline_from_encoding_of_flushed_record + conf = MULTILINE_CONFIG + config_element( + "", "", + { + "multiline_flush_interval" => "1s", + "read_from_head" => "true", + "from_encoding" => "cp932", + "encoding" => "utf-8" + }) + d = create_driver(conf) + + cp932_message = "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) + utf8_message = "\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".encode(Encoding::UTF_8, Encoding::CP932) + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "w:cp932") { |f| + f.puts cp932_message + } + end - d.run do - sleep 1 - File.open("#{TMP_DIR}/tail.txt", "w:cp932") { |f| - f.puts "s \x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932) - } + events = d.events + assert_equal(1, events.length) + assert_equal(utf8_message, events[0][2]['message1']) + assert_equal(Encoding::UTF_8, events[0][2]['message1'].encoding) + end + + data(flat: config_element( + "", "", { + "format" => "multiline", + "format1" => "/^s (?[^\\n]+)\\n?/", + "format2" => "/(f (?[^\\n]+)\\n?)?/", + "format3" => "/(f (?.*))?/", + "format_firstline" => "/^[s]/" + }), + parse: config_element( + "", "", {}, + [config_element("parse", "", { + "@type" => "multiline", + "format1" => "/^s (?[^\\n]+)\\n?/", + "format2" => "/(f (?[^\\n]+)\\n?)?/", + "format3" => "/(f (?.*))?/", + "format_firstline" => "/^[s]/" + }) + ]) + ) + def test_multiline_with_multiple_formats(data) + config = data + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - sleep 4 - emits = d.emits - assert_equal(1, emits.length) - assert_equal("\x82\xCD\x82\xEB\x81\x5B\x82\xED\x81\x5B\x82\xE9\x82\xC7".force_encoding(Encoding::CP932).encode(Encoding::UTF_8), emits[0][2]['message1']) - assert_equal(Encoding::UTF_8, emits[0][2]['message1'].encoding) - end - end + d = create_driver(config) + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "f test1" + f.puts "s test2" + f.puts "f test3" + f.puts "f test4" + f.puts "s test5" + f.puts "s test6" + f.puts "f test7" + f.puts "s test8" + } + end - def test_multiline_with_multiple_formats - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } + events = d.events + assert(events.length > 0) + assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, events[0][2]) + assert_equal({"message1" => "test5"}, events[1][2]) + assert_equal({"message1" => "test6", "message2" => "test7"}, events[2][2]) + assert_equal({"message1" => "test8"}, events[3][2]) + end + + data(flat: config_element( + "", "", { + "format" => "multiline", + "format1" => "/^[s|f] (?.*)/", + "format_firstline" => "/^[s]/" + }), + parse: config_element( + "", "", {}, + [config_element("parse", "", { + "@type" => "multiline", + "format1" => "/^[s|f] (?.*)/", + "format_firstline" => "/^[s]/" + }) + ]) + ) + def test_multilinelog_with_multiple_paths(data) + files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] + files.each { |file| File.open(file, "wb") { |f| } } - d = create_driver %[ - format multiline - format1 /^s (?[^\\n]+)\\n?/ - format2 /(f (?[^\\n]+)\\n?)?/ - format3 /(f (?.*))?/ - format_firstline /^[s]/ - ] - d.run do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| - f.puts "f test1" - f.puts "s test2" - f.puts "f test3" - f.puts "f test4" - f.puts "s test5" - f.puts "s test6" - f.puts "f test7" - f.puts "s test8" - } - sleep 1 - end + config = data + config_element("", "", { + "path" => "#{files[0]},#{files[1]}", + "tag" => "t1", + }) + d = create_driver(config, false) + d.run(expect_emits: 2) do + files.each do |file| + File.open(file, 'ab') { |f| + f.puts "f #{file} line should be ignored" + f.puts "s test1" + f.puts "f test2" + f.puts "f test3" + f.puts "s test4" + } + end + end - emits = d.emits - assert(emits.length > 0) - assert_equal({"message1" => "test2", "message2" => "test3", "message3" => "test4"}, emits[0][2]) - assert_equal({"message1" => "test5"}, emits[1][2]) - assert_equal({"message1" => "test6", "message2" => "test7"}, emits[2][2]) - assert_equal({"message1" => "test8"}, emits[3][2]) - end + events = d.events + assert_equal({"message" => "test1\nf test2\nf test3"}, events[0][2]) + assert_equal({"message" => "test1\nf test2\nf test3"}, events[1][2]) + # "test4" events are here because these events are flushed at shutdown phase + assert_equal({"message" => "test4"}, events[2][2]) + assert_equal({"message" => "test4"}, events[3][2]) + end + + data(flat: config_element("", "", { + "format" => "multiline", + "format1" => "/(?foo \\d)\\n/", + "format2" => "/(?bar \\d)\\n/", + "format3" => "/(?baz \\d)/" + }), + parse: config_element( + "", "", {}, + [config_element("parse", "", { + "@type" => "multiline", + "format1" => "/(?foo \\d)\\n/", + "format2" => "/(?bar \\d)\\n/", + "format3" => "/(?baz \\d)/" + }) + ]) + ) + def test_multiline_without_firstline(data) + File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - def test_multilinelog_with_multiple_paths - files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] - files.each { |file| File.open(file, "wb") { |f| } } - - d = create_driver(%[ - path #{files[0]},#{files[1]} - tag t1 - format multiline - format1 /^[s|f] (?.*)/ - format_firstline /^[s]/ - ], false) - d.run do - files.each do |file| - File.open(file, 'ab') { |f| - f.puts "f #{file} line should be ignored" - f.puts "s test1" - f.puts "f test2" - f.puts "f test3" - f.puts "s test4" + config = data + d = create_driver(config) + d.run(expect_emits: 1) do + File.open("#{TMP_DIR}/tail.txt", "ab") { |f| + f.puts "foo 1" + f.puts "bar 1" + f.puts "baz 1" + f.puts "foo 2" + f.puts "bar 2" + f.puts "baz 2" } end - sleep 1 - end - emits = d.emits - assert_equal({"message" => "test1\nf test2\nf test3"}, emits[0][2]) - assert_equal({"message" => "test1\nf test2\nf test3"}, emits[1][2]) - # "test4" events are here because these events are flushed at shutdown phase - assert_equal({"message" => "test4"}, emits[2][2]) - assert_equal({"message" => "test4"}, emits[3][2]) - end - - def test_multiline_without_firstline - File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - - d = create_driver %[ - format multiline - format1 /(?foo \\d)\\n/ - format2 /(?bar \\d)\\n/ - format3 /(?baz \\d)/ + events = d.events + assert_equal(2, events.length) + assert_equal({"var1" => "foo 1", "var2" => "bar 1", "var3" => "baz 1"}, events[0][2]) + assert_equal({"var1" => "foo 2", "var2" => "bar 2", "var3" => "baz 2"}, events[1][2]) + end + end + + sub_test_case "path" do + # * path test + # TODO: Clean up tests + EX_RORATE_WAIT = 0 + + EX_CONFIG = config_element("", "", { + "tag" => "tail", + "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", + "format" => "none", + "pos_file" => "#{TMP_DIR}/tail.pos", + "read_from_head" => true, + "refresh_interval" => 30, + "rotate_wait" => "#{EX_RORATE_WAIT}s", + }) + EX_PATHS = [ + 'test/plugin/data/2010/01/20100102-030405.log', + 'test/plugin/data/log/foo/bar.log', + 'test/plugin/data/log/test.log' ] - d.run do - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| - f.puts "foo 1" - f.puts "bar 1" - f.puts "baz 1" - f.puts "foo 2" - f.puts "bar 2" - f.puts "baz 2" - } - sleep 1 - end - emits = d.emits - assert_equal(2, emits.length) - assert_equal({"var1" => "foo 1", "var2" => "bar 1", "var3" => "baz 1"}, emits[0][2]) - assert_equal({"var1" => "foo 2", "var2" => "bar 2", "var3" => "baz 2"}, emits[1][2]) - end + def test_expand_paths + plugin = create_driver(EX_CONFIG, false).instance + flexstub(Time) do |timeclass| + timeclass.should_receive(:now).with_no_args.and_return(Time.new(2010, 1, 2, 3, 4, 5)) + assert_equal EX_PATHS, plugin.expand_paths.sort + end - # * path test - # TODO: Clean up tests - EX_RORATE_WAIT = 0 - - EX_CONFIG = %[ - tag tail - path test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log - format none - pos_file #{TMP_DIR}/tail.pos - read_from_head true - refresh_interval 30 - rotate_wait #{EX_RORATE_WAIT}s - ] - EX_PATHS = [ - 'test/plugin/data/2010/01/20100102-030405.log', - 'test/plugin/data/log/foo/bar.log', - 'test/plugin/data/log/test.log' - ] - - def test_expand_paths - plugin = create_driver(EX_CONFIG, false).instance - flexstub(Time) do |timeclass| - timeclass.should_receive(:now).with_no_args.and_return(Time.new(2010, 1, 2, 3, 4, 5)) - assert_equal EX_PATHS, plugin.expand_paths.sort + # Test exclusion + exclude_config = EX_CONFIG + config_element("", "", { "exclude_path" => %Q(["#{EX_PATHS.last}"]) }) + plugin = create_driver(exclude_config, false).instance + assert_equal EX_PATHS - [EX_PATHS.last], plugin.expand_paths.sort end - - # Test exclusion - exclude_config = EX_CONFIG + " exclude_path [\"#{EX_PATHS.last}\"]" - plugin = create_driver(exclude_config, false).instance - assert_equal EX_PATHS - [EX_PATHS.last], plugin.expand_paths.sort end def test_z_refresh_watchers plugin = create_driver(EX_CONFIG, false).instance sio = StringIO.new plugin.instance_eval do - @pf = Fluent::NewTailInput::PositionFile.parse(sio) + @pf = Fluent::Plugin::TailInput::PositionFile.parse(sio) @loop = Coolio::Loop.new end flexstub(Time) do |timeclass| timeclass.should_receive(:now).with_no_args.and_return(Time.new(2010, 1, 2, 3, 4, 5), Time.new(2010, 1, 2, 3, 4, 6), Time.new(2010, 1, 2, 3, 4, 7)) - flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass| + flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| EX_PATHS.each do |path| - watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, true, true, 1000, any, any, any).once.and_return do + watcherclass.should_receive(:new).with(path, EX_RORATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any).once.and_return do flexmock('TailWatcher') { |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times @@ -731,8 +786,8 @@ def test_z_refresh_watchers @tails['test/plugin/data/2010/01/20100102-030405.log'].should_receive(:close).zero_or_more_times end - flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass| - watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::NewTailInput::FilePositionEntry, any, true, true, 1000, any, any, any).once.and_return do + flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| + watcherclass.should_receive(:new).with('test/plugin/data/2010/01/20100102-030406.log', EX_RORATE_WAIT, Fluent::Plugin::TailInput::FilePositionEntry, any, true, true, 1000, any, any, any).once.and_return do flexmock('TailWatcher') do |watcher| watcher.should_receive(:attach).once watcher.should_receive(:unwatched=).zero_or_more_times @@ -742,67 +797,77 @@ def test_z_refresh_watchers plugin.refresh_watchers end - flexstub(Fluent::NewTailInput::TailWatcher) do |watcherclass| + flexstub(Fluent::Plugin::TailInput::TailWatcher) do |watcherclass| watcherclass.should_receive(:new).never plugin.refresh_watchers end end end - DummyWatcher = Struct.new("DummyWatcher", :tag) + sub_test_case "receive_lines" do + DummyWatcher = Struct.new("DummyWatcher", :tag) - def test_receive_lines - plugin = create_driver(EX_CONFIG, false).instance - flexstub(plugin.router) do |engineclass| - engineclass.should_receive(:emit_stream).with('tail', any).once + def test_tag + d = create_driver(EX_CONFIG, false) + d.run {} + plugin = d.instance + mock(plugin.router).emit_stream('tail', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - config = %[ - tag pre.* - path test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log - format none - read_from_head true - ] - plugin = create_driver(config, false).instance - flexstub(plugin.router) do |engineclass| - engineclass.should_receive(:emit_stream).with('pre.foo.bar.log', any).once + def test_tag_prefix + config = config_element("", "", { + "tag" => "pre.*", + "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", + "format" => "none", + "read_from_head" => true + }) + d = create_driver(config, false) + d.run {} + plugin = d.instance + mock(plugin.router).emit_stream('pre.foo.bar.log', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - config = %[ - tag *.post - path test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log - format none - read_from_head true - ] - plugin = create_driver(config, false).instance - flexstub(plugin.router) do |engineclass| - engineclass.should_receive(:emit_stream).with('foo.bar.log.post', any).once + def test_tag_suffix + config = config_element("", "", { + "tag" => "*.post", + "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", + "format" => "none", + "read_from_head" => true + }) + d = create_driver(config, false) + d.run {} + plugin = d.instance + mock(plugin.router).emit_stream('foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - config = %[ - tag pre.*.post - path test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log - format none - read_from_head true - ] - plugin = create_driver(config, false).instance - flexstub(plugin.router) do |engineclass| - engineclass.should_receive(:emit_stream).with('pre.foo.bar.log.post', any).once + def test_tag_prefix_and_suffix + config = config_element("", "", { + "tag" => "pre.*.post", + "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", + "format" => "none", + "read_from_head" => true + }) + d = create_driver(config, false) + d.run {} + plugin = d.instance + mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end - config = %[ - tag pre.*.post*ignore - path test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log - format none - read_from_head true - ] - plugin = create_driver(config, false).instance - flexstub(plugin.router) do |engineclass| - engineclass.should_receive(:emit_stream).with('pre.foo.bar.log.post', any).once + def test_tag_prefix_and_suffix_ignore + config = config_element("", "", { + "tag" => "pre.*.post*ignore", + "path" => "test/plugin/*/%Y/%m/%Y%m%d-%H%M%S.log,test/plugin/data/log/**/*.log", + "format" => "none", + "read_from_head" => true + }) + d = create_driver(config, false) + d.run {} + plugin = d.instance + mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end end @@ -817,70 +882,26 @@ def test_missing_file # Try two different configs - one with read_from_head and one without, # since their interactions with the filesystem differ. - config1 = %[ - tag t1 - path #{TMP_DIR}/non_existent_file.txt,#{TMP_DIR}/tail.txt - format none - rotate_wait 2s - pos_file #{TMP_DIR}/tail.pos - ] - config2 = config1 + ' read_from_head true' + config1 = config_element("", "", { + "tag" => "t1", + "path" => "#{TMP_DIR}/non_existent_file.txt,#{TMP_DIR}/tail.txt", + "format" => "none", + "rotate_wait" => "2s", + "pos_file" => "#{TMP_DIR}/tail.pos" + }) + config2 = config1 + config_element("", "", { "read_from_head" => true }) [config1, config2].each do |config| d = create_driver(config, false) - d.run do - sleep 1 + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } - sleep 1 end - emits = d.emits - assert_equal(2, emits.length) - assert_equal({"message" => "test3"}, emits[0][2]) - assert_equal({"message" => "test4"}, emits[1][2]) - end - end - - sub_test_case 'emit error cases' do - def test_emit_error_with_buffer_queue_limit_error - emits = execute_test(Fluent::Plugin::Buffer::BufferOverflowError, "buffer space has too many data") - assert_equal(10, emits.length) - 10.times { |i| - assert_equal({"message" => "test#{i}"}, emits[i][2]) - } - end - - def test_emit_error_with_non_buffer_queue_limit_error - emits = execute_test(StandardError, "non BufferQueueLimitError error") - assert_true(emits.size > 0 && emits.size != 10) - emits.size.times { |i| - assert_equal({"message" => "test#{10 - emits.size + i}"}, emits[i][2]) - } - end - - def execute_test(error_class, error_message) - d = create_driver(CONFIG_READ_FROM_HEAD + SINGLE_LINE_CONFIG) - # Use define_singleton_method instead of d.emit_stream to capture local variable - d.define_singleton_method(:emit_stream) do |tag, es| - @test_num_errors ||= 0 - if @test_num_errors < 5 - @test_num_errors += 1 - raise error_class, error_message - else - @emit_streams << [tag, es.to_a] - end - end - - d.run do - 10.times { |i| - File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "test#{i}" } - sleep 0.5 - } - sleep 1 - end - - d.emits + events = d.events + assert_equal(2, events.length) + assert_equal({"message" => "test3"}, events[0][2]) + assert_equal({"message" => "test4"}, events[1][2]) end end @@ -891,21 +912,18 @@ def test_tail_path_with_singleline f.puts "test2" } - d = create_driver(%[path_key path] + SINGLE_LINE_CONFIG) - - d.run do - sleep 1 + d = create_driver(SINGLE_LINE_CONFIG + config_element("", "", { "path_key" => "path" })) + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") {|f| f.puts "test3" f.puts "test4" } - sleep 1 end - emits = d.emits - assert_equal(true, emits.length > 0) - emits.each do |emit| + events = d.events + assert_equal(true, events.length > 0) + events.each do |emit| assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) end end @@ -913,13 +931,14 @@ def test_tail_path_with_singleline def test_tail_path_with_multiline_with_firstline File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - d = create_driver %[ - path_key path - format multiline - format1 /^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/ - format_firstline /^[s]/ - ] - d.run do + config = config_element("", "", { + "path_key" => "path", + "format" => "multiline", + "format1" => "/^s (?[^\\n]+)(\\nf (?[^\\n]+))?(\\nf (?.*))?/", + "format_firstline" => "/^[s]/" + }) + d = create_driver(config) + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "f test1" f.puts "s test2" @@ -930,12 +949,11 @@ def test_tail_path_with_multiline_with_firstline f.puts "f test7" f.puts "s test8" } - sleep 1 end - emits = d.emits - assert(emits.length == 4) - emits.each do |emit| + events = d.events + assert_equal(4, events.length) + events.each do |emit| assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) end end @@ -943,25 +961,25 @@ def test_tail_path_with_multiline_with_firstline def test_tail_path_with_multiline_without_firstline File.open("#{TMP_DIR}/tail.txt", "wb") { |f| } - d = create_driver %[ - path_key path - format multiline - format1 /(?foo \\d)\\n/ - format2 /(?bar \\d)\\n/ - format3 /(?baz \\d)/ - ] - d.run do + config = config_element("", "", { + "path_key" => "path", + "format" => "multiline", + "format1" => "/(?foo \\d)\\n/", + "format2" => "/(?bar \\d)\\n/", + "format3" => "/(?baz \\d)/", + }) + d = create_driver(config) + d.run(expect_emits: 1) do File.open("#{TMP_DIR}/tail.txt", "ab") { |f| f.puts "foo 1" f.puts "bar 1" f.puts "baz 1" } - sleep 1 end - emits = d.emits - assert(emits.length > 0) - emits.each do |emit| + events = d.events + assert(events.length > 0) + events.each do |emit| assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"]) end end @@ -970,15 +988,16 @@ def test_tail_path_with_multiline_with_multiple_paths files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"] files.each { |file| File.open(file, "wb") { |f| } } - d = create_driver(%[ - path #{files[0]},#{files[1]} - path_key path - tag t1 - format multiline - format1 /^[s|f] (?.*)/ - format_firstline /^[s]/ - ], false) - d.run do + config = config_element("", "", { + "path" => "#{files[0]},#{files[1]}", + "path_key" => "path", + "tag" => "t1", + "format" => "multiline", + "format1" => "/^[s|f] (?.*)/", + "format_firstline" => "/^[s]/" + }) + d = create_driver(config, false) + d.run(expect_emits: 2) do files.each do |file| File.open(file, 'ab') { |f| f.puts "f #{file} line should be ignored" @@ -988,14 +1007,13 @@ def test_tail_path_with_multiline_with_multiple_paths f.puts "s test4" } end - sleep 1 end - emits = d.emits - assert(emits.length == 4) - assert_equal(files, [emits[0][2]["path"], emits[1][2]["path"]].sort) + events = d.events + assert_equal(4, events.length) + assert_equal(files, [events[0][2]["path"], events[1][2]["path"]].sort) # "test4" events are here because these events are flushed at shutdown phase - assert_equal(files, [emits[2][2]["path"], emits[3][2]["path"]].sort) + assert_equal(files, [events[2][2]["path"], events[3][2]["path"]].sort) end end end