Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Shard between different hosts on demand
Browse files Browse the repository at this point in the history
  • Loading branch information
vmg committed Jan 20, 2015
1 parent 50d2966 commit affe1b9
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions lib/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'securerandom'
require 'socket'
require 'time'
require 'zlib'

# = Statsd: A Statsd client (https://github.com/etsy/statsd)
#
Expand All @@ -16,6 +17,15 @@
# statsd = Statsd.new('localhost').tap{|sd| sd.namespace = 'account'}
# statsd.increment 'activate'
class Statsd
class Host
attr_reader :ip, :port, :key
def initialize(host, port, key = nil)
@ip = Addrinfo.ip(host).ip_address
@port = port
@key = key
end
end

# A namespace to prepend to all statsd calls.
attr_accessor :namespace

Expand All @@ -34,8 +44,12 @@ class << self
# @param [String] host your statsd host
# @param [Integer] port your statsd port
def initialize(host, port=8125, key=nil)
@host, @port, @key = host, port, key
@ip = Addrinfo.ip(host).ip_address
@hosts = []
add_host(host, port, key)
end

def add_host(host, port, key = nil)
@hosts << Host.new(host, port, key)
end

# Sends an increment (count = 1) for the given stat to the statsd server.
Expand Down Expand Up @@ -104,24 +118,33 @@ def send(stat, delta, type, sample_rate=1)
sampled(sample_rate) do
prefix = "#{@namespace}." unless @namespace.nil?
stat = stat.to_s.gsub('::', '.').gsub(RESERVED_CHARS_REGEX, '_')
send_to_socket("#{prefix}#{stat}:#{delta}|#{type}#{'|@' << sample_rate.to_s if sample_rate < 1}")
msg = "#{prefix}#{stat}:#{delta}|#{type}#{'|@' << sample_rate.to_s if sample_rate < 1}")
send_to_socket(select_host(stat), msg)
end
end

def send_to_socket(message)
def send_to_socket(host, message)
self.class.logger.debug {"Statsd: #{message}"} if self.class.logger
if @key.nil?
socket.send(message, 0, @ip, @port)
if host.key.nil?
socket.send(message, 0, host.ip, host.port)
else
socket.send(signed_payload(message), 0, @ip, @port)
socket.send(signed_payload(host.key, message), 0, host.ip, host.port)
end
rescue => boom
self.class.logger.error {"Statsd: #{boom.class} #{boom}"} if self.class.logger
end

def signed_payload(message)
def select_host(stat)
if @hosts.size == 1
@hosts.first
else
@hosts[Zlib.crc32(stat) % @hosts.size]
end
end

def signed_payload(key, message)
payload = timestamp + nonce + message
signature = OpenSSL::HMAC.digest(SHA256, @key, payload)
signature = OpenSSL::HMAC.digest(SHA256, key, payload)
signature + payload
end

Expand Down

0 comments on commit affe1b9

Please sign in to comment.