Skip to content

Commit

Permalink
Merge pull request #86 from grosser/grosser/bytes
Browse files Browse the repository at this point in the history
Set buffer size in bytes
  • Loading branch information
gmmeyer authored Jun 7, 2018
2 parents 8459020 + a5a96b4 commit aeea888
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
* port / host / tags / namespace can no longer be set on the instance to allow thread-safety [#87][] by [@grosser][]
* replace global logger with instance option [#90][] by [@grosser][]
* make format_service_check private [#89][] [@grosser][]
* [IMPROVEMENT] Frozen strings and less allocations everywhere. [#78][], [@grosser][]
* [BUGFIX] Make sure the UDP message fits into the 8k buffer of dd-agent. [#86][], [@Antti][]
* max_buffer_size initializer argument removed and replaced with max_buffer_bytes (defaults to 8192)
* max_buffer_size/max_buffer_size= methods removed
* format_event is now private

## 3.3.0 / 2018.02.04

Expand Down
33 changes: 23 additions & 10 deletions lib/datadog/statsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class Statsd
# Buffer containing the statsd message before they are sent in batch
attr_reader :buffer

# Maximum number of metrics in the buffer before it is flushed
attr_reader :max_buffer_size
# Maximum buffer size in bytes before it is flushed
attr_reader :max_buffer_bytes

# Logger
attr_reader :logger
Expand All @@ -86,14 +86,14 @@ class Statsd
# @option [String] namespace set a namespace to be prepended to every metric name
# @option [Array<String>] tags tags to be added to every metric
# @option [Loger] logger for debugging
# @option [Integer] max_buffer_size max messages to buffer
# @option [Integer] max_buffer_bytes max bytes to buffer when using #batch
# @option [String] socket_path unix socket path
def initialize(
host = DEFAULT_HOST,
port = DEFAULT_PORT,
namespace: nil,
tags: nil,
max_buffer_size: nil,
max_buffer_bytes: 8192,
socket_path: nil,
logger: nil
)
Expand All @@ -108,8 +108,10 @@ def initialize(
raise ArgumentError, 'tags must be a Array<String>' unless tags.nil? or tags.is_a? Array
@tags = (tags || []).compact.map! {|tag| escape_tag_content(tag)}

@buffer = Array.new
@max_buffer_size = max_buffer_size || 50
# batching
@max_buffer_bytes = max_buffer_bytes
@buffer = String.new
@buffer_bytes = 0
@batch_nesting_depth = 0
end

Expand Down Expand Up @@ -432,17 +434,28 @@ def send_stats(stat, delta, type, opts=EMPTY_OPTIONS)

def send_stat(message)
if @batch_nesting_depth > 0
message_bytes = message.bytesize
unless @buffer_bytes == 0
if @buffer_bytes + 1 + message_bytes >= @max_buffer_bytes
flush_buffer
else
@buffer << NEW_LINE
@buffer_bytes += 1
end
end

@buffer << message
flush_buffer if @buffer.length >= @max_buffer_size
@buffer_bytes += message_bytes
else
send_to_socket(message)
end
end

def flush_buffer
return @buffer if @buffer.empty?
send_to_socket(@buffer.join(NEW_LINE))
@buffer = Array.new
return if @buffer_bytes == 0
send_to_socket(@buffer)
@buffer = String.new
@buffer_bytes = 0
end

def connect_to_socket
Expand Down
13 changes: 5 additions & 8 deletions spec/statsd_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -683,18 +683,15 @@ def send(*)
end

it "should flush when the buffer gets too big" do
expected_message = 'mycounter:1|c'
number_of_messages_to_fill_the_buffer = (8192 - 1) / (expected_message.bytesize + 1)
@statsd.batch do |s|
# increment a counter 50 times in batch
51.times do
# increment a counter to fill the buffer and trigger buffer flush
(number_of_messages_to_fill_the_buffer + 1).times do
s.increment("mycounter")
end

# We should receive a packet of 50 messages that was automatically
# flushed when the buffer got too big
theoretical_reply = Array.new
50.times do
theoretical_reply.push('mycounter:1|c')
end
theoretical_reply = Array.new(number_of_messages_to_fill_the_buffer) { 'mycounter:1|c' }
@statsd.socket.recv.must_equal [theoretical_reply.join("\n")]
end

Expand Down

0 comments on commit aeea888

Please sign in to comment.