Skip to content

Commit

Permalink
Extract method for dynamic mapping update path in TransportShardBulkA…
Browse files Browse the repository at this point in the history
…ction (elastic#109961)

Move this complicated and relatively rarely executed logic to a separate method
to enable further refactoring and improve readability.
  • Loading branch information
original-brownbear authored Jun 21, 2024
1 parent fa2f928 commit d080ca8
Showing 1 changed file with 66 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -379,64 +378,79 @@ static boolean executeBulkItemRequest(
request.getAutoGeneratedTimestamp(),
request.isRetry()
);

if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
return handleMappingUpdateRequired(
context,
mappingUpdater,
waitForMappingUpdate,
itemDoneListener,
primary,
result,
version,
updateResult
);
}
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
Optional<CompressedXContent> mergedSource = Optional.ofNullable(
primary.mapperService()
.merge(
MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(result.getRequiredMappingUpdate()),
MapperService.MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT
)
).map(DocumentMapper::mappingSource);
Optional<CompressedXContent> previousSource = Optional.ofNullable(primary.mapperService().documentMapper())
.map(DocumentMapper::mappingSource);
onComplete(result, context, updateResult);
return true;
}

if (mergedSource.equals(previousSource)) {
context.resetForNoopMappingUpdateRetry(primary.mapperService().mappingVersion());
return true;
}
} catch (Exception e) {
logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e);
assert result.getId() != null;
onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
private static boolean handleMappingUpdateRequired(
BulkPrimaryExecutionContext context,
MappingUpdatePerformer mappingUpdater,
Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener,
IndexShard primary,
Engine.Result result,
long version,
UpdateHelper.Result updateResult
) {
final var mapperService = primary.mapperService();
try {
CompressedXContent mergedSource = mapperService.merge(
MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(result.getRequiredMappingUpdate()),
MapperService.MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT
).mappingSource();
final DocumentMapper existingDocumentMapper = mapperService.documentMapper();
if (existingDocumentMapper != null && mergedSource.equals(existingDocumentMapper.mappingSource())) {
context.resetForNoopMappingUpdateRetry(mapperService.mappingVersion());
return true;
}
} catch (Exception e) {
logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e);
assert result.getId() != null;
onComplete(exceptionToResult(e, primary, false, version, result.getId()), context, updateResult);
return true;
}

mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), new ActionListener<>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
waitForMappingUpdate.accept(ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
context.resetForMappingUpdateRetry();
}
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), new ActionListener<>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
waitForMappingUpdate.accept(ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
context.resetForMappingUpdateRetry();
}

@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null)));
}
@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null)));
}

@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
return false;
} else {
onComplete(result, context, updateResult);
}
return true;
@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, false, version, result.getId()), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
return false;
}

/**
Expand Down

0 comments on commit d080ca8

Please sign in to comment.