From 27c54934cde5c90ad2e4497c96fc8dc8d7fb251d Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Wed, 3 Aug 2022 14:48:54 -0400 Subject: [PATCH 01/11] Correctly ignore dependabot branches during push (#4077) Signed-off-by: Peter Zhu --- .github/workflows/gradle-check.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/gradle-check.yml b/.github/workflows/gradle-check.yml index dec5ee15d0bea..cbaa7fa10fbb6 100644 --- a/.github/workflows/gradle-check.yml +++ b/.github/workflows/gradle-check.yml @@ -2,9 +2,9 @@ name: Gradle Check (Jenkins) on: push: branches-ignore: - 'backport/*' - 'create-pull-request/*' - 'dependabot/*' + - 'backport/**' + - 'create-pull-request/**' + - 'dependabot/**' pull_request_target: types: [opened, synchronize, reopened] From 3ab0d1f398e0b22b90237a6dd8db97e6376530dc Mon Sep 17 00:00:00 2001 From: Tianli Feng Date: Wed, 3 Aug 2022 12:16:51 -0700 Subject: [PATCH 02/11] Fix the bug that masterOperation(with task param) is bypassed (#4103) The method `protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener)` was not properly deprecated by the commit https://github.com/opensearch-project/OpenSearch/commit/2a1b239c2af8d7f333a404c66467a50dc0a52c80 / PR https://github.com/opensearch-project/OpenSearch/pull/403. The problem is that it makes no difference whether the method `void masterOperation(Task task, Request request, ClusterState state, ActionListener listener)` is overridden or not. The reason is that the method's only usage was changed to call another method, `clusterManagerOperation(task, request, clusterState, l)`, which is the default implementation of `masterOperation(with task param)`. There is no other usage of the method `masterOperation()`, so it is bypassed. In this commit: - Change the delegation model for the method `clusterManagerOperation(with task param)` to call `masterOperation(with task param)`, according to the comment below https://github.com/opensearch-project/OpenSearch/pull/4103#discussion_r936946699 (a simplified sketch of the resulting delegation chain follows below) - Add a test to validate the backwards compatibility, which is copied and modified from an existing test.
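A minimal, self-contained Java sketch of the delegation chain described above (illustrative only, not part of the patch): `Request`, `ClusterState`, `Task`, and `ActionListener` are replaced with plain `String` parameters, and the class names are hypothetical stand-ins for `TransportClusterManagerNodeAction` and a legacy subclass. It shows why a subclass that only overrides the deprecated task-aware `masterOperation` is invoked again once `clusterManagerOperation(with task param)` delegates to it.

    // Hypothetical stand-in for TransportClusterManagerNodeAction; the real parameter types
    // are simplified to String so the sketch compiles on its own.
    abstract class SketchBaseAction {

        // Entry point used by the transport layer (simplified): always calls the task-aware variant.
        final void execute(String task, String request) throws Exception {
            clusterManagerOperation(task, request);
        }

        /** @deprecated replaced by clusterManagerOperation(request) */
        @Deprecated
        protected void masterOperation(String request) throws Exception {
            throw new UnsupportedOperationException("Must be overridden");
        }

        protected void clusterManagerOperation(String request) throws Exception {
            masterOperation(request); // keeps old overrides of masterOperation(request) working
        }

        /** @deprecated replaced by clusterManagerOperation(task, request) */
        @Deprecated
        protected void masterOperation(String task, String request) throws Exception {
            clusterManagerOperation(request); // default falls through to the task-less variant
        }

        // After this patch: delegate to the deprecated task-aware method, so a subclass that
        // only overrides masterOperation(task, request) is still invoked.
        protected void clusterManagerOperation(String task, String request) throws Exception {
            masterOperation(task, request);
        }
    }

    // A legacy action that only overrides the deprecated task-aware method.
    class SketchLegacyAction extends SketchBaseAction {
        @Override
        protected void masterOperation(String task, String request) {
            System.out.println("legacy masterOperation handled " + request + " for " + task);
        }
    }

    class SketchMain {
        public static void main(String[] args) throws Exception {
            // Before the patch the override above was bypassed; with the new delegation it runs.
            new SketchLegacyAction().execute("task-1", "request-1");
        }
    }

Without any override, the chain still terminates in the `UnsupportedOperationException` thrown by the task-less `masterOperation`, matching the behavior of the real class.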
Signed-off-by: Tianli Feng --- .../TransportClusterManagerNodeAction.java | 11 ++++-- .../info/TransportClusterInfoAction.java | 1 + ...ransportClusterManagerNodeActionTests.java | 37 +++++++++++++++++++ .../java/org/opensearch/test/TestCluster.java | 1 + 4 files changed, 47 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index d65ba3eddf776..a97f4ffe555b6 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -125,22 +125,27 @@ protected void masterOperation(Request request, ClusterState state, ActionListen throw new UnsupportedOperationException("Must be overridden"); } + // TODO: Add abstract keyword after removing the deprecated masterOperation() protected void clusterManagerOperation(Request request, ClusterState state, ActionListener listener) throws Exception { masterOperation(request, state, listener); } - /** @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #clusterManagerOperation(Task, ClusterManagerNodeRequest, ClusterState, ActionListener)} */ + /** + * Override this operation if access to the task parameter is needed + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #clusterManagerOperation(Task, ClusterManagerNodeRequest, ClusterState, ActionListener)} + */ @Deprecated protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - clusterManagerOperation(task, request, state, listener); + clusterManagerOperation(request, state, listener); } /** * Override this operation if access to the task parameter is needed */ + // TODO: Change the implementation to call 'clusterManagerOperation(request...)' after removing the deprecated masterOperation() protected void clusterManagerOperation(Task task, Request request, ClusterState state, ActionListener listener) throws Exception { - clusterManagerOperation(request, state, listener); + masterOperation(task, request, state, listener); } protected boolean localExecute(Request request) { diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java index 1411ff7b30695..c43256a61e8b4 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/info/TransportClusterInfoAction.java @@ -88,6 +88,7 @@ protected final void clusterManagerOperation(final Request request, final Cluste doClusterManagerOperation(request, concreteIndices, state, listener); } + // TODO: Add abstract keyword after removing the deprecated doMasterOperation() protected void doClusterManagerOperation( Request request, String[] concreteIndices, diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 9ec9e656257d6..1195ed2590b1e 100644 --- 
a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -274,6 +274,43 @@ protected void clusterManagerOperation(Task task, Request request, ClusterState } } + /* The test is copied from testLocalOperationWithoutBlocks() + to validate the backwards compatibility for the deprecated method masterOperation(with task parameter). */ + public void testDeprecatedMasterOperationWithTaskParameterCanBeCalled() throws ExecutionException, InterruptedException { + final boolean clusterManagerOperationFailure = randomBoolean(); + + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + + final Exception exception = new Exception(); + final Response response = new Response(); + + setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, allNodes)); + + new Action("internal:testAction", transportService, clusterService, threadPool) { + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + if (clusterManagerOperationFailure) { + listener.onFailure(exception); + } else { + listener.onResponse(response); + } + } + }.execute(request, listener); + assertTrue(listener.isDone()); + + if (clusterManagerOperationFailure) { + try { + listener.get(); + fail("Expected exception but returned proper result"); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), equalTo(exception)); + } + } else { + assertThat(listener.get(), equalTo(response)); + } + } + public void testLocalOperationWithBlocks() throws ExecutionException, InterruptedException { final boolean retryableBlock = randomBoolean(); final boolean unblockBeforeTimeout = randomBoolean(); diff --git a/test/framework/src/main/java/org/opensearch/test/TestCluster.java b/test/framework/src/main/java/org/opensearch/test/TestCluster.java index c2e90f0369e6c..478b692fb06ef 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/TestCluster.java @@ -127,6 +127,7 @@ public void assertAfterTest() throws Exception { /** * Returns the number of data and cluster-manager eligible nodes in the cluster. */ + // TODO: Add abstract keyword after removing the deprecated numDataAndMasterNodes() public int numDataAndClusterManagerNodes() { return numDataAndMasterNodes(); } From 7af8e37f81c0a124712a8cf86dab973df26f906f Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 3 Aug 2022 15:02:39 -0700 Subject: [PATCH 03/11] Fix isAheadOf logic for ReplicationCheckpoint comparison (#4112) Signed-off-by: Suraj Singh --- .../replication/checkpoint/ReplicationCheckpoint.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index abcef1bd91944..8afb5bd055636 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -125,10 +125,13 @@ public int hashCode() { } /** - * Checks if other is aheadof current replication point by comparing segmentInfosVersion. 
Returns true for null + * Checks if current replication checkpoint is AheadOf `other` replication checkpoint point by first comparing + * primaryTerm followed by segmentInfosVersion. Returns true when `other` is null. */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); + return other == null + || primaryTerm > other.getPrimaryTerm() + || (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion()); } @Override From ba9cdcdb262176a4947f13471b27450d8ac6fe63 Mon Sep 17 00:00:00 2001 From: Xue Zhou <85715413+xuezhou25@users.noreply.github.com> Date: Wed, 3 Aug 2022 18:19:28 -0700 Subject: [PATCH 04/11] Deprecate and rename abstract methods in interfaces that contain 'master' in name (#4121) Signed-off-by: Xue Zhou --- .../LocalNodeClusterManagerListener.java | 8 ++--- .../cluster/LocalNodeMasterListener.java | 29 +++++++++++++++++++ .../opensearch/cluster/ack/AckedRequest.java | 14 ++++++++- .../service/ClusterApplierServiceTests.java | 4 +-- 4 files changed, 48 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java b/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java index c86aa00a6f2a2..c07dcc5daaee6 100644 --- a/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java +++ b/server/src/main/java/org/opensearch/cluster/LocalNodeClusterManagerListener.java @@ -42,21 +42,21 @@ public interface LocalNodeClusterManagerListener extends ClusterStateListener { /** * Called when local node is elected to be the cluster-manager */ - void onMaster(); + void onClusterManager(); /** * Called when the local node used to be the cluster-manager, a new cluster-manager was elected and it's no longer the local node. */ - void offMaster(); + void offClusterManager(); @Override default void clusterChanged(ClusterChangedEvent event) { final boolean wasClusterManager = event.previousState().nodes().isLocalNodeElectedClusterManager(); final boolean isClusterManager = event.localNodeClusterManager(); if (wasClusterManager == false && isClusterManager) { - onMaster(); + onClusterManager(); } else if (wasClusterManager && isClusterManager == false) { - offMaster(); + offClusterManager(); } } } diff --git a/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java b/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java index eebfb60d8472d..31c0b294b8004 100644 --- a/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java +++ b/server/src/main/java/org/opensearch/cluster/LocalNodeMasterListener.java @@ -41,4 +41,33 @@ @Deprecated public interface LocalNodeMasterListener extends LocalNodeClusterManagerListener { + /** + * Called when local node is elected to be the cluster-manager. + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #onClusterManager()} + */ + @Deprecated + void onMaster(); + + /** + * Called when the local node used to be the cluster-manager, a new cluster-manager was elected and it's no longer the local node. + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #offClusterManager()} + */ + @Deprecated + void offMaster(); + + /** + * Called when local node is elected to be the cluster-manager. 
+ */ + @Override + default void onClusterManager() { + onMaster(); + } + + /** + * Called when the local node used to be the cluster-manager, a new cluster-manager was elected and it's no longer the local node. + */ + @Override + default void offClusterManager() { + offMaster(); + } } diff --git a/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java b/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java index a3f74cb45a880..750f4b177cb86 100644 --- a/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java +++ b/server/src/main/java/org/opensearch/cluster/ack/AckedRequest.java @@ -48,6 +48,18 @@ public interface AckedRequest { /** * Returns the timeout for the request to be completed on the cluster-manager node + * @deprecated As of 2.2, because supporting inclusive language, replaced by {@link #clusterManagerNodeTimeout()} */ - TimeValue masterNodeTimeout(); + @Deprecated + default TimeValue masterNodeTimeout() { + throw new UnsupportedOperationException("Must be overridden"); + } + + /** + * Returns the timeout for the request to be completed on the cluster-manager node + */ + // TODO: Remove default implementation after removing the deprecated masterNodeTimeout() + default TimeValue clusterManagerNodeTimeout() { + return masterNodeTimeout(); + } } diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java index 62dae0622eb85..e6da768650088 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java @@ -299,12 +299,12 @@ public void testLocalNodeClusterManagerListenerCallbacks() { AtomicBoolean isClusterManager = new AtomicBoolean(); timedClusterApplierService.addLocalNodeClusterManagerListener(new LocalNodeClusterManagerListener() { @Override - public void onMaster() { + public void onClusterManager() { isClusterManager.set(true); } @Override - public void offMaster() { + public void offClusterManager() { isClusterManager.set(false); } }); From d09c2857e50f9f6631c4da27ae046f28581d9398 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 4 Aug 2022 09:23:44 -0700 Subject: [PATCH 05/11] Disable shard idle with segment replication. (#4118) This change disables shard idle when segment replication is enabled. Primary shards will only push out new segments on refresh, we do not want to block this based on search behavior. Replicas will only refresh on an externally provided SegmentInfos, so we do not want search requests to hang waiting for a refresh. 
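The condition this change touches can be condensed into a small, self-contained sketch (illustrative only, not the actual `IndexShard` code): the values that `scheduledRefresh()` reads from the shard and its index settings are modeled as plain boolean parameters, and the class and method names are hypothetical. It shows the new guard: the search-idle skip never applies when segment replication is enabled, so a segrep primary keeps refreshing, and therefore publishing segments, on schedule.

    // Hypothetical condensation of the guard in IndexShard.scheduledRefresh(); the real method
    // reads these values from the shard state and index settings.
    public class ShardIdleSketch {

        static boolean skipScheduledRefresh(boolean listenerNeedsRefresh,
                                            boolean searchIdle,
                                            boolean explicitRefresh,
                                            boolean segRepEnabled,
                                            boolean active) {
            return listenerNeedsRefresh == false // no listener is waiting for a refresh
                && searchIdle                    // the shard has not been searched recently
                && explicitRefresh == false      // no explicit refresh interval was configured
                && segRepEnabled == false        // new guard: segrep shards never defer refreshes to searches
                && active;                       // only an active shard may skip its refresh
        }

        public static void main(String[] args) {
            // A search-idle primary with segment replication enabled still refreshes on schedule.
            System.out.println(skipScheduledRefresh(false, true, false, true, true));  // false
            // The same shard without segment replication keeps the existing idle optimization.
            System.out.println(skipScheduledRefresh(false, true, false, false, true)); // true
        }
    }

The replica side is handled by the `NRTReplicationEngine.refreshNeeded()` change in the diff below, which always reports that no refresh is needed because replicas only refresh on an incoming set of SegmentInfos.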
Signed-off-by: Marc Handalian --- .../index/engine/NRTReplicationEngine.java | 12 ++ .../opensearch/index/shard/IndexShard.java | 4 + .../SegmentReplicationIndexShardTests.java | 59 ++++++++ ...enSearchIndexLevelReplicationTestCase.java | 10 ++ .../index/shard/IndexShardTestCase.java | 130 ++++++++++++++++++ 5 files changed, 215 insertions(+) create mode 100644 server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index af175be286b13..c94313d2b8cee 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -193,6 +193,18 @@ protected ReferenceManager getReferenceManager(Search return readerManager; } + /** + * Refreshing of this engine will only happen internally when a new set of segments is received. The engine will ignore external + * refresh attempts so we can return false here. Further Engine's existing implementation reads DirectoryReader.isCurrent after acquiring a searcher. + * With this Engine's NRTReplicationReaderManager, This will use StandardDirectoryReader's implementation which determines if the reader is current by + * comparing the on-disk SegmentInfos version against the one in the reader, which at refresh points will always return isCurrent false and then refreshNeeded true. + * Even if this method returns refresh as needed, we ignore it and only ever refresh with incoming SegmentInfos. + */ + @Override + public boolean refreshNeeded() { + return false; + } + @Override public Closeable acquireHistoryRetentionLock() { throw new UnsupportedOperationException("Not implemented"); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9694f3dd37f80..b05eec4304cd5 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3778,6 +3778,10 @@ public boolean scheduledRefresh() { if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it && isSearchIdle() && indexSettings.isExplicitRefresh() == false + && indexSettings.isSegRepEnabled() == false + // Indices with segrep enabled will never wait on a refresh and ignore shard idle. Primary shards push out new segments only + // after a refresh, so we don't want to wait for a search to trigger that cycle. Replicas will only refresh after receiving + // a new set of segments. && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive // lets skip this refresh since we are search idle and // don't necessarily need to refresh. 
the next searcher access will register a refreshListener and that will diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java new file mode 100644 index 0000000000000..3fcf6116b11a2 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.common.ReplicationType; + +public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + public void testIgnoreShardIdle() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + final int numDocs = shards.indexDocs(randomInt(10)); + primary.refresh("test"); + replicateSegments(primary, shards.getReplicas()); + shards.assertAllEqual(numDocs); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + + // Update the search_idle setting, this will put both shards into search idle. + Settings updatedSettings = Settings.builder() + .put(settings) + .put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO) + .build(); + primary.indexSettings().getScopedSettings().applySettings(updatedSettings); + replica.indexSettings().getScopedSettings().applySettings(updatedSettings); + + primary.scheduledRefresh(); + replica.scheduledRefresh(); + + // Shards without segrep will register a new RefreshListener on the engine and return true when registered, + // assert with segrep enabled that awaitShardSearchActive does not register a listener. 
+ primary.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 5e22a7c145a39..a588909085160 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -139,6 +139,16 @@ protected ReplicationGroup createGroup(int replicas, Settings settings) throws I return new ReplicationGroup(metadata); } + protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFactory engineFactory) throws IOException { + IndexMetadata metadata = buildIndexMetadata(replicas, settings, indexMapping); + return new ReplicationGroup(metadata) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return engineFactory; + } + }; + } + protected IndexMetadata buildIndexMetadata(int replicas) throws IOException { return buildIndexMetadata(replicas, indexMapping); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index c5ee54450cce2..7dedc572ff19b 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -33,9 +33,15 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; @@ -58,6 +64,7 @@ import org.opensearch.common.lucene.uid.Versions; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; @@ -82,6 +89,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; @@ -94,7 +102,14 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.indices.replication.CheckpointInfoResponse; +import org.opensearch.indices.replication.GetSegmentFilesResponse; +import 
org.opensearch.indices.replication.SegmentReplicationSource; +import org.opensearch.indices.replication.SegmentReplicationTarget; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; @@ -112,6 +127,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -122,6 +138,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; /** @@ -1133,4 +1150,117 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { } }; } + + /** + * Segment Replication specific test method - Replicate segments to a list of replicas from a given primary. + * This test will use a real {@link SegmentReplicationTarget} for each replica with a mock {@link SegmentReplicationSource} that + * writes all segments directly to the target. + */ + public final void replicateSegments(IndexShard primaryShard, List replicaShards) throws IOException, InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); + Store.MetadataSnapshot primaryMetadata; + try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); + primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos); + } + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); + + final ReplicationCollection replicationCollection = new ReplicationCollection<>(logger, threadPool); + final SegmentReplicationSource source = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse( + new CheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) + ); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + try ( + final ReplicationCollection.ReplicationRef replicationRef = replicationCollection.get( + replicationId + ) + ) { + writeFileChunks(replicationRef.get(), primaryShard, filesToFetch.toArray(new StoreFileMetadata[] {})); + } catch (IOException e) { + listener.onFailure(e); + } + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + + for (IndexShard replica : replicaShards) { + final SegmentReplicationTarget target = new SegmentReplicationTarget( + ReplicationCheckpoint.empty(replica.shardId), + replica, + source, + new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + try (final GatedCloseable snapshot = 
replica.getSegmentInfosSnapshot()) { + final SegmentInfos replicaInfos = snapshot.get(); + final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos); + final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata); + assertTrue(recoveryDiff.missing.isEmpty()); + assertTrue(recoveryDiff.different.isEmpty()); + assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData()); + } catch (Exception e) { + throw ExceptionsHelper.convertToRuntime(e); + } + countDownLatch.countDown(); + } + + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + logger.error("Unexpected replication failure in test", e); + Assert.fail("test replication should not fail: " + e); + } + } + ); + replicationCollection.start(target, TimeValue.timeValueMillis(5000)); + target.startReplication(new ActionListener<>() { + @Override + public void onResponse(Void o) { + replicationCollection.markAsDone(target.getId()); + } + + @Override + public void onFailure(Exception e) { + replicationCollection.fail(target.getId(), new OpenSearchException("Segment Replication failed", e), true); + } + }); + } + countDownLatch.await(3, TimeUnit.SECONDS); + } + + private void writeFileChunks(SegmentReplicationTarget target, IndexShard primary, StoreFileMetadata[] files) throws IOException { + for (StoreFileMetadata md : files) { + try (IndexInput in = primary.store().directory().openInput(md.name(), IOContext.READONCE)) { + int pos = 0; + while (pos < md.length()) { + int length = between(1, Math.toIntExact(md.length() - pos)); + byte[] buffer = new byte[length]; + in.readBytes(buffer, 0, length); + target.writeFileChunk(md, pos, new BytesArray(buffer), pos + length == md.length(), 0, mock(ActionListener.class)); + pos += length; + } + } + } + } } From 797a5d98722a306bcb0e2e9f47fa054db67098d6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 4 Aug 2022 17:44:57 -0700 Subject: [PATCH 06/11] Fix waitUntil refresh policy for segrep enabled indices. (#4137) Signed-off-by: Marc Handalian --- .../java/org/opensearch/index/WaitUntilRefreshIT.java | 9 ++++++++- .../opensearch/index/engine/NRTReplicationEngine.java | 6 ++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java b/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java index e38b128c04fde..a75057356fe8a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/WaitUntilRefreshIT.java @@ -42,7 +42,10 @@ import org.opensearch.action.support.WriteRequest.RefreshPolicy; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestStatus; import org.opensearch.script.MockScriptPlugin; @@ -74,7 +77,11 @@ public class WaitUntilRefreshIT extends OpenSearchIntegTestCase { @Override public Settings indexSettings() { // Use a shorter refresh interval to speed up the tests. We'll be waiting on this interval several times. 
- return Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms").build(); + final Settings.Builder builder = Settings.builder().put(super.indexSettings()).put("index.refresh_interval", "40ms"); + if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { + builder.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); + } + return builder.build(); } @Before diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index c94313d2b8cee..1f9306cf51e1e 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -69,6 +69,12 @@ public NRTReplicationEngine(EngineConfig engineConfig) { this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); this.readerManager = readerManager; this.readerManager.addListener(completionStatsCache); + for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) { + this.readerManager.addListener(listener); + } + for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { + this.readerManager.addListener(listener); + } final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); translogManagerRef = new WriteOnlyTranslogManager( From 42058aa76a18f531ceaac7deb4e94944cf02f463 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Fri, 5 Aug 2022 10:29:22 +0530 Subject: [PATCH 07/11] [Remote Store] Add rest endpoint for remote store restore (#3576) * Add rest endpoint for remote store restore Signed-off-by: Sachin Kale --- .../client/RestHighLevelClientTests.java | 3 +- .../api/remote_store.restore.json | 34 +++++ .../org/opensearch/action/ActionModule.java | 12 ++ .../restore/RestoreRemoteStoreAction.java | 26 ++++ .../restore/RestoreRemoteStoreResponse.java | 126 ++++++++++++++++++ .../TransportRestoreRemoteStoreAction.java | 103 ++++++++++++++ .../remotestore/restore/package-info.java | 2 +- .../restore/RestoreClusterStateListener.java | 33 +++-- .../TransportRestoreSnapshotAction.java | 7 +- .../opensearch/client/ClusterAdminClient.java | 7 + .../client/support/AbstractClient.java | 8 ++ .../cluster/RestRestoreRemoteStoreAction.java | 50 +++++++ .../RestoreRemoteStoreResponseTests.java | 45 +++++++ 13 files changed, 441 insertions(+), 15 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index efcc13921c398..3da0f81023f72 100644 --- 
a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception { "nodes.hot_threads", "nodes.usage", "nodes.reload_secure_settings", - "search_shards", }; + "search_shards", + "remote_store.restore", }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); Set deprecatedMethods = new HashSet<>(); deprecatedMethods.add("indices.force_merge"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json b/rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json new file mode 100644 index 0000000000000..6af49f75b9f6e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/remote_store.restore.json @@ -0,0 +1,34 @@ +{ + "remote_store.restore":{ + "documentation":{ + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/remote-store#restore", + "description":"Restores from remote store." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_remotestore/_restore", + "methods":[ + "POST" + ] + } + ] + }, + "params":{ + "cluster_manager_timeout":{ + "type":"time", + "description":"Explicit operation timeout for connection to cluster-manager node" + }, + "wait_for_completion":{ + "type":"boolean", + "description":"Should this request wait until the operation has completed before returning", + "default":false + } + }, + "body":{ + "description":"A comma separated list of index IDs", + "required":true + } + } +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 2f470d7603869..052d2ec2b5764 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -61,6 +61,8 @@ import org.opensearch.action.admin.cluster.node.usage.TransportNodesUsageAction; import org.opensearch.action.admin.cluster.remote.RemoteInfoAction; import org.opensearch.action.admin.cluster.remote.TransportRemoteInfoAction; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction; +import org.opensearch.action.admin.cluster.remotestore.restore.TransportRestoreRemoteStoreAction; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; import org.opensearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction; import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; @@ -267,6 +269,7 @@ import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.seqno.RetentionLeaseActions; import org.opensearch.indices.SystemIndices; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -314,6 +317,7 @@ import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction; import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction; import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction; +import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction; import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import 
org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction; import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction; @@ -668,6 +672,9 @@ public void reg actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); + // Remote Store + actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -853,6 +860,11 @@ public void initRestHandlers(Supplier nodesInCluster) { } } registerHandler.accept(new RestCatAction(catActions)); + + // Remote Store APIs + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + registerHandler.accept(new RestRestoreRemoteStoreAction()); + } } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java new file mode 100644 index 0000000000000..46b1bc14e8537 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreAction.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionType; + +/** + * Restore remote store action + * + * @opensearch.internal + */ +public final class RestoreRemoteStoreAction extends ActionType { + + public static final RestoreRemoteStoreAction INSTANCE = new RestoreRemoteStoreAction(); + public static final String NAME = "cluster:admin/remotestore/restore"; + + private RestoreRemoteStoreAction() { + super(NAME, RestoreRemoteStoreResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java new file mode 100644 index 0000000000000..80bf96b6b2562 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponse.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.Nullable; +import org.opensearch.common.ParseField; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.rest.RestStatus; +import org.opensearch.snapshots.RestoreInfo; + +import java.io.IOException; +import java.util.Objects; + +import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * Contains information about remote store restores + * + * @opensearch.internal + */ +public final class RestoreRemoteStoreResponse extends ActionResponse implements ToXContentObject { + + @Nullable + private final RestoreInfo restoreInfo; + + public RestoreRemoteStoreResponse(@Nullable RestoreInfo restoreInfo) { + this.restoreInfo = restoreInfo; + } + + public RestoreRemoteStoreResponse(StreamInput in) throws IOException { + super(in); + restoreInfo = RestoreInfo.readOptionalRestoreInfo(in); + } + + /** + * Returns restore information if remote store restore was completed before this method returned, null otherwise + * + * @return restore information or null + */ + public RestoreInfo getRestoreInfo() { + return restoreInfo; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(restoreInfo); + } + + public RestStatus status() { + if (restoreInfo == null) { + return RestStatus.ACCEPTED; + } + return restoreInfo.status(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (restoreInfo != null) { + builder.field("remote_store"); + restoreInfo.toXContent(builder, params); + } else { + builder.field("accepted", true); + } + builder.endObject(); + return builder; + } + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "restore_remote_store", + true, + v -> { + RestoreInfo restoreInfo = (RestoreInfo) v[0]; + Boolean accepted = (Boolean) v[1]; + assert (accepted == null && restoreInfo != null) || (accepted != null && accepted && restoreInfo == null) : "accepted: [" + + accepted + + "], restoreInfo: [" + + restoreInfo + + "]"; + return new RestoreRemoteStoreResponse(restoreInfo); + } + ); + + static { + PARSER.declareObject( + optionalConstructorArg(), + (parser, context) -> RestoreInfo.fromXContent(parser), + new ParseField("remote_store") + ); + PARSER.declareBoolean(optionalConstructorArg(), new ParseField("accepted")); + } + + public static RestoreRemoteStoreResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RestoreRemoteStoreResponse that = (RestoreRemoteStoreResponse) o; + return Objects.equals(restoreInfo, that.restoreInfo); + } + + @Override + public int hashCode() { + return Objects.hash(restoreInfo); + } + + @Override + public String toString() { + return "RestoreRemoteStoreResponse{" + "restoreInfo=" + restoreInfo + '}'; + } +} diff --git 
a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java new file mode 100644 index 0000000000000..7304ba25717ac --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/TransportRestoreRemoteStoreAction.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.snapshots.RestoreService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action for restore remote store operation + * + * @opensearch.internal + */ +public final class TransportRestoreRemoteStoreAction extends TransportClusterManagerNodeAction< + RestoreRemoteStoreRequest, + RestoreRemoteStoreResponse> { + private final RestoreService restoreService; + + @Inject + public TransportRestoreRemoteStoreAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + RestoreService restoreService, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + RestoreRemoteStoreAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + RestoreRemoteStoreRequest::new, + indexNameExpressionResolver + ); + this.restoreService = restoreService; + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected RestoreRemoteStoreResponse read(StreamInput in) throws IOException { + return new RestoreRemoteStoreResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(RestoreRemoteStoreRequest request, ClusterState state) { + // Restoring a remote store might change the global state and create/change an index, + // so we need to check for METADATA_WRITE and WRITE blocks + ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + if (blockException != null) { + return blockException; + } + return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + + } + + @Override + protected void clusterManagerOperation( + final RestoreRemoteStoreRequest request, + final ClusterState state, + final ActionListener listener + ) { + restoreService.restoreFromRemoteStore( + request, + ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { + if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { + 
RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + RestoreRemoteStoreResponse::new + ); + } else { + delegatedListener.onResponse(new RestoreRemoteStoreResponse(restoreCompletionResponse.getRestoreInfo())); + } + }) + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java index 363b7179f3c6c..10348f5ccfe6e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/package-info.java @@ -6,5 +6,5 @@ * compatible open source license. */ -/** Restore Snapshot transport handler. */ +/** Restore remote store transport handler. */ package org.opensearch.action.admin.cluster.remotestore.restore; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java index 7d2ca99e3dbf5..d0f78e85e26a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreClusterStateListener.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionResponse; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.RestoreInProgress; @@ -44,6 +45,8 @@ import org.opensearch.snapshots.RestoreInfo; import org.opensearch.snapshots.RestoreService; +import java.util.function.Function; + import static org.opensearch.snapshots.RestoreService.restoreInProgress; /** @@ -51,22 +54,27 @@ * * @opensearch.internal */ -public class RestoreClusterStateListener implements ClusterStateListener { +public class RestoreClusterStateListener implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class); private final ClusterService clusterService; private final String uuid; - private final ActionListener listener; + private final String restoreIdentifier; + private final ActionListener listener; + private final Function actionResponseFactory; private RestoreClusterStateListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Function actionResponseFactory ) { this.clusterService = clusterService; this.uuid = response.getUuid(); + this.restoreIdentifier = response.getSnapshot() != null ? response.getSnapshot().getSnapshotId().getName() : "remote_store"; this.listener = listener; + this.actionResponseFactory = actionResponseFactory; } @Override @@ -78,23 +86,23 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { // on the current cluster-manager and as such it might miss some intermediary cluster states due to batching. // Clean up listener in that case and acknowledge completion of restore operation to client. 
clusterService.removeListener(this); - listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null)); + listener.onResponse(actionResponseFactory.apply(null)); } else if (newEntry == null) { clusterService.removeListener(this); ImmutableOpenMap shards = prevEntry.shards(); - assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state(); + assert prevEntry.state().completed() : "expected completed snapshot/remote store restore state but was " + prevEntry.state(); assert RestoreService.completed(shards) : "expected all restore entries to be completed"; RestoreInfo ri = new RestoreInfo( - prevEntry.snapshot().getSnapshotId().getName(), + restoreIdentifier, prevEntry.indices(), shards.size(), shards.size() - RestoreService.failedShards(shards) ); - RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri); - logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId()); + T response = actionResponseFactory.apply(ri); + logger.debug("restore of [{}] completed", restoreIdentifier); listener.onResponse(response); } else { - // restore not completed yet, wait for next cluster state update + logger.debug("restore not completed yet, wait for next cluster state update"); } } @@ -102,11 +110,12 @@ public void clusterChanged(ClusterChangedEvent changedEvent) { * Creates a cluster state listener and registers it with the cluster service. The listener passed as a * parameter will be called when the restore is complete. */ - public static void createAndRegisterListener( + public static void createAndRegisterListener( ClusterService clusterService, RestoreService.RestoreCompletionResponse response, - ActionListener listener + ActionListener listener, + Function actionResponseFactory ) { - clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener)); + clusterService.addListener(new RestoreClusterStateListener(clusterService, response, listener, actionResponseFactory)); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java index e7d95b9e40880..c2f79b2a27157 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/TransportRestoreSnapshotAction.java @@ -109,7 +109,12 @@ protected void clusterManagerOperation( ) { restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener, (delegatedListener, restoreCompletionResponse) -> { if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) { - RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener); + RestoreClusterStateListener.createAndRegisterListener( + clusterService, + restoreCompletionResponse, + delegatedListener, + RestoreSnapshotResponse::new + ); } else { delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo())); } diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index f4eaa979ff18c..7a7b98bf724f6 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -62,6 +62,8 @@ 
import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse; @@ -577,6 +579,11 @@ public interface ClusterAdminClient extends OpenSearchClient { */ void restoreSnapshot(RestoreSnapshotRequest request, ActionListener listener); + /** + * Restores from remote store. + */ + void restoreRemoteStore(RestoreRemoteStoreRequest request, ActionListener listener); + /** * Restores a snapshot. */ diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index f99454a8a8913..7084a856ab3d1 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -77,6 +77,9 @@ import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder; import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreAction; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest; import org.opensearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder; @@ -1109,6 +1112,11 @@ public void restoreSnapshot(RestoreSnapshotRequest request, ActionListener listener) { + execute(RestoreRemoteStoreAction.INSTANCE, request, listener); + } + @Override public RestoreSnapshotRequestBuilder prepareRestoreSnapshot(String repository, String snapshot) { return new RestoreSnapshotRequestBuilder(this, RestoreSnapshotAction.INSTANCE, repository, snapshot); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java new file mode 100644 index 0000000000000..fca6745167bb4 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Restores data from remote store + * + * @opensearch.api + */ +public final class RestRestoreRemoteStoreAction extends BaseRestHandler { + + @Override + public List routes() { + return singletonList(new Route(POST, "/_remotestore/_restore")); + } + + @Override + public String getName() { + return "restore_remote_store_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + RestoreRemoteStoreRequest restoreRemoteStoreRequest = new RestoreRemoteStoreRequest(); + restoreRemoteStoreRequest.masterNodeTimeout( + request.paramAsTime("cluster_manager_timeout", restoreRemoteStoreRequest.masterNodeTimeout()) + ); + restoreRemoteStoreRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false)); + request.applyContentParser(p -> restoreRemoteStoreRequest.source(p.mapOrdered())); + return channel -> client.admin().cluster().restoreRemoteStore(restoreRemoteStoreRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java new file mode 100644 index 0000000000000..b52729c1bbfd7 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreResponseTests.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
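Together, the ClusterAdminClient method and the REST handler above expose the remote store restore through both the transport client interface and POST /_remotestore/_restore. The following is a minimal sketch of driving the new client method once this patch is applied; the helper class, the logging, and the omission of any index selection on the request are illustrative choices, not taken from the patch.

import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.client.Client;

final class RemoteStoreRestoreHelper {
    private RemoteStoreRestoreHelper() {}

    // Starts a remote store restore and reports when it finishes; index selection
    // (request.source(...)) is deliberately omitted in this sketch.
    static void restore(Client client) {
        RestoreRemoteStoreRequest request = new RestoreRemoteStoreRequest();
        request.waitForCompletion(true); // hold the response until the restore completes
        client.admin().cluster().restoreRemoteStore(request, ActionListener.wrap(
            (RestoreRemoteStoreResponse response) -> System.out.println("remote store restore completed: " + response),
            e -> System.err.println("remote store restore failed: " + e)
        ));
    }
}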
+ */ + +package org.opensearch.action.admin.cluster.remotestore.restore; + +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.snapshots.RestoreInfo; +import org.opensearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class RestoreRemoteStoreResponseTests extends AbstractXContentTestCase { + + @Override + protected RestoreRemoteStoreResponse createTestInstance() { + if (randomBoolean()) { + String name = "remote_store"; + List indices = new ArrayList<>(); + indices.add("test0"); + indices.add("test1"); + int totalShards = randomIntBetween(1, 1000); + int successfulShards = randomIntBetween(0, totalShards); + return new RestoreRemoteStoreResponse(new RestoreInfo(name, indices, totalShards, successfulShards)); + } else { + return new RestoreRemoteStoreResponse((RestoreInfo) null); + } + } + + @Override + protected RestoreRemoteStoreResponse doParseInstance(XContentParser parser) throws IOException { + return RestoreRemoteStoreResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} From fbe93d4743e37033c211fff4fbf07fa71b5b66d9 Mon Sep 17 00:00:00 2001 From: Sruti Parthiban Date: Thu, 4 Aug 2022 22:05:23 -0700 Subject: [PATCH 08/11] Task consumer Integration (#2293) * Integrate task consumers to capture task resource information during unregister. Add consumer that logs topN expensive search tasks Signed-off-by: sruti1312 --- .../src/docker/config/log4j2.properties | 10 ++ distribution/src/config/log4j2.properties | 37 ++++++ .../ImportLog4jPropertiesTaskTests.java | 2 +- .../test/resources/config/log4j2.properties | 35 ++++++ .../action/search/SearchShardTask.java | 21 ++++ .../common/settings/ClusterSettings.java | 4 +- .../search/internal/ShardSearchRequest.java | 16 ++- .../search/query/QuerySearchRequest.java | 5 +- .../org/opensearch/tasks/TaskManager.java | 43 +++++++ .../SearchShardTaskDetailsLogMessage.java | 67 +++++++++++ .../tasks/consumer/TopNSearchTasksLogger.java | 100 +++++++++++++++++ .../tasks/consumer/package-info.java | 12 ++ .../transport/TransportService.java | 15 ++- .../node/tasks/TaskManagerTestCase.java | 10 +- .../index/IndexingSlowLogTests.java | 2 +- .../opensearch/index/SearchSlowLogTests.java | 2 +- ...SearchShardTaskDetailsLogMessageTests.java | 61 ++++++++++ .../consumer/TopNSearchTasksLoggerTests.java | 105 ++++++++++++++++++ ...enSearchIndexLevelReplicationTestCase.java | 2 + .../test/transport/MockTransportService.java | 10 +- 20 files changed, 545 insertions(+), 14 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java create mode 100644 server/src/main/java/org/opensearch/tasks/consumer/package-info.java create mode 100644 server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java create mode 100644 server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java diff --git a/distribution/docker/src/docker/config/log4j2.properties b/distribution/docker/src/docker/config/log4j2.properties index a8c54137c7fd2..761478a9fdc6e 100644 --- a/distribution/docker/src/docker/config/log4j2.properties +++ b/distribution/docker/src/docker/config/log4j2.properties @@ -53,3 +53,13 @@ logger.index_indexing_slowlog.name = index.indexing.slowlog.index logger.index_indexing_slowlog.level = trace 
logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.additivity = false + +appender.task_detailslog_rolling.type = Console +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog + +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.additivity = false diff --git a/distribution/src/config/log4j2.properties b/distribution/src/config/log4j2.properties index 4820396c79eb7..bb27aaf2e22e6 100644 --- a/distribution/src/config/log4j2.properties +++ b/distribution/src/config/log4j2.properties @@ -195,3 +195,40 @@ logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old logger.index_indexing_slowlog.additivity = false + +######## Task details log JSON #################### +appender.task_detailslog_rolling.type = RollingFile +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.json +appender.task_detailslog_rolling.filePermissions = rw-r----- +appender.task_detailslog_rolling.layout.type = OpenSearchJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog +appender.task_detailslog_rolling.layout.opensearchmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata + +appender.task_detailslog_rolling.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.json.gz +appender.task_detailslog_rolling.policies.type = Policies +appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling.policies.size.size = 1GB +appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling.strategy.max = 4 +################################################# +######## Task details log - old style pattern #### +appender.task_detailslog_rolling_old.type = RollingFile +appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old +appender.task_detailslog_rolling_old.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog.log +appender.task_detailslog_rolling_old.filePermissions = rw-r----- +appender.task_detailslog_rolling_old.layout.type = PatternLayout +appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n + +appender.task_detailslog_rolling_old.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}_task_detailslog-%i.log.gz +appender.task_detailslog_rolling_old.policies.type = Policies +appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling_old.policies.size.size = 1GB +appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling_old.strategy.max = 4 
+################################################# +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old +logger.task_detailslog_rolling.additivity = false diff --git a/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java b/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java index 7f67e08c66b9e..96544d3297ad4 100644 --- a/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java +++ b/distribution/tools/upgrade-cli/src/test/java/org/opensearch/upgrade/ImportLog4jPropertiesTaskTests.java @@ -67,7 +67,7 @@ public void testImportLog4jPropertiesTask() throws IOException { Properties properties = new Properties(); properties.load(Files.newInputStream(taskInput.getOpenSearchConfig().resolve(ImportLog4jPropertiesTask.LOG4J_PROPERTIES))); assertThat(properties, is(notNullValue())); - assertThat(properties.entrySet(), hasSize(137)); + assertThat(properties.entrySet(), hasSize(165)); assertThat(properties.get("appender.rolling.layout.type"), equalTo("OpenSearchJsonLayout")); assertThat( properties.get("appender.deprecation_rolling.fileName"), diff --git a/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties b/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties index b9ad71121165a..4b92d3fc62376 100644 --- a/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties +++ b/distribution/tools/upgrade-cli/src/test/resources/config/log4j2.properties @@ -176,3 +176,38 @@ logger.index_indexing_slowlog.level = trace logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling_old.ref = index_indexing_slowlog_rolling_old logger.index_indexing_slowlog.additivity = false + +######## Task details log JSON #################### +appender.task_detailslog_rolling.type = RollingFile +appender.task_detailslog_rolling.name = task_detailslog_rolling +appender.task_detailslog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.json +appender.task_detailslog_rolling.layout.type = ESJsonLayout +appender.task_detailslog_rolling.layout.type_name = task_detailslog +appender.task_detailslog_rolling.layout.esmessagefields=taskId,type,action,description,start_time_millis,resource_stats,metadata + +appender.task_detailslog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.json.gz +appender.task_detailslog_rolling.policies.type = Policies +appender.task_detailslog_rolling.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling.policies.size.size = 1GB +appender.task_detailslog_rolling.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling.strategy.max = 4 +################################################# +######## Task details log - old style pattern #### +appender.task_detailslog_rolling_old.type = RollingFile +appender.task_detailslog_rolling_old.name = task_detailslog_rolling_old +appender.task_detailslog_rolling_old.fileName = 
${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog.log +appender.task_detailslog_rolling_old.layout.type = PatternLayout +appender.task_detailslog_rolling_old.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] [%node_name]%marker %m%n + +appender.task_detailslog_rolling_old.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_task_detailslog-%i.log.gz +appender.task_detailslog_rolling_old.policies.type = Policies +appender.task_detailslog_rolling_old.policies.size.type = SizeBasedTriggeringPolicy +appender.task_detailslog_rolling_old.policies.size.size = 1GB +appender.task_detailslog_rolling_old.strategy.type = DefaultRolloverStrategy +appender.task_detailslog_rolling_old.strategy.max = 4 +################################################# +logger.task_detailslog_rolling.name = task.detailslog +logger.task_detailslog_rolling.level = trace +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling.ref = task_detailslog_rolling +logger.task_detailslog_rolling.appenderRef.task_detailslog_rolling_old.ref = task_detailslog_rolling_old +logger.task_detailslog_rolling.additivity = false diff --git a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java index 57831896db714..c9d0d6e2d3d47 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchShardTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchShardTask.java @@ -32,12 +32,14 @@ package org.opensearch.action.search; +import org.opensearch.common.MemoizedSupplier; import org.opensearch.search.fetch.ShardFetchSearchRequest; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.TaskId; import java.util.Map; +import java.util.function.Supplier; /** * Task storing information about a currently running search shard request. 
@@ -46,9 +48,28 @@ * @opensearch.internal */ public class SearchShardTask extends CancellableTask { + // generating metadata in a lazy way since source can be quite big + private final MemoizedSupplier metadataSupplier; public SearchShardTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, () -> ""); + } + + public SearchShardTask( + long id, + String type, + String action, + String description, + TaskId parentTaskId, + Map headers, + Supplier metadataSupplier + ) { super(id, type, action, description, parentTaskId, headers); + this.metadataSupplier = new MemoizedSupplier<>(metadataSupplier); + } + + public String getTaskMetadata() { + return metadataSupplier.get(); } @Override diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fe322992cd3e5..971fb518ff1da 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -41,6 +41,7 @@ import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; import org.opensearch.index.ShardIndexingPressureStore; +import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; @@ -577,7 +578,8 @@ public void apply(Settings value, Settings current, Settings previous) { ShardIndexingPressureMemoryManager.SUCCESSFUL_REQUEST_ELAPSED_TIMEOUT, ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS, IndexingPressure.MAX_INDEXING_BYTES, - TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED + TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, + TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED ) ) ); diff --git a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java index a4d7b2bed516c..828c2f8c78d69 100644 --- a/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/opensearch/search/internal/ShardSearchRequest.java @@ -51,6 +51,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContent; import org.opensearch.index.Index; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; @@ -71,6 +72,7 @@ import org.opensearch.transport.TransportRequest; import java.io.IOException; +import java.util.Collections; import java.util.Arrays; import java.util.Map; import java.util.function.Function; @@ -85,6 +87,8 @@ * @opensearch.internal */ public class ShardSearchRequest extends TransportRequest implements IndicesRequest { + public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + private final String clusterAlias; private final ShardId shardId; private final int numberOfShards; @@ -501,7 +505,7 @@ public String getClusterAlias() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, 
getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier); } @Override @@ -510,6 +514,16 @@ public String getDescription() { return "shardId[" + shardId() + "]"; } + public String getMetadataSupplier() { + StringBuilder sb = new StringBuilder(); + if (source != null) { + sb.append("source[").append(source.toString(FORMAT_PARAMS)).append("]"); + } else { + sb.append("source[]"); + } + return sb.toString(); + } + public Rewriteable getRewriteable() { return new RequestRewritable(this); } diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java b/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java index ae2f9e8fab989..ca74942decb50 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchRequest.java @@ -123,7 +123,7 @@ public IndicesOptions indicesOptions() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); + return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers, this::getMetadataSupplier); } public String getDescription() { @@ -137,4 +137,7 @@ public String getDescription() { return sb.toString(); } + public String getMetadataSupplier() { + return shardSearchRequest().getMetadataSupplier(); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskManager.java b/server/src/main/java/org/opensearch/tasks/TaskManager.java index 01b6b8ea603bf..334cde81dfb6a 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskManager.java +++ b/server/src/main/java/org/opensearch/tasks/TaskManager.java @@ -51,6 +51,8 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; @@ -58,6 +60,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.concurrent.ConcurrentMapLong; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TcpChannel; @@ -75,6 +78,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -92,6 +96,15 @@ public class TaskManager implements ClusterStateApplier { private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100); + public static final String TASK_RESOURCE_CONSUMERS_ATTRIBUTES = "task_resource_consumers.enabled"; + + public static final Setting TASK_RESOURCE_CONSUMERS_ENABLED = Setting.boolSetting( + TASK_RESOURCE_CONSUMERS_ATTRIBUTES, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Rest headers that are copied to the task */ @@ -116,10 +129,26 @@ public class TaskManager implements ClusterStateApplier { private final Map channelPendingTaskTrackers = ConcurrentCollections.newConcurrentMap(); 
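The SearchShardTask constructor now accepts a Supplier<String> and wraps it in a MemoizedSupplier, so the potentially large source string is rendered at most once, and only if some consumer actually reads the task metadata. Here is a compact sketch of that memoization idea using JDK types only; OnceSupplier is an illustrative stand-in, not the org.opensearch.common.MemoizedSupplier implementation.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Caches the first value produced by the delegate; later calls return the cached
// value without re-invoking the delegate (assumes the delegate never returns null).
final class OnceSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private volatile T value; // written once under the lock, then read lock-free

    OnceSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T get() {
        T result = value;
        if (result == null) {
            synchronized (this) {
                result = value;
                if (result == null) {
                    result = delegate.get();
                    value = result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        AtomicInteger builds = new AtomicInteger();
        OnceSupplier<String> metadata = new OnceSupplier<>(() -> "source[...] (build #" + builds.incrementAndGet() + ")");
        System.out.println(metadata.get());
        System.out.println(metadata.get()); // delegate not invoked again; still build #1
    }
}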
private final SetOnce cancellationService = new SetOnce<>(); + private volatile boolean taskResourceConsumersEnabled; + private final Set> taskResourceConsumer; + + public static TaskManager createTaskManagerWithClusterSettings( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { + final TaskManager taskManager = new TaskManager(settings, threadPool, taskHeaders); + clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_CONSUMERS_ENABLED, taskManager::setTaskResourceConsumersEnabled); + return taskManager; + } + public TaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { this.threadPool = threadPool; this.taskHeaders = new ArrayList<>(taskHeaders); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); + this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings); + this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings)); } public void setTaskResultsService(TaskResultsService taskResultsService) { @@ -135,6 +164,10 @@ public void setTaskResourceTrackingService(TaskResourceTrackingService taskResou this.taskResourceTrackingService.set(taskResourceTrackingService); } + public void setTaskResourceConsumersEnabled(boolean taskResourceConsumersEnabled) { + this.taskResourceConsumersEnabled = taskResourceConsumersEnabled; + } + /** * Registers a task without parent task */ @@ -240,6 +273,16 @@ public Task unregister(Task task) { // Decrement the task's self-thread as part of unregistration. task.decrementResourceTrackingThreads(); + if (taskResourceConsumersEnabled) { + for (Consumer taskConsumer : taskResourceConsumer) { + try { + taskConsumer.accept(task); + } catch (Exception e) { + logger.error("error encountered when updating the consumer", e); + } + } + } + if (task instanceof CancellableTask) { CancellableTaskHolder holder = cancellableTasks.remove(task.getId()); if (holder != null) { diff --git a/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java b/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java new file mode 100644 index 0000000000000..1755db3ab4ae8 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessage.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
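With these TaskManager changes, a fixed set of Consumer<Task> hooks runs whenever a task is unregistered, gated by the dynamically updatable task_resource_consumers.enabled setting and wrapped in a per-consumer try/catch so one failing consumer cannot break unregistration. A stripped-down sketch of that pattern with plain JDK types follows; TaskConsumerRegistry and its nested Task record are illustrative stand-ins, not OpenSearch classes.

import java.util.Set;
import java.util.function.Consumer;

final class TaskConsumerRegistry {
    // Stand-in for the unregistered task; the real hook receives org.opensearch.tasks.Task.
    record Task(long id, String action) {}

    private final Set<Consumer<Task>> consumers;
    private volatile boolean enabled; // flipped at runtime via a settings update consumer

    TaskConsumerRegistry(boolean enabled, Set<Consumer<Task>> consumers) {
        this.enabled = enabled;
        this.consumers = Set.copyOf(consumers);
    }

    void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    // Mirrors the unregister hook: every consumer runs; failures are reported and swallowed.
    void onUnregister(Task task) {
        if (enabled == false) {
            return;
        }
        for (Consumer<Task> consumer : consumers) {
            try {
                consumer.accept(task);
            } catch (Exception e) {
                System.err.println("task consumer failed for task " + task.id() + ": " + e);
            }
        }
    }

    public static void main(String[] args) {
        Consumer<Task> logLine = t -> System.out.println("completed: " + t);
        Consumer<Task> broken = t -> { throw new IllegalStateException("boom"); };
        TaskConsumerRegistry registry = new TaskConsumerRegistry(true, Set.of(logLine, broken));
        registry.onUnregister(new Task(7, "indices:data/read/search[phase/query]"));
    }
}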
+ */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.logging.OpenSearchLogMessage; + +import java.util.HashMap; +import java.util.Map; + +/** + * Search shard task information that will be extracted from Task and converted into + * format that will be logged + * + * @opensearch.internal + */ +public final class SearchShardTaskDetailsLogMessage extends OpenSearchLogMessage { + SearchShardTaskDetailsLogMessage(SearchShardTask task) { + super(prepareMap(task), message(task)); + } + + private static Map prepareMap(SearchShardTask task) { + Map messageFields = new HashMap<>(); + messageFields.put("taskId", task.getId()); + messageFields.put("type", task.getType()); + messageFields.put("action", task.getAction()); + messageFields.put("description", task.getDescription()); + messageFields.put("start_time_millis", task.getStartTime()); + messageFields.put("parentTaskId", task.getParentTaskId()); + messageFields.put("resource_stats", task.getResourceStats()); + messageFields.put("metadata", task.getTaskMetadata()); + return messageFields; + } + + // Message will be used in plaintext logs + private static String message(SearchShardTask task) { + StringBuilder sb = new StringBuilder(); + sb.append("taskId:[") + .append(task.getId()) + .append("], ") + .append("type:[") + .append(task.getType()) + .append("], ") + .append("action:[") + .append(task.getAction()) + .append("], ") + .append("description:[") + .append(task.getDescription()) + .append("], ") + .append("start_time_millis:[") + .append(task.getStartTime()) + .append("], ") + .append("resource_stats:[") + .append(task.getResourceStats()) + .append("], ") + .append("metadata:[") + .append(task.getTaskMetadata()) + .append("]"); + return sb.toString(); + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java new file mode 100644 index 0000000000000..dd7e200d7f4b2 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/TopNSearchTasksLogger.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.Task; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.function.Consumer; + +/** + * A simple listener that logs resource information of high memory consuming search tasks + * + * @opensearch.internal + */ +public class TopNSearchTasksLogger implements Consumer { + public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog"; + public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size"; + public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency"; + + private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search"); + + // number of memory expensive search tasks that are logged + private static final Setting LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting( + LOG_TOP_QUERIES_SIZE, + 10, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // frequency in which memory expensive search tasks are logged + private static final Setting LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting( + LOG_TOP_QUERIES_FREQUENCY, + TimeValue.timeValueSeconds(60L), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private final int topQueriesSize; + private final long topQueriesLogFrequencyInNanos; + private final Queue> topQueries; + private long lastReportedTimeInNanos = System.nanoTime(); + + public TopNSearchTasksLogger(Settings settings) { + this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings); + this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos(); + this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1)); + } + + /** + * Called when task is unregistered and task has resource stats present. 
+ */ + @Override + public void accept(Task task) { + if (task instanceof SearchShardTask) { + recordSearchTask((SearchShardTask) task); + } + } + + private synchronized void recordSearchTask(SearchShardTask searchTask) { + final long memory_in_bytes = searchTask.getTotalResourceUtilization(ResourceStats.MEMORY); + if (System.nanoTime() - lastReportedTimeInNanos >= topQueriesLogFrequencyInNanos) { + publishTopNEvents(); + lastReportedTimeInNanos = System.nanoTime(); + } + if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) { + // evict the element + topQueries.poll(); + } + if (topQueries.size() < topQueriesSize) { + topQueries.offer(new Tuple<>(memory_in_bytes, searchTask)); + } + } + + private void publishTopNEvents() { + logTopResourceConsumingQueries(); + topQueries.clear(); + } + + private void logTopResourceConsumingQueries() { + for (Tuple topQuery : topQueries) { + SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2())); + } + } +} diff --git a/server/src/main/java/org/opensearch/tasks/consumer/package-info.java b/server/src/main/java/org/opensearch/tasks/consumer/package-info.java new file mode 100644 index 0000000000000..40219a1cead5b --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/consumer/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Support for adding consumers to consume task related information. + */ +package org.opensearch.tasks.consumer; diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 1a280f2475e5d..aaba06196bc59 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -210,7 +210,7 @@ public TransportService( setTracerLogInclude(TransportSettings.TRACE_LOG_INCLUDE_SETTING.get(settings)); setTracerLogExclude(TransportSettings.TRACE_LOG_EXCLUDE_SETTING.get(settings)); tracerLog = Loggers.getLogger(logger, ".tracer"); - taskManager = createTaskManager(settings, threadPool, taskHeaders); + taskManager = createTaskManager(settings, clusterSettings, threadPool, taskHeaders); this.interceptor = transportInterceptor; this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); @@ -246,8 +246,17 @@ public TaskManager getTaskManager() { return taskManager; } - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { - return new TaskManager(settings, threadPool, taskHeaders); + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { + if (clusterSettings != null) { + return TaskManager.createTaskManagerWithClusterSettings(settings, clusterSettings, threadPool, taskHeaders); + } else { + return new TaskManager(settings, threadPool, taskHeaders); + } } /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index fd6f5d17a3a80..68cf69e30f8a6 100644 --- 
a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -53,6 +53,7 @@ import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.network.NetworkService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.transport.BoundTransportAddress; import org.opensearch.common.util.PageCacheRecycler; @@ -218,11 +219,16 @@ public TestNode(String name, ThreadPool threadPool, Settings settings) { Collections.emptySet() ) { @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); } } }; diff --git a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java index 38c8491d79150..75a346e444b73 100644 --- a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java @@ -86,8 +86,8 @@ public static void init() throws IllegalAccessException { @AfterClass public static void cleanup() { - appender.stop(); Loggers.removeAppender(testLogger1, appender); + appender.stop(); } public void testLevelPrecedence() { diff --git a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java index ae159092a4833..ea146ec20b16a 100644 --- a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java @@ -84,9 +84,9 @@ public static void init() throws IllegalAccessException { @AfterClass public static void cleanup() { - appender.stop(); Loggers.removeAppender(queryLog, appender); Loggers.removeAppender(fetchLog, appender); + appender.stop(); } @Override diff --git a/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java b/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java new file mode 100644 index 0000000000000..641fdef4891bd --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/consumer/SearchShardTaskDetailsLogMessageTests.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
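The TopNSearchTasksLogger added in this commit keeps only the N most memory-expensive search shard tasks between flushes: a min-heap ordered by memory usage holds the retained samples, the cheapest entry is evicted when a larger one arrives, and the whole window is logged and cleared once the configured frequency has elapsed. A self-contained sketch of that bookkeeping using JDK types only appears below; TopNTracker and Sample are illustrative names, not part of the patch.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Keeps the N largest samples seen since the last flush, using a min-heap so the
// cheapest retained entry is always at the head and can be evicted in O(log N).
final class TopNTracker {
    record Sample(long memoryBytes, String description) {}

    private final int capacity;
    private final long flushIntervalNanos;
    private final PriorityQueue<Sample> heap = new PriorityQueue<>(Comparator.comparingLong(Sample::memoryBytes));
    private long lastFlushNanos = System.nanoTime();

    TopNTracker(int capacity, long flushIntervalNanos) {
        this.capacity = capacity;
        this.flushIntervalNanos = flushIntervalNanos;
    }

    // Records one sample; returns the samples flushed out of the previous window, if any.
    synchronized List<Sample> record(Sample sample) {
        List<Sample> flushed = List.of();
        if (System.nanoTime() - lastFlushNanos >= flushIntervalNanos) {
            flushed = new ArrayList<>(heap); // snapshot to "log", then start a new window
            heap.clear();
            lastFlushNanos = System.nanoTime();
        }
        if (heap.size() >= capacity && heap.peek().memoryBytes() < sample.memoryBytes()) {
            heap.poll(); // evict the smallest retained sample to make room
        }
        if (heap.size() < capacity) {
            heap.offer(sample);
        }
        return flushed;
    }

    public static void main(String[] args) {
        TopNTracker tracker = new TopNTracker(2, 0L); // flush on every call for the demo
        tracker.record(new Sample(100, "query a"));
        System.out.println(tracker.record(new Sample(300, "query b"))); // flushes [query a]
    }
}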
+ */ + +package org.opensearch.tasks.consumer; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; + +public class SearchShardTaskDetailsLogMessageTests extends OpenSearchSingleNodeTestCase { + public void testTaskDetailsLogHasJsonFields() { + SearchShardTask task = new SearchShardTask( + 0, + "n/a", + "n/a", + "test", + null, + Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"), + () -> "test_metadata" + ); + SearchShardTaskDetailsLogMessage p = new SearchShardTaskDetailsLogMessage(task); + + assertThat(p.getValueFor("taskId"), equalTo("0")); + assertThat(p.getValueFor("type"), equalTo("n/a")); + assertThat(p.getValueFor("action"), equalTo("n/a")); + assertThat(p.getValueFor("description"), equalTo("test")); + assertThat(p.getValueFor("parentTaskId"), equalTo(null)); + // when no resource information present + assertThat(p.getValueFor("resource_stats"), equalTo("{}")); + assertThat(p.getValueFor("metadata"), equalTo("test_metadata")); + + task.startThreadResourceTracking( + 0, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 0L), + new ResourceUsageMetric(ResourceStats.CPU, 0L) + ); + task.updateThreadResourceStats( + 0, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 100), + new ResourceUsageMetric(ResourceStats.CPU, 100) + ); + assertThat( + p.getValueFor("resource_stats"), + equalTo("{0=[{cpu_time_in_nanos=100, memory_in_bytes=100}, stats_type=worker_stats, is_active=true, threadId=0]}") + ); + } +} diff --git a/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java new file mode 100644 index 0000000000000..a8fd3623ef09d --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/consumer/TopNSearchTasksLoggerTests.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.tasks.consumer; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.settings.Settings; +import org.opensearch.tasks.ResourceStats; +import org.opensearch.tasks.ResourceStatsType; +import org.opensearch.tasks.ResourceUsageMetric; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import java.util.Collections; + +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY; +import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE; + +public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase { + static MockAppender appender; + static Logger searchLogger = LogManager.getLogger(TopNSearchTasksLogger.TASK_DETAILS_LOG_PREFIX + ".search"); + + private TopNSearchTasksLogger topNSearchTasksLogger; + + @BeforeClass + public static void init() throws IllegalAccessException { + appender = new MockAppender("trace_appender"); + appender.start(); + Loggers.addAppender(searchLogger, appender); + } + + @AfterClass + public static void cleanup() { + Loggers.removeAppender(searchLogger, appender); + appender.stop(); + } + + public void testLoggerWithTasks() { + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + generateTasks(5); + LogEvent logEvent = appender.getLastEventAndReset(); + assertNotNull(logEvent); + assertEquals(logEvent.getLevel(), Level.INFO); + assertTrue(logEvent.getMessage().getFormattedMessage().contains("cpu_time_in_nanos=300, memory_in_bytes=300")); + } + + public void testLoggerWithoutTasks() { + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + + assertNull(appender.getLastEventAndReset()); + } + + public void testLoggerWithHighFrequency() { + // setting the frequency to a really large value and confirming that nothing gets written to log file. + final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build(); + topNSearchTasksLogger = new TopNSearchTasksLogger(settings); + generateTasks(5); + generateTasks(2); + + assertNull(appender.getLastEventAndReset()); + } + + // generate search tasks and updates the topN search tasks logger consumer. 
+ public void generateTasks(int numberOfTasks) { + for (int i = 0; i < numberOfTasks; i++) { + Task task = new SearchShardTask( + i, + "n/a", + "n/a", + "test", + null, + Collections.singletonMap(Task.X_OPAQUE_ID, "my_id"), + () -> "n/a" + ); + task.startThreadResourceTracking( + i, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, 0L), + new ResourceUsageMetric(ResourceStats.CPU, 0L) + ); + task.updateThreadResourceStats( + i, + ResourceStatsType.WORKER_STATS, + new ResourceUsageMetric(ResourceStats.MEMORY, i * 100L), + new ResourceUsageMetric(ResourceStats.CPU, i * 100L) + ); + topNSearchTasksLogger.accept(task); + } + } +} diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index a588909085160..249ffcfd0bf6e 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -76,6 +76,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; @@ -201,6 +202,7 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private final AtomicInteger docId = new AtomicInteger(); boolean closed = false; private volatile ReplicationTargets replicationTargets; + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer( new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 9b9baebd540c3..c80b120ad0148 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -261,11 +261,16 @@ private static TransportAddress[] extractTransportAddresses(TransportService tra } @Override - protected TaskManager createTaskManager(Settings settings, ThreadPool threadPool, Set taskHeaders) { + protected TaskManager createTaskManager( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool, + Set taskHeaders + ) { if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) { return new MockTaskManager(settings, threadPool, taskHeaders); } else { - return super.createTaskManager(settings, threadPool, taskHeaders); + return super.createTaskManager(settings, clusterSettings, threadPool, taskHeaders); } } @@ -530,7 +535,6 @@ public void clearCallback() { /** * Adds a new handling behavior that is used when the defined request is received. 
- * */ public void addRequestHandlingBehavior( String actionName, From 24527fcbced32185f4659898f9e9cf7413c6a8f2 Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Fri, 5 Aug 2022 23:02:20 +0530 Subject: [PATCH 09/11] Issue:3933 -Multi-node setup for local gradle run (#3958) * Issue:3933 - Multinode setup for local gradle run Signed-off-by: Pranit Kumar --- .../testclusters/OpenSearchCluster.java | 40 ++++++++++++++++--- .../gradle/testclusters/OpenSearchNode.java | 11 ++++- gradle/run.gradle | 5 +++ 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java index ef52adab6377a..0f5348d5a8dcf 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchCluster.java @@ -84,6 +84,8 @@ public class OpenSearchCluster implements TestClusterConfiguration, Named { private final ArchiveOperations archiveOperations; private int nodeIndex = 0; + private int zoneCount = 1; + public OpenSearchCluster( String clusterName, Project project, @@ -104,13 +106,21 @@ public OpenSearchCluster( this.bwcJdk = bwcJdk; // Always add the first node - addNode(clusterName + "-0"); + String zone = hasZoneProperty() ? "zone-1" : ""; + addNode(clusterName + "-0", zone); // configure the cluster name eagerly so all nodes know about it this.nodes.all((node) -> node.defaultConfig.put("cluster.name", safeName(clusterName))); addWaitForClusterHealth(); } + public void setNumberOfZones(int zoneCount) { + if (zoneCount < 1) { + throw new IllegalArgumentException("Number of zones should be >= 1 but was " + zoneCount + " for " + this); + } + this.zoneCount = zoneCount; + } + public void setNumberOfNodes(int numberOfNodes) { checkFrozen(); @@ -124,12 +134,31 @@ public void setNumberOfNodes(int numberOfNodes) { ); } - for (int i = nodes.size(); i < numberOfNodes; i++) { - addNode(clusterName + "-" + i); + if (numberOfNodes < zoneCount) { + throw new IllegalArgumentException( + "Number of nodes should be >= zoneCount but was " + numberOfNodes + " for " + this.zoneCount + ); } + + if (hasZoneProperty()) { + int currentZone; + for (int i = nodes.size(); i < numberOfNodes; i++) { + currentZone = i % zoneCount + 1; + String zoneName = "zone-" + currentZone; + addNode(clusterName + "-" + i, zoneName); + } + } else { + for (int i = nodes.size(); i < numberOfNodes; i++) { + addNode(clusterName + "-" + i, ""); + } + } + } + + private boolean hasZoneProperty() { + return this.project.findProperty("numZones") != null; } - private void addNode(String nodeName) { + private void addNode(String nodeName, String zoneName) { OpenSearchNode newNode = new OpenSearchNode( path, nodeName, @@ -138,7 +167,8 @@ private void addNode(String nodeName) { fileSystemOperations, archiveOperations, workingDirBase, - bwcJdk + bwcJdk, + zoneName ); // configure the cluster name eagerly newNode.defaultConfig.put("cluster.name", safeName(clusterName)); diff --git a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java index b051c15e81d6d..ab765efde7885 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/testclusters/OpenSearchNode.java @@ -32,6 +32,7 @@ package 
org.opensearch.gradle.testclusters; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.opensearch.gradle.Architecture; import org.opensearch.gradle.DistributionDownloadPlugin; import org.opensearch.gradle.OpenSearchDistribution; @@ -175,6 +176,8 @@ public class OpenSearchNode implements TestClusterConfiguration { private final Config legacyESConfig; private Config currentConfig; + private String zone; + OpenSearchNode( String path, String name, @@ -183,7 +186,8 @@ public class OpenSearchNode implements TestClusterConfiguration { FileSystemOperations fileSystemOperations, ArchiveOperations archiveOperations, File workingDirBase, - Jdk bwcJdk + Jdk bwcJdk, + String zone ) { this.path = path; this.name = name; @@ -205,6 +209,7 @@ public class OpenSearchNode implements TestClusterConfiguration { opensearchConfig = Config.getOpenSearchConfig(workingDir); legacyESConfig = Config.getLegacyESConfig(workingDir); currentConfig = opensearchConfig; + this.zone = zone; } /* @@ -1239,6 +1244,10 @@ private void createConfiguration() { baseConfig.put("path.logs", confPathLogs.toAbsolutePath().toString()); baseConfig.put("path.shared_data", workingDir.resolve("sharedData").toString()); baseConfig.put("node.attr.testattr", "test"); + if (StringUtils.isNotBlank(zone)) { + baseConfig.put("cluster.routing.allocation.awareness.attributes", "zone"); + baseConfig.put("node.attr.zone", zone); + } baseConfig.put("node.portsfile", "true"); baseConfig.put("http.port", httpPort); if (getVersion().onOrAfter(Version.fromString("6.7.0"))) { diff --git a/gradle/run.gradle b/gradle/run.gradle index 5a1fed06c0ef7..639479e97d28f 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -31,9 +31,14 @@ import org.opensearch.gradle.testclusters.RunTask apply plugin: 'opensearch.testclusters' +def numNodes = findProperty('numNodes') as Integer ?: 1 +def numZones = findProperty('numZones') as Integer ?: 1 + testClusters { runTask { testDistribution = 'archive' + if (numZones > 1) numberOfZones = numZones + if (numNodes > 1) numberOfNodes = numNodes } } From 203f44e94cb4f04077ae0d0c800bb48b3852252b Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 5 Aug 2022 14:25:44 -0400 Subject: [PATCH 10/11] OpenSearch crashes on closed client connection before search reply when total ops higher compared to expected (#4143) Signed-off-by: Andriy Redko --- .../search/AbstractSearchAsyncAction.java | 11 +++- .../AbstractSearchAsyncActionTests.java | 51 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 1597b31e89871..0876bf93a557b 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -302,7 +302,16 @@ public void onFailure(Exception t) { * It is possible to run into connection exceptions here because we are getting the connection early and might * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy. */ - fork(() -> onShardFailure(shardIndex, shard, shardIt, e)); + fork(() -> { + // It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception. 
+ // In this case calling onShardFailure() would overflow the operations counter, so the best we could do + // here is to fail the phase and move on to the next one. + if (totalOps.get() == expectedTotalOps) { + onPhaseFailure(this, "The phase has failed", e); + } else { + onShardFailure(shardIndex, shard, shardIt, e); + } + }); } finally { executeNext(pendingExecutions, thread); } diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index b44b59b8a4ad5..ad2657517df9a 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -477,6 +477,57 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } + public void testExecutePhaseOnShardFailure() throws InterruptedException { + final Index index = new Index("test", UUID.randomUUID().toString()); + + final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(3)) + .mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), List.of("n1", "n2", "n3"), null, null, null)) + .toArray(SearchShardIterator[]::new); + + final AtomicBoolean fail = new AtomicBoolean(true); + final CountDownLatch latch = new CountDownLatch(1); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + searchRequest.setMaxConcurrentShardRequests(5); + + final ArraySearchPhaseResults queryResult = new ArraySearchPhaseResults<>(shards.length); + AbstractSearchAsyncAction action = createAction( + searchRequest, + queryResult, + new ActionListener() { + @Override + public void onResponse(SearchResponse response) {} + + @Override + public void onFailure(Exception e) { + try { + // We end up here only when onPhaseDone() is called (causing NPE) and + // ending up in the onPhaseFailure() callback + if (fail.compareAndExchange(true, false)) { + assertThat(e, instanceOf(SearchPhaseExecutionException.class)); + throw new RuntimeException("Simulated exception"); + } + } finally { + executor.submit(() -> latch.countDown()); + } + } + }, + false, + false, + new AtomicLong(), + shards + ); + action.run(); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + + InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); + SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null); + assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations()); + assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest()); + assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile()); + assertSame(searchResponse.getHits(), internalSearchResponse.hits()); + assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); + } + private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; From c665e7c5f202074ff7f910431302d0aeab4adb90 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Fri, 5 Aug 2022 15:47:30 -0700 Subject: [PATCH 11/11] 2.2.0 release notes (#4151) (#4153) * Added rough release notes for 2.2 This includes an inline sed command to rewrite PR ids to full Github URLs. 
Command - git log --pretty=format:"* %s" --since=6-24-2022 | sed -E "s|(\(#([0-9]*)\))|([#\2](https://github.com/opensearch-project/opensearch/pull/\2))|g"

(An example run of this command on a sample commit subject is shown after the patch below.)

Signed-off-by: Kartik Ganesh

* Organizing 2.2.0 release notes into sections

Signed-off-by: Kartik Ganesh

* Removed lines from changelog that were already in 2.1.0

Signed-off-by: Kartik Ganesh

* Adding one last commit to release notes

Signed-off-by: Kartik Ganesh
---
 .../opensearch.release-notes-2.2.0.md | 79 +++++++++++++++++++
 1 file changed, 79 insertions(+)
 create mode 100644 release-notes/opensearch.release-notes-2.2.0.md

diff --git a/release-notes/opensearch.release-notes-2.2.0.md b/release-notes/opensearch.release-notes-2.2.0.md
new file mode 100644
index 0000000000000..74e76cfe46b5a
--- /dev/null
+++ b/release-notes/opensearch.release-notes-2.2.0.md
@@ -0,0 +1,79 @@
+## 2022-08-05 Version 2.2.0 Release Notes
+
+### Features/Enhancements
+
+* Task consumer Integration ([#2293](https://github.com/opensearch-project/opensearch/pull/2293)) ([#4141](https://github.com/opensearch-project/opensearch/pull/4141))
+* [Backport 2.x] [Segment Replication] Add SegmentReplicationTargetService to orchestrate replication events. ([#4074](https://github.com/opensearch-project/opensearch/pull/4074))
+* Support task resource tracking in OpenSearch ([#3982](https://github.com/opensearch-project/opensearch/pull/3982)) ([#4087](https://github.com/opensearch-project/opensearch/pull/4087))
+* Making shard copy count a multiple of attribute count ([#3462](https://github.com/opensearch-project/opensearch/pull/3462)) ([#4086](https://github.com/opensearch-project/opensearch/pull/4086))
+* [Backport 2.x] [Segment Rreplication] Adding CheckpointRefreshListener to trigger when Segment replication is turned on and Primary shard refreshes ([#4044](https://github.com/opensearch-project/opensearch/pull/4044))
+* Add doc_count field mapper ([#3985](https://github.com/opensearch-project/opensearch/pull/3985)) ([#4037](https://github.com/opensearch-project/opensearch/pull/4037))
+* Parallelize stale blobs deletion during snapshot delete ([#3796](https://github.com/opensearch-project/opensearch/pull/3796)) ([#3990](https://github.com/opensearch-project/opensearch/pull/3990))
+* [Backport 2.x] [Segment Replication] Add a new Engine implementation for replicas with segment replication enabled. ([#4003](https://github.com/opensearch-project/opensearch/pull/4003))
+* [Backport 2.x] Adds a new parameter, max_analyzer_offset, for the highlighter ([#4031](https://github.com/opensearch-project/opensearch/pull/4031))
+* Update merge on refresh and merge on commit defaults in Opensearch (Lucene 9.3) ([#3561](https://github.com/opensearch-project/opensearch/pull/3561)) ([#4013](https://github.com/opensearch-project/opensearch/pull/4013))
+* Make HybridDirectory MMAP Extensions Configurable ([#3837](https://github.com/opensearch-project/opensearch/pull/3837)) ([#3970](https://github.com/opensearch-project/opensearch/pull/3970))
+* Add option to disable chunked transfer-encoding ([#3864](https://github.com/opensearch-project/opensearch/pull/3864)) ([#3885](https://github.com/opensearch-project/opensearch/pull/3885))
+* Introducing TranslogManager implementations decoupled from the Engine [2.x] ([#3820](https://github.com/opensearch-project/opensearch/pull/3820))
+* Changing default no_master_block from write to metadata_write ([#3621](https://github.com/opensearch-project/opensearch/pull/3621)) ([#3756](https://github.com/opensearch-project/opensearch/pull/3756))
+
+### Bug Fixes
+
+* OpenSearch crashes on closed client connection before search reply when total ops higher compared to expected ([#4143](https://github.com/opensearch-project/opensearch/pull/4143)) ([#4145](https://github.com/opensearch-project/opensearch/pull/4145))
+* Binding empty instance of SegmentReplicationCheckpointPublisher when Feature Flag is off in IndicesModule.java file. ([#4119](https://github.com/opensearch-project/opensearch/pull/4119))
+* Fix the bug that masterOperation(with task param) is bypassed ([#4103](https://github.com/opensearch-project/opensearch/pull/4103)) ([#4115](https://github.com/opensearch-project/opensearch/pull/4115))
+* Fixing flaky org.opensearch.cluster.routing.allocation.decider.DiskThresholdDeciderIT.testHighWatermarkNotExceeded test case ([#4012](https://github.com/opensearch-project/opensearch/pull/4012)) ([#4014](https://github.com/opensearch-project/opensearch/pull/4014))
+* Correct typo: Rutime -> Runtime ([#3896](https://github.com/opensearch-project/opensearch/pull/3896)) ([#3898](https://github.com/opensearch-project/opensearch/pull/3898))
+* Fixing implausibly old time stamp 1970-01-01 00:00:00 by using the timestamp from the Git revision instead of default 0 value ([#3883](https://github.com/opensearch-project/opensearch/pull/3883)) ([#3891](https://github.com/opensearch-project/opensearch/pull/3891))
+
+### Infrastructure
+
+* Correctly ignore depandabot branches during push ([#4077](https://github.com/opensearch-project/opensearch/pull/4077)) ([#4113](https://github.com/opensearch-project/opensearch/pull/4113))
+* Build performance improvements ([#3926](https://github.com/opensearch-project/opensearch/pull/3926)) ([#3937](https://github.com/opensearch-project/opensearch/pull/3937))
+* PR coverage requirement and default settings ([#3931](https://github.com/opensearch-project/opensearch/pull/3931)) ([#3938](https://github.com/opensearch-project/opensearch/pull/3938))
+* [Backport 2.x] Fail build on wildcard imports ([#3940](https://github.com/opensearch-project/opensearch/pull/3940))
+* Don't run EmptyDirTaskTests in a Docker container ([#3792](https://github.com/opensearch-project/opensearch/pull/3792)) ([#3912](https://github.com/opensearch-project/opensearch/pull/3912))
+* Add coverage, gha, jenkins server, documentation and forum badges ([#3886](https://github.com/opensearch-project/opensearch/pull/3886))
+* Unable to use Systemd module with tar distribution ([#3755](https://github.com/opensearch-project/opensearch/pull/3755)) ([#3903](https://github.com/opensearch-project/opensearch/pull/3903))
+* Ignore backport / autocut / dependentbot branches for gradle checks ([#3816](https://github.com/opensearch-project/opensearch/pull/3816)) ([#3825](https://github.com/opensearch-project/opensearch/pull/3825))
+* Setup branch push coverage and fix coverage uploads ([#3793](https://github.com/opensearch-project/opensearch/pull/3793)) ([#3811](https://github.com/opensearch-project/opensearch/pull/3811))
+* Enable XML test reports for Jenkins integration ([#3799](https://github.com/opensearch-project/opensearch/pull/3799)) ([#3803](https://github.com/opensearch-project/opensearch/pull/3803))
+
+### Maintenance
+
+* OpenJDK Update (July 2022 Patch releases) ([#4023](https://github.com/opensearch-project/opensearch/pull/4023)) ([#4092](https://github.com/opensearch-project/opensearch/pull/4092))
+* Update to Lucene 9.3.0 ([#4043](https://github.com/opensearch-project/opensearch/pull/4043)) ([#4088](https://github.com/opensearch-project/opensearch/pull/4088))
+* Bump commons-configuration2 from 2.7 to 2.8.0 in /plugins/repository-hdfs ([#3764](https://github.com/opensearch-project/opensearch/pull/3764)) ([#3783](https://github.com/opensearch-project/opensearch/pull/3783))
+* Use bash in systemd-entrypoint shebang ([#4008](https://github.com/opensearch-project/opensearch/pull/4008)) ([#4009](https://github.com/opensearch-project/opensearch/pull/4009))
+* Bump com.gradle.enterprise from 3.10.1 to 3.10.2 ([#3568](https://github.com/opensearch-project/opensearch/pull/3568)) ([#3934](https://github.com/opensearch-project/opensearch/pull/3934))
+* Bump log4j-core in /buildSrc/src/testKit/thirdPartyAudit/sample_jars ([#3763](https://github.com/opensearch-project/opensearch/pull/3763)) ([#3784](https://github.com/opensearch-project/opensearch/pull/3784))
+* Added bwc version 1.3.5 ([#3911](https://github.com/opensearch-project/opensearch/pull/3911)) ([#3913](https://github.com/opensearch-project/opensearch/pull/3913))
+* Update to Gradle 7.5 ([#3594](https://github.com/opensearch-project/opensearch/pull/3594)) ([#3904](https://github.com/opensearch-project/opensearch/pull/3904))
+* Update Netty to 4.1.79.Final ([#3868](https://github.com/opensearch-project/opensearch/pull/3868)) ([#3874](https://github.com/opensearch-project/opensearch/pull/3874))
+* Upgrade MinIO image version ([#3541](https://github.com/opensearch-project/opensearch/pull/3541)) ([#3867](https://github.com/opensearch-project/opensearch/pull/3867))
+* Add netty-transport-native-unix-common to modules/transport-netty4/bu… ([#3848](https://github.com/opensearch-project/opensearch/pull/3848)) ([#3853](https://github.com/opensearch-project/opensearch/pull/3853))
+* Update outdated dependencies ([#3821](https://github.com/opensearch-project/opensearch/pull/3821)) ([#3854](https://github.com/opensearch-project/opensearch/pull/3854))
+* Added bwc version 2.1.1 ([#3806](https://github.com/opensearch-project/opensearch/pull/3806))
+* Upgrade netty from 4.1.73.Final to 4.1.78.Final ([#3772](https://github.com/opensearch-project/opensearch/pull/3772)) ([#3778](https://github.com/opensearch-project/opensearch/pull/3778))
+* Bump protobuf-java from 3.21.1 to 3.21.2 in /plugins/repository-hdfs ([#3711](https://github.com/opensearch-project/opensearch/pull/3711)) ([#3726](https://github.com/opensearch-project/opensearch/pull/3726))
+* Upgrading AWS SDK dependency for native plugins ([#3694](https://github.com/opensearch-project/opensearch/pull/3694)) ([#3701](https://github.com/opensearch-project/opensearch/pull/3701))
+
+### Refactoring
+
+* [Backport 2.x] Changes to encapsulate Translog into TranslogManager ([#4095](https://github.com/opensearch-project/opensearch/pull/4095)) ([#4142](https://github.com/opensearch-project/opensearch/pull/4142))
+* Deprecate and rename abstract methods in interfaces that contain 'master' in name ([#4121](https://github.com/opensearch-project/opensearch/pull/4121)) ([#4123](https://github.com/opensearch-project/opensearch/pull/4123))
+* [Backport 2.x] Integrate Engine with decoupled Translog interfaces ([#3822](https://github.com/opensearch-project/opensearch/pull/3822))
+* Deprecate class FakeThreadPoolMasterService, BlockMasterServiceOnMaster and BusyMasterServiceDisruption ([#4058](https://github.com/opensearch-project/opensearch/pull/4058)) ([#4068](https://github.com/opensearch-project/opensearch/pull/4068))
+* Rename classes with name 'MasterService' to 'ClusterManagerService' in directory 'test/framework' ([#4051](https://github.com/opensearch-project/opensearch/pull/4051)) ([#4057](https://github.com/opensearch-project/opensearch/pull/4057))
+* Deprecate class 'MasterService' and create alternative class 'ClusterManagerService' ([#4022](https://github.com/opensearch-project/opensearch/pull/4022)) ([#4050](https://github.com/opensearch-project/opensearch/pull/4050))
+* Deprecate and Rename abstract methods from 'Master' terminology to 'ClusterManager'. ([#4032](https://github.com/opensearch-project/opensearch/pull/4032)) ([#4048](https://github.com/opensearch-project/opensearch/pull/4048))
+* Deprecate public methods and variables that contain 'master' terminology in class 'NoMasterBlockService' and 'MasterService' ([#4006](https://github.com/opensearch-project/opensearch/pull/4006)) ([#4038](https://github.com/opensearch-project/opensearch/pull/4038))
+* Deprecate public methods and variables that contain 'master' terminology in 'client' directory ([#3966](https://github.com/opensearch-project/opensearch/pull/3966)) ([#3981](https://github.com/opensearch-project/opensearch/pull/3981))
+* [segment replication]Introducing common Replication interfaces for segment replication and recovery code paths ([#3234](https://github.com/opensearch-project/opensearch/pull/3234)) ([#3984](https://github.com/opensearch-project/opensearch/pull/3984))
+* Deprecate public methods and variables that contain 'master' terminology in 'test/framework' directory ([#3978](https://github.com/opensearch-project/opensearch/pull/3978)) ([#3987](https://github.com/opensearch-project/opensearch/pull/3987))
+* [Backport 2.x] [Segment Replication] Moving RecoveryState.Index to a top-level class and renaming ([#3971](https://github.com/opensearch-project/opensearch/pull/3971))
+* Rename and deprecate public methods that contains 'master' in the name in 'server' directory ([#3647](https://github.com/opensearch-project/opensearch/pull/3647)) ([#3964](https://github.com/opensearch-project/opensearch/pull/3964))
+* [2.x] Deprecate public class names with master terminology ([#3871](https://github.com/opensearch-project/opensearch/pull/3871)) ([#3914](https://github.com/opensearch-project/opensearch/pull/3914))
+* [Backport 2.x] Rename public classes with 'Master' to 'ClusterManager' ([#3870](https://github.com/opensearch-project/opensearch/pull/3870))
+* Revert renaming masterOperation() to clusterManagerOperation() ([#3681](https://github.com/opensearch-project/opensearch/pull/3681)) ([#3714](https://github.com/opensearch-project/opensearch/pull/3714))
+* Revert renaming method onMaster() and offMaster() in interface LocalNodeMasterListener ([#3686](https://github.com/opensearch-project/opensearch/pull/3686)) ([#3693](https://github.com/opensearch-project/opensearch/pull/3693))
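
For reference, here is a minimal sketch of what the release-notes command quoted in the commit message above does: git log prints one "* <subject>" line per commit since the given date, and the sed expression rewrites each trailing "(#NNNN)" pull-request reference into a Markdown link of the form used throughout the generated file. The commit subject and PR number in this example are hypothetical placeholders, not entries from the repository history; only the sed expression itself is taken from the commit message.

  # Rewrite a "(#1234)" PR reference into a Markdown pull-request link (hypothetical input line).
  $ echo '* Fix a hypothetical bug (#1234)' | \
      sed -E "s|(\(#([0-9]*)\))|([#\2](https://github.com/opensearch-project/opensearch/pull/\2))|g"
  * Fix a hypothetical bug ([#1234](https://github.com/opensearch-project/opensearch/pull/1234))

The --since=6-24-2022 cutoff presumably corresponds to the 2.1.0 release cut, which is why a follow-up commit removed entries that had already shipped in the 2.1.0 notes.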