Skip to content

Commit

Permalink
filter_parser: Transition from single record to stream (#4620)
Browse files Browse the repository at this point in the history
Signed-off-by: Athish Pranav D <[email protected]>
  • Loading branch information
Athishpranav2003 authored Nov 27, 2024
1 parent 4de960b commit 2d8c9d4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 51 deletions.
78 changes: 27 additions & 51 deletions lib/fluent/plugin/filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,32 @@ def configure(conf)
@parser = parser_create
end

FAILED_RESULT = [nil, nil].freeze # reduce allocation cost
REPLACE_CHAR = '?'.freeze

def filter_with_time(tag, time, record)
raw_value = @accessor.call(record)
if raw_value.nil?
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
end
if @reserve_data
return time, handle_parsed(tag, record, time, {})
else
return FAILED_RESULT
def filter_stream(tag, es)
new_es = Fluent::MultiEventStream.new
es.each do |time, record|
begin
raw_value = @accessor.call(record)
if raw_value.nil?
new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
raise ArgumentError, "#{@key_name} does not exist"
else
filter_one_record(tag, time, record, raw_value) do |result_time, result_record|
new_es.add(result_time, result_record)
end
end
rescue => e
router.emit_error_event(tag, time, record, e) if @emit_invalid_record_to_error
end
end
begin
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
result_time = nil
result_record = nil
new_es
end

private

def filter_one_record(tag, time, record, raw_value)
begin
@parser.parse(raw_value) do |t, values|
if values
t = if @reserve_time
Expand All @@ -85,55 +88,28 @@ def filter_with_time(tag, time, record)
t.nil? ? time : t
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)

if result_record.nil?
result_time = t
result_record = r
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
"Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
))
end
end
else
if @emit_invalid_record_to_error
router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
end

router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'")) if @emit_invalid_record_to_error
next unless @reserve_data
next unless result_record.nil?

result_time = time
result_record = handle_parsed(tag, record, time, {})
t = time
values = {}
end
yield(t, handle_parsed(tag, record, t, values))
end

return result_time, result_record
rescue Fluent::Plugin::Parser::ParserError => e
if @emit_invalid_record_to_error
raise e
else
return FAILED_RESULT
end
raise e
rescue ArgumentError => e
raise unless @replace_invalid_sequence
raise unless e.message.index("invalid byte sequence in") == 0

raw_value = raw_value.scrub(REPLACE_CHAR)
retry
rescue => e
if @emit_invalid_record_to_error
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
else
return FAILED_RESULT
end
raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
end
end

private

def handle_parsed(tag, record, t, values)
if values && @inject_key_prefix
values = Hash[values.map { |k, v| [@inject_key_prefix + k, v] }]
Expand Down
17 changes: 17 additions & 0 deletions test/plugin/test_filter_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ def test_filter

end

def test_filter_with_multiple_records
d1 = create_driver(%[
key_name data
<parse>
@type json
</parse>
])
time = Fluent::EventTime.from_time(@default_time)
d1.run(default_tag: @tag) do
d1.feed(time, {'data' => '[{"xxx_1":"first","yyy":"second"}, {"xxx_2":"first", "yyy_2":"second"}]'})
end
filtered = d1.filtered
assert_equal 2, filtered.length
assert_equal ({"xxx_1"=>"first", "yyy"=>"second"}), filtered[0][1]
assert_equal ({"xxx_2"=>"first", "yyy_2"=>"second"}), filtered[1][1]
end

data(:keep_key_name => false,
:remove_key_name => true)
def test_filter_with_reserved_data(remove_key_name)
Expand Down

0 comments on commit 2d8c9d4

Please sign in to comment.