Skip to content

Commit

Permalink
refactor: support jdk8
Browse files Browse the repository at this point in the history
Signed-off-by: moxiaoying <[email protected]>
  • Loading branch information
CennyMo committed Jan 12, 2025
1 parent c9fb5ae commit 7d7ab18
Show file tree
Hide file tree
Showing 17 changed files with 100 additions and 82 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

![License](https://img.shields.io/badge/license-Apache2.0-green) ![Language](https://img.shields.io/badge/language-Java-blue.svg) [![version](https://img.shields.io/github/v/tag/protocol-laboratory/pulsar-admin-java?label=release&color=blue)](https://github.com/protocol-laboratory/pulsar-admin-java/releases) [![codecov](https://codecov.io/gh/protocol-laboratory/pulsar-admin-java/branch/main/graph/badge.svg)](https://codecov.io/gh/protocol-laboratory/pulsar-admin-java)

This is a simple alternative to pulsar-admin built using the built-in HTTP client of the JDK. It minimizes project dependencies and requires a minimum of JDK 17.
This is a simple alternative to pulsar-admin built using the built-in HTTP client of the JDK. It minimizes project dependencies and requires a minimum of JDK 8.

Features:
- Basic functionality for managing Pulsar clusters
- Built on top of the built-in HTTP client of the JDK
- Minimizes project dependencies to provide a lightweight solution
- Requires a minimum of JDK 17
- Requires a minimum of JDK 8

Notices:
- currently disable hostname verification is not available. if you want to disable it ,please set property like : **System.setProperty("jdk.internal.httpclient.disableHostnameVerification", "true")**, but it's better to provide valid certificates.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
</modules>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<src.dir>src/main/java</src.dir>
<!-- dependency -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JacksonService {
Expand Down Expand Up @@ -41,7 +42,7 @@ public static <T> T toRefer(byte[] json, TypeReference<T> ref) throws IOExceptio

public static <T> List<T> toList(byte[] json, TypeReference<List<T>> typeRef) throws IOException {
if (json == null || json.length == 0) {
return List.of();
return new ArrayList<>();
}
return MAPPER.readValue(json, typeRef);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public List getList(String tenant, String namespace, String bundle, boolean incl
+ "under namespace %s/%s, status code %s, body : %s",
tenant, namespace, response.statusCode(), response.bodyAsString()));
}
return JacksonService.toRefer(response.body(), new TypeReference<>() {
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
Expand All @@ -183,7 +183,7 @@ public List getPartitionedTopicList(String tenant, String namespace, boolean inc
+ "status code %s, body : %s",
tenant, namespace, response.statusCode(), response.bodyAsString()));
}
return JacksonService.toRefer(response.body(), new TypeReference<>() {
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException {
httpResponse.statusCode());
}
if (!httpResponse.bodyAsString().equals("ok")) {
throw new PulsarAdminException("healthcheck failed, body: " + httpResponse.body());
throw new PulsarAdminException("healthcheck failed, body: " + httpResponse.bodyAsString());

Check warning on line 24 in pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/Brokers.java

View check run for this annotation

Codecov / codecov/patch

pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/Brokers.java#L24

Added line #L24 was not covered by tests
}
} catch (Exception e) {
throw new PulsarAdminException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public List<String> getClusters() throws PulsarAdminException {
String.format("failed to get list of clusters, "
+ "status code %s, body : %s", response.statusCode(), response.body()));
}
return JacksonService.toRefer(response.body(), new TypeReference<>() {});
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,7 +62,7 @@ public HttpResponse post(String url, Object body, String... params)
private HttpResponse innerPost(String url, byte[] body, String... params)
throws InterruptedException, ExecutionException {
Map<String, List<String>> headers = new HashMap<>();
headers.put("Content-Type", List.of("application/json"));
headers.put("Content-Type", Arrays.asList("application/json"));
HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.POST, headers, body);
return client.send(request).get();
}
Expand All @@ -82,7 +83,7 @@ public HttpResponse put(String url, Object body, String... params)
private HttpResponse innerPut(String url, byte[] body, String... params)
throws InterruptedException, ExecutionException {
Map<String, List<String>> headers = new HashMap<>();
headers.put("Content-Type", List.of("application/json"));
headers.put("Content-Type", Arrays.asList("application/json"));
HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.PUT, headers, body);
return client.send(request).get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public List<String> getTenantNamespaces(String tenant) throws PulsarAdminExcepti
String.format("failed to get namespaces of tenant %s, status code %s, body : %s",
tenant, response.statusCode(), response.bodyAsString()));
}
return JacksonService.toList(response.body(), new TypeReference<>() {
return JacksonService.toList(response.body(), new TypeReference<List<String>>() {
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
Expand All @@ -43,7 +43,7 @@ public List<String> getTopics(String tenant, String namespace, Mode mode, boolea
String.format("failed to get topics of namespace %s/%s, status code %s, body : %s",
tenant, namespace, response.statusCode(), response.bodyAsString()));
}
return JacksonService.toList(response.body(), new TypeReference<>() {
return JacksonService.toList(response.body(), new TypeReference<List<String>>() {
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
Expand Down Expand Up @@ -90,7 +90,7 @@ public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String tenant, Str
"failed to get backlog quota map of namespace %s/%s, status code %s, body : %s",
tenant, namespace, response.statusCode(), response.bodyAsString()));
}
return JacksonService.toRefer(response.body(), new TypeReference<>() {
return JacksonService.toRefer(response.body(), new TypeReference<Map<BacklogQuotaType, BacklogQuota>>() {
});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public List<String> getTenants() throws PulsarAdminException {
throw new PulsarAdminException(String.format("failed to get list of tenant, status code %s, body : %s",
response.statusCode(), response.bodyAsString()));
}
return JacksonService.toList(response.body(), new TypeReference<>() {});
return JacksonService.toList(response.body(), new TypeReference<List<String>>() {});
} catch (IOException | InterruptedException | ExecutionException e) {
throw new PulsarAdminException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Arrays;

public class ClustersTest {

Expand All @@ -24,7 +24,7 @@ public static void teardown() throws Exception {

@Test
public void getClustersTest() throws PulsarAdminException {
Assertions.assertEquals(List.of("standalone"),
Assertions.assertEquals(Arrays.asList("standalone"),
PulsarAdmin.builder().port(SERVER.getWebPort()).build().clusters().getClusters());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package io.github.protocol.pulsar.admin.jdk;

import com.google.common.collect.ImmutableMap;
import io.github.embedded.pulsar.core.EmbeddedPulsarServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class NamespacesTest {
Expand All @@ -20,7 +19,7 @@ public class NamespacesTest {

private static final TenantInfo initialTenantInfo = (new TenantInfo.TenantInfoBuilder())
.adminRoles(new HashSet<>(0))
.allowedClusters(Set.of(CLUSTER_STANDALONE)).build();
.allowedClusters(new HashSet<>(Arrays.asList(CLUSTER_STANDALONE))).build();

private static PulsarAdmin pulsarAdmin;

Expand All @@ -41,16 +40,16 @@ public void namespaceTest() throws PulsarAdminException {
String namespace = RandomUtil.randomString();
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
Assertions.assertEquals(List.of(), pulsarAdmin.namespaces()
.getTopics(tenant, namespace, Mode.PERSISTENT, true));
Assertions.assertEquals(List.of(), pulsarAdmin.namespaces()
Assertions.assertEquals(Arrays.asList(), pulsarAdmin.namespaces()
.getTopics(tenant, namespace, Mode.PERSISTENT, true));
Assertions.assertEquals(Arrays.asList(), pulsarAdmin.namespaces()
.getTopics(tenant, namespace, Mode.NON_PERSISTENT, true));
Assertions.assertEquals(List.of(), pulsarAdmin.namespaces()
Assertions.assertEquals(Arrays.asList(), pulsarAdmin.namespaces()
.getTopics(tenant, namespace, Mode.ALL, false));
Assertions.assertEquals(
List.of(tenant + "/" + namespace), pulsarAdmin.namespaces().getTenantNamespaces(tenant));
Arrays.asList(tenant + "/" + namespace), pulsarAdmin.namespaces().getTenantNamespaces(tenant));
pulsarAdmin.namespaces().deleteNamespace(tenant, namespace, false, false);
Assertions.assertEquals(List.of(), pulsarAdmin.namespaces().getTenantNamespaces(tenant));
Assertions.assertEquals(Arrays.asList(), pulsarAdmin.namespaces().getTenantNamespaces(tenant));
}

@Test
Expand All @@ -72,18 +71,18 @@ public void namespacesBacklogQuotaTest() throws PulsarAdminException, Interrupte
pulsarAdmin.namespaces().setBacklogQuota(tenant, namespace, BacklogQuotaType.message_age, backlogQuota1);
pulsarAdmin.namespaces().setBacklogQuota(tenant, namespace,
BacklogQuotaType.destination_storage, backlogQuota2);
Assertions.assertEquals(Map.of(
Assertions.assertEquals(ImmutableMap.of(
BacklogQuotaType.destination_storage, backlogQuota2,
BacklogQuotaType.message_age, backlogQuota1),
new TreeMap<>(pulsarAdmin.namespaces().getBacklogQuotaMap(tenant, namespace)));
backlogQuota1.setPolicy(RetentionPolicy.producer_exception);
pulsarAdmin.namespaces().setBacklogQuota(tenant, namespace, BacklogQuotaType.message_age, backlogQuota1);
Assertions.assertEquals(Map.of(
Assertions.assertEquals(ImmutableMap.of(
BacklogQuotaType.destination_storage, backlogQuota2,
BacklogQuotaType.message_age, backlogQuota1),
new TreeMap<>(pulsarAdmin.namespaces().getBacklogQuotaMap(tenant, namespace)));
pulsarAdmin.namespaces().removeBacklogQuota(tenant, namespace, BacklogQuotaType.message_age);
Assertions.assertEquals(Map.of(BacklogQuotaType.destination_storage, backlogQuota2),
Assertions.assertEquals(ImmutableMap.of(BacklogQuotaType.destination_storage, backlogQuota2),
new TreeMap<>(pulsarAdmin.namespaces().getBacklogQuotaMap(tenant, namespace)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NonPersistentTopicsTest {

Expand All @@ -25,8 +24,8 @@ public static void setup() throws Exception {
SERVER.start();
pulsarAdmin = PulsarAdmin.builder().port(SERVER.getWebPort()).build();
TenantInfo initialTenantInfo = (new TenantInfo.TenantInfoBuilder())
.adminRoles(new HashSet<>(0))
.allowedClusters(Set.of(CLUSTER_STANDALONE)).build();
.adminRoles(new HashSet<>(0))
.allowedClusters(new HashSet<>(Arrays.asList(CLUSTER_STANDALONE))).build();
pulsarAdmin.tenants().createTenant(tenant, initialTenantInfo);
}

Expand All @@ -41,20 +40,20 @@ public void partitionedTopicsTest() throws PulsarAdminException {
String topic = RandomUtil.randomString();
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
pulsarAdmin.nonPersistentTopics().createPartitionedTopic(tenant, namespace, topic, 2, false);
Assertions.assertEquals(List.of(String.format("non-persistent://%s/%s/%s", tenant, namespace, topic)),
Assertions.assertEquals(Arrays.asList(String.format("non-persistent://%s/%s/%s", tenant, namespace, topic)),
pulsarAdmin.nonPersistentTopics().getPartitionedTopicList(tenant, namespace, false));
Assertions.assertEquals(2, pulsarAdmin.nonPersistentTopics().getPartitionedMetadata(tenant, namespace,
topic, false, false).getPartitions());
pulsarAdmin.nonPersistentTopics().updatePartitionedTopic(tenant, namespace, topic, false, false, false, 3);
Assertions.assertEquals(List.of(String.format("non-persistent://%s/%s/%s", tenant, namespace, topic)),
Assertions.assertEquals(Arrays.asList(String.format("non-persistent://%s/%s/%s", tenant, namespace, topic)),
pulsarAdmin.nonPersistentTopics().getPartitionedTopicList(tenant, namespace, false));
Assertions.assertEquals(3, pulsarAdmin.nonPersistentTopics().getPartitionedMetadata(tenant, namespace,
topic, false, false).getPartitions());
pulsarAdmin.nonPersistentTopics().deletePartitionedTopic(tenant, namespace, topic, false, false);
Assertions.assertEquals(List.of(),
Assertions.assertEquals(Arrays.asList(),
pulsarAdmin.nonPersistentTopics().getPartitionedTopicList(tenant, namespace, false));
Assertions.assertEquals(
List.of(),
Arrays.asList(),
pulsarAdmin.nonPersistentTopics().getList(tenant, namespace, null, false));
}

Expand All @@ -75,11 +74,11 @@ public void nonPartitionedTopicsTest() throws PulsarAdminException {
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
pulsarAdmin.nonPersistentTopics().createNonPartitionedTopic(tenant, namespace, topic, false, null);
Assertions.assertEquals(
List.of(String.format("non-persistent://%s/%s/%s", tenant, namespace, topic)),
Arrays.asList(String.format("non-persistent://%s/%s/%s", tenant, namespace, topic)),
pulsarAdmin.nonPersistentTopics().getList(tenant, namespace, null, false));
pulsarAdmin.nonPersistentTopics().deleteTopic(tenant, namespace, topic, false, false);
Assertions.assertEquals(
List.of(),
Arrays.asList(),
pulsarAdmin.nonPersistentTopics().getList(tenant, namespace, null, false));
}

Expand Down
Loading

0 comments on commit 7d7ab18

Please sign in to comment.