diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 5e771ae2017f..602734f43b48 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -124,7 +124,7 @@ public Flux decode(Publisher input, ResolvableType elementTy chunks.clear(); return Mono.just(lastBuffer); })) - .doOnTerminate(chunks::releaseAndClear) + .doFinally(signalType -> chunks.releaseAndClear()) .doOnDiscard(DataBuffer.class, DataBufferUtils::release) .map(buffer -> decode(buffer, elementType, mimeType, hints)); } diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index ade19bdfd46e..e5593a7be833 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,15 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -75,14 +78,15 @@ public void decode() { String s = String.format("%s\n%s\n%s", u, e, o); Flux input = toDataBuffers(s, 1, UTF_8); - // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty - // see https://github.com/reactor/reactor-core/issues/2041 + testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + } -// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); -// testDecodeCancel(input, TYPE, null, null); -// testDecodeEmpty(TYPE, null, null); + @Test // gh-30299 + public void decodeAndCancelWithPendingChunks() { + Flux input = toDataBuffers("abc", 1, UTF_8).concatWith(Flux.never()); + Flux result = this.decoder.decode(input, TYPE, null, null); - testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + StepVerifier.create(result).thenAwait(Duration.ofMillis(100)).thenCancel().verify(); } @Test @@ -264,4 +268,13 @@ private DataBuffer stringBuffer(String value) { return buffer; } + + private static class SingleRequestSubscriber extends BaseSubscriber { + + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.request(1); + } + } + }