Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into schedule-reroute
Browse files Browse the repository at this point in the history
  • Loading branch information
imRishN committed Sep 3, 2024
2 parents edc02b0 + 41ba00a commit f66e558
Show file tree
Hide file tree
Showing 34 changed files with 2,076 additions and 573 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
- [Workload Management] Add query group stats constructs ([#15343](https://github.com/opensearch-project/OpenSearch/pull/15343)))
- Add limit on number of processors for Ingest pipeline([#15460](https://github.com/opensearch-project/OpenSearch/pull/15465)).
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- Optimize NodeIndicesStats output behind flag ([#14454](https://github.com/opensearch-project/OpenSearch/pull/14454))
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- Add fieldType to AbstractQueryBuilder and FieldSortBuilder ([#15328](https://github.com/opensearch-project/OpenSearch/pull/15328)))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
---
"search with approximate range":
- do:
indices.create:
index: test
body:
mappings:
properties:
date:
type: date
index: true
doc_values: true

- do:
bulk:
index: test
refresh: true
body:
- '{"index": {"_index": "test", "_id": "1" }}'
- '{ "date": "2018-10-29T12:12:12.987Z" }'
- '{ "index": { "_index": "test", "_id": "2" }}'
- '{ "date": "2020-10-29T12:12:12.987Z" }'
- '{ "index": { "_index": "test", "_id": "3" } }'
- '{ "date": "2024-10-29T12:12:12.987Z" }'

- do:
search:
rest_total_hits_as_int: true
index: test
body:
query:
range: {
date: {
gte: "2018-10-29T12:12:12.987Z"
},
}

- match: { hits.total: 3 }

- do:
search:
rest_total_hits_as_int: true
index: test
body:
sort: [{ date: asc }]
query:
range: {
date: {
gte: "2018-10-29T12:12:12.987Z"
},
}


- match: { hits.total: 3 }
- match: { hits.hits.0._id: "1" }

- do:
search:
rest_total_hits_as_int: true
index: test
body:
sort: [{ date: desc }]
query:
range: {
date: {
gte: "2018-10-29T12:12:12.987Z",
lte: "2020-10-29T12:12:12.987Z"
},
}

- match: { hits.total: 2 }
- match: { hits.hits.0._id: "2" }
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -17,6 +18,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Set;
import java.util.concurrent.CountDownLatch;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -75,10 +77,25 @@ public void testTimestampPinUnpin() throws Exception {

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// This should be a no-op as pinning entity is different
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
// Unpinning already pinned entity
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);

// This should fail as timestamp is not pinned by pinning entity
CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
// onResponse should not get called.
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));
latch.await();

// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

Expand All @@ -93,4 +110,74 @@ public void testTimestampPinUnpin() throws Exception {

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testPinnedTimestampClone() throws Exception {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);

// Clone timestamp1
remoteStorePinnedTimestampService.cloneTimestamp(timestamp1, "ss2", "ss2-2", noOpActionListener);

// With clone, set of pinned timestamp will not change
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Clone timestamp1 but provide invalid existing entity
CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.cloneTimestamp(
timestamp1,
"ss3",
"ss2-3",
new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
// onResponse should not get called.
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch)
);
latch.await();

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Now we have timestamp1 pinned by 2 entities, unpin 1, this should not change set of pinned timestamps
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Now unpin second entity as well, set of pinned timestamp should be reduced by 1
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2-2", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.ingest.CompoundProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestService;
import org.opensearch.ingest.Pipeline;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -56,9 +57,11 @@ class SimulateExecutionService {
private static final String THREAD_POOL_NAME = ThreadPool.Names.MANAGEMENT;

private final ThreadPool threadPool;
private final IngestService ingestService;

SimulateExecutionService(ThreadPool threadPool) {
SimulateExecutionService(ThreadPool threadPool, IngestService ingestService) {
this.threadPool = threadPool;
this.ingestService = ingestService;
}

void executeDocument(
Expand Down Expand Up @@ -91,6 +94,9 @@ void executeDocument(
}

public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {

ingestService.validateProcessorCountForIngestPipeline(request.getPipeline());

threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
final AtomicInteger counter = new AtomicInteger();
final List<SimulateDocumentResult> responses = new CopyOnWriteArrayList<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public SimulatePipelineTransportAction(
(Writeable.Reader<SimulatePipelineRequest>) SimulatePipelineRequest::new
);
this.ingestService = ingestService;
this.executionService = new SimulateExecutionService(threadPool);
this.executionService = new SimulateExecutionService(threadPool, ingestService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.store.IndicesStore;
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsService;
import org.opensearch.monitor.jvm.JvmGcMonitorService;
Expand Down Expand Up @@ -406,6 +407,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterService.USER_DEFINED_METADATA,
ClusterManagerService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, // deprecated
ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS,
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/common/util/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ public class FeatureFlags {
Property.NodeScope
);

/**
* Gates the functionality of ApproximatePointRangeQuery where we approximate query results.
*/
public static final String APPROXIMATE_POINT_RANGE_QUERY = "opensearch.experimental.feature.approximate_point_range_query.enabled";
public static final Setting<Boolean> APPROXIMATE_POINT_RANGE_QUERY_SETTING = Setting.boolSetting(
APPROXIMATE_POINT_RANGE_QUERY,
false,
Property.NodeScope
);

private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
EXTENSIONS_SETTING,
Expand Down
Loading

0 comments on commit f66e558

Please sign in to comment.