Don’t ack if unable to remove failing replica #39584

Merged · 6 commits · Mar 5, 2019
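In brief: the three per-call callbacks that the replica proxy methods used to take (onSuccess, onPrimaryDemoted, onIgnoredFailure) are collapsed into a single ActionListener<Void>, so a failure to fail or mark-stale a replica can no longer be quietly swallowed while the operation is still acknowledged. A condensed sketch of the contract change, reconstructed from the hunks below (the enclosing interface name is invented for illustration; markShardCopyAsStaleIfNeeded changes the same way):

import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.ShardRouting;

// Illustrative summary of the signature change; not a real Elasticsearch type.
interface ReplicaFailureHooksSketch {

    // Before: three callbacks, and an "ignored" failure still let the operation complete.
    void failShardIfNeededOld(ShardRouting replica, String message, Exception exception,
                              Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
                              Consumer<Exception> onIgnoredFailure);

    // After: one listener; every failure is delivered to the caller, which decides
    // whether the replication operation may still be acknowledged.
    void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                           ActionListener<Void> listener);
}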

TransportVerifyShardBeforeCloseAction.java
@@ -46,7 +46,6 @@

import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
@@ -130,10 +129,8 @@ class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {
}

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}

TransportResyncReplicationAction.java
@@ -48,7 +48,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
@@ -210,10 +209,9 @@ class ResyncActionReplicasProxy extends ReplicasProxy {
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(
replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener);
}
}
}

ReplicationOperation.java
@@ -21,20 +21,24 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
import java.util.ArrayList;
@@ -43,7 +47,6 @@
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
@@ -133,10 +136,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
}

@@ -192,9 +192,8 @@ public void onFailure(Exception replicaException) {
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
replicasProxy.failShardIfNeeded(shard, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}

@Override
@@ -204,13 +203,26 @@ public String toString() {
});
}

private void onPrimaryDemoted(Exception demotionFailure) {
String primaryFail = String.format(Locale.ROOT,
"primary shard [%s] was demoted while failing replica shard",
primary.routingEntry());
// we are no longer the primary, fail ourselves and start over
primary.failShard(primaryFail, demotionFailure);
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
private void onNoLongerPrimary(Exception failure) {
final boolean nodeIsClosing = failure instanceof NodeClosedException ||
(failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
final String message;
if (nodeIsClosing) {
message = String.format(Locale.ROOT,
"node with primary [%s] is shutting down while failing replica shard", primary.routingEntry());
// We prefer not to fail the primary to avoid unnecessary warning log
// when the node with the primary shard is gracefully shutting down.
} else {
if (Assertions.ENABLED) {
if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) {
throw new AssertionError("unexpected failure", failure);
}
}
// we are no longer the primary, fail ourselves and start over
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry());
primary.failShard(message, failure);
}
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
}

/**
@@ -370,31 +382,23 @@ void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpo
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener);

/**
* Marks shard copy as stale if needed, removing its allocation id from
* the set of in-sync allocation ids. Whether marking as stale is needed
* is left up to the implementation.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener);
}

/**
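The ReplicationOperation hunks above hand both outcomes to ActionListener.wrap(responseHandler, failureHandler). For readers unfamiliar with that helper, here is a minimal sketch of roughly what it returns, assuming the standard documented behavior (an illustration, not the Elasticsearch implementation):

import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.CheckedConsumer;

final class WrapSketch {

    // A listener that forwards success to onResponse, and any failure (including one
    // thrown by onResponse itself) to onFailure; in the hunks above, onFailure is
    // ReplicationOperation.this::onNoLongerPrimary.
    static <T> ActionListener<T> wrap(CheckedConsumer<T, ? extends Exception> onResponse,
                                      Consumer<Exception> onFailure) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    onFailure.accept(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
}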

TransportReplicationAction.java
@@ -84,7 +84,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -1173,47 +1172,21 @@ public void performOn(
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to fail.
onSuccess.run();
listener.onResponse(null);
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to be marked as stale.
onSuccess.run();
}

protected final ActionListener<Void> createShardActionListener(final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
onSuccess.run();
}

@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
listener.onResponse(null);
}
}

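The createShardActionListener adapter removed above is where failures used to be sorted: a ShardStateAction.NoLongerPrimaryShardException went to onPrimaryDemoted, while a TransportException or NodeClosedException went to onIgnoredFailure, whose only effect in ReplicationOperation was to decrement the pending counter, so the operation could still be acknowledged even though the failing copy had never been removed from the in-sync set. With the single listener, those failures now reach onNoLongerPrimary, which finishes the operation with RetryOnPrimaryException (and, unless the node is shutting down, also fails the primary). A standalone paraphrase of the shutdown check, for reference (illustrative class name, not production code):

import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.transport.TransportException;

// Paraphrase of the check in ReplicationOperation.onNoLongerPrimary: a failure caused by
// the local node shutting down skips primary.failShard(...); anything else is treated as
// a demotion. Either way the operation fails instead of acking.
final class NoLongerPrimarySketch {

    static boolean nodeIsClosing(Exception failure) {
        return failure instanceof NodeClosedException
            || (failure instanceof TransportException
                && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
    }
}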

TransportWriteAction.java
@@ -47,7 +47,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
@@ -376,20 +375,17 @@ class WriteActionReplicasProxy extends ReplicasProxy {
}

@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
if (TransportActions.isShardNotAvailableException(exception) == false) {
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
}
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
shardStateAction.remoteShardFailed(
replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener);
}

@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}
}