Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip performOnPrimary step when executing PublishCheckpoint. #6366

Merged
merged 2 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
Expand All @@ -33,6 +31,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
Expand All @@ -43,34 +42,26 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);

private void createIndex(int replicaCount) {
prepareCreate(
INDEX_NAME,
Settings.builder()
.put("index.number_of_shards", 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.number_of_replicas", replicaCount)
.put("index.refresh_interval", -1)
).get();
prepareCreate(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

/**
* This test verifies the happy path when the primary shard is relocated to a newly added node (target) in the cluster. Documents
* are indexed and verified both before and after relocation.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
Copy link
Member

@dreamer-89 dreamer-89 Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: For completeness, do we need relocation tests for the IMMEDIATE and default refresh policies?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good idea. I've added randomness to select a refresh policy in all of the tests in this class other than the explicit wait_until test.

public void testPrimaryRelocation() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand Down Expand Up @@ -115,7 +106,7 @@ public void testPrimaryRelocation() throws Exception {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand All @@ -135,19 +126,19 @@ public void testPrimaryRelocation() throws Exception {
* failure, more documents are ingested and verified on the replica, which confirms that the older primary is still refreshing the
* replicas.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryRelocationWithSegRepFailure() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand Down Expand Up @@ -200,7 +191,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand All @@ -220,7 +211,6 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
* This test verifies primary recovery behavior with continuous ingestion
*
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand Down Expand Up @@ -297,7 +287,6 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
* operations during handoff. The test verifies all docs ingested are searchable on new primary.
*
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand All @@ -310,14 +299,15 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
}
logger.info("--> flush to have segments on disk");
client().admin().indices().prepareFlush().execute().actionGet();
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

logger.info("--> index more docs so there are ops in the transaction log");
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 10; i < 20; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand Down Expand Up @@ -396,7 +386,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
}, 2, TimeUnit.MINUTES);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(totalDocCount, replica, newPrimary);
verifyStoreContent();
Expand All @@ -406,13 +396,10 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
* This test verifies that adding a new node, which results in peer recovery as a replica, also brings the replica's
* replication checkpoint up to the primary's by performing a round of segment replication.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testNewlyAddedReplicaIsUpdated() throws Exception {
final String primary = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
.get();
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
Expand All @@ -430,10 +417,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {
ensureGreen(INDEX_NAME);
// Update replica count settings to 1 so that peer recovery triggers and recover replica
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);

ClusterHealthResponse clusterHealthResponse = client().admin()
Expand All @@ -454,18 +438,15 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {

/**
* This test verifies that the replica shard is not added to the cluster when a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)

).get();

Expand Down Expand Up @@ -505,10 +486,7 @@ public void testAddNewReplicaFailure() throws Exception {
ensureGreen(INDEX_NAME);
// Add Replica shard to the new empty replica node
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica);
waitForRecovery.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -33,15 +39,12 @@
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Replication action responsible for publishing checkpoint to a replica shard.
*
Expand Down Expand Up @@ -107,36 +110,33 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
* Publish checkpoint request to shard
*/
final void publish(IndexShard indexShard) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint());
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

final List<ShardRouting> replicationTargets = indexShard.getReplicationGroup().getReplicationTargets();
for (ShardRouting replicationTarget : replicationTargets) {
if (replicationTarget.primary()) {
continue;
}
final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId());
final ConcreteReplicaRequest<PublishCheckpointRequest> replicaRequest = new ConcreteReplicaRequest<>(
request,
replicationTarget.allocationId().getId(),
primaryTerm,
indexShard.getLastKnownGlobalCheckpoint(),
indexShard.getMaxSeqNoOfUpdatesOrDeletes()
);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
ActionListener<ReplicationOperation.ReplicaResponse> listener = new ActionListener<>() {
@Override
public void handleResponse(ReplicationResponse response) {
public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -151,7 +151,7 @@ public void handleResponse(ReplicationResponse response) {
}

@Override
public void handleException(TransportException e) {
public void onFailure(Exception e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
Expand All @@ -174,8 +174,13 @@ public void handleException(TransportException e) {
e
);
}
}
);
};
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
listener,
ReplicaResponse::new
);
transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler);
}
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
Expand All @@ -192,7 +197,7 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener
) {
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse()));
throw new OpenSearchException("PublishCheckpointAction should not hit primary shards");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

package org.opensearch.indices.replication.checkpoint;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.TransportReplicationAction;
Expand All @@ -33,7 +33,6 @@
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -105,14 +104,9 @@ public void testPublishCheckpointActionOnPrimary() {
);

final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1);

final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);

action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> {
// we should forward the request containing the current publish checkpoint to the replica
assertThat(result.replicaRequest(), sameInstance(request));
}));

expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); });
}

public void testPublishCheckpointActionOnReplica() {
Expand Down