diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index ef1e0cc1feade..908fc6f81c444 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -284,8 +284,9 @@ default Optional getNativeSchema() { * @return a Schema instance */ static Schema PROTOBUF(Class clazz) { - return DefaultImplementation.getDefaultImplementation() - .newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build()); + return SchemaCache.getOrCreateSchema(clazz, SchemaType.PROTOBUF, + cls -> DefaultImplementation.getDefaultImplementation() + .newProtobufSchema(SchemaDefinition.builder().withPojo(cls).build())); } /** @@ -295,9 +296,8 @@ static Schema PROTOBUF(Cla * @return a Schema instance */ static Schema PROTOBUF(SchemaDefinition schemaDefinition) { - return DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition); + return DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition); } - /** * Create a Protobuf-Native schema type by extracting the fields of the specified class. * @@ -305,8 +305,9 @@ static Schema PROTOBUF(Sch * @return a Schema instance */ static Schema PROTOBUF_NATIVE(Class clazz) { - return DefaultImplementation.getDefaultImplementation() - .newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build()); + return SchemaCache.getOrCreateSchema(clazz, SchemaType.JSON, + cls -> DefaultImplementation.getDefaultImplementation() + .newJSONSchema(SchemaDefinition.builder().withPojo(cls).build())); } /** @@ -327,8 +328,9 @@ static Schema PROTOBUF_NAT * @return a Schema instance */ static Schema AVRO(Class pojo) { - return DefaultImplementation.getDefaultImplementation() - .newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build()); + return SchemaCache.getOrCreateSchema(pojo, SchemaType.JSON, + cls -> DefaultImplementation.getDefaultImplementation() + .newJSONSchema(SchemaDefinition.builder().withPojo(cls).build())); } /** @@ -348,8 +350,9 @@ static Schema AVRO(SchemaDefinition schemaDefinition) { * @return a Schema instance */ static Schema JSON(Class pojo) { - return DefaultImplementation.getDefaultImplementation() - .newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build()); + return SchemaCache.getOrCreateSchema(pojo, SchemaType.JSON, + cls -> DefaultImplementation.getDefaultImplementation() + .newJSONSchema(SchemaDefinition.builder().withPojo(cls).build())); } /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaCacheCleanupStrategy .java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaCacheCleanupStrategy .java new file mode 100644 index 0000000000000..d40853c6aa49e --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaCacheCleanupStrategy .java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Iterator; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Strategy for cleaning up expired entries from schema cache. + */ +@Slf4j +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SchemaCacheCleanupStrategy { + + private final long expireTimeMillis; + private final Map, Long> lastAccessTimes; + + // Threshold for minimum number of entries to keep in cache + private static final int MIN_ENTRIES = 10; + + // Maximum number of entries to remove in single cleanup + private static final int MAX_REMOVE_PER_CLEANUP = 100; + + /** + * Create a new cleanup strategy. + * + * @param expireSeconds time in seconds after which entries expire + */ + public SchemaCacheCleanupStrategy(long expireSeconds) { + this.expireTimeMillis = expireSeconds * 1000; + this.lastAccessTimes = new ConcurrentHashMap<>(); + + if (log.isDebugEnabled()) { + log.debug("Initialized schema cache cleanup strategy with expire time: {} seconds", expireSeconds); + } + } + + /** + * Record access time for a class. + * + * @param clazz the class being accessed + */ + public void recordAccess(Class clazz) { + lastAccessTimes.put(clazz, System.currentTimeMillis()); + + if (log.isTraceEnabled()) { + log.trace("Recorded access for class: {}", clazz.getName()); + } + } + + /** + * Check if an entry should be evicted based on its last access time. + * + * @param clazz the class to check + * @return true if the entry should be evicted + */ + public boolean shouldEvict(Class clazz) { + Long lastAccess = lastAccessTimes.get(clazz); + if (lastAccess == null) { + if (log.isDebugEnabled()) { + log.debug("No last access time found for class: {}, marking for eviction", clazz.getName()); + } + return true; + } + + boolean shouldEvict = System.currentTimeMillis() - lastAccess > expireTimeMillis; + + if (log.isDebugEnabled() && shouldEvict) { + log.debug("Class {} marked for eviction, last access time: {}", clazz.getName(), lastAccess); + } + + return shouldEvict; + } + + /** + * Clean up expired entries from the cache. + * + * @param cache the schema cache to clean + */ + public void cleanup(WeakHashMap, Schema> cache) { + if (cache.size() <= MIN_ENTRIES) { + if (log.isDebugEnabled()) { + log.debug("Cache size {} is below minimum threshold {}, skipping cleanup", + cache.size(), MIN_ENTRIES); + } + return; + } + + int removedCount = 0; + Iterator, Schema>> iterator = cache.entrySet().iterator(); + + while (iterator.hasNext() && removedCount < MAX_REMOVE_PER_CLEANUP) { + Map.Entry, Schema> entry = iterator.next(); + Class clazz = entry.getKey(); + + if (shouldEvict(clazz)) { + iterator.remove(); + lastAccessTimes.remove(clazz); + removedCount++; + SchemaCacheMetrics.recordEviction(); + + if (log.isDebugEnabled()) { + log.debug("Evicted schema for class: {}", clazz.getName()); + } + } + } + + if (log.isInfoEnabled() && removedCount > 0) { + log.info("Cleaned up {} expired entries from schema cache", removedCount); + } + } + + /** + * Clear all recorded access times. + */ + public void clear() { + lastAccessTimes.clear(); + if (log.isDebugEnabled()) { + log.debug("Cleared all recorded access times"); + } + } + + /** + * Get the number of entries being tracked. + * + * @return number of entries with recorded access times + */ + public int getTrackedEntryCount() { + return lastAccessTimes.size(); + } + + /** + * Get the expiration time in milliseconds. + * + * @return expiration time in milliseconds + */ + public long getExpireTimeMillis() { + return expireTimeMillis; + } + + /** + * Check if a class has been accessed. + * + * @param clazz the class to check + * @return true if the class has a recorded access time + */ + public boolean hasAccessRecord(Class clazz) { + return lastAccessTimes.containsKey(clazz); + } + + /** + * Get the last access time for a class. + * + * @param clazz the class to check + * @return last access time in milliseconds, or null if never accessed + */ + public Long getLastAccessTime(Class clazz) { + return lastAccessTimes.get(clazz); + } +} \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaCacheException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaCacheException.java new file mode 100644 index 0000000000000..70d3c5874586d --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaCacheException.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Exception thrown when schema cache operations fail. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class SchemaCacheException extends PulsarClientException { + private static final long serialVersionUID = 1L; + + /** + * Constructs an exception with the specified error message. + * + * @param message the error message + */ + public SchemaCacheException(String message) { + super(message); + } + + /** + * Constructs an exception with the specified error message and cause. + * + * @param message the error message + * @param cause the cause of the error + */ + public SchemaCacheException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs an exception with the specified cause. + * + * @param cause the cause of the error + */ + public SchemaCacheException(Throwable cause) { + super(cause); + } + + /** + * Creates an exception indicating schema creation failure. + * + * @param className the name of the class for which schema creation failed + * @param cause the cause of the failure + * @return a new SchemaCacheException instance + */ + public static SchemaCacheException schemaCreationFailed(String className, Throwable cause) { + return new SchemaCacheException( + String.format("Failed to create schema for class: %s", className), cause); + } + + /** + * Creates an exception indicating schema cloning failure. + * + * @param className the name of the class for which schema cloning failed + * @param cause the cause of the failure + * @return a new SchemaCacheException instance + */ + public static SchemaCacheException schemaCloneFailed(String className, Throwable cause) { + return new SchemaCacheException( + String.format("Failed to clone schema for class: %s", className), cause); + } + + /** + * Creates an exception indicating cache initialization failure. + * + * @param cause the cause of the failure + * @return a new SchemaCacheException instance + */ + public static SchemaCacheException initializationFailed(Throwable cause) { + return new SchemaCacheException("Failed to initialize schema cache", cause); + } + + /** + * Creates an exception indicating cache configuration error. + * + * @param message the error message describing the configuration problem + * @return a new SchemaCacheException instance + */ + public static SchemaCacheException invalidConfiguration(String message) { + return new SchemaCacheException("Invalid schema cache configuration: " + message); + } + + /** + * Creates an exception indicating cache access error. + * + * @param operation the operation that failed + * @param cause the cause of the failure + * @return a new SchemaCacheException instance + */ + public static SchemaCacheException operationFailed(String operation, Throwable cause) { + return new SchemaCacheException( + String.format("Schema cache operation failed: %s", operation), cause); + } + + @Override + public String toString() { + if (getCause() != null) { + return String.format("SchemaCacheException: %s, caused by: %s", + getMessage(), getCause().toString()); + } + return String.format("SchemaCacheException: %s", getMessage()); + } +} \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeCache.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeCache.java new file mode 100644 index 0000000000000..e39eb5be7dc1a --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeCache.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.HashMap; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.internal.DefaultImplementation; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Cache for Schema instances to improve performance by reusing schema objects. + */ +@Slf4j +@InterfaceAudience.Public +@InterfaceStability.Stable +public class SchemaCache { + + private final ConcurrentHashMap, Schema>> typeSchemaCache; + private final ReentrantReadWriteLock lock; + private final SchemaCacheConfig config; + private final SchemaCacheCleanupStrategy cleanupStrategy; + + public SchemaCache(SchemaCacheConfig config) { + this.typeSchemaCache = new ConcurrentHashMap<>(); + this.lock = new ReentrantReadWriteLock(); + this.config = config; + this.cleanupStrategy = new SchemaCacheCleanupStrategy(config.getExpireSeconds()); + + if (log.isInfoEnabled()) { + log.info("Initializing schema cache with config: {}", config); + } + } + + /** + * Get or create a schema instance from cache. + * + * @param clazz class to create schema for + * @param type schema type + * @param creator function to create new schema if not found in cache + * @return schema instance (either from cache or newly created) + */ + @SuppressWarnings("unchecked") + public Schema getOrCreateSchema(Class clazz, SchemaType type, + Function, Schema> creator) { + if (!config.isEnabled()) { + if (log.isDebugEnabled()) { + log.debug("Schema cache is disabled, creating new schema for class: {}", clazz.getName()); + } + return creator.apply(clazz); + } + + WeakHashMap, Schema> schemaCache = typeSchemaCache + .computeIfAbsent(type, k -> new WeakHashMap<>()); + + // Try read lock first + lock.readLock().lock(); + try { + Schema cachedSchema = (Schema) schemaCache.get(clazz); + if (cachedSchema != null) { + cleanupStrategy.recordAccess(clazz); + SchemaCacheMetrics.recordHit(); + if (log.isDebugEnabled()) { + log.debug("Schema cache hit for class: {}", clazz.getName()); + } + return cloneSchema(cachedSchema); + } + } finally { + lock.readLock().unlock(); + } + + // Use write lock to create new schema + lock.writeLock().lock(); + try { + // Double-check after acquiring write lock + Schema cachedSchema = (Schema) schemaCache.get(clazz); + if (cachedSchema != null) { + cleanupStrategy.recordAccess(clazz); + SchemaCacheMetrics.recordHit(); + if (log.isDebugEnabled()) { + log.debug("Schema cache hit after write lock for class: {}", clazz.getName()); + } + return cloneSchema(cachedSchema); + } + + // Check cache size and cleanup if needed + if (schemaCache.size() >= config.getMaxSize()) { + if (log.isDebugEnabled()) { + log.debug("Schema cache size ({}) reached limit, performing cleanup", schemaCache.size()); + } + cleanupStrategy.cleanup(schemaCache); + } + + // Create and cache new schema + long startTime = System.currentTimeMillis(); + Schema newSchema; + try { + newSchema = creator.apply(clazz); + } catch (Exception e) { + SchemaCacheMetrics.recordLoadError(); + log.error("Failed to create schema for class: " + clazz.getName(), e); + throw new SchemaCacheException("Failed to create schema for class: " + clazz.getName(), e); + } + long loadTime = System.currentTimeMillis() - startTime; + SchemaCacheMetrics.recordLoadTime(loadTime); + + schemaCache.put(clazz, newSchema); + cleanupStrategy.recordAccess(clazz); + SchemaCacheMetrics.recordMiss(); + SchemaCacheMetrics.updateSize(schemaCache.size()); + + if (log.isDebugEnabled()) { + log.debug("Created and cached new schema for class: {} (load time: {} ms)", + clazz.getName(), loadTime); + } + + return cloneSchema(newSchema); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Create a deep clone of the schema instance. + * + * @param original schema to clone + * @return cloned schema instance + */ + private static Schema cloneSchema(Schema original) { + SchemaInfo originalInfo = original.getSchemaInfo(); + SchemaInfo clonedInfo = SchemaInfo.builder() + .name(originalInfo.getName()) + .schema(originalInfo.getSchema().clone()) + .type(originalInfo.getType()) + .properties(new HashMap<>(originalInfo.getProperties())) + .timestamp(originalInfo.getTimestamp()) + .build(); + + return DefaultImplementation + .getDefaultImplementation() + .getSchema(clonedInfo); + } + + /** + * Clear all entries from the cache. + */ + public void clear() { + lock.writeLock().lock(); + try { + typeSchemaCache.clear(); + SchemaCacheMetrics.updateSize(0); + if (log.isInfoEnabled()) { + log.info("Schema cache cleared"); + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Get current cache statistics. + * + * @return map of metric name to value + */ + public Map getStats() { + return SchemaCacheMetrics.getMetrics(); + } +} \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeCacheConfig.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeCacheConfig.java new file mode 100644 index 0000000000000..774f01dae33d7 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeCacheConfig.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.pulsar.common.configuration.FieldContext; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; + +/** + * Configuration for schema cache. + */ +@Data +@Accessors(chain = true) +@InterfaceAudience.Public +@InterfaceStability.Stable +public class SchemaCacheConfig { + + /** + * Maximum number of schema instances to cache. + */ + public static final String SCHEMA_CACHE_SIZE = "schemaCache.maxSize"; + + /** + * Whether schema caching is enabled. + */ + public static final String SCHEMA_CACHE_ENABLED = "schemaCache.enabled"; + + /** + * Time in seconds after which cached schema instances expire. + */ + public static final String SCHEMA_CACHE_EXPIRE_SECONDS = "schemaCache.expireSeconds"; + + private static final int DEFAULT_MAX_SIZE = 1000; + private static final boolean DEFAULT_ENABLED = true; + private static final long DEFAULT_EXPIRE_SECONDS = 3600; + + @Min(1) + @NotNull + @FieldContext( + doc = "Maximum number of schema instances to cache", + required = false + ) + private int maxSize = DEFAULT_MAX_SIZE; + + @NotNull + @FieldContext( + doc = "Whether schema caching is enabled", + required = false + ) + private boolean enabled = DEFAULT_ENABLED; + + @Min(1) + @NotNull + @FieldContext( + doc = "Time in seconds after which cached schema instances expire", + required = false + ) + private long expireSeconds = DEFAULT_EXPIRE_SECONDS; + + public SchemaCacheConfig() { + // For deserialization + } + + private SchemaCacheConfig(int maxSize, boolean enabled, long expireSeconds) { + this.maxSize = maxSize; + this.enabled = enabled; + this.expireSeconds = expireSeconds; + } + + /** + * Create a new config instance from properties. + * + * @param properties configuration properties + * @return new config instance + */ + public static SchemaCacheConfig create(Map properties) { + int maxSize = getIntValue(properties, SCHEMA_CACHE_SIZE, DEFAULT_MAX_SIZE); + boolean enabled = getBooleanValue(properties, SCHEMA_CACHE_ENABLED, DEFAULT_ENABLED); + long expireSeconds = getLongValue(properties, SCHEMA_CACHE_EXPIRE_SECONDS, DEFAULT_EXPIRE_SECONDS); + + return new SchemaCacheConfig(maxSize, enabled, expireSeconds); + } + + private static int getIntValue(Map properties, String key, int defaultValue) { + Object value = properties.get(key); + if (value == null) { + return defaultValue; + } + if (value instanceof Number) { + return ((Number) value).intValue(); + } + return Integer.parseInt(value.toString()); + } + + private static boolean getBooleanValue(Map properties, String key, boolean defaultValue) { + Object value = properties.get(key); + if (value == null) { + return defaultValue; + } + if (value instanceof Boolean) { + return (Boolean) value; + } + return Boolean.parseBoolean(value.toString()); + } + + private static long getLongValue(Map properties, String key, long defaultValue) { + Object value = properties.get(key); + if (value == null) { + return defaultValue; + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + return Long.parseLong(value.toString()); + } + + /** + * Validates the configuration values. + * + * @throws IllegalArgumentException if any configuration value is invalid + */ + public void validate() { + if (maxSize < 1) { + throw new IllegalArgumentException("maxSize must be greater than 0"); + } + if (expireSeconds < 1) { + throw new IllegalArgumentException("expireSeconds must be greater than 0"); + } + } + + @Override + public String toString() { + return "SchemaCacheConfig(" + + "maxSize=" + maxSize + + ", enabled=" + enabled + + ", expireSeconds=" + expireSeconds + + ")"; + } +} \ No newline at end of file diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeMetrics.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeMetrics.java new file mode 100644 index 0000000000000..a26ef99a16d4c --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemeMetrics.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import lombok.experimental.UtilityClass; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Metrics collector for schema cache performance monitoring. + */ +@UtilityClass +@InterfaceAudience.Public +@InterfaceStability.Stable +public class SchemaCacheMetrics { + private static final Logger log = LoggerFactory.getLogger(SchemaCacheMetrics.class); + + private static final AtomicLong CACHE_HITS = new AtomicLong(); + private static final AtomicLong CACHE_MISSES = new AtomicLong(); + private static final AtomicLong CACHE_EVICTIONS = new AtomicLong(); + private static final AtomicLong CACHE_SIZE = new AtomicLong(); + private static final AtomicLong CACHE_LOAD_ERRORS = new AtomicLong(); + private static final AtomicLong CACHE_LOAD_TIME_TOTAL = new AtomicLong(); + + /** + * Metric names. + */ + public static final class Names { + public static final String HITS = "hits"; + public static final String MISSES = "misses"; + public static final String EVICTIONS = "evictions"; + public static final String SIZE = "size"; + public static final String HIT_RATE = "hitRate"; + public static final String LOAD_ERRORS = "loadErrors"; + public static final String AVG_LOAD_TIME = "avgLoadTime"; + + private Names() {} + } + + /** + * Record a cache hit. + */ + public static void recordHit() { + CACHE_HITS.incrementAndGet(); + if (log.isDebugEnabled()) { + log.debug("Schema cache hit recorded"); + } + } + + /** + * Record a cache miss. + */ + public static void recordMiss() { + CACHE_MISSES.incrementAndGet(); + if (log.isDebugEnabled()) { + log.debug("Schema cache miss recorded"); + } + } + + /** + * Record a cache eviction. + */ + public static void recordEviction() { + CACHE_EVICTIONS.incrementAndGet(); + if (log.isDebugEnabled()) { + log.debug("Schema cache eviction recorded"); + } + } + + /** + * Record a schema load error. + */ + public static void recordLoadError() { + CACHE_LOAD_ERRORS.incrementAndGet(); + if (log.isDebugEnabled()) { + log.debug("Schema load error recorded"); + } + } + + /** + * Record schema load time in milliseconds. + * + * @param loadTimeMs time taken to load schema in milliseconds + */ + public static void recordLoadTime(long loadTimeMs) { + CACHE_LOAD_TIME_TOTAL.addAndGet(loadTimeMs); + if (log.isDebugEnabled()) { + log.debug("Schema load time recorded: {} ms", loadTimeMs); + } + } + + /** + * Update current cache size. + * + * @param size current number of entries in cache + */ + public static void updateSize(int size) { + CACHE_SIZE.set(size); + if (log.isDebugEnabled()) { + log.debug("Schema cache size updated to: {}", size); + } + } + + /** + * Reset all metrics to zero. + */ + public static void reset() { + CACHE_HITS.set(0); + CACHE_MISSES.set(0); + CACHE_EVICTIONS.set(0); + CACHE_SIZE.set(0); + CACHE_LOAD_ERRORS.set(0); + CACHE_LOAD_TIME_TOTAL.set(0); + if (log.isDebugEnabled()) { + log.debug("Schema cache metrics reset"); + } + } + + /** + * Get all current metric values. + * + * @return map of metric name to value + */ + public static Map getMetrics() { + Map metrics = new HashMap<>(); + metrics.put(Names.HITS, CACHE_HITS.get()); + metrics.put(Names.MISSES, CACHE_MISSES.get()); + metrics.put(Names.EVICTIONS, CACHE_EVICTIONS.get()); + metrics.put(Names.SIZE, CACHE_SIZE.get()); + metrics.put(Names.HIT_RATE, calculateHitRate()); + metrics.put(Names.LOAD_ERRORS, CACHE_LOAD_ERRORS.get()); + metrics.put(Names.AVG_LOAD_TIME, calculateAverageLoadTime()); + return Collections.unmodifiableMap(metrics); + } + + /** + * Calculate cache hit rate as percentage. + * + * @return hit rate between 0 and 100 + */ + private static double calculateHitRate() { + long hits = CACHE_HITS.get(); + long total = hits + CACHE_MISSES.get(); + if (total == 0) { + return 0.0; + } + return (double) hits * 100 / total; + } + + /** + * Calculate average schema load time in milliseconds. + * + * @return average load time in milliseconds + */ + private static double calculateAverageLoadTime() { + long totalTime = CACHE_LOAD_TIME_TOTAL.get(); + long misses = CACHE_MISSES.get(); + if (misses == 0) { + return 0.0; + } + return (double) totalTime / misses; + } + + /** + * Get current cache size. + * + * @return number of entries in cache + */ + public static long getSize() { + return CACHE_SIZE.get(); + } + + /** + * Get total number of cache hits. + * + * @return total number of cache hits + */ + public static long getHits() { + return CACHE_HITS.get(); + } + + /** + * Get total number of cache misses. + * + * @return total number of cache misses + */ + public static long getMisses() { + return CACHE_MISSES.get(); + } + + /** + * Get total number of cache evictions. + * + * @return total number of cache evictions + */ + public static long getEvictions() { + return CACHE_EVICTIONS.get(); + } +} \ No newline at end of file