Added a cache to store datasource metadata
Signed-off-by: Heemin Kim <[email protected]>
heemin32 committed Jun 26, 2023
1 parent a04eeeb commit 88702ed
Showing 10 changed files with 499 additions and 159 deletions.
120 changes: 120 additions & 0 deletions src/main/java/org/opensearch/geospatial/ip2geo/cache/Ip2GeoCache.java
@@ -0,0 +1,120 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.cache;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.ShardId;

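/**
* In-memory cache of datasource metadata (index name, expiration date, state).
* The cache is populated lazily from the job index and kept up to date by listening
* to index and delete operations on datasource documents.
*/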
@Log4j2
public class Ip2GeoCache implements IndexingOperationListener {
private final DatasourceFacade datasourceFacade;
private volatile Map<String, DatasourceMetadata> data;

public Ip2GeoCache(final DatasourceFacade datasourceFacade) {
this.datasourceFacade = datasourceFacade;
}

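/**
* @param datasourceName the datasource name
* @return the name of the index holding GeoIP data for the datasource, or null if unknown
*/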
public String getIndexName(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName();
}

public boolean isExpired(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now());
}

public boolean has(final String datasourceName) {
return getData().containsKey(datasourceName);
}

public DatasourceState getState(final String datasourceName) {
return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState();
}

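// Lazily build the metadata map on first access; double-checked locking keeps the
// job index scan to a single pass even when multiple threads race here.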
private Map<String, DatasourceMetadata> getData() {
if (data != null) {
return data;
}
synchronized (this) {
if (data != null) {
return data;
}
Map<String, DatasourceMetadata> tempData = new ConcurrentHashMap<>();
datasourceFacade.getAllDatasources()
.stream()
.forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource)));
data = tempData;
return data;
}
}

private void put(final Datasource datasource) {
DatasourceMetadata metadata = new DatasourceMetadata(datasource);
getData().put(datasource.getName(), metadata);
}

private void remove(final String datasourceName) {
getData().remove(datasourceName);
}

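// Keep the cache in sync: when a datasource document is written to the job index,
// parse it and refresh the corresponding metadata entry.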
@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (Engine.Result.Type.FAILURE.equals(result.getResultType())) {
return;
}

try (XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, index.source().utf8ToString())) {
parser.nextToken();
Datasource datasource = Datasource.PARSER.parse(parser, null);
put(datasource);
} catch (IOException e) {
log.error("IOException occurred updating datasource metadata for datasource {}", index.id(), e);
}
}

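// Evict the cached entry when a datasource document is deleted from the job index.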
@Override
public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult result) {
if (result.getResultType().equals(Engine.Result.Type.FAILURE)) {
return;
}
remove(delete.id());
}

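// Lightweight snapshot of the datasource fields needed at lookup time.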
@Getter
private static class DatasourceMetadata {
private static final DatasourceMetadata EMPTY_METADATA = new DatasourceMetadata();
private String indexName;
private Instant expirationDate;
private DatasourceState state;

private DatasourceMetadata() {
expirationDate = Instant.MIN;
}

public DatasourceMetadata(final Datasource datasource) {
this.indexName = datasource.currentIndexName();
this.expirationDate = datasource.expirationDay();
this.state = datasource.getState();
}
}
}
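The listener above only keeps the cache consistent if it is registered against the job index; that wiring is part of the other changed files not shown here. As a rough sketch, registration in an OpenSearch plugin typically looks like the following (the ip2GeoCache field and its placement in the plugin class are illustrative assumptions, not taken from this commit):

@Override
public void onIndexModule(IndexModule indexModule) {
    // Hypothetical wiring: attach the cache only to the job index that stores datasource documents.
    if (DatasourceExtension.JOB_INDEX_NAME.equals(indexModule.getIndex().getName())) {
        indexModule.addIndexOperationListener(ip2GeoCache);
    }
}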
src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java
@@ -297,6 +297,23 @@ public void getAllDatasources(final ActionListener<List<Datasource>> actionListe
);
}

/**
* Get all datasources up to {@code MAX_SIZE} from the index {@code DatasourceExtension.JOB_INDEX_NAME}.
* Unlike the listener-based variant above, this call blocks until the search completes or times out.
* @return a list of datasources
*/
public List<Datasource> getAllDatasources() {
SearchResponse response = StashedThreadContext.run(
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(MAX_SIZE)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);

List<BytesReference> bytesReferences = toBytesReferences(response);
return bytesReferences.stream().map(bytesRef -> toDatasource(bytesRef)).collect(Collectors.toList());
}

private <T> ActionListener<T> createGetDataSourceQueryActionLister(
final Class<T> response,
final ActionListener<List<Datasource>> actionListener
src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java
@@ -17,17 +17,17 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.cache.Ip2GeoCache;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestService;
@@ -57,6 +57,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private final ClusterSettings clusterSettings;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
private final Ip2GeoCache ip2GeoCache;

/**
* Ip2Geo processor type
@@ -75,6 +76,7 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
* @param clusterSettings the cluster settings
* @param datasourceFacade the datasource facade
* @param geoIpDataFacade the geoip data facade
* @param ip2GeoCache the cache
*/
public Ip2GeoProcessor(
final String tag,
@@ -86,7 +88,8 @@ public Ip2GeoProcessor(
final boolean ignoreMissing,
final ClusterSettings clusterSettings,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade
final GeoIpDataFacade geoIpDataFacade,
final Ip2GeoCache ip2GeoCache
) {
super(tag, description);
this.field = field;
@@ -97,6 +100,7 @@ public Ip2GeoProcessor(
this.clusterSettings = clusterSettings;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
this.ip2GeoCache = ip2GeoCache;
}

/**
@@ -149,42 +153,18 @@ protected void executeInternal(
final BiConsumer<IngestDocument, Exception> handler,
final String ip
) {
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
@Override
public void onResponse(final Datasource datasource) {
if (datasource == null) {
handler.accept(null, new IllegalStateException("datasource is not available"));
return;
}

if (DatasourceState.AVAILABLE.equals(datasource.getState()) == false) {
handler.accept(null, new IllegalStateException("datasource is not in an available state"));
return;
}

String indexName = datasource.currentIndexName();
if (indexName == null) {
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
handler.accept(ingestDocument, null);
return;
}

try {
geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}
validateDatasourceIsInAvailableState(datasourceName);
String indexName = ip2GeoCache.getIndexName(datasourceName);
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
handleExpiredData(ingestDocument, handler);
return;
}

@Override
public void onFailure(final Exception e) {
if (e instanceof IndexNotFoundException) {
handler.accept(null, new IllegalStateException("datasource is not available"));
return;
}
handler.accept(null, e);
}
});
try {
geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}

@VisibleForTesting
@@ -228,6 +208,21 @@ private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>
return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList());
}

private void validateDatasourceIsInAvailableState(final String datasourceName) {
if (ip2GeoCache.has(datasourceName) == false) {
throw new IllegalStateException("datasource does not exist");
}

if (DatasourceState.AVAILABLE.equals(ip2GeoCache.getState(datasourceName)) == false) {
throw new IllegalStateException("datasource is not in an available state");
}
}

private void handleExpiredData(final IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
handler.accept(ingestDocument, null);
}

/**
* Handle multiple ips
*
@@ -246,37 +241,15 @@ protected void executeInternal(
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
}
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
@Override
public void onResponse(final Datasource datasource) {
if (datasource == null || DatasourceState.AVAILABLE.equals(datasource.getState()) == false) {
handler.accept(null, new IllegalStateException("datasource is not available"));
return;
}

String indexName = datasource.currentIndexName();
if (indexName == null) {
ingestDocument.setFieldValue(targetField, DATA_EXPIRED);
handler.accept(ingestDocument, null);
return;
}

try {
geoIpDataFacade.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}
validateDatasourceIsInAvailableState(datasourceName);
String indexName = ip2GeoCache.getIndexName(datasourceName);
if (ip2GeoCache.isExpired(datasourceName) || indexName == null) {
handleExpiredData(ingestDocument, handler);
return;
}

@Override
public void onFailure(final Exception e) {
if (e instanceof IndexNotFoundException) {
handler.accept(null, new IllegalStateException("datasource is not available"));
return;
}
handler.accept(null, e);
}
});
geoIpDataFacade.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
}

@VisibleForTesting
@@ -312,23 +285,12 @@ public String getType() {
/**
* Ip2Geo processor factory
*/
@AllArgsConstructor
public static final class Factory implements Processor.Factory {
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;

/**
* Default constructor
*
* @param ingestService the ingest service
* @param datasourceFacade the datasource facade
* @param geoIpDataFacade the geoip data facade
*/
public Factory(final IngestService ingestService, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade) {
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
}
private final Ip2GeoCache ip2GeoCache;

/**
* Within this method, a blocking request cannot be made because this method is executed on a transport thread.
@@ -357,7 +319,8 @@ public Ip2GeoProcessor create(
ignoreMissing,
ingestService.getClusterService().getClusterSettings(),
datasourceFacade,
geoIpDataFacade
geoIpDataFacade,
ip2GeoCache
);
}
}
