Skip to content

Commit

Permalink
Add cache layer to reduce GeoIp data retrieval latency
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 27, 2023
1 parent 72f2f7a commit 07d7e72
Show file tree
Hide file tree
Showing 13 changed files with 278 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import lombok.Getter;
import lombok.Setter;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -52,10 +51,12 @@ public GetDatasourceRequest(final StreamInput in) throws IOException {

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (names == null) {
throw new OpenSearchException("names should not be null");
errors = new ActionRequestValidationException();
errors.addValidationError("names should not be null");
}
return null;
return errors;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,11 @@ public class Ip2GeoSettings {
);

/**
* Multi search max concurrent searches
*
* Multi search is used only when a field contains a list of ip addresses.
*
* When the value is 0, it will use default value which will be decided
* based on node count and search thread pool size.
* Max size for geo data cache
*/
public static final Setting<Integer> MAX_CONCURRENT_SEARCHES = Setting.intSetting(
"plugins.geospatial.ip2geo.processor.max_concurrent_searches",
0,
public static final Setting<Long> CACHE_SIZE = Setting.longSetting(
"plugins.geospatial.ip2geo.processor.cache_size",
1000,
0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
Expand All @@ -72,7 +67,7 @@ public class Ip2GeoSettings {
* @return a list of all settings for Ip2Geo feature
*/
public static final List<Setting<?>> settings() {
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES);
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, CACHE_SIZE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.logging.log4j.util.Strings;
import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
Expand All @@ -62,7 +61,6 @@
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.shared.Constants;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;

/**
Expand Down Expand Up @@ -242,94 +240,26 @@ public XContentBuilder createDocument(final String[] fields, final String[] valu
*
* @param indexName index
* @param ip ip address
* @param actionListener action listener
* @return geoIP data
*/
public void getGeoIpData(final String indexName, final String ip, final ActionListener<Map<String, Object>> actionListener) {
StashedThreadContext.run(
public Map<String, Object> getGeoIpData(final String indexName, final String ip) {
SearchResponse response = StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
try {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
}
} catch (Exception e) {
actionListener.onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
.get(TimeValue.timeValueSeconds(30))
);
}

/**
* Query a given index using a given list of ip addresses to get geoip data
*
* @param indexName index
* @param ips list of ip addresses
* @param actionListener action listener
*/
public void getGeoIpData(
final String indexName,
final List<String> ips,
final ActionListener<List<Map<String, Object>>> actionListener
) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
ips.stream().forEach(ip -> boolQueryBuilder.should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)));
StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(ips.size())
.setQuery(boolQueryBuilder)
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
try {
actionListener.onResponse(toGeoIpDataList(searchResponse));
} catch (Exception e) {
actionListener.onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
);
}

private List<Map<String, Object>> toGeoIpDataList(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
return Collections.emptyList();
if (response.getHits().getHits().length == 0) {
return Collections.emptyMap();
} else {
return (Map<String, Object>) XContentHelper.convertToMap(response.getHits().getAt(0).getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME);
}

return Arrays.stream(searchResponse.getHits().getHits())
.map(
data -> (Map<String, Object>) XContentHelper.convertToMap(data.getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME)
)
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -26,7 +27,9 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;
Expand All @@ -37,52 +40,70 @@
@Log4j2
public class Ip2GeoCachedDao implements IndexingOperationListener {
private final DatasourceDao datasourceDao;
private Map<String, DatasourceMetadata> data;
private final GeoIpDataDao geoIpDataDao;
private final GeoDataCache geoDataCache;
private Map<String, DatasourceMetadata> metadata;

public Ip2GeoCachedDao(final DatasourceDao datasourceDao) {
public Ip2GeoCachedDao(final ClusterService clusterService, final DatasourceDao datasourceDao, final GeoIpDataDao geoIpDataDao) {
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.geoDataCache = new GeoDataCache(clusterService.getClusterSettings().get(Ip2GeoSettings.CACHE_SIZE));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(Ip2GeoSettings.CACHE_SIZE, setting -> this.geoDataCache.updateMaxSize(setting.longValue()));
}

public String getIndexName(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
}

public boolean isExpired(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
}

public boolean has(final String datasourceName) {
return getData().containsKey(datasourceName);
return getMetadata().containsKey(datasourceName);
}

public DatasourceState getState(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
}

private Map<String, DatasourceMetadata> getData() {
if (data != null) {
return data;
public Map<String, Object> getGeoData(final String indexName, final String ip) {
try {
return geoDataCache.putIfAbsent(indexName, ip, addr -> geoIpDataDao.getGeoIpData(indexName, ip));
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private Map<String, DatasourceMetadata> getMetadata() {
if (metadata != null) {
return metadata;
}
synchronized (this) {
if (data != null) {
return data;
if (metadata != null) {
return metadata;
}
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
data = tempData;
return data;
try {
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
} catch (IndexNotFoundException e) {
log.debug("Datasource has never been created");
}
metadata = tempData;
return metadata;
}
}

private void put(final Datasource datasource) {
DatasourceMetadata metadata = new DatasourceMetadata(datasource);
getData().put(datasource.getName(), metadata);
getMetadata().put(datasource.getName(), metadata);
}

private void remove(final String datasourceName) {
getData().remove(datasourceName);
getMetadata().remove(datasourceName);
}

@Override
Expand Down
Loading

0 comments on commit 07d7e72

Please sign in to comment.