From 03a9650de0539754db68d98fda6e52344de11d60 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 17 Nov 2023 11:53:16 -0500 Subject: [PATCH 1/6] [BWC and API enforcement] Reduce the visibility of some existing APIs (#11242) Signed-off-by: Andriy Redko --- .../org/opensearch/action/search/SearchPhaseContext.java | 6 +++--- .../opensearch/action/search/SearchRequestContext.java | 8 +++++--- .../action/search/SearchRequestOperationsListener.java | 8 ++++---- .../opensearch/index/mapper/ParametrizedFieldMapper.java | 4 ++-- .../opensearch/index/search/stats/SearchStatsTests.java | 3 +-- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java index 0fa8569413eaf..df451e0745e3c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java @@ -34,7 +34,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.OriginalIndices; import org.opensearch.common.Nullable; -import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.annotation.InternalApi; import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.search.SearchPhaseResult; @@ -49,9 +49,9 @@ /** * This class provide contextual state and access to resources across multiple search phases. * - * @opensearch.api + * @opensearch.internal */ -@PublicApi(since = "1.0.0") +@InternalApi public interface SearchPhaseContext extends Executor { // TODO maybe we can make this concrete later - for now we just implement this in the base class for all initial phases diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 96e9d9efa5079..674363600b251 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.TotalHits; +import org.opensearch.common.annotation.InternalApi; import java.util.EnumMap; import java.util.HashMap; @@ -22,7 +23,8 @@ * * @opensearch.internal */ -public class SearchRequestContext { +@InternalApi +class SearchRequestContext { private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; @@ -32,11 +34,11 @@ public class SearchRequestContext { /** * This constructor is for testing only */ - public SearchRequestContext() { + SearchRequestContext() { this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger())); } - public SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { + SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { this.searchRequestOperationsListener = searchRequestOperationsListener; this.absoluteStartNanos = System.nanoTime(); this.phaseTookMap = new HashMap<>(); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 91e3eecbf8d13..056cb474eaf32 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -10,17 +10,17 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.annotation.InternalApi; import java.util.List; /** * A listener for search, fetch and context events at the coordinator node level * - * @opensearch.api + * @opensearch.internal */ -@PublicApi(since = "1.0.0") -public interface SearchRequestOperationsListener { +@InternalApi +interface SearchRequestOperationsListener { void onPhaseStart(SearchPhaseContext context); diff --git a/server/src/main/java/org/opensearch/index/mapper/ParametrizedFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/ParametrizedFieldMapper.java index a8e04aac45963..ee0b50024ab38 100644 --- a/server/src/main/java/org/opensearch/index/mapper/ParametrizedFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/ParametrizedFieldMapper.java @@ -282,7 +282,7 @@ public Parameter setValidator(Consumer validator) { /** * Configure a custom serializer for this parameter */ - protected Parameter setSerializer(Serializer serializer, Function conflictSerializer) { + public Parameter setSerializer(Serializer serializer, Function conflictSerializer) { this.serializer = serializer; this.conflictSerializer = conflictSerializer; return this; @@ -291,7 +291,7 @@ protected Parameter setSerializer(Serializer serializer, Function setSerializerCheck(SerializerCheck check) { + public Parameter setSerializerCheck(SerializerCheck check) { this.serializerCheck = check; return this; } diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 2ebb033899698..98c7b8e4b2bde 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -35,7 +35,6 @@ import org.opensearch.action.search.SearchPhase; import org.opensearch.action.search.SearchPhaseContext; import org.opensearch.action.search.SearchPhaseName; -import org.opensearch.action.search.SearchRequestContext; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; @@ -86,7 +85,7 @@ public void testShardLevelSearchGroupStats() throws Exception { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); for (int iterator = 0; iterator < paramValue; iterator++) { testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); + testRequestStats.onPhaseEnd(ctx, null /* not needed */); } } searchStats1.setSearchRequestStats(testRequestStats); From f4b25d6faecca0334331d9a4e08f91e458e92914 Mon Sep 17 00:00:00 2001 From: Sorabh Date: Fri, 17 Nov 2023 15:12:24 -0800 Subject: [PATCH 2/6] Fix flaky test MoreExpressionIT.testSpecialValueVariable in concurrent search path (#11088) * Fix flaky test MoreExpressionIT.testSpecialValueVariable in concurrent search path Signed-off-by: Sorabh Hamirwasia * Rename ReplaceableConstDoubleValueSource class to PerThreadReplaceableConstDoubleValueSource Signed-off-by: Sorabh Hamirwasia --------- Signed-off-by: Sorabh Hamirwasia --- .../script/expression/MoreExpressionIT.java | 4 --- .../ExpressionAggregationScript.java | 4 +-- .../expression/ExpressionScriptEngine.java | 8 ++--- ...eadReplaceableConstDoubleValueSource.java} | 29 ++++++++++++++----- 4 files changed, 27 insertions(+), 18 deletions(-) rename modules/lang-expression/src/main/java/org/opensearch/script/expression/{ReplaceableConstDoubleValueSource.java => PerThreadReplaceableConstDoubleValueSource.java} (62%) diff --git a/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java b/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java index bb2f652168d5c..8ca28a905f216 100644 --- a/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java +++ b/modules/lang-expression/src/internalClusterTest/java/org/opensearch/script/expression/MoreExpressionIT.java @@ -504,10 +504,6 @@ public void testInvalidFieldMember() { } public void testSpecialValueVariable() throws Exception { - assumeFalse( - "Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/10079", - internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) - ); // i.e. _value for aggregations createIndex("test"); ensureGreen("test"); diff --git a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionAggregationScript.java b/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionAggregationScript.java index 5eebb9c4d60ad..a2af636ffdc8a 100644 --- a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionAggregationScript.java +++ b/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionAggregationScript.java @@ -53,9 +53,9 @@ class ExpressionAggregationScript implements AggregationScript.LeafFactory { final SimpleBindings bindings; final DoubleValuesSource source; final boolean needsScore; - final ReplaceableConstDoubleValueSource specialValue; // _value + final PerThreadReplaceableConstDoubleValueSource specialValue; // _value - ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, ReplaceableConstDoubleValueSource v) { + ExpressionAggregationScript(Expression e, SimpleBindings b, boolean n, PerThreadReplaceableConstDoubleValueSource v) { exprScript = e; bindings = b; source = exprScript.getDoubleValuesSource(bindings); diff --git a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionScriptEngine.java b/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionScriptEngine.java index 11b2e20eea523..5629b3b4a6972 100644 --- a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionScriptEngine.java +++ b/modules/lang-expression/src/main/java/org/opensearch/script/expression/ExpressionScriptEngine.java @@ -316,14 +316,14 @@ private static AggregationScript.LeafFactory newAggregationScript( // instead of complicating SimpleBindings (which should stay simple) SimpleBindings bindings = new SimpleBindings(); boolean needsScores = false; - ReplaceableConstDoubleValueSource specialValue = null; + PerThreadReplaceableConstDoubleValueSource specialValue = null; for (String variable : expr.variables) { try { if (variable.equals("_score")) { bindings.add("_score", DoubleValuesSource.SCORES); needsScores = true; } else if (variable.equals("_value")) { - specialValue = new ReplaceableConstDoubleValueSource(); + specialValue = new PerThreadReplaceableConstDoubleValueSource(); bindings.add("_value", specialValue); // noop: _value is special for aggregations, and is handled in ExpressionScriptBindings // TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a @@ -388,7 +388,7 @@ private static ScoreScript.LeafFactory newScoreScript(Expression expr, SearchLoo // NOTE: if we need to do anything complicated with bindings in the future, we can just extend Bindings, // instead of complicating SimpleBindings (which should stay simple) SimpleBindings bindings = new SimpleBindings(); - ReplaceableConstDoubleValueSource specialValue = null; + PerThreadReplaceableConstDoubleValueSource specialValue = null; boolean needsScores = false; for (String variable : expr.variables) { try { @@ -396,7 +396,7 @@ private static ScoreScript.LeafFactory newScoreScript(Expression expr, SearchLoo bindings.add("_score", DoubleValuesSource.SCORES); needsScores = true; } else if (variable.equals("_value")) { - specialValue = new ReplaceableConstDoubleValueSource(); + specialValue = new PerThreadReplaceableConstDoubleValueSource(); bindings.add("_value", specialValue); // noop: _value is special for aggregations, and is handled in ExpressionScriptBindings // TODO: if some uses it in a scoring expression, they will get a nasty failure when evaluating...need a diff --git a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java b/modules/lang-expression/src/main/java/org/opensearch/script/expression/PerThreadReplaceableConstDoubleValueSource.java similarity index 62% rename from modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java rename to modules/lang-expression/src/main/java/org/opensearch/script/expression/PerThreadReplaceableConstDoubleValueSource.java index 28e4707a07192..40bb957c248f2 100644 --- a/modules/lang-expression/src/main/java/org/opensearch/script/expression/ReplaceableConstDoubleValueSource.java +++ b/modules/lang-expression/src/main/java/org/opensearch/script/expression/PerThreadReplaceableConstDoubleValueSource.java @@ -39,20 +39,25 @@ import org.apache.lucene.search.IndexSearcher; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** - * A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. + * A {@link DoubleValuesSource} which has a stub {@link DoubleValues} that holds a dynamically replaceable constant double. This is made + * thread-safe for concurrent segment search use case by keeping the {@link DoubleValues} per thread. Any update to the value happens in + * thread specific {@link DoubleValuesSource} instance. */ -final class ReplaceableConstDoubleValueSource extends DoubleValuesSource { - final ReplaceableConstDoubleValues fv; +final class PerThreadReplaceableConstDoubleValueSource extends DoubleValuesSource { + // Multiple slices can be processed by same thread but that will be sequential, so keeping per thread is fine + final Map perThreadDoubleValues; - ReplaceableConstDoubleValueSource() { - fv = new ReplaceableConstDoubleValues(); + PerThreadReplaceableConstDoubleValueSource() { + perThreadDoubleValues = new ConcurrentHashMap<>(); } @Override public DoubleValues getValues(LeafReaderContext ctx, DoubleValues scores) throws IOException { - return fv; + return perThreadDoubleValues.computeIfAbsent(Thread.currentThread().getId(), threadId -> new ReplaceableConstDoubleValues()); } @Override @@ -62,7 +67,11 @@ public boolean needsScores() { @Override public Explanation explain(LeafReaderContext ctx, int docId, Explanation scoreExplanation) throws IOException { - if (fv.advanceExact(docId)) return Explanation.match((float) fv.doubleValue(), "ReplaceableConstDoubleValues"); + final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent( + Thread.currentThread().getId(), + threadId -> new ReplaceableConstDoubleValues() + ); + if (currentFv.advanceExact(docId)) return Explanation.match((float) currentFv.doubleValue(), "ReplaceableConstDoubleValues"); else return Explanation.noMatch("ReplaceableConstDoubleValues"); } @@ -77,7 +86,11 @@ public int hashCode() { } public void setValue(double v) { - fv.setValue(v); + final ReplaceableConstDoubleValues currentFv = perThreadDoubleValues.computeIfAbsent( + Thread.currentThread().getId(), + threadId -> new ReplaceableConstDoubleValues() + ); + currentFv.setValue(v); } @Override From edbc4e27c6ef160709a28c36712b69998ec5ce4f Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Sun, 19 Nov 2023 11:06:25 -0800 Subject: [PATCH 3/6] Add indexRandomForConcurrentSearch to tests (#11259) * Update Concurrent search related tests to use slice count > 1. Signed-off-by: Harish Bhakuni Signed-off-by: Jay Deng * Add indexRandomForConcurrentSearch to tests: * CardinalityWithRequestBreakerIT.java * TopHitsIT.java * SearchFieldsIT.java * DecayFunctionScoreIT.java * FunctionScoreFieldValueIT.java * FunctionScoreIT.java * FunctionScorePluginIT.java * QueryRescorerIT.java Signed-off-by: Jay Deng --------- Signed-off-by: Harish Bhakuni Signed-off-by: Jay Deng Co-authored-by: Harish Bhakuni --- .../metrics/CardinalityWithRequestBreakerIT.java | 1 + .../opensearch/search/fields/SearchFieldsIT.java | 16 ++++++++++++++++ .../functionscore/DecayFunctionScoreIT.java | 10 ++++++++++ .../functionscore/FunctionScoreFieldValueIT.java | 7 ++++--- .../search/functionscore/FunctionScoreIT.java | 13 +++++++++---- .../functionscore/FunctionScorePluginIT.java | 1 + .../search/functionscore/QueryRescorerIT.java | 8 +++++++- 7 files changed, 48 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java index 85c36ec0ba78d..6cbf278317b1b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/metrics/CardinalityWithRequestBreakerIT.java @@ -100,6 +100,7 @@ public void testRequestBreaker() throws Exception { ) .get(); + indexRandomForConcurrentSearch("test"); try { client().prepareSearch("test") .addAggregation( diff --git a/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java index 799bbf91a567d..ed8fe74504f92 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/fields/SearchFieldsIT.java @@ -251,6 +251,7 @@ public void testStoredFields() throws Exception { .get(); client().admin().indices().prepareRefresh().get(); + indexRandomForConcurrentSearch("test"); SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).addStoredField("field1").get(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L)); @@ -358,6 +359,7 @@ public void testScriptDocAndFields() throws Exception { ) .get(); client().admin().indices().refresh(refreshRequest()).actionGet(); + indexRandomForConcurrentSearch("test"); logger.info("running doc['num1'].value"); SearchResponse response = client().prepareSearch() @@ -458,6 +460,7 @@ public void testScriptWithUnsignedLong() throws Exception { ) .get(); client().admin().indices().refresh(refreshRequest()).actionGet(); + indexRandomForConcurrentSearch("test"); SearchResponse response = client().prepareSearch() .setQuery(matchAllQuery()) @@ -547,6 +550,7 @@ public void testScriptFieldWithNanos() throws Exception { .setSource(jsonBuilder().startObject().field("date", "1970-01-01T00:00:00.000Z").endObject()), client().prepareIndex("test").setId("2").setSource(jsonBuilder().startObject().field("date", date).endObject()) ); + indexRandomForConcurrentSearch("test"); SearchResponse response = client().prepareSearch() .setQuery(matchAllQuery()) @@ -632,6 +636,7 @@ public void testScriptFieldUsingSource() throws Exception { ) .get(); client().admin().indices().refresh(refreshRequest()).actionGet(); + indexRandomForConcurrentSearch("test"); SearchResponse response = client().prepareSearch() .setQuery(matchAllQuery()) @@ -674,6 +679,7 @@ public void testScriptFieldUsingSource() throws Exception { public void testScriptFieldsForNullReturn() throws Exception { client().prepareIndex("test").setId("1").setSource("foo", "bar").setRefreshPolicy("true").get(); + indexRandomForConcurrentSearch("test"); SearchResponse response = client().prepareSearch() .setQuery(matchAllQuery()) .addScriptField("test_script_1", new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "return null", Collections.emptyMap())) @@ -795,6 +801,7 @@ public void testStoredFieldsWithoutSource() throws Exception { .get(); client().admin().indices().prepareRefresh().get(); + indexRandomForConcurrentSearch("test"); SearchResponse searchResponse = client().prepareSearch() .setQuery(matchAllQuery()) @@ -852,6 +859,7 @@ public void testSearchFieldsMetadata() throws Exception { .setSource(jsonBuilder().startObject().field("field1", "value").endObject()) .setRefreshPolicy(IMMEDIATE) .get(); + indexRandomForConcurrentSearch("my-index"); SearchResponse searchResponse = client().prepareSearch("my-index").addStoredField("field1").addStoredField("_routing").get(); @@ -866,6 +874,7 @@ public void testSearchFieldsNonLeafField() throws Exception { .setSource(jsonBuilder().startObject().startObject("field1").field("field2", "value1").endObject().endObject()) .setRefreshPolicy(IMMEDIATE) .get(); + indexRandomForConcurrentSearch("my-index"); assertFailures( client().prepareSearch("my-index").addStoredField("field1"), @@ -932,6 +941,7 @@ public void testGetFieldsComplexField() throws Exception { ); client().prepareIndex("my-index").setId("1").setRefreshPolicy(IMMEDIATE).setSource(source, MediaTypeRegistry.JSON).get(); + indexRandomForConcurrentSearch("my-index"); String field = "field1.field2.field3.field4"; @@ -1039,6 +1049,7 @@ public void testDocValueFields() throws Exception { .get(); client().admin().indices().prepareRefresh().get(); + indexRandomForConcurrentSearch("test"); SearchRequestBuilder builder = client().prepareSearch() .setQuery(matchAllQuery()) @@ -1271,6 +1282,7 @@ public void testScriptFields() throws Exception { ); } indexRandom(true, reqs); + indexRandomForConcurrentSearch("index"); ensureSearchable(); SearchRequestBuilder req = client().prepareSearch("index"); for (String field : Arrays.asList("s", "ms", "l", "ml", "d", "md")) { @@ -1326,6 +1338,7 @@ public void testDocValueFieldsWithFieldAlias() throws Exception { index("test", MapperService.SINGLE_MAPPING_NAME, "1", "text_field", "foo", "date_field", formatter.print(date)); refresh("test"); + indexRandomForConcurrentSearch("test"); SearchRequestBuilder builder = client().prepareSearch() .setQuery(matchAllQuery()) @@ -1387,6 +1400,7 @@ public void testWildcardDocValueFieldsWithFieldAlias() throws Exception { index("test", MapperService.SINGLE_MAPPING_NAME, "1", "text_field", "foo", "date_field", formatter.print(date)); refresh("test"); + indexRandomForConcurrentSearch("test"); SearchRequestBuilder builder = client().prepareSearch() .setQuery(matchAllQuery()) @@ -1440,6 +1454,7 @@ public void testStoredFieldsWithFieldAlias() throws Exception { index("test", MapperService.SINGLE_MAPPING_NAME, "1", "field1", "value1", "field2", "value2"); refresh("test"); + indexRandomForConcurrentSearch("test"); SearchResponse searchResponse = client().prepareSearch() .setQuery(matchAllQuery()) @@ -1482,6 +1497,7 @@ public void testWildcardStoredFieldsWithFieldAlias() throws Exception { index("test", MapperService.SINGLE_MAPPING_NAME, "1", "field1", "value1", "field2", "value2"); refresh("test"); + indexRandomForConcurrentSearch("test"); SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).addStoredField("field*").get(); assertHitCount(searchResponse, 1L); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/DecayFunctionScoreIT.java b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/DecayFunctionScoreIT.java index 6eb528e0bb7d3..3a6624c2ad2e6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/DecayFunctionScoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/DecayFunctionScoreIT.java @@ -398,6 +398,7 @@ public void testBoostModeSettingWorks() throws Exception { ) ); indexRandom(true, false, indexBuilders); // force no dummy docs + indexRandomForConcurrentSearch("test"); // Test Gauss List lonlat = new ArrayList<>(); @@ -482,6 +483,7 @@ public void testParseGeoPoint() throws Exception { constantScoreQuery(termQuery("test", "value")), ScoreFunctionBuilders.weightFactorFunction(randomIntBetween(1, 10)) ); + indexRandomForConcurrentSearch("test"); GeoPoint point = new GeoPoint(20, 11); ActionFuture response = client().search( searchRequest().searchType(SearchType.QUERY_THEN_FETCH) @@ -535,6 +537,7 @@ public void testCombineModes() throws Exception { .setRefreshPolicy(IMMEDIATE) .setSource(jsonBuilder().startObject().field("test", "value value").field("num", 1.0).endObject()) .get(); + indexRandomForConcurrentSearch("test"); FunctionScoreQueryBuilder baseQuery = functionScoreQuery( constantScoreQuery(termQuery("test", "value")), ScoreFunctionBuilders.weightFactorFunction(2) @@ -654,6 +657,7 @@ public void testCombineModesExplain() throws Exception { constantScoreQuery(termQuery("test", "value")).queryName("query1"), ScoreFunctionBuilders.weightFactorFunction(2, "weight1") ); + indexRandomForConcurrentSearch("test"); // decay score should return 0.5 for this function and baseQuery should return 2.0f as it's score ActionFuture response = client().search( searchRequest().searchType(SearchType.QUERY_THEN_FETCH) @@ -762,6 +766,7 @@ public void testParseDateMath() throws Exception { ).actionGet(); refresh(); + indexRandomForConcurrentSearch("test"); SearchResponse sr = client().search( searchRequest().source( searchSource().query(functionScoreQuery(termQuery("test", "value"), gaussDecayFunction("num1", "now", "2d"))) @@ -817,6 +822,7 @@ public void testValueMissingLin() throws Exception { ).actionGet(); refresh(); + indexRandomForConcurrentSearch("test"); ActionFuture response = client().search( searchRequest().searchType(SearchType.QUERY_THEN_FETCH) @@ -893,6 +899,7 @@ public void testDateWithoutOrigin() throws Exception { ).actionGet(); refresh(); + indexRandomForConcurrentSearch("test"); ActionFuture response = client().search( searchRequest().searchType(SearchType.QUERY_THEN_FETCH) @@ -974,6 +981,7 @@ public void testManyDocsLin() throws Exception { List lonlat = new ArrayList<>(); lonlat.add(100f); lonlat.add(110f); + indexRandomForConcurrentSearch("test"); ActionFuture response = client().search( searchRequest().searchType(SearchType.QUERY_THEN_FETCH) .source( @@ -1107,6 +1115,7 @@ public void testNoQueryGiven() throws Exception { client().index(indexRequest("test").source(jsonBuilder().startObject().field("test", "value").field("num", 1.0).endObject())) .actionGet(); refresh(); + indexRandomForConcurrentSearch("test"); // so, we indexed a string field, but now we try to score a num field ActionFuture response = client().search( searchRequest().searchType(SearchType.QUERY_THEN_FETCH) @@ -1171,6 +1180,7 @@ public void testMultiFieldOptions() throws Exception { ); indexRandom(true, doc1, doc2); + indexRandomForConcurrentSearch("test"); ActionFuture response = client().search(searchRequest().source(searchSource().query(baseQuery))); SearchResponse sr = response.actionGet(); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreFieldValueIT.java b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreFieldValueIT.java index b09914c4aa764..d53f55b98bd23 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreFieldValueIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreFieldValueIT.java @@ -80,7 +80,7 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); } - public void testFieldValueFactor() throws IOException { + public void testFieldValueFactor() throws IOException, InterruptedException { assertAcked( prepareCreate("test").setMapping( jsonBuilder().startObject() @@ -99,8 +99,8 @@ public void testFieldValueFactor() throws IOException { client().prepareIndex("test").setId("1").setSource("test", 5, "body", "foo").get(); client().prepareIndex("test").setId("2").setSource("test", 17, "body", "foo").get(); client().prepareIndex("test").setId("3").setSource("body", "bar").get(); - refresh(); + indexRandomForConcurrentSearch("test"); // document 2 scores higher because 17 > 5 SearchResponse response = client().prepareSearch("test") @@ -189,7 +189,7 @@ public void testFieldValueFactor() throws IOException { } } - public void testFieldValueFactorExplain() throws IOException { + public void testFieldValueFactorExplain() throws IOException, InterruptedException { assertAcked( prepareCreate("test").setMapping( jsonBuilder().startObject() @@ -210,6 +210,7 @@ public void testFieldValueFactorExplain() throws IOException { client().prepareIndex("test").setId("3").setSource("body", "bar").get(); refresh(); + indexRandomForConcurrentSearch("test"); // document 2 scores higher because 17 > 5 final String functionName = "func1"; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreIT.java b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreIT.java index 88395f25700d2..3b80d437e95c0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScoreIT.java @@ -126,10 +126,11 @@ protected Map, Object>> pluginScripts() { } } - public void testScriptScoresNested() throws IOException { + public void testScriptScoresNested() throws IOException, InterruptedException { createIndex(INDEX); index(INDEX, TYPE, "1", jsonBuilder().startObject().field("dummy_field", 1).endObject()); refresh(); + indexRandomForConcurrentSearch(INDEX); Script scriptOne = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "1", Collections.emptyMap()); Script scriptTwo = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "get score value", Collections.emptyMap()); @@ -148,10 +149,11 @@ public void testScriptScoresNested() throws IOException { assertThat(response.getHits().getAt(0).getScore(), equalTo(1.0f)); } - public void testScriptScoresWithAgg() throws IOException { + public void testScriptScoresWithAgg() throws IOException, InterruptedException { createIndex(INDEX); index(INDEX, TYPE, "1", jsonBuilder().startObject().field("dummy_field", 1).endObject()); refresh(); + indexRandomForConcurrentSearch(INDEX); Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "get score value", Collections.emptyMap()); @@ -166,10 +168,11 @@ public void testScriptScoresWithAgg() throws IOException { assertThat(((Terms) response.getAggregations().asMap().get("score_agg")).getBuckets().get(0).getDocCount(), is(1L)); } - public void testScriptScoresWithAggWithExplain() throws IOException { + public void testScriptScoresWithAggWithExplain() throws IOException, InterruptedException { createIndex(INDEX); index(INDEX, TYPE, "1", jsonBuilder().startObject().field("dummy_field", 1).endObject()); refresh(); + indexRandomForConcurrentSearch(INDEX); Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "get score value", Collections.emptyMap()); @@ -195,7 +198,7 @@ public void testScriptScoresWithAggWithExplain() throws IOException { assertThat(((Terms) response.getAggregations().asMap().get("score_agg")).getBuckets().get(0).getDocCount(), is(1L)); } - public void testMinScoreFunctionScoreBasic() throws IOException { + public void testMinScoreFunctionScoreBasic() throws IOException, InterruptedException { float score = randomValueOtherThanMany((f) -> Float.compare(f, 0) < 0, OpenSearchTestCase::randomFloat); float minScore = randomValueOtherThanMany((f) -> Float.compare(f, 0) < 0, OpenSearchTestCase::randomFloat); index( @@ -207,6 +210,7 @@ public void testMinScoreFunctionScoreBasic() throws IOException { .endObject() ); refresh(); + indexRandomForConcurrentSearch(INDEX); ensureYellow(); Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "doc['random_score']", Collections.emptyMap()); @@ -291,6 +295,7 @@ public void testWithEmptyFunctions() throws IOException, ExecutionException, Int assertAcked(prepareCreate("test")); index("test", "testtype", "1", jsonBuilder().startObject().field("text", "test text").endObject()); refresh(); + indexRandomForConcurrentSearch("test"); SearchResponse termQuery = client().search(searchRequest().source(searchSource().explain(true).query(termQuery("text", "text")))) .get(); diff --git a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScorePluginIT.java b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScorePluginIT.java index 1df4acac0dcf0..a91f53dae04d2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScorePluginIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/FunctionScorePluginIT.java @@ -122,6 +122,7 @@ public void testPlugin() throws Exception { ).actionGet(); client().admin().indices().prepareRefresh().get(); + indexRandomForConcurrentSearch("test"); DecayFunctionBuilder gfb = new CustomDistanceScoreBuilder("num1", "2013-05-28", "+1d"); ActionFuture response = client().search( diff --git a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/QueryRescorerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/QueryRescorerIT.java index de4c85301547c..bda6284d9535a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/functionscore/QueryRescorerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/functionscore/QueryRescorerIT.java @@ -110,7 +110,7 @@ protected Settings featureFlagSettings() { return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); } - public void testEnforceWindowSize() { + public void testEnforceWindowSize() throws InterruptedException { createIndex("test"); // this int iters = scaledRandomIntBetween(10, 20); @@ -118,6 +118,7 @@ public void testEnforceWindowSize() { client().prepareIndex("test").setId(Integer.toString(i)).setSource("f", Integer.toString(i)).get(); } refresh(); + indexRandomForConcurrentSearch("test"); int numShards = getNumShards("test").numPrimaries; for (int j = 0; j < iters; j++) { @@ -169,6 +170,7 @@ public void testRescorePhrase() throws Exception { .setSource("field1", "quick huge brown", "field2", "the quick lazy huge brown fox jumps over the tree") .get(); refresh(); + indexRandomForConcurrentSearch("test"); SearchResponse searchResponse = client().prepareSearch() .setQuery(QueryBuilders.matchQuery("field1", "the quick brown").operator(Operator.OR)) .setRescorer( @@ -474,6 +476,7 @@ private static void assertEquivalent(String query, SearchResponse plain, SearchR public void testEquivalence() throws Exception { // no dummy docs since merges can change scores while we run queries. int numDocs = indexRandomNumbers("whitespace", -1, false); + indexRandomForConcurrentSearch("test"); final int iters = scaledRandomIntBetween(50, 100); for (int i = 0; i < iters; i++) { @@ -545,6 +548,7 @@ public void testExplain() throws Exception { .setSource("field1", "quick huge brown", "field2", "the quick lazy huge brown fox jumps over the tree") .get(); refresh(); + indexRandomForConcurrentSearch("test"); { SearchResponse searchResponse = client().prepareSearch() @@ -816,6 +820,7 @@ public void testFromSize() throws Exception { client().prepareIndex("test").setId("" + i).setSource("text", "hello world").get(); } refresh(); + indexRandomForConcurrentSearch("test"); SearchRequestBuilder request = client().prepareSearch(); request.setQuery(QueryBuilders.termQuery("text", "hello")); @@ -832,6 +837,7 @@ public void testRescorePhaseWithInvalidSort() throws Exception { client().prepareIndex("test").setId("" + i).setSource("number", 0).get(); } refresh(); + indexRandomForConcurrentSearch("test"); Exception exc = expectThrows( Exception.class, From 60c46f3a6c33b1c1074f161b629927e5d60917c0 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Sun, 19 Nov 2023 17:56:45 -0800 Subject: [PATCH 4/6] Fix maxScore check in reduce phase when some scores are NaN (#11267) Signed-off-by: Jay Deng --- .../java/org/opensearch/search/sort/FieldSortIT.java | 4 ---- .../org/opensearch/search/query/TopDocsCollectorContext.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java b/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java index 3c5574f823c3b..81e948640ee94 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/sort/FieldSortIT.java @@ -268,10 +268,6 @@ public void testIssue6614() throws ExecutionException, InterruptedException { } public void testTrackScores() throws Exception { - assumeFalse( - "Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11189", - internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) - ); assertAcked(client().admin().indices().prepareCreate("test").setMapping("svalue", "type=keyword").get()); ensureGreen(); index( diff --git a/server/src/main/java/org/opensearch/search/query/TopDocsCollectorContext.java b/server/src/main/java/org/opensearch/search/query/TopDocsCollectorContext.java index 86f6e542f97d1..65d3948c8401e 100644 --- a/server/src/main/java/org/opensearch/search/query/TopDocsCollectorContext.java +++ b/server/src/main/java/org/opensearch/search/query/TopDocsCollectorContext.java @@ -532,7 +532,7 @@ public ReduceableSearchResult reduce(Collection collectors) throws IO float score = collector.getMaxScore(); if (Float.isNaN(maxScore)) { maxScore = score; - } else { + } else if (!Float.isNaN(score)) { maxScore = Math.max(maxScore, score); } } From b0d6b3c0ea626a3ef8864060957b9a3550f0e8e9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 20 Nov 2023 09:39:18 -0500 Subject: [PATCH 5/6] Bump org.apache.logging.log4j:log4j-core from 2.21.1 to 2.22.0 in /buildSrc/src/testKit/thirdPartyAudit/sample_jars (#11270) * Bump org.apache.logging.log4j:log4j-core Bumps org.apache.logging.log4j:log4j-core from 2.21.1 to 2.22.0. --- updated-dependencies: - dependency-name: org.apache.logging.log4j:log4j-core dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 2 +- buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b9d3ff90cd32..a40580cbb3238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,7 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.codehaus.woodstox:stax2-api` from 4.2.1 to 4.2.2 ([#10639](https://github.com/opensearch-project/OpenSearch/pull/10639)) - Bump `com.google.http-client:google-http-client` from 1.43.2 to 1.43.3 ([#10635](https://github.com/opensearch-project/OpenSearch/pull/10635)) - Bump `com.squareup.okio:okio` from 3.5.0 to 3.6.0 ([#10637](https://github.com/opensearch-project/OpenSearch/pull/10637)) -- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.21.1 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000)) +- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.22.0 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000), [#11270](https://github.com/opensearch-project/OpenSearch/pull/11270)) - Bump `aws-actions/configure-aws-credentials` from 2 to 4 ([#10504](https://github.com/opensearch-project/OpenSearch/pull/10504)) - Bump `stefanzweifel/git-auto-commit-action` from 4 to 5 ([#11171](https://github.com/opensearch-project/OpenSearch/pull/11171)) diff --git a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle index dca2bce94ea6d..f24b61ef0d165 100644 --- a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle +++ b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle @@ -17,7 +17,7 @@ repositories { } dependencies { - implementation "org.apache.logging.log4j:log4j-core:2.21.1" + implementation "org.apache.logging.log4j:log4j-core:2.22.0" } ["0.0.1", "0.0.2"].forEach { v -> From 00517eb21144065bd779c1777e723e5d8c1f0ecb Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 20 Nov 2023 12:06:21 -0500 Subject: [PATCH 6/6] [BUG] The thread context is not properly cleared and messes up the traces (#10873) * [BUG] The thread context is not properly cleared and messes up the traces Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + .../telemetry/tracing/DefaultSpanScope.java | 4 +- .../telemetry/tracing/DefaultTracer.java | 9 +- .../opensearch/telemetry/tracing/Tracer.java | 4 +- .../tracing/TracingContextPropagator.java | 4 +- .../telemetry/tracing/noop/NoopTracer.java | 4 +- .../TransportTracer.java} | 16 +- .../{http => transport}/package-info.java | 4 +- .../telemetry/tracing/DefaultTracerTests.java | 35 ++-- .../TelemetryTracerEnabledSanityIT.java | 22 ++- .../tracing/OTelTracingContextPropagator.java | 10 +- .../OTelTracingContextPropagatorTests.java | 4 +- .../http/AbstractHttpServerTransport.java | 9 +- ...hreadContextBasedTracerContextStorage.java | 13 +- .../telemetry/tracing/TracerFactory.java | 12 +- .../telemetry/tracing/WrappedTracer.java | 4 +- .../opensearch/transport/InboundHandler.java | 11 +- .../transport/TransportService.java | 122 ++++++------ ...ContextBasedTracerContextStorageTests.java | 174 ++++++++++++++++++ .../tracing/MockTracingContextPropagator.java | 4 +- .../tracing/TelemetryValidators.java | 2 +- .../NumberOfTraceIDsEqualToRequests.java | 11 +- 22 files changed, 338 insertions(+), 141 deletions(-) rename libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/{http/HttpTracer.java => transport/TransportTracer.java} (64%) rename libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/{http => transport}/package-info.java (65%) create mode 100644 server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a40580cbb3238..55f628a4532d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993)) +- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873)) ### Security diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java index a5d515443b54d..decbf49f795c4 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java @@ -44,23 +44,23 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt public static SpanScope create(Span span, TracerContextStorage tracerContextStorage) { final SpanScope beforeSpanScope = spanScopeThreadLocal.get(); SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage); - spanScopeThreadLocal.set(newSpanScope); return newSpanScope; } @Override public void close() { detach(); - spanScopeThreadLocal.set(previousSpanScope); } @Override public SpanScope attach() { + spanScopeThreadLocal.set(this); tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span); return this; } private void detach() { + spanScopeThreadLocal.set(previousSpanScope); if (previousSpanScope != null) { tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan()); } else { diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index a3bb64ea392a9..8f1a26d99e725 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -12,7 +12,7 @@ import java.io.Closeable; import java.io.IOException; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -53,7 +53,6 @@ public Span startSpan(SpanCreationContext context) { parentSpan = getCurrentSpanInternal(); } Span span = createSpan(context, parentSpan); - setCurrentSpanInContext(span); addDefaultAttributes(span); return span; } @@ -94,10 +93,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan return tracingTelemetry.createSpan(spanCreationContext, parentSpan); } - private void setCurrentSpanInContext(Span span) { - tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span); - } - /** * Adds default attributes in the span * @param span the current active span @@ -107,7 +102,7 @@ protected void addDefaultAttributes(Span span) { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { Optional propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers); return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null))); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java index 8257d251e9560..9b49ca7668992 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java @@ -9,7 +9,7 @@ package org.opensearch.telemetry.tracing; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.telemetry.tracing.http.HttpTracer; +import org.opensearch.telemetry.tracing.transport.TransportTracer; import java.io.Closeable; @@ -22,7 +22,7 @@ * @opensearch.experimental */ @ExperimentalApi -public interface Tracer extends HttpTracer, Closeable { +public interface Tracer extends TransportTracer, Closeable { /** * Starts the {@link Span} with given {@link SpanCreationContext} * diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java index 5fbc5d329e227..d7d48d1db10d6 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java @@ -10,7 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -36,7 +36,7 @@ public interface TracingContextPropagator { * @param headers request headers to extract the context from * @return current span */ - Optional extractFromHeaders(Map> headers); + Optional extractFromHeaders(Map> headers); /** * Injects tracing context diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java index 50452ff5fe3b4..c57eaccf1f3df 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java @@ -16,7 +16,7 @@ import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import java.util.List; +import java.util.Collection; import java.util.Map; /** @@ -65,7 +65,7 @@ public void close() { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> header) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> header) { return NoopSpan.INSTANCE; } } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java similarity index 64% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java index 50d18c0a0d040..5883d7de8e83a 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java @@ -6,31 +6,31 @@ * compatible open source license. */ -package org.opensearch.telemetry.tracing.http; +package org.opensearch.telemetry.tracing.transport; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanCreationContext; -import java.util.List; +import java.util.Collection; import java.util.Map; /** - * HttpTracer helps in creating a {@link Span} which reads the incoming tracing information - * from the HttpRequest header and propagate the span accordingly. + * TransportTracer helps in creating a {@link Span} which reads the incoming tracing information + * from the HTTP or TCP transport headers and propagate the span accordingly. *

* All methods on the Tracer object are multi-thread safe. * * @opensearch.experimental */ @ExperimentalApi -public interface HttpTracer { +public interface TransportTracer { /** * Start the span with propagating the tracing info from the HttpRequest header. * * @param spanCreationContext span name. - * @param header http request header. - * @return span. + * @param headers transport headers + * @return the span instance */ - Span startSpan(SpanCreationContext spanCreationContext, Map> header); + Span startSpan(SpanCreationContext spanCreationContext, Map> headers); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java similarity index 65% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java index 9feb862a4e010..87ffcc43184bb 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java @@ -7,6 +7,6 @@ */ /** - * Contains No-op implementations + * Contains HTTP or TCP transport related tracer capabilities */ -package org.opensearch.telemetry.tracing.http; +package org.opensearch.telemetry.tracing.transport; diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java index 2a791f1ae4164..2182b3ea28ac8 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java @@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase { private Span mockSpan; private Span mockParentSpan; - private SpanScope mockSpanScope; private ThreadPool threadPool; private ExecutorService executorService; private SpanCreationContext spanCreationContext; @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() { Span span = defaultTracer.startSpan(spanCreationContext); - assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1")); - assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2")); - assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3")); - assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4")); + assertThat(defaultTracer.getCurrentSpan(), is(nullValue())); + assertEquals(1.0, ((MockSpan) span).getAttribute("key1")); + assertEquals(2l, ((MockSpan) span).getAttribute("key2")); + assertEquals(true, ((MockSpan) span).getAttribute("key3")); + assertEquals("key4", ((MockSpan) span).getAttribute("key4")); span.endSpan(); } @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() { Span span = defaultTracer.startSpan(spanCreationContext, null); - SpanContext parentSpan = defaultTracer.getCurrentSpan(); - - SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan()); + try (final SpanScope scope = defaultTracer.withSpanInScope(span)) { + SpanContext parentSpan = defaultTracer.getCurrentSpan(); - Span span1 = defaultTracer.startSpan(spanCreationContext1); + SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan()); - assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); - span1.endSpan(); - span.endSpan(); + try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) { + assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName()); + assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); + } + } finally { + span.endSpan(); + } } @SuppressWarnings("unchecked") @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() { Span span = defaultTracer.startSpan(spanCreationContext); - assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan()); + assertThat(defaultTracer.getCurrentSpan(), is(nullValue())); span.endSpan(); } @@ -403,7 +405,6 @@ private void setupMocks() { mockTracingTelemetry = mock(TracingTelemetry.class); mockSpan = mock(Span.class); mockParentSpan = mock(Span.class); - mockSpanScope = mock(SpanScope.class); mockTracerContextStorage = mock(TracerContextStorage.class); when(mockSpan.getSpanName()).thenReturn("span_name"); when(mockSpan.getSpanId()).thenReturn("span_id"); diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java index f07f2b308e801..156dc344d1ae2 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java @@ -61,20 +61,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { // Create Index and ingest data String indexName = "test-index-11"; - Settings basicSettings = Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build(); + Settings basicSettings = Settings.builder() + .put("number_of_shards", 2) + .put("number_of_replicas", 0) + .put("index.routing.allocation.total_shards_per_node", 1) + .build(); createIndex(indexName, basicSettings); - indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well")); - indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field2", "another fox did the same.")); + + indexRandom(false, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well")); + indexRandom(false, client.prepareIndex(indexName).setId("2").setSource("field2", "another fox did the same.")); ensureGreen(); refresh(); // Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs. - client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("fox")).get(); - client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("jumps")).get(); - - ensureGreen(); - refresh(); + client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get(); + client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get(); // Sleep for about 3s to wait for traces are published, delay is (the delay is 1s). Thread.sleep(3000); @@ -88,8 +90,10 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { ) ); + // See please https://github.com/opensearch-project/OpenSearch/issues/10291 till local transport is not instrumented, + // capturing only the inter-nodes transport actions. InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE; - validators.validate(exporter.getFinishedSpanItems(), 6); + validators.validate(exporter.getFinishedSpanItems(), 4); } private static void updateTelemetrySetting(Client client, boolean value) { diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java index f8fe885ee450c..0fb05a08c27bb 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java @@ -10,8 +10,8 @@ import org.opensearch.core.common.Strings; +import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -51,7 +51,7 @@ private static OTelPropagatedSpan getPropagatedSpan(Context context) { } @Override - public Optional extractFromHeaders(Map> headers) { + public Optional extractFromHeaders(Map> headers) { Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER); return Optional.ofNullable(getPropagatedSpan(context)); } @@ -87,9 +87,9 @@ public String get(Map headers, String key) { } }; - private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { + private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { @Override - public Iterable keys(Map> headers) { + public Iterable keys(Map> headers) { if (headers != null) { return headers.keySet(); } else { @@ -98,7 +98,7 @@ public Iterable keys(Map> headers) { } @Override - public String get(Map> headers, String key) { + public String get(Map> headers, String key) { if (headers != null && headers.containsKey(key)) { return Strings.collectionToCommaDelimitedString(headers.get(key)); } diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java index 16a3ec9493d5d..d865a329104c1 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java @@ -11,8 +11,8 @@ import org.opensearch.test.OpenSearchTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import io.opentelemetry.api.OpenTelemetry; @@ -57,7 +57,7 @@ public void testExtractTracerContextFromHeader() { } public void testExtractTracerContextFromHttpHeader() { - Map> requestHeaders = new HashMap<>(); + Map> requestHeaders = new HashMap<>(); requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00")); OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index b8f8abb6c2c23..257aca2b67990 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -69,9 +69,11 @@ import java.nio.channels.CancelledKeyException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -362,7 +364,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { * @param httpChannel that received the http request */ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) { - final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), httpRequest.getHeaders()); + final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), extractHeaders(httpRequest.getHeaders())); try (final SpanScope httpRequestSpanScope = tracer.withSpanInScope(span)) { HttpChannel traceableHttpChannel = TraceableHttpChannel.create(httpChannel, span, tracer); handleIncomingRequest(httpRequest, traceableHttpChannel, httpRequest.getInboundException()); @@ -483,4 +485,9 @@ private static ActionListener earlyResponseListener(HttpRequest request, H return NO_OP; } } + + @SuppressWarnings("unchecked") + private static > Map> extractHeaders(Map headers) { + return (Map>) headers; + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 208df90f65d74..863f56d9fbe94 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -15,7 +15,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** * Core's ThreadContext based TracerContextStorage implementation @@ -79,17 +78,7 @@ public Map headers(Map source) { } Span getCurrentSpan(String key) { - Optional optionalSpanFromContext = spanFromThreadContext(key); - return optionalSpanFromContext.orElse(spanFromHeader()); - } - - private Optional spanFromThreadContext(String key) { SpanReference currentSpanRef = threadContext.getTransient(key); - return (currentSpanRef == null) ? Optional.empty() : Optional.ofNullable(currentSpanRef.getSpan()); - } - - private Span spanFromHeader() { - Optional span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); - return span.orElse(null); + return (currentSpanRef == null) ? null : currentSpanRef.getSpan(); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java index 1cb73e0247c3a..b0cecb0ee485d 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java @@ -62,6 +62,13 @@ public void close() { } } + protected TracerContextStorage createTracerContextStorage( + TracingTelemetry tracingTelemetry, + ThreadContext threadContext + ) { + return new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry); + } + private Tracer tracer(Optional telemetry, ThreadContext threadContext) { return telemetry.map(Telemetry::getTracingTelemetry) .map(tracingTelemetry -> createDefaultTracer(tracingTelemetry, threadContext)) @@ -70,10 +77,7 @@ private Tracer tracer(Optional telemetry, ThreadContext threadContext } private Tracer createDefaultTracer(TracingTelemetry tracingTelemetry, ThreadContext threadContext) { - TracerContextStorage tracerContextStorage = new ThreadContextBasedTracerContextStorage( - threadContext, - tracingTelemetry - ); + TracerContextStorage tracerContextStorage = createTracerContextStorage(tracingTelemetry, threadContext); return new DefaultTracer(tracingTelemetry, tracerContextStorage); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java index 631fb8242d78e..dfe456a0a6784 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java @@ -13,7 +13,7 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; -import java.util.List; +import java.util.Collection; import java.util.Map; /** @@ -75,7 +75,7 @@ Tracer getDelegateTracer() { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { return defaultTracer.startSpan(spanCreationContext, headers); } } diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index c14a53e799319..a8315c3cae4e0 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -57,6 +57,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; /** * Handler for inbound data @@ -188,11 +192,16 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st } } + private Map> extractHeaders(Map headers) { + return headers.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> Collections.singleton(e.getValue()))); + } + private void handleRequest(TcpChannel channel, Header header, InboundMessage message) throws IOException { final String action = header.getActionName(); final long requestId = header.getRequestId(); final Version version = header.getVersion(); - Span span = tracer.startSpan(SpanBuilder.from(action, channel)); + final Map> headers = extractHeaders(header.getHeaders().v1()); + Span span = tracer.startSpan(SpanBuilder.from(action, channel), headers); try (SpanScope spanScope = tracer.withSpanInScope(span)) { if (header.isHandshake()) { messageListener.onRequestReceived(requestId, action); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index de88c3619abe8..5aeed72f306db 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -867,59 +867,18 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); - try { - logger.debug("Action: " + action); - final TransportResponseHandler delegate; - if (request.getParentTask().isSet()) { - // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. - final Releasable unregisterChildNode = taskManager.registerChildNode( - request.getParentTask().getId(), - connection.getNode() - ); - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleResponse(response); - } - - @Override - public void handleException(TransportException exp) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleException(exp); - } - - @Override - public String executor() { - return traceableTransportResponseHandler.executor(); - } - - @Override - public T read(StreamInput in) throws IOException { - return traceableTransportResponseHandler.read(in); - } - - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); - } - }; - } else { - delegate = traceableTransportResponseHandler; - } - asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); - } - traceableTransportResponseHandler.handleException(te); + if (connection == localNodeConnection) { + // See please https://github.com/opensearch-project/OpenSearch/issues/10291 + sendRequestAsync(connection, action, request, options, handler); + } else { + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( + handler, + span, + tracer + ); + sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); } } } @@ -1690,4 +1649,61 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } } } + + private void sendRequestAsync( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler + ) { + try { + logger.debug("Action: " + action); + final TransportResponseHandler delegate; + if (request.getParentTask().isSet()) { + // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. + final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + unregisterChildNode.close(); + handler.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + unregisterChildNode.close(); + handler.handleException(exp); + } + + @Override + public String executor() { + return handler.executor(); + } + + @Override + public T read(StreamInput in) throws IOException { + return handler.read(in); + } + + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = handler; + } + asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + handler.handleException(te); + } + } } diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java new file mode 100644 index 0000000000000..3a98a67b53920 --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.concurrent.ThreadContext.StoredContext; +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.metrics.MetricsTelemetry; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; +import org.junit.After; +import org.junit.Before; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING; +import static org.opensearch.telemetry.TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING; +import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; + +public class ThreadContextBasedTracerContextStorageTests extends OpenSearchTestCase { + private Tracer tracer; + private ThreadContext threadContext; + private TracerContextStorage threadContextStorage; + private ExecutorService executorService; + + @SuppressWarnings("resource") + @Before + public void setUp() throws Exception { + super.setUp(); + + final Settings settings = Settings.builder() + .put(TRACER_ENABLED_SETTING.getKey(), true) + .put(TRACER_SAMPLER_PROBABILITY.getKey(), 1d) + .put(TRACER_FEATURE_ENABLED_SETTING.getKey(), true) + .build(); + + final TelemetrySettings telemetrySettings = new TelemetrySettings( + settings, + new ClusterSettings(Settings.EMPTY, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY)) + ); + + final TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); + + threadContext = new ThreadContext(Settings.EMPTY); + threadContextStorage = new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry); + + tracer = new TracerFactory(telemetrySettings, Optional.of(new Telemetry() { + @Override + public MetricsTelemetry getMetricsTelemetry() { + return null; + } + + @Override + public TracingTelemetry getTracingTelemetry() { + return tracingTelemetry; + } + }), threadContext) { + @Override + protected TracerContextStorage createTracerContextStorage( + TracingTelemetry tracingTelemetry, + ThreadContext threadContext + ) { + return threadContextStorage; + } + }.getTracer(); + + executorService = Executors.newSingleThreadExecutor(); + assertThat(tracer, not(instanceOf(NoopTracer.class))); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + executorService.shutdown(); + tracer.close(); + } + + public void testStartingSpanDoesNotChangeThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testSpanInScopeChangesThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testStashingPropagatesThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testPreservingContextThreadContext() throws InterruptedException, ExecutionException, TimeoutException { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + final Runnable r = new Runnable() { + @Override + public void run() { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + }; + + executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testPreservingContextAndStashingThreadContext() throws InterruptedException, ExecutionException, TimeoutException { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + final Runnable r = new Runnable() { + @Override + public void run() { + final Span local = tracer.startSpan(SpanCreationContext.internal().name("test-local")); + try (SpanScope localScope = tracer.withSpanInScope(local)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat( + threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), + is(not(nullValue())) + ); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local)); + } + } + } + }; + + executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } +} diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java index 6d0cd6d0b1290..4c58352531ca8 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java @@ -14,7 +14,7 @@ import org.opensearch.telemetry.tracing.TracingContextPropagator; import org.opensearch.telemetry.tracing.attributes.Attributes; -import java.util.List; +import java.util.Collection; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -52,7 +52,7 @@ public Optional extract(Map props) { } @Override - public Optional extractFromHeaders(Map> headers) { + public Optional extractFromHeaders(Map> headers) { if (headers != null) { Map convertedHeader = headers.entrySet() .stream() diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java index 8d8c18fb9ef6b..9b5d84954908b 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java @@ -49,7 +49,7 @@ private String printProblematicSpansMap(Map> spanMap) StringBuilder sb = new StringBuilder(); for (var entry : spanMap.entrySet()) { sb.append("SpanData validation failed for validator " + entry.getKey()); - sb.append("/n"); + sb.append("\n"); for (MockSpanData span : entry.getValue()) { sb.append(span.toString()); } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java index 5fe268a8f0581..045d3a85e21e7 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java @@ -13,9 +13,9 @@ import org.opensearch.test.telemetry.tracing.TracingValidator; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; /** @@ -41,13 +41,10 @@ public NumberOfTraceIDsEqualToRequests(Attributes attributes) { */ @Override public List validate(List spans, int requests) { - Set totalTraceIDs = spans.stream() - .filter(span -> isMatchingSpan(span)) - .map(MockSpanData::getTraceID) - .collect(Collectors.toSet()); + final Collection totalTraceIDs = spans.stream().filter(span -> isMatchingSpan(span)).collect(Collectors.toList()); List problematicSpans = new ArrayList<>(); - if (totalTraceIDs.size() != requests) { - problematicSpans.addAll(spans); + if (totalTraceIDs.stream().map(MockSpanData::getTraceID).distinct().count() != requests) { + problematicSpans.addAll(totalTraceIDs); } return problematicSpans; }