Skip to content

Commit

Permalink
Add geo data cache
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 22, 2023
1 parent a04eeeb commit 9b57b0f
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;

/**
 * Cache to hold geo data, keyed by (index name, ip address).
 *
 * GeoData in an index is immutable. Therefore, invalidation is not needed.
 */
public class GeoDataCache {
    private static final String NEGATIVE_SIZE_MESSAGE = "ip2geo max cache size must be 0 or greater";

    // volatile: updateMaxSize() replaces the whole cache instance, and callers of
    // putIfAbsent()/get() on other threads must observe the replacement.
    private volatile Cache<CacheKey, Map<String, Object>> cache;

    /**
     * Create a cache bounded by the given maximum weight.
     *
     * @param maxSize maximum weight of the cache; must be 0 or greater
     * @throws IllegalArgumentException if {@code maxSize} is negative
     */
    public GeoDataCache(final long maxSize) {
        validateMaxSize(maxSize);
        this.cache = buildCache(maxSize);
    }

    /**
     * Return the cached geo data for the given index and ip, loading it with
     * {@code retrieveFunction} when no entry is cached yet.
     *
     * @param indexName name of the index the geo data belongs to
     * @param ip ip address to look up; this is the value handed to {@code retrieveFunction}
     * @param retrieveFunction loader invoked with the ip address when the entry is absent
     * @return the cached or freshly loaded geo data
     * @throws ExecutionException propagated from the underlying cache's {@code computeIfAbsent}
     */
    public Map<String, Object> putIfAbsent(
        final String indexName,
        final String ip,
        final Function<String, Map<String, Object>> retrieveFunction
    ) throws ExecutionException {
        CacheKey cacheKey = new CacheKey(indexName, ip);
        return cache.computeIfAbsent(cacheKey, key -> retrieveFunction.apply(key.ip));
    }

    /**
     * Look up geo data without loading it.
     *
     * @param indexName name of the index the geo data belongs to
     * @param ip ip address to look up
     * @return whatever the underlying cache returns for the key (the tests in this
     *         package rely on {@code null} meaning "not cached")
     */
    public Map<String, Object> get(final String indexName, final String ip) {
        return cache.get(new CacheKey(indexName, ip));
    }

    /**
     * Create a new cache with the given size and replace the existing cache.
     *
     * Entries of the previous cache are copied into the new cache on a best-effort basis:
     * at most {@code maxSize} keys are taken from the front of the previous cache's key
     * sequence, and keys whose value is no longer available are skipped.
     *
     * @param maxSize maximum weight of the new cache; must be 0 or greater
     * @throws IllegalArgumentException if {@code maxSize} is negative
     */
    public void updateMaxSize(final long maxSize) {
        validateMaxSize(maxSize);
        final Cache<CacheKey, Map<String, Object>> previous = cache;
        final Cache<CacheKey, Map<String, Object>> replacement = buildCache(maxSize);

        // keys() is expected to iterate from most- to least-recently used. Pushing onto a
        // stack and replaying it inserts the retained entries oldest-first, so the new cache
        // keeps the same relative recency instead of the reverse of it.
        final Deque<CacheKey> retained = new ArrayDeque<>();
        final Iterator<CacheKey> keys = previous.keys().iterator();
        while (keys.hasNext() && retained.size() < maxSize) {
            retained.push(keys.next());
        }

        for (final CacheKey key : retained) {
            final Map<String, Object> value = previous.get(key);
            // The key sequence is not a snapshot: the entry may have disappeared since we
            // read the key. Never store a null value in the new cache.
            if (value != null) {
                replacement.put(key, value);
            }
        }
        cache = replacement;
    }

    /** Reject negative sizes with the message shared by the constructor and updateMaxSize. */
    private static void validateMaxSize(final long maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException(NEGATIVE_SIZE_MESSAGE);
        }
    }

    /** Build an empty cache limited to the given maximum weight. */
    private static Cache<CacheKey, Map<String, Object>> buildCache(final long maxSize) {
        return CacheBuilder.<CacheKey, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
    }

    /** Composite cache key; equality and hash code are generated by Lombok over both fields. */
    @AllArgsConstructor
    @EqualsAndHashCode
    private static class CacheKey {
        private final String indexName;
        private final String ip;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import lombok.SneakyThrows;

import org.opensearch.common.network.NetworkAddress;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;

/**
 * Tests for resizing {@link GeoDataCache} through {@code updateMaxSize}.
 */
public class GeoDataCacheTests extends Ip2GeoTestCase {
    /**
     * Growing the cache must keep every existing entry, and the grown cache must still
     * evict once more entries than the new size have been added.
     */
    @SneakyThrows
    public void testUpdateMaxSize_whenBiggerSize_thenContainsAllData() {
        final int initialSize = 10;
        final String datasource = GeospatialTestHelper.randomLowerCaseString();
        final GeoDataCache geoDataCache = new GeoDataCache(initialSize);
        final List<String> ips = fillWithRandomIps(geoDataCache, datasource, initialSize);

        // Everything inserted so far must be present
        assertTrue(ips.stream().noneMatch(ip -> geoDataCache.get(datasource, ip) == null));

        // Grow the cache
        final int grownSize = 15;
        geoDataCache.updateMaxSize(grownSize);

        // Nothing may be lost by growing
        assertTrue(ips.stream().noneMatch(ip -> geoDataCache.get(datasource, ip) == null));

        // Insert one entry more than the added capacity; the first ip must then be gone
        final int overflowCount = grownSize - initialSize + 1;
        for (int added = 0; added < overflowCount; added++) {
            final String extraIp = NetworkAddress.format(randomIp(false));
            geoDataCache.putIfAbsent(datasource, extraIp, addr -> Collections.emptyMap());
        }
        assertNull(geoDataCache.get(datasource, ips.get(0)));
    }

    /**
     * Shrinking the cache must keep only as many entries as the new size allows.
     */
    @SneakyThrows
    public void testUpdateMaxSize_whenSmallerSize_thenContainsPartialData() {
        final int initialSize = 10;
        final String datasource = GeospatialTestHelper.randomLowerCaseString();
        final GeoDataCache geoDataCache = new GeoDataCache(initialSize);
        final List<String> ips = fillWithRandomIps(geoDataCache, datasource, initialSize);

        // Everything inserted so far must be present
        assertTrue(ips.stream().noneMatch(ip -> geoDataCache.get(datasource, ip) == null));

        // Shrink the cache
        final int shrunkSize = 5;
        geoDataCache.updateMaxSize(shrunkSize);

        // Only the last shrunkSize ips remain; the earlier ones must have been dropped
        final int firstRetained = ips.size() - shrunkSize;
        final List<String> deleted = ips.subList(0, firstRetained);
        final List<String> retained = ips.subList(firstRetained, ips.size());
        assertTrue(deleted.stream().noneMatch(ip -> geoDataCache.get(datasource, ip) != null));
        assertTrue(retained.stream().noneMatch(ip -> geoDataCache.get(datasource, ip) == null));
    }

    /**
     * Insert {@code count} random ip addresses for {@code datasource} into the cache, each
     * mapped to an empty map, and return them in insertion order.
     */
    private List<String> fillWithRandomIps(final GeoDataCache target, final String datasource, final int count) throws Exception {
        final List<String> inserted = new ArrayList<>(count);
        for (int n = 0; n < count; n++) {
            final String ip = NetworkAddress.format(randomIp(false));
            inserted.add(ip);
            target.putIfAbsent(datasource, ip, addr -> Collections.emptyMap());
        }
        return inserted;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public void init() {
}

/**
 * Runs the datasource job runner with a mocked {@link ScheduledJobParameter} and expects
 * the runner to reject it with an {@link IllegalStateException}.
 */
public void testRunJob_whenInvalidClass_thenThrowException() {
    // Build a real execution context from random values; the earlier mocked context was
    // replaced by this one, and keeping both would declare the variable twice.
    JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt());
    String jobIndexName = randomLowerCaseString();
    String jobId = randomLowerCaseString();
    JobExecutionContext jobExecutionContext = new JobExecutionContext(Instant.now(), jobDocVersion, lockService, jobIndexName, jobId);
    ScheduledJobParameter jobParameter = mock(ScheduledJobParameter.class);
    expectThrows(IllegalStateException.class, () -> DatasourceRunner.getJobRunnerInstance().runJob(jobParameter, jobExecutionContext));
}
Expand Down

0 comments on commit 9b57b0f

Please sign in to comment.