From 4e9b5549486f53fc95d6e40299c87ed3f581adbe Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 11 Jun 2018 09:09:23 +0200 Subject: [PATCH] Don't swallow exceptions on replication (#31179) Swallowing these exceptions is dangerous as they can result in replicas going out-of-sync with the primary. Follow-up to #28571 --- .../action/bulk/TransportShardBulkAction.java | 44 ++++++++----------- .../TransportResyncReplicationAction.java | 17 +++---- .../replication/TransportWriteAction.java | 6 +-- 3 files changed, 24 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 7201b878f9800..72246ec8539fd 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -486,32 +486,24 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index BulkItemRequest item = request.items()[i]; final Engine.Result operationResult; DocWriteRequest docWriteRequest = item.request(); - try { - switch (replicaItemExecutionMode(item, i)) { - case NORMAL: - final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); - operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica); - assert operationResult != null : "operation result must never be null when primary response has no failure"; - location = syncOperationResultOrThrow(operationResult, location); - break; - case NOOP: - break; - case FAILURE: - final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); - assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned"; - operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage()); - assert operationResult != null : "operation result must never be null when primary response has no failure"; - location = syncOperationResultOrThrow(operationResult, location); - break; - default: - throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest); - } - } catch (Exception e) { - // if its not an ignore replica failure, we need to make sure to bubble up the failure - // so we will fail the shard - if (!TransportActions.isShardNotAvailableException(e)) { - throw e; - } + switch (replicaItemExecutionMode(item, i)) { + case NORMAL: + final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); + operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica); + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); + break; + case NOOP: + break; + case FAILURE: + final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); + assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned"; + operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage()); + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); + break; + default: + throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest); } } return location; diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 78c1e835d4087..f8ad58b9cac8c 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -121,19 +121,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception { Translog.Location location = null; for (Translog.Operation operation : request.getOperations()) { - try { - final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA); - if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(), - "Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate()); - } - location = syncOperationResultOrThrow(operationResult, location); - } catch (Exception e) { - // if its not a failure to be ignored, let it bubble up - if (!TransportActions.isShardNotAvailableException(e)) { - throw e; - } + final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA); + if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + throw new TransportReplicationAction.RetryOnReplicaException(replica.shardId(), + "Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate()); } + location = syncOperationResultOrThrow(operationResult, location); } if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { replica.trimOperationOfPreviousPrimaryTerms(request.getTrimAboveSeqNo()); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index b14fd156b735d..ca91a32a17a3a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -76,11 +76,7 @@ protected static Location syncOperationResultOrThrow(final Engine.Result operati // check if any transient write operation failures should be bubbled up Exception failure = operationResult.getFailure(); assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; - if (!TransportActions.isShardNotAvailableException(failure)) { - throw failure; - } else { - location = currentLocation; - } + throw failure; } else { location = locationToSync(currentLocation, operationResult.getTranslogLocation()); }