Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metric for InMemoryDelayedDeliveryTracker's memory usage #15867

Merged
merged 12 commits into from
Aug 2, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public long getNumberOfDelayedMessages() {
return priorityQueue.size();
}

/**
 * Reports the byte capacity of the priority queue that holds the delayed messages.
 *
 * @return the value of {@code priorityQueue.bytesCapacity()}
 */
public long getBufferMemoryUsage() {
    final long queueCapacityBytes = priorityQueue.bytesCapacity();
    return queueCapacityBytes;
}

/**
* Update the scheduled timer task such that:
* 1. If there are no delayed messages, return and do not schedule a timer task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -1009,6 +1010,19 @@ public PersistentTopic getTopic() {
return topic;
}


/**
 * Reports the buffer memory usage of this dispatcher's delayed delivery tracker.
 *
 * @return the tracker's {@code getBufferMemoryUsage()} when a tracker is present and is an
 *         {@link InMemoryDelayedDeliveryTracker}; {@code 0} otherwise
 */
public long getDelayedTrackerMemoryUsage() {
    // Only the in-memory tracker exposes a buffer size; an absent or different tracker reports 0.
    final boolean inMemoryTrackerPresent = !delayedDeliveryTracker.isEmpty()
            && delayedDeliveryTracker.get() instanceof InMemoryDelayedDeliveryTracker;
    return inMemoryTrackerPresent
            ? ((InMemoryDelayedDeliveryTracker) delayedDeliveryTracker.get()).getBufferMemoryUsage()
            : 0L;
}

/**
 * Computes the sticky-key hash for an entry.
 *
 * <p>The entry's data buffer is passed to {@code peekStickyKey}, and the result is hashed with
 * {@code StickyKeyConsumerSelector.makeStickyKeyHash}.
 *
 * @param entry the entry whose data buffer is inspected
 * @return the hash produced by {@code StickyKeyConsumerSelector.makeStickyKeyHash}
 */
protected int getStickyKeyHash(Entry entry) {
return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,12 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.activeConsumerName = activeConsumer.consumerName();
}
}

if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
subStats.delayedTrackerMemoryUsage =
((PersistentDispatcherMultipleConsumers) dispatcher).getDelayedTrackerMemoryUsage();
}

if (Subscription.isIndividualAckMode(subType)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers d = (PersistentDispatcherMultipleConsumers) dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,7 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes += subStats.delayedTrackerMemoryUsage;
});

replicators.forEach((cluster, replicator) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class AggregatedNamespaceStats {
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
int delayedTrackerMemoryUsage;

void updateStats(TopicStats stats) {
topicsCount++;
Expand All @@ -76,6 +77,7 @@ void updateStats(TopicStats stats) {
msgInCounter += stats.msgInCounter;
bytesOutCounter += stats.bytesOutCounter;
msgOutCounter += stats.msgOutCounter;
delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;

managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
Expand Down Expand Up @@ -156,5 +158,6 @@ public void reset() {

replicationStats.clear();
subscriptionStats.clear();
delayedTrackerMemoryUsage = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes;

stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
Expand Down Expand Up @@ -348,6 +349,9 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl

metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);

metric(stream, cluster, namespace, "pulsar_delayed_message_index_size_bytes",
stats.delayedTrackerMemoryUsage);

metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);

stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class TopicStats {
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
public int delayedTrackerMemoryUsage;
tjiuming marked this conversation as resolved.
Show resolved Hide resolved

public void reset() {
subscriptionsCount = 0;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void reset() {
compactionCompactedEntriesCount = 0;
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
delayedTrackerMemoryUsage = 0;
}

static void resetTypes() {
Expand Down Expand Up @@ -148,6 +150,9 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);

metric(stream, cluster, namespace, topic, "pulsar_delayed_message_index_size_bytes",
stats.delayedTrackerMemoryUsage, splitTopicAndPartitionIndexLabel);

long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
splitTopicAndPartitionIndexLabel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,33 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand Down Expand Up @@ -269,4 +274,77 @@ public void testPersistentPartitionedTopicUnload() throws Exception {
producer.close();
}
}


/**
 * Supplies the (topic, exposeTopicLevelMetrics) pairs for the delayed-message metric test:
 * the same topic once with the flag set to {@code true} and once with {@code false}.
 *
 * @return two rows, each holding a topic name and a boolean flag
 */
@DataProvider(name = "topicAndMetricsLevel")
public Object[][] indexPatternTestData() {
    final String delayedMetricTopic = "persistent://prop/autoNs/test_delayed_message_metric";
    final Object[] topicLevelRow = {delayedMetricTopic, true};
    final Object[] namespaceLevelRow = {delayedMetricTopic, false};
    return new Object[][]{topicLevelRow, namespaceLevelRow};
}


@Test(dataProvider = "topicAndMetricsLevel")
public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception {
PulsarClient client = pulsar.getClient();
String namespace = TopicName.get(topic).getNamespace();
admin.namespaces().createNamespace(namespace);

final int messages = 100;
CountDownLatch latch = new CountDownLatch(messages);

@Cleanup
Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test_sub")
.subscriptionType(SubscriptionType.Shared)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
try {
latch.countDown();
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
})
.subscribe();
for (int a = 0; a < messages; a++) {
producer.newMessage()
.value(UUID.randomUUID().toString())
.deliverAfter(30, TimeUnit.SECONDS)
.sendAsync();
}
producer.flush();

latch.await(10, TimeUnit.SECONDS);
ByteArrayOutputStream output = new ByteArrayOutputStream();
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
String metricsStr = output.toString(StandardCharsets.UTF_8);

Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes");
Assert.assertTrue(metrics.size() > 0);

int topicLevelNum = 0;
int namespaceLevelNum = 0;
for (PrometheusMetricsTest.Metric metric : metrics) {
if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) {
Assert.assertTrue(metric.value > 0);
tjiuming marked this conversation as resolved.
Show resolved Hide resolved
topicLevelNum++;
} else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
Assert.assertTrue(metric.value > 0);
namespaceLevelNum++;
}
}

if (exposeTopicLevelMetrics) {
Assert.assertTrue(topicLevelNum > 0);
Assert.assertEquals(0, namespaceLevelNum);
} else {
Assert.assertTrue(namespaceLevelNum > 0);
Assert.assertEquals(topicLevelNum, 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;

/** Memory used by the InMemoryDelayedDeliveryTracker, in bytes. */
public long delayedTrackerMemoryUsage;

/** SubscriptionProperties (key/value strings) associated with this subscribe. */
public Map<String, String> subscriptionProperties;

Expand Down Expand Up @@ -158,6 +161,7 @@ public void reset() {
consumersAfterMarkDeletePosition.clear();
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
delayedTrackerMemoryUsage = 0;
subscriptionProperties.clear();
}

Expand Down Expand Up @@ -193,6 +197,7 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) {
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
this.subscriptionProperties.putAll(stats.subscriptionProperties);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public class TopicStatsImpl implements TopicStats {
/** The serialized size of non-contiguous deleted messages ranges. */
public int nonContiguousDeletedMessagesRangesSerializedSize;

/** Total memory used by InMemoryDelayedDeliveryTracker across the topic's subscriptions, in bytes. */
public int delayedMessageIndexSizeInBytes;

/** The compaction stats. */
public CompactionStatsImpl compaction;

Expand Down Expand Up @@ -200,6 +203,7 @@ public void reset() {
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
this.publishRateLimitedTimes = 0L;
this.delayedMessageIndexSizeInBytes = 0;
this.compaction.reset();
}

Expand All @@ -226,6 +230,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.offloadedStorageSize += stats.offloadedStorageSize;
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;

stats.getPublishers().forEach(s -> {
if (s.isSupportsPartialProducer() && s.getProducerName() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1598,11 +1598,12 @@ private void publishAndConsumeMessages(String inputTopic,
Json.pretty(pulsarAdmin.topics().getInternalStats(inputTopic, true)));
log.info("Output topic internal-stats: {}",
Json.pretty(pulsarAdmin.topics().getInternalStats(outputTopic, true)));
} else {
String logMsg = new String(msg.getValue(), UTF_8);
log.info("Received message: '{}'", logMsg);
assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg + "' not expected");
expectedMessages.remove(logMsg);
}
String logMsg = new String(msg.getValue(), UTF_8);
log.info("Received message: '{}'", logMsg);
assertTrue(expectedMessages.contains(logMsg), "Message '" + logMsg + "' not expected");
expectedMessages.remove(logMsg);
}

consumer.close();
Expand Down