Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unix socket support #61

Merged
merged 8 commits into from
Nov 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ rvm:
- 2.2
- 2.3
- 2.4
- jruby-19mode

bundler_args: --without=localdev
3 changes: 2 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ task :default => :spec

Rake::TestTask.new(:spec) do |spec|
spec.libs << 'lib' << 'spec'
spec.pattern = 'spec/**/*_spec.rb'
spec.loader = :direct
spec.pattern = './spec/statsd_spec.rb'
spec.verbose = true
end

Expand Down
36 changes: 29 additions & 7 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class Statsd
# StatsD port. Defaults to 8125.
attr_reader :port

# DogStatsd unix socket path. Not used by default.
attr_reader :socket_path

# Global tags to be added to every statsd call. Defaults to no tags.
attr_reader :tags

Expand Down Expand Up @@ -90,8 +93,9 @@ def self.VERSION
# @option opts [Array<String>] :tags tags to be added to every metric
def initialize(host = DEFAULT_HOST, port = DEFAULT_PORT, opts = {}, max_buffer_size=50)
self.host, self.port = host, port
@socket_path = opts[:socket_path]
@prefix = nil
@socket = connect_to_socket(host, port)
@socket = connect_to_socket(host, port, socket_path) if @socket_path.nil?
self.namespace = opts[:namespace]
self.tags = opts[:tags]
@buffer = Array.new
Expand Down Expand Up @@ -433,27 +437,45 @@ def send_stat(message)
end
end

def flush_buffer()
def flush_buffer
send_to_socket(@buffer.join(NEW_LINE))
@buffer = Array.new
end

def connect_to_socket(host, port)
socket = UDPSocket.new
socket.connect(host, port)
def connect_to_socket(host, port, socket_path)
if !socket_path.nil?
socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
socket.connect(Socket.pack_sockaddr_un(socket_path))
else
socket = UDPSocket.new
socket.connect(host, port)
end
socket
end

def sock
@socket ||= connect_to_socket(host, port, socket_path)
end

def send_to_socket(message)
self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
@socket.send(message, 0)
if @socket_path.nil?
sock.send(message, 0)
else
sock.sendmsg_nonblock(message)
end
rescue => boom
if @socket_path && (boom.is_a?(Errno::ECONNREFUSED) ||
boom.is_a?(Errno::ECONNRESET) ||
boom.is_a?(Errno::ENOENT))
return @socket = nil
end
# Try once to reconnect if the socket has been closed
retries ||= 1
if retries <= 1 && boom.is_a?(IOError) && boom.message =~ /closed stream/i
retries += 1
begin
@socket = connect_to_socket(host, port)
@socket = connect_to_socket(host, port, socket_path)
retry
rescue => e
boom = e
Expand Down
127 changes: 125 additions & 2 deletions spec/statsd_spec.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
require 'helper'
require 'timecop'
require 'socket'
require 'stringio'
require 'timecop'

describe Datadog::Statsd do
class Datadog::Statsd
# we need to stub this
attr_accessor :socket
end

before do
before do |test|
@statsd = Datadog::Statsd.new('localhost', 1234)
@statsd.socket = FakeUDPSocket.new
end
Expand All @@ -19,6 +20,22 @@ class Datadog::Statsd
@statsd.port.must_equal 1234
end

it "should create a UDPSocket when nothing is given" do
statsd = Datadog::Statsd.new
statsd.socket.must_be_instance_of(UDPSocket)
end

it "should create a UDPSocket when host and port are given" do
statsd = Datadog::Statsd.new('localhost', 1234)
statsd.socket.must_be_instance_of(UDPSocket)
end

it "should not create a socket when socket_path is given" do
# the socket may not exist when creating the Statsd object
statsd = Datadog::Statsd.new('localhost', 1234, {socket_path: '/tmp/socket'})
assert_nil statsd.socket
end

it "should default the host to 127.0.0.1, port to 8125, namespace to nil, and tags to []" do
statsd = Datadog::Statsd.new
statsd.host.must_equal '127.0.0.1'
Expand Down Expand Up @@ -438,6 +455,112 @@ def send(*)
end
end

describe "UDS error handling" do
before do
@statsd = Datadog::Statsd.new('localhost', 1234, {:socket_path => '/tmp/socket'})
Datadog::Statsd.logger = Logger.new(@log = StringIO.new)
end

describe "when socket throws connection reset error" do
before do
@fake_socket = Minitest::Mock.new
@fake_socket.expect(:connect, true) { true }
@fake_socket.expect :sendmsg_nonblock, true, ['foo:1|c']
@fake_socket.expect(:sendmsg_nonblock, true) { raise Errno::ECONNRESET }

@fake_socket2 = Minitest::Mock.new
@fake_socket2.expect(:connect, true) { true }
@fake_socket2.expect :sendmsg_nonblock, true, ['bar:1|c']
end

it "should ignore message and try reconnect on next call" do
Socket.stub(:new, @fake_socket) do
@statsd.increment('foo')
end
@statsd.increment('baz')
Socket.stub(:new, @fake_socket2) do
@statsd.increment('bar')
end
@fake_socket.verify
@fake_socket2.verify
end
end

describe "when socket throws connection refused error" do
before do
@fake_socket = Minitest::Mock.new
@fake_socket.expect(:connect, true) { true }
@fake_socket.expect :sendmsg_nonblock, true, ['foo:1|c']
@fake_socket.expect(:sendmsg_nonblock, true) { raise Errno::ECONNREFUSED }

@fake_socket2 = Minitest::Mock.new
@fake_socket2.expect(:connect, true) { true }
@fake_socket2.expect :sendmsg_nonblock, true, ['bar:1|c']
end

it "should ignore message and try reconnect on next call" do
Socket.stub(:new, @fake_socket) do
@statsd.increment('foo')
end
@statsd.increment('baz')
Socket.stub(:new, @fake_socket2) do
@statsd.increment('bar')
end
@fake_socket.verify
@fake_socket2.verify
end
end

describe "when socket throws file not found error" do
before do
@fake_socket = Minitest::Mock.new
@fake_socket.expect(:connect, true) { true }
@fake_socket.expect :sendmsg_nonblock, true, ['foo:1|c']
@fake_socket.expect(:sendmsg_nonblock, true) { raise Errno::ENOENT }

@fake_socket2 = Minitest::Mock.new
@fake_socket2.expect(:connect, true) { true }
@fake_socket2.expect :sendmsg_nonblock, true, ['bar:1|c']
end

it "should ignore message and try reconnect on next call" do
Socket.stub(:new, @fake_socket) do
@statsd.increment('foo')
end
@statsd.increment('baz')
Socket.stub(:new, @fake_socket2) do
@statsd.increment('bar')
end
@fake_socket.verify
@fake_socket2.verify
end
end

describe "when socket is full" do
before do
@fake_socket = Minitest::Mock.new
@fake_socket.expect(:connect, true) { true }
@fake_socket.expect :sendmsg_nonblock, true, ['foo:1|c']
@fake_socket.expect(:sendmsg_nonblock, true) { raise IO::EAGAINWaitWritable }
@fake_socket.expect :sendmsg_nonblock, true, ['bar:1|c']

@fake_socket2 = Minitest::Mock.new
end

it "should ignore message but does not reconnect on next call" do
Socket.stub(:new, @fake_socket) do
@statsd.increment('foo')
end
@statsd.increment('baz')
Socket.stub(:new, @fake_socket2) do
@statsd.increment('bar')
end
@fake_socket.verify
@fake_socket2.verify
end
end
end

describe "tagged" do

it "gauges support tags" do
Expand Down