Skip to content

Commit

Permalink
Add cache layer to reduce GeoIp data retrieval latency
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 26, 2023
1 parent 72f2f7a commit 39e30a7
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,11 @@ public class Ip2GeoSettings {
);

/**
* Multi search max concurrent searches
*
* Multi search is used only when a field contains a list of ip addresses.
*
* When the value is 0, it will use default value which will be decided
* based on node count and search thread pool size.
* Max size for geo data cache
*/
public static final Setting<Integer> MAX_CONCURRENT_SEARCHES = Setting.intSetting(
"plugins.geospatial.ip2geo.processor.max_concurrent_searches",
0,
public static final Setting<Long> CACHE_SIZE = Setting.longSetting(
"plugins.geospatial.ip2geo.processor.cache_size",
1000,
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
Expand All @@ -72,7 +67,7 @@ public class Ip2GeoSettings {
* @return a list of all settings for Ip2Geo feature
*/
public static final List<Setting<?>> settings() {
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES);
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, CACHE_SIZE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,32 @@ public void onFailure(final Exception e) {
);
}

/**
* Query a given index using a given ip address to get geoip data
*
* @param indexName index
* @param ip ip address
*/
public Map<String, Object> getGeoIpData(final String indexName, final String ip) {
SearchResponse response = StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.get(TimeValue.timeValueSeconds(30))
);

if (response.getHits().getHits().length == 0) {
return Collections.emptyMap();
} else {
return (Map<String, Object>) XContentHelper.convertToMap(response.getHits().getAt(0).getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME);
}
}

/**
* Query a given index using a given list of ip addresses to get geoip data
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -26,7 +27,9 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;
Expand All @@ -37,52 +40,70 @@
@Log4j2
public class Ip2GeoCachedDao implements IndexingOperationListener {
private final DatasourceDao datasourceDao;
private Map<String, DatasourceMetadata> data;
private final GeoIpDataDao geoIpDataDao;
private final GeoDataCache geoDataCache;
private Map<String, DatasourceMetadata> metadata;

public Ip2GeoCachedDao(final DatasourceDao datasourceDao) {
public Ip2GeoCachedDao(final ClusterService clusterService, final DatasourceDao datasourceDao, final GeoIpDataDao geoIpDataDao) {
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.geoDataCache = new GeoDataCache(clusterService.getClusterSettings().get(Ip2GeoSettings.CACHE_SIZE));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(Ip2GeoSettings.CACHE_SIZE, setting -> this.geoDataCache.updateMaxSize(setting.longValue()));
}

public String getIndexName(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
}

public boolean isExpired(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
}

public boolean has(final String datasourceName) {
return getData().containsKey(datasourceName);
return getMetadata().containsKey(datasourceName);
}

public DatasourceState getState(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
}

private Map<String, DatasourceMetadata> getData() {
if (data != null) {
return data;
public Map<String, Object> getGeoData(final String indexName, final String ip) {
try {
return geoDataCache.putIfAbsent(indexName, ip, addr -> geoIpDataDao.getGeoIpData(indexName, ip));
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private Map<String, DatasourceMetadata> getMetadata() {
if (metadata != null) {
return metadata;
}
synchronized (this) {
if (data != null) {
return data;
if (metadata != null) {
return metadata;
}
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
data = tempData;
return data;
try {
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
} catch (IndexNotFoundException e) {
log.debug("Datasource has never been created");
}
metadata = tempData;
return metadata;
}
}

private void put(final Datasource datasource) {
DatasourceMetadata metadata = new DatasourceMetadata(datasource);
getData().put(datasource.getName(), metadata);
getMetadata().put(datasource.getName(), metadata);
}

private void remove(final String datasourceName) {
getData().remove(datasourceName);
getMetadata().remove(datasourceName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
Expand Down Expand Up @@ -160,36 +159,11 @@ protected void executeInternal(
return;
}

try {
geoIpDataDao.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
Map<String, Object> geoData = ip2GeoCachedDao.getGeoData(indexName, ip);
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(geoData));
}
}

@VisibleForTesting
protected ActionListener<Map<String, Object>> getSingleGeoIpDataListener(
final IngestDocument ingestDocument,
final BiConsumer<IngestDocument, Exception> handler
) {
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Object> ipToGeoData) {
try {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData));
}
handler.accept(ingestDocument, null);
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
public void onFailure(final Exception e) {
handler.accept(null, e);
}
};
handler.accept(ingestDocument, null);
}

private Map<String, Object> filteredGeoData(final Map<String, Object> geoData) {
Expand All @@ -200,14 +174,6 @@ private Map<String, Object> filteredGeoData(final Map<String, Object> geoData) {
return properties.stream().filter(p -> geoData.containsKey(p)).collect(Collectors.toMap(p -> p, p -> geoData.get(p)));
}

private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>> geoData) {
if (properties == null) {
return geoData;
}

return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList());
}

private void validateDatasourceIsInAvailableState(final String datasourceName) {
if (ip2GeoCachedDao.has(datasourceName) == false) {
throw new IllegalStateException("datasource does not exist");
Expand Down Expand Up @@ -249,32 +215,16 @@ protected void executeInternal(
return;
}

geoIpDataDao.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
}
List<Map<String, Object>> geoDataList = ips.stream()
.map(ip -> ip2GeoCachedDao.getGeoData(indexName, (String) ip))
.filter(geoData -> geoData.isEmpty() == false)
.map(this::filteredGeoData)
.collect(Collectors.toList());

@VisibleForTesting
protected ActionListener<List<Map<String, Object>>> getMultiGeoIpDataListener(
final IngestDocument ingestDocument,
final BiConsumer<IngestDocument, Exception> handler
) {
return new ActionListener<>() {
@Override
public void onResponse(final List<Map<String, Object>> ipToGeoData) {
try {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData));
}
handler.accept(ingestDocument, null);
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
public void onFailure(final Exception e) {
handler.accept(null, e);
}
};
if (geoDataList.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoDataList);
}
handler.accept(ingestDocument, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
this.datasourceDao = new DatasourceDao(parameters.client, parameters.ingestService.getClusterService());
this.geoIpDataDao = new GeoIpDataDao(parameters.ingestService.getClusterService(), parameters.client);
this.ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao);
this.ip2GeoCachedDao = new Ip2GeoCachedDao(parameters.ingestService.getClusterService(), datasourceDao, geoIpDataDao);
return MapBuilder.<String, Processor.Factory>newMapBuilder()
.put(FeatureProcessor.TYPE, new FeatureProcessor.Factory())
.put(Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory(parameters.ingestService, datasourceDao, geoIpDataDao, ip2GeoCachedDao))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class Ip2GeoCachedDaoTests extends Ip2GeoTestCase {

@Before
public void init() {
ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao);
ip2GeoCachedDao = new Ip2GeoCachedDao(clusterService, datasourceDao, geoIpDataDao);
}

public void testGetIndexName_whenCalled_thenReturnIndexName() {
Expand Down
Loading

0 comments on commit 39e30a7

Please sign in to comment.