diff --git a/CHANGELOG.md b/CHANGELOG.md index 441de802f6ea8..6c26302cb2272 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -122,6 +122,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `actions/setup-java` from 2 to 3 ([#9457](https://github.com/opensearch-project/OpenSearch/pull/9457)) - Bump `com.google.api:gax` from 2.27.0 to 2.32.0 ([#9300](https://github.com/opensearch-project/OpenSearch/pull/9300)) - Bump `netty` from 4.1.96.Final to 4.1.97.Final ([#9553](https://github.com/opensearch-project/OpenSearch/pull/9553)) +- Bump `io.grpc:grpc-api` from 1.57.1 to 1.57.2 ([#9578](https://github.com/opensearch-project/OpenSearch/pull/9578)) ### Changed - Default to mmapfs within hybridfs ([#8508](https://github.com/opensearch-project/OpenSearch/pull/8508)) @@ -156,9 +157,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412)) - Fix sort related ITs for concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9466) - Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https://github.com/opensearch-project/OpenSearch/pull/9528))) +- Add support to use trace propagated from client ([#9506](https://github.com/opensearch-project/OpenSearch/pull/9506)) - Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469)) - [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/)) - [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264)) +- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562)) ### Deprecated @@ -172,8 +175,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291)) - Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437)) - Add support to clear archived index setting ([#9019](https://github.com/opensearch-project/OpenSearch/pull/9019)) +- [Segment Replication] Fixed bug where replica shard temporarily serves stale data during an engine reset ([#9495](https://github.com/opensearch-project/OpenSearch/pull/9495)) ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x \ No newline at end of file diff --git a/TESTING.md b/TESTING.md index a76ee07e1faac..1c91d60840d61 100644 --- a/TESTING.md +++ b/TESTING.md @@ -23,6 +23,7 @@ OpenSearch uses [jUnit](https://junit.org/junit5/) for testing, it also uses ran - [Iterating on packaging tests](#iterating-on-packaging-tests) - [Testing backwards compatibility](#testing-backwards-compatibility) - [BWC Testing against a specific remote/branch](#bwc-testing-against-a-specific-remotebranch) + - [BWC Testing with security](#bwc-testing-with-security) - [Skip fetching latest](#skip-fetching-latest) - [How to write good tests?](#how-to-write-good-tests) - [Base classes for test cases](#base-classes-for-test-cases) @@ -406,6 +407,29 @@ Example: Say you need to make a change to `main` and have a BWC layer in `5.x`. You will need to: . Create a branch called `index_req_change` off your remote `${remote}`. This will contain your change. . Create a branch called `index_req_bwc_5.x` off `5.x`. This will contain your bwc layer. . Push both branches to your remote repository. . Run the tests with `./gradlew check -Dbwc.remote=${remote} -Dbwc.refspec.5.x=index_req_bwc_5.x`. +## BWC Testing with security + +You may want to run BWC tests for a secure OpenSearch cluster. In order to do this, you will need to follow a few additional steps: + +1. Clone the OpenSearch Security repository from https://github.com/opensearch-project/security. +2. Get both the old version of the Security plugin (the version you wish to come from) and the new version of the Security plugin (the version you wish to go to). This can be done either by fetching the maven artifact with a command like `wget https://repo1.maven.org/maven2/org/opensearch/plugin/opensearch-security/.0/opensearch-security-.0.zip` or by running `./gradlew assemble` from the base of the Security repository. +3. Move both of the Security artifacts into new directories at the path `/security/bwc-test/src/test/resources/.0`. You should end up with two different directories in `/security/bwc-test/src/test/resources/`, one named the old version and one the new version. +4. Run the following command from the base of the Security repository: + +``` + ./gradlew -p bwc-test clean bwcTestSuite \ + -Dtests.security.manager=false \ + -Dtests.opensearch.http.protocol=https \ + -Dtests.opensearch.username=admin \ + -Dtests.opensearch.password=admin \ + -PcustomDistributionUrl="/OpenSearch/distribution/archives/linux-tar/build/distributions/opensearch-min--SNAPSHOT-linux-x64.tar.gz" \ + -i +``` + +`-Dtests.security.manager=false` handles access issues when attempting to read the certificates from the file system. +`-Dtests.opensearch.http.protocol=https` tells the wait for cluster startup task to do the right thing. +`-PcustomDistributionUrl=...` uses a custom build of the distribution of OpenSearch. This is unnecessary when running against standard/unmodified OpenSearch core distributions. + ### Skip fetching latest For some BWC testing scenarios, you want to use the local clone of the repository without fetching latest. For these use cases, you can set the system property `tests.bwc.git_fetch_latest` to `false` and the BWC builds will skip fetching the latest from the remote. diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index 2f3a425f96703..bc1a08e2d3c72 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -12,6 +12,9 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * @@ -44,7 +47,7 @@ public SpanScope startSpan(String spanName) { @Override public SpanScope startSpan(String spanName, Attributes attributes) { - return startSpan(spanName, null, attributes); + return startSpan(spanName, (SpanContext) null, attributes); } @Override @@ -97,4 +100,10 @@ protected void addDefaultAttributes(Span span) { span.addAttribute(THREAD_NAME, Thread.currentThread().getName()); } + @Override + public SpanScope startSpan(String spanName, Map> headers, Attributes attributes) { + Optional propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers); + return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes); + } + } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java index bc55b26abc761..40cc5dfd2d743 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java @@ -9,6 +9,7 @@ package org.opensearch.telemetry.tracing; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.telemetry.tracing.http.HttpTracer; import java.io.Closeable; @@ -18,7 +19,7 @@ * * All methods on the Tracer object are multi-thread safe. */ -public interface Tracer extends Closeable { +public interface Tracer extends HttpTracer, Closeable { /** * Starts the {@link Span} with given name diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java index 3e4a377d33a3d..52e272afcd07f 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java @@ -8,7 +8,9 @@ package org.opensearch.telemetry.tracing; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; /** @@ -23,7 +25,15 @@ public interface TracingContextPropagator { * @param props properties * @return current span */ - Span extract(Map props); + Optional extract(Map props); + + /** + * Extracts current span from HTTP headers. + * + * @param headers request headers to extract the context from + * @return current span + */ + Optional extractFromHeaders(Map> headers); /** * Injects tracing context diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java new file mode 100644 index 0000000000000..64ef84335a95b --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.http; + +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.attributes.Attributes; + +import java.util.List; +import java.util.Map; + +/** + * HttpTracer helps in creating a {@link Span} which reads the incoming tracing information + * from the HttpRequest header and propagate the span accordingly. + * + * All methods on the Tracer object are multi-thread safe. + */ +public interface HttpTracer { + /** + * Start the span with propagating the tracing info from the HttpRequest header. + * + * @param spanName span name. + * @param header http request header. + * @param attributes span attributes. + * @return scope of the span, must be closed with explicit close or with try-with-resource + */ + SpanScope startSpan(String spanName, Map> header, Attributes attributes); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java new file mode 100644 index 0000000000000..9feb862a4e010 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Contains No-op implementations + */ +package org.opensearch.telemetry.tracing.http; diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java index 1a37ed0d0f245..2ff50bf3bcb18 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java @@ -13,6 +13,9 @@ import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.attributes.Attributes; +import java.util.List; +import java.util.Map; + /** * No-op implementation of Tracer * @@ -51,4 +54,9 @@ public SpanContext getCurrentSpan() { public void close() { } + + @Override + public SpanScope startSpan(String spanName, Map> header, Attributes attributes) { + return SpanScope.NO_OP; + } } diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java index 07abd43c8dd7b..150992da06f89 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java @@ -17,6 +17,10 @@ import org.junit.Assert; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -104,6 +108,28 @@ public void testCreateSpanWithParent() { Assert.assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); } + public void testHttpTracer() { + String traceId = "trace_id"; + String spanId = "span_id"; + TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); + + DefaultTracer defaultTracer = new DefaultTracer( + tracingTelemetry, + new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry) + ); + + Map> requestHeaders = new HashMap<>(); + requestHeaders.put("traceparent", Arrays.asList(traceId + "~" + spanId)); + + SpanScope spanScope = defaultTracer.startSpan("test_span", requestHeaders, Attributes.EMPTY); + SpanContext currentSpan = defaultTracer.getCurrentSpan(); + assertNotNull(currentSpan); + assertEquals(traceId, currentSpan.getSpan().getTraceId()); + assertEquals(traceId, currentSpan.getSpan().getParentSpan().getTraceId()); + assertEquals(spanId, currentSpan.getSpan().getParentSpan().getSpanId()); + spanScope.close(); + } + public void testCreateSpanWithNullParent() { TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); DefaultTracer defaultTracer = new DefaultTracer( @@ -111,7 +137,7 @@ public void testCreateSpanWithNullParent() { new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry) ); - defaultTracer.startSpan("span_name", null, Attributes.EMPTY); + defaultTracer.startSpan("span_name"); Assert.assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); Assert.assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan()); diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java index f1df3b24e1c9b..bcd8ffe41a17b 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java @@ -54,16 +54,16 @@ public void testRunnableWithParent() throws Exception { DefaultTracer defaultTracer = new DefaultTracer(new MockTracingTelemetry(), contextStorage); defaultTracer.startSpan(parentSpanName); SpanContext parentSpan = defaultTracer.getCurrentSpan(); - AtomicReference currrntSpan = new AtomicReference<>(new SpanContext(null)); + AtomicReference currentSpan = new AtomicReference<>(); final AtomicBoolean isRunnableCompleted = new AtomicBoolean(false); TraceableRunnable traceableRunnable = new TraceableRunnable(defaultTracer, spanName, parentSpan, Attributes.EMPTY, () -> { isRunnableCompleted.set(true); - currrntSpan.set(defaultTracer.getCurrentSpan()); + currentSpan.set(defaultTracer.getCurrentSpan()); }); traceableRunnable.run(); assertTrue(isRunnableCompleted.get()); - assertEquals(spanName, currrntSpan.get().getSpan().getSpanName()); - assertEquals(parentSpan.getSpan(), currrntSpan.get().getSpan().getParentSpan()); + assertEquals(spanName, currentSpan.get().getSpan().getSpanName()); + assertEquals(parentSpan.getSpan(), currentSpan.get().getSpan().getParentSpan()); assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan()); } } diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle index 6d4e56cb81a11..e7ee980114e5e 100644 --- a/plugins/repository-gcs/build.gradle +++ b/plugins/repository-gcs/build.gradle @@ -86,7 +86,7 @@ dependencies { api "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}" api "commons-codec:commons-codec:${versions.commonscodec}" api 'org.threeten:threetenbp:1.4.4' - api 'io.grpc:grpc-api:1.57.1' + api 'io.grpc:grpc-api:1.57.2' api 'io.opencensus:opencensus-api:0.31.1' api 'io.opencensus:opencensus-contrib-http-util:0.31.1' diff --git a/plugins/repository-gcs/licenses/grpc-api-1.57.1.jar.sha1 b/plugins/repository-gcs/licenses/grpc-api-1.57.1.jar.sha1 deleted file mode 100644 index c52d208334070..0000000000000 --- a/plugins/repository-gcs/licenses/grpc-api-1.57.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -2a7f06d11b65839cf222159b4e947a22eddc59e6 \ No newline at end of file diff --git a/plugins/repository-gcs/licenses/grpc-api-1.57.2.jar.sha1 b/plugins/repository-gcs/licenses/grpc-api-1.57.2.jar.sha1 new file mode 100644 index 0000000000000..8b320fdd2f9cc --- /dev/null +++ b/plugins/repository-gcs/licenses/grpc-api-1.57.2.jar.sha1 @@ -0,0 +1 @@ +c71a006b81ddae7bc4b7cb1d2da78c1b173761f4 \ No newline at end of file diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java index 739a6367ccb2e..f8fe885ee450c 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java @@ -8,7 +8,12 @@ package org.opensearch.telemetry.tracing; +import org.opensearch.core.common.Strings; + +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; import io.opentelemetry.api.OpenTelemetry; @@ -32,8 +37,12 @@ public OTelTracingContextPropagator(OpenTelemetry openTelemetry) { } @Override - public Span extract(Map props) { + public Optional extract(Map props) { Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), props, TEXT_MAP_GETTER); + return Optional.ofNullable(getPropagatedSpan(context)); + } + + private static OTelPropagatedSpan getPropagatedSpan(Context context) { if (context != null) { io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.fromContext(context); return new OTelPropagatedSpan(span); @@ -41,6 +50,12 @@ public Span extract(Map props) { return null; } + @Override + public Optional extractFromHeaders(Map> headers) { + Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER); + return Optional.ofNullable(getPropagatedSpan(context)); + } + @Override public void inject(Span currentSpan, BiConsumer setter) { openTelemetry.getPropagators().getTextMapPropagator().inject(context((OTelSpan) currentSpan), setter, TEXT_MAP_SETTER); @@ -72,4 +87,23 @@ public String get(Map headers, String key) { } }; + private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { + @Override + public Iterable keys(Map> headers) { + if (headers != null) { + return headers.keySet(); + } else { + return Collections.emptySet(); + } + } + + @Override + public String get(Map> headers, String key) { + if (headers != null && headers.containsKey(key)) { + return Strings.collectionToCommaDelimitedString(headers.get(key)); + } + return null; + } + }; + } diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java index fcf7495f331af..16a3ec9493d5d 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java @@ -10,7 +10,9 @@ import org.opensearch.test.OpenSearchTestCase; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import io.opentelemetry.api.OpenTelemetry; @@ -19,6 +21,7 @@ import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.ContextPropagators; import static org.mockito.Mockito.mock; @@ -48,8 +51,39 @@ public void testExtractTracerContextFromHeader() { OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); - org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders).orElse(null); assertEquals(TRACE_ID, span.getTraceId()); assertEquals(SPAN_ID, span.getSpanId()); } + + public void testExtractTracerContextFromHttpHeader() { + Map> requestHeaders = new HashMap<>(); + requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00")); + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(requestHeaders).get(); + assertEquals(TRACE_ID, span.getTraceId()); + assertEquals(SPAN_ID, span.getSpanId()); + } + + public void testExtractTracerContextFromHttpHeaderNull() { + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(null).get(); + org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root())); + assertEquals(propagatedSpan.getTraceId(), span.getTraceId()); + assertEquals(propagatedSpan.getSpanId(), span.getSpanId()); + } + + public void testExtractTracerContextFromHttpHeaderEmpty() { + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(new HashMap<>()).get(); + org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root())); + assertEquals(propagatedSpan.getTraceId(), span.getTraceId()); + assertEquals(propagatedSpan.getSpanId(), span.getSpanId()); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 69cdd80bb5085..5855ed7470559 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -143,8 +143,9 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica); assertNotNull(replicaShardRouting); assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); - refresh(INDEX_NAME); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + final SearchResponse response = client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(); + // new primary should have at least the doc count from the first set of segments. + assertTrue(response.getHits().getTotalHits().value >= 1); // assert we can index into the new primary. client().prepareIndex(INDEX_NAME).setId("3").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); diff --git a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java index 401bb390a9df1..0c444732fb12b 100644 --- a/server/src/main/java/org/opensearch/action/get/TransportGetAction.java +++ b/server/src/main/java/org/opensearch/action/get/TransportGetAction.java @@ -36,7 +36,6 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.single.shard.TransportSingleShardAction; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.routing.ShardIterator; @@ -49,12 +48,10 @@ import org.opensearch.index.get.GetResult; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Optional; /** * Performs the get operation. @@ -92,20 +89,11 @@ protected boolean resolveIndex(GetRequest request) { return true; } - static boolean isSegmentReplicationEnabled(ClusterState state, String indexName) { - return Optional.ofNullable(state.getMetadata().index(indexName)) - .map( - indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) - .equals(ReplicationType.SEGMENT) - ) - .orElse(false); - } - /** * Returns true if GET request should be routed to primary shards, else false. */ - public static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { - return isSegmentReplicationEnabled(state, indexName) && realtime && preference == null; + protected static boolean shouldForcePrimaryRouting(ClusterState state, boolean realtime, String preference, String indexName) { + return state.isSegmentReplicationEnabled(indexName) && realtime && preference == null; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index 1b87a60c2ccf5..2fd58d3db4975 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -61,6 +61,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.discovery.Discovery; +import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.Collections; @@ -409,6 +410,21 @@ public boolean supersedes(ClusterState other) { } + /** + * Utility to identify whether input index belongs to SEGMENT replication in established cluster state. + * + * @param indexName Index name + * @return true if index belong SEGMENT replication, false otherwise + */ + public boolean isSegmentReplicationEnabled(String indexName) { + return Optional.ofNullable(this.getMetadata().index(indexName)) + .map( + indexMetadata -> ReplicationType.parseString(indexMetadata.getSettings().get(IndexMetadata.SETTING_REPLICATION_TYPE)) + .equals(ReplicationType.SEGMENT) + ) + .orElse(false); + } + /** * Metrics for cluster state. * diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 48556cc6b9709..b529dfbe13bf4 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -39,6 +39,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; @@ -57,6 +59,7 @@ public class NRTReplicationEngine extends Engine { private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private final Lock flushLock = new ReentrantLock(); protected final ReplicaFileTracker replicaFileTracker; private volatile long lastReceivedPrimaryGen = SequenceNumbers.NO_OPS_PERFORMED; @@ -156,7 +159,7 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep // a lower gen from a newly elected primary shard that is behind this shard's last commit gen. // In that case we still commit into the next local generation. if (incomingGeneration != this.lastReceivedPrimaryGen) { - commitSegmentInfos(); + flush(false, true); translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo); translogManager.rollTranslogGeneration(); } @@ -184,7 +187,7 @@ private void commitSegmentInfos(SegmentInfos infos) throws IOException { translogManager.syncTranslog(); } - protected void commitSegmentInfos() throws IOException { + private void commitSegmentInfos() throws IOException { commitSegmentInfos(getLatestSegmentInfos()); } @@ -351,7 +354,27 @@ public boolean shouldPeriodicallyFlush() { } @Override - public void flush(boolean force, boolean waitIfOngoing) throws EngineException {} + public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + ensureOpen(); + // readLock is held here to wait/block any concurrent close that acquires the writeLock. + try (final ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + if (flushLock.tryLock() == false) { + if (waitIfOngoing == false) { + return; + } + flushLock.lock(); + } + // we are now locked. + try { + commitSegmentInfos(); + } catch (IOException e) { + throw new FlushFailedEngineException(shardId, e); + } finally { + flushLock.unlock(); + } + } + } @Override public void forceMerge( @@ -365,6 +388,9 @@ public void forceMerge( @Override public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { + if (flushFirst) { + flush(false, true); + } try { final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory()); return new GatedCloseable<>(indexCommit, () -> {}); diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index ef8a6c9f36b0c..28931bb5a860f 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -890,11 +890,15 @@ public boolean shouldUseConcurrentSearch() { * Evaluate if parsed request supports concurrent segment search */ public void evaluateRequestShouldUseConcurrentSearch() { - if (aggregations() != null && aggregations().factories() != null) { - requestShouldUseConcurrentSearch.set(aggregations().factories().allFactoriesSupportConcurrentSearch()); - } else { - requestShouldUseConcurrentSearch.set(true); - } + if (sort != null && sort.isSortOnTimeSeriesField()) { + requestShouldUseConcurrentSearch.set(false); + } else if (aggregations() != null + && aggregations().factories() != null + && !aggregations().factories().allFactoriesSupportConcurrentSearch()) { + requestShouldUseConcurrentSearch.set(false); + } else { + requestShouldUseConcurrentSearch.set(true); + } } public void setProfilers(Profilers profilers) { @@ -965,4 +969,12 @@ public int getTargetMaxSliceCount() { } return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING); } + + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return indexShard.isTimeSeriesDescSortOptimizationEnabled() + && sort != null + && sort.isSortOnTimeSeriesField() + && sort.sort.getSort()[0].getReverse() == false; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 7974561d6187c..6c3d2bb278bd0 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -64,7 +64,6 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.CombinedBitSet; import org.apache.lucene.util.SparseFixedBitSet; -import org.opensearch.cluster.metadata.DataStream; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.search.DocValueFormat; @@ -268,10 +267,11 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - if (shouldReverseLeafReaderContexts()) { - // reverse the segment search order if this flag is true. - // Certain queries can benefit if we reverse the segment read order, - // for example time series based queries if searched for desc sort order. + // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. + // This is actually beneficial for search queries to start search on latest segments first for time series workload. + // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf + // reader order here. + if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { for (int i = leaves.size() - 1; i >= 0; i--) { searchLeaf(leaves.get(i), weight, collector); } @@ -517,26 +517,6 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException { return true; } - private boolean shouldReverseLeafReaderContexts() { - // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. - // This is actually beneficial for search queries to start search on latest segments first for time series workload. - // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf - // reader order here. - if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) { - // Only reverse order for asc order sort queries - if (searchContext.sort() != null - && searchContext.sort().sort != null - && searchContext.sort().sort.getSort() != null - && searchContext.sort().sort.getSort().length > 0 - && searchContext.sort().sort.getSort()[0].getReverse() == false - && searchContext.sort().sort.getSort()[0].getField() != null - && searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) { - return true; - } - } - return false; - } - // package-private for testing LeafSlice[] slicesInternal(List leaves, int targetMaxSlice) { LeafSlice[] leafSlices; diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 32de5fc9864ce..151ef97a2a141 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -569,4 +569,9 @@ public boolean shouldUseConcurrentSearch() { public int getTargetMaxSliceCount() { return in.getTargetMaxSliceCount(); } + + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return in.shouldUseTimeSeriesDescSortOptimization(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 590ce4b077cbc..dce6da897a74b 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -487,4 +487,6 @@ public String toString() { public abstract BucketCollectorProcessor bucketCollectorProcessor(); public abstract int getTargetMaxSliceCount(); + + public abstract boolean shouldUseTimeSeriesDescSortOptimization(); } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java index 115f7503631c1..631ace41090d7 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java @@ -58,9 +58,10 @@ public boolean searchWith( boolean hasTimeout ) throws IOException { if (searchContext.shouldUseConcurrentSearch()) { - LOGGER.info("Using concurrent search over segments (experimental)"); + LOGGER.debug("Using concurrent search over segments (experimental) for request with context id {}", searchContext.id()); return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } else { + LOGGER.debug("Using non-concurrent search over segments for request with context id {}", searchContext.id()); return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } } @@ -73,9 +74,13 @@ public boolean searchWith( @Override public AggregationProcessor aggregationProcessor(SearchContext searchContext) { if (searchContext.shouldUseConcurrentSearch()) { - LOGGER.info("Using concurrent search over segments (experimental)"); + LOGGER.debug( + "Using concurrent aggregation processor over segments (experimental) for request with context id {}", + searchContext.id() + ); return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext); } else { + LOGGER.debug("Using non-concurrent aggregation processor over segments for request with context id {}", searchContext.id()); return defaultQueryPhaseSearcher.aggregationProcessor(searchContext); } } diff --git a/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java b/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java index 272b1e9c1dc8d..e65187e558aef 100644 --- a/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java +++ b/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java @@ -32,6 +32,7 @@ package org.opensearch.search.sort; import org.apache.lucene.search.Sort; +import org.opensearch.cluster.metadata.DataStream; import org.opensearch.search.DocValueFormat; /** @@ -52,4 +53,13 @@ public SortAndFormats(Sort sort, DocValueFormat[] formats) { this.formats = formats; } + /** + * @return true: if sort is on timestamp field, false: otherwise + */ + public boolean isSortOnTimeSeriesField() { + return sort.getSort().length > 0 + && sort.getSort()[0].getField() != null + && sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME); + } + } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index c009ab2391aab..a32facdc71146 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -90,6 +90,7 @@ private Optional spanFromThreadContext(String key) { } private Span spanFromHeader() { - return tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); + Optional span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); + return span.orElse(null); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java index 466abaac435f3..b699471be7f4c 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java @@ -13,6 +13,8 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; +import java.util.List; +import java.util.Map; /** * Wrapper implementation of Tracer. This delegates call to right tracer based on the tracer settings @@ -42,7 +44,7 @@ public SpanScope startSpan(String spanName) { @Override public SpanScope startSpan(String spanName, Attributes attributes) { - return startSpan(spanName, null, attributes); + return startSpan(spanName, (SpanContext) null, attributes); } @Override @@ -66,4 +68,9 @@ public void close() throws IOException { Tracer getDelegateTracer() { return telemetrySettings.isTracingEnabled() ? defaultTracer : NoopTracer.INSTANCE; } + + @Override + public SpanScope startSpan(String spanName, Map> headers, Attributes attributes) { + return defaultTracer.startSpan(spanName, headers, attributes); + } } diff --git a/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java b/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java index 63fe65d70d020..c4fb3271ae3ce 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterStateTests.java @@ -57,6 +57,7 @@ import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.TestCustomMetadata; @@ -73,6 +74,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -84,7 +86,7 @@ public void testSupersedes() { final DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version); final DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), emptyMap(), emptySet(), version); final DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).build(); - ClusterName name = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY); + ClusterName name = CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY); ClusterState noClusterManager1 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build(); ClusterState noClusterManager2 = ClusterState.builder(name).version(randomInt(5)).nodes(nodes).build(); ClusterState withClusterManager1a = ClusterState.builder(name) @@ -115,6 +117,39 @@ public void testSupersedes() { ); } + public void testIsSegmentReplicationEnabled() { + final String indexName = "test"; + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); + Settings.Builder builder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(builder) + .numberOfShards(1) + .numberOfReplicas(1); + Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build()); + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + assertTrue(clusterState.isSegmentReplicationEnabled(indexName)); + } + + public void testIsSegmentReplicationDisabled() { + final String indexName = "test"; + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1); + Metadata.Builder metadataBuilder = Metadata.builder().put(indexMetadataBuilder); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder().addAsNew(indexMetadataBuilder.build()); + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + assertFalse(clusterState.isSegmentReplicationEnabled(indexName)); + } + public void testBuilderRejectsNullCustom() { final ClusterState.Builder builder = ClusterState.builder(ClusterName.DEFAULT); final String key = randomAlphaOfLength(10); diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index cb93d3a8db20e..22eb5195af507 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -152,7 +152,7 @@ public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOExcepti assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); // commit the infos to push us to segments_3. - nrtEngine.commitSegmentInfos(); + nrtEngine.flush(); assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); @@ -283,7 +283,7 @@ public void testTrimTranslogOps() throws Exception { } } - public void testCommitSegmentInfos() throws Exception { + public void testFlush() throws Exception { // This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints // stored in user data. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -304,7 +304,7 @@ public void testCommitSegmentInfos() throws Exception { LocalCheckpointTracker localCheckpointTracker = nrtEngine.getLocalCheckpointTracker(); final long maxSeqNo = localCheckpointTracker.getMaxSeqNo(); final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - nrtEngine.commitSegmentInfos(); + nrtEngine.flush(); // ensure getLatestSegmentInfos returns an updated infos ref with correct userdata. final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); @@ -322,6 +322,10 @@ public void testCommitSegmentInfos() throws Exception { userData = committedInfos.getUserData(); assertEquals(processedCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))); assertEquals(maxSeqNo, Long.parseLong(userData.get(MAX_SEQ_NO))); + + try (final GatedCloseable indexCommit = nrtEngine.acquireLastIndexCommit(true)) { + assertEquals(committedInfos.getGeneration() + 1, indexCommit.get().getGeneration()); + } } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 8e27c9ff9ae1a..ead9c1c22c931 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -236,7 +236,7 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception { MatcherAssert.assertThat( "Replica commits infos bytes referencing latest refresh point", latestReplicaCommit.files(true), - containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_5") + containsInAnyOrder("_0.cfe", "_0.si", "_0.cfs", "segments_6") ); MatcherAssert.assertThat( "Segments are referenced in memory", @@ -294,20 +294,20 @@ public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception { replicateSegments(primary, shards.getReplicas()); assertDocCount(primary, 1); assertDocCount(replica, 1); - assertEquals("segments_4", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); - assertSingleSegmentFile(replica, "segments_4"); + assertEquals("segments_5", replica.store().readLastCommittedSegmentsInfo().getSegmentsFileName()); + assertSingleSegmentFile(replica, "segments_5"); shards.indexDocs(1); primary.refresh("test"); replicateSegments(primary, shards.getReplicas()); assertDocCount(replica, 2); - assertSingleSegmentFile(replica, "segments_4"); + assertSingleSegmentFile(replica, "segments_5"); shards.indexDocs(1); flushShard(primary); replicateSegments(primary, shards.getReplicas()); assertDocCount(replica, 3); - assertSingleSegmentFile(replica, "segments_5"); + assertSingleSegmentFile(replica, "segments_6"); final Store.RecoveryDiff diff = Store.segmentReplicationDiff(primary.getSegmentMetadataMap(), replica.getSegmentMetadataMap()); assertTrue(diff.missing.isEmpty()); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 807b4a9cd7482..e8220830063ee 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -34,6 +34,7 @@ import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.replication.TestReplicationSource; @@ -71,6 +72,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; @@ -773,6 +775,35 @@ public void testNoDuplicateSeqNo() throws Exception { } } + public void testQueryDuringEngineResetShowsDocs() throws Exception { + final NRTReplicationEngineFactory engineFactory = new NRTReplicationEngineFactory(); + final NRTReplicationEngineFactory spy = spy(engineFactory); + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, spy, createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(10); + + final AtomicReference failed = new AtomicReference<>(); + doAnswer(ans -> { + try { + final Engine engineOrNull = replicaShard.getEngineOrNull(); + assertNotNull(engineOrNull); + assertTrue(engineOrNull instanceof ReadOnlyEngine); + shards.assertAllEqual(10); + } catch (Throwable e) { + failed.set(e); + } + return ans.callRealMethod(); + }).when(spy).newReadWriteEngine(any()); + shards.promoteReplicaToPrimary(replicaShard).get(); + assertNull("Expected correct doc count during engine reset", failed.get()); + } + } + /** * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because * it asserts point in time seqNos are relative to the doc counts. diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index f569fe3b63af0..347011af98c6d 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -40,16 +40,21 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.SearchType; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -75,6 +80,7 @@ import org.opensearch.search.rescore.RescoreContext; import org.opensearch.search.slice.SliceBuilder; import org.opensearch.search.sort.SortAndFormats; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -547,6 +553,159 @@ protected Engine.Searcher acquireSearcherInternal(String source) { } } + public void testSearchPathEvaluationUsingSortField() throws Exception { + // enable the concurrent set FeatureFlag + FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH); + ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class); + when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT); + ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); + when(shardSearchRequest.shardId()).thenReturn(shardId); + + ThreadPool threadPool = new TestThreadPool(this.getClass().getName()); + IndexShard indexShard = mock(IndexShard.class); + QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class); + when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy); + when(indexShard.getThreadPool()).thenReturn(threadPool); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .build(); + + IndexService indexService = mock(IndexService.class); + QueryShardContext queryShardContext = mock(QueryShardContext.class); + when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn( + queryShardContext + ); + + IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + when(indexService.getIndexSettings()).thenReturn(indexSettings); + + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + + try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + + final Supplier searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) { + @Override + protected void doClose() {} + + @Override + protected Engine.Searcher acquireSearcherInternal(String source) { + try { + IndexReader reader = w.getReader(); + return new Engine.Searcher( + "test", + reader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + reader + ); + } catch (IOException exc) { + throw new AssertionError(exc); + } + } + }; + + SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); + ReaderContext readerContext = new ReaderContext( + newContextId(), + indexService, + indexShard, + searcherSupplier.get(), + randomNonNegativeLong(), + false + ); + + final ClusterService clusterService = mock(ClusterService.class); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING); + clusterSettings.applySettings( + Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + DefaultSearchContext context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + null, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + + // Case1: if sort is on timestamp field, non-concurrent path is used + context.sort( + new SortAndFormats(new Sort(new SortField("@timestamp", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW }) + ); + context.evaluateRequestShouldUseConcurrentSearch(); + assertFalse(context.shouldUseConcurrentSearch()); + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case2: if sort is on other field, concurrent path is used + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + clusterService, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.sort( + new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW }) + ); + context.evaluateRequestShouldUseConcurrentSearch(); + if (executor == null) { + assertFalse(context.shouldUseConcurrentSearch()); + } else { + assertTrue(context.shouldUseConcurrentSearch()); + } + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case 3: With no sort, concurrent path is used + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + clusterService, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.evaluateRequestShouldUseConcurrentSearch(); + if (executor == null) { + assertFalse(context.shouldUseConcurrentSearch()); + } else { + assertTrue(context.shouldUseConcurrentSearch()); + } + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // shutdown the threadpool + threadPool.shutdown(); + } + } + private ShardSearchContextId newContextId() { return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()); } diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java index f45381e3b4cc4..b70fe81d5f9c4 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java @@ -51,7 +51,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracer() throws Excepti wrappedTracer.startSpan("foo"); assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer); - verify(mockDefaultTracer).startSpan(eq("foo"), eq(null), any(Attributes.class)); + verify(mockDefaultTracer).startSpan(eq("foo"), eq((SpanContext) null), any(Attributes.class)); } } @@ -64,7 +64,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracerWithAttr() throws wrappedTracer.startSpan("foo", attributes); assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer); - verify(mockDefaultTracer).startSpan("foo", null, attributes); + verify(mockDefaultTracer).startSpan("foo", (SpanContext) null, attributes); } } diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index dd4a05b67271c..2fb345f73fb06 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -692,6 +692,15 @@ public int getTargetMaxSliceCount() { return maxSliceCount; } + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return indexShard != null + && indexShard.isTimeSeriesDescSortOptimizationEnabled() + && sort != null + && sort.isSortOnTimeSeriesField() + && sort.sort.getSort()[0].getReverse() == false; + } + /** * Clean the query results by consuming all of it */ diff --git a/test/telemetry/build.gradle b/test/telemetry/build.gradle index fbabe43aa5e5a..ca523a9204f4c 100644 --- a/test/telemetry/build.gradle +++ b/test/telemetry/build.gradle @@ -13,6 +13,7 @@ apply plugin: 'opensearch.build' apply plugin: 'opensearch.publish' dependencies { + api project(":libs:opensearch-core") api project(":libs:opensearch-common") api project(":libs:opensearch-telemetry") } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java index dccf062df5ca5..7525b4424c243 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java @@ -8,13 +8,17 @@ package org.opensearch.test.telemetry.tracing; +import org.opensearch.core.common.Strings; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.TracingContextPropagator; import org.opensearch.telemetry.tracing.attributes.Attributes; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; +import java.util.stream.Collectors; /** * Mock {@link TracingContextPropagator} to persist the span for internode communication. @@ -34,18 +38,31 @@ public MockTracingContextPropagator(SpanProcessor spanProcessor) { } @Override - public Span extract(Map props) { + public Optional extract(Map props) { String value = props.get(TRACE_PARENT); if (value != null) { String[] values = value.split(SEPARATOR); String traceId = values[0]; String spanId = values[1]; - return new MockSpan(null, null, traceId, spanId, spanProcessor, Attributes.EMPTY); + return Optional.of(new MockSpan(null, null, traceId, spanId, spanProcessor, Attributes.EMPTY)); } else { - return null; + return Optional.empty(); } } + @Override + public Optional extractFromHeaders(Map> headers) { + if (headers != null) { + Map convertedHeader = headers.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Strings.collectionToCommaDelimitedString(e.getValue()))); + return extract(convertedHeader); + } else { + return Optional.empty(); + } + + } + @Override public void inject(Span currentSpan, BiConsumer setter) { if (currentSpan instanceof MockSpan) {