Merge pull request #1874 from fluent/ignore-broken-file-chunks
buf_file: Skip and delete broken file chunks during resume. fix #1760
repeatedly authored Mar 2, 2018
2 parents 20bfab1 + cb70a44 commit 5c7c32a
Showing 3 changed files with 156 additions and 3 deletions.
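
The failure mode this commit addresses is a chunk/metadata pair on disk that can no longer be loaded: typically a zero-byte .log file left behind by a crash, or a .meta file whose contents are corrupted. The following is a minimal sketch of how such pairs can be produced for manual testing; the buffer directory, prefix, and chunk ids are hypothetical, and the real tests below build equivalent fixtures with their write_metadata helper.

  require 'fileutils'

  # Hypothetical buffer directory and chunk file names, loosely mirroring the test setup below.
  bufdir = '/tmp/broken_buffer_file'
  FileUtils.mkdir_p(bufdir)

  # Case 1: an empty staged chunk body next to a .meta file.
  empty_chunk = File.join(bufdir, 'broken_test.b0123456789abcdef0123456789abcdef.log')
  File.write(empty_chunk, '')                     # zero-byte chunk body
  File.write(empty_chunk + '.meta', "\0" * 70)    # sidecar metadata (contents never read; the empty check fires first)

  # Case 2: a non-empty enqueued chunk whose metadata is garbage bytes.
  broken_meta = File.join(bufdir, 'broken_test.q0123456789abcdef0123456789abcdef.log')
  File.write(broken_meta, %(["t1.test",1460926815,{"message":"yay"}]\n))
  File.write(broken_meta + '.meta', "\0" * 70)    # not valid chunk metadata

Previously, loading such a pair raised during resume; with this change it is skipped, logged, and deleted (see handle_broken_files in buf_file.rb below).
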
14 changes: 13 additions & 1 deletion lib/fluent/plugin/buf_file.rb
@@ -141,7 +141,13 @@ def resume
next
end

chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
begin
chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
rescue Fluent::Plugin::Buffer::FileChunk::FileChunkError => e
handle_broken_files(path, mode, e)
next
end

case chunk.state
when :staged
stage[chunk.metadata] = chunk
@@ -167,6 +173,12 @@ def generate_chunk(metadata)

return chunk
end

def handle_broken_files(path, mode, e)
log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message
# Once the 'backup_dir' feature is supported, these files will be moved to backup_dir instead of being unlinked.
File.unlink(path, path + '.meta') rescue nil
end
end
end
end
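
The resume-side pattern added above is worth reading in isolation: try to load each chunk, and when loading fails with a chunk-specific error, log it, remove both the chunk file and its companion .meta file, and move on to the next one. Below is a stripped-down sketch of that pattern, using a hypothetical load_chunk and plain warn rather than Fluentd's plugin log; it is not the plugin's actual API.

  # Stand-in for Fluent::Plugin::Buffer::FileChunk::FileChunkError.
  class BrokenChunkError < StandardError; end

  # Hypothetical loader: raises BrokenChunkError when the file pair is unusable.
  def load_chunk(path)
    raise BrokenChunkError, 'file chunk is empty' if File.size(path).zero?
    # ... open the chunk and restore its metadata here ...
    path
  end

  def resume(chunk_paths)
    resumed = []
    chunk_paths.each do |path|
      begin
        resumed << load_chunk(path)
      rescue BrokenChunkError => e
        # Mirrors handle_broken_files: report, delete both files, keep going.
        warn "found broken chunk file during resume: #{path} (#{e.message})"
        File.unlink(path, path + '.meta') rescue nil
        next
      end
    end
    resumed
  end
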
21 changes: 19 additions & 2 deletions lib/fluent/plugin/buffer/file_chunk.rb
@@ -22,6 +22,8 @@ module Fluent
module Plugin
class Buffer
class FileChunk < Chunk
class FileChunkError < StandardError; end

### buffer path user specified : /path/to/directory/user_specified_prefix.*.log
### buffer chunk path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log
### buffer chunk metadata path : /path/to/directory/user_specified_prefix.b513b61c9791029c2513b61c9791029c2.log.meta
@@ -309,6 +311,8 @@ def load_existing_staged_chunk(path)
# staging buffer chunk without metadata is classic buffer chunk file
# and it should be enqueued immediately
if File.exist?(@meta_path)
raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb+')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.sync = true
@@ -319,7 +323,13 @@ def load_existing_staged_chunk(path)
@meta.set_encoding(Encoding::ASCII_8BIT)
@meta.sync = true
@meta.binmode
restore_metadata(@meta.read)
begin
restore_metadata(@meta.read)
rescue => e
@chunk.close
@meta.close
raise FileChunkError, "staged meta file is broken. #{e.message}"
end
@meta.seek(0, IO::SEEK_SET)

@state = :staged
@@ -345,6 +355,8 @@ def load_existing_staged_chunk(path)

def load_existing_enqueued_chunk(path)
@path = path
raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

@chunk = File.open(@path, 'rb')
@chunk.set_encoding(Encoding::ASCII_8BIT)
@chunk.binmode
@@ -354,7 +366,12 @@ def load_existing_enqueued_chunk(path)

@meta_path = @path + '.meta'
if File.readable?(@meta_path)
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
begin
restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
rescue => e
@chunk.close
raise FileChunkError, "enqueued meta file is broken. #{e.message}"
end
else
restore_metadata_partially(@chunk)
end
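
On the chunk side the change is a fail-fast load: an empty chunk file or unreadable metadata no longer surfaces as an arbitrary exception deep inside restore_metadata, but is wrapped in the new FileChunkError so the owning buffer can decide what to do with the files. A minimal sketch of that wrapping pattern follows, assuming a MessagePack-encoded .meta sidecar; it is an illustration of the pattern, not Fluentd's actual on-disk format handling.

  require 'msgpack'

  class FileChunkError < StandardError; end

  # Load a chunk body and its sidecar metadata, converting low-level failures
  # into one domain-specific error, as load_existing_*_chunk now does.
  def load_chunk_pair(path)
    raise FileChunkError, 'file chunk is empty' if File.size(path).zero?

    chunk = File.open(path, 'rb')
    begin
      meta = MessagePack.unpack(File.binread(path + '.meta'))
      raise 'metadata is not a map' unless meta.is_a?(Hash)  # reject payloads that decode but are not a metadata Hash
    rescue => e
      chunk.close
      raise FileChunkError, "meta file is broken. #{e.message}"
    end

    [chunk, meta]
  end
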
124 changes: 124 additions & 0 deletions test/plugin/test_buf_file.rb
@@ -840,4 +840,128 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
assert File.exist?(@not_chunk)
end
end

sub_test_case 'there are existing broken file chunks' do
setup do
@bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__)
FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir)
@bufpath = File.join(@bufdir, 'broken_test.*.log')

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
end

teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end

def create_first_chunk(mode)
cid = Fluent::UniqueId.generate
path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log")
File.open(path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:00:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:00:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:00:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 14:00:28 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:00:00 -0700').to_i),
4, event_time('2016-04-17 14:00:00 -0700').to_i, event_time('2016-04-17 14:00:28 -0700').to_i
)

return cid, path
end

def create_second_chunk(mode)
cid = Fluent::UniqueId.generate
path = File.join(@bufdir, "broken_test.#{mode}#{Fluent::UniqueId.hex(cid)}.log")
File.open(path, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 14:01:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 14:01:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 14:01:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
write_metadata(
path + '.meta', cid, metadata(timekey: event_time('2016-04-17 14:01:00 -0700').to_i),
3, event_time('2016-04-17 14:01:00 -0700').to_i, event_time('2016-04-17 14:01:25 -0700').to_i
)

return cid, path
end

def compare_staged_chunk(staged, id, time, num, mode)
assert_equal 1, staged.size
m = metadata(timekey: event_time(time).to_i)
assert_equal id, staged[m].unique_id
assert_equal num, staged[m].size
assert_equal mode, staged[m].state
end

def compare_queued_chunk(queued, id, num, mode)
assert_equal 1, queued.size
assert_equal id, queued[0].unique_id
assert_equal num, queued[0].size
assert_equal mode, queued[0].state
end

def compare_log(plugin, msg)
logs = plugin.log.out.logs
assert { logs.any? { |log| log.include?(msg) } }
end

test '#resume ignores staged empty chunk' do
_, p1 = create_first_chunk('b')
File.open(p1, 'wb') { |f| } # create staged empty chunk file
c2id, _ = create_second_chunk('b')

@p.start
compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged)
compare_log(@p, 'staged file chunk is empty')
end

test '#resume ignores staged broken metadata' do
c1id, _ = create_first_chunk('b')
_, p2 = create_second_chunk('b')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file

@p.start
compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
compare_log(@p, 'staged meta file is broken')
end

test '#resume ignores enqueued empty chunk' do
_, p1 = create_first_chunk('q')
File.open(p1, 'wb') { |f| } # create enqueued empty chunk file
c2id, _ = create_second_chunk('q')

@p.start
compare_queued_chunk(@p.queue, c2id, 3, :queued)
compare_log(@p, 'enqueued file chunk is empty')
end

test '#resume ignores enqueued broken metadata' do
c1id, _ = create_first_chunk('q')
_, p2 = create_second_chunk('q')
File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file

@p.start
compare_queued_chunk(@p.queue, c1id, 4, :queued)
compare_log(@p, 'enqueued meta file is broken')
end
end
end
