Skip to content

Commit

Permalink
[fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed (apache#22580)
Browse files Browse the repository at this point in the history

(cherry picked from commit 340d60d)
(cherry picked from commit 053c455)
  • Loading branch information
poorbarcode authored and srinath-ctds committed May 16, 2024
1 parent 32aff55 commit da6212a
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,8 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
return FutureUtil.failedFuture(e);
topicFuture.completeExceptionally(e);
return topicFuture;
}
CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
isOwner.thenRun(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,31 @@

import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -70,6 +74,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
Expand Down Expand Up @@ -113,7 +118,12 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
import org.glassfish.jersey.client.JerseyClient;
import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -1651,80 +1661,120 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception

// this test is disabled since it is flaky
@Test(enabled = false)
public void testBrokerStatsTopicLoadFailed() throws Exception {
admin.namespaces().createNamespace("prop/ns-test");

String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID();
String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID();
public void testMetricsPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);
admin.topics().unload(topic);

// Inject an error that makes the topic load fail.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
return true;
}
return false;
});

BrokerService brokerService = pulsar.getBrokerService();
brokerService = Mockito.spy(brokerService);
// mock create persistent topic failed
Mockito
.doAnswer(invocation -> {
CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>();
f.completeExceptionally(new RuntimeException("This is an exception"));
return f;
})
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));

// mock create non-persistent topic failed
Mockito
.doAnswer(inv -> {
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(new RuntimeException("This is an exception"));
return f;
})
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));


PulsarService pulsarService = pulsar;
Field field = PulsarService.class.getDeclaredField("brokerService");
field.setAccessible(true);
field.set(pulsarService, brokerService);

CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING)
.topic(persistentTopic)
.createAsync();
CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(nonPersistentTopic)
.createAsync();

producer.whenComplete((v, t) -> {
if (t == null) {
try {
v.close();
} catch (PulsarClientException e) {
// ignore
// Do test
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
BufferedReader reader = new BufferedReader(new StringReader(response));
String line;
String metricsLine = null;
while ((line = reader.readLine()) != null) {
if (StringUtils.isBlank(line)) {
continue;
}
if (line.startsWith("#")) {
continue;
}
if (line.contains("topic_load_failed")) {
metricsLine = line;
break;
}
}
log.info("topic_load_failed: {}", metricsLine);
if (metricsLine == null) {
return false;
}
reader.close();
String[] parts = metricsLine.split(" ");
Double value = Double.valueOf(parts[parts.length - 1]);
return value >= 1D;
});
producer1.whenComplete((v, t) -> {
if (t == null) {
try {
v.close();
} catch (PulsarClientException e) {
// ignore

// Remove the injection.
failMarker.set(false);
// cleanup.
httpClient.close();
producer.join().close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}

@Test
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);

// Inject an error that makes the topic load fail.
// Since we did not set a topic factory name, the "topicFactory" variable is null, so we inject a mocked
// "topicFactory".
Field fieldTopicFactory = BrokerService.class.getDeclaredField("topicFactory");
fieldTopicFactory.setAccessible(true);
TopicFactory originalTopicFactory = (TopicFactory) fieldTopicFactory.get(pulsar.getBrokerService());
assertNull(originalTopicFactory);
TopicFactory mockedTopicFactory = mock(TopicFactory.class);
when(mockedTopicFactory.create(anyString(), any(), any(), any()))
.thenThrow(new RuntimeException("mocked error"));
fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory);

// Do test.
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
BufferedReader reader = new BufferedReader(new StringReader(response));
String line;
String metricsLine = null;
while ((line = reader.readLine()) != null) {
if (StringUtils.isBlank(line)) {
continue;
}
if (line.startsWith("#")) {
continue;
}
if (line.contains("topic_load_failed")) {
metricsLine = line;
break;
}
}
log.info("topic_load_failed: {}", metricsLine);
if (metricsLine == null) {
return false;
}
reader.close();
String[] parts = metricsLine.split(" ");
Double value = Double.valueOf(parts[parts.length - 1]);
return value >= 1D;
});

Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
String json = admin.brokerStats().getMetrics();
JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
AtomicBoolean flag = new AtomicBoolean(false);

metrics.forEach(ele -> {
JsonObject obj = ((JsonObject) ele);
JsonObject metrics0 = (JsonObject) obj.get("metrics");
JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count");
if (null != v && v.getAsDouble() >= 2D) {
flag.set(true);
}
});
// Remove the injection.
fieldTopicFactory.set(pulsar.getBrokerService(), null);

return flag.get();
});
// cleanup.
httpClient.close();
producer.join().close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}

@Test
Expand Down

0 comments on commit da6212a

Please sign in to comment.