Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
still writing
Browse files Browse the repository at this point in the history
tagomoris committed Mar 31, 2016

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 4cb69d7 commit 3ac2264
Showing 6 changed files with 231 additions and 39 deletions.
47 changes: 20 additions & 27 deletions lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
@@ -106,35 +106,24 @@ def self.new_filter(type)
end

def self.new_buffer(type, parent: nil)
impl = new_impl('buffer', BUFFER_REGISTRY, type)
if parent && impl.respond_to?(:"owner=")
impl.owner = parent
end
impl
new_impl('buffer', BUFFER_REGISTRY, type, parent)
end

def self.new_parser(type, parent: nil)
require 'fluent/parser'

impl = if type[0] == '/' && type[-1] == '/'
# This usage is not recommended for new API... create RegexpParser directly
require 'fluent/parser'
Fluent::TextParser.lookup(type)
else
new_impl('parser', PARSER_REGISTRY, type)
end
if parent && impl.respond_to?(:"owner=")
impl.owner = parent
if type[0] == '/' && type[-1] == '/'
# This usage is not recommended for new API... create RegexpParser directly
require 'fluent/parser'
Fluent::TextParser.lookup(type)
else
new_impl('parser', PARSER_REGISTRY, type, parent)
end
impl
end

def self.new_formatter(type, parent: nil)
impl = new_impl('formatter', FORMATTER_REGISTRY, type)
if parent && impl.respond_to?(:"owner=")
impl.owner = parent
end
impl
new_impl('formatter', FORMATTER_REGISTRY, type, parent)
end

def self.new_storage(type)
@@ -150,17 +139,21 @@ def self.register_impl(kind, registry, type, value)
nil
end

def self.new_impl(kind, registry, type)
def self.new_impl(kind, registry, type, parent)
# "'type' not found" is handled by registry
obj = registry.lookup(type)
case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
impl = case
when obj.is_a?(Class)
obj.new
when obj.respond_to?(:call) && obj.arity == 0
obj.call
else
raise Fluent::ConfigError, "#{kind} plugin '#{type}' is not a Class nor callable (without arguments)."
end
if parent && impl.respond_to?(:"owner=")
impl.owner = parent
end
impl
end
end
end
13 changes: 10 additions & 3 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
@@ -30,11 +30,11 @@ class Base
include PluginLoggerMixin
include PluginHelper::Mixin

State = Struct.new(:configure, :start, :stop, :shutdown, :close, :terminate)
State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :close, :terminate)

def initialize
super
@state = State.new(false, false, false, false, false, false)
@state = State.new(false, false, false, false, false, false, false)
end

def has_router?
@@ -58,7 +58,10 @@ def stop
self
end

# TBD: before_shutdown ?
def before_shutdown
@state.before_shutdown = true
self
end

def shutdown
@state.shutdown = true
@@ -88,6 +91,10 @@ def stopped?
@state.stop
end

def before_shutdown?
@state.before_shutdown
end

def shutdown?
@state.shutdown
end
111 changes: 111 additions & 0 deletions lib/fluent/plugin/buf_file2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fileutils'
require 'uri'

require 'fluent/buffer'
require 'fluent/env'
require 'fluent/plugin'
require 'fluent/system_config'

module Fluent
module Plugin
class FileBuffer < Fluent::Plugin::Buffer
Plugin.register_buffer('file2', self)

include SystemConfig::Mixin

DEFAULT_CHUNK_BYTES_LIMIT = 256 * 1024 * 1024 # 256MB
DEFAULT_TOTAL_BYTES_LIMIT = 64 * 1024 * 1024 * 1024 # 64GB, same with v0.12 (TimeSlicedOutput + buf_file)

# TODO: buffer_path based on system config
desc 'The path where buffer chunks are stored.'
config_param :path, :string

config_set_default :chunk_bytes_limit, DEFAULT_CHUNK_BYTES_LIMIT
config_set_default :total_bytes_limit, DEFAULT_TOTAL_BYTES_LIMIT

##TODO: Buffer plugin cannot handle symlinks because new API @stage has many writing buffer chunks
## re-implement this feature on out_file, w/ enqueue_chunk(or generate_chunk) hook + chunk.path
# attr_accessor :symlink_path

@@buffer_paths = {}

def initialize
super
@uri_parser = URI::Parser.new
@symlink_path = nil
@dir_permission = system_config.dir_permission || DIR_PERMISSION
end

def configure(conf)
super

type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
if @@buffer_paths.has_key?(@path)
type_using_this_path = Plugin.lookup_type_from_class(@@buffer_paths[@path])
raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
end

@@buffer_paths[@path] = type_of_owner

unless @path.include?('*')
@path += '.*.log'
end
end

def start
FileUtils.mkdir_p File.dirname(@buffer_path_prefix + "path"), mode: @dir_permission

super
end

def persistent?
true
end

def resume
stage = {}
queue = []

Dir.glob(@path) do |path|
m = new_metadata()
mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
case chunk.state
when :staged
stage[chunk.metadata] = chunk
when :queued, :blocked
queue << chunk
else
raise "BUG: unexpected chunk state '#{chunk.state}' for path '#{path}'"
end
add_metadata(chunk.metadata)
end

queue.sort_by!{ |chunk| chunk.modified_at }

return stage, queue
end

def generate_chunk(metadata)
# FileChunk generates real path with unique_id
Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create)
end
end
end
end
34 changes: 34 additions & 0 deletions lib/fluent/plugin/buf_memory2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/buffer'
require 'fluent/plugin/buffer/memory_chunk'

module Fluent
module Plugin
class MemoryBuffer < Fluent::Plugin::Buffer
Plugin.register_buffer('memory2', self)

def resume
return {}, []
end

def generate_chunk(metadata)
Fluent::Plugin::Buffer::MemoryChunk.new(metadata)
end
end
end
end
43 changes: 41 additions & 2 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
@@ -19,11 +19,50 @@
module Fluent
module Plugin
class Output < Base
def emit(tag, es)

config_section :buffer, param_name: :buf_config, required: false, multi: false, final: true do
config_argument(:chunk_keys, default: nil){ v.start_with?("[") ? JSON.load(v) : v.to_s.strip.split(/\s*,\s*/) } # TODO: use string_list

end

def process(tag, es)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def format(tag, time, record)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def write(chunk)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

def try_write(chunk)
raise NotImplementedError, "BUG: output plugins MUST implement this method"
end

# TBD: merge buffered output ?
def configure(conf)
super
end

def emit_sync(tag, es)
process(tag, es)
end

def emit_buffered(tag, es)
# TODO: create hash of metadata => [formatted_lines]

meta = metadata(tag)
@emit_count += 1
data = format_stream(tag, es)
@buffer.emit(meta, data)
[meta]
end

def emit(tag, es)
if self.class.instance_methods.include?(:process)
end
end
end
end
end
22 changes: 15 additions & 7 deletions lib/fluent/plugin_helper/event_emitter.rb
Original file line number Diff line number Diff line change
@@ -19,15 +19,27 @@
module Fluent
module PluginHelper
module EventEmitter
attr_accessor :router

# stop : [-]
# shutdown : disable @router
# close : [-]
# terminate: [-]

def router
@_event_emitter_used_actually = true
@router
end

def has_router?
true
end

def event_emitter_used_actually?
@_event_emitter_used_actually
end

def initialize
super
@_event_emitter_used_actually = false
@router = nil
end

@@ -42,11 +54,7 @@ def configure(conf)
end
end

def has_router?
true
end

def close
def shutdown
super
@router = nil
end

0 comments on commit 3ac2264

Please sign in to comment.