Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run system index handling code with stashed thread context #297

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void createIndexIfNotExists(final StepListener<Void> stepListener) {
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(indexSettings);
client.admin().indices().create(createIndexRequest, new ActionListener<>() {
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() {
@Override
public void onResponse(final CreateIndexResponse createIndexResponse) {
stepListener.onResponse(null);
Expand All @@ -107,7 +108,7 @@ public void onFailure(final Exception e) {
}
stepListener.onFailure(e);
}
});
}));
}

private String getIndexMapping() {
Expand All @@ -126,34 +127,44 @@ private String getIndexMapping() {
* Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param datasource the datasource
* @return index response
* @throws IOException exception
*/
public IndexResponse updateDatasource(final Datasource datasource) throws IOException {
public IndexResponse updateDatasource(final Datasource datasource) {
datasource.setLastUpdateTime(Instant.now());
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
return StashedThreadContext.run(client, () -> {
try {
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}

/**
* Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
* @param datasource the datasource
* @param listener the listener
* @throws IOException exception
*/
public void putDatasource(final Datasource datasource, final ActionListener listener) throws IOException {
public void putDatasource(final Datasource datasource, final ActionListener listener) {
datasource.setLastUpdateTime(Instant.now());
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute(listener);
StashedThreadContext.run(client, () -> {
try {
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute(listener);
} catch (IOException e) {
new RuntimeException(e);
}
});
}

/**
Expand All @@ -166,7 +177,7 @@ public Datasource getDatasource(final String name) throws IOException {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
GetResponse response;
try {
response = client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)));
if (response.isExists() == false) {
log.error("Datasource[{}] does not exist in an index[{}]", name, DatasourceExtension.JOB_INDEX_NAME);
return null;
Expand All @@ -191,7 +202,7 @@ public Datasource getDatasource(final String name) throws IOException {
*/
public void getDatasource(final String name, final ActionListener<Datasource> actionListener) {
GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name);
client.get(request, new ActionListener<GetResponse>() {
StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() {
@Override
public void onResponse(final GetResponse response) {
if (response.isExists() == false) {
Expand All @@ -215,7 +226,7 @@ public void onResponse(final GetResponse response) {
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
});
}));
}

/**
Expand All @@ -224,20 +235,26 @@ public void onFailure(final Exception e) {
* @param actionListener the action listener
*/
public void getDatasources(final String[] names, final ActionListener<List<Datasource>> actionListener) {
client.prepareMultiGet()
.add(DatasourceExtension.JOB_INDEX_NAME, names)
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener));
StashedThreadContext.run(
client,
() -> client.prepareMultiGet()
.add(DatasourceExtension.JOB_INDEX_NAME, names)
.execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener))
);
}

/**
* Get all datasources up to {@code MAX_SIZE} from an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param actionListener the action listener
*/
public void getAllDatasources(final ActionListener<List<Datasource>> actionListener) {
client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(MAX_SIZE)
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener));
StashedThreadContext.run(
client,
() -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME)
.setQuery(QueryBuilders.matchAllQuery())
.setSize(MAX_SIZE)
.execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener))
);
}

private <T> ActionListener<T> createGetDataSourceQueryActionLister(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.query.QueryBuilders;

/**
Expand Down Expand Up @@ -95,7 +96,10 @@ public void createIndexIfNotExists(final String indexName) {
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping());
client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
StashedThreadContext.run(
client,
() -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
}

/**
Expand Down Expand Up @@ -210,31 +214,34 @@ public String createDocument(final String[] fields, final String[] values) {
* @param actionListener action listener
*/
public void getGeoIpData(final String indexName, final String ip, final ActionListener<Map<String, Object>> actionListener) {
client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
StashedThreadContext.run(
client,
() -> client.prepareSearch(indexName)
.setSize(1)
.setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))
.setPreference("_local")
.setRequestCache(true)
.execute(new ActionListener<>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
if (searchResponse.getHits().getHits().length == 0) {
actionListener.onResponse(Collections.emptyMap());
} else {
Map<String, Object> geoIpData = (Map<String, Object>) XContentHelper.convertToMap(
searchResponse.getHits().getAt(0).getSourceRef(),
false,
XContentType.JSON
).v2().get(DATA_FIELD_NAME);
actionListener.onResponse(geoIpData);
}
}
}

@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
});
@Override
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
})
);
}

/**
Expand Down Expand Up @@ -284,7 +291,7 @@ public void getGeoIpData(
return;
}

mRequestBuilder.execute(new ActionListener<>() {
StashedThreadContext.run(client, () -> mRequestBuilder.execute(new ActionListener<>() {
@Override
public void onResponse(final MultiSearchResponse items) {
for (int i = 0; i < ipsToSearch.size(); i++) {
Expand Down Expand Up @@ -318,7 +325,7 @@ public void onResponse(final MultiSearchResponse items) {
public void onFailure(final Exception e) {
actionListener.onFailure(e);
}
});
}));
}

/**
Expand All @@ -331,32 +338,34 @@ public void onFailure(final Exception e) {
*/
public void putGeoIpData(final String indexName, final String[] fields, final Iterator<CSVRecord> iterator, final int bulkSize) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
bulkRequest.add(request);
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
BulkResponse response = client.bulk(bulkRequest).actionGet(timeout);
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
throw new OpenSearchException(
"error occurred while ingesting GeoIP data in {} with an error {}",
indexName,
response.buildFailureMessage()
);
}
bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
bulkRequest.requests().clear();
}
}
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
.execute()
.actionGet(timeout);
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
.execute()
.actionGet(timeout);
});
}

public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
Expand All @@ -367,11 +376,14 @@ public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
IP2GEO_DATA_INDEX_NAME_PREFIX
);
}
return client.admin()
.indices()
.prepareDelete(index)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
return StashedThreadContext.run(
client,
() -> client.admin()
.indices()
.prepareDelete(index)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.shared;

import java.util.function.Supplier;

import org.opensearch.client.Client;
import org.opensearch.common.util.concurrent.ThreadContext;

/**
* Helper class to run code with stashed thread context
*
* Code need to be run with stashed thread context if it interacts with system index
* when security plugin is enabled.
*/
public class StashedThreadContext {
/**
* Set the thread context to default, this is needed to allow actions on model system index
* when security plugin is enabled
* @param function runnable that needs to be executed after thread context has been stashed, accepts and returns nothing
*/
public static void run(final Client client, final Runnable function) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
function.run();
}
}

/**
* Set the thread context to default, this is needed to allow actions on model system index
* when security plugin is enabled
* @param function supplier function that needs to be executed after thread context has been stashed, return object
*/
public static <T> T run(final Client client, final Supplier<T> function) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
return function.get();
}
}
}