Skip to content

Commit

Permalink
Use bool query for array form of IPs (#335)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 authored Jun 9, 2023
1 parent ad24a89 commit e082a6f
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -44,8 +43,6 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.MultiSearchRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand All @@ -63,6 +60,7 @@
import org.opensearch.geospatial.constants.IndexSetting;
import org.opensearch.geospatial.shared.Constants;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;

/**
Expand Down Expand Up @@ -255,15 +253,19 @@ public void getGeoIpData(final String indexName, final String ip, final ActionLi
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
try {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
}
} catch (Exception e) {
actionListener.onFailure(e);
}
}

Expand All @@ -276,73 +278,56 @@ public void onFailure(final Exception e) {
}

/**
* Query a given index using a given ip address iterator to get geoip data
* Query a given index using a given list of ip addresses to get geoip data
*
* This method calls itself recursively until it processes all ip addresses in bulk of {@code bulkSize}.
*
* @param indexName the index name
* @param ipIterator the iterator of ip addresses
* @param geoIpData collected geo data
* @param actionListener the action listener
* @param indexName index
* @param ips list of ip addresses
* @param actionListener action listener
*/
public void getGeoIpData(
final String indexName,
final Iterator<String> ipIterator,
final Map<String, Map<String, Object>> geoIpData,
final ActionListener<Map<String, Map<String, Object>>> actionListener
final List<String> ips,
final ActionListener<List<Map<String, Object>>> actionListener
) {
MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch();

List<String> ipsToSearch = new ArrayList<>(BUNDLE_SIZE);
while (ipIterator.hasNext() && ipsToSearch.size() < BUNDLE_SIZE) {
String ip = ipIterator.next();
if (geoIpData.get(ip) == null) {
mRequestBuilder.add(
client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
);
ipsToSearch.add(ip);
}
}

if (ipsToSearch.isEmpty()) {
actionListener.onResponse(geoIpData);
return;
}

StashedThreadContext.run(client, () -> mRequestBuilder.execute(new ActionListener<>() {
@Override
public void onResponse(final MultiSearchResponse items) {
for (int i = 0; i < ipsToSearch.size(); i++) {
if (items.getResponses()[i].isFailure()) {
actionListener.onFailure(items.getResponses()[i].getFailure());
return;
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
ips.stream().forEach(ip -> boolQueryBuilder.should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)));
StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(ips.size())
.setQuery(boolQueryBuilder)
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
try {
actionListener.onResponse(toGeoIpDataList(searchResponse));
} catch (Exception e) {
actionListener.onFailure(e);
}
}

if (items.getResponses()[i].getResponse().getHits().getHits().length == 0) {
geoIpData.put(ipsToSearch.get(i), Collections.emptyMap());
continue;
@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
);
}

Map<String, Object> data = (Map<String, Object>) XContentHelper.convertToMap(
items.getResponses()[i].getResponse().getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);

geoIpData.put(ipsToSearch.get(i), data);
}
getGeoIpData(indexName, ipIterator, geoIpData, actionListener);
}
private List<Map<String, Object>> toGeoIpDataList(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
return Collections.emptyList();
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
}));
return Arrays.stream(searchResponse.getHits().getHits())
.map(
data -> (Map<String, Object>) XContentHelper.convertToMap(data.getSourceRef(), false, XContentType.JSON)
.v2()
.get(DATA_FIELD_NAME)
)
.collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
import static org.opensearch.ingest.ConfigurationUtils.readStringProperty;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -41,8 +40,6 @@
@Log4j2
public final class Ip2GeoProcessor extends AbstractProcessor {
private static final Map<String, Object> DATA_EXPIRED = Map.of("error", "ip2geo_data_expired");
private static final String PROPERTY_IP = "ip";

public static final String CONFIG_FIELD = "field";
public static final String CONFIG_TARGET_FIELD = "target_field";
public static final String CONFIG_DATASOURCE = "datasource";
Expand Down Expand Up @@ -111,19 +108,28 @@ public Ip2GeoProcessor(
*/
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
try {
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);

if (ip == null) {
handler.accept(ingestDocument, null);
return;
}
if (ip == null) {
handler.accept(ingestDocument, null);
return;
}

if (ip instanceof String) {
executeInternal(ingestDocument, handler, (String) ip);
} else if (ip instanceof List) {
executeInternal(ingestDocument, handler, ((List<?>) ip));
} else {
throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings");
if (ip instanceof String) {
executeInternal(ingestDocument, handler, (String) ip);
} else if (ip instanceof List) {
executeInternal(ingestDocument, handler, ((List<?>) ip));
} else {
handler.accept(
null,
new IllegalArgumentException(
String.format(Locale.ROOT, "field [%s] should contain only string or array of strings", field)
)
);
}
} catch (Exception e) {
handler.accept(null, e);
}
}

Expand Down Expand Up @@ -159,17 +165,11 @@ public void onResponse(final Datasource datasource) {
return;
}

geoIpDataFacade.getGeoIpData(indexName, ip, new ActionListener<>() {
@Override
public void onResponse(final Map<String, Object> ipToGeoData) {
handleSingleIp(ip, ipToGeoData, ingestDocument, handler);
}

@Override
public void onFailure(final Exception e) {
handler.accept(null, e);
}
});
try {
geoIpDataFacade.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
Expand All @@ -180,32 +180,44 @@ public void onFailure(final Exception e) {
}

@VisibleForTesting
protected void handleSingleIp(
final String ip,
final Map<String, Object> ipToGeoData,
protected ActionListener<Map<String, Object>> getSingleGeoIpDataListener(
final IngestDocument ingestDocument,
final BiConsumer<IngestDocument, Exception> handler
) {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData, ip));
}
handler.accept(ingestDocument, null);
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Object> ipToGeoData) {
try {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData));
}
handler.accept(ingestDocument, null);
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
public void onFailure(final Exception e) {
handler.accept(null, e);
}
};
}

private Map<String, Object> filteredGeoData(final Map<String, Object> geoData, final String ip) {
Map<String, Object> filteredGeoData;
private Map<String, Object> filteredGeoData(final Map<String, Object> geoData) {
if (properties == null) {
return geoData;
}

filteredGeoData = properties.stream()
.filter(p -> p.equals(PROPERTY_IP) == false)
.filter(p -> geoData.containsKey(p))
.collect(Collectors.toMap(p -> p, p -> geoData.get(p)));
if (properties.contains(PROPERTY_IP)) {
filteredGeoData.put(PROPERTY_IP, ip);
return properties.stream().filter(p -> geoData.containsKey(p)).collect(Collectors.toMap(p -> p, p -> geoData.get(p)));
}

private List<Map<String, Object>> filteredGeoData(final List<Map<String, Object>> geoData) {
if (properties == null) {
return geoData;
}
return filteredGeoData;

return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList());
}

/**
Expand All @@ -221,13 +233,11 @@ protected void executeInternal(
final BiConsumer<IngestDocument, Exception> handler,
final List<?> ips
) {
Map<String, Map<String, Object>> data = new HashMap<>();
for (Object ip : ips) {
if (ip instanceof String == false) {
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
}
List<String> ipList = (List<String>) ips;
datasourceFacade.getDatasource(datasourceName, new ActionListener<>() {
@Override
public void onResponse(final Datasource datasource) {
Expand All @@ -243,12 +253,11 @@ public void onResponse(final Datasource datasource) {
return;
}

geoIpDataFacade.getGeoIpData(
indexName,
ipList.iterator(),
data,
listenerToAppendDataToDocument(data, ipList, ingestDocument, handler)
);
try {
geoIpDataFacade.getGeoIpData(indexName, (List<String>) ips, getMultiGeoIpDataListener(ingestDocument, handler));
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
Expand All @@ -259,31 +268,21 @@ public void onFailure(final Exception e) {
}

@VisibleForTesting
protected ActionListener<Map<String, Map<String, Object>>> listenerToAppendDataToDocument(
final Map<String, Map<String, Object>> data,
final List<String> ipList,
protected ActionListener<List<Map<String, Object>>> getMultiGeoIpDataListener(
final IngestDocument ingestDocument,
final BiConsumer<IngestDocument, Exception> handler
) {
return new ActionListener<>() {
@Override
public void onResponse(final Map<String, Map<String, Object>> response) {
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(ipList.size());
for (String ipAddr : ipList) {
Map<String, Object> geoData = data.get(ipAddr);
// GeoData for ipAddr won't be null
geoDataList.add(geoData.isEmpty() ? null : filteredGeoData(geoData, ipAddr));
if (geoData.isEmpty() == false) {
match = true;
public void onResponse(final List<Map<String, Object>> ipToGeoData) {
try {
if (ipToGeoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData));
}
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
handler.accept(ingestDocument, null);
return;
} catch (Exception e) {
handler.accept(null, e);
}
handler.accept(ingestDocument, null);
}

@Override
Expand Down Expand Up @@ -384,7 +383,6 @@ private void validate(final String processorTag, final String datasourceName, fi

// Validate properties are valid. If not add all available properties.
final Set<String> availableProperties = new HashSet<>(datasource.getDatabase().getFields());
availableProperties.add(PROPERTY_IP);
for (String fieldName : propertyNames) {
if (availableProperties.contains(fieldName) == false) {
throw newConfigurationException(
Expand Down
Loading

0 comments on commit e082a6f

Please sign in to comment.