diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java b/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java new file mode 100644 index 00000000..b36cdb74 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.cache; + +import java.net.InetAddress; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; + +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; + +/** + * Cache to hold geo data + */ +public class GeoDataCache { + private Cache> cache; + + public GeoDataCache(final long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater"); + } + this.cache = CacheBuilder.>builder().setMaximumWeight(maxSize).build(); + } + + public Map putIfAbsent( + final String datasourceName, + final InetAddress ip, + final Function> retrieveFunction + ) throws ExecutionException { + CacheKey cacheKey = new CacheKey(datasourceName, ip); + return cache.computeIfAbsent(cacheKey, key -> retrieveFunction.apply(key.ip)); + } + + public Map get(final String datasourceName, final InetAddress ip) { + return cache.get(new CacheKey(datasourceName, ip)); + } + + /** + * Create a new cache with give size and replace existing cache + * + * Try to populate the existing value from previous cache to the new cache in best effort + * + * @param maxSize + */ + public void updateMaxSize(final long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater"); + } + Cache> temp = CacheBuilder.>builder().setMaximumWeight(maxSize).build(); + int count = 0; + Iterator it = cache.keys().iterator(); + while (it.hasNext() && count < maxSize) { + GeoDataCache.CacheKey key = it.next(); + temp.put(key, cache.get(key)); + count++; + } + cache = temp; + } + + /** + * Invalidate all cached values of a given datasource + * + * @param datasourceName + */ + public void invalidate(final String datasourceName) { + Iterator it = cache.keys().iterator(); + while (it.hasNext()) { + CacheKey cacheKey = it.next(); + if (datasourceName.equals(cacheKey.datasourceName)) { + cache.invalidate(cacheKey); + } + } + } + + @AllArgsConstructor + @EqualsAndHashCode + private static class CacheKey { + private final String datasourceName; + private final InetAddress ip; + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java new file mode 100644 index 00000000..9e9eb62f --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.cache; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import lombok.SneakyThrows; + +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class GeoDataCacheTests extends Ip2GeoTestCase { + + /** + * Test a correctness of invalidate + * + * 1. Fill the cache with old data with datasource 1 + * 2. Run three threads which will + * add value for datasource 1 with new data + * add value for datasource 2 with new data + * invalidate old data for datasource 1 + * 3. Verify all old data for datasource 1 is invalidated + */ + @SneakyThrows + public void testInvalidate_whenCalled_thenInvalidate() { + int cacheSize = 100000; + GeoDataCache geoDataCache = new GeoDataCache(cacheSize); + String datasource1 = "datasource1"; + String datasource2 = "datasource2"; + List ips = new ArrayList<>(cacheSize); + + // Fill the cache with old data for datasource 1 + Map oldData = Map.of("old", "old"); + for (int i = 0; i < cacheSize; i++) { + InetAddress ip = randomIp(false); + ips.add(ip); + geoDataCache.putIfAbsent(datasource1, ip, addr -> oldData); + } + + ExecutorService executor = Executors.newFixedThreadPool(3); + Map newData = Map.of("new", "new"); + executor.submit(() -> { + while (executor.isShutdown() == false) { + try { + InetAddress ip = randomIp(false); + geoDataCache.putIfAbsent(datasource1, ip, addr -> newData); + Thread.yield(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + executor.submit(() -> { + while (executor.isShutdown() == false) { + try { + InetAddress ip = randomIp(false); + geoDataCache.putIfAbsent(datasource2, ip, addr -> newData); + Thread.yield(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + Future task = executor.submit(() -> { geoDataCache.invalidate(datasource1); }); + + while (task.isDone() == false) { + Thread.yield(); + } + executor.shutdown(); + + // Verify + assertTrue( + ips.stream() + .filter(ip -> geoDataCache.get(datasource1, ip) != null) + .noneMatch(ip -> geoDataCache.get(datasource1, ip).containsKey("old")) + ); + } + + @SneakyThrows + public void testUpdateMaxSize_whenBiggerSize_thenContainsAllData() { + int cacheSize = 10; + String datasource = GeospatialTestHelper.randomLowerCaseString(); + GeoDataCache geoDataCache = new GeoDataCache(cacheSize); + List ips = new ArrayList<>(cacheSize); + for (int i = 0; i < cacheSize; i++) { + InetAddress ip = randomIp(false); + ips.add(ip); + geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap()); + } + + // Verify all data exist in the cache + assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null)); + + // Update cache size + int newCacheSize = 15; + geoDataCache.updateMaxSize(newCacheSize); + + // Verify all data exist in the cache + assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null)); + + // Add (newCacheSize - cacheSize + 1) data and the first data should not be available in the cache + for (int i = 0; i < newCacheSize - cacheSize + 1; i++) { + geoDataCache.putIfAbsent(datasource, randomIp(false), addr -> Collections.emptyMap()); + } + assertNull(geoDataCache.get(datasource, ips.get(0))); + } + + @SneakyThrows + public void testUpdateMaxSize_whenSmallerSize_thenContainsPartialData() { + int cacheSize = 10; + String datasource = GeospatialTestHelper.randomLowerCaseString(); + GeoDataCache geoDataCache = new GeoDataCache(cacheSize); + List ips = new ArrayList<>(cacheSize); + for (int i = 0; i < cacheSize; i++) { + InetAddress ip = randomIp(false); + ips.add(ip); + geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap()); + } + + // Verify all data exist in the cache + assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null)); + + // Update cache size + int newCacheSize = 5; + geoDataCache.updateMaxSize(newCacheSize); + + // Verify the last (cacheSize - newCacheSize) data is available in the cache + List deleted = ips.subList(0, ips.size() - newCacheSize); + List retained = ips.subList(ips.size() - newCacheSize, ips.size()); + assertTrue(deleted.stream().allMatch(ip -> geoDataCache.get(datasource, ip) == null)); + assertTrue(retained.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null)); + } +}