Skip to content

Commit

Permalink
Updates to segrep classes to mirror refactoring in main (#2432)
Browse files Browse the repository at this point in the history
* Updating ReplicationRef to mirror RecoveryRef

The class now simply extends GatedAutoCloseable and its constructor invokes the superclass.

Signed-off-by: Kartik Ganesh <[email protected]>

* Removing the SegmentInfosRef class

As with the now-deleted IndexCommitRef class, this wrapper class is not needed. It has been replaced with a GatedCloseable instance wrapping a SegmentInfos

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg authored Mar 10, 2022
1 parent 7360562 commit 71dc3ac
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 32 deletions.
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1122,9 +1122,10 @@ public abstract void forceMerge(
/**
* Fetch a snapshot of the latest SegmentInfos from the engine and ensure that segment files are retained in the directory
* until closed.
* @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted.
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that must be closed for segment files
* to be deleted.
*/
public SegmentInfosRef getLatestSegmentInfosSafe() {
public GatedCloseable<SegmentInfos> getLatestSegmentInfosSafe() {
return null;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2269,15 +2269,15 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(final boolean flushFir
}

@Override
public SegmentInfosRef getLatestSegmentInfosSafe() {
public GatedCloseable<SegmentInfos> getLatestSegmentInfosSafe() {
assert (engineConfig.isReadOnly() == false);
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new Engine.SegmentInfosRef(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1498,10 +1498,10 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
/**
* Fetch a ref to the latest SegmentInfos held by the engine. This ensures the files will not be deleted until
* the ref is closed.
* @return {@link Engine.SegmentInfosRef}
* @return a {@link GatedCloseable} wrapper around a {@link SegmentInfos} instance
* @throws EngineException - When infos cannot be retrieved from the Engine.
*/
public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException {
public GatedCloseable<SegmentInfos> getLatestSegmentInfosSafe() throws EngineException {
return getEngine().getLatestSegmentInfosSafe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void doReplication(final long replicationId) {
logger.trace("not running replication with id [{}] - can not find it (probably finished)", replicationId);
return;
}
final ReplicationTarget replicationTarget = replicationRef.target();
final ReplicationTarget replicationTarget = replicationRef.get();
timer = replicationTarget.state().getTimer();
final IndexShard indexShard = replicationTarget.getIndexShard();

Expand Down Expand Up @@ -217,7 +217,7 @@ public void onFailure(Exception e) {
);
onGoingReplications.failReplication(
replicationId,
new ReplicationFailedException(replicationRef.target().getIndexShard(), "unexpected error", e),
new ReplicationFailedException(replicationRef.get().getIndexShard(), "unexpected error", e),
true // be safe
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;

Expand All @@ -21,15 +21,15 @@

public class CopyState extends AbstractRefCounted {

private final Engine.SegmentInfosRef segmentInfosRef;
private final GatedCloseable<SegmentInfos> segmentInfosRef;
private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot metadataSnapshot;
private final byte[] infosBytes;

CopyState(IndexShard shard) throws IOException {
super("replication-nrt-state");
this.segmentInfosRef = shard.getLatestSegmentInfosSafe();
final SegmentInfos segmentInfos = segmentInfosRef.getSegmentInfos();
final SegmentInfos segmentInfos = segmentInfosRef.get();
this.checkpoint = new ReplicationCheckpoint(
shard.shardId(),
shard.getOperationPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void messageReceived(final ReplicationFileChunkRequest request, Transport
ReplicationCollection.ReplicationRef replicationRef = segmentReplicationReplicaService.getOnGoingReplications()
.getReplicationSafe(request.getReplicationId(), request.shardId())
) {
final ReplicationTarget replicationTarget = replicationRef.target();
final ReplicationTarget replicationTarget = replicationRef.get();
final ActionListener<Void> listener = createOrFinishListener(replicationRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
return;
Expand Down Expand Up @@ -274,7 +274,7 @@ private ActionListener<Void> createOrFinishListener(
final ReplicationFileChunkRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
final ReplicationTarget replicationTarget = replicationRef.target();
final ReplicationTarget replicationTarget = replicationRef.get();
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.concurrent.GatedAutoCloseable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -48,7 +49,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
Expand Down Expand Up @@ -125,7 +125,7 @@ public ReplicationRef getReplicationSafe(long id, ShardId shardId) {
if (replicationRef == null) {
throw new IndexShardClosedException(shardId);
}
assert replicationRef.target().getIndexShard().shardId().equals(shardId);
assert replicationRef.get().getIndexShard().shardId().equals(shardId);
return replicationRef;
}

Expand Down Expand Up @@ -221,29 +221,15 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link ReplicationRef#close()} is called.
*/
public static class ReplicationRef implements AutoCloseable {

private final ReplicationTarget target;
private final AtomicBoolean closed = new AtomicBoolean(false);
public static class ReplicationRef extends GatedAutoCloseable<ReplicationTarget> {

/**
* Important: {@link ReplicationTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public ReplicationRef(ReplicationTarget target) {
this.target = target;
this.target.setLastAccessTime();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
target.decRef();
}
}

public ReplicationTarget target() {
return target;
super(target, target::decRef);
target.setLastAccessTime();
}
}

Expand Down

0 comments on commit 71dc3ac

Please sign in to comment.