Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][client]Fix status html endpoint #23814

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ clientLibraryVersionCheckEnabled=false

# Path for the file used to determine the rotation status for the broker when responding
# to service discovery health checks
statusFilePath=/usr/local/apache/htdocs
statusFilePath=

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,37 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require
public void addStaticResources(String basePath, String resourcePath) {
ContextHandler capHandler = new ContextHandler();
capHandler.setContextPath(basePath);
ResourceHandler resHandler = new ResourceHandler();
resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
resHandler.setEtags(true);
resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);
capHandler.setHandler(resHandler);
handlers.add(capHandler);
if (resourcePath != null && !resourcePath.isEmpty()) {
ResourceHandler resHandler = new ResourceHandler();
resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
resHandler.setEtags(true);
resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);
capHandler.setHandler(resHandler);
handlers.add(capHandler);
} else {
// If statusFilePath is not set, return OK when the broker is ready, or 404 if not.
capHandler.setHandler(new DefaultHandler() {
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest,
javax.servlet.http.HttpServletRequest request,
javax.servlet.http.HttpServletResponse response) throws IOException {
if (target.equals("/status.html") ) {
if(pulsar.getState() == PulsarService.State.Started) {
response.setContentType("text/plain");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().write("OK");
baseRequest.setHandled(true);
} else {
response.setStatus(HttpServletResponse.SC_NOT_FOUND);
baseRequest.setHandled(true);
}
} else {
super.handle(target, baseRequest, request, response);
}
}
});
handlers.add(capHandler);
}
}

public void start() throws PulsarServerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ default Optional<Object> getNativeSchema() {
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
return DefaultImplementation.getDefaultImplementation().newProtobufSchema(clazz);

}

/**
Expand All @@ -297,16 +297,15 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Cla
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition);
}

/**
* Create a Protobuf-Native schema type by extracting the fields of the specified class.
*
* @param clazz the Protobuf generated class to be used to extract the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NATIVE(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build());
return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(clazz);

}

/**
Expand All @@ -327,8 +326,7 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NAT
* @return a Schema instance
*/
static <T> Schema<T> AVRO(Class<T> pojo) {
return DefaultImplementation.getDefaultImplementation()
.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
return DefaultImplementation.getDefaultImplementation().newAvroSchema(pojo);
}

/**
Expand All @@ -348,8 +346,7 @@ static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> pojo) {
return DefaultImplementation.getDefaultImplementation()
.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
return DefaultImplementation.getDefaultImplementation().newJSONSchema(pojo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSc

<T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition);

<T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz);

<T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSchema(Class<T> clazz);

<T> Schema<T> newJSONSchema(Class<T> pojo);

<T> Schema<T> newAvroSchema(Class<T> pojo);

Schema<GenericRecord> newAutoConsumeSchema();

Schema<byte[]> newAutoProduceSchema();
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@
<artifactId>commons-codec</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,24 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
import org.apache.pulsar.common.schema.SchemaType;

import com.google.common.collect.MapMaker;
import java.util.concurrent.ConcurrentMap;
/**
* Helper class for class instantiations and it also contains methods to work with schemas.
*/
@SuppressWarnings("unchecked")
public final class PulsarClientImplementationBindingImpl implements PulsarClientImplementationBinding {

private static final PulsarClientImplementationBinding IMPLEMENTATION;

private static final ConcurrentMap<Class<?>, Schema<?>> PROTOBUF_CACHE = new MapMaker().weakKeys().makeMap();

private static final ConcurrentMap<Class<?>, Schema<?>> PROTOBUF_NATIVE_CACHE = new MapMaker().weakKeys().makeMap();

private static final ConcurrentMap<Class<?>, Schema<?>> AVRO_CACHE = new MapMaker().weakKeys().makeMap();

private static final ConcurrentMap<Class<?>, Schema<?>> JSON_CACHE = new MapMaker().weakKeys().makeMap();

public <T> SchemaDefinitionBuilder<T> newSchemaDefinitionBuilder() {
return new SchemaDefinitionBuilderImpl();
}
Expand Down Expand Up @@ -208,21 +219,42 @@ public Schema<LocalDateTime> newLocalDateTimeSchema() {
public <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
return AvroSchema.of(schemaDefinition);
}

public <T> Schema<T> newAvroSchema(Class<T> pojo) {
return (Schema<T>) AVRO_CACHE.computeIfAbsent(pojo,
k -> AvroSchema.of(SchemaDefinition.builder().withPojo(pojo).build())).clone();
}

public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(
SchemaDefinition schemaDefinition) {
return ProtobufSchema.of(schemaDefinition);
}

public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
return (Schema<T>) PROTOBUF_CACHE.computeIfAbsent(clazz,
k -> ProtobufSchema.of(SchemaDefinition.builder().withPojo(clazz).build())).clone();
}


public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSchema(
SchemaDefinition schemaDefinition) {
return ProtobufNativeSchema.of(schemaDefinition);
}

public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufNativeSchema(Class<T> clazz) {
return (Schema<T>) PROTOBUF_NATIVE_CACHE.computeIfAbsent(clazz,
k -> ProtobufNativeSchema.of(SchemaDefinition.builder().withPojo(clazz).build())).clone();
}

public <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
return JSONSchema.of(schemaDefinition);
}

public <T> Schema<T> newJSONSchema(Class<T> pojo) {
return (Schema<T>) JSON_CACHE.computeIfAbsent(pojo,
k -> JsonSchema.of(SchemaDefinition.builder().withPojo(pojo).build())).clone();
}

public Schema<GenericRecord> newAutoConsumeSchema() {
return new AutoConsumeSchema();
}
Expand Down
Loading