Skip to content

Commit

Permalink
Showing 15 changed files with 213 additions and 192 deletions.
58 changes: 38 additions & 20 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
@@ -64,8 +64,8 @@ def time_slice_format=(format)
@_time_slice_format = format
end

def timekey_range=(range)
@_timekey_range = range
def timekey=(unit)
@_timekey = unit
end

def timezone=(tz)
@@ -87,7 +87,7 @@ def assume_timekey!
# file creation time is assumed in the time range of that time slice
# because the first record should be in that range.
time_int = self.created_at.to_i
self.metadata.timekey = time_int - (time_int % @_timekey_range)
self.metadata.timekey = time_int - (time_int % @_timekey)
end
end

@@ -116,7 +116,7 @@ def write(chunk)
chunk.extend(ChunkSizeCompatMixin)
chunk.extend(AddTimeSliceKeyToChunkMixin)
chunk.time_slice_format = @time_slice_format
chunk.timekey_range = @_timekey_range
chunk.timekey = @_timekey
chunk.timezone = @timezone
chunk.assume_timekey!
super
@@ -209,12 +209,18 @@ def configure(conf)
config_style = (bufconf ? :v1 : :v0)
if config_style == :v0
buf_params = {
"flush_mode" => "fast",
"flush_mode" => "interval",
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
if conf.has_key?(older)
if older == 'buffer_queue_full_action' && conf[older] == 'exception'
buf_params[newer] = 'throw_exception'
else
buf_params[newer] = conf[older]
end
end
end

conf.elements << Fluent::Config::Element.new('buffer', '', buf_params, [])
@@ -325,12 +331,18 @@ def configure(conf)
config_style = (bufconf ? :v1 : :v0)
if config_style == :v0
buf_params = {
"flush_mode" => "fast",
"flush_mode" => "interval",
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
if conf.has_key?(older)
if older == 'buffer_queue_full_action' && conf[older] == 'exception'
buf_params[newer] = 'throw_exception'
else
buf_params[newer] = conf[older]
end
end
end

conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
@@ -444,12 +456,18 @@ def configure(conf)
if config_style == :v0
buf_params = {
"@type" => "file",
"flush_mode" => (conf['flush_interval'] ? "fast" : "none"),
"flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"),
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
if conf.has_key?(older)
if older == 'buffer_queue_full_action' && conf[older] == 'exception'
buf_params[newer] = 'throw_exception'
else
buf_params[newer] = conf[older]
end
end
end
unless buf_params.has_key?("@type")
buf_params["@type"] = "file"
@@ -469,16 +487,16 @@ def configure(conf)
@localtime = false
end

@_timekey_range = case conf['time_slice_format']
when /\%S/ then 1
when /\%M/ then 60
when /\%H/ then 3600
when /\%d/ then 86400
when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
else
raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
end
buf_params["timekey_range"] = @_timekey_range
@_timekey = case conf['time_slice_format']
when /\%S/ then 1
when /\%M/ then 60
when /\%H/ then 3600
when /\%d/ then 86400
when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
else
raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
end
buf_params["timekey"] = @_timekey

conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, [])
end
8 changes: 4 additions & 4 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
@@ -27,17 +27,17 @@ class FileBuffer < Fluent::Plugin::Buffer

include SystemConfig::Mixin

DEFAULT_CHUNK_BYTES_LIMIT = 256 * 1024 * 1024 # 256MB
DEFAULT_TOTAL_BYTES_LIMIT = 64 * 1024 * 1024 * 1024 # 64GB, same with v0.12 (TimeSlicedOutput + buf_file)
DEFAULT_CHUNK_LIMIT_SIZE = 256 * 1024 * 1024 # 256MB
DEFAULT_TOTAL_LIMIT_SIZE = 64 * 1024 * 1024 * 1024 # 64GB, same with v0.12 (TimeSlicedOutput + buf_file)

DIR_PERMISSION = 0755

# TODO: buffer_path based on system config
desc 'The path where buffer chunks are stored.'
config_param :path, :string

config_set_default :chunk_bytes_limit, DEFAULT_CHUNK_BYTES_LIMIT
config_set_default :total_bytes_limit, DEFAULT_TOTAL_BYTES_LIMIT
config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE
config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE

config_param :file_permission, :string, default: nil # '0644'
config_param :dir_permission, :string, default: nil # '0755'
22 changes: 11 additions & 11 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
@@ -33,17 +33,17 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than

MINIMUM_APPEND_ATTEMPT_RECORDS = 10

DEFAULT_CHUNK_BYTES_LIMIT = 8 * 1024 * 1024 # 8MB
DEFAULT_TOTAL_BYTES_LIMIT = 512 * 1024 * 1024 # 512MB, same with v0.12 (BufferedOutput + buf_memory: 64 x 8MB)
DEFAULT_CHUNK_LIMIT_SIZE = 8 * 1024 * 1024 # 8MB
DEFAULT_TOTAL_LIMIT_SIZE = 512 * 1024 * 1024 # 512MB, same with v0.12 (BufferedOutput + buf_memory: 64 x 8MB)

DEFAULT_CHUNK_FULL_THRESHOLD = 0.95

configured_in :buffer

# TODO: system total buffer bytes limit by SystemConfig
# TODO: system total buffer limit size in bytes by SystemConfig

config_param :chunk_bytes_limit, :size, default: DEFAULT_CHUNK_BYTES_LIMIT
config_param :total_bytes_limit, :size, default: DEFAULT_TOTAL_BYTES_LIMIT
config_param :chunk_limit_size, :size, default: DEFAULT_CHUNK_LIMIT_SIZE
config_param :total_limit_size, :size, default: DEFAULT_TOTAL_LIMIT_SIZE

# If user specify this value and (chunk_size * queue_length) is smaller than total_size,
# then total_size is automatically configured to that value
@@ -64,8 +64,8 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
def initialize
super

@chunk_bytes_limit = nil
@total_bytes_limit = nil
@chunk_limit_size = nil
@total_limit_size = nil
@queue_length_limit = nil
@chunk_records_limit = nil

@@ -86,7 +86,7 @@ def configure(conf)
super

unless @queue_length_limit.nil?
@total_bytes_limit = @chunk_bytes_limit * @queue_length_limit
@total_limit_size = @chunk_limit_size * @queue_length_limit
end
end

@@ -128,7 +128,7 @@ def terminate
end

def storable?
@total_bytes_limit > @stage_size + @queue_size
@total_limit_size > @stage_size + @queue_size
end

## TODO: for back pressure feature
@@ -348,11 +348,11 @@ def clear_queue!
end

def chunk_size_over?(chunk)
chunk.bytesize > @chunk_bytes_limit || (@chunk_records_limit && chunk.size > @chunk_records_limit)
chunk.bytesize > @chunk_limit_size || (@chunk_records_limit && chunk.size > @chunk_records_limit)
end

def chunk_size_full?(chunk)
chunk.bytesize >= @chunk_bytes_limit * @chunk_full_threshold || (@chunk_records_limit && chunk.size >= @chunk_records_limit * @chunk_full_threshold)
chunk.bytesize >= @chunk_limit_size * @chunk_full_threshold || (@chunk_records_limit && chunk.size >= @chunk_records_limit * @chunk_full_threshold)
end

class ShouldRetry < StandardError; end
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_buffered_null.rb
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ class BufferedNullOutput < Output
config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_bytes_limit, 10 * 1024
config_set_default :chunk_limit_size, 10 * 1024
end

attr_accessor :feed_proc, :delayed
Loading

0 comments on commit 2acc67c

Please sign in to comment.