Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Counter API #1857

Merged
merged 23 commits into from
Apr 21, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions example/counter.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<system>
<counter_server>
scope server1
bind 127.0.0.1
port 24321
path tmp/back
</counter_server>
</system>

<source>
@type dummy
tag "test.data"
auto_increment_key number
</source>

<match>
@type stdout
</match>
23 changes: 23 additions & 0 deletions lib/fluent/counter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/counter/client'
require 'fluent/counter/server'

module Fluent
module Counter
end
end
46 changes: 46 additions & 0 deletions lib/fluent/counter/base_socket.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'cool.io'
require 'fluent/msgpack_factory'

module Fluent
module Counter
class BaseSocket < Coolio::TCPSocket
include Fluent::MessagePackFactory::Mixin

def packed_write(data)
write pack(data)
end

def on_read(data)
msgpack_unpacker.feed_each(data) do |d|
on_message d
end
end

def on_message(data)
raise NotImplementedError
end

private

def pack(data)
msgpack_packer.pack(data)
end
end
end
end
237 changes: 237 additions & 0 deletions lib/fluent/counter/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'cool.io'
require 'fluent/counter/base_socket'
require 'timeout'

module Fluent
module Counter
class Client
DEFAULT_PORT = 24321
DEFAULT_ADDR = '127.0.0.1'
DEFAULT_TIMEOUT = 5
ID_LIMIT_COUNT = 1 << 31

def initialize(loop = nil, opt = {})
@loop = loop || Coolio::Loop.new
@port = opt[:port] || DEFAULT_PORT
@host = opt[:host] || DEFAULT_ADDR
@log = opt[:log] || $log
@timeout = opt[:timeout] || DEFAULT_TIMEOUT
@conn = Connection.connect(@host, @port, method(:on_message))
@responses = {}
@id = 0
@id_mutex = Mutex.new
@loop_mutex = Mutex.new
end

def start
@loop.attach(@conn)
@log.debug("starting counter client: #{@host}:#{@port}")
self
rescue => e
if @log
@log.error e
else
STDERR.puts e
end
end

def stop
@conn.close
@log.debug("calling stop in counter client: #{@host}:#{@port}")
end

def establish(scope)
scope = Timeout.timeout(@timeout) {
response = send_request('establish', nil, [scope])
raise response.errors.first if response.errors?
data = response.data
data.first
}
@scope = scope
rescue Timeout::Error
raise "Can't establish the connection to counter server due to timeout"
end

# === Example
# `init` receives various arguments.
#
# 1. init(name: 'name')
# 2. init({ name: 'name',reset_interval: 20 }, options: {})
# 3. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }])
# 4. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }], options: {})
def init(params, options: {})
exist_scope!
params = [params] unless params.is_a?(Array)
res = send_request('init', @scope, params, options)

# if `async` is true, return a Future object (non blocking).
# if `async` is false or missing, block at this method and return a Hash object.
options[:async] ? res : res.get
end

def delete(*params, options: {})
exist_scope!
res = send_request('delete', @scope, params, options)
options[:async] ? res : res.get
end

# === Example
# `inc` receives various arguments.
#
# 1. init(name: 'name')
# 2. init({ name: 'name',value: 20 }, options: {})
# 3. init([{ name: 'name1',value: 20 }, { name: 'name2',value: 20 }])
# 4. init([{ name: 'name1',value: 20 }, { name: 'name2',value: 20 }], options: {})
def inc(params, options: {})
exist_scope!
params = [params] unless params.is_a?(Array)
res = send_request('inc', @scope, params, options)
options[:async] ? res : res.get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about like following?

      def inc(params, options: {})
        exist_scope!
        params = [params] unless params.is_a?(Array)
        res = send_request('inc', @scope, params, options)
        if options[:async]
          if block_given?
            Thread.start do
              yield res.get
            end
          else
            res
          end
        else
          if block_given?
            yield res.get
          else
            res.get
          end
        end
      end

We can use this as following:

@counter_client = ...
@counter_client.inc({ name: "counter", value: 1}, options: { async: true }) do |response|
   # use response here
end

In the original code, we must handle future instance always when we send request asynchronously.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added suggested API.

end

def get(*params, options: {})
exist_scope!
res = send_request('get', @scope, params, options)
options[:async] ? res : res.get
end

def reset(*params, options: {})
exist_scope!
res = send_request('reset', @scope, params, options)
options[:async] ? res : res.get
end

private

def exist_scope!
raise 'Call `establish` method to get a `scope` before calling this method' unless @scope
end

def on_message(data)
if response = @responses.delete(data['id'])
response.set(data)
else
@log.warn("Receiving missing id data: #{data}")
end
end

def send_request(method, scope, params, opt = {})
id = generate_id
res = Future.new(@loop, @loop_mutex)
@responses[id] = res # set a response value to this future object at `on_message`
request = build_request(method, id, scope, params, opt)
@log.debug(request)
@conn.send_data request
res
end

def build_request(method, id, scope = nil, params = nil, options = nil)
r = { id: id, method: method }
r[:scope] = scope if scope
r[:params] = params if params
r[:options] = options if options
r
end

def generate_id
id = 0
@id_mutex.synchronize do
id = @id
@id += 1
@id = 0 if ID_LIMIT_COUNT < @id
end
id
end
end

class Connection < Fluent::Counter::BaseSocket
def initialize(io, on_message)
super(io)
@connection = false
@buffer = ''
@on_message = on_message
end

def send_data(data)
if @connection
packed_write data
else
@buffer += pack(data)
end
end

def on_connect
@connection = true
write @buffer
@buffer = ''
end

def on_close
@connection = false
end

def on_message(data)
@on_message.call(data)
end
end

class Future
def initialize(loop, mutex)
@set = false
@result = nil
@mutex = mutex
@loop = loop
end

def set(v)
@result = v
@set = true
end

def errors
get['errors']
end

def errors?
es = errors
es && !es.empty?
end

def data
get['data']
end

def get
# Block until `set` method is called and @result is set a value
join if @result.nil?
@result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing the return value to Response object from plain Hash?

class Response
   attr_reader :errors, :data
   def initialize(result)
      @errors = result["errors"]
      @data = result["data"]
      # ...
   end

  def success?
    @errors.nil? || @errors.empty?
  end

  def error?
    !success?
  end
end

It is useful to check Counter API response as following:

@counter_client.inc({ name: "counter", value: 1}, options: { async: true }) do |response|
   if response.success?
     log.debug("Update counter successfully")
   else
     log.warn("Counter API error: #{response.errors}")
   end
end

Original code:

future = @counter_client.inc({ name: "counter", value: 1}, options: { async: true })
Thread.start do
  response = future.get
  if future.errors?
     log.error("failure")
  else
     log.debug("success")
  end
end

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I will apply the changes soon.

end

private

def join
until @set
@mutex.synchronize do
@loop.run_once(0.0001) # retun a lock as soon as possible
end
end
end
end
end
end
65 changes: 65 additions & 0 deletions lib/fluent/counter/error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module Fluent
module Counter
class BaseError < StandardError
def to_hash
{ 'code' => code, 'message' => message }
end

def code
raise NotImplementedError
end
end

class InvalidParams < BaseError
def code
'invalid_params'
end
end

class UnknownKey < BaseError
def code
'unknown_key'
end
end

class ParseError < BaseError
def code
'parse_error'
end
end

class InvalidRequest < BaseError
def code
'invalid_request'
end
end

class MethodNotFound < BaseError
def code
'method_not_found'
end
end

class InternalServerError < BaseError
def code
'internal_server_error'
end
end
end
end
Loading