Skip to content

Commit

Permalink
Use ReplicationFailedException instead of OpenSearchException in Repl…
Browse files Browse the repository at this point in the history
…icationTarget (#5955)

* Use ReplicationFailedException instead of OpenSearchException in ReplicationTarget (#4725)

* Use ReplicationFailedException instead of OpenSearchException in ReplicationTarget

Signed-off-by: Ayush Kataria <[email protected]>

* CHANGELOG.md updated

Signed-off-by: Ayush Kataria <[email protected]>

* test fixes

Signed-off-by: Ayush Kataria <[email protected]>

* spotless fix

Signed-off-by: Ayush Kataria <[email protected]>

* spotless fix

Signed-off-by: Ayush Kataria <[email protected]>

* fixes for failing test as suggested in PR comments

Signed-off-by: Ayush Kataria <[email protected]>

Signed-off-by: Ayush Kataria <[email protected]>
Signed-off-by: Suraj Singh <[email protected]>

* Update SegmentReplicationListener to use ReplicationFailedException

Signed-off-by: Suraj Singh <[email protected]>

* Spotless fix

Signed-off-by: Suraj Singh <[email protected]>

Signed-off-by: Ayush Kataria <[email protected]>
Signed-off-by: Suraj Singh <[email protected]>
Co-authored-by: Ayush Kataria <[email protected]>
  • Loading branch information
dreamer-89 and ayushKataria authored Jan 20, 2023
1 parent 23c23dc commit a25c6b7
Show file tree
Hide file tree
Showing 17 changed files with 67 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Dependencies
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))
### Changed
- Use ReplicationFailedException instead of OpenSearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725))
### Deprecated
### Removed
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -88,6 +87,7 @@
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
Expand Down Expand Up @@ -829,7 +829,11 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

package org.opensearch.indices.recovery;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationFailedException;

import java.io.IOException;

Expand All @@ -45,7 +45,7 @@
*
* @opensearch.internal
*/
public class RecoveryFailedException extends OpenSearchException {
public class RecoveryFailedException extends ReplicationFailedException {

public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) {
this(request, null, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

package org.opensearch.indices.recovery;

import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;

Expand Down Expand Up @@ -49,7 +49,7 @@ public void onDone(ReplicationState state) {
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.opensearch.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -56,10 +55,11 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTarget;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationCollection;

import java.io.IOException;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -135,7 +135,7 @@ public String description() {
}

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -105,16 +104,14 @@ public String description() {
}

@Override
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) {
// Cancellations are still passed to our SegmentReplicationListener as failures; if we have failed because of cancellation,
// update the stage.
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
if (cancelledException != null) {
state.setStage(SegmentReplicationState.Stage.CANCELLED);
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
} else {
listener.onFailure(state(), e, sendShardFailure);
}
listener.onFailure(state(), e, sendShardFailure);
}

@Override
Expand Down Expand Up @@ -150,7 +147,7 @@ public void startReplication(ActionListener<Void> listener) {
// SegmentReplicationSource does not share CancellableThreads.
final CancellableThreads.ExecutionCancelledException executionCancelledException =
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
notifyListener(executionCancelledException, false);
notifyListener(new ReplicationFailedException("Segment replication failed", executionCancelledException), false);
throw executionCancelledException;
});
state.setStage(SegmentReplicationState.Stage.REPLICATING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
Expand All @@ -27,6 +26,7 @@
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationCollection;
import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -196,7 +196,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
Expand Down Expand Up @@ -249,13 +249,13 @@ default void onDone(ReplicationState state) {
}

@Override
default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
default void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure);
}

void onReplicationDone(SegmentReplicationState state);

void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure);
void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}

/**
Expand Down Expand Up @@ -293,13 +293,14 @@ public void onFailure(Exception e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof CancellableThreads.ExecutionCancelledException) {
if (onGoingReplications.getTarget(replicationId) != null) {
IndexShard indexShard = onGoingReplications.getTarget(replicationId).indexShard();
// if the target still exists in our collection, the primary initiated the cancellation, fail the replication
// but do not fail the shard. Cancellations initiated by this node from Index events will be removed with
// onGoingReplications.cancel and not appear in the collection when this listener resolves.
onGoingReplications.fail(replicationId, (CancellableThreads.ExecutionCancelledException) cause, false);
onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false);
}
} else {
onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true);
onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.common.concurrent.AutoCloseableRefCounted;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -134,7 +132,7 @@ public T reset(final long id, final TimeValue activityTimeout) {
} catch (Exception e) {
// fail shard to be safe
assert oldTarget != null;
oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true);
oldTarget.notifyListener(new ReplicationFailedException("Unable to reset target", e), true);
return null;
}
}
Expand Down Expand Up @@ -187,7 +185,7 @@ public boolean cancel(long id, String reason) {
* @param e exception with reason for the failure
* @param sendShardFailure true if a shard failed message should be sent to the master
*/
public void fail(long id, OpenSearchException e, boolean sendShardFailure) {
public void fail(long id, ReplicationFailedException e, boolean sendShardFailure) {
T removed = onGoingTargetEvents.remove(id);
if (removed != null) {
logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure);
Expand Down Expand Up @@ -299,7 +297,7 @@ protected void doRun() throws Exception {
String message = "no activity after [" + checkInterval + "]";
fail(
id,
new OpenSearchTimeoutException(message),
new ReplicationFailedException(message),
true // to be safe, we don't know what got stuck
);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,16 @@ public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, T
/**
 * Creates the exception from a {@link StreamInput} by handing the stream to the superclass constructor.
 *
 * @param in the stream passed through to the superclass
 * @throws IOException declared by this constructor; presumably raised when reading from the stream fails
 */
public ReplicationFailedException(StreamInput in) throws IOException {
super(in);
}

/**
 * Creates a replication failure from an existing exception, which is passed through to the superclass constructor.
 *
 * @param e the underlying exception
 */
public ReplicationFailedException(Exception e) {
super(e);
}

/**
 * Creates a replication failure that carries only a message, which is passed through to the superclass constructor.
 *
 * @param msg the failure message
 */
public ReplicationFailedException(String msg) {
super(msg);
}

/**
 * Creates a replication failure with a message and a cause; both are passed through to the superclass constructor.
 *
 * @param msg   the failure message
 * @param cause the throwable that triggered this failure
 */
public ReplicationFailedException(String msg, Throwable cause) {
super(msg, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.indices.replication.common;

import org.opensearch.OpenSearchException;

/**
* Interface for listeners that run when there's a change in {@link ReplicationState}
*
Expand All @@ -19,5 +17,5 @@ public interface ReplicationListener {

void onDone(ReplicationState state);

void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure);
void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.RateLimiter;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -78,7 +77,7 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure);
public abstract void notifyListener(ReplicationFailedException e, boolean sendShardFailure);

public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) {
super(name);
Expand Down Expand Up @@ -170,7 +169,7 @@ public void cancel(String reason) {
* @param e exception that encapsulates the failure
* @param sendShardFailure indicates whether to notify the master of the shard failure
*/
public void fail(OpenSearchException e, boolean sendShardFailure) {
public void fail(ReplicationFailedException e, boolean sendShardFailure) {
if (finished.compareAndSet(false, true)) {
try {
notifyListener(e, sendShardFailure);
Expand All @@ -187,7 +186,7 @@ public void fail(OpenSearchException e, boolean sendShardFailure) {

protected void ensureRefCount() {
if (refCount() <= 0) {
throw new OpenSearchException(
throw new ReplicationFailedException(
"ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls"
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -45,6 +44,7 @@
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -790,8 +790,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
assertTrue(e instanceof CancellableThreads.ExecutionCancelledException);
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertFalse(sendShardFailure);
assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage());
latch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.bulk.BulkShardRequest;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -471,7 +471,7 @@ public void onDone(ReplicationState state) {
}

@Override
public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated"));
}
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.error("Unexpected error", e);
Assert.fail("Test should succeed");
}
Expand Down Expand Up @@ -149,7 +150,7 @@ public void onReplicationDone(SegmentReplicationState state) {
}

@Override
public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
// failures leave state object in last entered stage.
assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage());
assertEquals(expectedError, e.getCause());
Expand Down
Loading

0 comments on commit a25c6b7

Please sign in to comment.