Skip to content

Commit

Permalink
PR cleanup.
Browse files Browse the repository at this point in the history
Renamed SegmentReplicationService -> SegmentReplicationReplicaService.
Removed if conditions in SyncedFlushService and TransportShardFlushAction.
Improved comments and documentation.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 22, 2022
1 parent f619be7 commit f646a9e
Show file tree
Hide file tree
Showing 18 changed files with 85 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected void shardOperationOnPrimary(
@Override
protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
ActionListener.completeWith(listener, () -> {
// replica.flush(request.getRequest());
replica.flush(request.getRequest());
logger.trace("{} flush request executed on replica", replica.shardId());
return new ReplicaResult();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public final class IndexSettings {
*/
public static final Setting<Boolean> INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting(
"index.replication.segment_replication",
true,
false,
Property.IndexScope,
Property.Final
);
Expand Down
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,18 @@ public abstract void forceMerge(
*/
public abstract IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException;

/**
 * Fetch a snapshot of the latest SegmentInfos from the engine and ensure that the segment files are retained in the
 * directory until the returned ref is closed.
 * <p>
 * This base implementation returns {@code null}. NOTE(review): presumably engines that support segment replication
 * override it - confirm against the concrete engine implementations, and null-check the result until then.
 *
 * @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted.
 */
public SegmentInfosRef getLatestSegmentInfosSafe() {
    return null;
}

/**
 * Fetch a snapshot of the latest SegmentInfos from the engine.
 * This method does not ensure that segment files are retained in the directory.
 * <p>
 * This base implementation returns {@code null}. NOTE(review): presumably engines that support segment replication
 * override it - confirm against the concrete engine implementations, and null-check the result until then.
 *
 * @return {@link SegmentInfos}
 */
public SegmentInfos getLatestSegmentInfos() {
    return null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws I
toIndexInput(infosBytes),
gen);
assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.setCurrentInfos(infos);
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
}
Expand Down Expand Up @@ -2110,6 +2110,9 @@ public boolean shouldPeriodicallyFlush() {

@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (engineConfig.isPrimary() == false) {
return new CommitId(lastCommittedSegmentInfos.getId());
}
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
* Switch to new segments, refreshing if necessary. Note that it's the caller's job to ensure
* there's a held refCount for the incoming infos, so all files exist.
*/
public synchronized void setCurrentInfos(SegmentInfos infos) throws IOException {
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
currentInfos = infos;
maybeRefresh();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

/**
* A {@link ReferenceManager.RefreshListener} that publishes a checkpoint to be consumed by replicas.
* This class is only used with Segment Replication enabled.
*/
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {

Expand Down
38 changes: 24 additions & 14 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.SegmentReplicationService;
import org.opensearch.indices.replication.SegmentReplicationReplicaService;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
Expand Down Expand Up @@ -1507,10 +1507,20 @@ public Engine.IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws E
}
}

/**
 * Obtain a ref to the most recent SegmentInfos held by the engine. While the ref stays open the files it
 * refers to will not be deleted, so the caller must close it once finished.
 *
 * @return {@link Engine.SegmentInfosRef} as handed back by the current engine
 * @throws EngineException - When infos cannot be retrieved from the Engine.
 */
public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException {
    // Resolve the engine first, then delegate; the shard adds no logic of its own here.
    final Engine currentEngine = getEngine();
    return currentEngine.getLatestSegmentInfosSafe();
}

/**
 * Obtain a snapshot of the most recent SegmentInfos held by the engine.
 *
 * @return {@link SegmentInfos} as handed back by the current engine
 */
public SegmentInfos getLatestSegmentInfos() {
    // Resolve the engine first, then delegate; the shard adds no logic of its own here.
    final Engine currentEngine = getEngine();
    return currentEngine.getLatestSegmentInfos();
}
Expand All @@ -1523,10 +1533,10 @@ public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws I
getEngine().updateCurrentInfos(infosBytes, gen, seqNo);
}

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
*/
/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
*/
public Engine.IndexCommitRef acquireSafeIndexCommit() throws EngineException {
final IndexShardState state = this.state; // one time volatile read
// we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine
Expand Down Expand Up @@ -3048,8 +3058,8 @@ protected Engine getEngineOrNull() {

public void startRecovery(
RecoveryState recoveryState,
SegmentReplicationService segmentReplicationService,
SegmentReplicationService.ReplicationListener replicationListener,
SegmentReplicationReplicaService segmentReplicationReplicaService,
SegmentReplicationReplicaService.ReplicationListener replicationListener,
PrimaryShardReplicationSource replicationSource,
PeerRecoveryTargetService peerRecoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
Expand Down Expand Up @@ -3083,7 +3093,7 @@ public void startRecovery(
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
IndexShard indexShard = this;
segmentReplicationService.prepareForReplication(this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), new ActionListener<TrackShardResponse>() {
segmentReplicationReplicaService.prepareForReplication(this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
Expand All @@ -3097,7 +3107,7 @@ public void onFailure(Exception e) {
}
});
} catch (Exception e) {
logger.error("Error", e);
logger.error("Error preparing the shard for Segment replication", e);
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
Expand Down Expand Up @@ -3711,7 +3721,7 @@ public long getProcessedLocalCheckpoint() {
*/
public synchronized void onNewCheckpoint(final PublishCheckpointRequest request,
final PrimaryShardReplicationSource source,
final SegmentReplicationService segmentReplicationService) {
final SegmentReplicationReplicaService segmentReplicationReplicaService) {
logger.debug("Checkpoint received {}", request.getCheckpoint());
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint());
Expand All @@ -3732,16 +3742,16 @@ public synchronized void onNewCheckpoint(final PublishCheckpointRequest request,
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
// TODO: segrep - these are the states set after we perform our initial store recovery.
segmentReplicationService.startReplication(checkpoint, this, source, new SegmentReplicationService.ReplicationListener() {
segmentReplicationReplicaService.startReplication(checkpoint, this, source, new SegmentReplicationReplicaService.ReplicationListener() {
@Override
public void onReplicationDone(ReplicationState state) {
finalizeReplication();
markReplicationComplete();
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
}

@Override
public void onReplicationFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
finalizeReplication();
markReplicationComplete();
logger.error("Failure", e);
}
});
Expand All @@ -3758,7 +3768,7 @@ public void markAsReplicating() {
this.replicationState.setStage(ReplicationState.Stage.ACTIVE);
}

public void finalizeReplication() {
public void markReplicationComplete() {
this.replicationState.setStage(ReplicationState.Stage.INACTIVE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationService;
import org.opensearch.indices.replication.SegmentReplicationReplicaService;
import org.opensearch.indices.replication.checkpoint.TransportCheckpointPublisher;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.node.Node;
Expand Down Expand Up @@ -836,8 +836,8 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
@Override
public IndexShard createShard(
final ShardRouting shardRouting,
final SegmentReplicationService replicaService,
final SegmentReplicationService.ReplicationListener replicationListener,
final SegmentReplicationReplicaService segmentReplicationReplicaService,
final SegmentReplicationReplicaService.ReplicationListener replicationListener,
final PrimaryShardReplicationSource replicationSource,
final PeerRecoveryTargetService recoveryTargetService,
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
Expand All @@ -855,7 +855,7 @@ public IndexShard createShard(
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
indexShard.addShardFailureCallback(onShardFailure);
indexShard.startRecovery(recoveryState, replicaService, replicationListener, replicationSource, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> {
indexShard.startRecovery(recoveryState, segmentReplicationReplicaService, replicationListener, replicationSource, recoveryTargetService, recoveryListener, repositoriesService, (type, mapping) -> {
assert recoveryState.getRecoverySource()
.getType() == RecoverySource.Type.LOCAL_SHARDS : "mapping update consumer only required by local shards recovery";
client.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
import org.opensearch.indices.recovery.RecoveryFailedException;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationService;
import org.opensearch.indices.replication.SegmentReplicationReplicaService;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.ReplicationState;
Expand Down Expand Up @@ -118,7 +118,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final ClusterService clusterService;
private final ThreadPool threadPool;

private final SegmentReplicationService replicationReplicaService;
private final SegmentReplicationReplicaService segmentReplicationReplicaService;
private final PrimaryShardReplicationSource replicationSource;
private final PeerRecoveryTargetService recoveryTargetService;
private final ShardStateAction shardStateAction;
Expand Down Expand Up @@ -157,7 +157,7 @@ public IndicesClusterStateService(
final PrimaryReplicaSyncer primaryReplicaSyncer,
final GlobalCheckpointSyncAction globalCheckpointSyncAction,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationService replicationReplicaService,
final SegmentReplicationReplicaService replicationReplicaService,
final PrimaryShardReplicationSource replicationSource) {
this(
settings,
Expand Down Expand Up @@ -186,7 +186,7 @@ public IndicesClusterStateService(
final AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>> indicesService,
final ClusterService clusterService,
final ThreadPool threadPool,
final SegmentReplicationService replicationReplicaService,
final SegmentReplicationReplicaService replicationReplicaService,
final PrimaryShardReplicationSource replicationSource,
final PeerRecoveryTargetService recoveryTargetService,
final ShardStateAction shardStateAction,
Expand All @@ -201,7 +201,7 @@ public IndicesClusterStateService(
final RetentionLeaseSyncer retentionLeaseSyncer
) {
this.settings = settings;
this.replicationReplicaService = replicationReplicaService;
this.segmentReplicationReplicaService = replicationReplicaService;
this.replicationSource = replicationSource;
this.buildInIndexListener = Arrays.asList(
peerRecoverySourceService,
Expand Down Expand Up @@ -638,7 +638,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR
logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);
indicesService.createShard(
shardRouting,
replicationReplicaService,
segmentReplicationReplicaService,
new ReplicationListener(shardRouting, primaryTerm),
replicationSource,
recoveryTargetService,
Expand Down Expand Up @@ -759,7 +759,7 @@ private static DiscoveryNode findSourceNodeForPeerRecovery(
}


private class ReplicationListener implements SegmentReplicationService.ReplicationListener {
private class ReplicationListener implements SegmentReplicationReplicaService.ReplicationListener {

/**
* ShardRouting with which the shard was created
Expand Down Expand Up @@ -1038,8 +1038,8 @@ U createIndex(IndexMetadata indexMetadata, List<IndexEventListener> builtInIndex
*/
T createShard(
ShardRouting shardRouting,
SegmentReplicationService replicaService,
SegmentReplicationService.ReplicationListener replicationListener,
SegmentReplicationReplicaService replicaService,
SegmentReplicationReplicaService.ReplicationListener replicationListener,
PrimaryShardReplicationSource replicationSource,
PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,6 @@ public void onFailure(Exception e) {
}

private void performNormalFlushOnInactive(IndexShard shard) {
if (shard.routingEntry().primary() == false) {
return;
}
logger.debug("flushing shard {} on inactive", shard.routingEntry());
shard.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF

final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();


public void writeFileChunk(StoreFileMetadata fileMetadata, long position, BytesReference content, boolean lastChunk)
throws IOException {
assert Transports.assertNotTransportThread("multi_file_writer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@
/**
* Orchestrator of replication events.
*/
public class SegmentReplicationService implements IndexEventListener {
public class SegmentReplicationReplicaService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SegmentReplicationService.class);
private static final Logger logger = LogManager.getLogger(SegmentReplicationReplicaService.class);

private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;
Expand All @@ -70,9 +70,9 @@ public ReplicationCollection getOnGoingReplications() {

private final ReplicationCollection onGoingReplications;

public SegmentReplicationService(final ThreadPool threadPool,
final RecoverySettings recoverySettings,
final TransportService transportService) {
public SegmentReplicationReplicaService(final ThreadPool threadPool,
final RecoverySettings recoverySettings,
final TransportService transportService) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
this.transportService = transportService;
Expand Down Expand Up @@ -145,7 +145,7 @@ private void setupReplicaShard(IndexShard indexShard) throws IndexShardRecoveryE

/**
 * Registers a new replication with the ongoing-replications collection and schedules a
 * {@code ReplicationRunner} for it on the generic thread pool.
 *
 * @param checkpoint the checkpoint passed through to the replication collection
 * @param indexShard the shard passed through to the replication collection
 *                   (NOTE(review): presumably the replica being brought up to date - confirm)
 * @param source     the replication source passed through to the replication collection
 * @param listener   the listener passed through to the replication collection
 */
public void startReplication(
    final ReplicationCheckpoint checkpoint,
    final IndexShard indexShard,
    PrimaryShardReplicationSource source,
    final ReplicationListener listener
) {
    // The collection hands back the id under which this replication is tracked; the activity timeout
    // comes from the recovery settings.
    final long replicationId = onGoingReplications.startReplication(
        checkpoint,
        indexShard,
        source,
        listener,
        recoverySettings.activityTimeout()
    );
    // Logged once, at trace level only: the identical message was previously also emitted at info level.
    logger.trace("Starting replication {}", replicationId);
    threadPool.generic().execute(new ReplicationRunner(replicationId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationService;
import org.opensearch.indices.replication.SegmentReplicationReplicaService;
import org.opensearch.indices.replication.copy.PrimaryShardReplicationSource;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -37,7 +37,7 @@ public class TransportPublishShardCheckpointAction extends TransportReplicationA

public static final String ACTION_NAME = PublishCheckpointAction.NAME + "[s]";

private final SegmentReplicationService replicationService;
private final SegmentReplicationReplicaService replicationService;
private final PrimaryShardReplicationSource source;

@Inject
Expand All @@ -49,7 +49,7 @@ public TransportPublishShardCheckpointAction(
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
SegmentReplicationService segmentCopyService,
SegmentReplicationReplicaService segmentCopyService,
PrimaryShardReplicationSource source) {
super(
settings,
Expand Down
Loading

0 comments on commit f646a9e

Please sign in to comment.