Skip to content

Commit

Permalink
Use correct overload of Flux.bufferTimeout().
Browse files Browse the repository at this point in the history
Original Pull Request #2753
Closes #2607
  • Loading branch information
sothawo authored Nov 3, 2023
1 parent d281df7 commit 3b93b6a
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package org.springframework.data.elasticsearch.core;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -73,9 +71,6 @@
abstract public class AbstractReactiveElasticsearchTemplate
implements ReactiveElasticsearchOperations, ApplicationContextAware {

protected static final Log QUERY_LOGGER = LogFactory
.getLog("org.springframework.data.elasticsearch.core.QUERY");

protected final ElasticsearchConverter converter;
protected final SimpleElasticsearchMappingContext mappingContext;
protected final EntityOperations entityOperations;
Expand Down Expand Up @@ -175,11 +170,12 @@ public void setEntityCallbacks(ReactiveEntityCallbacks entityCallbacks) {
* @return a Mono signalling finished execution
* @since 4.3
*/
@SuppressWarnings("unused")
public Mono<Void> logVersions() {

return getVendor() //
.zipWith(getRuntimeLibraryVersion()) //
.zipWith(getClusterVersion()) //
return getVendor()
.zipWith(getRuntimeLibraryVersion())
.zipWith(getClusterVersion())
.doOnNext(objects -> VersionInfo.logVersions(objects.getT1().getT1(), objects.getT1().getT2(), objects.getT2()))
.then();
}
Expand Down Expand Up @@ -233,42 +229,48 @@ public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize)

return Flux.defer(() -> {
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
entities.window(bulkSize) //
.concatMap(flux -> flux.collectList()) //
.subscribe(new Subscriber<List<T>>() {
private Subscription subscription;
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<T> entityList) {
saveAll(entityList, index) //
.map(sink::tryEmitNext) //
.doOnComplete(() -> {
if (!upstreamComplete.get()) {
subscription.request(1);
} else {
sink.tryEmitComplete();
}
}).subscribe();
}

@Override
public void onError(Throwable throwable) {
subscription.cancel();
sink.tryEmitError(throwable);
}

@Override
public void onComplete() {
upstreamComplete.set(true);
}
});
// noinspection ReactiveStreamsSubscriberImplementation
entities
.bufferTimeout(bulkSize, Duration.ofMillis(200), true)
.subscribe(new Subscriber<>() {
@Nullable private Subscription subscription = null;
private final AtomicBoolean upstreamComplete = new AtomicBoolean(false);

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<T> entityList) {
saveAll(entityList, index)
.map(sink::tryEmitNext)
.doOnComplete(() -> {
if (!upstreamComplete.get()) {
if (subscription == null) {
throw new IllegalStateException("no subscription");
}
subscription.request(1);
} else {
sink.tryEmitComplete();
}
}).subscribe();
}

@Override
public void onError(Throwable throwable) {
if (subscription != null) {
subscription.cancel();
}
sink.tryEmitError(throwable);
}

@Override
public void onComplete() {
upstreamComplete.set(true);
}
});
return sink.asFlux();
});

Expand Down Expand Up @@ -324,6 +326,7 @@ protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedOb
.getPersistentEntity(entity.getClass());

if (persistentEntity != null) {
// noinspection DuplicatedCode
PersistentPropertyAccessor<Object> propertyAccessor = persistentEntity.getPropertyAccessor(entity);
ElasticsearchPersistentProperty idProperty = persistentEntity.getIdProperty();

Expand Down Expand Up @@ -353,8 +356,7 @@ protected <T> T updateIndexedObject(T entity, IndexedObjectInformation indexedOb
}

// noinspection unchecked
T updatedEntity = (T) propertyAccessor.getBean();
return updatedEntity;
return (T) propertyAccessor.getBean();
} else {
EntityOperations.AdaptableEntity<T> adaptableEntity = entityOperations.forEntity(entity,
converter.getConversionService(), routingResolver);
Expand Down Expand Up @@ -385,15 +387,15 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
Assert.notNull(index, "index must not be null");

return maybeCallbackBeforeConvert(entity, index)
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index)) //
.flatMap(entityAfterBeforeConversionCallback -> doIndex(entityAfterBeforeConversionCallback, index))
.map(it -> {
T savedEntity = it.getT1();
IndexResponseMetaData indexResponseMetaData = it.getT2();
return updateIndexedObject(savedEntity, new IndexedObjectInformation( //
indexResponseMetaData.id(), //
indexResponseMetaData.index(), //
indexResponseMetaData.seqNo(), //
indexResponseMetaData.primaryTerm(), //
return updateIndexedObject(savedEntity, new IndexedObjectInformation(
indexResponseMetaData.id(),
indexResponseMetaData.index(),
indexResponseMetaData.seqNo(),
indexResponseMetaData.primaryTerm(),
indexResponseMetaData.version()));
}).flatMap(saved -> maybeCallbackAfterSave(saved, index));
}
Expand Down Expand Up @@ -478,12 +480,12 @@ public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, C

SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);

return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
return doFindForResponse(query, entityType, index)
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments())
.flatMap(callback::toEntity)
.collectList()
.map(entities -> SearchHitMapping.mappingFor(resultType, converter)
.mapHits(searchDocumentResponse, entities)))
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
}

Expand All @@ -503,17 +505,18 @@ public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entit

SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);

return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
return doFindForResponse(query, entityType, index)
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments())
.flatMap(callback::toEntity)
.collectList()
.map(entities -> SearchHitMapping.mappingFor(resultType, converter)
.mapHits(searchDocumentResponse, entities)))
.map(ReactiveSearchHitSupport::searchHitsFor);
}

abstract protected Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index);

@SuppressWarnings("unused")
abstract protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz,
IndexCoordinates index);

Expand Down Expand Up @@ -639,17 +642,16 @@ public Mono<T> toEntity(@Nullable Document document) {
return Mono.empty();
}

return maybeCallbackAfterLoad(document, type, index) //
return maybeCallbackAfterLoad(document, type, index)
.flatMap(documentAfterLoad -> {

// noinspection DuplicatedCode
T entity = reader.read(type, documentAfterLoad);

IndexedObjectInformation indexedObjectInformation = new IndexedObjectInformation( //
documentAfterLoad.hasId() ? documentAfterLoad.getId() : null, //
documentAfterLoad.getIndex(), //
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null, //
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null, //
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null); //
IndexedObjectInformation indexedObjectInformation = new IndexedObjectInformation(
documentAfterLoad.hasId() ? documentAfterLoad.getId() : null,
documentAfterLoad.getIndex(),
documentAfterLoad.hasSeqNo() ? documentAfterLoad.getSeqNo() : null,
documentAfterLoad.hasPrimaryTerm() ? documentAfterLoad.getPrimaryTerm() : null,
documentAfterLoad.hasVersion() ? documentAfterLoad.getVersion() : null);
entity = updateIndexedObject(entity, indexedObjectInformation);

return maybeCallbackAfterConvert(entity, documentAfterLoad, index);
Expand All @@ -667,16 +669,16 @@ protected interface SearchDocumentCallback<T> {
/**
* converts a {@link SearchDocument} to an entity
*
* @param searchDocument
* @param searchDocument the document to convert
* @return the entity in a MOno
*/
Mono<T> toEntity(SearchDocument searchDocument);

/**
* converts a {@link SearchDocument} into a SearchHit
*
* @param searchDocument
* @return
* @param searchDocument the document to convert
* @return the converted SearchHit
*/
Mono<SearchHit<T>> toSearchHit(SearchDocument searchDocument);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1189,9 +1189,11 @@ void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
.mapToObj(SampleEntity::of) //
.collect(Collectors.toList());

// we add a random delay to make suure the underlying implementation handles irregular incoming data
// we add a random delay to make sure the underlying implementation handles irregular incoming data
var entities = Flux.fromIterable(entityList).concatMap(
entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity));
entity -> Mono.just(entity)
.delay(Duration.ofMillis((long) (Math.random() * 10)))
.thenReturn(entity));

operations.save(entities, SampleEntity.class).collectList() //
.as(StepVerifier::create) //
Expand Down

0 comments on commit 3b93b6a

Please sign in to comment.