Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to segrep classes to mirror refactoring in main #2432

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1122,9 +1122,10 @@ public abstract void forceMerge(
/**
* Fetch a snapshot of the latest SegmentInfos from the engine and ensure that segment files are retained in the directory
* until closed.
* @return {@link SegmentInfosRef} - A ref to segmentInfos that must be closed for segment files to be deleted.
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that must be closed for segment files
* to be deleted.
*/
public SegmentInfosRef getLatestSegmentInfosSafe() {
public GatedCloseable<SegmentInfos> getLatestSegmentInfosSafe() {
return null;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2269,15 +2269,15 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(final boolean flushFir
}

@Override
public SegmentInfosRef getLatestSegmentInfosSafe() {
public GatedCloseable<SegmentInfos> getLatestSegmentInfosSafe() {
assert (engineConfig.isReadOnly() == false);
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new Engine.SegmentInfosRef(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1498,10 +1498,10 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
/**
* Fetch a ref to the latest SegmentInfos held by the engine. This ensures the files will not be deleted until
* the ref is closed.
* @return {@link Engine.SegmentInfosRef}
* @return a {@link GatedCloseable} wrapper around a {@link SegmentInfos} instance
* @throws EngineException - When infos cannot be retrieved from the Engine.
*/
public Engine.SegmentInfosRef getLatestSegmentInfosSafe() throws EngineException {
public GatedCloseable<SegmentInfos> getLatestSegmentInfosSafe() throws EngineException {
return getEngine().getLatestSegmentInfosSafe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void doReplication(final long replicationId) {
logger.trace("not running replication with id [{}] - can not find it (probably finished)", replicationId);
return;
}
final ReplicationTarget replicationTarget = replicationRef.target();
final ReplicationTarget replicationTarget = replicationRef.get();
timer = replicationTarget.state().getTimer();
final IndexShard indexShard = replicationTarget.getIndexShard();

Expand Down Expand Up @@ -217,7 +217,7 @@ public void onFailure(Exception e) {
);
onGoingReplications.failReplication(
replicationId,
new ReplicationFailedException(replicationRef.target().getIndexShard(), "unexpected error", e),
new ReplicationFailedException(replicationRef.get().getIndexShard(), "unexpected error", e),
true // be safe
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;

Expand All @@ -21,15 +21,15 @@

public class CopyState extends AbstractRefCounted {

private final Engine.SegmentInfosRef segmentInfosRef;
private final GatedCloseable<SegmentInfos> segmentInfosRef;
private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot metadataSnapshot;
private final byte[] infosBytes;

CopyState(IndexShard shard) throws IOException {
super("replication-nrt-state");
this.segmentInfosRef = shard.getLatestSegmentInfosSafe();
final SegmentInfos segmentInfos = segmentInfosRef.getSegmentInfos();
final SegmentInfos segmentInfos = segmentInfosRef.get();
this.checkpoint = new ReplicationCheckpoint(
shard.shardId(),
shard.getOperationPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void messageReceived(final ReplicationFileChunkRequest request, Transport
ReplicationCollection.ReplicationRef replicationRef = segmentReplicationReplicaService.getOnGoingReplications()
.getReplicationSafe(request.getReplicationId(), request.shardId())
) {
final ReplicationTarget replicationTarget = replicationRef.target();
final ReplicationTarget replicationTarget = replicationRef.get();
final ActionListener<Void> listener = createOrFinishListener(replicationRef, channel, Actions.FILE_CHUNK, request);
if (listener == null) {
return;
Expand Down Expand Up @@ -274,7 +274,7 @@ private ActionListener<Void> createOrFinishListener(
final ReplicationFileChunkRequest request,
final CheckedFunction<Void, TransportResponse, Exception> responseFn
) {
final ReplicationTarget replicationTarget = replicationRef.target();
final ReplicationTarget replicationTarget = replicationRef.get();
final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request);
final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.concurrent.GatedAutoCloseable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
Expand All @@ -48,7 +49,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node
Expand Down Expand Up @@ -125,7 +125,7 @@ public ReplicationRef getReplicationSafe(long id, ShardId shardId) {
if (replicationRef == null) {
throw new IndexShardClosedException(shardId);
}
assert replicationRef.target().getIndexShard().shardId().equals(shardId);
assert replicationRef.get().getIndexShard().shardId().equals(shardId);
return replicationRef;
}

Expand Down Expand Up @@ -221,29 +221,15 @@ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) {
* causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources
* will not be freed until {@link ReplicationRef#close()} is called.
*/
public static class ReplicationRef implements AutoCloseable {

private final ReplicationTarget target;
private final AtomicBoolean closed = new AtomicBoolean(false);
public static class ReplicationRef extends GatedAutoCloseable<ReplicationTarget> {
kartg marked this conversation as resolved.
Show resolved Hide resolved

/**
* Important: {@link ReplicationTarget#tryIncRef()} should
* be *successfully* called on status before
*/
public ReplicationRef(ReplicationTarget target) {
this.target = target;
this.target.setLastAccessTime();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
target.decRef();
}
}

public ReplicationTarget target() {
return target;
super(target, target::decRef);
target.setLastAccessTime();
}
}

Expand Down