Skip to content

Commit

Permalink
Support Unix domain socket in Kafka::Datadog
Browse files Browse the repository at this point in the history
dogstatsd-ruby has supported Unix domain socket.
cf. DataDog/dogstatsd-ruby#61
  • Loading branch information
abicky committed May 10, 2020
1 parent 1b2ed7c commit 423629a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 23 deletions.
11 changes: 10 additions & 1 deletion lib/kafka/datadog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Datadog

class << self
def statsd
@statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags)
@statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags, socket_path: socket_path)
end

def statsd=(statsd)
Expand All @@ -57,6 +57,15 @@ def port=(port)
clear
end

def socket_path
@socket_path
end

def socket_path=(socket_path)
@socket_path = socket_path
clear
end

def namespace
@namespace ||= STATSD_NAMESPACE
end
Expand Down
38 changes: 29 additions & 9 deletions spec/datadog_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,40 @@
agent.stop
end

it "emits metrics to the Datadog agent" do
Kafka::Datadog.host = agent.host
Kafka::Datadog.port = agent.port
context "when host and port are specified" do
it "emits metrics to the Datadog agent" do
Kafka::Datadog.host = agent.host
Kafka::Datadog.port = agent.port

client = Kafka::Datadog.statsd
client = Kafka::Datadog.statsd

client.increment("greetings")
client.increment("greetings")

agent.wait_for_metrics
agent.wait_for_metrics

expect(agent.metrics.count).to eq 1
expect(agent.metrics.count).to eq 1

metric = agent.metrics.first
metric = agent.metrics.first

expect(metric).to eq "ruby_kafka.greetings"
expect(metric).to eq "ruby_kafka.greetings"
end
end

context "when socket_path is specified" do
it "emits metrics to the Datadog agent" do
Kafka::Datadog.socket_path = agent.socket_path

client = Kafka::Datadog.statsd

client.increment("greetings")

agent.wait_for_metrics

expect(agent.metrics.count).to eq 1

metric = agent.metrics.first

expect(metric).to eq "ruby_kafka.greetings"
end
end
end
40 changes: 30 additions & 10 deletions spec/fake_datadog_agent.rb
Original file line number Diff line number Diff line change
@@ -1,27 +1,39 @@
# frozen_string_literal: true

require "socket"
require "tempfile"

class FakeDatadogAgent
attr_reader :host, :port, :metrics
attr_reader :host, :port, :socket_path, :metrics

def initialize
@host = "127.0.0.1"
@port = 9999
@socket = UDPSocket.new
@thread = nil
@socket_path = Tempfile.open("fake_datadog_agent_sock") do |f|
path = f.path
f.unlink
path
end
@udp_socket = UDPSocket.new
@uds_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM)
@threads = []
@metrics = []

@socket.bind(@host, @port)
end

def start
@thread = Thread.new { loop { receive } }
@thread.abort_on_exception = true
@udp_socket.bind(@host, @port)
@uds_socket.bind(Addrinfo.unix(@socket_path))

@threads << Thread.new { loop { receive_from_udp_socket } }
@threads << Thread.new { loop { receive_from_uds_socket } }
@threads.each { |th| th.abort_on_exception = true }
end

def stop
@thread.kill
@threads.each(&:kill)
@udp_socket.close
@uds_socket.close
@threads = []
end

def wait_for_metrics(count: 1)
Expand All @@ -34,9 +46,17 @@ def wait_for_metrics(count: 1)

private

def receive
data, sender = @socket.recvfrom(512)
def receive_from_udp_socket
data, _ = @udp_socket.recvfrom(512)
add_metrics(data)
end

def receive_from_uds_socket
data, _ = @uds_socket.recvfrom(512)
add_metrics(data)
end

def add_metrics(data)
data.split("\n").each do |message|
metric = message.split(":").first
@metrics << metric if metric
Expand Down
25 changes: 22 additions & 3 deletions spec/functional/datadog_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,31 @@
require "fake_datadog_agent"

describe "Reporting metrics to Datadog", functional: true do
example "reporting connection metrics" do
agent = FakeDatadogAgent.new
let(:agent) { FakeDatadogAgent.new }

before do
agent.start
end

after do
agent.stop
end

example "reporting connection metrics using UDP socket" do
Kafka::Datadog.port = agent.port

agent.start
kafka.topics

agent.wait_for_metrics(count: 4)

expect(agent.metrics).to include("ruby_kafka.api.calls")
expect(agent.metrics).to include("ruby_kafka.api.latency")
expect(agent.metrics).to include("ruby_kafka.api.request_size")
expect(agent.metrics).to include("ruby_kafka.api.response_size")
end

example "reporting connection metrics using Unix domain socket" do
Kafka::Datadog.socket_path = agent.socket_path

kafka.topics

Expand Down

0 comments on commit 423629a

Please sign in to comment.