Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: fluent/fluentd
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: bcfc08dc2284625c659795cec9b59850240d412a
Choose a base ref
..
head repository: fluent/fluentd
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 3ac22647413b25a241f6baf360227567b72da40a
Choose a head ref
2 changes: 1 addition & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("win32-event", ["~> 0.6.1"])
gem.add_runtime_dependency("windows-pr", ["~> 1.2.3"])
end
gem.add_runtime_dependency("strptime", [">= 0.1.3"])
gem.add_runtime_dependency("strptime", [">= 0.1.7"])

gem.add_development_dependency("rake", [">= 0.9.2"])
gem.add_development_dependency("flexmock", ["~> 1.3.3"])
42 changes: 37 additions & 5 deletions lib/fluent/config/element.rb
Original file line number Diff line number Diff line change
@@ -108,11 +108,7 @@ def to_s(nest = 0)
out << "#{indent}<#{@name} #{@arg}>\n"
end
each_pair { |k, v|
if secret_param?(k)
out << "#{nindent}#{k} xxxxxx\n"
else
out << "#{nindent}#{k} #{v}\n"
end
out << dump_value(k, v, indent, nindent)
}
@elements.each { |e|
out << e.to_s(nest + 1)
@@ -124,6 +120,8 @@ def to_s(nest = 0)
def to_masked_element
new_elems = @elements.map { |e| e.to_masked_element }
new_elem = Element.new(@name, @arg, {}, new_elems, @unused)
new_elem.v1_config = @v1_config
new_elem.corresponding_proxies = @corresponding_proxies
each_pair { |k, v|
new_elem[k] = secret_param?(k) ? 'xxxxxx' : v
}
@@ -144,6 +142,40 @@ def secret_param?(key)
false
end

def param_type(key)
return nil if @corresponding_proxies.empty?

param_key = key.to_sym
proxy = @corresponding_proxies.detect do |_proxy|
_proxy.params.has_key?(param_key)
end
return nil unless proxy
_block, opts = proxy.params[param_key]
opts[:type]
end

def dump_value(k, v, indent, nindent)
if secret_param?(k)
"#{nindent}#{k} xxxxxx\n"
else
if @v1_config
case param_type(k)
when :string
"#{nindent}#{k} \"#{self.class.unescape_parameter(v)}\"\n"
when :enum, :integer, :float, :size, :bool, :time
"#{nindent}#{k} #{v}\n"
when :hash, :array
"#{nindent}#{k} #{v}\n"
else
# Unknown type
"#{nindent}#{k} #{v}\n"
end
else
"#{nindent}#{k} #{v}\n"
end
end
end

def self.unescape_parameter(v)
result = ''
v.each_char { |c| result << LiteralParser.unescape_char(c) }
9 changes: 5 additions & 4 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
@@ -16,19 +16,21 @@

require 'socket'

require 'msgpack'
require 'cool.io'

require 'fluent/config'
require 'fluent/event'
require 'fluent/event_router'
require 'fluent/msgpack_factory'
require 'fluent/root_agent'
require 'fluent/time'
require 'fluent/system_config'
require 'fluent/plugin'

module Fluent
class EngineClass
include Fluent::MessagePackFactory::Mixin

def initialize
@root_agent = nil
@event_router = nil
@@ -41,8 +43,6 @@ def initialize

@suppress_config_dump = false

@msgpack_factory = MessagePack::Factory.new
@msgpack_factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
@system_config = SystemConfig.new
end

@@ -51,7 +51,6 @@ def initialize

attr_reader :root_agent
attr_reader :matches, :sources
attr_reader :msgpack_factory
attr_reader :system_config

def init(system_config)
@@ -69,6 +68,8 @@ def init(system_config)

@root_agent = RootAgent.new(@system_config)

MessagePackFactory.init

self
end

3 changes: 0 additions & 3 deletions lib/fluent/env.rb
Original file line number Diff line number Diff line change
@@ -18,9 +18,6 @@ module Fluent
DEFAULT_CONFIG_PATH = ENV['FLUENT_CONF'] || '/etc/fluent/fluent.conf'
DEFAULT_PLUGIN_DIR = ENV['FLUENT_PLUGIN'] || '/etc/fluent/plugin'
DEFAULT_SOCKET_PATH = ENV['FLUENT_SOCKET'] || '/var/run/fluent/fluent.sock'
DEFAULT_LISTEN_PORT = 24224
DEFAULT_FILE_PERMISSION = 0644
DEFAULT_DIR_PERMISSION = 0755
IS_WINDOWS = /mswin|mingw/ === RUBY_PLATFORM
private_constant :IS_WINDOWS

62 changes: 62 additions & 0 deletions lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'msgpack'
require 'fluent/time'

module Fluent
module MessagePackFactory
@@engine_factory = nil

module Mixin
def msgpack_factory
MessagePackFactory.engine_factory
end

def msgpack_packer(*args)
msgpack_factory.packer(*args)
end

def msgpack_unpacker(*args)
msgpack_factory.unpacker(*args)
end
end

def self.engine_factory
@@engine_factory || factory
end

def self.factory
factory = MessagePack::Factory.new
factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
factory
end

def self.packer(*args)
factory.packer(*args)
end

def self.unpacker(*args)
factory.unpacker(*args)
end

def self.init
factory = MessagePack::Factory.new
factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
@@engine_factory = factory
end
end
end
45 changes: 29 additions & 16 deletions lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
@@ -31,9 +31,9 @@ module Plugin
BUFFER_REGISTRY = Registry.new(:buffer, 'fluent/plugin/buf_')
PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_')
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_')
# TODO: plugin storage
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_')

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY]
REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY]

def self.register_input(type, klass)
register_impl('input', INPUT_REGISTRY, type, klass)
@@ -71,6 +71,10 @@ def self.register_formatter(type, klass_or_proc)
end
end

def self.register_storage(type, klass)
register_impl('storage', STORAGE_REGISTRY, type, klass)
end

def self.lookup_type_from_class(klass_or_its_name)
klass = if klass_or_its_name.is_a? Class
klass_or_its_name
@@ -101,24 +105,29 @@ def self.new_filter(type)
new_impl('filter', FILTER_REGISTRY, type)
end

def self.new_buffer(type)
new_impl('buffer', BUFFER_REGISTRY, type)
def self.new_buffer(type, parent: nil)
new_impl('buffer', BUFFER_REGISTRY, type, parent)
end

def self.new_parser(type)
def self.new_parser(type, parent: nil)
require 'fluent/parser'

if type[0] == '/' && type[-1] == '/'
# This usage is not recommended for new API... create RegexpParser directly
require 'fluent/parser'
Fluent::TextParser.lookup(type)
else
new_impl('parser', PARSER_REGISTRY, type)
new_impl('parser', PARSER_REGISTRY, type, parent)
end
impl
end

def self.new_formatter(type)
new_impl('formatter', FORMATTER_REGISTRY, type)
def self.new_formatter(type, parent: nil)
new_impl('formatter', FORMATTER_REGISTRY, type, parent)
end

def self.new_storage(type)
new_impl('storage', STORAGE_REGISTRY, type)
end

def self.register_impl(kind, registry, type, value)
@@ -130,17 +139,21 @@ def self.register_impl(kind, registry, type, value)
nil
end

def self.new_impl(kind, registry, type)
def self.new_impl(kind, registry, type, parent)
# "'type' not found" is handled by registry
obj = registry.lookup(type)
case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
impl = case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
end
if parent && impl.respond_to?(:"owner=")
impl.owner = parent
end
impl
end
end
end
15 changes: 12 additions & 3 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
@@ -19,20 +19,22 @@
require 'fluent/plugin_id'
require 'fluent/log'
require 'fluent/plugin_helper'
require 'fluent/system_config'

module Fluent
module Plugin
class Base
include Configurable
include PluginId
include SystemConfig::Mixin
include PluginLoggerMixin
include PluginHelper::Mixin

State = Struct.new(:configure, :start, :stop, :shutdown, :close, :terminate)
State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :close, :terminate)

def initialize
super
@state = State.new(false, false, false, false, false, false)
@state = State.new(false, false, false, false, false, false, false)
end

def has_router?
@@ -56,7 +58,10 @@ def stop
self
end

# TBD: before_shutdown ?
def before_shutdown
@state.before_shutdown = true
self
end

def shutdown
@state.shutdown = true
@@ -86,6 +91,10 @@ def stopped?
@state.stop
end

def before_shutdown?
@state.before_shutdown
end

def shutdown?
@state.shutdown
end
19 changes: 15 additions & 4 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
@@ -17,17 +17,23 @@
require 'fileutils'
require 'uri'

require 'fluent/buffer'
require 'fluent/env'
require 'fluent/plugin'
require 'fluent/buffer'
require 'fluent/system_config'

module Fluent
class FileBufferChunk < BufferChunk
include SystemConfig::Mixin

FILE_PERMISSION = 0644

def initialize(key, path, unique_id, mode="a+", symlink_path = nil)
super(key)
@path = path
@unique_id = unique_id
@file = File.open(@path, mode, DEFAULT_FILE_PERMISSION)
@file_permission = system_config.file_permission || FILE_PERMISSION
@file = File.open(@path, mode, @file_permission)
@file.binmode
@file.sync = true
@size = @file.stat.size
@@ -78,7 +84,7 @@ def mv(path)
@file.close
File.rename(@path, path)
@path = path
@file = File.open(@path, 'rb', DEFAULT_FILE_PERMISSION)
@file = File.open(@path, 'rb', @file_permission)
@file.sync = true
@size = @file.size
@file.pos = pos
@@ -91,8 +97,12 @@ def mv(path)
end

class FileBuffer < BasicBuffer
include SystemConfig::Mixin

Plugin.register_buffer('file', self)

DIR_PERMISSION = 0755

@@buffer_paths = {}

def initialize
@@ -130,10 +140,11 @@ def configure(conf)
@buffer_path_suffix = ".log"
end

@dir_perm = system_config.dir_permission || DIR_PERMISSION
end

def start
FileUtils.mkdir_p File.dirname(@buffer_path_prefix + "path"), mode: DEFAULT_DIR_PERMISSION
FileUtils.mkdir_p File.dirname(@buffer_path_prefix + "path"), mode: @dir_perm
super
end

Loading