Skip to content

Commit

Permalink
Skip performOnPrimary step when executing PublishCheckpoint. (#6366)
Browse files Browse the repository at this point in the history
* Skip performOnPrimary step when executing PublishCheckpoint.

Signed-off-by: Marc Handalian <[email protected]>

Fix spotless.

Signed-off-by: Marc Handalian <[email protected]>

* PR Feedback:
- Add random refresh policy to reloation ITs.
- add back finishing ReplicationTask.

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: Marc Handalian <[email protected]>
(cherry picked from commit 72314f7)
  • Loading branch information
mch2 committed Feb 21, 2023
1 parent daec8fb commit 9cf2da3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
Expand All @@ -33,6 +31,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
Expand All @@ -43,34 +42,26 @@ public class SegmentReplicationRelocationIT extends SegmentReplicationBaseIT {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);

private void createIndex(int replicaCount) {
prepareCreate(
INDEX_NAME,
Settings.builder()
.put("index.number_of_shards", 1)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.number_of_replicas", replicaCount)
.put("index.refresh_interval", -1)
).get();
prepareCreate(INDEX_NAME, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)).get();
}

/**
* This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before
* relocation and after relocation documents are indexed and documents are verified
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryRelocation() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand Down Expand Up @@ -115,7 +106,7 @@ public void testPrimaryRelocation() throws Exception {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand All @@ -135,19 +126,19 @@ public void testPrimaryRelocation() throws Exception {
* failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the
* replicas.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testPrimaryRelocationWithSegRepFailure() throws Exception {
final String oldPrimary = internalCluster().startNode();
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand Down Expand Up @@ -200,7 +191,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand All @@ -220,7 +211,6 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
* This test verifies primary recovery behavior with continuous ingestion
*
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand Down Expand Up @@ -297,7 +287,6 @@ public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws E
* operations during handoff. The test verifies all docs ingested are searchable on new primary.
*
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
final String primary = internalCluster().startNode();
createIndex(1);
Expand All @@ -310,14 +299,15 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
}
logger.info("--> flush to have segments on disk");
client().admin().indices().prepareFlush().execute().actionGet();
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());

logger.info("--> index more docs so there are ops in the transaction log");
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 10; i < 20; i++) {
pendingIndexResponses.add(
client().prepareIndex(INDEX_NAME)
.setId(Integer.toString(i))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(refreshPolicy)
.setSource("field", "value" + i)
.execute()
);
Expand Down Expand Up @@ -396,7 +386,7 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
assertBusy(() -> {
client().admin().indices().prepareRefresh().execute().actionGet();
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
}, 1, TimeUnit.MINUTES);
}, 2, TimeUnit.MINUTES);
flushAndRefresh(INDEX_NAME);
waitForSearchableDocs(totalDocCount, replica, newPrimary);
verifyStoreContent();
Expand All @@ -406,13 +396,10 @@ public void testRelocateWithQueuedOperationsDuringHandoff() throws Exception {
* This test verifies that adding a new node which results in peer recovery as replica; also bring replica's
* replication checkpoint upto the primary's by performing a round of segment replication.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testNewlyAddedReplicaIsUpdated() throws Exception {
final String primary = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
prepareCreate(INDEX_NAME, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
.get();
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
Expand All @@ -430,10 +417,7 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {
ensureGreen(INDEX_NAME);
// Update replica count settings to 1 so that peer recovery triggers and recover replica
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);

ClusterHealthResponse clusterHealthResponse = client().admin()
Expand All @@ -454,18 +438,15 @@ public void testNewlyAddedReplicaIsUpdated() throws Exception {

/**
* This test verifies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();

logger.info("--> creating test index ...");
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)

).get();

Expand Down Expand Up @@ -505,10 +486,7 @@ public void testAddNewReplicaFailure() throws Exception {
ensureGreen(INDEX_NAME);
// Add Replica shard to the new empty replica node
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replica);
waitForRecovery.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -33,15 +39,12 @@
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Replication action responsible for publishing checkpoint to a replica shard.
*
Expand Down Expand Up @@ -107,36 +110,33 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
* Publish checkpoint request to shard
*/
final void publish(IndexShard indexShard) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint());
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

final List<ShardRouting> replicationTargets = indexShard.getReplicationGroup().getReplicationTargets();
for (ShardRouting replicationTarget : replicationTargets) {
if (replicationTarget.primary()) {
continue;
}
final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId());
final ConcreteReplicaRequest<PublishCheckpointRequest> replicaRequest = new ConcreteReplicaRequest<>(
request,
replicationTarget.allocationId().getId(),
primaryTerm,
indexShard.getLastKnownGlobalCheckpoint(),
indexShard.getMaxSeqNoOfUpdatesOrDeletes()
);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
ActionListener<ReplicationOperation.ReplicaResponse> listener = new ActionListener<>() {
@Override
public void handleResponse(ReplicationResponse response) {
public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -151,7 +151,7 @@ public void handleResponse(ReplicationResponse response) {
}

@Override
public void handleException(TransportException e) {
public void onFailure(Exception e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
Expand All @@ -174,8 +174,13 @@ public void handleException(TransportException e) {
e
);
}
}
);
};
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
listener,
ReplicaResponse::new
);
transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler);
}
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
Expand All @@ -192,7 +197,7 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener
) {
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse()));
throw new OpenSearchException("PublishCheckpointAction should not hit primary shards");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

package org.opensearch.indices.replication.checkpoint;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActionTestUtils;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.TransportReplicationAction;
Expand All @@ -33,10 +33,9 @@
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;

Expand Down Expand Up @@ -105,14 +104,9 @@ public void testPublishCheckpointActionOnPrimary() {
);

final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1);

final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);

action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> {
// we should forward the request containing the current publish checkpoint to the replica
assertThat(result.replicaRequest(), sameInstance(request));
}));

expectThrows(OpenSearchException.class, () -> { action.shardOperationOnPrimary(request, indexShard, mock(ActionListener.class)); });
}

public void testPublishCheckpointActionOnReplica() {
Expand Down

0 comments on commit 9cf2da3

Please sign in to comment.