diff --git a/lib/kafka/datadog.rb b/lib/kafka/datadog.rb index c5710c432..032c1749d 100644 --- a/lib/kafka/datadog.rb +++ b/lib/kafka/datadog.rb @@ -31,7 +31,7 @@ module Datadog class << self def statsd - @statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags) + @statsd ||= ::Datadog::Statsd.new(host, port, namespace: namespace, tags: tags, socket_path: socket_path) end def statsd=(statsd) @@ -57,6 +57,15 @@ def port=(port) clear end + def socket_path + @socket_path + end + + def socket_path=(socket_path) + @socket_path = socket_path + clear + end + def namespace @namespace ||= STATSD_NAMESPACE end diff --git a/spec/datadog_spec.rb b/spec/datadog_spec.rb index e02b81f61..6ed12731b 100644 --- a/spec/datadog_spec.rb +++ b/spec/datadog_spec.rb @@ -14,20 +14,40 @@ agent.stop end - it "emits metrics to the Datadog agent" do - Kafka::Datadog.host = agent.host - Kafka::Datadog.port = agent.port + context "when host and port are specified" do + it "emits metrics to the Datadog agent" do + Kafka::Datadog.host = agent.host + Kafka::Datadog.port = agent.port - client = Kafka::Datadog.statsd + client = Kafka::Datadog.statsd - client.increment("greetings") + client.increment("greetings") - agent.wait_for_metrics + agent.wait_for_metrics - expect(agent.metrics.count).to eq 1 + expect(agent.metrics.count).to eq 1 - metric = agent.metrics.first + metric = agent.metrics.first - expect(metric).to eq "ruby_kafka.greetings" + expect(metric).to eq "ruby_kafka.greetings" + end + end + + context "when socket_path is specified" do + it "emits metrics to the Datadog agent" do + Kafka::Datadog.socket_path = agent.socket_path + + client = Kafka::Datadog.statsd + + client.increment("greetings") + + agent.wait_for_metrics + + expect(agent.metrics.count).to eq 1 + + metric = agent.metrics.first + + expect(metric).to eq "ruby_kafka.greetings" + end end end diff --git a/spec/fake_datadog_agent.rb b/spec/fake_datadog_agent.rb index 4705c7d3d..3b0b930de 100644 --- a/spec/fake_datadog_agent.rb +++ b/spec/fake_datadog_agent.rb @@ -1,27 +1,39 @@ # frozen_string_literal: true require "socket" +require "tempfile" class FakeDatadogAgent - attr_reader :host, :port, :metrics + attr_reader :host, :port, :socket_path, :metrics def initialize @host = "127.0.0.1" @port = 9999 - @socket = UDPSocket.new - @thread = nil + @socket_path = Tempfile.open("fake_datadog_agent_sock") do |f| + path = f.path + f.unlink + path + end + @udp_socket = UDPSocket.new + @uds_socket = Socket.new(Socket::AF_UNIX, Socket::SOCK_DGRAM) + @threads = [] @metrics = [] - - @socket.bind(@host, @port) end def start - @thread = Thread.new { loop { receive } } - @thread.abort_on_exception = true + @udp_socket.bind(@host, @port) + @uds_socket.bind(Addrinfo.unix(@socket_path)) + + @threads << Thread.new { loop { receive_from_udp_socket } } + @threads << Thread.new { loop { receive_from_uds_socket } } + @threads.each { |th| th.abort_on_exception = true } end def stop - @thread.kill + @threads.each(&:kill) + @udp_socket.close + @uds_socket.close + @threads = [] end def wait_for_metrics(count: 1) @@ -34,9 +46,17 @@ def wait_for_metrics(count: 1) private - def receive - data, sender = @socket.recvfrom(512) + def receive_from_udp_socket + data, _ = @udp_socket.recvfrom(512) + add_metrics(data) + end + + def receive_from_uds_socket + data, _ = @uds_socket.recvfrom(512) + add_metrics(data) + end + def add_metrics(data) data.split("\n").each do |message| metric = message.split(":").first @metrics << metric if metric diff --git a/spec/functional/datadog_spec.rb b/spec/functional/datadog_spec.rb index a7a5915e4..7399e55df 100644 --- a/spec/functional/datadog_spec.rb +++ b/spec/functional/datadog_spec.rb @@ -5,12 +5,31 @@ require "fake_datadog_agent" describe "Reporting metrics to Datadog", functional: true do - example "reporting connection metrics" do - agent = FakeDatadogAgent.new + let(:agent) { FakeDatadogAgent.new } + before do + agent.start + end + + after do + agent.stop + end + + example "reporting connection metrics using UDP socket" do Kafka::Datadog.port = agent.port - agent.start + kafka.topics + + agent.wait_for_metrics(count: 4) + + expect(agent.metrics).to include("ruby_kafka.api.calls") + expect(agent.metrics).to include("ruby_kafka.api.latency") + expect(agent.metrics).to include("ruby_kafka.api.request_size") + expect(agent.metrics).to include("ruby_kafka.api.response_size") + end + + example "reporting connection metrics using Unix domain socket" do + Kafka::Datadog.socket_path = agent.socket_path kafka.topics