Skip to content

Commit

Permalink
Merge pull request #866 from fluent/introduce-new-plugin-classes
Browse files Browse the repository at this point in the history
Introduce New Plugin Classes
  • Loading branch information
tagomoris committed Apr 19, 2016
2 parents debf241 + e96dcdd commit 1a3df5e
Show file tree
Hide file tree
Showing 50 changed files with 9,066 additions and 136 deletions.
52 changes: 32 additions & 20 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
language: ruby
cache: bundler

rvm:
- 2.1
- 2.2.3
- 2.3.0
- ruby-head
- rbx
# http://rubies.travis-ci.org/
# See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c
matrix:
include:
- rvm: 2.1.10
os: linux
- rvm: 2.2.4
os: linux
- rvm: 2.3.0
os: linux
- rvm: ruby-head
os: linux
- rvm: 2.1.0
os: osx
osx_image: xcode7.3 # OSX 10.11
- rvm: 2.2.4
os: osx
osx_image: xcode7.1 # OSX 10.10
- rvm: ruby-head
os: osx
osx_image: xcode 7.3 # OSX 10.11
allow_failures:
- rvm: ruby-head

os:
- linux
- osx
# no valid version/env for ruby 2.3 right now
# - rvm: 2.3.0
# os: osx
# osx_image: ....

branches:
only:
Expand All @@ -18,16 +37,9 @@ branches:
- v0.12
- v0.14

gemfile:
- Gemfile

script: bundle exec rake

sudo: false

matrix:
allow_failures:
- rvm: ruby-head
- rvm: rbx
- rvm: 2.3.0
os: osx
addons:
apt:
packages:
- libgmp3-dev
2 changes: 1 addition & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("strptime", [">= 0.1.7"])

gem.add_development_dependency("rake", [">= 0.9.2"])
gem.add_development_dependency("flexmock", ["~> 1.3.3"])
gem.add_development_dependency("flexmock", ["~> 2.0.5"])
gem.add_development_dependency("parallel_tests", [">= 0.15.3"])
gem.add_development_dependency("simplecov", ["~> 0.6.4"])
gem.add_development_dependency("rr", [">= 1.0.0"])
Expand Down
35 changes: 34 additions & 1 deletion lib/fluent/config/configure_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
module Fluent
module Config
class ConfigureProxy
attr_accessor :name, :final, :param_name, :init, :required, :multi, :alias, :argument, :params, :defaults, :descriptions, :sections
attr_accessor :name, :final, :param_name, :init, :required, :multi, :alias, :configured_in_section
attr_accessor :argument, :params, :defaults, :descriptions, :sections
# config_param :desc, :string, :default => '....'
# config_set_default :buffer_type, :memory
#
Expand Down Expand Up @@ -50,6 +51,11 @@ def initialize(name, opts = {})

raise "init and required are exclusive" if @init && @required

# specify section name for viewpoint of owner(parent) plugin
# for buffer plugins: all params are in <buffer> section of owner
# others: <storage>, <format> (formatter/parser), ...
@configured_in_section = nil

@argument = nil # nil: ignore argument
@params = {}
@defaults = {}
Expand Down Expand Up @@ -89,6 +95,9 @@ def merge(other) # self is base class, other is subclass
if overwrite?(other, :alias)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: alias"
end
if overwrite?(other, :configured_in_section)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: configured_in"
end

options = {}
# param_name is used not to ovewrite plugin's instance
Expand All @@ -103,6 +112,9 @@ def merge(other) # self is base class, other is subclass

merged = self.class.new(other.name, options)

# configured_in MUST be kept
merged.configured_in_section = self.configured_in_section

merged.argument = other.argument || self.argument
merged.params = other.params.merge(self.params)
merged.defaults = self.defaults.merge(other.defaults)
Expand Down Expand Up @@ -144,6 +156,9 @@ def merge_for_finalized(other)
if overwrite?(other, :alias)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: alias"
end
if overwrite?(other, :configured_in_section)
raise ConfigError, "BUG: subclass cannot overwrite base class's config_section: configured_in"
end

options = {}
options[:param_name] = other.param_name
Expand All @@ -155,6 +170,8 @@ def merge_for_finalized(other)

merged = self.class.new(other.name, options)

merged.configured_in_section = self.configured_in_section

merged.argument = self.argument || other.argument
merged.params = other.params.merge(self.params)
merged.defaults = other.defaults.merge(self.defaults)
Expand All @@ -175,6 +192,15 @@ def merge_for_finalized(other)
merged
end

def overwrite_defaults(other) # other is owner plugin's corresponding proxy
self.defaults = self.defaults.merge(other.defaults)
self.sections.keys.each do |section_key|
if other.sections.has_key?(section_key)
self.sections[section_key].overwrite_defaults(other.sections[section_key])
end
end
end

def parameter_configuration(name, *args, &block)
name = name.to_sym

Expand Down Expand Up @@ -213,6 +239,13 @@ def parameter_configuration(name, *args, &block)
[name, block, opts]
end

def configured_in(section_name)
if @configured_in_section
raise ArgumentError, "#{self.name}: configured_in called twice"
end
@configured_in_section = section_name.to_sym
end

def config_argument(name, *args, &block)
if @argument
raise ArgumentError, "#{self.name}: config_argument called twice"
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def self.included(mod)
end

def initialize
super
# to simulate implicit 'attr_accessor' by config_param / config_section and its value by config_set_default
proxy = self.class.merged_configure_proxy
proxy.params.keys.each do |name|
Expand All @@ -51,6 +52,14 @@ def configure(conf)
proxy = self.class.merged_configure_proxy
conf.corresponding_proxies << proxy

if self.respond_to?(:owner) && self.owner
owner_proxy = owner.class.merged_configure_proxy
if proxy.configured_in_section
owner_proxy = owner_proxy.sections[proxy.configured_in_section]
end
proxy.overwrite_defaults(owner_proxy) if owner_proxy
end

# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
Expand Down Expand Up @@ -97,6 +106,10 @@ def configure_proxy(mod_name)
map[mod_name]
end

def configured_in(section_name)
configure_proxy(self.name).configured_in(section_name)
end

def config_param(name, *args, &block)
configure_proxy(self.name).config_param(name, *args, &block)
attr_accessor name
Expand Down
10 changes: 10 additions & 0 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -398,5 +398,15 @@ def configure(conf)
@log.optional_attrs = {}
end
end

def start
@log.reset
super
end

def terminate
super
@log.reset
end
end
end
39 changes: 23 additions & 16 deletions lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ module Plugin
# plugins for fluentd plugins: fluent/plugin/type/NAME.rb
# ex: storage, buffer chunk, ...

# first class plugins (instantiated by Engine)
INPUT_REGISTRY = Registry.new(:input, 'fluent/plugin/in_')
OUTPUT_REGISTRY = Registry.new(:output, 'fluent/plugin/out_')
FILTER_REGISTRY = Registry.new(:filter, 'fluent/plugin/filter_')

# feature plugin: second class plugins (instanciated by Plugins or Helpers)
BUFFER_REGISTRY = Registry.new(:buffer, 'fluent/plugin/buf_')
PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_')
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_')
Expand Down Expand Up @@ -105,28 +108,28 @@ def self.new_filter(type)
new_impl('filter', FILTER_REGISTRY, type)
end

def self.new_buffer(type)
new_impl('buffer', BUFFER_REGISTRY, type)
def self.new_buffer(type, parent: nil)
new_impl('buffer', BUFFER_REGISTRY, type, parent)
end

def self.new_parser(type)
def self.new_parser(type, parent: nil)
require 'fluent/parser'

if type[0] == '/' && type[-1] == '/'
# This usage is not recommended for new API... create RegexpParser directly
require 'fluent/parser'
Fluent::TextParser.lookup(type)
else
new_impl('parser', PARSER_REGISTRY, type)
new_impl('parser', PARSER_REGISTRY, type, parent)
end
end

def self.new_formatter(type)
new_impl('formatter', FORMATTER_REGISTRY, type)
def self.new_formatter(type, parent: nil)
new_impl('formatter', FORMATTER_REGISTRY, type, parent)
end

def self.new_storage(type)
new_impl('storage', STORAGE_REGISTRY, type)
def self.new_storage(type, parent: nil)
new_impl('storage', STORAGE_REGISTRY, type, parent)
end

def self.register_impl(kind, registry, type, value)
Expand All @@ -138,17 +141,21 @@ def self.register_impl(kind, registry, type, value)
nil
end

def self.new_impl(kind, registry, type)
def self.new_impl(kind, registry, type, parent=nil)
# "'type' not found" is handled by registry
obj = registry.lookup(type)
case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
impl = case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
end
if parent && impl.respond_to?("owner=")
impl.owner = parent
end
impl
end
end
end
Loading

0 comments on commit 1a3df5e

Please sign in to comment.