diff --git a/conf/standalone.conf b/conf/standalone.conf index 2036556da4385..87267f580716a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -212,7 +212,7 @@ clientLibraryVersionCheckEnabled=false # Path for the file used to determine the rotation status for the broker when responding # to service discovery health checks -statusFilePath=/usr/local/apache/htdocs +statusFilePath= # Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending # messages to consumer once, this limit reaches until consumer starts acknowledging messages back diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 7eb1f2fae09b6..45a7d407bd0de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -360,12 +360,37 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require public void addStaticResources(String basePath, String resourcePath) { ContextHandler capHandler = new ContextHandler(); capHandler.setContextPath(basePath); - ResourceHandler resHandler = new ResourceHandler(); - resHandler.setBaseResource(Resource.newClassPathResource(resourcePath)); - resHandler.setEtags(true); - resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL); - capHandler.setHandler(resHandler); - handlers.add(capHandler); + if (resourcePath != null && !resourcePath.isEmpty()) { + ResourceHandler resHandler = new ResourceHandler(); + resHandler.setBaseResource(Resource.newClassPathResource(resourcePath)); + resHandler.setEtags(true); + resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL); + capHandler.setHandler(resHandler); + handlers.add(capHandler); + } else { + // If statusFilePath is not set, return OK when the broker is ready, or 404 if not. + capHandler.setHandler(new DefaultHandler() { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, + javax.servlet.http.HttpServletRequest request, + javax.servlet.http.HttpServletResponse response) throws IOException { + if (target.equals("/status.html") ) { + if(pulsar.getState() == PulsarService.State.Started) { + response.setContentType("text/plain"); + response.setStatus(HttpServletResponse.SC_OK); + response.getWriter().write("OK"); + baseRequest.setHandled(true); + } else { + response.setStatus(HttpServletResponse.SC_NOT_FOUND); + baseRequest.setHandled(true); + } + } else { + super.handle(target, baseRequest, request, response); + } + } + }); + handlers.add(capHandler); + } } public void start() throws PulsarServerException { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index ef1e0cc1feade..3f0a0da0514af 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -284,8 +284,8 @@ default Optional getNativeSchema() { * @return a Schema instance */ static Schema PROTOBUF(Class clazz) { - return DefaultImplementation.getDefaultImplementation() - .newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build()); + return DefaultImplementation.getDefaultImplementation().newProtobufSchema(clazz); + } /** @@ -297,7 +297,6 @@ static Schema PROTOBUF(Cla static Schema PROTOBUF(SchemaDefinition schemaDefinition) { return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition); } - /** * Create a Protobuf-Native schema type by extracting the fields of the specified class. * @@ -305,8 +304,8 @@ static Schema PROTOBUF(Sch * @return a Schema instance */ static Schema PROTOBUF_NATIVE(Class clazz) { - return DefaultImplementation.getDefaultImplementation() - .newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build()); + return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(clazz); + } /** @@ -327,8 +326,7 @@ static Schema PROTOBUF_NAT * @return a Schema instance */ static Schema AVRO(Class pojo) { - return DefaultImplementation.getDefaultImplementation() - .newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build()); + return DefaultImplementation.getDefaultImplementation().newAvroSchema(pojo); } /** @@ -348,8 +346,7 @@ static Schema AVRO(SchemaDefinition schemaDefinition) { * @return a Schema instance */ static Schema JSON(Class pojo) { - return DefaultImplementation.getDefaultImplementation() - .newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build()); + return DefaultImplementation.getDefaultImplementation().newJSONSchema(pojo); } /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index 8fd05bff265f1..2177ec7128301 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -122,6 +122,14 @@ Schema newProtobufNativeSc Schema newJSONSchema(SchemaDefinition schemaDefinition); + Schema newProtobufSchema(Class clazz); + + Schema newProtobufNativeSchema(Class clazz); + + Schema newJSONSchema(Class pojo); + + Schema newAvroSchema(Class pojo); + Schema newAutoConsumeSchema(); Schema newAutoProduceSchema(); diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index e1a70ed074833..e2463200df154 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -127,6 +127,11 @@ commons-codec + + com.google.guava + guava + + com.yahoo.datasketches sketches-core diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 346eb20ef4cc5..d9f433acf3fc4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -84,13 +84,24 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.schema.SchemaType; - +import com.google.common.collect.MapMaker; +import java.util.concurrent.ConcurrentMap; /** * Helper class for class instantiations and it also contains methods to work with schemas. */ @SuppressWarnings("unchecked") public final class PulsarClientImplementationBindingImpl implements PulsarClientImplementationBinding { + private static final PulsarClientImplementationBinding IMPLEMENTATION; + + private static final ConcurrentMap, Schema> PROTOBUF_CACHE = new MapMaker().weakKeys().makeMap(); + + private static final ConcurrentMap, Schema> PROTOBUF_NATIVE_CACHE = new MapMaker().weakKeys().makeMap(); + + private static final ConcurrentMap, Schema> AVRO_CACHE = new MapMaker().weakKeys().makeMap(); + + private static final ConcurrentMap, Schema> JSON_CACHE = new MapMaker().weakKeys().makeMap(); + public SchemaDefinitionBuilder newSchemaDefinitionBuilder() { return new SchemaDefinitionBuilderImpl(); } @@ -208,21 +219,42 @@ public Schema newLocalDateTimeSchema() { public Schema newAvroSchema(SchemaDefinition schemaDefinition) { return AvroSchema.of(schemaDefinition); } + + public Schema newAvroSchema(Class pojo) { + return (Schema) AVRO_CACHE.computeIfAbsent(pojo, + k -> AvroSchema.of(SchemaDefinition.builder().withPojo(pojo).build())).clone(); + } public Schema newProtobufSchema( SchemaDefinition schemaDefinition) { return ProtobufSchema.of(schemaDefinition); } + public Schema newProtobufSchema(Class clazz) { + return (Schema) PROTOBUF_CACHE.computeIfAbsent(clazz, + k -> ProtobufSchema.of(SchemaDefinition.builder().withPojo(clazz).build())).clone(); + } + + public Schema newProtobufNativeSchema( SchemaDefinition schemaDefinition) { return ProtobufNativeSchema.of(schemaDefinition); } + public Schema newProtobufNativeSchema(Class clazz) { + return (Schema) PROTOBUF_NATIVE_CACHE.computeIfAbsent(clazz, + k -> ProtobufNativeSchema.of(SchemaDefinition.builder().withPojo(clazz).build())).clone(); + } + public Schema newJSONSchema(SchemaDefinition schemaDefinition) { return JSONSchema.of(schemaDefinition); } + public Schema newJSONSchema(Class pojo) { + return (Schema) JSON_CACHE.computeIfAbsent(pojo, + k -> JsonSchema.of(SchemaDefinition.builder().withPojo(pojo).build())).clone(); + } + public Schema newAutoConsumeSchema() { return new AutoConsumeSchema(); }