Address review comments
Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Dec 27, 2022 · 1 parent 4c84970 · commit bca1cde
Showing 29 changed files with 665 additions and 424 deletions.
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -24,7 +24,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- [Segment Replication] Update peer recovery logic for segment replication ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
@@ -95,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299))
- Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460))
- [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344))

### Security

@@ -85,7 +85,6 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -702,8 +701,7 @@ public static final IndexShard newIndexShard(
cbs,
new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
SegmentReplicationTargetService.NO_OP
null
);
}

@@ -11,14 +11,11 @@
import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
@@ -27,9 +24,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
@@ -51,7 +46,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
@@ -72,14 +66,13 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;
protected static final String INDEX_NAME = "test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
// assumeTrue("Segment replication Feature flag is enabled",
// Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
@@ -103,8 +96,6 @@ protected boolean addMockInternalEngine() {
return false;
}

private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);

public void ingestDocs(int docCount) throws Exception {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
@@ -124,166 +115,6 @@ public void ingestDocs(int docCount) throws Exception {
}
}

/**
* This test relocates a primary shard to a newly added node in the cluster. Before relocation and after relocation
* we index documents. We don't perform any flush before relocation is done.
*/
public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception {
logger.info("--> starting [primary] ...");
final String old_primary = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder()
.put("index.number_of_shards", 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.number_of_replicas", 1)
).get();

final String replica = internalCluster().startNode();

ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(0, 200);
ingestDocs(initialDocCount);

logger.info("--> verifying count {}", initialDocCount);
assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

logger.info("--> start another node");
final String new_primary = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
logger.info("--> cluster state before relocation {}", state);

logger.info("--> relocate the shard");

client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary))
.execute()
.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
state = client().admin().cluster().prepareState().execute().actionGet().getState();
// assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
assertThat(
state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(),
equalTo(ShardRoutingState.STARTED)
);

final int finalDocCount = 1;
client().prepareIndex(INDEX_NAME).setId("201").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
refresh(INDEX_NAME);
waitForReplicaUpdate();

logger.info("--> verifying count again {} + {}", initialDocCount, finalDocCount);
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(
client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);
assertHitCount(
client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
initialDocCount + finalDocCount
);

}

public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
logger.info("--> starting [primary] ...");
final String primary = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put("index.refresh_interval", -1)
).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 10; i < 20; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}

logger.info("--> start another node");
final String replica = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> relocate the shard from primary to replica");
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica))
.execute();
logger.info("--> index 100 docs while relocating");
for (int i = 20; i < 120; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setSource("field", "value" + i)
.execute()
);
}
relocationListener.actionGet();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> verifying count");
assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 120);
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
createIndex(INDEX_NAME);
@@ -950,7 +781,7 @@ private void waitForReplicaUpdate() throws Exception {
// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
if (primaryShardSegments.getSegments().isEmpty() == false) {
if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
@@ -965,7 +796,7 @@
});
}

private IndexShard getIndexShard(String node) {
protected IndexShard getIndexShard(String node) {
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);