Add cache layer to reduce GeoIp data retrieval latency #343

Merged (1 commit) on Jun 29, 2023

GetDatasourceRequest.java
@@ -10,7 +10,6 @@
import lombok.Getter;
import lombok.Setter;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
@@ -52,10 +51,12 @@ public GetDatasourceRequest(final StreamInput in) throws IOException {

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (names == null) {
throw new OpenSearchException("names should not be null");
errors = new ActionRequestValidationException();
errors.addValidationError("names should not be null");
}
return null;
return errors;
}

@Override
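For context on the validate() change above: transport actions call validate() before executing a request, and a non-null ActionRequestValidationException is reported back to the caller with every accumulated error message, instead of the request dying on an uncaught OpenSearchException. Below is a minimal caller-side sketch, not part of this PR; the class and method names are illustrative only.

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;

public class ValidateUsageSketch {
    // Mirrors what a transport action does before executing any ActionRequest.
    public static void checkRequest(final ActionRequest request) {
        ActionRequestValidationException errors = request.validate();
        if (errors != null) {
            // All accumulated validation messages are reported together as a bad-request error.
            throw errors;
        }
        // otherwise proceed with handling the request
    }
}
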
Ip2GeoSettings.java
@@ -52,16 +52,11 @@ public class Ip2GeoSettings {
);

/**
* Multi search max concurrent searches
*
* Multi search is used only when a field contains a list of ip addresses.
*
* When the value is 0, it will use default value which will be decided
* based on node count and search thread pool size.
* Max size for geo data cache
*/
public static final Setting<Integer> MAX_CONCURRENT_SEARCHES = Setting.intSetting(
"plugins.geospatial.ip2geo.processor.max_concurrent_searches",
0,
public static final Setting<Long> CACHE_SIZE = Setting.longSetting(
"plugins.geospatial.ip2geo.processor.cache_size",
1000,

Member:
why not 10k?

Collaborator Author:
This cache is created when the Geospatial plugin is installed, regardless of whether a user uses the Ip2Geo processor or not. So I think it is better to start with a small number and let the user decide to increase the cache size later. Also, the performance test results with 1,000 were not much worse than with 10,000.

0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
@@ -72,7 +67,7 @@ public class Ip2GeoSettings {
* @return a list of all settings for Ip2Geo feature
*/
public static final List<Setting<?>> settings() {
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES);
return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, CACHE_SIZE);
}

/**
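Since cache_size is declared above with Setting.Property.Dynamic and Setting.Property.NodeScope, it can be raised at runtime without a restart, which fits the reasoning of starting with a small default and letting users grow it later. Here is a minimal sketch of such an update, not part of this PR; CacheSizeUpdateSketch and raiseCacheSize are illustrative names, and client is assumed to be an available OpenSearch Client.

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

public class CacheSizeUpdateSketch {
    // Raises plugins.geospatial.ip2geo.processor.cache_size for the whole cluster at runtime.
    public static void raiseCacheSize(final Client client, final long newSize) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        // Persistent so the new size survives a full cluster restart.
        request.persistentSettings(Settings.builder().put("plugins.geospatial.ip2geo.processor.cache_size", newSize));
        client.admin().cluster().updateSettings(request).actionGet();
    }
}
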
GeoIpDataDao.java
@@ -37,7 +37,6 @@
import org.apache.logging.log4j.util.Strings;
import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
@@ -62,7 +61,6 @@
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.shared.Constants;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;

/**
@@ -242,94 +240,26 @@ public XContentBuilder createDocument(final String[] fields, final String[] valu
*
* @param indexName index
* @param ip ip address
* @param actionListener action listener
* @return geoIP data
*/
public void getGeoIpData(final String indexName, final String ip, final ActionListener<Map<String, Object>> actionListener) {
StashedThreadContext.run(
public Map<String, Object> getGeoIpData(final String indexName, final String ip) {
SearchResponse response = StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
try {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
}
} catch (Exception e) {
actionListener.onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
.get(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
}

/**
* Query a given index using a given list of ip addresses to get geoip data
*
* @param indexName index
* @param ips list of ip addresses
* @param actionListener action listener
*/
public void getGeoIpData(
final String indexName,
final List<String> ips,
final ActionListener<List<Map<String, Object>>> actionListener
) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
ips.stream().forEach(ip -> boolQueryBuilder.should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)));
StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(ips.size())
.setQuery(boolQueryBuilder)
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
try {
actionListener.onResponse(toGeoIpDataList(searchResponse));
} catch (Exception e) {
actionListener.onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
);
}

private List<Map<String, Object>> toGeoIpDataList(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
return Collections.emptyList();
if (response.getHits().getHits().length == 0) {
return Collections.emptyMap();
} else {
return (Map<String, Object>) XContentHelper.convertToMap(response.getHits().getAt(0).getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME);
}

return Arrays.stream(searchResponse.getHits().getHits())
.map(
data -> (Map<String, Object>) XContentHelper.convertToMap(data.getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME)
)
.collect(Collectors.toList());
}

/**

Ip2GeoCachedDao.java
@@ -18,6 +18,7 @@
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
@@ -26,63 +27,89 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;

/**
* Data access object for Datasource and GeoIP data with added caching layer
*
* Ip2GeoCachedDao has a memory cache to store Datasource and GeoIP data. To fully utilize the cache,
* do not create multiple Ip2GeoCachedDao. Ip2GeoCachedDao instance is bound to guice so that you can use
* it through injection.
*
* All IP2Geo processors share single Ip2GeoCachedDao instance.
*/
@Log4j2
public class Ip2GeoCachedDao implements IndexingOperationListener {
private final DatasourceDao datasourceDao;
private Map<String, DatasourceMetadata> data;
private final GeoIpDataDao geoIpDataDao;
private final GeoDataCache geoDataCache;
private Map<String, DatasourceMetadata> metadata;

public Ip2GeoCachedDao(final DatasourceDao datasourceDao) {
public Ip2GeoCachedDao(final ClusterService clusterService, final DatasourceDao datasourceDao, final GeoIpDataDao geoIpDataDao) {
this.datasourceDao = datasourceDao;
this.geoIpDataDao = geoIpDataDao;
this.geoDataCache = new GeoDataCache(clusterService.getClusterSettings().get(Ip2GeoSettings.CACHE_SIZE));

Member:
How do we make sure there is only one instance of GeoDataCache?

Collaborator Author:
If we create multiple Ip2GeoCachedDao instances, there will be more than one GeoDataCache. However, the class is used only by the Ip2Geo processor. I didn't enforce a singleton for the cache because I think the chance of creating multiple Ip2GeoCachedDao instances by mistake is low.

We can enforce it to be a singleton, but I'm not sure that is necessary at this stage.

public class GeoDataCache {
    private static GeoDataCache INSTANCE;
    public static GeoDataCache getInstance() {
        if (INSTANCE == null) {
            throw new IllegalStateException("instance is not initialized");
        }
        return INSTANCE;
    }

    public static synchronized void initialize(ClusterService clusterService) {
        if (INSTANCE != null) {
            throw new IllegalStateException("instance is initialized already");
        }
        INSTANCE = new GeoDataCache(clusterService.getClusterSettings().get(Ip2GeoSettings.CACHE_SIZE));
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(Ip2GeoSettings.CACHE_SIZE, setting -> INSTANCE.updateMaxSize(setting.longValue()));
    }

    ....
}

Usage

    GeoDataCache.initialize(clusterService);
    GeoDataCache geoDataCache = GeoDataCache.getInstance();

Member:
maybe add a code comment explaining this reasoning

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(Ip2GeoSettings.CACHE_SIZE, setting -> this.geoDataCache.updateMaxSize(setting.longValue()));
}

public String getIndexName(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
}

public boolean isExpired(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
}

public boolean has(final String datasourceName) {
return getData().containsKey(datasourceName);
return getMetadata().containsKey(datasourceName);
}

public DatasourceState getState(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
}

private Map<String, DatasourceMetadata> getData() {
if (data != null) {
return data;
public Map<String, Object> getGeoData(final String indexName, final String ip) {
try {
return geoDataCache.putIfAbsent(indexName, ip, addr -> geoIpDataDao.getGeoIpData(indexName, ip));
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private Map<String, DatasourceMetadata> getMetadata() {
if (metadata != null) {
return metadata;
}
synchronized (this) {
if (data != null) {
return data;
if (metadata != null) {
return metadata;
}
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
data = tempData;
return data;
try {
datasourceDao.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
} catch (IndexNotFoundException e) {
log.debug("Datasource has never been created");
}
metadata = tempData;
return metadata;
}
}

private void put(final Datasource datasource) {
DatasourceMetadata metadata = new DatasourceMetadata(datasource);
getData().put(datasource.getName(), metadata);
getMetadata().put(datasource.getName(), metadata);
}

private void remove(final String datasourceName) {
getData().remove(datasourceName);
getMetadata().remove(datasourceName);
}

@Override
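
The GeoDataCache used by the constructor and getGeoData() above is not included in the hunks shown here. The following is a hedged sketch of what such a wrapper could look like, built on the org.opensearch.common.cache utilities this file already imports; the string cache key, the default entry weight, and the rebuild-on-resize strategy in updateMaxSize are assumptions for illustration, not the PR's actual implementation.

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import org.opensearch.common.cache.Cache;
import org.opensearch.common.cache.CacheBuilder;

public class GeoDataCache {
    private volatile Cache<String, Map<String, Object>> cache;

    public GeoDataCache(final long maxSize) {
        this.cache = createCache(maxSize);
    }

    // Loads geo data through retrieveFunction on a cache miss; the key combines index name
    // and ip because the same ip can resolve differently per datasource index.
    public Map<String, Object> putIfAbsent(
        final String indexName,
        final String ip,
        final Function<String, Map<String, Object>> retrieveFunction
    ) throws ExecutionException {
        return cache.computeIfAbsent(indexName + "#" + ip, key -> retrieveFunction.apply(ip));
    }

    // One way to honor the dynamic cache_size setting: rebuild the cache with the new bound.
    // Cached entries are dropped on resize, which is acceptable for a lookup cache.
    public synchronized void updateMaxSize(final long maxSize) {
        this.cache = createCache(maxSize);
    }

    private static Cache<String, Map<String, Object>> createCache(final long maxSize) {
        // With the default weigher each entry weighs 1, so maximumWeight acts as a max entry count.
        return CacheBuilder.<String, Map<String, Object>>builder().setMaximumWeight(maxSize).build();
    }
}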