diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 09caf8f1e4358..855cb34ae0587 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -101,8 +101,8 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.NodeIndicesStats; import org.opensearch.indices.analysis.AnalysisModule; -import org.opensearch.indices.recovery.RecoveryState.Stage; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.recovery.RecoveryState.Stage; import org.opensearch.node.NodeClosedException; import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.AnalysisPlugin; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0f088a13d5c5a..8002dfe688def 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -157,6 +157,7 @@ import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryFailedException; +import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.repositories.RepositoriesService; @@ -2876,7 +2877,7 @@ protected Engine getEngineOrNull() { public void startRecovery( RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, - PeerRecoveryTargetService.RecoveryListener recoveryListener, + RecoveryListener recoveryListener, RepositoriesService repositoriesService, Consumer mappingUpdateConsumer, IndicesService indicesService @@ -2909,7 +2910,7 @@ public void startRecovery( recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); } catch (Exception e) { failShard("corrupted preexisting index", e); - recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); + recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } break; case SNAPSHOT: @@ -2984,15 +2985,15 @@ public void startRecovery( private void executeRecovery( String reason, RecoveryState recoveryState, - PeerRecoveryTargetService.RecoveryListener recoveryListener, + RecoveryListener recoveryListener, CheckedConsumer, Exception> action ) { markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> { if (r) { - recoveryListener.onRecoveryDone(recoveryState); + recoveryListener.onDone(recoveryState); } - }, e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); + }, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action)); } /** diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b5da0ae1f7688..1c7e45323813c 100644 --- 
a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -136,6 +136,7 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.PeerRecoveryTargetService; +import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; @@ -839,7 +840,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada public IndexShard createShard( final ShardRouting shardRouting, final PeerRecoveryTargetService recoveryTargetService, - final PeerRecoveryTargetService.RecoveryListener recoveryListener, + final RecoveryListener recoveryListener, final RepositoriesService repositoriesService, final Consumer onShardFailure, final Consumer globalCheckpointSyncer, diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 29f74f8a86d85..d1623df156593 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -78,8 +78,9 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.PeerRecoverySourceService; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoveryFailedException; +import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.search.SearchService; import org.opensearch.snapshots.SnapshotShardsService; @@ -624,7 +625,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR indicesService.createShard( shardRouting, recoveryTargetService, - new RecoveryListener(shardRouting, primaryTerm), + new RecoveryListener(shardRouting, primaryTerm, this), repositoriesService, failedShardHandler, globalCheckpointSyncer, @@ -739,39 +740,16 @@ private static DiscoveryNode findSourceNodeForPeerRecovery( return sourceNode; } - private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener { - - /** - * ShardRouting with which the shard was created - */ - private final ShardRouting shardRouting; - - /** - * Primary term with which the shard was created - */ - private final long primaryTerm; - - private RecoveryListener(final ShardRouting shardRouting, final long primaryTerm) { - this.shardRouting = shardRouting; - this.primaryTerm = primaryTerm; - } - - @Override - public void onRecoveryDone(final RecoveryState state) { - shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); - } - - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - handleRecoveryFailure(shardRouting, sendShardFailure, e); - } - } - // package-private for testing - synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) { + public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolean sendShardFailure, Exception failure) { 
failAndRemoveShard(shardRouting, sendShardFailure, "failed recovery", failure, clusterService.state()); } + public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) { + RecoveryState recoveryState = (RecoveryState) state; + shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER); + } + private void failAndRemoveShard( ShardRouting shardRouting, boolean sendShardFailure, @@ -1004,7 +982,7 @@ U createIndex(IndexMetadata indexMetadata, List builtInIndex T createShard( ShardRouting shardRouting, PeerRecoveryTargetService recoveryTargetService, - PeerRecoveryTargetService.RecoveryListener recoveryListener, + RecoveryListener recoveryListener, RepositoriesService repositoriesService, Consumer onShardFailure, Consumer globalCheckpointSyncer, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 4ae188abe5896..e13022afa81ba 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -37,10 +37,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; +import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.support.ChannelActionListener; @@ -69,7 +69,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogCorruptedException; -import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; @@ -124,7 +125,7 @@ public static class Actions { private final RecoverySettings recoverySettings; private final ClusterService clusterService; - private final RecoveriesCollection onGoingRecoveries; + private final ReplicationCollection onGoingRecoveries; public PeerRecoveryTargetService( ThreadPool threadPool, @@ -136,7 +137,7 @@ public PeerRecoveryTargetService( this.transportService = transportService; this.recoverySettings = recoverySettings; this.clusterService = clusterService; - this.onGoingRecoveries = new RecoveriesCollection(logger, threadPool); + this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool); transportService.registerRequestHandler( Actions.FILES_INFO, @@ -185,13 +186,16 @@ public PeerRecoveryTargetService( @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { if (indexShard != null) { - onGoingRecoveries.cancelRecoveriesForShard(shardId, "shard closed"); + onGoingRecoveries.cancelForShard(shardId, "shard closed"); } } public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { // create 
a new recovery status, and process... - final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); + final long recoveryId = onGoingRecoveries.start( + new RecoveryTarget(indexShard, sourceNode, listener), + recoverySettings.activityTimeout() + ); // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause // assertions to trip if we executed it on the same thread hence we fork off to the generic threadpool. threadPool.generic().execute(new RecoveryRunner(recoveryId)); @@ -208,9 +212,9 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal } private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) { - RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); + RecoveryTarget newTarget = onGoingRecoveries.reset(recoveryId, activityTimeout); if (newTarget != null) { - threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId())); + threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.getId())); } } @@ -225,7 +229,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi final TransportRequest requestToSend; final StartRecoveryRequest startRequest; final ReplicationTimer timer; - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { + try (ReplicationRef recoveryRef = onGoingRecoveries.get(recoveryId)) { if (recoveryRef == null) { logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId); return; @@ -248,7 +252,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi } catch (final Exception e) { // this will be logged as warning later on... 
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); - onGoingRecoveries.failRecovery( + onGoingRecoveries.fail( recoveryId, new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true @@ -339,28 +343,17 @@ public static StartRecoveryRequest getStartRecoveryRequest( localNode, metadataSnapshot, recoveryTarget.state().getPrimary(), - recoveryTarget.recoveryId(), + recoveryTarget.getId(), startingSeqNo ); return request; } - /** - * The recovery listener - * - * @opensearch.internal - */ - public interface RecoveryListener { - void onRecoveryDone(RecoveryState state); - - void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure); - } - class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler { @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request); if (listener == null) { return; @@ -375,7 +368,7 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request); if (listener == null) { return; @@ -391,7 +384,7 @@ class HandoffPrimaryContextRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { recoveryRef.get().handoffPrimaryContext(request.primaryContext()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -404,7 +397,7 @@ class TranslogOperationsRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = createOrFinishListener( recoveryRef, @@ -424,7 +417,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin private void performTranslogOps( final RecoveryTranslogOperationsRequest request, final ActionListener listener, - final RecoveryRef recoveryRef + final ReplicationRef recoveryRef ) { final RecoveryTarget recoveryTarget = recoveryRef.get(); @@ -439,7 +432,12 @@ private void performTranslogOps( @Override public void onNewClusterState(ClusterState state) { threadPool.generic().execute(ActionRunnable.wrap(listener, l -> { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { + try ( + ReplicationRef recoveryRef = onGoingRecoveries.getSafe( + request.recoveryId(), + request.shardId() + ) + ) { performTranslogOps(request, listener, recoveryRef); } })); @@ -485,7 +483,7 @@ class FilesInfoRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request); if (listener == null) { return; @@ -508,7 +506,7 @@ class CleanFilesRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) 
{ final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request); if (listener == null) { return; @@ -527,7 +525,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request); if (listener == null) { @@ -563,7 +561,7 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha } private ActionListener createOrFinishListener( - final RecoveryRef recoveryRef, + final ReplicationRef recoveryRef, final TransportChannel channel, final String action, final RecoveryTransportRequest request @@ -572,7 +570,7 @@ private ActionListener createOrFinishListener( } private ActionListener createOrFinishListener( - final RecoveryRef recoveryRef, + final ReplicationRef recoveryRef, final TransportChannel channel, final String action, final RecoveryTransportRequest request, @@ -609,10 +607,10 @@ class RecoveryRunner extends AbstractRunnable { @Override public void onFailure(Exception e) { - try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { + try (ReplicationRef recoveryRef = onGoingRecoveries.get(recoveryId)) { if (recoveryRef != null) { logger.error(() -> new ParameterizedMessage("unexpected error during recovery [{}], failing shard", recoveryId), e); - onGoingRecoveries.failRecovery( + onGoingRecoveries.fail( recoveryId, new RecoveryFailedException(recoveryRef.get().state(), "unexpected error", e), true // be safe @@ -648,7 +646,7 @@ private RecoveryResponseHandler(final StartRecoveryRequest request, final Replic public void handleResponse(RecoveryResponse recoveryResponse) { final TimeValue recoveryTime = new TimeValue(timer.time()); // do this through ongoing recoveries to remove it from the collection - onGoingRecoveries.markRecoveryAsDone(recoveryId); + onGoingRecoveries.markAsDone(recoveryId); if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); sb.append('[') @@ -709,11 +707,7 @@ private void onException(Exception e) { Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof CancellableThreads.ExecutionCancelledException) { // this can also come from the source wrapped in a RemoteTransportException - onGoingRecoveries.failRecovery( - recoveryId, - new RecoveryFailedException(request, "source has canceled the recovery", cause), - false - ); + onGoingRecoveries.fail(recoveryId, new RecoveryFailedException(request, "source has canceled the recovery", cause), false); return; } if (cause instanceof RecoveryEngineException) { @@ -766,11 +760,11 @@ private void onException(Exception e) { } if (cause instanceof AlreadyClosedException) { - onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false); + onGoingRecoveries.fail(recoveryId, new RecoveryFailedException(request, "source shard is closed", cause), false); return; } - onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true); + onGoingRecoveries.fail(recoveryId, new RecoveryFailedException(request, e), true); } @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java deleted file mode 100644 index 38b72dd0f7dee..0000000000000 --- 
a/server/src/main/java/org/opensearch/indices/recovery/RecoveriesCollection.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.indices.recovery; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchTimeoutException; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.concurrent.AutoCloseableRefCounted; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.AbstractRunnable; -import org.opensearch.common.util.concurrent.ConcurrentCollections; -import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardClosedException; -import org.opensearch.index.shard.ShardId; -import org.opensearch.threadpool.ThreadPool; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - -/** - * This class holds a collection of all on going recoveries on the current node (i.e., the node is the target node - * of those recoveries). The class is used to guarantee concurrent semantics such that once a recoveries was done/cancelled/failed - * no other thread will be able to find it. Last, the {@link RecoveryRef} inner class verifies that recovery temporary files - * and store will only be cleared once on going usage is finished. - * - * @opensearch.internal - */ -public class RecoveriesCollection { - - /** This is the single source of truth for ongoing recoveries. If it's not here, it was canceled or done */ - private final ConcurrentMap onGoingRecoveries = ConcurrentCollections.newConcurrentMap(); - - private final Logger logger; - private final ThreadPool threadPool; - - public RecoveriesCollection(Logger logger, ThreadPool threadPool) { - this.logger = logger; - this.threadPool = threadPool; - } - - /** - * Starts are new recovery for the given shard, source node and state - * - * @return the id of the new recovery. 
- */ - public long startRecovery( - IndexShard indexShard, - DiscoveryNode sourceNode, - PeerRecoveryTargetService.RecoveryListener listener, - TimeValue activityTimeout - ) { - RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener); - startRecoveryInternal(recoveryTarget, activityTimeout); - return recoveryTarget.recoveryId(); - } - - private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) { - RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget); - assert existingTarget == null : "found two RecoveryStatus instances with the same id"; - logger.trace( - "{} started recovery from {}, id [{}]", - recoveryTarget.shardId(), - recoveryTarget.sourceNode(), - recoveryTarget.recoveryId() - ); - threadPool.schedule( - new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout), - activityTimeout, - ThreadPool.Names.GENERIC - ); - } - - /** - * Resets the recovery and performs a recovery restart on the currently recovering index shard - * - * @see IndexShard#performRecoveryRestart() - * @return newly created RecoveryTarget - */ - public RecoveryTarget resetRecovery(final long recoveryId, final TimeValue activityTimeout) { - RecoveryTarget oldRecoveryTarget = null; - final RecoveryTarget newRecoveryTarget; - - try { - synchronized (onGoingRecoveries) { - // swap recovery targets in a synchronized block to ensure that the newly added recovery target is picked up by - // cancelRecoveriesForShard whenever the old recovery target is picked up - oldRecoveryTarget = onGoingRecoveries.remove(recoveryId); - if (oldRecoveryTarget == null) { - return null; - } - - newRecoveryTarget = oldRecoveryTarget.retryCopy(); - startRecoveryInternal(newRecoveryTarget, activityTimeout); - } - - // Closes the current recovery target - boolean successfulReset = oldRecoveryTarget.resetRecovery(newRecoveryTarget.cancellableThreads()); - if (successfulReset) { - logger.trace( - "{} restarted recovery from {}, id [{}], previous id [{}]", - newRecoveryTarget.shardId(), - newRecoveryTarget.sourceNode(), - newRecoveryTarget.recoveryId(), - oldRecoveryTarget.recoveryId() - ); - return newRecoveryTarget; - } else { - logger.trace( - "{} recovery could not be reset as it is already cancelled, recovery from {}, id [{}], previous id [{}]", - newRecoveryTarget.shardId(), - newRecoveryTarget.sourceNode(), - newRecoveryTarget.recoveryId(), - oldRecoveryTarget.recoveryId() - ); - cancelRecovery(newRecoveryTarget.recoveryId(), "recovery cancelled during reset"); - return null; - } - } catch (Exception e) { - // fail shard to be safe - oldRecoveryTarget.notifyListener(new RecoveryFailedException(oldRecoveryTarget.state(), "failed to retry recovery", e), true); - return null; - } - } - - public RecoveryTarget getRecoveryTarget(long id) { - return onGoingRecoveries.get(id); - } - - /** - * gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented - * to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically - * by using this method in a try-with-resources clause. - *
<p>
- * Returns null if recovery is not found - */ - public RecoveryRef getRecovery(long id) { - RecoveryTarget status = onGoingRecoveries.get(id); - if (status != null && status.tryIncRef()) { - return new RecoveryRef(status); - } - return null; - } - - /** Similar to {@link #getRecovery(long)} but throws an exception if no recovery is found */ - public RecoveryRef getRecoverySafe(long id, ShardId shardId) { - RecoveryRef recoveryRef = getRecovery(id); - if (recoveryRef == null) { - throw new IndexShardClosedException(shardId); - } - assert recoveryRef.get().shardId().equals(shardId); - return recoveryRef; - } - - /** cancel the recovery with the given id (if found) and remove it from the recovery collection */ - public boolean cancelRecovery(long id, String reason) { - RecoveryTarget removed = onGoingRecoveries.remove(id); - boolean cancelled = false; - if (removed != null) { - logger.trace( - "{} canceled recovery from {}, id [{}] (reason [{}])", - removed.shardId(), - removed.sourceNode(), - removed.recoveryId(), - reason - ); - removed.cancel(reason); - cancelled = true; - } - return cancelled; - } - - /** - * fail the recovery with the given id (if found) and remove it from the recovery collection - * - * @param id id of the recovery to fail - * @param e exception with reason for the failure - * @param sendShardFailure true a shard failed message should be sent to the master - */ - public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFailure) { - RecoveryTarget removed = onGoingRecoveries.remove(id); - if (removed != null) { - logger.trace( - "{} failing recovery from {}, id [{}]. Send shard failure: [{}]", - removed.shardId(), - removed.sourceNode(), - removed.recoveryId(), - sendShardFailure - ); - removed.fail(e, sendShardFailure); - } - } - - /** mark the recovery with the given id as done (if found) */ - public void markRecoveryAsDone(long id) { - RecoveryTarget removed = onGoingRecoveries.remove(id); - if (removed != null) { - logger.trace("{} marking recovery from {} as done, id [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId()); - removed.markAsDone(); - } - } - - /** the number of ongoing recoveries */ - public int size() { - return onGoingRecoveries.size(); - } - - /** - * cancel all ongoing recoveries for the given shard - * - * @param reason reason for cancellation - * @param shardId shardId for which to cancel recoveries - * @return true if a recovery was cancelled - */ - public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { - boolean cancelled = false; - List matchedRecoveries = new ArrayList<>(); - synchronized (onGoingRecoveries) { - for (Iterator it = onGoingRecoveries.values().iterator(); it.hasNext();) { - RecoveryTarget status = it.next(); - if (status.shardId().equals(shardId)) { - matchedRecoveries.add(status); - it.remove(); - } - } - } - for (RecoveryTarget removed : matchedRecoveries) { - logger.trace( - "{} canceled recovery from {}, id [{}] (reason [{}])", - removed.shardId(), - removed.sourceNode(), - removed.recoveryId(), - reason - ); - removed.cancel(reason); - cancelled = true; - } - return cancelled; - } - - /** - * a reference to {@link RecoveryTarget}, which implements {@link AutoCloseable}. closing the reference - * causes {@link RecoveryTarget#decRef()} to be called. This makes sure that the underlying resources - * will not be freed until {@link RecoveryRef#close()} is called. 
- * - * @opensearch.internal - */ - public static class RecoveryRef extends AutoCloseableRefCounted { - - /** - * Important: {@link RecoveryTarget#tryIncRef()} should - * be *successfully* called on status before - */ - public RecoveryRef(RecoveryTarget status) { - super(status); - status.setLastAccessTime(); - } - } - - private class RecoveryMonitor extends AbstractRunnable { - private final long recoveryId; - private final TimeValue checkInterval; - - private volatile long lastSeenAccessTime; - - private RecoveryMonitor(long recoveryId, long lastSeenAccessTime, TimeValue checkInterval) { - this.recoveryId = recoveryId; - this.checkInterval = checkInterval; - this.lastSeenAccessTime = lastSeenAccessTime; - } - - @Override - public void onFailure(Exception e) { - logger.error(() -> new ParameterizedMessage("unexpected error while monitoring recovery [{}]", recoveryId), e); - } - - @Override - protected void doRun() throws Exception { - RecoveryTarget status = onGoingRecoveries.get(recoveryId); - if (status == null) { - logger.trace("[monitor] no status found for [{}], shutting down", recoveryId); - return; - } - long accessTime = status.lastAccessTime(); - if (accessTime == lastSeenAccessTime) { - String message = "no activity after [" + checkInterval + "]"; - failRecovery( - recoveryId, - new RecoveryFailedException(status.state(), message, new OpenSearchTimeoutException(message)), - true // to be safe, we don't know what go stuck - ); - return; - } - lastSeenAccessTime = accessTime; - logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime); - threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC); - } - } - -} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java new file mode 100644 index 0000000000000..b93c054ffa4bf --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.recovery; + +import org.opensearch.OpenSearchException; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.indices.cluster.IndicesClusterStateService; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; + +/** + * Listener that runs on changes in Recovery state + * + * @opensearch.internal + */ +public class RecoveryListener implements ReplicationListener { + + /** + * ShardRouting with which the shard was created + */ + private final ShardRouting shardRouting; + + /** + * Primary term with which the shard was created + */ + private final long primaryTerm; + + private final IndicesClusterStateService indicesClusterStateService; + + public RecoveryListener( + final ShardRouting shardRouting, + final long primaryTerm, + IndicesClusterStateService indicesClusterStateService + ) { + this.shardRouting = shardRouting; + this.primaryTerm = primaryTerm; + this.indicesClusterStateService = indicesClusterStateService; + } + + @Override + public void onDone(ReplicationState state) { + indicesClusterStateService.handleRecoveryDone(state, shardRouting, primaryTerm); + } + + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index 35ac5cbc12bde..a3c7adb755145 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -45,6 +45,7 @@ import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; @@ -56,7 +57,7 @@ * * @opensearch.internal */ -public class RecoveryState implements ToXContentFragment, Writeable { +public class RecoveryState implements ReplicationState, ToXContentFragment, Writeable { /** * The stage of the recovery state diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index bb557cc6837ab..92897ab19ad64 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -32,22 +32,18 @@ package org.opensearch.indices.recovery; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.opensearch.Assertions; -import org.opensearch.OpenSearchException; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; -import 
org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.MapperException; import org.opensearch.index.seqno.ReplicationTracker; @@ -56,48 +52,33 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardNotRecoveringException; import org.opensearch.index.shard.IndexShardState; -import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationTarget; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationCollection; import java.io.IOException; import java.nio.file.Path; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** * Represents a recovery where the current node is the target node of the recovery. To track recoveries in a central place, instances of - * this class are created through {@link RecoveriesCollection}. + * this class are created through {@link ReplicationCollection}. * * @opensearch.internal */ -public class RecoveryTarget extends AbstractRefCounted implements RecoveryTargetHandler { - - private final Logger logger; - - private static final AtomicLong idGenerator = new AtomicLong(); +public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetHandler { private static final String RECOVERY_PREFIX = "recovery."; - private final ShardId shardId; - private final long recoveryId; - private final IndexShard indexShard; private final DiscoveryNode sourceNode; - private final MultiFileWriter multiFileWriter; - private final RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); - private final Store store; - private final PeerRecoveryTargetService.RecoveryListener listener; - - private final AtomicBoolean finished = new AtomicBoolean(); - private final CancellableThreads cancellableThreads; - - // last time this status was accessed - private volatile long lastAccessTime = System.nanoTime(); + protected final MultiFileWriter multiFileWriter; + protected final Store store; // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); @@ -109,27 +90,15 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget * @param sourceNode source node of the recovery where we recover from * @param listener called when recovery is completed/failed */ - public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener) { - super("recovery_status"); + public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) { + super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); this.cancellableThreads = new CancellableThreads(); - this.recoveryId = idGenerator.incrementAndGet(); - this.listener = listener; - this.logger = Loggers.getLogger(getClass(), indexShard.shardId()); - this.indexShard = indexShard; this.sourceNode = sourceNode; - this.shardId = indexShard.shardId(); - final String tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + "."; - this.multiFileWriter = new 
MultiFileWriter( - indexShard.store(), - indexShard.recoveryState().getIndex(), - tempFilePrefix, - logger, - this::ensureRefCount - ); + indexShard.recoveryStats().incCurrentAsTarget(); this.store = indexShard.store(); - // make sure the store is not released until we are done. + final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; + this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); store.incRef(); - indexShard.recoveryStats().incCurrentAsTarget(); } /** @@ -141,23 +110,15 @@ public RecoveryTarget retryCopy() { return new RecoveryTarget(indexShard, sourceNode, listener); } - public ActionListener markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener listener) { - return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener); - } - - public long recoveryId() { - return recoveryId; - } - - public ShardId shardId() { - return shardId; - } - public IndexShard indexShard() { ensureRefCount(); return indexShard; } + public String source() { + return sourceNode.toString(); + } + public DiscoveryNode sourceNode() { return this.sourceNode; } @@ -170,29 +131,29 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - /** return the last time this RecoveryStatus was used (based on System.nanoTime() */ - public long lastAccessTime() { - return lastAccessTime; + public Store store() { + ensureRefCount(); + return store; } - /** sets the lasAccessTime flag to now */ - public void setLastAccessTime() { - lastAccessTime = System.nanoTime(); + public String description() { + return "recovery from " + source(); } - public Store store() { - ensureRefCount(); - return store; + @Override + public void notifyListener(Exception e, boolean sendShardFailure) { + listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure); } /** * Closes the current recovery target and waits up to a certain timeout for resources to be freed. * Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done. */ - boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOException { + public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException { + final long recoveryId = getId(); if (finished.compareAndSet(false, true)) { try { - logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId); + logger.debug("reset of recovery with shard {} and id [{}]", shardId(), recoveryId); } finally { // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now. 
decRef(); @@ -202,7 +163,7 @@ boolean resetRecovery(CancellableThreads newTargetCancellableThreads) throws IOE } catch (CancellableThreads.ExecutionCancelledException e) { logger.trace( "new recovery target cancelled for shard {} while waiting on old recovery target with id [{}] to close", - shardId, + shardId(), recoveryId ); return false; @@ -248,22 +209,7 @@ public void cancel(String reason) { * @param sendShardFailure indicates whether to notify the cluster-manager of the shard failure */ public void fail(RecoveryFailedException e, boolean sendShardFailure) { - if (finished.compareAndSet(false, true)) { - try { - notifyListener(e, sendShardFailure); - } finally { - try { - cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]"); - } finally { - // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now - decRef(); - } - } - } - } - - public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) { - listener.onRecoveryFailure(state(), e, sendShardFailure); + super.fail(e, sendShardFailure); } /** mark the current recovery as done */ @@ -278,7 +224,7 @@ public void markAsDone() { // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now decRef(); } - listener.onRecoveryDone(state()); + listener.onDone(state()); } } @@ -287,7 +233,6 @@ protected void closeInternal() { try { multiFileWriter.close(); } finally { - // free store. increment happens in constructor store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); closedLatch.countDown(); @@ -296,15 +241,28 @@ protected void closeInternal() { @Override public String toString() { - return shardId + " [" + recoveryId + "]"; + return shardId() + " [" + getId() + "]"; } - private void ensureRefCount() { - if (refCount() <= 0) { - throw new OpenSearchException( - "RecoveryStatus is used but it's refcount is 0. Probably a mismatch between incRef/decRef " + "calls" - ); - } + @Override + protected String getPrefix() { + return RECOVERY_PREFIX; + } + + @Override + protected void onDone() { + assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed"; + // this might still throw an exception ie. if the shard is CLOSED due to some other event. + // it's safer to decrement the reference in a try finally here. + indexShard.postRecovery("peer recovery done"); + } + + /** + * if {@link #cancellableThreads()} was used, the threads will be interrupted. 
+ */ + @Override + protected void onCancel(String reason) { + cancellableThreads.cancel(reason); } /*** Implementation of {@link RecoveryTargetHandler } */ @@ -374,7 +332,7 @@ public void indexTranslogOperations( translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); if (indexShard().state() != IndexShardState.RECOVERING) { - throw new IndexShardNotRecoveringException(shardId, indexShard().state()); + throw new IndexShardNotRecoveringException(shardId(), indexShard().state()); } /* * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation @@ -460,7 +418,7 @@ public void cleanFiles( final String translogUUID = Translog.createEmptyTranslog( indexShard.shardPath().resolveTranslog(), globalCheckpoint, - shardId, + shardId(), indexShard.getPendingPrimaryTerm() ); store.associateIndexWithNewTranslog(translogUUID); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java new file mode 100644 index 0000000000000..609825eb5227b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -0,0 +1,297 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices.replication.common; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.common.concurrent.AutoCloseableRefCounted; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; +import org.opensearch.index.shard.ShardId; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +/** + * This class holds a collection of all on going replication events on the current node (i.e., the node is the target node + * of those events). The class is used to guarantee concurrent semantics such that once an event was done/cancelled/failed + * no other thread will be able to find it. 
Last, the {@link ReplicationRef} inner class verifies that temporary files + * and store will only be cleared once on going usage is finished. + * + * @opensearch.internal + */ +public class ReplicationCollection { + + /** This is the single source of truth for ongoing target events. If it's not here, it was canceled or done */ + private final ConcurrentMap onGoingTargetEvents = ConcurrentCollections.newConcurrentMap(); + + private final Logger logger; + private final ThreadPool threadPool; + + public ReplicationCollection(Logger logger, ThreadPool threadPool) { + this.logger = logger; + this.threadPool = threadPool; + } + + /** + * Starts a new target event for the given shard, source node and state + * + * @return the id of the new target event. + */ + public long start(T target, TimeValue activityTimeout) { + startInternal(target, activityTimeout); + return target.getId(); + } + + private void startInternal(T target, TimeValue activityTimeout) { + T existingTarget = onGoingTargetEvents.putIfAbsent(target.getId(), target); + assert existingTarget == null : "found two Target instances with the same id"; + logger.trace("started {}", target.description()); + threadPool.schedule( + new ReplicationMonitor(target.getId(), target.lastAccessTime(), activityTimeout), + activityTimeout, + ThreadPool.Names.GENERIC + ); + } + + /** + * Resets the target event and performs a restart on the current index shard + * + * @see IndexShard#performRecoveryRestart() + * @return newly created Target + */ + @SuppressWarnings(value = "unchecked") + public T reset(final long id, final TimeValue activityTimeout) { + T oldTarget = null; + final T newTarget; + + try { + synchronized (onGoingTargetEvents) { + // swap targets in a synchronized block to ensure that the newly added target is picked up by + // cancelForShard whenever the old target is picked up + oldTarget = onGoingTargetEvents.remove(id); + if (oldTarget == null) { + return null; + } + + newTarget = (T) oldTarget.retryCopy(); + startInternal(newTarget, activityTimeout); + } + + // Closes the current target + boolean successfulReset = oldTarget.reset(newTarget.cancellableThreads()); + if (successfulReset) { + logger.trace("restarted {}, previous id [{}]", newTarget.description(), oldTarget.getId()); + return newTarget; + } else { + logger.trace( + "{} could not be reset as it is already cancelled, previous id [{}]", + newTarget.description(), + oldTarget.getId() + ); + cancel(newTarget.getId(), "cancelled during reset"); + return null; + } + } catch (Exception e) { + // fail shard to be safe + assert oldTarget != null; + oldTarget.notifyListener(e, true); + return null; + } + } + + public T getTarget(long id) { + return onGoingTargetEvents.get(id); + } + + /** + * gets the {@link ReplicationTarget } for a given id. The ShardTarget returned has it's ref count already incremented + * to make sure it's safe to use. However, you must call {@link ReplicationTarget#decRef()} when you are done with it, typically + * by using this method in a try-with-resources clause. + *
<p>
+ * Returns null if target event is not found + */ + public ReplicationRef get(long id) { + T status = onGoingTargetEvents.get(id); + if (status != null && status.tryIncRef()) { + return new ReplicationRef(status); + } + return null; + } + + /** Similar to {@link #get(long)} but throws an exception if no target is found */ + public ReplicationRef getSafe(long id, ShardId shardId) { + ReplicationRef ref = get(id); + if (ref == null) { + throw new IndexShardClosedException(shardId); + } + assert ref.get().indexShard().shardId().equals(shardId); + return ref; + } + + /** cancel the target with the given id (if found) and remove it from the target collection */ + public boolean cancel(long id, String reason) { + T removed = onGoingTargetEvents.remove(id); + boolean cancelled = false; + if (removed != null) { + logger.trace("canceled {} (reason [{}])", removed.description(), reason); + removed.cancel(reason); + cancelled = true; + } + return cancelled; + } + + /** + * fail the target with the given id (if found) and remove it from the target collection + * + * @param id id of the target to fail + * @param e exception with reason for the failure + * @param sendShardFailure true a shard failed message should be sent to the master + */ + public void fail(long id, OpenSearchException e, boolean sendShardFailure) { + T removed = onGoingTargetEvents.remove(id); + if (removed != null) { + logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure); + removed.fail(e, sendShardFailure); + } + } + + /** mark the target with the given id as done (if found) */ + public void markAsDone(long id) { + T removed = onGoingTargetEvents.remove(id); + if (removed != null) { + logger.trace("Marking {} as done", removed.description()); + removed.markAsDone(); + } + } + + /** the number of ongoing target events */ + public int size() { + return onGoingTargetEvents.size(); + } + + /** + * cancel all ongoing targets for the given shard + * + * @param reason reason for cancellation + * @param shardId shardId for which to cancel targets + * @return true if a target was cancelled + */ + public boolean cancelForShard(ShardId shardId, String reason) { + boolean cancelled = false; + List matchedTargets = new ArrayList<>(); + synchronized (onGoingTargetEvents) { + for (Iterator it = onGoingTargetEvents.values().iterator(); it.hasNext();) { + T status = it.next(); + if (status.indexShard().shardId().equals(shardId)) { + matchedTargets.add(status); + it.remove(); + } + } + } + for (T removed : matchedTargets) { + logger.trace("canceled {} (reason [{}])", removed.description(), reason); + removed.cancel(reason); + cancelled = true; + } + return cancelled; + } + + /** + * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference + * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources + * will not be freed until {@link ReplicationRef#close()} is called. 
+ * + * @opensearch.internal + */ + public static class ReplicationRef extends AutoCloseableRefCounted { + + /** + * Important: {@link ReplicationTarget#tryIncRef()} should + * be *successfully* called on status before + */ + public ReplicationRef(T status) { + super(status); + status.setLastAccessTime(); + } + } + + private class ReplicationMonitor extends AbstractRunnable { + private final long id; + private final TimeValue checkInterval; + + private volatile long lastSeenAccessTime; + + private ReplicationMonitor(long id, long lastSeenAccessTime, TimeValue checkInterval) { + this.id = id; + this.checkInterval = checkInterval; + this.lastSeenAccessTime = lastSeenAccessTime; + } + + @Override + public void onFailure(Exception e) { + logger.error(() -> new ParameterizedMessage("unexpected error while monitoring [{}]", id), e); + } + + @Override + protected void doRun() throws Exception { + T status = onGoingTargetEvents.get(id); + if (status == null) { + logger.trace("[monitor] no status found for [{}], shutting down", id); + return; + } + long accessTime = status.lastAccessTime(); + if (accessTime == lastSeenAccessTime) { + String message = "no activity after [" + checkInterval + "]"; + fail( + id, + new OpenSearchTimeoutException(message), + true // to be safe, we don't know what go stuck + ); + return; + } + lastSeenAccessTime = accessTime; + logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", id, lastSeenAccessTime); + threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC); + } + } + +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java new file mode 100644 index 0000000000000..0666f475d496a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.OpenSearchException; + +/** + * Interface for listeners that run when there's a change in {@link ReplicationState} + * + * @opensearch.internal + */ +public interface ReplicationListener { + + void onDone(ReplicationState state); + + void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure); +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryRequestTracker.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java similarity index 96% rename from server/src/main/java/org/opensearch/indices/recovery/RecoveryRequestTracker.java rename to server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java index 71a7f2776f324..0b0d20fc9f17e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryRequestTracker.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationRequestTracker.java @@ -30,7 +30,7 @@ * GitHub history for details. 
*/ -package org.opensearch.indices.recovery; +package org.opensearch.indices.replication.common; import org.opensearch.action.ActionListener; import org.opensearch.common.Nullable; @@ -45,11 +45,11 @@ import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; /** - * Tracks recovery requests + * Tracks replication requests * * @opensearch.internal */ -public class RecoveryRequestTracker { +public class ReplicationRequestTracker { private final Map> ongoingRequests = Collections.synchronizedMap(new HashMap<>()); private final LocalCheckpointTracker checkpointTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java new file mode 100644 index 0000000000000..7942fa8938dd0 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +/** + * Represents a state object used to track copying of segments from an external source + * + * @opensearch.internal + */ +public interface ReplicationState { + +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java new file mode 100644 index 0000000000000..0192270907fd2 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -0,0 +1,175 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication.common; + +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Represents the target of a replication operation performed on a shard + * + * @opensearch.internal + */ +public abstract class ReplicationTarget extends AbstractRefCounted { + + private static final AtomicLong ID_GENERATOR = new AtomicLong(); + + // last time the target/status was accessed + private volatile long lastAccessTime = System.nanoTime(); + private final ReplicationRequestTracker requestTracker = new ReplicationRequestTracker(); + private final long id; + + protected final AtomicBoolean finished = new AtomicBoolean(); + private final ShardId shardId; + protected final IndexShard indexShard; + protected final ReplicationListener listener; + protected final Logger logger; + protected final CancellableThreads cancellableThreads; + protected final ReplicationLuceneIndex stateIndex; + + protected abstract String getPrefix(); + + protected abstract void onDone(); + + protected abstract void onCancel(String reason); + + public abstract ReplicationState state(); + + public abstract ReplicationTarget retryCopy(); + + public abstract String description(); + + public ReplicationListener getListener() { + return listener; + } + + public CancellableThreads cancellableThreads() { + return cancellableThreads; + } + + public abstract void notifyListener(Exception e, boolean sendShardFailure); + + public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) { + super(name); + this.logger = Loggers.getLogger(getClass(), indexShard.shardId()); + this.listener = listener; + this.id = ID_GENERATOR.incrementAndGet(); + this.stateIndex = stateIndex; + this.indexShard = indexShard; + this.shardId = indexShard.shardId(); + // make sure the store is not released until we are done. + this.cancellableThreads = new CancellableThreads(); + } + + public long getId() { + return id; + } + + public abstract boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException; + + /** + * return the last time this ReplicationTarget was used (based on System.nanoTime()) + */ + public long lastAccessTime() { + return lastAccessTime; + } + + /** + * sets the lastAccessTime flag to now + */ + public void setLastAccessTime() { + lastAccessTime = System.nanoTime(); + } + + public ActionListener markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener listener) { + return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener); + } + + public IndexShard indexShard() { + ensureRefCount(); + return indexShard; + } + + public ShardId shardId() { + return shardId; + } + + /** + * mark the current replication as done + */ + public void markAsDone() { + if (finished.compareAndSet(false, true)) { + try { + onDone(); + } finally { + // release the initial reference. 
replication files will be cleaned as soon as ref count goes to zero, potentially now + decRef(); + } + listener.onDone(state()); + } + } + + /** + * cancel the replication. calling this method will clean temporary files and release the store + * unless this object is in use (in which case it will be cleaned once all ongoing users call + * {@link #decRef()}) + */ + public void cancel(String reason) { + if (finished.compareAndSet(false, true)) { + try { + logger.debug("replication cancelled (reason: [{}])", reason); + onCancel(reason); + } finally { + // release the initial reference. replication files will be cleaned as soon as ref count goes to zero, potentially now + decRef(); + } + } + } + + /** + * fail the replication and call listener + * + * @param e exception that encapsulates the failure + * @param sendShardFailure indicates whether to notify the master of the shard failure + */ + public void fail(OpenSearchException e, boolean sendShardFailure) { + if (finished.compareAndSet(false, true)) { + try { + notifyListener(e, sendShardFailure); + } finally { + try { + cancellableThreads.cancel("failed " + description() + " [" + ExceptionsHelper.stackTrace(e) + "]"); + } finally { + // release the initial reference. replication files will be cleaned as soon as ref count goes to zero, potentially now + decRef(); + } + } + } + } + + protected void ensureRefCount() { + if (refCount() <= 0) { + throw new OpenSearchException( + "ReplicationTarget is used but its refcount is 0. Probably a mismatch between incRef/decRef calls" + ); + } + } + +} diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java index add2ecd34e3af..509d1f52daa0d 100644 --- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java @@ -69,9 +69,9 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; -import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.common.ReplicationListener; import java.io.IOException; import java.util.ArrayList; @@ -809,7 +809,7 @@ public BlockingTarget( CountDownLatch releaseRecovery, IndexShard shard, DiscoveryNode sourceNode, - PeerRecoveryTargetService.RecoveryListener listener, + ReplicationListener listener, Logger logger ) { super(shard, sourceNode, listener); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 9d83071c177f5..97cb1dc341b13 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -32,6 +32,7 @@ package org.opensearch.indices.cluster; +import org.junit.Before; import org.opensearch.action.ActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; @@ -56,10 +57,10 @@ import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices; import 
org.opensearch.indices.cluster.IndicesClusterStateService.Shard; import org.opensearch.indices.recovery.PeerRecoveryTargetService; +import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; -import org.junit.Before; import java.io.IOException; import java.util.HashMap; @@ -73,9 +74,9 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; -import static org.opensearch.common.collect.MapBuilder.newMapBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.opensearch.common.collect.MapBuilder.newMapBuilder; /** * Abstract base class for tests against {@link IndicesClusterStateService} @@ -253,7 +254,7 @@ public MockIndexService indexService(Index index) { public MockIndexShard createShard( final ShardRouting shardRouting, final PeerRecoveryTargetService recoveryTargetService, - final PeerRecoveryTargetService.RecoveryListener recoveryListener, + final RecoveryListener recoveryListener, final RepositoriesService repositoriesService, final Consumer onShardFailure, final Consumer globalCheckpointSyncer, diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 5e09e0f2253df..5224a54a35e96 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -41,6 +41,7 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.bulk.BulkShardRequest; @@ -68,6 +69,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import java.io.IOException; import java.util.HashMap; @@ -448,20 +451,17 @@ public long addDocument(Iterable doc) throws IOExcepti IndexShard replica = group.addReplica(); expectThrows( Exception.class, - () -> group.recoverReplica( - replica, - (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new PeerRecoveryTargetService.RecoveryListener() { - @Override - public void onRecoveryDone(RecoveryState state) { - throw new AssertionError("recovery must fail"); - } + () -> group.recoverReplica(replica, (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + throw new AssertionError("recovery must fail"); + } - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated")); - } - }) - ) + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated")); + } + })) ); expectThrows(AlreadyClosedException.class, () -> replica.refresh("test")); 
group.removeReplica(replica); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryRequestTrackerTests.java b/server/src/test/java/org/opensearch/indices/recovery/ReplicationRequestTrackerTests.java similarity index 95% rename from server/src/test/java/org/opensearch/indices/recovery/RecoveryRequestTrackerTests.java rename to server/src/test/java/org/opensearch/indices/recovery/ReplicationRequestTrackerTests.java index 931d36f587db8..afad385deabe4 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryRequestTrackerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/ReplicationRequestTrackerTests.java @@ -36,6 +36,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.indices.replication.common.ReplicationRequestTracker; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -44,7 +45,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; -public class RecoveryRequestTrackerTests extends OpenSearchTestCase { +public class ReplicationRequestTrackerTests extends OpenSearchTestCase { private TestThreadPool threadPool; @@ -64,7 +65,7 @@ public void testIdempotencyIsEnforced() { Set seqNosReturned = ConcurrentCollections.newConcurrentSet(); ConcurrentMap>> seqToResult = ConcurrentCollections.newConcurrentMap(); - RecoveryRequestTracker requestTracker = new RecoveryRequestTracker(); + ReplicationRequestTracker requestTracker = new ReplicationRequestTracker(); int numberOfRequests = randomIntBetween(100, 200); for (int j = 0; j < numberOfRequests; ++j) { diff --git a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java similarity index 65% rename from server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java rename to server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index 6a08f5115d1e2..7587f48503625 100644 --- a/server/src/test/java/org/opensearch/recovery/RecoveriesCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -38,10 +38,10 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; -import org.opensearch.indices.recovery.RecoveriesCollection; -import org.opensearch.indices.recovery.RecoveryFailedException; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryTarget; import java.util.concurrent.CountDownLatch; @@ -51,64 +51,58 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; -public class RecoveriesCollectionTests extends OpenSearchIndexLevelReplicationTestCase { - static final PeerRecoveryTargetService.RecoveryListener listener = new PeerRecoveryTargetService.RecoveryListener() { +public class ReplicationCollectionTests extends OpenSearchIndexLevelReplicationTestCase { + static final ReplicationListener listener = new ReplicationListener() { @Override - public void onRecoveryDone(RecoveryState 
state) { + public void onDone(ReplicationState state) { } @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { } }; public void testLastAccessTimeUpdate() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); - try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { + try (ReplicationCollection.ReplicationRef status = collection.get(recoveryId)) { final long lastSeenTime = status.get().lastAccessTime(); assertBusy(() -> { - try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) { + try (ReplicationCollection.ReplicationRef currentStatus = collection.get(recoveryId)) { assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.get().lastAccessTime())); } }); } finally { - collection.cancelRecovery(recoveryId, "life"); + collection.cancel(recoveryId, "life"); } } } public void testRecoveryTimeout() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); final AtomicBoolean failed = new AtomicBoolean(); final CountDownLatch latch = new CountDownLatch(1); - final long recoveryId = startRecovery( - collection, - shards.getPrimaryNode(), - shards.addReplica(), - new PeerRecoveryTargetService.RecoveryListener() { - @Override - public void onRecoveryDone(RecoveryState state) { - latch.countDown(); - } - - @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { - failed.set(true); - latch.countDown(); - } - }, - TimeValue.timeValueMillis(100) - ); + final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica(), new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + latch.countDown(); + } + + @Override + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + failed.set(true); + latch.countDown(); + } + }, TimeValue.timeValueMillis(100)); try { latch.await(30, TimeUnit.SECONDS); assertTrue("recovery failed to timeout", failed.get()); } finally { - collection.cancelRecovery(recoveryId, "meh"); + collection.cancel(recoveryId, "meh"); } } @@ -116,16 +110,16 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo public void testRecoveryCancellation() throws Exception { try (ReplicationGroup shards = createGroup(0)) { - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); - try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { - ShardId shardId = recoveryRef.get().shardId(); - assertTrue("failed to cancel recoveries", 
collection.cancelRecoveriesForShard(shardId, "test")); + try (ReplicationCollection.ReplicationRef recoveryRef = collection.get(recoveryId)) { + ShardId shardId = recoveryRef.get().indexShard().shardId(); + assertTrue("failed to cancel recoveries", collection.cancelForShard(shardId, "test")); assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); } finally { - collection.cancelRecovery(recoveryId, "meh"); - collection.cancelRecovery(recoveryId2, "meh"); + collection.cancel(recoveryId, "meh"); + collection.cancel(recoveryId2, "meh"); } } } @@ -135,17 +129,17 @@ public void testResetRecovery() throws Exception { shards.startAll(); int numDocs = randomIntBetween(1, 15); shards.indexDocs(numDocs); - final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool); + final ReplicationCollection collection = new ReplicationCollection<>(logger, threadPool); IndexShard shard = shards.addReplica(); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard); - RecoveryTarget recoveryTarget = collection.getRecoveryTarget(recoveryId); + RecoveryTarget recoveryTarget = collection.getTarget(recoveryId); final int currentAsTarget = shard.recoveryStats().currentAsTarget(); final int referencesToStore = recoveryTarget.store().refCount(); IndexShard indexShard = recoveryTarget.indexShard(); Store store = recoveryTarget.store(); String tempFileName = recoveryTarget.getTempNameForFile("foobar"); - RecoveryTarget resetRecovery = collection.resetRecovery(recoveryId, TimeValue.timeValueMinutes(60)); - final long resetRecoveryId = resetRecovery.recoveryId(); + RecoveryTarget resetRecovery = collection.reset(recoveryId, TimeValue.timeValueMinutes(60)); + final long resetRecoveryId = resetRecovery.getId(); assertNotSame(recoveryTarget, resetRecovery); assertNotSame(recoveryTarget.cancellableThreads(), resetRecovery.cancellableThreads()); assertSame(indexShard, resetRecovery.indexShard()); @@ -158,31 +152,31 @@ public void testResetRecovery() throws Exception { String resetTempFileName = resetRecovery.getTempNameForFile("foobar"); assertNotEquals(tempFileName, resetTempFileName); assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); - try (RecoveriesCollection.RecoveryRef newRecoveryRef = collection.getRecovery(resetRecoveryId)) { + try (ReplicationCollection.ReplicationRef newRecoveryRef = collection.get(resetRecoveryId)) { shards.recoverReplica(shard, (s, n) -> { assertSame(s, newRecoveryRef.get().indexShard()); return newRecoveryRef.get(); }, false); } shards.assertAllEqual(numDocs); - assertNull("recovery is done", collection.getRecovery(recoveryId)); + assertNull("recovery is done", collection.get(recoveryId)); } } - long startRecovery(RecoveriesCollection collection, DiscoveryNode sourceNode, IndexShard shard) { + long startRecovery(ReplicationCollection collection, DiscoveryNode sourceNode, IndexShard shard) { return startRecovery(collection, sourceNode, shard, listener, TimeValue.timeValueMinutes(60)); } long startRecovery( - RecoveriesCollection collection, + ReplicationCollection collection, DiscoveryNode sourceNode, IndexShard indexShard, - PeerRecoveryTargetService.RecoveryListener listener, + ReplicationListener listener, TimeValue timeValue ) { final DiscoveryNode rNode = getDiscoveryNode(indexShard.routingEntry().currentNodeId()); indexShard.markAsRecovering("remote", new RecoveryState(indexShard.routingEntry(), sourceNode, rNode)); indexShard.prepareForIndexRecovery(); - return collection.startRecovery(indexShard, 
sourceNode, listener, timeValue); + return collection.start(new RecoveryTarget(indexShard, sourceNode, listener), timeValue); } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 509edfd1b9103..298fdcaea6465 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.store.Directory; +import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; @@ -93,6 +94,8 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.recovery.StartRecoveryRequest; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; @@ -138,14 +141,14 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase { } }; - protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() { + protected static final ReplicationListener recoveryListener = new ReplicationListener() { @Override - public void onRecoveryDone(RecoveryState state) { + public void onDone(ReplicationState state) { } @Override - public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { throw new AssertionError(e); } };
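
To make the renamed surface area concrete, here is a minimal usage sketch (not part of the diff): it wires a ReplicationListener into a RecoveryTarget and drives it through a ReplicationCollection, mirroring the test changes above. The ExampleRecoveryRunner class, its method names, and the generic parameters (e.g. ReplicationCollection<RecoveryTarget>, which the patch text shows only as raw types) are assumptions for illustration; only the ReplicationListener, ReplicationCollection, ReplicationRef, and RecoveryTarget calls themselves come from this change.

package org.opensearch.example;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;

// Hypothetical caller, sketched to show how the renamed APIs fit together.
public class ExampleRecoveryRunner {

    // The generic listener replaces PeerRecoveryTargetService.RecoveryListener;
    // onRecoveryDone/onRecoveryFailure become onDone/onFailure.
    private final ReplicationListener listener = new ReplicationListener() {
        @Override
        public void onDone(ReplicationState state) {
            // e.g. report the shard as started
        }

        @Override
        public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
            // e.g. fail the shard when sendShardFailure is true
        }
    };

    public long startAndTouch(ReplicationCollection<RecoveryTarget> collection, IndexShard shard, DiscoveryNode sourceNode) {
        // start() registers the target and returns its id (previously startRecovery(...))
        long id = collection.start(new RecoveryTarget(shard, sourceNode, listener), TimeValue.timeValueMinutes(60));

        // ReplicationRef is auto-closeable; the collection bumps the target's refcount
        // before handing it out, and close() releases it again.
        try (ReplicationCollection.ReplicationRef<RecoveryTarget> ref = collection.get(id)) {
            if (ref != null) {
                ref.get().setLastAccessTime(); // keeps the monitor from timing the target out
            }
        }
        return id;
    }
}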