diff --git a/CHANGELOG.md b/CHANGELOG.md index 71f195e372fe5..6418260814365 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870)) - Added @dbwiddis as on OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665)) - SegRep with Remote: Add hook for publishing checkpoint notifications after segment upload to remote store ([#7394](https://github.com/opensearch-project/OpenSearch/pull/7394)) +- [Extensions] Add ExtensionAwarePlugin extension point to add custom settings for extensions ([#7526](https://github.com/opensearch-project/OpenSearch/pull/7526)) ### Dependencies - Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.3 (#7564) @@ -52,6 +53,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `io.projectreactor.netty:reactor-netty-core` from 1.0.24 to 1.1.7 ([#7724](https://github.com/opensearch-project/OpenSearch/pull/7724)) - Bump `io.projectreactor.netty:reactor-netty` from 1.1.4 to 1.1.7 ([#7724](https://github.com/opensearch-project/OpenSearch/pull/7724)) - Bump `io.projectreactor.netty:reactor-netty-http` from 1.1.4 to 1.1.7 ([#7724](https://github.com/opensearch-project/OpenSearch/pull/7724)) +- Bump `org.apache.maven:maven-model` from 3.9.1 to 3.9.2 (#7655) ### Changed - Enable `./gradlew build` on MacOS by disabling bcw tests ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) @@ -59,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow access to indices cache clear APIs for read only indexes ([#7303](https://github.com/opensearch-project/OpenSearch/pull/7303)) - Default search preference to _primary for searchable snapshot indices ([#7628](https://github.com/opensearch-project/OpenSearch/pull/7628)) - [Segment Replication] Remove codec name string match check for checkpoints ([#7741](https://github.com/opensearch-project/OpenSearch/pull/7741)) +- Changed concurrent-search threadpool type to be resizable and support task resource tracking ([#7502](https://github.com/opensearch-project/OpenSearch/pull/7502)) ### Deprecated @@ -68,6 +71,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add more index blocks check for resize APIs ([#6774](https://github.com/opensearch-project/OpenSearch/pull/6774)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) +- [Search Pipelines] Better exception handling in search pipelines ([#7735](https://github.com/opensearch-project/OpenSearch/pull/7735)) +- Fix input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645)) ### Security diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index cbc3ea630b21e..d900d9f4a84d6 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -117,7 +117,7 @@ dependencies { api 'de.thetaphi:forbiddenapis:3.5.1' api 'com.avast.gradle:gradle-docker-compose-plugin:0.16.11' api "org.yaml:snakeyaml:${props.getProperty('snakeyaml')}" - api 'org.apache.maven:maven-model:3.9.1' + api 'org.apache.maven:maven-model:3.9.2' api 'com.networknt:json-schema-validator:1.0.81' api "com.fasterxml.jackson.core:jackson-databind:${props.getProperty('jackson_databind')}" diff --git a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml index 730b10d87e6a8..d5fe91aab5378 100644 --- a/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml +++ b/modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/30_filter_query.yml @@ -122,3 +122,27 @@ teardown: index: test body: { } - match: { hits.total.value: 2 } +--- +"Test invalid inline query": + - do: + catch: bad_request + search: + index: test + body: { + search_pipeline: { + "request_processors": [ + { + "filter_query": { + "query": { + "woozlewuzzle": { + "field": "foo" + } + } + } + } + ] + } + } + - match: { status: 400 } + - match: { error.type: "parsing_exception"} + - match: { error.reason: "unknown query [woozlewuzzle]"} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/AbstractTasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/AbstractTasksIT.java new file mode 100644 index 0000000000000..fcfe9cb0aab00 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/AbstractTasksIT.java @@ -0,0 +1,190 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.tasks; + +import org.opensearch.ExceptionsHelper; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.plugins.Plugin; +import org.opensearch.tasks.TaskId; +import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.ThreadResourceInfo; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.tasks.MockTaskManager; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * Base IT test class for Tasks ITs + */ +abstract class AbstractTasksIT extends OpenSearchIntegTestCase { + + protected Map, RecordingTaskManagerListener> listeners = new HashMap<>(); + + @Override + protected Collection> getMockPlugins() { + Collection> mockPlugins = new ArrayList<>(super.getMockPlugins()); + mockPlugins.remove(MockTransportService.TestPlugin.class); + return mockPlugins; + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, TestTaskPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true) + .build(); + } + + @Override + public void tearDown() throws Exception { + for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { + ((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener( + entry.getValue() + ); + } + listeners.clear(); + super.tearDown(); + } + + /** + * Registers recording task event listeners with the given action mask on all nodes + */ + protected void registerTaskManagerListeners(String actionMasks) { + for (String nodeName : internalCluster().getNodeNames()) { + DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); + RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(",")); + ((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener); + RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.getName(), actionMasks), listener); + assertNull(oldListener); + } + } + + /** + * Resets all recording task event listeners with the given action mask on all nodes + */ + protected void resetTaskManagerListeners(String actionMasks) { + for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { + if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { + entry.getValue().reset(); + } + } + } + + /** + * Returns the number of events that satisfy the criteria across all nodes + * + * @param actionMasks action masks to match + * @return number of events that satisfy the criteria + */ + protected int numberOfEvents(String actionMasks, Function, Boolean> criteria) { + return findEvents(actionMasks, criteria).size(); + } + + /** + * Returns all events that satisfy the criteria across all nodes + * + * @param actionMasks action masks to match + * @return number of events that satisfy the criteria + */ + protected List findEvents(String actionMasks, Function, Boolean> criteria) { + List events = new ArrayList<>(); + for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { + if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { + for (Tuple taskEvent : entry.getValue().getEvents()) { + if (criteria.apply(taskEvent)) { + events.add(taskEvent.v2()); + } + } + } + } + return events; + } + + protected Map> getThreadStats(String actionMasks, TaskId taskId) { + for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { + if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { + for (Tuple>> threadStats : entry.getValue().getThreadStats()) { + if (taskId.equals(threadStats.v1())) { + return threadStats.v2(); + } + } + } + } + return new HashMap<>(); + } + + /** + * Asserts that all tasks in the tasks list have the same parentTask + */ + protected void assertParentTask(List tasks, TaskInfo parentTask) { + for (TaskInfo task : tasks) { + assertParentTask(task, parentTask); + } + } + + protected void assertParentTask(TaskInfo task, TaskInfo parentTask) { + assertTrue(task.getParentTaskId().isSet()); + assertEquals(parentTask.getTaskId().getNodeId(), task.getParentTaskId().getNodeId()); + assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId())); + assertEquals(parentTask.getId(), task.getParentTaskId().getId()); + } + + protected void expectNotFound(ThrowingRunnable r) { + Exception e = expectThrows(Exception.class, r); + ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class); + if (notFound == null) { + throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e); + } + } + + /** + * Fetch the task status from the list tasks API using it's "fallback to get from the task index" behavior. Asserts some obvious stuff + * about the fetched task and returns a map of it's status. + */ + protected GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException { + GetTaskResponse response = client().admin().cluster().prepareGetTask(taskId).get(); + assertTrue("the task should have been completed before fetching", response.getTask().isCompleted()); + TaskInfo info = response.getTask().getTask(); + assertEquals(taskId, info.getTaskId()); + assertNull(info.getStatus()); // The test task doesn't have any status + return response; + } + + protected void indexDocumentsWithRefresh(String indexName, int numDocs) { + for (int i = 0; i < numDocs; i++) { + client().prepareIndex(indexName) + .setId("test_id_" + String.valueOf(i)) + .setSource("{\"foo_" + String.valueOf(i) + "\": \"bar_" + String.valueOf(i) + "\"}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/ConcurrentSearchTasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/ConcurrentSearchTasksIT.java new file mode 100644 index 0000000000000..2b2421072e03b --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/ConcurrentSearchTasksIT.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.node.tasks; + +import org.hamcrest.MatcherAssert; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.search.SearchAction; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.FeatureFlagSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.ThreadResourceInfo; + +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +/** + * Integration tests for task management API with Concurrent Segment Search + * + * The way the test framework bootstraps the test cluster makes it difficult to parameterize the feature flag. + * Once concurrent search is moved behind a cluster setting we can parameterize these tests behind the setting. + */ +public class ConcurrentSearchTasksIT extends AbstractTasksIT { + + private static final int INDEX_SEARCHER_THREADS = 10; + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("thread_pool.index_searcher.size", INDEX_SEARCHER_THREADS) + .put("thread_pool.index_searcher.queue_size", INDEX_SEARCHER_THREADS) + .build(); + } + + private int getSegmentCount(String indexName) { + return client().admin() + .indices() + .segments(new IndicesSegmentsRequest(indexName)) + .actionGet() + .getIndices() + .get(indexName) + .getShards() + .get(0) + .getShards()[0].getSegments() + .size(); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { + featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY)); + } + featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true); + return featureSettings.build(); + } + + /** + * Tests the number of threads that worked on a search task. + * + * Currently, we try to control concurrency by creating an index with 7 segments and rely on + * the way concurrent search creates leaf slices from segments. Once more concurrency controls are introduced + * we should improve this test to use those methods. + */ + public void testConcurrentSearchTaskTracking() { + final String INDEX_NAME = "test"; + final int NUM_SHARDS = 1; + final int NUM_DOCS = 7; + + registerTaskManagerListeners(SearchAction.NAME); // coordinator task + registerTaskManagerListeners(SearchAction.NAME + "[*]"); // shard task + createIndex( + INDEX_NAME, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build() + ); + ensureGreen(INDEX_NAME); // Make sure all shards are allocated to catch replication tasks + indexDocumentsWithRefresh(INDEX_NAME, NUM_DOCS); // Concurrent search requires >5 segments or >250,000 docs to have concurrency, so + // we index 7 docs flushing between each to create new segments + assertSearchResponse(client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get()); + + // the search operation should produce one coordinator task + List mainTask = findEvents(SearchAction.NAME, Tuple::v1); + assertEquals(1, mainTask.size()); + TaskInfo mainTaskInfo = mainTask.get(0); + + List shardTasks = findEvents(SearchAction.NAME + "[*]", Tuple::v1); + assertEquals(NUM_SHARDS, shardTasks.size()); // We should only have 1 shard search task per shard + for (TaskInfo taskInfo : shardTasks) { + MatcherAssert.assertThat(taskInfo.getParentTaskId(), notNullValue()); + assertEquals(mainTaskInfo.getTaskId(), taskInfo.getParentTaskId()); + + Map> threadStats = getThreadStats(SearchAction.NAME + "[*]", taskInfo.getTaskId()); + // Concurrent search forks each slice of 5 segments to different thread + assertEquals((int) Math.ceil(getSegmentCount(INDEX_NAME) / 5.0), threadStats.size()); + + // assert that all task descriptions have non-zero length + MatcherAssert.assertThat(taskInfo.getDescription().length(), greaterThan(0)); + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java index 533d22a01c4ad..67e52529ae86b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java @@ -32,10 +32,9 @@ package org.opensearch.action.admin.cluster.node.tasks; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.ExceptionsHelper; -import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionFuture; import org.opensearch.action.ActionListener; import org.opensearch.action.TaskOperationFailure; @@ -57,15 +56,11 @@ import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.action.support.replication.TransportReplicationActionTests; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.Strings; import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.query.QueryBuilders; -import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; @@ -75,14 +70,9 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.tasks.MockTaskManager; import org.opensearch.test.tasks.MockTaskManagerListener; -import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.ReceiveTimeoutTransportException; import org.opensearch.transport.TransportService; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -96,12 +86,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singleton; -import static org.opensearch.common.unit.TimeValue.timeValueMillis; -import static org.opensearch.common.unit.TimeValue.timeValueSeconds; -import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFutureThrows; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -113,6 +97,12 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; +import static org.opensearch.common.unit.TimeValue.timeValueMillis; +import static org.opensearch.common.unit.TimeValue.timeValueSeconds; +import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFutureThrows; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; /** * Integration tests for task management API @@ -120,29 +110,7 @@ * We need at least 2 nodes so we have a cluster-manager node a non-cluster-manager node */ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2) -public class TasksIT extends OpenSearchIntegTestCase { - - private Map, RecordingTaskManagerListener> listeners = new HashMap<>(); - - @Override - protected Collection> getMockPlugins() { - Collection> mockPlugins = new ArrayList<>(super.getMockPlugins()); - mockPlugins.remove(MockTransportService.TestPlugin.class); - return mockPlugins; - } - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class, TestTaskPlugin.class); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true) - .build(); - } +public class TasksIT extends AbstractTasksIT { public void testTaskCounts() { // Run only on data nodes @@ -951,106 +919,4 @@ public void onFailure(Exception e) { assertNotNull(response.getTask().getError()); assertNull(response.getTask().getResponse()); } - - @Override - public void tearDown() throws Exception { - for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { - ((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener( - entry.getValue() - ); - } - listeners.clear(); - super.tearDown(); - } - - /** - * Registers recording task event listeners with the given action mask on all nodes - */ - private void registerTaskManagerListeners(String actionMasks) { - for (String nodeName : internalCluster().getNodeNames()) { - DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode(); - RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node.getId(), actionMasks.split(",")); - ((MockTaskManager) internalCluster().getInstance(TransportService.class, nodeName).getTaskManager()).addListener(listener); - RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.getName(), actionMasks), listener); - assertNull(oldListener); - } - } - - /** - * Resets all recording task event listeners with the given action mask on all nodes - */ - private void resetTaskManagerListeners(String actionMasks) { - for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { - if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { - entry.getValue().reset(); - } - } - } - - /** - * Returns the number of events that satisfy the criteria across all nodes - * - * @param actionMasks action masks to match - * @return number of events that satisfy the criteria - */ - private int numberOfEvents(String actionMasks, Function, Boolean> criteria) { - return findEvents(actionMasks, criteria).size(); - } - - /** - * Returns all events that satisfy the criteria across all nodes - * - * @param actionMasks action masks to match - * @return number of events that satisfy the criteria - */ - private List findEvents(String actionMasks, Function, Boolean> criteria) { - List events = new ArrayList<>(); - for (Map.Entry, RecordingTaskManagerListener> entry : listeners.entrySet()) { - if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) { - for (Tuple taskEvent : entry.getValue().getEvents()) { - if (criteria.apply(taskEvent)) { - events.add(taskEvent.v2()); - } - } - } - } - return events; - } - - /** - * Asserts that all tasks in the tasks list have the same parentTask - */ - private void assertParentTask(List tasks, TaskInfo parentTask) { - for (TaskInfo task : tasks) { - assertParentTask(task, parentTask); - } - } - - private void assertParentTask(TaskInfo task, TaskInfo parentTask) { - assertTrue(task.getParentTaskId().isSet()); - assertEquals(parentTask.getTaskId().getNodeId(), task.getParentTaskId().getNodeId()); - assertTrue(Strings.hasLength(task.getParentTaskId().getNodeId())); - assertEquals(parentTask.getId(), task.getParentTaskId().getId()); - } - - private void expectNotFound(ThrowingRunnable r) { - Exception e = expectThrows(Exception.class, r); - ResourceNotFoundException notFound = (ResourceNotFoundException) ExceptionsHelper.unwrap(e, ResourceNotFoundException.class); - if (notFound == null) { - throw new AssertionError("Expected " + ResourceNotFoundException.class.getSimpleName(), e); - } - } - - /** - * Fetch the task status from the list tasks API using it's "fallback to get from the task index" behavior. Asserts some obvious stuff - * about the fetched task and returns a map of it's status. - */ - private GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException { - GetTaskResponse response = client().admin().cluster().prepareGetTask(taskId).get(); - assertTrue("the task should have been completed before fetching", response.getTask().isCompleted()); - TaskInfo info = response.getTask().getTask(); - assertEquals(taskId, info.getTaskId()); - assertNull(info.getStatus()); // The test task doesn't have any status - return response; - } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 1e17b71860f39..ca379234dcee7 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -44,8 +44,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.opensearch.action.search.SearchContextId.decode; @@ -94,8 +96,7 @@ public TransportPitSegmentsAction( */ @Override protected void doExecute(Task task, PitSegmentsRequest request, ActionListener listener) { - List pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { pitService.getAllPits(ActionListener.wrap(response -> { request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); super.doExecute(task, request, listener); @@ -114,7 +115,9 @@ protected void doExecute(Task task, PitSegmentsRequest request, ActionListener iterators = new ArrayList<>(); - for (String pitId : request.getPitIds()) { + // remove duplicates from the request + Set uniquePitIds = new LinkedHashSet<>(request.getPitIds()); + for (String pitId : uniquePitIds) { SearchContextId searchContext = decode(namedWriteableRegistry, pitId); for (Map.Entry entry : searchContext.shards().entrySet()) { final SearchContextIdForNode perNode = entry.getValue(); diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index b85fe302a748f..217fcc1489df7 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -46,8 +48,7 @@ public TransportDeletePitAction( */ @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { - List pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { deleteAllPits(listener); } else { deletePits(listener, request); @@ -59,7 +60,9 @@ protected void doExecute(Task task, DeletePitRequest request, ActionListener listener, DeletePitRequest request) { Map> nodeToContextsMap = new HashMap<>(); - for (String pitId : request.getPitIds()) { + // remove duplicates from the request + Set uniquePitIds = new LinkedHashSet<>(request.getPitIds()); + for (String pitId : uniquePitIds) { SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, pitId); for (SearchContextIdForNode contextIdForNode : contextId.shards().values()) { PitSearchContextIdForNode pitSearchContext = new PitSearchContextIdForNode(pitId, contextIdForNode); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index df75d7c799a6a..030c249b3c726 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -401,7 +401,7 @@ private void executeRequest( ); } catch (Exception e) { originalListener.onFailure(e); - throw new RuntimeException(e); + return; } ActionListener rewriteListener = ActionListener.wrap(source -> { diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index a0cdf35ee0ad2..32a6686398a2a 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -132,6 +132,11 @@ public enum Property { */ Deprecated, + /** + * Extension scope + */ + ExtensionScope, + /** * Node scope */ diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionScopedSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionScopedSettings.java new file mode 100644 index 0000000000000..0c87ce31df737 --- /dev/null +++ b/server/src/main/java/org/opensearch/extensions/ExtensionScopedSettings.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.extensions; + +import org.opensearch.common.settings.AbstractScopedSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.SettingUpgrader; +import org.opensearch.common.settings.Settings; + +import java.util.Collections; +import java.util.Set; + +/** + * Encapsulates all valid extension level settings. + * + * @opensearch.internal + */ +public final class ExtensionScopedSettings extends AbstractScopedSettings { + + public ExtensionScopedSettings(final Set> settingsSet) { + this(settingsSet, Collections.emptySet()); + } + + public ExtensionScopedSettings(final Set> settingsSet, final Set> settingUpgraders) { + super(Settings.EMPTY, settingsSet, settingUpgraders, Property.ExtensionScope); + } +} diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 320310fc33f13..a172fc7980f30 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -14,11 +14,14 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -37,6 +40,7 @@ import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.FileSystemUtils; +import org.opensearch.common.settings.Setting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsModule; @@ -105,6 +109,7 @@ public static enum OpenSearchRequestType { private CustomSettingsRequestHandler customSettingsRequestHandler; private TransportService transportService; private ClusterService clusterService; + private final Set> additionalSettings; private Settings environmentSettings; private AddSettingsUpdateConsumerRequestHandler addSettingsUpdateConsumerRequestHandler; private NodeClient client; @@ -113,9 +118,10 @@ public static enum OpenSearchRequestType { * Instantiate a new ExtensionsManager object to handle requests and responses from extensions. This is called during Node bootstrap. * * @param extensionsPath Path to a directory containing extensions. + * @param additionalSettings Additional settings to read in from extensions.yml * @throws IOException If the extensions discovery file is not properly retrieved. */ - public ExtensionsManager(Path extensionsPath) throws IOException { + public ExtensionsManager(Path extensionsPath, Set> additionalSettings) throws IOException { logger.info("ExtensionsManager initialized"); this.extensionsPath = extensionsPath; this.initializedExtensions = new HashMap(); @@ -124,6 +130,11 @@ public ExtensionsManager(Path extensionsPath) throws IOException { // will be initialized in initializeServicesAndRestHandler which is called after the Node is initialized this.transportService = null; this.clusterService = null; + // Settings added to extensions.yml by ExtensionAwarePlugins, such as security settings + this.additionalSettings = new HashSet<>(); + if (additionalSettings != null) { + this.additionalSettings.addAll(additionalSettings); + } this.client = null; this.extensionTransportActionsHandler = null; @@ -465,35 +476,68 @@ private ExtensionsSettings readFromExtensionsYml(Path filePath) throws IOExcepti } List> unreadExtensions = new ArrayList<>((Collection>) obj.get("extensions")); List readExtensions = new ArrayList(); + Set additionalSettingsKeys = additionalSettings.stream().map(s -> s.getKey()).collect(Collectors.toSet()); for (HashMap extensionMap : unreadExtensions) { - // Parse extension dependencies - List extensionDependencyList = new ArrayList(); - if (extensionMap.get("dependencies") != null) { - List> extensionDependencies = new ArrayList<>( - (Collection>) extensionMap.get("dependencies") - ); - for (HashMap dependency : extensionDependencies) { - extensionDependencyList.add( - new ExtensionDependency( - dependency.get("uniqueId").toString(), - Version.fromString(dependency.get("version").toString()) - ) + try { + // checking to see whether any required fields are missing from extension.yml file or not + String[] requiredFields = { + "name", + "uniqueId", + "hostAddress", + "port", + "version", + "opensearchVersion", + "minimumCompatibleVersion" }; + List missingFields = Arrays.stream(requiredFields) + .filter(field -> !extensionMap.containsKey(field)) + .collect(Collectors.toList()); + if (!missingFields.isEmpty()) { + throw new IOException("Extension is missing these required fields : " + missingFields); + } + + // Parse extension dependencies + List extensionDependencyList = new ArrayList(); + if (extensionMap.get("dependencies") != null) { + List> extensionDependencies = new ArrayList<>( + (Collection>) extensionMap.get("dependencies") ); + for (HashMap dependency : extensionDependencies) { + extensionDependencyList.add( + new ExtensionDependency( + dependency.get("uniqueId").toString(), + Version.fromString(dependency.get("version").toString()) + ) + ); + } } + + ExtensionScopedSettings extAdditionalSettings = new ExtensionScopedSettings(additionalSettings); + Map additionalSettingsMap = extensionMap.entrySet() + .stream() + .filter(kv -> additionalSettingsKeys.contains(kv.getKey())) + .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); + + Settings.Builder output = Settings.builder(); + output.loadFromMap(additionalSettingsMap); + extAdditionalSettings.applySettings(output.build()); + + // Create extension read from yml config + readExtensions.add( + new Extension( + extensionMap.get("name").toString(), + extensionMap.get("uniqueId").toString(), + extensionMap.get("hostAddress").toString(), + extensionMap.get("port").toString(), + extensionMap.get("version").toString(), + extensionMap.get("opensearchVersion").toString(), + extensionMap.get("minimumCompatibleVersion").toString(), + extensionDependencyList, + extAdditionalSettings + ) + ); + } catch (IOException e) { + logger.warn("loading extension has been failed because of exception : " + e.getMessage()); } - // Create extension read from yml config - readExtensions.add( - new Extension( - extensionMap.get("name").toString(), - extensionMap.get("uniqueId").toString(), - extensionMap.get("hostAddress").toString(), - extensionMap.get("port").toString(), - extensionMap.get("version").toString(), - extensionMap.get("opensearchVersion").toString(), - extensionMap.get("minimumCompatibleVersion").toString(), - extensionDependencyList - ) - ); } inputStream.close(); return new ExtensionsSettings(readExtensions); diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java index fd11aec973d42..9d21469c8fa28 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsSettings.java @@ -44,6 +44,7 @@ public static class Extension { private String opensearchVersion; private String minimumCompatibleVersion; private List dependencies = Collections.emptyList(); + private ExtensionScopedSettings additionalSettings; public Extension( String name, @@ -53,7 +54,8 @@ public Extension( String version, String opensearchVersion, String minimumCompatibleVersion, - List dependencies + List dependencies, + ExtensionScopedSettings additionalSettings ) { this.name = name; this.uniqueId = uniqueId; @@ -63,6 +65,7 @@ public Extension( this.opensearchVersion = opensearchVersion; this.minimumCompatibleVersion = minimumCompatibleVersion; this.dependencies = dependencies; + this.additionalSettings = additionalSettings; } public Extension() { @@ -127,6 +130,10 @@ public List getDependencies() { return dependencies; } + public ExtensionScopedSettings getAdditionalSettings() { + return additionalSettings; + } + public String getMinimumCompatibleVersion() { return minimumCompatibleVersion; } diff --git a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java index eb9b389b7a4b1..fb7160bc1bc67 100644 --- a/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/NoopExtensionsManager.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Optional; +import java.util.Set; import org.opensearch.action.ActionModule; import org.opensearch.client.node.NodeClient; @@ -31,7 +32,7 @@ public class NoopExtensionsManager extends ExtensionsManager { public NoopExtensionsManager() throws IOException { - super(Path.of("")); + super(Path.of(""), Set.of()); } @Override diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ba04388505881..2651da0627ae0 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -56,6 +56,7 @@ import org.opensearch.extensions.NoopExtensionsManager; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; +import org.opensearch.plugins.ExtensionAwarePlugin; import org.opensearch.plugins.SearchPipelinePlugin; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; @@ -236,6 +237,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -468,7 +470,12 @@ protected Node( final IdentityService identityService = new IdentityService(settings, identityPlugins); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { - this.extensionsManager = new ExtensionsManager(initialEnvironment.extensionDir()); + final List extensionAwarePlugins = pluginsService.filterPlugins(ExtensionAwarePlugin.class); + Set> additionalSettings = new HashSet<>(); + for (ExtensionAwarePlugin extAwarePlugin : extensionAwarePlugins) { + additionalSettings.addAll(extAwarePlugin.getExtensionSettings()); + } + this.extensionsManager = new ExtensionsManager(initialEnvironment.extensionDir(), additionalSettings); } else { this.extensionsManager = new NoopExtensionsManager(); } diff --git a/server/src/main/java/org/opensearch/plugins/ExtensionAwarePlugin.java b/server/src/main/java/org/opensearch/plugins/ExtensionAwarePlugin.java new file mode 100644 index 0000000000000..c8426bc964287 --- /dev/null +++ b/server/src/main/java/org/opensearch/plugins/ExtensionAwarePlugin.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugins; + +import org.opensearch.common.settings.Setting; + +import java.util.Collections; +import java.util.List; + +/** + * Plugin that provides extra settings for extensions + * + * @opensearch.experimental + */ +public interface ExtensionAwarePlugin { + + /** + * Returns a list of additional {@link Setting} definitions that this plugin adds for extensions + */ + default List> getExtensionSettings() { + return Collections.emptyList(); + } +} diff --git a/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java b/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java index 4cb15f4ab3cbe..511b5595c5328 100644 --- a/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IdentityPlugin.java @@ -22,5 +22,5 @@ public interface IdentityPlugin { * * Should never return null * */ - public Subject getSubject(); + Subject getSubject(); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java index a486e636cbb7d..87c09bd971284 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java +++ b/server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java @@ -400,8 +400,12 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) throws Exce pipeline = pipelineHolder.pipeline; } } - SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); - return new PipelinedRequest(pipeline, transformedRequest); + try { + SearchRequest transformedRequest = pipeline.transformRequest(searchRequest); + return new PipelinedRequest(pipeline, transformedRequest); + } catch (Exception e) { + throw new SearchPipelineProcessingException(e); + } } Map> getRequestProcessorFactories() { diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 987f38e8dd8fd..e3e34378746b9 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -182,7 +182,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING); map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { - map.put(Names.INDEX_SEARCHER, ThreadPoolType.FIXED); + map.put(Names.INDEX_SEARCHER, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); } THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -279,7 +279,19 @@ public ThreadPool( new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) { - builders.put(Names.INDEX_SEARCHER, new FixedExecutorBuilder(settings, Names.INDEX_SEARCHER, allocatedProcessors, 1000, false)); + builders.put( + Names.INDEX_SEARCHER, + new AutoQueueAdjustingExecutorBuilder( + settings, + Names.INDEX_SEARCHER, + allocatedProcessors, + 1000, + 1000, + 1000, + 2000, + runnableTaskListener + ) + ); } for (final ExecutorBuilder builder : customBuilders) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java index 9bd44185baf24..768a6c73af380 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/RecordingTaskManagerListener.java @@ -35,12 +35,15 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; +import org.opensearch.tasks.ThreadResourceInfo; import org.opensearch.test.tasks.MockTaskManagerListener; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** @@ -52,6 +55,7 @@ public class RecordingTaskManagerListener implements MockTaskManagerListener { private String localNodeId; private List> events = new ArrayList<>(); + private List>>> threadStats = new ArrayList<>(); public RecordingTaskManagerListener(String localNodeId, String... actionMasks) { this.actionMasks = actionMasks; @@ -68,7 +72,9 @@ public synchronized void onTaskRegistered(Task task) { @Override public synchronized void onTaskUnregistered(Task task) { if (Regex.simpleMatch(actionMasks, task.getAction())) { - events.add(new Tuple<>(false, task.taskInfo(localNodeId, true))); + TaskInfo taskInfo = task.taskInfo(localNodeId, true); + events.add(new Tuple<>(false, taskInfo)); + threadStats.add(new Tuple<>(taskInfo.getTaskId(), task.getResourceStats())); } } @@ -82,6 +88,10 @@ public synchronized List> getEvents() { return Collections.unmodifiableList(new ArrayList<>(events)); } + public synchronized List>>> getThreadStats() { + return List.copyOf(threadStats); + } + public synchronized List getRegistrationEvents() { List events = this.events.stream().filter(Tuple::v1).map(Tuple::v2).collect(Collectors.toList()); return Collections.unmodifiableList(events); diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index c9d49a9c3e015..465956f2c9c57 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -70,6 +71,7 @@ import org.opensearch.extensions.settings.RegisterCustomSettingsRequest; import org.opensearch.identity.IdentityService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; +import org.opensearch.plugins.ExtensionAwarePlugin; import org.opensearch.rest.RestController; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.MockLogAppender; @@ -91,6 +93,8 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { private RestController restController; private SettingsModule settingsModule; private ClusterService clusterService; + private ExtensionAwarePlugin extAwarePlugin; + private Setting customSetting = Setting.simpleString("custom_extension_setting", "none", Property.ExtensionScope); private NodeClient client; private MockNioTransport transport; private Path extensionDir; @@ -108,16 +112,17 @@ public class ExtensionsManagerTests extends OpenSearchTestCase { " version: '0.0.7'", " opensearchVersion: '" + Version.CURRENT.toString() + "'", " minimumCompatibleVersion: '" + Version.CURRENT.toString() + "'", + " custom_extension_setting: 'custom_setting'", " - name: secondExtension", " uniqueId: 'uniqueid2'", " hostAddress: '127.0.0.1'", " port: '9301'", " version: '3.14.16'", - " opensearchVersion: '" + Version.CURRENT.toString() + "'", - " minimumCompatibleVersion: '" + Version.CURRENT.toString() + "'", + " opensearchVersion: '1.0.0'", + " minimumCompatibleVersion: '1.0.0'", " dependencies:", " - uniqueId: 'uniqueid0'", - " version: '2.0.0'" + " version: '1.0.0'" ); private DiscoveryExtensionNode extensionNode; @@ -152,6 +157,15 @@ public void setup() throws Exception { Collections.emptySet() ); actionModule = mock(ActionModule.class); + extAwarePlugin = new ExtensionAwarePlugin() { + + @Override + public List> getExtensionSettings() { + List> settings = new ArrayList>(); + settings.add(customSetting); + return settings; + } + }; dynamicActionRegistry = mock(DynamicActionRegistry.class); restController = new RestController( emptySet(), @@ -173,7 +187,7 @@ public void setup() throws Exception { "uniqueid1", new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), new HashMap(), - Version.fromString("3.0.0"), + Version.CURRENT, Version.CURRENT, Collections.emptyList() ); @@ -192,12 +206,12 @@ public void tearDown() throws Exception { public void testDiscover() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); List expectedExtensions = new ArrayList(); String expectedUniqueId = "uniqueid0"; - Version expectedVersion = Version.fromString("2.0.0"); + Version expectedVersion = Version.fromString("1.0.0"); ExtensionDependency expectedDependency = new ExtensionDependency(expectedUniqueId, expectedVersion); expectedExtensions.add( @@ -218,8 +232,8 @@ public void testDiscover() throws Exception { "uniqueid2", new TransportAddress(InetAddress.getByName("127.0.0.1"), 9301), new HashMap(), - Version.CURRENT, - Version.CURRENT, + Version.fromString("1.0.0"), + Version.fromString("1.0.0"), List.of(expectedDependency) ) ); @@ -245,7 +259,59 @@ public void testNonUniqueExtensionsDiscovery() throws Exception { .collect(Collectors.toList()); Files.write(emptyExtensionDir.resolve("extensions.yml"), nonUniqueYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(emptyExtensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(emptyExtensionDir, Set.of()); + + List expectedExtensions = new ArrayList(); + + expectedExtensions.add( + new DiscoveryExtensionNode( + "firstExtension", + "uniqueid1", + new TransportAddress(InetAddress.getByName("127.0.0.0"), 9300), + new HashMap(), + Version.CURRENT, + Version.CURRENT, + Collections.emptyList() + ) + ); + assertEquals(expectedExtensions.size(), extensionsManager.getExtensionIdMap().values().size()); + for (DiscoveryExtensionNode extension : expectedExtensions) { + DiscoveryExtensionNode initializedExtension = extensionsManager.getExtensionIdMap().get(extension.getId()); + assertEquals(extension.getName(), initializedExtension.getName()); + assertEquals(extension.getId(), initializedExtension.getId()); + assertEquals(extension.getAddress(), initializedExtension.getAddress()); + assertEquals(extension.getAttributes(), initializedExtension.getAttributes()); + assertEquals(extension.getVersion(), initializedExtension.getVersion()); + assertEquals(extension.getMinimumCompatibleVersion(), initializedExtension.getMinimumCompatibleVersion()); + assertEquals(extension.getDependencies(), initializedExtension.getDependencies()); + } + assertTrue(expectedExtensions.containsAll(emptyList())); + assertTrue(expectedExtensions.containsAll(emptyList())); + } + + public void testMissingRequiredFieldsInExtensionDiscovery() throws Exception { + Path emptyExtensionDir = createTempDir(); + ExtensionsManager extensionsManager; + List requiredFieldMissingYmlLines = extensionsYmlLines.stream() + .map(s -> s.replace(" minimumCompatibleVersion: '1.0.0'", "")) + .collect(Collectors.toList()); + Files.write(emptyExtensionDir.resolve("extensions.yml"), requiredFieldMissingYmlLines, StandardCharsets.UTF_8); + + try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ExtensionsManager.class))) { + + mockLogAppender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "Required field is missing in extensions.yml", + "org.opensearch.extensions.ExtensionsManager", + Level.WARN, + "loading extension has been failed because of exception : Extension is missing these required fields : [minimumCompatibleVersion]" + ) + ); + + extensionsManager = new ExtensionsManager(emptyExtensionDir, Set.of()); + + mockLogAppender.assertAllExpectationsMatched(); + } List expectedExtensions = new ArrayList(); @@ -305,7 +371,7 @@ public void testDiscoveryExtension() throws Exception { public void testExtensionDependency() throws Exception { String expectedUniqueId = "Test uniqueId"; - Version expectedVersion = Version.fromString("3.0.0"); + Version expectedVersion = Version.CURRENT; ExtensionDependency dependency = new ExtensionDependency(expectedUniqueId, expectedVersion); @@ -327,7 +393,7 @@ public void testNonAccessibleDirectory() throws Exception { AccessControlException e = expectThrows( AccessControlException.class, - () -> new ExtensionsManager(PathUtils.get("")) + () -> new ExtensionsManager(PathUtils.get(""), Set.of()) ); assertEquals("access denied (\"java.io.FilePermission\" \"\" \"read\")", e.getMessage()); } @@ -346,7 +412,7 @@ public void testNoExtensionsFile() throws Exception { ) ); - new ExtensionsManager(extensionDir); + new ExtensionsManager(extensionDir, Set.of()); mockLogAppender.assertAllExpectationsMatched(); } @@ -360,12 +426,12 @@ public void testEmptyExtensionsFile() throws Exception { Settings settings = Settings.builder().build(); - expectThrows(IOException.class, () -> new ExtensionsManager(emptyExtensionDir)); + expectThrows(IOException.class, () -> new ExtensionsManager(emptyExtensionDir, Set.of())); } public void testInitialize() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); @@ -408,7 +474,7 @@ public void testInitialize() throws Exception { public void testHandleRegisterRestActionsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -423,7 +489,7 @@ public void testHandleRegisterRestActionsRequest() throws Exception { public void testHandleRegisterSettingsRequest() throws Exception { Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -439,7 +505,7 @@ public void testHandleRegisterSettingsRequest() throws Exception { } public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -454,7 +520,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidMethod() throws Excep } public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; @@ -469,7 +535,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedMethod() th } public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET", "PUT /bar", "POST /baz"); @@ -483,7 +549,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidUri() throws Exceptio } public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); String uniqueIdStr = "uniqueid1"; List actionsList = List.of("GET /foo", "PUT /bar", "POST /baz"); @@ -497,7 +563,7 @@ public void testHandleRegisterRestActionsRequestWithInvalidDeprecatedUri() throw } public void testHandleExtensionRequest() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionRequestProto.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); @@ -653,7 +719,7 @@ public void testEnvironmentSettingsDefaultValue() throws Exception { public void testAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); List> componentSettings = List.of( @@ -700,7 +766,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); List> componentSettings = List.of( @@ -722,7 +788,7 @@ public void testHandleAddSettingsUpdateConsumerRequest() throws Exception { public void testUpdateSettingsRequest() throws Exception { Path extensionDir = createTempDir(); Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); initialize(extensionsManager); Setting componentSetting = Setting.boolSetting("falseSetting", false, Property.Dynamic); @@ -751,7 +817,7 @@ public void testUpdateSettingsRequest() throws Exception { public void testRegisterHandler() throws Exception { - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); TransportService mockTransportService = spy( new TransportService( @@ -785,7 +851,7 @@ public void testIncompatibleExtensionRegistration() throws IOException, IllegalA "Could not load extension with uniqueId", "org.opensearch.extensions.ExtensionsManager", Level.ERROR, - "Could not load extension with uniqueId uniqueid1 due to OpenSearchException[Extension minimumCompatibleVersion: 3.99.0 is greater than current" + "Could not load extension with uniqueId uniqueid1 due to OpenSearchException[Extension minimumCompatibleVersion: 3.0.0 is greater than current" ) ); @@ -796,17 +862,69 @@ public void testIncompatibleExtensionRegistration() throws IOException, IllegalA " hostAddress: '127.0.0.0'", " port: '9300'", " version: '0.0.7'", - " opensearchVersion: '3.0.0'", - " minimumCompatibleVersion: '3.99.0'" + " opensearchVersion: '" + Version.CURRENT.toString() + "'", + " minimumCompatibleVersion: '3.0.0'" ); Files.write(extensionDir.resolve("extensions.yml"), incompatibleExtension, StandardCharsets.UTF_8); - ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir); + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, Set.of()); assertEquals(0, extensionsManager.getExtensionIdMap().values().size()); mockLogAppender.assertAllExpectationsMatched(); } } + public void testAdditionalExtensionSettingsForExtensionWithCustomSettingSet() throws Exception { + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, additionalSettings); + + DiscoveryExtensionNode extension = new DiscoveryExtensionNode( + "firstExtension", + "uniqueid1", + new TransportAddress(InetAddress.getByName("127.0.0.1"), 9300), + new HashMap(), + Version.CURRENT, + Version.CURRENT, + List.of() + ); + DiscoveryExtensionNode initializedExtension = extensionsManager.getExtensionIdMap().get(extension.getId()); + assertEquals(extension.getName(), initializedExtension.getName()); + assertEquals(extension.getId(), initializedExtension.getId()); + assertTrue(extensionsManager.lookupExtensionSettingsById(extension.getId()).isPresent()); + assertEquals( + "custom_setting", + extensionsManager.lookupExtensionSettingsById(extension.getId()).get().getAdditionalSettings().get(customSetting) + ); + } + + public void testAdditionalExtensionSettingsForExtensionWithoutCustomSettingSet() throws Exception { + Files.write(extensionDir.resolve("extensions.yml"), extensionsYmlLines, StandardCharsets.UTF_8); + + Set> additionalSettings = extAwarePlugin.getExtensionSettings().stream().collect(Collectors.toSet()); + + ExtensionsManager extensionsManager = new ExtensionsManager(extensionDir, additionalSettings); + + DiscoveryExtensionNode extension = new DiscoveryExtensionNode( + "secondExtension", + "uniqueid2", + new TransportAddress(InetAddress.getByName("127.0.0.1"), 9301), + new HashMap(), + Version.fromString("2.0.0"), + Version.fromString("2.0.0"), + List.of() + ); + DiscoveryExtensionNode initializedExtension = extensionsManager.getExtensionIdMap().get(extension.getId()); + assertEquals(extension.getName(), initializedExtension.getName()); + assertEquals(extension.getId(), initializedExtension.getId()); + assertTrue(extensionsManager.lookupExtensionSettingsById(extension.getId()).isPresent()); + assertEquals( + "none", + extensionsManager.lookupExtensionSettingsById(extension.getId()).get().getAdditionalSettings().get(customSetting) + ); + } + private void initialize(ExtensionsManager extensionsManager) { transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 516227e9a13d8..d49d9fd41031c 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -680,12 +680,12 @@ public void testInlinePipeline() throws Exception { requestProcessorConfig.put("scale", 2); Map requestProcessorObject = new HashMap<>(); requestProcessorObject.put("scale_request_size", requestProcessorConfig); - pipelineSourceMap.put("request_processors", List.of(requestProcessorObject)); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(requestProcessorObject)); Map responseProcessorConfig = new HashMap<>(); responseProcessorConfig.put("score", 2); Map responseProcessorObject = new HashMap<>(); responseProcessorObject.put("fixed_score", responseProcessorConfig); - pipelineSourceMap.put("response_processors", List.of(responseProcessorObject)); + pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(responseProcessorObject)); SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); @@ -723,4 +723,67 @@ public void testInfo() { assertTrue(info.containsProcessor(Pipeline.REQUEST_PROCESSORS_KEY, "scale_request_size")); assertTrue(info.containsProcessor(Pipeline.RESPONSE_PROCESSORS_KEY, "fixed_score")); } + + public void testExceptionOnPipelineCreation() { + Map> badFactory = Map.of( + "bad_factory", + (pf, t, f, c) -> { throw new RuntimeException(); } + ); + SearchPipelineService searchPipelineService = createWithProcessors(badFactory, Collections.emptyMap()); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of("bad_factory", Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + + // Exception thrown when creating the pipeline + expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + + } + + public void testExceptionOnRequestProcessing() { + SearchRequestProcessor throwingRequestProcessor = new FakeRequestProcessor("throwing_request", null, null, r -> { + throw new RuntimeException(); + }); + Map> throwingRequestProcessorFactory = Map.of( + "throwing_request", + (pf, t, f, c) -> throwingRequestProcessor + ); + + SearchPipelineService searchPipelineService = createWithProcessors(throwingRequestProcessorFactory, Collections.emptyMap()); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.REQUEST_PROCESSORS_KEY, List.of(Map.of("throwing_request", Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + + // Exception thrown when processing the request + expectThrows(SearchPipelineProcessingException.class, () -> searchPipelineService.resolvePipeline(searchRequest)); + } + + public void testExceptionOnResponseProcessing() throws Exception { + SearchResponseProcessor throwingResponseProcessor = new FakeResponseProcessor("throwing_response", null, null, r -> { + throw new RuntimeException(); + }); + Map> throwingResponseProcessorFactory = Map.of( + "throwing_response", + (pf, t, f, c) -> throwingResponseProcessor + ); + + SearchPipelineService searchPipelineService = createWithProcessors(Collections.emptyMap(), throwingResponseProcessorFactory); + + Map pipelineSourceMap = new HashMap<>(); + pipelineSourceMap.put(Pipeline.RESPONSE_PROCESSORS_KEY, List.of(Map.of("throwing_response", Collections.emptyMap()))); + + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(100).searchPipelineSource(pipelineSourceMap); + SearchRequest searchRequest = new SearchRequest().source(sourceBuilder); + + PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(searchRequest); + + SearchResponse response = new SearchResponse(null, null, 0, 0, 0, 0, null, null); + // Exception thrown when processing response + expectThrows(SearchPipelineProcessingException.class, () -> pipelinedRequest.transformResponse(response)); + } } diff --git a/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java b/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java index 8ba23c5d3219c..03257ee2a0a84 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java @@ -20,8 +20,13 @@ import org.opensearch.threadpool.ThreadPool; import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.opensearch.tasks.ResourceStats.CPU; import static org.opensearch.tasks.ResourceStats.MEMORY; import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; @@ -88,6 +93,49 @@ public void testStopTrackingHandlesCurrentActiveThread() { assertTrue(task.getResourceStats().get(threadId).get(0).getResourceUsageInfo().getStatsInfo().get(MEMORY).getTotalValue() > 0); } + /** + * Test if taskResourceTrackingService properly tracks resource usage when multiple threads work on the same task + */ + public void testStartingTrackingHandlesMultipleThreadsPerTask() throws InterruptedException { + ExecutorService executor = threadPool.executor(ThreadPool.Names.GENERIC); + taskResourceTrackingService.setTaskResourceTrackingEnabled(true); + Task task = new SearchTask(1, "test", "test", () -> "Test", TaskId.EMPTY_TASK_ID, new HashMap<>()); + taskResourceTrackingService.startTracking(task); + int numTasks = randomIntBetween(2, 100); + for (int i = 0; i < numTasks; i++) { + executor.execute(() -> { + long threadId = Thread.currentThread().getId(); + taskResourceTrackingService.taskExecutionStartedOnThread(task.getId(), threadId); + // The same thread may pick up multiple runnables for the same task id + assertEquals(1, task.getResourceStats().get(threadId).stream().filter(ThreadResourceInfo::isActive).count()); + taskResourceTrackingService.taskExecutionFinishedOnThread(task.getId(), threadId); + }); + } + executor.shutdown(); + while (true) { + try { + if (executor.awaitTermination(1, TimeUnit.MINUTES)) break; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Map> stats = task.getResourceStats(); + int numExecutions = 0; + for (Long threadId : stats.keySet()) { + for (ThreadResourceInfo info : task.getResourceStats().get(threadId)) { + assertTrue(info.getResourceUsageInfo().getStatsInfo().get(MEMORY).getTotalValue() > 0); + assertTrue(info.getResourceUsageInfo().getStatsInfo().get(CPU).getTotalValue() > 0); + assertFalse(info.isActive()); + numExecutions++; + } + + } + assertTrue(task.getTotalResourceStats().getCpuTimeInNanos() > 0); + assertTrue(task.getTotalResourceStats().getMemoryInBytes() > 0); + // Each execution of a runnable should record an entry in resourceStats even if it's the same thread + assertEquals(numTasks, numExecutions); + } + private void verifyThreadContextFixedHeaders(String key, String value) { assertEquals(threadPool.getThreadContext().getHeader(key), value); assertEquals(threadPool.getThreadContext().getTransient(key), value);