From c6ed538fc2a0fa26d7b17348c24e9a8917cc10a6 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Thu, 22 Jun 2023 12:14:03 -0700 Subject: [PATCH] Add geo data cache Signed-off-by: Heemin Kim --- .../geospatial/ip2geo/cache/GeoDataCache.java | 78 ++++++++++++ .../ip2geo/cache/GeoDataCacheTests.java | 120 ++++++++++++++++++ 2 files changed, 198 insertions(+) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java b/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java new file mode 100644 index 00000000..408e9edb --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCache.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.cache; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; + +import java.net.InetAddress; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +public class GeoDataCache { + private Cache> cache; + + // package private for testing + public GeoDataCache(final long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater"); + } + this.cache = CacheBuilder.>builder().setMaximumWeight(maxSize).build(); + } + + public Map putIfAbsent ( + final String datasourceName, + final InetAddress ip, + final Function> retrieveFunction + ) throws ExecutionException { + CacheKey cacheKey = new CacheKey(datasourceName, ip); + return cache.computeIfAbsent(cacheKey, key -> retrieveFunction.apply(key.ip)); + } + + public Map get ( + final String datasourceName, + final InetAddress ip) { + return cache.get(new CacheKey(datasourceName, ip)); + } + + /** + * Create a new cache with give size and replace existing cache + * + * @param maxSize + */ + public void updateMaxSize(final long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater"); + } + this.cache = CacheBuilder.>builder().setMaximumWeight(maxSize).build(); + } + + /** + * Invalidate all cached values of a given datasource + * + * @param datasourceName + */ + public void invalidate(final String datasourceName) { + Iterator it = cache.keys().iterator(); + while (it.hasNext()) { + CacheKey cacheKey = it.next(); + if (datasourceName.equals(cacheKey.datasourceName)) { + cache.invalidate(cacheKey); + } + } + } + + @AllArgsConstructor + @EqualsAndHashCode + private static class CacheKey { + private final String datasourceName; + private final InetAddress ip; + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java new file mode 100644 index 00000000..194c81cb --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/cache/GeoDataCacheTests.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.cache; + +import lombok.SneakyThrows; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class GeoDataCacheTests extends Ip2GeoTestCase { + + /** + * Test a correctness of invalidate + * + * 1. Fill the cache with old data with datasource 1 + * 2. Run three threads which will + * add value for datasource 1 with new data + * add value for datasource 2 with new data + * invalidate old data for datasource 1 + * 3. Verify all old data for datasource 1 is invalidated + */ + @SneakyThrows + public void testInvalidate_whenCalled_thenInvalidate() { + int cacheSize = 100000; + GeoDataCache geoDataCache = new GeoDataCache(cacheSize); + String datasource1 = "datasource1"; + String datasource2 = "datasource2"; + List ips = new ArrayList<>(cacheSize); + + // Fill the cache with old data for datasource 1 + Map oldData = Map.of("old", "old"); + for (int i = 0; i < cacheSize; i++) { + InetAddress ip = randomIp(false); + ips.add(ip); + geoDataCache.putIfAbsent(datasource1, ip, addr -> oldData); + } + + ExecutorService executor = Executors.newFixedThreadPool(3); + Map newData = Map.of("new", "new"); + executor.submit(() -> { + while (executor.isShutdown() == false) { + try { + InetAddress ip = randomIp(false); + geoDataCache.putIfAbsent(datasource1, ip, addr -> newData); + Thread.yield(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + executor.submit(() -> { + while (executor.isShutdown() == false) { + try { + InetAddress ip = randomIp(false); + geoDataCache.putIfAbsent(datasource2, ip, addr -> newData); + Thread.yield(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + Future task = executor.submit(() -> { + geoDataCache.invalidate(datasource1); + }); + + while (task.isDone() == false) { + Thread.yield(); + } + executor.shutdown(); + + // Verify + assertTrue(ips.stream().filter(ip -> geoDataCache.get(datasource1, ip) != null).noneMatch(ip -> geoDataCache.get(datasource1, ip).containsKey("old"))); + } + + @SneakyThrows + public void testUpdateMaxSize_whenCalled_thenNewCacheIsCreated() { + int cacheSize = 10; + String datasource = GeospatialTestHelper.randomLowerCaseString(); + GeoDataCache geoDataCache = new GeoDataCache(cacheSize); + List ips = new ArrayList<>(cacheSize); + for (int i = 0; i < cacheSize; i++) { + InetAddress ip = randomIp(false); + ips.add(ip); + geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap()); + } + + // Verify all data exist in the cache + assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null)); + + // Update cache size + int newCacheSize = 5; + geoDataCache.updateMaxSize(newCacheSize); + + // Verify all data is gone from the cache + assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) == null)); + + // Add cache size + 1 items + InetAddress firstIp = randomIp(false); + geoDataCache.putIfAbsent(datasource, firstIp, addr -> Collections.emptyMap()); + for (int i = 0; i < cacheSize; i++) { + geoDataCache.putIfAbsent(datasource, randomIp(false), addr -> Collections.emptyMap()); + } + + // Verify the first item is not available in the cache + assertNull(geoDataCache.get(datasource, firstIp)); + } +} \ No newline at end of file