Skip to content

Commit

Permalink
Merge branch 'main' into 9574
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish <[email protected]>
  • Loading branch information
ashking94 authored Sep 1, 2023
2 parents 015257d + 04c90c7 commit fd8e526
Show file tree
Hide file tree
Showing 34 changed files with 1,303 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [BWC and API enforcement] Decorate the existing APIs with proper annotations (part 1) ([#9520](https://github.com/opensearch-project/OpenSearch/pull/9520))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))

### Dependencies
Expand Down Expand Up @@ -170,6 +172,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Rate limiter integration for remote store uploads and downloads ([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Use non-concurrent path for sort request on timeseries index and field ([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.lucene.analysis.lt.LithuanianAnalyzer;
import org.apache.lucene.analysis.lv.LatvianAnalyzer;
import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
import org.apache.lucene.analysis.miscellaneous.DelimitedTermFrequencyTokenFilter;
import org.apache.lucene.analysis.miscellaneous.DisableGraphAttribute;
import org.apache.lucene.analysis.miscellaneous.KeywordRepeatFilter;
import org.apache.lucene.analysis.miscellaneous.LengthFilter;
Expand Down Expand Up @@ -265,6 +266,7 @@ public Map<String, AnalysisProvider<TokenFilterFactory>> getTokenFilters() {
);
filters.put("decimal_digit", DecimalDigitFilterFactory::new);
filters.put("delimited_payload", DelimitedPayloadTokenFilterFactory::new);
filters.put("delimited_term_freq", DelimitedTermFrequencyTokenFilterFactory::new);
filters.put("dictionary_decompounder", requiresAnalysisSettings(DictionaryCompoundWordTokenFilterFactory::new));
filters.put("dutch_stem", DutchStemTokenFilterFactory::new);
filters.put("edge_ngram", EdgeNGramTokenFilterFactory::new);
Expand Down Expand Up @@ -500,6 +502,13 @@ public List<PreConfiguredTokenFilter> getPreConfiguredTokenFilters() {
)
)
);
filters.add(
PreConfiguredTokenFilter.singleton(
"delimited_term_freq",
false,
input -> new DelimitedTermFrequencyTokenFilter(input, DelimitedTermFrequencyTokenFilterFactory.DEFAULT_DELIMITER)
)
);
filters.add(PreConfiguredTokenFilter.singleton("dutch_stem", false, input -> new SnowballFilter(input, new DutchStemmer())));
filters.add(PreConfiguredTokenFilter.singleton("edge_ngram", false, false, input -> new EdgeNGramTokenFilter(input, 1)));
filters.add(PreConfiguredTokenFilter.openSearchVersion("edgeNGram", false, false, (reader, version) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analysis.common;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.miscellaneous.DelimitedTermFrequencyTokenFilter;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.AbstractTokenFilterFactory;

/**
 * Factory for the {@code delimited_term_freq} token filter. Each incoming token is
 * split on a configurable single-character delimiter, and the suffix is interpreted
 * as the token's term frequency (delegates to Lucene's
 * {@link DelimitedTermFrequencyTokenFilter}).
 */
public class DelimitedTermFrequencyTokenFilterFactory extends AbstractTokenFilterFactory {
    /** Delimiter used when the {@code delimiter} setting is not configured. */
    public static final char DEFAULT_DELIMITER = '|';
    private static final String DELIMITER = "delimiter";
    private final char delimiter;

    DelimitedTermFrequencyTokenFilterFactory(IndexSettings indexSettings, Environment environment, String name, Settings settings) {
        super(indexSettings, name, settings);
        this.delimiter = parseDelimiter(settings);
    }

    @Override
    public TokenStream create(TokenStream tokenStream) {
        return new DelimitedTermFrequencyTokenFilter(tokenStream, delimiter);
    }

    /**
     * Reads the {@code delimiter} setting, falling back to {@link #DEFAULT_DELIMITER}
     * when it is absent.
     *
     * @throws IllegalArgumentException if the configured value is not exactly one character
     */
    private static char parseDelimiter(Settings settings) {
        final String configured = settings.get(DELIMITER);
        if (configured == null) {
            return DEFAULT_DELIMITER;
        }
        if (configured.length() != 1) {
            throw new IllegalArgumentException(
                "Setting [" + DELIMITER + "] must be a single, non-null character. [" + configured + "] was provided."
            );
        }
        return configured.charAt(0);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ protected Map<String, Class<?>> getTokenFilters() {
filters.put("cjkwidth", CJKWidthFilterFactory.class);
filters.put("cjkbigram", CJKBigramFilterFactory.class);
filters.put("delimitedpayload", DelimitedPayloadTokenFilterFactory.class);
filters.put("delimitedtermfrequency", DelimitedTermFrequencyTokenFilterFactory.class);
filters.put("keepword", KeepWordFilterFactory.class);
filters.put("type", KeepTypesFilterFactory.class);
filters.put("classic", ClassicFilterFactory.class);
Expand Down Expand Up @@ -202,6 +203,7 @@ protected Map<String, Class<?>> getPreConfiguredTokenFilters() {
filters.put("decimal_digit", null);
filters.put("delimited_payload_filter", org.apache.lucene.analysis.payloads.DelimitedPayloadTokenFilterFactory.class);
filters.put("delimited_payload", org.apache.lucene.analysis.payloads.DelimitedPayloadTokenFilterFactory.class);
filters.put("delimited_term_freq", org.apache.lucene.analysis.miscellaneous.DelimitedTermFrequencyTokenFilterFactory.class);
filters.put("dutch_stem", SnowballPorterFilterFactory.class);
filters.put("edge_ngram", null);
filters.put("edgeNGram", null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.analysis.common;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.TermFrequencyAttribute;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisTestsHelper;
import org.opensearch.index.analysis.TokenFilterFactory;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.OpenSearchTokenStreamTestCase;

import java.io.StringReader;

public class DelimitedTermFrequencyTokenFilterFactoryTests extends OpenSearchTokenStreamTestCase {

public void testDefault() throws Exception {
OpenSearchTestCase.TestAnalysis analysis = AnalysisTestsHelper.createTestAnalysisFromSettings(
Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put("index.analysis.filter.my_delimited_term_freq.type", "delimited_term_freq")
.build(),
new CommonAnalysisModulePlugin()
);
doTest(analysis, "cat|4 dog|5");
}

public void testDelimiter() throws Exception {
OpenSearchTestCase.TestAnalysis analysis = AnalysisTestsHelper.createTestAnalysisFromSettings(
Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put("index.analysis.filter.my_delimited_term_freq.type", "delimited_term_freq")
.put("index.analysis.filter.my_delimited_term_freq.delimiter", ":")
.build(),
new CommonAnalysisModulePlugin()
);
doTest(analysis, "cat:4 dog:5");
}

public void testDelimiterLongerThanOneCharThrows() {
IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> AnalysisTestsHelper.createTestAnalysisFromSettings(
Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.put("index.analysis.filter.my_delimited_term_freq.type", "delimited_term_freq")
.put("index.analysis.filter.my_delimited_term_freq.delimiter", "^^")
.build(),
new CommonAnalysisModulePlugin()
)
);

assertEquals("Setting [delimiter] must be a single, non-null character. [^^] was provided.", ex.getMessage());
}

private void doTest(OpenSearchTestCase.TestAnalysis analysis, String source) throws Exception {
TokenFilterFactory tokenFilter = analysis.tokenFilter.get("my_delimited_term_freq");
Tokenizer tokenizer = new WhitespaceTokenizer();
tokenizer.setReader(new StringReader(source));

TokenStream stream = tokenFilter.create(tokenizer);

CharTermAttribute termAtt = stream.getAttribute(CharTermAttribute.class);
TermFrequencyAttribute tfAtt = stream.getAttribute(TermFrequencyAttribute.class);
stream.reset();
assertTermEquals("cat", stream, termAtt, tfAtt, 4);
assertTermEquals("dog", stream, termAtt, tfAtt, 5);
assertFalse(stream.incrementToken());
stream.end();
stream.close();
}

void assertTermEquals(String expected, TokenStream stream, CharTermAttribute termAtt, TermFrequencyAttribute tfAtt, int expectedTf)
throws Exception {
assertTrue(stream.incrementToken());
assertEquals(expected, termAtt.toString());
assertEquals(expectedTf, tfAtt.getTermFrequency());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,46 @@
- match: { tokens.0.token: foo }

---
"delimited_term_freq":
- skip:
version: " - 2.9.99"
reason: "delimited_term_freq token filter was added in v2.10.0"
- do:
indices.create:
index: test
body:
settings:
analysis:
filter:
my_delimited_term_freq:
type: delimited_term_freq
delimiter: ^
- do:
indices.analyze:
index: test
body:
text: foo^3
tokenizer: keyword
filter: [my_delimited_term_freq]
attributes: termFrequency
explain: true
- length: { detail.tokenfilters: 1 }
- match: { detail.tokenfilters.0.tokens.0.token: foo }
- match: { detail.tokenfilters.0.tokens.0.termFrequency: 3 }

# Test pre-configured token filter too:
- do:
indices.analyze:
body:
text: foo|100
tokenizer: keyword
filter: [delimited_term_freq]
attributes: termFrequency
explain: true
- length: { detail.tokenfilters: 1 }
- match: { detail.tokenfilters.0.tokens.0.token: foo }
- match: { detail.tokenfilters.0.tokens.0.termFrequency: 100 }
---
"keep_filter":
- do:
indices.create:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.common.blobstore.BlobStoreException;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
Expand Down Expand Up @@ -211,6 +212,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
}
}

/**
 * Multi-stream async reads are not implemented for S3 yet; callers must use the
 * synchronous read APIs instead.
 *
 * @param blobName name of the blob that was requested
 * @param listener listener that would receive the {@link ReadContext}; never invoked
 * @throws UnsupportedOperationException always
 */
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
    // Include a detail message so callers can tell which operation is unsupported.
    throw new UnsupportedOperationException("readBlobAsync is not implemented yet for S3BlobContainer");
}

// package private for testing
long getLargeBlobThresholdInBytes() {
return blobStore.bufferSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;

import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -881,6 +882,17 @@ public void onFailure(Exception e) {}
}
}

/**
 * readBlobAsync is not supported on S3 yet; verify that it rejects calls with
 * {@link UnsupportedOperationException}.
 */
public void testAsyncBlobDownload() {
    final S3BlobStore blobStore = mock(S3BlobStore.class);
    final BlobPath blobPath = mock(BlobPath.class);
    final String blobName = "test-blob";
    // Construct the container outside expectThrows so a constructor failure cannot
    // masquerade as the expected exception from readBlobAsync.
    final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

    // Original code assigned the expectThrows result to an unused local; the return
    // value is not needed since only the exception type is being verified.
    expectThrows(UnsupportedOperationException.class, () -> blobContainer.readBlobAsync(blobName, new PlainActionFuture<>()));
}

public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException {
testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public IntegrationTestOTelTelemetryPlugin(Settings settings) {
/**
* This method overrides the getTelemetry() method in the OTel plugin class, so we create only one instance of global OpenTelemetry.
* resetForTest() will set OpenTelemetry to null again.
* @param settings cluster settings
* @param telemetrySettings telemetry settings
*/
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
GlobalOpenTelemetry.resetForTest();
return super.getTelemetry(settings);
return super.getTelemetry(telemetrySettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,17 @@ public List<Setting<?>> getSettings() {
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(telemetry());
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
return Optional.of(telemetry(telemetrySettings));
}

@Override
public String getName() {
return OTEL_TRACER_NAME;
}

private Telemetry telemetry() {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(settings)), new MetricsTelemetry() {
private Telemetry telemetry(TelemetrySettings telemetrySettings) {
return new OTelTelemetry(new OTelTracingTelemetry(OTelResourceProvider.get(telemetrySettings, settings)), new MetricsTelemetry() {
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -37,15 +40,16 @@ private OTelResourceProvider() {}

/**
* Creates OpenTelemetry instance with default configuration
* @param telemetrySettings telemetry settings
* @param settings cluster settings
* @return OpenTelemetry instance
*/
public static OpenTelemetry get(Settings settings) {
public static OpenTelemetry get(TelemetrySettings telemetrySettings, Settings settings) {
return get(
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.alwaysOn()
Sampler.parentBased(new RequestSampler(new ProbabilisticSampler(telemetrySettings)))
);
}

Expand Down
Loading

0 comments on commit fd8e526

Please sign in to comment.