diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index c429767fbfb7..6c1413ab4f81 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -132,7 +132,8 @@ public Mono write(Publisher inputStream, ResolvableType eleme message.getHeaders().setContentLength(buffer.readableByteCount()); return message.writeWith(Mono.just(buffer) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); - }); + }) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } if (isStreamingMediaType(contentType)) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index 41b691c83a4e..1feaaa5ca4a8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -213,17 +213,27 @@ public final Mono writeWith(Publisher body) { return ((Mono) body) .flatMap(buffer -> { touchDataBuffer(buffer); - return doCommit(() -> { - try { - return writeWithInternal(Mono.fromCallable(() -> buffer) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); - } - catch (Throwable ex) { - return Mono.error(ex); - } - }).doOnError(ex -> DataBufferUtils.release(buffer)); + AtomicReference subscribed = new AtomicReference<>(false); + return doCommit( + () -> { + try { + return writeWithInternal(Mono.fromCallable(() -> buffer) + .doOnSubscribe(s -> subscribed.set(true)) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + } + catch (Throwable ex) { + return Mono.error(ex); + } + }) + .doOnError(ex -> DataBufferUtils.release(buffer)) + .doOnCancel(() -> { + if (!subscribed.get()) { + DataBufferUtils.release(buffer); + } + }); }) - .doOnError(t -> getHeaders().clearContentHeaders()); + .doOnError(t -> getHeaders().clearContentHeaders()) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } else { return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner)))