Skip to content

Commit

Permalink
Add geo data cache
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 22, 2023
1 parent a04eeeb commit 0ab004f
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;

/**
* Cache to hold geo data
*/
public class GeoDataCache {
private Cache<CacheKey, Map<String, Object>> cache;

public GeoDataCache(final long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater");
}
this.cache = CacheBuilder.<CacheKey, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
}

public Map<String, Object> putIfAbsent(
final String datasourceName,
final InetAddress ip,
final Function<InetAddress, Map<String, Object>> retrieveFunction
) throws ExecutionException {
CacheKey cacheKey = new CacheKey(datasourceName, ip);
return cache.computeIfAbsent(cacheKey, key -> retrieveFunction.apply(key.ip));
}

public Map<String, Object> get(final String datasourceName, final InetAddress ip) {
return cache.get(new CacheKey(datasourceName, ip));
}

/**
* Create a new cache with give size and replace existing cache
*
* Try to populate the existing value from previous cache to the new cache in best effort
*
* @param maxSize
*/
public void updateMaxSize(final long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater");
}
Cache<CacheKey, Map<String, Object>> temp = CacheBuilder.<CacheKey, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
int count = 0;
Iterator<GeoDataCache.CacheKey> it = cache.keys().iterator();
while (it.hasNext() && count < maxSize) {
GeoDataCache.CacheKey key = it.next();
temp.put(key, cache.get(key));
count++;
}
cache = temp;
}

/**
* Invalidate all cached values of a given datasource
*
* @param datasourceName
*/
public void invalidate(final String datasourceName) {
Iterator<CacheKey> it = cache.keys().iterator();
while (it.hasNext()) {
CacheKey cacheKey = it.next();
if (datasourceName.equals(cacheKey.datasourceName)) {
cache.invalidate(cacheKey);
}
}
}

@AllArgsConstructor
@EqualsAndHashCode
private static class CacheKey {
private final String datasourceName;
private final InetAddress ip;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import lombok.SneakyThrows;

import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;

public class GeoDataCacheTests extends Ip2GeoTestCase {

/**
* Test a correctness of invalidate
*
* 1. Fill the cache with old data with datasource 1
* 2. Run three threads which will
* add value for datasource 1 with new data
* add value for datasource 2 with new data
* invalidate old data for datasource 1
* 3. Verify all old data for datasource 1 is invalidated
*/
@SneakyThrows
public void testInvalidate_whenCalled_thenInvalidate() {
int cacheSize = 100000;
GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
String datasource1 = "datasource1";
String datasource2 = "datasource2";
List<InetAddress> ips = new ArrayList<>(cacheSize);

// Fill the cache with old data for datasource 1
Map<String, Object> oldData = Map.of("old", "old");
for (int i = 0; i < cacheSize; i++) {
InetAddress ip = randomIp(false);
ips.add(ip);
geoDataCache.putIfAbsent(datasource1, ip, addr -> oldData);
}

ExecutorService executor = Executors.newFixedThreadPool(3);
Map<String, Object> newData = Map.of("new", "new");
executor.submit(() -> {
while (executor.isShutdown() == false) {
try {
InetAddress ip = randomIp(false);
geoDataCache.putIfAbsent(datasource1, ip, addr -> newData);
Thread.yield();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

executor.submit(() -> {
while (executor.isShutdown() == false) {
try {
InetAddress ip = randomIp(false);
geoDataCache.putIfAbsent(datasource2, ip, addr -> newData);
Thread.yield();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

Future task = executor.submit(() -> { geoDataCache.invalidate(datasource1); });

while (task.isDone() == false) {
Thread.yield();
}
executor.shutdown();

// Verify
assertTrue(
ips.stream()
.filter(ip -> geoDataCache.get(datasource1, ip) != null)
.noneMatch(ip -> geoDataCache.get(datasource1, ip).containsKey("old"))
);
}

@SneakyThrows
public void testUpdateMaxSize_whenBiggerSize_thenContainsAllData() {
int cacheSize = 10;
String datasource = GeospatialTestHelper.randomLowerCaseString();
GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
List<InetAddress> ips = new ArrayList<>(cacheSize);
for (int i = 0; i < cacheSize; i++) {
InetAddress ip = randomIp(false);
ips.add(ip);
geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap());
}

// Verify all data exist in the cache
assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));

// Update cache size
int newCacheSize = 15;
geoDataCache.updateMaxSize(newCacheSize);

// Verify all data exist in the cache
assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));

// Add (newCacheSize - cacheSize + 1) data and the first data should not be available in the cache
for (int i = 0; i < newCacheSize - cacheSize + 1; i++) {
geoDataCache.putIfAbsent(datasource, randomIp(false), addr -> Collections.emptyMap());
}
assertNull(geoDataCache.get(datasource, ips.get(0)));
}

@SneakyThrows
public void testUpdateMaxSize_whenSmallerSize_thenContainsPartialData() {
int cacheSize = 10;
String datasource = GeospatialTestHelper.randomLowerCaseString();
GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
List<InetAddress> ips = new ArrayList<>(cacheSize);
for (int i = 0; i < cacheSize; i++) {
InetAddress ip = randomIp(false);
ips.add(ip);
geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap());
}

// Verify all data exist in the cache
assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));

// Update cache size
int newCacheSize = 5;
geoDataCache.updateMaxSize(newCacheSize);

// Verify the last (cacheSize - newCacheSize) data is available in the cache
List<InetAddress> deleted = ips.subList(0, ips.size() - newCacheSize);
List<InetAddress> retained = ips.subList(ips.size() - newCacheSize, ips.size());
assertTrue(deleted.stream().allMatch(ip -> geoDataCache.get(datasource, ip) == null));
assertTrue(retained.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));
}
}

0 comments on commit 0ab004f

Please sign in to comment.