Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Merge vmg/fast into master #17

Merged
merged 12 commits into from
Sep 29, 2015
106 changes: 63 additions & 43 deletions lib/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,56 @@
# statsd = Statsd.new('localhost').tap{|sd| sd.namespace = 'account'}
# statsd.increment 'activate'
class Statsd
class Host
attr_reader :ip, :port, :key
def initialize(host, port, key = nil)
@ip = Addrinfo.ip(host).ip_address
@port = port
class RubyUdpClient
attr_reader :key, :sock

def initialize(address, port, key = nil)
addrinfo = Addrinfo.ip(address)
@sock = UDPSocket.new(addrinfo.pfamily)
@sock.connect(addrinfo.ip_address, port)
@key = key
end

def send(msg)
sock.write(msg)
rescue => boom
nil
end
end

# A namespace to prepend to all statsd calls.
attr_accessor :namespace
attr_reader :namespace

def namespace=(namespace)
@namespace = namespace
@prefix = namespace ? "#{@namespace}." : "".freeze
end

# All the endpoints where StatsD will report metrics
attr_reader :shards

#characters that will be replaced with _ in stat names
RESERVED_CHARS_REGEX = /[\:\|\@]/

class << self
# Set to any standard logger instance (including stdlib's Logger) to enable
# stat logging using logger.debug
attr_accessor :logger
COUNTER_TYPE = "c".freeze
TIMING_TYPE = "ms".freeze
GAUGE_TYPE = "g".freeze
HISTOGRAM_TYPE = "h".freeze

def initialize(client_class = nil)
@shards = []
@client_class = client_class || RubyUdpClient
self.namespace = nil
end

# @param [String] host your statsd host
# @param [Integer] port your statsd port
def initialize(host, port=8125, key=nil)
@hosts = []
add_host(host, port, key)
def self.simple(addr, port = nil)
self.new.add_shard(addr, port)
end

def add_host(host, port = nil, key = nil)
host, port = host.split(':') if host.include?(':')
@hosts << Host.new(host, port.to_i, key)
def add_shard(addr, port = nil, key = nil)
addr, port = addr.split(':') if addr.include?(':')
@shards << @client_class.new(addr, port.to_i, key)
self
end

# Sends an increment (count = 1) for the given stat to the statsd server.
Expand All @@ -66,7 +85,7 @@ def decrement(stat, sample_rate=1); count stat, -1, sample_rate end
# @param [String] stat stat name
# @param [Integer] count count
# @param [Integer] sample_rate sample rate, 1 for always
def count(stat, count, sample_rate=1); send stat, count, 'c', sample_rate end
def count(stat, count, sample_rate=1); send stat, count, COUNTER_TYPE, sample_rate end

# Sends an arbitary gauge value for the given stat to the statsd server.
#
Expand All @@ -75,7 +94,7 @@ def count(stat, count, sample_rate=1); send stat, count, 'c', sample_rate end
# @example Report the current user count:
# $statsd.gauge('user.count', User.count)
def gauge(stat, value)
send stat, value, 'g'
send stat, value, GAUGE_TYPE
end

# Sends a timing (in ms) for the given stat to the statsd server. The
Expand All @@ -86,7 +105,7 @@ def gauge(stat, value)
# @param stat stat name
# @param [Integer] ms timing in milliseconds
# @param [Integer] sample_rate sample rate, 1 for always
def timing(stat, ms, sample_rate=1); send stat, ms, 'ms', sample_rate end
def timing(stat, ms, sample_rate=1); send stat, ms, TIMING_TYPE, sample_rate end

# Reports execution time of the provided block using {#timing}.
#
Expand All @@ -107,7 +126,7 @@ def time(stat, sample_rate=1)
# sample_rate determines what percentage of the time this report is sent. The
# statsd server then uses the sample_rate to correctly track the average
# for the stat.
def histogram(stat, value, sample_rate=1); send stat, value, 'h', sample_rate end
def histogram(stat, value, sample_rate=1); send stat, value, HISTOGRAM_TYPE, sample_rate end

private

Expand All @@ -117,29 +136,32 @@ def sampled(sample_rate)

def send(stat, delta, type, sample_rate=1)
sampled(sample_rate) do
prefix = "#{@namespace}." unless @namespace.nil?
stat = stat.to_s.gsub('::', '.').gsub(RESERVED_CHARS_REGEX, '_')
msg = "#{prefix}#{stat}:#{delta}|#{type}#{'|@' << sample_rate.to_s if sample_rate < 1}"
send_to_socket(select_host(stat), msg)
end
end

def send_to_socket(host, message)
self.class.logger.debug {"Statsd: #{message}"} if self.class.logger
if host.key.nil?
socket.send(message, 0, host.ip, host.port)
else
socket.send(signed_payload(host.key, message), 0, host.ip, host.port)
stat = stat.to_s.dup
stat.gsub!(/::/, ".".freeze)
stat.gsub!(RESERVED_CHARS_REGEX, "_".freeze)

msg = ""
msg << @prefix
msg << stat
msg << ":".freeze
msg << delta.to_s
msg << "|".freeze
msg << type
if sample_rate < 1
msg << "|@".freeze
msg << sample_rate.to_s
end

shard = select_shard(stat)
shard.send(shard.key ? signed_payload(shard.key, msg) : msg)
end
rescue => boom
self.class.logger.error {"Statsd: #{boom.class} #{boom}"} if self.class.logger
end

def select_host(stat)
if @hosts.size == 1
@hosts.first
def select_shard(stat)
if @shards.size == 1
@shards.first
else
@hosts[Zlib.crc32(stat) % @hosts.size]
@shards[Zlib.crc32(stat) % @shards.size]
end
end

Expand Down Expand Up @@ -167,6 +189,4 @@ def timestamp
def nonce
SecureRandom.random_bytes(4)
end

def socket; @socket ||= UDPSocket.new end
end