Skip to content

Commit

Permalink
Merge pull request #2381 from benwh/configurable-json-stream-buffer
Browse files Browse the repository at this point in the history
parser_json: Add stream_buffer_size config param
  • Loading branch information
repeatedly authored Apr 25, 2019
2 parents 97ece65 + 811390d commit 73a4f01
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/fluent/plugin/out_exec_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class ExecFilterOutput < Output
COMPAT_PARSE_PARAMS = {
'out_format' => '@type',
'out_keys' => 'keys',
'out_stream_buffer_size' => 'stream_buffer_size',
}
COMPAT_EXTRACT_PARAMS = {
'out_tag_key' => 'tag_key',
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class JSONParser < Parser
desc 'Set JSON parser'
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj

# The Yajl library defines a default buffer size of 8KiB when parsing
# from IO streams, so maintain this for backwards-compatibility.
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
desc 'Set the buffer size that Yajl will use when parsing streaming input'
config_param :stream_buffer_size, :integer, default: 8192

config_set_default :time_type, :float

def configure(conf)
Expand Down Expand Up @@ -81,7 +87,7 @@ def parse_io(io, &block)
y.on_parse_complete = ->(record){
block.call(parse_time(record), record)
}
y.parse(io)
y.parse(io, @stream_buffer_size)
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions test/plugin/test_out_exec_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def create_driver(conf)
</format>
<parse>
@type json
stream_buffer_size 1
</parse>
<extract>
tag_key tag
Expand All @@ -338,6 +339,7 @@ def create_driver(conf)
command cat
in_keys message
out_format json
out_stream_buffer_size 1
time_key time
tag_key tag
]
Expand Down Expand Up @@ -372,6 +374,7 @@ def create_driver(conf)
</format>
<parse>
@type json
stream_buffer_size 1
</parse>
<extract>
tag_key tag
Expand All @@ -382,6 +385,7 @@ def create_driver(conf)
command cat
in_keys message
out_format json
out_stream_buffer_size 1
time_key time
tag_key tag
]
Expand Down Expand Up @@ -414,6 +418,7 @@ def create_driver(conf)
</format>
<parse>
@type json
stream_buffer_size 1
</parse>
<extract>
tag_key tag
Expand All @@ -426,6 +431,7 @@ def create_driver(conf)
command cat
in_keys message
out_format json
out_stream_buffer_size 1
time_key time
time_format %d/%b/%Y %H:%M:%S.%N %z
tag_key tag
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/test_parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,28 @@ def test_parse_with_keep_time_key_without_time_format(data)
assert_equal text, record['time']
end
end

def test_yajl_parse_io_with_buffer_smaller_than_input
parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser)
parser.configure(
'keep_time_key' => 'true',
'json_parser' => 'yajl',
'stream_buffer_size' => 1,
)
text = "100"

waiting(5) do
rd, wr = IO.pipe
wr.write "{\"time\":\"#{text}\"}"

parser.instance.parse_io(rd) do |time, record|
assert_equal text.to_i, time.sec
assert_equal text, record['time']

# Once a record has been received the 'write' end of the pipe must be
# closed, otherwise the test will block waiting for more input.
wr.close
end
end
end
end

0 comments on commit 73a4f01

Please sign in to comment.