diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java b/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java
new file mode 100644
index 00000000..23ffa571
--- /dev/null
+++ b/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.geospatial.ip2geo.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+
+import org.opensearch.common.cache.Cache;
+import org.opensearch.common.cache.CacheBuilder;
+
+/**
+ * Cache to hold geo data, keyed by index name and IP address
+ *
+ * GeoData in an index is immutable. Therefore, invalidation is not needed.
+ */
+public class GeoDataCache {
+    private Cache<CacheKey, Map<String, Object>> cache;
+
+    /**
+     * Create a cache bounded by the given maximum size
+     *
+     * @param maxSize maximum weight of the cache, must be 0 or greater
+     * @throws IllegalArgumentException if maxSize is negative
+     */
+    public GeoDataCache(final long maxSize) {
+        this.cache = createCache(maxSize);
+    }
+
+    /**
+     * Return the geo data cached for the given index and ip, computing it with
+     * the retrieve function and caching the result when no entry exists yet
+     *
+     * @param indexName name of the index the geo data belongs to
+     * @param ip the ip address to look up
+     * @param retrieveFunction called with the ip to produce the geo data on a cache miss
+     * @return the cached or newly computed geo data
+     * @throws ExecutionException propagated from the underlying cache computation
+     */
+    public Map<String, Object> putIfAbsent(
+        final String indexName,
+        final String ip,
+        final Function<String, Map<String, Object>> retrieveFunction
+    ) throws ExecutionException {
+        CacheKey cacheKey = new CacheKey(indexName, ip);
+        return cache.computeIfAbsent(cacheKey, key -> retrieveFunction.apply(key.ip));
+    }
+
+    /**
+     * Return the geo data cached for the given index and ip
+     *
+     * @param indexName name of the index the geo data belongs to
+     * @param ip the ip address to look up
+     * @return the cached geo data, or null when there is no entry
+     */
+    public Map<String, Object> get(final String indexName, final String ip) {
+        return cache.get(new CacheKey(indexName, ip));
+    }
+
+    /**
+     * Create a new cache with the given size and replace the existing cache
+     *
+     * Entries of the previous cache are copied into the new cache on a best effort
+     * basis, up to maxSize entries.
+     *
+     * @param maxSize maximum weight of the new cache, must be 0 or greater
+     * @throws IllegalArgumentException if maxSize is negative
+     */
+    public void updateMaxSize(final long maxSize) {
+        Cache<CacheKey, Map<String, Object>> temp = createCache(maxSize);
+        long count = 0;
+        Iterator<CacheKey> it = cache.keys().iterator();
+        while (it.hasNext() && count < maxSize) {
+            CacheKey key = it.next();
+            Map<String, Object> value = cache.get(key);
+            // Skip a key whose value is gone by the time it is read instead of storing null
+            if (value == null) {
+                continue;
+            }
+            temp.put(key, value);
+            count++;
+        }
+        cache = temp;
+    }
+
+    /**
+     * Build an empty cache after validating the requested size
+     */
+    private static Cache<CacheKey, Map<String, Object>> createCache(final long maxSize) {
+        if (maxSize < 0) {
+            throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater");
+        }
+        return CacheBuilder.<CacheKey, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
+    }
+
+    @AllArgsConstructor
+    @EqualsAndHashCode
+    private static class CacheKey {
+        private final String indexName;
+        private final String ip;
+    }
+}
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java
new file mode 100644
index 00000000..8082c3cd
--- /dev/null
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.geospatial.ip2geo.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import lombok.SneakyThrows;
+
+import org.opensearch.common.network.NetworkAddress;
+import org.opensearch.geospatial.GeospatialTestHelper;
+import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
+
+public class GeoDataCacheTests extends Ip2GeoTestCase {
+    @SneakyThrows
+    public void testUpdateMaxSize_whenBiggerSize_thenContainsAllData() {
+        int cacheSize = 10;
+        String datasource = GeospatialTestHelper.randomLowerCaseString();
+        GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
+        List<String> ips = new ArrayList<>(cacheSize);
+        for (int i = 0; i < cacheSize; i++) {
+            String ip = NetworkAddress.format(randomIp(false));
+            ips.add(ip);
+            geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap());
+        }
+
+        // Verify all data exist in the cache
+        assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));
+
+        // Update cache size
+        int newCacheSize = 15;
+        geoDataCache.updateMaxSize(newCacheSize);
+
+        // Verify all data exist in the cache
+        assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));
+
+        // Add (newCacheSize - cacheSize + 1) data and the first data should not be available in the cache
+        for (int i = 0; i < newCacheSize - cacheSize + 1; i++) {
+            geoDataCache.putIfAbsent(datasource, NetworkAddress.format(randomIp(false)), addr -> Collections.emptyMap());
+        }
+        assertNull(geoDataCache.get(datasource, ips.get(0)));
+    }
+
+    @SneakyThrows
+    public void testUpdateMaxSize_whenSmallerSize_thenContainsPartialData() {
+        int cacheSize = 10;
+        String datasource = GeospatialTestHelper.randomLowerCaseString();
+        GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
+        List<String> ips = new ArrayList<>(cacheSize);
+        for (int i = 0; i < cacheSize; i++) {
+            String ip = NetworkAddress.format(randomIp(false));
+            ips.add(ip);
+            geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap());
+        }
+
+        // Verify all data exist in the cache
+        assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));
+
+        // Update cache size
+        int newCacheSize = 5;
+        geoDataCache.updateMaxSize(newCacheSize);
+
+        // Verify only the last newCacheSize data is available in the cache
+        List<String> deleted = ips.subList(0, ips.size() - newCacheSize);
+        List<String> retained = ips.subList(ips.size() - newCacheSize, ips.size());
+        assertTrue(deleted.stream().allMatch(ip -> geoDataCache.get(datasource, ip) == null));
+        assertTrue(retained.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));
+    }
+}
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java
index d08dc19a..4e8256c2 100644
--- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java
@@ -44,7 +44,10 @@ public void init() {
     }
 
     public void testRunJob_whenInvalidClass_thenThrowException() {
-        JobExecutionContext jobExecutionContext = mock(JobExecutionContext.class);
+        JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt());
+        String jobIndexName = randomLowerCaseString();
+        String jobId = randomLowerCaseString();
+        JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId);
         ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class);
         expectThrows(IllegalStateException.class, () -> DatasourceRunner.getJobRunnerInstance().runJob(jobParameter, jobExecutionContext));
     }