From fdc135c2985f00359171519ed22cc08a46feccd8 Mon Sep 17 00:00:00 2001 From: wei <493703217@qq.com> Date: Sun, 1 Sep 2024 17:44:23 +0800 Subject: [PATCH] feat: add get partition stats function (#27) Signed-off-by: wei <493703217@qq.com> --- .../pulsar/admin/jdk/BaseTopicsImpl.java | 23 +++++++++++++++++++ .../admin/jdk/PartitionedTopicStats.java | 2 +- .../admin/jdk/NonPersistentTopicsTest.java | 10 ++++++++ .../admin/jdk/PersistentTopicsTest.java | 9 ++++++++ 4 files changed, 43 insertions(+), 1 deletion(-) diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java index f1263be..f32da0c 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.net.http.HttpResponse; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -69,6 +70,28 @@ public void updatePartitionedTopic(String tenant, String namespace, String encod } } + public PartitionedTopicStats getPartitionedStats(String tenant, String namespace, String encodedTopic, + boolean perPartition) + throws PulsarAdminException { + String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, + encodedTopic, "/partitioned-stats"); + try { + HttpResponse response = httpClient.get(url, "perPartition", + Arrays.toString(new Object[] {perPartition}), "getPreciseBacklog", + Arrays.toString(new Object[] {false}), "subscriptionBacklogSize", + Arrays.toString(new Object[] {false}), + "getEarliestTimeInBacklog", Arrays.toString(new Object[] {false})); + if (response.statusCode() != 200) { + throw new PulsarAdminException( + String.format("failed to get partitioned stats of topic %s/%s/%s, status code %s, body : %s", + tenant, namespace, encodedTopic, response.statusCode(), response.body())); + } + return JacksonService.toObject(response.body(), PartitionedTopicStats.class); + } catch (IOException | InterruptedException e) { + throw new PulsarAdminException(e); + } + } + public PartitionedTopicMetadata getPartitionedMetadata(String tenant, String namespace, String encodedTopic, boolean checkAllowAutoCreation, boolean authoritative) throws PulsarAdminException { diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/PartitionedTopicStats.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/PartitionedTopicStats.java index ae1ba11..3ba49de 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/PartitionedTopicStats.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/PartitionedTopicStats.java @@ -13,7 +13,7 @@ @NoArgsConstructor @AllArgsConstructor @ToString -public class PartitionedTopicStats extends TopicStats{ +public class PartitionedTopicStats extends TopicStats { public PartitionedTopicMetadata metadata; diff --git a/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/NonPersistentTopicsTest.java b/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/NonPersistentTopicsTest.java index 1bd6943..4cedfcc 100644 --- a/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/NonPersistentTopicsTest.java +++ b/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/NonPersistentTopicsTest.java @@ -58,6 +58,16 @@ public void partitionedTopicsTest() throws PulsarAdminException { pulsarAdmin.nonPersistentTopics().getList(tenant, namespace, null, false)); } + @Test + public void getPartitionedStatsTest() throws PulsarAdminException { + String namespace = RandomUtil.randomString(); + String topic = RandomUtil.randomString(); + pulsarAdmin.namespaces().createNamespace(tenant, namespace); + pulsarAdmin.nonPersistentTopics().createPartitionedTopic(tenant, namespace, topic, 2, false); + Assertions.assertNotNull(pulsarAdmin.nonPersistentTopics().getPartitionedStats(tenant, namespace, + topic, false)); + } + @Test public void nonPartitionedTopicsTest() throws PulsarAdminException { String namespace = RandomUtil.randomString(); diff --git a/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java b/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java index 9636a76..1a82837 100644 --- a/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java +++ b/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java @@ -192,4 +192,13 @@ public void retentionTest() throws PulsarAdminException { false, false, false)); } + @Test + public void getPartitionedStatsTest() throws PulsarAdminException { + String namespace = RandomUtil.randomString(); + String topic = RandomUtil.randomString(); + pulsarAdmin.namespaces().createNamespace(tenant, namespace); + pulsarAdmin.persistentTopics().createPartitionedTopic(tenant, namespace, topic, 2, false); + Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace, topic, false)); + } + }