Skip to content

Commit

Permalink
Fix handling of @id property in Java records.
Browse files Browse the repository at this point in the history
  • Loading branch information
sothawo committed Nov 8, 2023
1 parent 9abcacb commit de13e06
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,16 @@ public String doIndex(IndexQuery query, IndexCoordinates indexCoordinates) {
Object queryObject = query.getObject();

if (queryObject != null) {
query.setObject(updateIndexedObject(queryObject, new IndexedObjectInformation(indexResponse.id(),
indexResponse.index(), indexResponse.seqNo(), indexResponse.primaryTerm(), indexResponse.version())));
query.setObject(entityOperations.updateIndexedObject(
queryObject,
new IndexedObjectInformation(
indexResponse.id(),
indexResponse.index(),
indexResponse.seqNo(),
indexResponse.primaryTerm(),
indexResponse.version()),
elasticsearchConverter,
routingResolver));
}

return indexResponse.id();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,16 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
.flatMap(indexAndResponse -> {
T savedEntity = entities.entityAt(indexAndResponse.getT1());
BulkResponseItem response = indexAndResponse.getT2();
updateIndexedObject(savedEntity, new IndexedObjectInformation( //
response.id(), //
response.index(), //
response.seqNo(), //
response.primaryTerm(), //
response.version()));
return maybeCallbackAfterSave(savedEntity, index);
var updatedEntity = entityOperations.updateIndexedObject(
savedEntity, new IndexedObjectInformation( //
response.id(), //
response.index(), //
response.seqNo(), //
response.primaryTerm(), //
response.version()),
converter,
routingResolver);
return maybeCallbackAfterSave(updatedEntity, index);
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
import org.springframework.data.elasticsearch.core.script.Script;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.util.Streamable;
Expand Down Expand Up @@ -240,7 +239,11 @@ public <T> Iterable<T> save(Iterable<T> entities, IndexCoordinates index) {
// noinspection unchecked
return indexQueries.stream() //
.map(IndexQuery::getObject) //
.map(entity -> (T) updateIndexedObject(entity, iterator.next())) //
.map(entity -> (T) entityOperations.updateIndexedObject(
entity,
iterator.next(),
elasticsearchConverter,
routingResolver)) //
.collect(Collectors.toList()); //
}

Expand Down Expand Up @@ -397,47 +400,6 @@ protected <T> UpdateQuery buildUpdateQueryByEntity(T entity) {

return updateQueryBuilder.build();
}

protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedObjectInformation) {

ElasticsearchPersistentEntity<?> persistentEntity = elasticsearchConverter.getMappingContext()
.getPersistentEntity(entity.getClass());

if (persistentEntity != null) {
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();

// Only deal with text because ES generated Ids are strings!
if (indexedObjectInformation.id() != null && idProperty != null && idProperty.isReadable()
&& idProperty.getType().isAssignableFrom(String.class)) {
propertyAccessor.setProperty(idProperty, indexedObjectInformation.id());
}

if (indexedObjectInformation.seqNo() != null && indexedObjectInformation.primaryTerm() != null
&& persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.seqNo(), indexedObjectInformation.primaryTerm()));
}

if (indexedObjectInformation.version() != null && persistentEntity.hasVersionProperty()) {
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(versionProperty, indexedObjectInformation.version());
}

var indexedIndexNameProperty = persistentEntity.getIndexedIndexNameProperty();
if (indexedIndexNameProperty != null) {
propertyAccessor.setProperty(indexedIndexNameProperty, indexedObjectInformation.index());
}

// noinspection unchecked
return (T) propertyAccessor.getBean();
}
return entity;
}

// endregion

// region SearchOperations
Expand Down Expand Up @@ -736,7 +698,11 @@ protected void updateIndexedObjectsWithQueries(List<?> queries,
Object queryObject = indexQuery.getObject();

if (queryObject != null) {
indexQuery.setObject(updateIndexedObject(queryObject, indexedObjectInformationList.get(i)));
indexQuery.setObject(entityOperations.updateIndexedObject(
queryObject,
indexedObjectInformationList.get(i),
elasticsearchConverter,
routingResolver));
}
}
}
Expand Down Expand Up @@ -802,7 +768,11 @@ public T doWith(@Nullable Document document) {
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null, //
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null, //
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); //
entity = updateIndexedObject(entity, indexedObjectInformation);
entity = entityOperations.updateIndexedObject(
entity,
indexedObjectInformation,
elasticsearchConverter,
routingResolver);

return maybeCallbackAfterConvert(entity, documentAfterLoad, index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
Expand All @@ -55,7 +54,6 @@
import org.springframework.data.elasticsearch.core.script.Script;
import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.data.elasticsearch.support.VersionInfo;
import org.springframework.data.mapping.PersistentPropertyAccessor;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -320,51 +318,6 @@ protected IndexQuery getIndexQuery(Object value) {
return query;
}

protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedObjectInformation) {

ElasticsearchPersistentEntity<?> persistentEntity = converter.getMappingContext()
.getPersistentEntity(entity.getClass());

if (persistentEntity != null) {
// noinspection DuplicatedCode
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();

// Only deal with text because ES generated Ids are strings!
if (indexedObjectInformation.id() != null && idProperty != null && idProperty.isReadable()
&& idProperty.getType().isAssignableFrom(String.class)) {
propertyAccessor.setProperty(idProperty, indexedObjectInformation.id());
}

if (indexedObjectInformation.seqNo() != null && indexedObjectInformation.primaryTerm() != null
&& persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.seqNo(), indexedObjectInformation.primaryTerm()));
}

if (indexedObjectInformation.version() != null && persistentEntity.hasVersionProperty()) {
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(versionProperty, indexedObjectInformation.version());
}

var indexedIndexNameProperty = persistentEntity.getIndexedIndexNameProperty();
if (indexedIndexNameProperty != null) {
propertyAccessor.setProperty(indexedIndexNameProperty, indexedObjectInformation.index());
}

// noinspection unchecked
return (T) propertyAccessor.getBean();
} else {
EntityOperations.AdaptableEntity<T> adaptableEntity = entityOperations.forEntity(entity,
converter.getConversionService(), routingResolver);
adaptableEntity.populateIdIfNecessary(indexedObjectInformation.id());
}
return entity;
}

@Override
public <T> Flux<MultiGetItem<T>> multiGet(Query query, Class<T> clazz) {
return multiGet(query, clazz, getIndexCoordinatesFor(clazz));
Expand All @@ -391,12 +344,16 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
.map(it -> {
T savedEntity = it.getT1();
IndexResponseMetaData indexResponseMetaData = it.getT2();
return updateIndexedObject(savedEntity, new IndexedObjectInformation(
indexResponseMetaData.id(),
indexResponseMetaData.index(),
indexResponseMetaData.seqNo(),
indexResponseMetaData.primaryTerm(),
indexResponseMetaData.version()));
return entityOperations.updateIndexedObject(
savedEntity,
new IndexedObjectInformation(
indexResponseMetaData.id(),
indexResponseMetaData.index(),
indexResponseMetaData.seqNo(),
indexResponseMetaData.primaryTerm(),
indexResponseMetaData.version()),
converter,
routingResolver);
}).flatMap(saved -> maybeCallbackAfterSave(saved, index));
}

Expand Down Expand Up @@ -652,7 +609,11 @@ public Mono<T> toEntity(@Nullable Document document) {
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null,
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null,
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null);
entity = updateIndexedObject(entity, indexedObjectInformation);
entity = entityOperations.updateIndexedObject(
entity,
indexedObjectInformation,
converter,
routingResolver);

return maybeCallbackAfterConvert(entity, documentAfterLoad, index);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;

import org.springframework.core.convert.ConversionService;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.join.JoinField;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
Expand Down Expand Up @@ -94,6 +95,69 @@ public <T> AdaptableEntity<T> forEntity(T entity, ConversionService conversionSe
return AdaptableMappedEntity.of(entity, context, conversionService, routingResolver);
}

/**
* Updates an entity after it is stored in Elasticsearch with additional data like id, version, seqno...
*
* @param <T> the entity class
* @param entity the entity to update
* @param indexedObjectInformation the update information
* @param elasticsearchConverter the converter providing necessary mapping information
* @param routingResolver routing resolver to use
* @return
*/
public <T> T updateIndexedObject(T entity,
IndexedObjectInformation indexedObjectInformation,
ElasticsearchConverter elasticsearchConverter,
RoutingResolver routingResolver) {

Assert.notNull(entity, "entity must not be null");
Assert.notNull(indexedObjectInformation, "indexedObjectInformation must not be null");
Assert.notNull(elasticsearchConverter, "elasticsearchConverter must not be null");

ElasticsearchPersistentEntity<?> persistentEntity = elasticsearchConverter.getMappingContext()
.getPersistentEntity(entity.getClass());

if (persistentEntity != null) {
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();

// Only deal with text because ES generated Ids are strings!
if (indexedObjectInformation.id() != null && idProperty != null
// isReadable from the base class is false in case of records
&& (idProperty.isReadable() || idProperty.getOwner().getType().isRecord())
&& idProperty.getType().isAssignableFrom(String.class)) {
propertyAccessor.setProperty(idProperty, indexedObjectInformation.id());
}

if (indexedObjectInformation.seqNo() != null && indexedObjectInformation.primaryTerm() != null
&& persistentEntity.hasSeqNoPrimaryTermProperty()) {
ElasticsearchPersistentProperty seqNoPrimaryTermProperty = persistentEntity.getSeqNoPrimaryTermProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(seqNoPrimaryTermProperty,
new SeqNoPrimaryTerm(indexedObjectInformation.seqNo(), indexedObjectInformation.primaryTerm()));
}

if (indexedObjectInformation.version() != null && persistentEntity.hasVersionProperty()) {
ElasticsearchPersistentProperty versionProperty = persistentEntity.getVersionProperty();
// noinspection ConstantConditions
propertyAccessor.setProperty(versionProperty, indexedObjectInformation.version());
}

var indexedIndexNameProperty = persistentEntity.getIndexedIndexNameProperty();
if (indexedIndexNameProperty != null) {
propertyAccessor.setProperty(indexedIndexNameProperty, indexedObjectInformation.index());
}

// noinspection unchecked
return (T) propertyAccessor.getBean();
} else {
EntityOperations.AdaptableEntity<T> adaptableEntity = forEntity(entity,
elasticsearchConverter.getConversionService(), routingResolver);
adaptableEntity.populateIdIfNecessary(indexedObjectInformation.id());
}
return entity;
}

/**
* Determine index name and type name from {@link Entity} with {@code index} and {@code type} overrides. Allows using
* preferred values for index and type if provided, otherwise fall back to index and type defined on entity level.
Expand Down
Loading

0 comments on commit de13e06

Please sign in to comment.