diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index a9054e5ca4122..e036280e947ce 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1122,9 +1122,10 @@ public abstract void forceMerge( /** * Fetch a snapshot of the latest SegmentInfos from the engine and ensure that segment files are retained in the directory * until closed. - * @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted. + * @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that must be closed for segment files + * to be deleted. */ - public SegmentInfosRef getLatestSegmentInfosSafe() { + public GatedCloseable getLatestSegmentInfosSafe() { return null; }; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index f6ea1c73271dc..a42a157fd071b 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2269,7 +2269,7 @@ public GatedCloseable acquireLastIndexCommit(final boolean flushFir } @Override - public SegmentInfosRef getLatestSegmentInfosSafe() { + public GatedCloseable getLatestSegmentInfosSafe() { assert (engineConfig.isReadOnly() == false); final SegmentInfos segmentInfos = getLatestSegmentInfos(); try { @@ -2277,7 +2277,7 @@ public SegmentInfosRef getLatestSegmentInfosSafe() { } catch (IOException e) { throw new EngineException(shardId, e.getMessage(), e); } - return new Engine.SegmentInfosRef(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos)); + return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos)); } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 87984477e57e2..7720103085669 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1498,10 +1498,10 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th /** * Fetch a ref to the latest SegmentInfos held by the engine. This ensures the files will not be deleted until * the ref is closed. - * @return {@link Engine.SegmentInfosRef} + * @return a {@link GatedCloseable} wrapper around a {@link SegmentInfos} instance * @throws EngineException - When infos cannot be retrieved from the Engine. */ - public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException { + public GatedCloseable getLatestSegmentInfosSafe() throws EngineException { return getEngine().getLatestSegmentInfosSafe(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index 353a2ebefe8b2..a2e403f563dfa 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -178,7 +178,7 @@ private void doReplication(final long replicationId) { logger.trace("not running replication with id [{}] - can not find it (probably finished)", replicationId); return; } - final ReplicationTarget replicationTarget = replicationRef.target(); + final ReplicationTarget replicationTarget = replicationRef.get(); timer = replicationTarget.state().getTimer(); final IndexShard indexShard = replicationTarget.getIndexShard(); @@ -217,7 +217,7 @@ public void onFailure(Exception e) { ); onGoingReplications.failReplication( replicationId, - new ReplicationFailedException(replicationRef.target().getIndexShard(), "unexpected error", e), + new ReplicationFailedException(replicationRef.get().getIndexShard(), "unexpected error", e), true // be safe ); } else { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java index 8b99b74b75a81..7d57cf8188d16 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java @@ -11,8 +11,8 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; -import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; @@ -21,7 +21,7 @@ public class CopyState extends AbstractRefCounted { - private final Engine.SegmentInfosRef segmentInfosRef; + private final GatedCloseable segmentInfosRef; private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot metadataSnapshot; private final byte[] infosBytes; @@ -29,7 +29,7 @@ public class CopyState extends AbstractRefCounted { CopyState(IndexShard shard) throws IOException { super("replication-nrt-state"); this.segmentInfosRef = shard.getLatestSegmentInfosSafe(); - final SegmentInfos segmentInfos = segmentInfosRef.getSegmentInfos(); + final SegmentInfos segmentInfos = segmentInfosRef.get(); this.checkpoint = new ReplicationCheckpoint( shard.shardId(), shard.getOperationPrimaryTerm(), diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java index 1d252897d5e93..3be08e6c10ea9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationSource.java @@ -231,7 +231,7 @@ public void messageReceived(final ReplicationFileChunkRequest request, Transport ReplicationCollection.ReplicationRef replicationRef = segmentReplicationReplicaService.getOnGoingReplications() .getReplicationSafe(request.getReplicationId(), request.shardId()) ) { - final ReplicationTarget replicationTarget = replicationRef.target(); + final ReplicationTarget replicationTarget = replicationRef.get(); final ActionListener listener = createOrFinishListener(replicationRef, channel, Actions.FILE_CHUNK, request); if (listener == null) { return; @@ -274,7 +274,7 @@ private ActionListener createOrFinishListener( final ReplicationFileChunkRequest request, final CheckedFunction responseFn ) { - final ReplicationTarget replicationTarget = replicationRef.target(); + final ReplicationTarget replicationTarget = replicationRef.get(); final ActionListener channelListener = new ChannelActionListener<>(channel, action, request); final ActionListener voidListener = ActionListener.map(channelListener, responseFn); diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java index 9c659cbdc52b2..248d8d8f833c6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/ReplicationCollection.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.common.concurrent.GatedAutoCloseable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; @@ -48,7 +49,6 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; /** * This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node @@ -125,7 +125,7 @@ public ReplicationRef getReplicationSafe(long id, ShardId shardId) { if (replicationRef == null) { throw new IndexShardClosedException(shardId); } - assert replicationRef.target().getIndexShard().shardId().equals(shardId); + assert replicationRef.get().getIndexShard().shardId().equals(shardId); return replicationRef; } @@ -221,29 +221,15 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources * will not be freed until {@link ReplicationRef#close()} is called. */ - public static class ReplicationRef implements AutoCloseable { - - private final ReplicationTarget target; - private final AtomicBoolean closed = new AtomicBoolean(false); + public static class ReplicationRef extends GatedAutoCloseable { /** * Important: {@link ReplicationTarget#tryIncRef()} should * be *successfully* called on status before */ public ReplicationRef(ReplicationTarget target) { - this.target = target; - this.target.setLastAccessTime(); - } - - @Override - public void close() { - if (closed.compareAndSet(false, true)) { - target.decRef(); - } - } - - public ReplicationTarget target() { - return target; + super(target, target::decRef); + target.setLastAccessTime(); } }