Skip to content

Commit

Permalink
Merge pull request #864 from fluent/introduce-plugin-storage
Browse files Browse the repository at this point in the history
Introduce plugin storage
  • Loading branch information
tagomoris committed Mar 29, 2016
2 parents 3be4232 + 600b98d commit 06f5a3e
Show file tree
Hide file tree
Showing 15 changed files with 1,174 additions and 25 deletions.
12 changes: 10 additions & 2 deletions lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ module Plugin
BUFFER_REGISTRY = Registry.new(:buffer, 'fluent/plugin/buf_')
PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_')
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_')
# TODO: plugin storage
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_')

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY]
REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY]

def self.register_input(type, klass)
register_impl('input', INPUT_REGISTRY, type, klass)
Expand Down Expand Up @@ -71,6 +71,10 @@ def self.register_formatter(type, klass_or_proc)
end
end

def self.register_storage(type, klass)
register_impl('storage', STORAGE_REGISTRY, type, klass)
end

def self.lookup_type_from_class(klass_or_its_name)
klass = if klass_or_its_name.is_a? Class
klass_or_its_name
Expand Down Expand Up @@ -121,6 +125,10 @@ def self.new_formatter(type)
new_impl('formatter', FORMATTER_REGISTRY, type)
end

def self.new_storage(type)
new_impl('storage', STORAGE_REGISTRY, type)
end

def self.register_impl(kind, registry, type, value)
if !value.is_a?(Class) && !value.respond_to?(:call)
raise Fluent::ConfigError, "Invalid implementation as #{kind} plugin: '#{type}'. It must be a Class, or callable."
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
require 'fluent/plugin_id'
require 'fluent/log'
require 'fluent/plugin_helper'
require 'fluent/system_config'

module Fluent
module Plugin
class Base
include Configurable
include PluginId
include SystemConfig::Mixin
include PluginLoggerMixin
include PluginHelper::Mixin

Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
require 'fileutils'
require 'uri'

require 'fluent/buffer'
require 'fluent/env'
require 'fluent/plugin'
require 'fluent/buffer'
require 'fluent/system_config'

module Fluent
class FileBufferChunk < BufferChunk
include SystemConfigMixin
include SystemConfig::Mixin

FILE_PERMISSION = 0644

Expand Down Expand Up @@ -96,7 +97,7 @@ def mv(path)
end

class FileBuffer < BasicBuffer
include SystemConfigMixin
include SystemConfig::Mixin

Plugin.register_buffer('file', self)

Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'fluent/input'
require 'fluent/config/error'
require 'fluent/event'
require 'fluent/system_config'

if Fluent.windows?
require_relative 'file_wrapper'
Expand All @@ -28,7 +29,7 @@

module Fluent
class TailInput < Input
include SystemConfigMixin
include SystemConfig::Mixin

Plugin.register_input('tail', self)

Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

require 'fluent/output'
require 'fluent/config/error'
require 'fluent/system_config'

module Fluent
class FileOutput < TimeSlicedOutput
include SystemConfigMixin
include SystemConfig::Mixin

Plugin.register_output('file', self)

Expand Down
115 changes: 115 additions & 0 deletions lib/fluent/plugin/storage.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin'
require 'fluent/configurable'

module Fluent
module Plugin
class Storage
include Fluent::Configurable
include Fluent::SystemConfig::Mixin

DEFAULT_TYPE = 'local'

config_param :persistent, :bool, default: false # load/save with all operations
config_param :autosave, :bool, default: true
config_param :autosave_interval, :time, default: 10
config_param :save_at_shutdown, :bool, default: true

def self.validate_key(key)
raise ArgumentError, "key must be a string (or symbol for to_s)" unless key.is_a?(String) || key.is_a?(Symbol)
key.to_s
end

attr_accessor :log

def configure(conf)
super(conf)

@_owner = nil
end

def plugin_id(id, configured)
@_plugin_id = id
@_plugin_id_configured = configured
end

def owner=(plugin)
@_owner = plugin

@_plugin_id = plugin.plugin_id
@_plugin_id_configured = plugin.plugin_id_configured?

@log = plugin.log
end

def owner
@_owner
end

def persistent_always?
false
end

def synchronized?
false
end

def implementation
self
end

def load
# load storage data from any data source, or initialize storage internally
end

def save
# save internal data store into data source (to be loaded)
end

def get(key)
raise NotImplementedError, "Implement this method in child class"
end

def fetch(key, defval)
raise NotImplementedError, "Implement this method in child class"
end

def put(key, value)
# return value
raise NotImplementedError, "Implement this method in child class"
end

def delete(key)
# return deleted value
raise NotImplementedError, "Implement this method in child class"
end

def update(key, &block) # transactional get-and-update
raise NotImplementedError, "Implement this method in child class"
end

# storage plugins has only 'close' and 'terminate'
# stop: used in helper to stop autosave
# shutdown: used in helper to call #save finally if needed
def close; end
def terminate
@_owner = nil
end
end
end
end
111 changes: 111 additions & 0 deletions lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
require 'fluent/plugin'
require 'fluent/plugin/storage'

require 'fileutils'
require 'yajl'

module Fluent
module Plugin
class LocalStorage < Storage
Fluent::Plugin.register_storage('local', self)

DEFAULT_DIR_MODE = 0755
DEFAULT_FILE_MODE = 0644

config_param :path, :string, default: nil
config_param :mode, :integer, default: DEFAULT_FILE_MODE
config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE
config_param :pretty_print, :bool, default: false

def initialize
super
@store = {}
end

def configure(conf, plugin)
super

@on_memory = false
if !@path && !@_plugin_id_configured
if @autosave || @persistent
raise Fluent::ConfigError, "Plugin @id or path for <storage> required to save data"
else
log.info "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
@on_memory = true
end
elsif @path
path = @path.dup
else # @_plugin_id_configured
## TODO: get process-wide directory for plugin storage, and generate path for this plugin storage instance
# path =
end

if !@on_memory
dir = File.dirname(@path)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless File.exist?(dir)
if File.exist?(@path)
raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
begin
data = Yajl::Parser.parse(open(@path, 'r:utf-8'){ |io| io.read })
raise Fluent::ConfigError, "Invalid contents (not object) in plugin storage file: '#{@path}'" unless data.is_a?(Hash)
rescue => e
log.error "failed to read data from plugin storage file", path: @path, error_class: e.class, error: e
raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
end
else
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.writable?(@path)
end
end
end

def load
return if @on_memory
return unless File.exist?(@path)
begin
json_string = open(@path, 'r:utf-8'){ |io| io.read }
json = Yajl::Parser.parse(json_string)
unless json.is_a?(Hash)
log.error "broken content for plugin storage (Hash required: ignored)", type: json.class
log.debug "broken content", content: json_string
return
end
@store = json
rescue => e
log.error "failed to load data for plugin storage from file", path: @path, error_class: e.class, error: e
end
end

def save
return if @on_memory
tmp_path = @path + '.tmp'
begin
json_string = Yajl::Encoder.encode(@store, pretty: @pretty_print)
open(tmp_path, 'w:utf-8', @mode){ |io| io.write json_string }
File.rename(tmp_path, @path)
rescue => e
log.error "failed to save data for plugin storage to file", path: @path, tmp: tmp_path, error_class: e.class, error: e
end
end

def get(key)
@store[key.to_s]
end

def fetch(key, defval)
@store.fetch(key.to_s, defval)
end

def put(key, value)
@store[key.to_s] = value
end

def delete(key)
@store.delete(key.to_s)
end

def update(key, &block)
@store[key.to_s] = block.call(@store[key.to_s])
end
end
end
end
Loading

0 comments on commit 06f5a3e

Please sign in to comment.