Skip to content

Commit

Permalink
Fix reactive save of Flux.
Browse files Browse the repository at this point in the history
Original Pull Request #2581
Closes #2576
  • Loading branch information
sothawo authored Jun 2, 2023
1 parent 11fc225 commit d6b5540
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,42 +225,42 @@ public <T> Flux<T> save(Flux<T> entities, IndexCoordinates index, int bulkSize)

return Flux.defer(() -> {
Sinks.Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
entities //
.bufferTimeout(bulkSize, Duration.ofMillis(200)) //
.subscribe(new Subscriber<List<T>>() {
private Subscription subscription;
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<T> entityList) {
saveAll(entityList, index) //
.map(sink::tryEmitNext) //
.doOnComplete(() -> {
if (!upstreamComplete.get()) {
subscription.request(1);
} else {
sink.tryEmitComplete();
}
}).subscribe();
}

@Override
public void onError(Throwable throwable) {
subscription.cancel();
sink.tryEmitError(throwable);
}

@Override
public void onComplete() {
upstreamComplete.set(true);
}
});
entities.window(bulkSize) //
.concatMap(flux -> flux.collectList()) //
.subscribe(new Subscriber<List<T>>() {
private Subscription subscription;
private AtomicBoolean upstreamComplete = new AtomicBoolean(false);

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(List<T> entityList) {
saveAll(entityList, index) //
.map(sink::tryEmitNext) //
.doOnComplete(() -> {
if (!upstreamComplete.get()) {
subscription.request(1);
} else {
sink.tryEmitComplete();
}
}).subscribe();
}

@Override
public void onError(Throwable throwable) {
subscription.cancel();
sink.tryEmitError(throwable);
}

@Override
public void onComplete() {
upstreamComplete.set(true);
}
});
return sink.asFlux();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.lang.Integer;
import java.lang.Long;
import java.lang.Object;
import java.time.Duration;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
Expand Down Expand Up @@ -1171,7 +1172,7 @@ void shouldWorkWithReadonlyId() {
}).verifyComplete();
}

@Test // #2496
@Test // #2496, #2576
@DisplayName("should save data from Flux and return saved data in a flux")
void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {

Expand All @@ -1180,9 +1181,11 @@ void shouldSaveDataFromFluxAndReturnSavedDataInAFlux() {
.mapToObj(SampleEntity::of) //
.collect(Collectors.toList());

var entityFlux = Flux.fromIterable(entityList);
// we add a random delay to make suure the underlying implementation handles irregular incoming data
var entities = Flux.fromIterable(entityList).concatMap(
entity -> Mono.just(entity).delay(Duration.ofMillis((long) (Math.random() * 10))).thenReturn(entity));

operations.save(entityFlux, SampleEntity.class).collectList() //
operations.save(entities, SampleEntity.class).collectList() //
.as(StepVerifier::create) //
.consumeNextWith(savedEntities -> {
assertThat(savedEntities).isEqualTo(entityList);
Expand Down

0 comments on commit d6b5540

Please sign in to comment.