[sender] enable buffer pool + concurrent sending threads #95
Conversation
Nice PR!
Added some minor notes; feel free to ignore them.
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;
Note: the following imports are not used:
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.Callable;
👌
@@ -0,0 +1,38 @@
package com.timgroup.statsd;

import java.io.IOException;
Note: Not used
    pool.put(buffer);
}

int available() {
Note: Unused
I still like it as a class method, so I think I'm going to keep it unless you strongly disagree. Good catch.
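For reference, a minimal sketch of the kind of pool being discussed, with available() kept as a small helper on the class; the names here are illustrative, not necessarily the ones in the PR:

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative buffer pool sketch; available() is the helper the review
// flagged as unused but kept as a class method.
final class BufferPoolSketch {
    private final BlockingQueue<ByteBuffer> pool;

    BufferPoolSketch(final int poolSize, final int bufferSize) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.offer(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    ByteBuffer borrow() throws InterruptedException {
        return pool.take();      // blocks until a buffer is free
    }

    void put(final ByteBuffer buffer) throws InterruptedException {
        pool.put(buffer);        // return a buffer to the pool
    }

    int available() {
        return pool.size();      // buffers currently free
    }
}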
boolean send(final String message) {
    if (!shutdown) {
        if (qSize.get() < qCapacity) {
            messages.offer(message);
The return value of boolean java.util.Queue.offer(String e) might be worth checking.
This has now been moved to StatsDNonBlockingProcessor; there we use a ConcurrentLinkedQueue, which never returns false on offer() because it is an unbounded queue.
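An illustrative sketch of that point, assuming the qSize/qCapacity fields from the diff: ConcurrentLinkedQueue.offer() always returns true because the queue is unbounded, so capacity has to be enforced with an explicit counter rather than by checking offer()'s return value:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a soft capacity on top of an unbounded queue.
final class BoundedOfferSketch {
    private final Queue<String> messages = new ConcurrentLinkedQueue<>();
    private final AtomicInteger qSize = new AtomicInteger(0);
    private final int qCapacity;

    BoundedOfferSketch(final int qCapacity) {
        this.qCapacity = qCapacity;
    }

    boolean send(final String message) {
        if (qSize.get() < qCapacity) {
            messages.offer(message);    // always returns true for ConcurrentLinkedQueue
            qSize.incrementAndGet();
            return true;
        }
        return false;                   // soft capacity reached, message dropped
    }

    String poll() {
        final String message = messages.poll();
        if (message != null) {
            qSize.decrementAndGet();    // keep the counter in sync on the consumer side
        }
        return message;
    }
}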
    }
} catch (final InterruptedException e) {
    if (shutdown) {
        return;
Do we also want to call handler.handle(e), as in line 76?
It's a good question, and I'm not sure. We're going to shut down gracefully, and handle doesn't do much, but it's a good question. 🤔
Let me think about it; I'll address it in #94 if I decide we should handle these errors differently.
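For reference, a hypothetical variant of the catch block being discussed, where the worker also reports the interruption to the error handler before taking the graceful-shutdown path; the handler type here is a stand-in, not the exact one used by the client:

import java.util.concurrent.BlockingQueue;

// Sketch of a worker loop that reports interruptions before shutting down.
final class WorkerLoopSketch implements Runnable {
    interface ErrorHandler { void handle(Exception e); }

    private final BlockingQueue<String> messages;
    private final ErrorHandler handler;
    private volatile boolean shutdown;

    WorkerLoopSketch(final BlockingQueue<String> messages, final ErrorHandler handler) {
        this.messages = messages;
        this.handler = handler;
    }

    void requestShutdown() {
        shutdown = true;
    }

    @Override
    public void run() {
        while (!shutdown) {
            try {
                final String message = messages.take();
                // ... assemble and send the message ...
            } catch (final InterruptedException e) {
                handler.handle(e);   // also report the interruption, as suggested
                if (shutdown) {
                    return;          // graceful shutdown path
                }
            }
        }
    }
}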
Force-pushed ca83be3 to cc97650
Force-pushed cc97650 to 921a0bc
CI is green, merging into the larger #94; final changes and fixes will be made there. Thank you for the review.
This PR adds a buffer pool to the sender. The original code used a single pre-allocated buffer that we reused to submit batched dogstatsd metrics. While efficient from a memory perspective, this approach meant we had to stop processing dogstatsd metrics while waiting on IO for every packet submission.
This PR aims to decouple actual packet transmission from dogstatsd message processing and packet assembly. To achieve this we introduce a pool of pre-allocated buffers and an additional queue. Messages are processed and batched into these buffers, blocking if no buffers are available (further improvements are possible here, where we could entertain different strategies to deal with buffer unavailability). Once a packet has been filled, or no more messages are waiting, the buffer is queued in an outgoing queue. A worker thread (or threads) then dequeues these buffers and writes them to the socket. The benefit of this approach, beyond the high-level decoupling explained above, is that it should allow better use of the IO-wait slice: the OS kernel will likely do the right thing when scheduling pending work, helping us increase throughput.
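To make the split concrete, here is a minimal sketch of the architecture described above; the class, field, and method names are illustrative and not the ones used in this PR:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the buffer-pool + sender-thread split: processors batch messages
// into pooled buffers, sender workers drain the outgoing queue and do the IO.
final class PooledSenderSketch {
    private final BlockingQueue<ByteBuffer> pool;       // pre-allocated, reusable buffers
    private final BlockingQueue<ByteBuffer> outbound;   // filled buffers waiting on IO
    private final DatagramChannel channel;
    private volatile boolean shutdown;

    PooledSenderSketch(final int poolSize, final int bufferSize, final InetSocketAddress target)
            throws IOException {
        pool = new ArrayBlockingQueue<>(poolSize);
        outbound = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.offer(ByteBuffer.allocateDirect(bufferSize));
        }
        channel = DatagramChannel.open();
        channel.connect(target);
    }

    // Processing side: block for a free buffer, batch the payload into it,
    // then hand the filled buffer to the outgoing queue.
    void enqueueBatch(final byte[] payload) throws InterruptedException {
        final ByteBuffer buffer = pool.take();   // blocks if no buffer is available
        buffer.clear();
        buffer.put(payload);
        outbound.put(buffer);
    }

    // Sending side: one or more worker threads dequeue filled buffers, write
    // them to the socket, and recycle the buffers back into the pool.
    Runnable senderWorker() {
        return () -> {
            while (!(shutdown && outbound.isEmpty())) {
                try {
                    final ByteBuffer buffer = outbound.take();
                    buffer.flip();
                    channel.write(buffer);
                    pool.put(buffer);            // recycle the buffer
                } catch (final InterruptedException e) {
                    if (shutdown) {
                        return;
                    }
                } catch (final IOException e) {
                    // a real implementation would report this to an error handler
                }
            }
        };
    }
}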
TL;DR:
Note: the max perf test is useful for benchmarking but might be worth removing or modifying before merging, or disabling in CI. It also currently conflicts with a similar test in jaime/perf; we can address that later.
Note: this should be merged onto jaime/perf (so before #94).