Skip to content

Commit

Permalink
[Enhancement] Add schema cache to improve performance
Browse files Browse the repository at this point in the history
- Add SchemaCache implementation
- Add cache configuration and metrics
- Add cleanup strategy
- Modify Schema creation methods
- Add unit tests and performance tests

This closes apache#23707
  • Loading branch information
qingfeng503 committed Jan 3, 2025
1 parent f199e88 commit 00dc375
Show file tree
Hide file tree
Showing 6 changed files with 892 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,9 @@ default Optional<Object> getNativeSchema() {
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
return SchemaCache.getOrCreateSchema(clazz, SchemaType.PROTOBUF,
cls -> DefaultImplementation.getDefaultImplementation()
.newProtobufSchema(SchemaDefinition.builder().withPojo(cls).build()));
}

/**
Expand All @@ -295,18 +296,18 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Cla
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition);
return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition);
}

/**
* Create a Protobuf-Native schema type by extracting the fields of the specified class.
*
* @param clazz the Protobuf generated class to be used to extract the schema
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NATIVE(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build());
return SchemaCache.getOrCreateSchema(clazz, SchemaType.JSON,
cls -> DefaultImplementation.getDefaultImplementation()
.newJSONSchema(SchemaDefinition.builder().withPojo(cls).build()));
}

/**
Expand All @@ -327,8 +328,9 @@ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF_NAT
* @return a Schema instance
*/
static <T> Schema<T> AVRO(Class<T> pojo) {
return DefaultImplementation.getDefaultImplementation()
.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
return SchemaCache.getOrCreateSchema(pojo, SchemaType.JSON,
cls -> DefaultImplementation.getDefaultImplementation()
.newJSONSchema(SchemaDefinition.builder().withPojo(cls).build()));
}

/**
Expand All @@ -348,8 +350,9 @@ static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> pojo) {
return DefaultImplementation.getDefaultImplementation()
.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
return SchemaCache.getOrCreateSchema(pojo, SchemaType.JSON,
cls -> DefaultImplementation.getDefaultImplementation()
.newJSONSchema(SchemaDefinition.builder().withPojo(cls).build()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.schema;

import java.util.Iterator;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* Strategy for cleaning up expired entries from schema cache.
*/
@Slf4j
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SchemaCacheCleanupStrategy {

private final long expireTimeMillis;
private final Map<Class<?>, Long> lastAccessTimes;

// Threshold for minimum number of entries to keep in cache
private static final int MIN_ENTRIES = 10;

// Maximum number of entries to remove in single cleanup
private static final int MAX_REMOVE_PER_CLEANUP = 100;

/**
* Create a new cleanup strategy.
*
* @param expireSeconds time in seconds after which entries expire
*/
public SchemaCacheCleanupStrategy(long expireSeconds) {
this.expireTimeMillis = expireSeconds * 1000;
this.lastAccessTimes = new ConcurrentHashMap<>();

if (log.isDebugEnabled()) {
log.debug("Initialized schema cache cleanup strategy with expire time: {} seconds", expireSeconds);
}
}

/**
* Record access time for a class.
*
* @param clazz the class being accessed
*/
public void recordAccess(Class<?> clazz) {
lastAccessTimes.put(clazz, System.currentTimeMillis());

if (log.isTraceEnabled()) {
log.trace("Recorded access for class: {}", clazz.getName());
}
}

/**
* Check if an entry should be evicted based on its last access time.
*
* @param clazz the class to check
* @return true if the entry should be evicted
*/
public boolean shouldEvict(Class<?> clazz) {
Long lastAccess = lastAccessTimes.get(clazz);
if (lastAccess == null) {
if (log.isDebugEnabled()) {
log.debug("No last access time found for class: {}, marking for eviction", clazz.getName());
}
return true;
}

boolean shouldEvict = System.currentTimeMillis() - lastAccess > expireTimeMillis;

if (log.isDebugEnabled() && shouldEvict) {
log.debug("Class {} marked for eviction, last access time: {}", clazz.getName(), lastAccess);
}

return shouldEvict;
}

/**
* Clean up expired entries from the cache.
*
* @param cache the schema cache to clean
*/
public void cleanup(WeakHashMap<Class<?>, Schema<?>> cache) {
if (cache.size() <= MIN_ENTRIES) {
if (log.isDebugEnabled()) {
log.debug("Cache size {} is below minimum threshold {}, skipping cleanup",
cache.size(), MIN_ENTRIES);
}
return;
}

int removedCount = 0;
Iterator<Map.Entry<Class<?>, Schema<?>>> iterator = cache.entrySet().iterator();

while (iterator.hasNext() && removedCount < MAX_REMOVE_PER_CLEANUP) {
Map.Entry<Class<?>, Schema<?>> entry = iterator.next();
Class<?> clazz = entry.getKey();

if (shouldEvict(clazz)) {
iterator.remove();
lastAccessTimes.remove(clazz);
removedCount++;
SchemaCacheMetrics.recordEviction();

if (log.isDebugEnabled()) {
log.debug("Evicted schema for class: {}", clazz.getName());
}
}
}

if (log.isInfoEnabled() && removedCount > 0) {
log.info("Cleaned up {} expired entries from schema cache", removedCount);
}
}

/**
* Clear all recorded access times.
*/
public void clear() {
lastAccessTimes.clear();
if (log.isDebugEnabled()) {
log.debug("Cleared all recorded access times");
}
}

/**
* Get the number of entries being tracked.
*
* @return number of entries with recorded access times
*/
public int getTrackedEntryCount() {
return lastAccessTimes.size();
}

/**
* Get the expiration time in milliseconds.
*
* @return expiration time in milliseconds
*/
public long getExpireTimeMillis() {
return expireTimeMillis;
}

/**
* Check if a class has been accessed.
*
* @param clazz the class to check
* @return true if the class has a recorded access time
*/
public boolean hasAccessRecord(Class<?> clazz) {
return lastAccessTimes.containsKey(clazz);
}

/**
* Get the last access time for a class.
*
* @param clazz the class to check
* @return last access time in milliseconds, or null if never accessed
*/
public Long getLastAccessTime(Class<?> clazz) {
return lastAccessTimes.get(clazz);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.schema;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

/**
* Exception thrown when schema cache operations fail.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SchemaCacheException extends PulsarClientException {
private static final long serialVersionUID = 1L;

/**
* Constructs an exception with the specified error message.
*
* @param message the error message
*/
public SchemaCacheException(String message) {
super(message);
}

/**
* Constructs an exception with the specified error message and cause.
*
* @param message the error message
* @param cause the cause of the error
*/
public SchemaCacheException(String message, Throwable cause) {
super(message, cause);
}

/**
* Constructs an exception with the specified cause.
*
* @param cause the cause of the error
*/
public SchemaCacheException(Throwable cause) {
super(cause);
}

/**
* Creates an exception indicating schema creation failure.
*
* @param className the name of the class for which schema creation failed
* @param cause the cause of the failure
* @return a new SchemaCacheException instance
*/
public static SchemaCacheException schemaCreationFailed(String className, Throwable cause) {
return new SchemaCacheException(
String.format("Failed to create schema for class: %s", className), cause);
}

/**
* Creates an exception indicating schema cloning failure.
*
* @param className the name of the class for which schema cloning failed
* @param cause the cause of the failure
* @return a new SchemaCacheException instance
*/
public static SchemaCacheException schemaCloneFailed(String className, Throwable cause) {
return new SchemaCacheException(
String.format("Failed to clone schema for class: %s", className), cause);
}

/**
* Creates an exception indicating cache initialization failure.
*
* @param cause the cause of the failure
* @return a new SchemaCacheException instance
*/
public static SchemaCacheException initializationFailed(Throwable cause) {
return new SchemaCacheException("Failed to initialize schema cache", cause);
}

/**
* Creates an exception indicating cache configuration error.
*
* @param message the error message describing the configuration problem
* @return a new SchemaCacheException instance
*/
public static SchemaCacheException invalidConfiguration(String message) {
return new SchemaCacheException("Invalid schema cache configuration: " + message);
}

/**
* Creates an exception indicating cache access error.
*
* @param operation the operation that failed
* @param cause the cause of the failure
* @return a new SchemaCacheException instance
*/
public static SchemaCacheException operationFailed(String operation, Throwable cause) {
return new SchemaCacheException(
String.format("Schema cache operation failed: %s", operation), cause);
}

@Override
public String toString() {
if (getCause() != null) {
return String.format("SchemaCacheException: %s, caused by: %s",
getMessage(), getCause().toString());
}
return String.format("SchemaCacheException: %s", getMessage());
}
}
Loading

0 comments on commit 00dc375

Please sign in to comment.