Address review comment, move force segrep sync handler to SegRepTargetService

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Jan 5, 2023
1 parent 0776c28 commit aa5e3f3
Showing 17 changed files with 268 additions and 226 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))



### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -794,7 +794,7 @@ private void waitForReplicaUpdate() throws Exception {
});
}

protected IndexShard getIndexShard(String node) {
private IndexShard getIndexShard(String node) {
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
@@ -131,7 +131,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
createIndex();
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(0, 200);
final int initialDocCount = scaledRandomIntBetween(1, 100);
ingestDocs(initialDocCount);

logger.info("--> verifying count {}", initialDocCount);
@@ -183,10 +183,9 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {

final int finalDocCount = initialDocCount;
ingestDocs(finalDocCount);
refresh(INDEX_NAME);

logger.info(
"Verify all documents are available on both old primary and replica i.e. older primary is still refreshing replica nodes"
);
logger.info("Verify older primary is still refreshing replica nodes");
client().admin().indices().prepareRefresh().execute().actionGet();
assertHitCount(
client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1633,7 +1633,17 @@ static Engine.Searcher wrapSearcher(
}
}

public void resetEngine() throws IOException, InterruptedException, TimeoutException {
/**
 * Used with segment replication during relocation handoff: this method resets the current read-only engine to the
 * global checkpoint and then swaps in a writeable engine, blocking incoming operations while the reset is in progress.
 *
 * @throws IOException if the engine cannot be reset
 * @throws InterruptedException if the wait for in-flight operations is interrupted
 * @throws TimeoutException if in-flight operations are not drained before the timeout
 *
 * @opensearch.internal
 */
public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}
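The listener code removed from RecoveryTarget later in this diff called the old resetEngine() once a forced replication round finished; after the rename, a caller is expected to invoke resetToWriteableEngine() at that point and handle its three checked exceptions. A minimal caller-side sketch under that assumption; the class and method names below are illustrative only and not part of this commit:

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.opensearch.action.ActionListener;
import org.opensearch.index.shard.IndexShard;

// Illustrative sketch, not part of this commit.
final class ResetToWriteableEngineCallerSketch {

    // Hypothetical hook invoked after a forced segment replication round completes.
    static void onForcedReplicationDone(IndexShard indexShard, ActionListener<Void> listener) {
        try {
            // Blocks incoming operations (30 minute cap, per the method body above) while the
            // read-only engine is reset to the global checkpoint and replaced with a writeable one.
            indexShard.resetToWriteableEngine();
            listener.onResponse(null);
        } catch (InterruptedException | TimeoutException | IOException e) {
            listener.onFailure(e);
        }
    }
}
```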

@@ -3348,7 +3358,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
* promote to during relocation handoff post segment replication.
* change to a writeable engine during relocation handoff after a round of segment replication.
*/
boolean isReadOnlyReplica = indexSettings.isSegRepEnabled()
&& (shardRouting.primary() == false
@@ -65,7 +65,6 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationTimer;
@@ -112,7 +111,6 @@ public static class Actions {
public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog";
public static final String FINALIZE = "internal:index/shard/recovery/finalize";
public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context";
public static final String FORCE_SYNC = "internal:index/shard/recovery/segments_sync";
}

private final ThreadPool threadPool;
@@ -124,21 +122,17 @@ public static class Actions {

private final ReplicationCollection<RecoveryTarget> onGoingRecoveries;

private final SegmentReplicationTargetService segmentReplicationTargetService;

public PeerRecoveryTargetService(
ThreadPool threadPool,
TransportService transportService,
RecoverySettings recoverySettings,
ClusterService clusterService,
SegmentReplicationTargetService segmentReplicationTargetService
ClusterService clusterService
) {
this.threadPool = threadPool;
this.transportService = transportService;
this.recoverySettings = recoverySettings;
this.clusterService = clusterService;
this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool);
this.segmentReplicationTargetService = segmentReplicationTargetService;

transportService.registerRequestHandler(
Actions.FILES_INFO,
@@ -182,12 +176,6 @@ public PeerRecoveryTargetService(
RecoveryHandoffPrimaryContextRequest::new,
new HandoffPrimaryContextRequestHandler()
);
transportService.registerRequestHandler(
Actions.FORCE_SYNC,
ThreadPool.Names.GENERIC,
ForceSyncRequest::new,
new ForceSyncTransportRequestHandler()
);
}

@Override
@@ -200,7 +188,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.start(
new RecoveryTarget(indexShard, sourceNode, listener, segmentReplicationTargetService),
new RecoveryTarget(indexShard, sourceNode, listener),
recoverySettings.activityTimeout()
);
// we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause
@@ -575,20 +563,6 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
}
}

class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
@Override
public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception {
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.getRecoveryId(), request.getShardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FORCE_SYNC, request);
if (listener == null) {
return;
}
recoveryTarget.forceSegmentFileSync(listener);
}
}
}
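Per the commit title, the FORCE_SYNC action and this handler move into SegmentReplicationTargetService, but that file is not shown in the visible part of this diff. The sketch below is only an approximation of the moved wiring, assembled from the code removed here and from RecoveryTarget further down; the shard lookup is a placeholder assumption and logging is omitted:

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.ForceSyncRequest;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;

// Approximation only: the moved handler lives in SegmentReplicationTargetService, which is
// not visible in this excerpt. Shard lookup is a placeholder and logging is omitted.
final class ForceSyncHandlerSketch implements TransportRequestHandler<ForceSyncRequest> {

    private final SegmentReplicationTargetService targetService;
    private final Function<ForceSyncRequest, IndexShard> shardResolver; // placeholder for the real shard lookup

    ForceSyncHandlerSketch(SegmentReplicationTargetService targetService, Function<ForceSyncRequest, IndexShard> shardResolver) {
        this.targetService = targetService;
        this.shardResolver = shardResolver;
    }

    // Registration mirroring the block removed from PeerRecoveryTargetService above,
    // keyed on the constant referenced later in RemoteRecoveryTargetHandler.
    static void register(TransportService transportService, ForceSyncHandlerSketch handler) {
        transportService.registerRequestHandler(
            SegmentReplicationTargetService.Actions.FORCE_SYNC,
            ThreadPool.Names.GENERIC,
            ForceSyncRequest::new,
            handler
        );
    }

    @Override
    public void messageReceived(ForceSyncRequest request, TransportChannel channel, Task task) throws Exception {
        final IndexShard indexShard = shardResolver.apply(request);
        // Run one forced round of segment replication, then swap the shard's read-only engine for a
        // writeable one; this mirrors the listener this commit removes from RecoveryTarget.forceSegmentFileSync,
        // with resetEngine() renamed to resetToWriteableEngine().
        targetService.startReplication(
            ReplicationCheckpoint.empty(request.getShardId()),
            indexShard,
            new SegmentReplicationTargetService.SegmentReplicationListener() {
                @Override
                public void onReplicationDone(SegmentReplicationState state) {
                    try {
                        indexShard.resetToWriteableEngine();
                        channel.sendResponse(TransportResponse.Empty.INSTANCE);
                    } catch (InterruptedException | TimeoutException | IOException e) {
                        throw new RuntimeException(e); // mirrors the removed RecoveryTarget listener
                    }
                }

                @Override
                public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
                    if (sendShardFailure) {
                        indexShard.failShard("replication failure", e);
                    }
                    try {
                        channel.sendResponse(e);
                    } catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            }
        );
    }
}
```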

class RecoveryRunner extends AbstractRunnable {

final long recoveryId;
@@ -809,7 +809,6 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
final StepListener<Void> finalizeListener = new StepListener<>();
cancellableThreads.checkForCancel();
recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener);

finalizeListener.whenComplete(r -> {
RunUnderPrimaryPermit.run(
() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
@@ -32,7 +32,6 @@

package org.opensearch.indices.recovery;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
@@ -57,9 +56,6 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
@@ -71,7 +67,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY;

@@ -91,44 +86,19 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);

private final SegmentReplicationTargetService segmentReplicationTargetService;

/**
* Creates a new recovery target object that represents a recovery to the provided shard.
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
* @param segmentReplicationTargetService used to force a segment replication round
*/
public RecoveryTarget(
IndexShard indexShard,
DiscoveryNode sourceNode,
ReplicationListener listener,
SegmentReplicationTargetService segmentReplicationTargetService
) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.sourceNode = sourceNode;
indexShard.recoveryStats().incCurrentAsTarget();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
this.segmentReplicationTargetService = segmentReplicationTargetService;
}

/**
* Creates a new recovery target object that represents a recovery to the provided shard. Used for tests.
*
* @param indexShard local shard where we want to recover to
* @param sourceNode source node of the recovery where we recover from
* @param listener called when recovery is completed/failed
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.sourceNode = sourceNode;
indexShard.recoveryStats().incCurrentAsTarget();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
this.segmentReplicationTargetService = SegmentReplicationTargetService.NO_OP;
}

/**
@@ -137,7 +107,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati
* @return a copy of this recovery target
*/
public RecoveryTarget retryCopy() {
return new RecoveryTarget(indexShard, sourceNode, listener, segmentReplicationTargetService);
return new RecoveryTarget(indexShard, sourceNode, listener);
}

public IndexShard indexShard() {
@@ -249,45 +219,7 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo

@Override
public void forceSegmentFileSync(ActionListener<Void> listener) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardId()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
try {
indexShard.resetEngine();
listener.onResponse(null);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
indexShard.failShard("replication failure", e);
}
listener.onFailure(e);
}
}
);
throw new UnsupportedOperationException("Method not supported on target!");
}
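With the forced sync now driven over the transport layer by SegmentReplicationTargetService, the local code path is intentionally closed off. A hypothetical test fragment, not from this commit, illustrating the new target-side contract (recoveryTarget is an assumed fixture built with the remaining constructor above):

```java
// Hypothetical check: calling the target directly is now expected to fail fast, because the
// forced segment replication round is owned by the SegmentReplicationTargetService handler.
expectThrows(
    UnsupportedOperationException.class,
    () -> recoveryTarget.forceSegmentFileSync(ActionListener.wrap(r -> {}, e -> {}))
);
```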

@Override
@@ -45,6 +45,7 @@
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.transport.EmptyTransportResponseHandler;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponse;
@@ -195,7 +196,7 @@ public void indexTranslogOperations(
*/
@Override
public void forceSegmentFileSync(ActionListener<Void> listener) {
final String action = PeerRecoveryTargetService.Actions.FORCE_SYNC;
final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC;
final long requestSeqNo = requestSeqNoGenerator.getAndIncrement();
final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
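The remainder of this method is truncated in this view, so the actual dispatch of the request is not visible; the committed code may well route through this class's usual retry plumbing instead. A generic sketch of sending the ForceSyncRequest and completing the caller's listener, reusing the locals above plus assumed transportService and targetNode fields:

```java
// Generic dispatch sketch (transportService and targetNode are assumed fields; the real method may differ).
transportService.sendRequest(
    targetNode,
    action,
    request,
    TransportRequestOptions.EMPTY,
    new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), reader)
);
```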

0 comments on commit aa5e3f3
