Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: test: in_tail: Add tests for rotation with follow_inodes #4207

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 258 additions & 0 deletions test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2638,4 +2638,262 @@ def test_lines_collected_with_no_throttling(data)
end
end
end

# https://github.com/fluent/fluentd/issues/3614
sub_test_case "Update watchers for rotation with follow_inodes" do
def test_updateTW_before_refreshTW_and_detach_before_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "1s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end

def test_updateTW_before_refreshTW_and_detach_after_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher after refresh_watchers.
"rotate_wait" => "4s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
sleep 2

# This reproduces the following situation:
# Rotation => update_watcher => refresh_watchers
# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# The old TailWathcer is detached here since `rotate_wait` is `4s`.
sleep 3

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end

def test_updateTW_after_refreshTW
config = config_element(
"ROOT",
"",
{
"path" => "#{@tmp_dir}/tail.txt*",
"pos_file" => "#{@tmp_dir}/tail.pos",
"tag" => "t1",
"format" => "none",
"read_from_head" => "true",
"follow_inodes" => "true",
# In order to detach the old watcher quickly.
"rotate_wait" => "1s",
# In order to reproduce the same condition stably, ensure that `refresh_watchers` is not
# called by a timer.
"refresh_interval" => "1h",
# stat_watcher often calls `TailWatcher::on_notify` faster than creating a new log file,
# so disable it in order to reproduce the same condition stably.
"enable_stat_watcher" => "false",
}
)
d = create_driver(config, false)

tail_watchers = []
stub.proxy(d.instance).setup_watcher do |tw|
tail_watchers.append(tw)
tw
end

Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file1 log1"}

d.run(expect_records: 4, timeout: 10) do
# Rotate (If the timing is bad, `TailWatcher::on_notify` might be called between mv and new-file-creation)
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file1 log2"}
FileUtils.move("#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt" + "1")
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") {|f| f.puts "file2 log1"}

# This reproduces the following situation:
# Rotation => refresh_watchers => update_watcher
# This add a new TailWatcher: TailWatcher(path: "tail.txt", inode: inode_1)
# This overwrites `@tails["tail.txt"]`
d.instance.refresh_watchers

# `watch_timer` calls `TailWatcher::on_notify`, and then `update_watcher` updates the TailWatcher:
# TailWatcher(path: "tail.txt", inode: inode_0) => TailWatcher(path: "tail.txt", inode: inode_1)
# The old TailWathcer is detached here since `rotate_wait` is just `1s`.
sleep 3

# This adds a new TailWatcher: TailWatcher(path: "tail.txt1", inode: inode_0)
d.instance.refresh_watchers

# Append to the new current log file.
Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "ab") {|f| f.puts "file2 log2"}
end

inode_0 = tail_watchers[0].ino
inode_1 = tail_watchers[1].ino
record_values = d.events.collect { |event| event[2]["message"] }.sort
position_entries = []
Fluent::FileWrapper.open("#{@tmp_dir}/tail.pos", "r") do |f|
f.readlines(chomp: true).each do |line|
values = line.split("\t")
position_entries.append([values[0], values[1], values[2].to_i(16)])
end
end

assert_equal(
{
record_values: ["file1 log1", "file1 log2", "file2 log1", "file2 log2"],
tail_watcher_paths: ["#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt", "#{@tmp_dir}/tail.txt1"],
tail_watcher_inodes: [inode_0, inode_1, inode_0],
tail_watcher_io_handler_opened_statuses: [false, false, false],
position_entries: [
["#{@tmp_dir}/tail.txt", "ffffffffffffffff", inode_0],
["#{@tmp_dir}/tail.txt", "0000000000000016", inode_1],
],
},
{
record_values: record_values,
tail_watcher_paths: tail_watchers.collect { |tw| tw.path },
tail_watcher_inodes: tail_watchers.collect { |tw| tw.ino },
tail_watcher_io_handler_opened_statuses: tail_watchers.collect { |tw| tw.instance_variable_get(:@io_handler)&.opened? || false },
position_entries: position_entries
},
)
end
end
end