Skip to content

Commit

Permalink
Add geo data cache
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 22, 2023
1 parent a04eeeb commit c6ed538
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;

import java.net.InetAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

public class GeoDataCache {
private Cache<CacheKey, Map<String, Object>> cache;

// package private for testing
public GeoDataCache(final long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater");
}
this.cache = CacheBuilder.<CacheKey, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
}

public Map<String, Object> putIfAbsent (
final String datasourceName,
final InetAddress ip,
final Function<InetAddress, Map<String, Object>> retrieveFunction
) throws ExecutionException {
CacheKey cacheKey = new CacheKey(datasourceName, ip);
return cache.computeIfAbsent(cacheKey, key -> retrieveFunction.apply(key.ip));
}

public Map<String, Object> get (
final String datasourceName,
final InetAddress ip) {
return cache.get(new CacheKey(datasourceName, ip));
}

/**
* Create a new cache with give size and replace existing cache
*
* @param maxSize
*/
public void updateMaxSize(final long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater");
}
this.cache = CacheBuilder.<CacheKey, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
}

/**
* Invalidate all cached values of a given datasource
*
* @param datasourceName
*/
public void invalidate(final String datasourceName) {
Iterator<CacheKey> it = cache.keys().iterator();
while (it.hasNext()) {
CacheKey cacheKey = it.next();
if (datasourceName.equals(cacheKey.datasourceName)) {
cache.invalidate(cacheKey);
}
}
}

@AllArgsConstructor
@EqualsAndHashCode
private static class CacheKey {
private final String datasourceName;
private final InetAddress ip;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import lombok.SneakyThrows;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class GeoDataCacheTests extends Ip2GeoTestCase {

/**
* Test a correctness of invalidate
*
* 1. Fill the cache with old data with datasource 1
* 2. Run three threads which will
* add value for datasource 1 with new data
* add value for datasource 2 with new data
* invalidate old data for datasource 1
* 3. Verify all old data for datasource 1 is invalidated
*/
@SneakyThrows
public void testInvalidate_whenCalled_thenInvalidate() {
int cacheSize = 100000;
GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
String datasource1 = "datasource1";
String datasource2 = "datasource2";
List<InetAddress> ips = new ArrayList<>(cacheSize);

// Fill the cache with old data for datasource 1
Map<String, Object> oldData = Map.of("old", "old");
for (int i = 0; i < cacheSize; i++) {
InetAddress ip = randomIp(false);
ips.add(ip);
geoDataCache.putIfAbsent(datasource1, ip, addr -> oldData);
}

ExecutorService executor = Executors.newFixedThreadPool(3);
Map<String, Object> newData = Map.of("new", "new");
executor.submit(() -> {
while (executor.isShutdown() == false) {
try {
InetAddress ip = randomIp(false);
geoDataCache.putIfAbsent(datasource1, ip, addr -> newData);
Thread.yield();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

executor.submit(() -> {
while (executor.isShutdown() == false) {
try {
InetAddress ip = randomIp(false);
geoDataCache.putIfAbsent(datasource2, ip, addr -> newData);
Thread.yield();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

Future task = executor.submit(() -> {
geoDataCache.invalidate(datasource1);
});

while (task.isDone() == false) {
Thread.yield();
}
executor.shutdown();

// Verify
assertTrue(ips.stream().filter(ip -> geoDataCache.get(datasource1, ip) != null).noneMatch(ip -> geoDataCache.get(datasource1, ip).containsKey("old")));
}

@SneakyThrows
public void testUpdateMaxSize_whenCalled_thenNewCacheIsCreated() {
int cacheSize = 10;
String datasource = GeospatialTestHelper.randomLowerCaseString();
GeoDataCache geoDataCache = new GeoDataCache(cacheSize);
List<InetAddress> ips = new ArrayList<>(cacheSize);
for (int i = 0; i < cacheSize; i++) {
InetAddress ip = randomIp(false);
ips.add(ip);
geoDataCache.putIfAbsent(datasource, ip, addr -> Collections.emptyMap());
}

// Verify all data exist in the cache
assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) != null));

// Update cache size
int newCacheSize = 5;
geoDataCache.updateMaxSize(newCacheSize);

// Verify all data is gone from the cache
assertTrue(ips.stream().allMatch(ip -> geoDataCache.get(datasource, ip) == null));

// Add cache size + 1 items
InetAddress firstIp = randomIp(false);
geoDataCache.putIfAbsent(datasource, firstIp, addr -> Collections.emptyMap());
for (int i = 0; i < cacheSize; i++) {
geoDataCache.putIfAbsent(datasource, randomIp(false), addr -> Collections.emptyMap());
}

// Verify the first item is not available in the cache
assertNull(geoDataCache.get(datasource, firstIp));
}
}

0 comments on commit c6ed538

Please sign in to comment.