Skip to content

Commit

Permalink
SAMZA-1283: Expose the buffered-message-size metric
Browse files Browse the repository at this point in the history
Regardless of whether we enable size limit for the consumer buffer, this metric helps to see what's the buffer size and make configuring size limit easier.

Author: Xinyu Liu <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>

Closes apache#184 from xinyuiscool/SAMZA-1283
  • Loading branch information
Xinyu Liu committed May 10, 2017
1 parent 36b2f23 commit 71930e2
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
private final Clock clock;
protected final boolean fetchLimitByBytesEnabled;

public BlockingEnvelopeMap() {
this(new NoOpMetricsRegistry());
Expand All @@ -83,17 +82,15 @@ public long currentTimeMillis() {
}

public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
this(metricsRegistry, clock, null, false);
this(metricsRegistry, clock, null);
}

public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
this.clock = clock;
this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
// Created when size is disabled for code simplification, and as the overhead is negligible.
this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
}

Expand All @@ -103,7 +100,6 @@ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String
public void register(SystemStreamPartition systemStreamPartition, String offset) {
metrics.initMetrics(systemStreamPartition);
bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
// Created when size is disabled for code simplification, and the overhead is negligible.
bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
}

Expand Down Expand Up @@ -155,9 +151,7 @@ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<System

if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
if (fetchLimitByBytesEnabled) {
subtractSizeOnQDrain(systemStreamPartition, outgoingList);
}
subtractSizeOnQDrain(systemStreamPartition, outgoingList);
}
}

Expand All @@ -183,9 +177,7 @@ private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, L
*/
protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
bufferedMessages.get(systemStreamPartition).put(envelope);
if (fetchLimitByBytesEnabled) {
bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
}
bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
}

/**
Expand Down Expand Up @@ -262,9 +254,7 @@ public void initMetrics(SystemStreamPartition systemStreamPartition) {
this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));

metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
if (fetchLimitByBytesEnabled) {
metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
}
metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
}

public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public MockBlockingEnvelopeMap() {
}

public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
super(new NoOpMetricsRegistry(), CLOCK, null);
injectedQueue = new MockQueue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ private[kafka] class KafkaSystemConsumer(
new Clock {
def currentTimeMillis = clock()
},
classOf[KafkaSystemConsumerMetrics].getName,
fetchLimitByBytesEnabled) with Toss with Logging {
classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {

type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
Expand Down

0 comments on commit 71930e2

Please sign in to comment.