From 0a689f7d8f913de5b1ba17011fd38ce37d1acc79 Mon Sep 17 00:00:00 2001 From: yawkat Date: Mon, 28 Aug 2023 15:24:48 +0200 Subject: [PATCH 1/3] Add StreamingFileUpload.asInputStream Repurpose some existing publisher->stream code to implement StreamingFileUpload.asInputStream. This will be useful for https://github.com/micronaut-projects/micronaut-object-storage/issues/113 --- .../async/subscriber/PublisherAsBlocking.java | 188 +++++++++++++++ .../http/netty/PublisherAsStream.java | 89 +++++++ .../netty/body/StreamingMultiObjectBody.java | 220 +----------------- .../server/netty/multipart/NettyPartData.java | 8 +- .../multipart/NettyStreamingFileUpload.java | 19 ++ .../http/multipart/StreamingFileUpload.java | 11 + .../micronaut/upload/StreamUploadSpec.groovy | 20 ++ .../io/micronaut/upload/UploadController.java | 16 +- 8 files changed, 350 insertions(+), 221 deletions(-) create mode 100644 core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java create mode 100644 http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java diff --git a/core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java b/core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java new file mode 100644 index 00000000000..ecab5c73852 --- /dev/null +++ b/core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java @@ -0,0 +1,188 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.core.async.subscriber; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.Nullable; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.Closeable; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A subscriber that allows blocking reads from a publisher. Handles resource cleanup properly. + * + * @param Stream type + * @since 4.2.0 + * @author Jonas Konrad + */ +@Internal +public abstract class PublisherAsBlocking implements Subscriber, Closeable { + private final Lock lock = new ReentrantLock(); + private final Condition newDataCondition = lock.newCondition(); + /** + * Set when {@link #take()} is called before {@link #onSubscribe}. {@link #onSubscribe} will + * immediately request some input. + */ + private boolean pendingDemand; + /** + * Pending object, this field is used to transfer from {@link #onNext} to {@link #take}. + */ + private T swap; + /** + * The upstream subscription. + */ + private Subscription subscription; + /** + * Set by {@link #onComplete} and {@link #onError}. + */ + private boolean done; + /** + * Set by {@link #close}. Further objects will be discarded. + */ + private boolean closed; + /** + * Failure from {@link #onError}. + */ + private Throwable failure; + + /** + * The failure from {@link #onError(Throwable)}. When {@link #take()} returns {@code null}, this + * may be set if the reactive stream ended in failure. + * + * @return The failure, or {@code null} if either the stream is not done, or the stream + * completed successfully. + */ + @Nullable + public final Throwable getFailure() { + return failure; + } + + @Override + public final void onSubscribe(Subscription s) { + boolean pendingDemand; + lock.lock(); + try { + this.subscription = s; + pendingDemand = this.pendingDemand; + } finally { + lock.unlock(); + } + if (pendingDemand) { + s.request(1); + } + } + + @Override + public final void onNext(T o) { + lock.lock(); + try { + if (closed) { + release(o); + return; + } + swap = o; + newDataCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public final void onError(Throwable t) { + lock.lock(); + try { + if (swap != null) { + release(swap); + swap = null; + } + failure = t; + done = true; + newDataCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public final void onComplete() { + lock.lock(); + try { + done = true; + newDataCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * Get the next object. + * + * @return The next object, or {@code null} if the stream is done + */ + @Nullable + public final T take() throws InterruptedException { + boolean demanded = false; + while (true) { + Subscription subscription; + lock.lock(); + try { + T swap = this.swap; + if (swap != null) { + this.swap = null; + return swap; + } + if (done) { + return null; + } + if (demanded) { + newDataCondition.await(); + } + subscription = this.subscription; + if (subscription == null) { + pendingDemand = true; + } + } finally { + lock.unlock(); + } + if (!demanded) { + demanded = true; + if (subscription != null) { + subscription.request(1); + } + } + } + } + + @Override + public final void close() { + lock.lock(); + try { + closed = true; + if (swap != null) { + release(swap); + swap = null; + } + } finally { + lock.unlock(); + } + } + + protected abstract void release(T item); +} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java new file mode 100644 index 00000000000..f4c2024ec6d --- /dev/null +++ b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java @@ -0,0 +1,89 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.netty; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.async.subscriber.PublisherAsBlocking; +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +/** + * Transform a {@link PublisherAsBlocking} of buffers into a {@link InputStream}. + * + * @author Jonas Konrad + * @since 4.2.0 + */ +@Internal +public final class PublisherAsStream extends InputStream { + private final PublisherAsBlocking publisherAsBlocking; + private ByteBuf buffer; + + public PublisherAsStream(PublisherAsBlocking publisherAsBlocking) { + this.publisherAsBlocking = publisherAsBlocking; + } + + @Override + public int read() throws IOException { + byte[] arr = new byte[1]; + int n = read(arr); + return n == -1 ? -1 : arr[0] & 0xff; + } + + @Override + public int read(@NonNull byte[] b, int off, int len) throws IOException { + while (buffer == null) { + try { + ByteBuf o = publisherAsBlocking.take(); + if (o == null) { + Throwable failure = publisherAsBlocking.getFailure(); + if (failure == null) { + return -1; + } else { + throw new IOException(failure); + } + } + if (o.readableBytes() == 0) { + o.release(); + continue; + } + buffer = o; + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + + int toRead = Math.min(len, buffer.readableBytes()); + buffer.readBytes(b, off, toRead); + if (buffer.readableBytes() == 0) { + buffer.release(); + buffer = null; + } + return toRead; + } + + @Override + public void close() throws IOException { + if (buffer != null) { + buffer.release(); + buffer = null; + } + publisherAsBlocking.close(); + } +} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java index e9c778b568d..d1cf65174ae 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java @@ -16,25 +16,16 @@ package io.micronaut.http.server.netty.body; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.async.subscriber.PublisherAsBlocking; +import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.netty.reactive.HotObservable; import io.micronaut.http.server.netty.FormRouteCompleter; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; -import java.io.Closeable; -import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** @@ -58,7 +49,12 @@ void release(Publisher value) { @Override public InputStream coerceToInputStream(ByteBufAllocator alloc) { - PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>(); + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>() { + @Override + protected void release(ByteBuf item) { + item.release(); + } + }; //noinspection unchecked ((Publisher) claim()).subscribe(publisherAsBlocking); return new PublisherAsStream(publisherAsBlocking); @@ -79,204 +75,4 @@ public void handleForm(FormRouteCompleter formRouteCompleter) { prepareClaim().subscribe(formRouteCompleter); next(formRouteCompleter); } - - /** - * A subscriber that allows blocking reads from a publisher. Handles resource cleanup properly. - * - * @param Stream type - */ - private static final class PublisherAsBlocking implements Subscriber, Closeable { - private final Lock lock = new ReentrantLock(); - private final Condition newDataCondition = lock.newCondition(); - /** - * Set when {@link #take()} is called before {@link #onSubscribe}. {@link #onSubscribe} will - * immediately request some input. - */ - private boolean pendingDemand; - /** - * Pending object, this field is used to transfer from {@link #onNext} to {@link #take}. - */ - private T swap; - /** - * The upstream subscription. - */ - private Subscription subscription; - /** - * Set by {@link #onComplete} and {@link #onError}. - */ - private boolean done; - /** - * Set by {@link #close}. Further objects will be discarded. - */ - private boolean closed; - /** - * Failure from {@link #onError}. - */ - private Throwable failure; - - @Override - public void onSubscribe(Subscription s) { - boolean pendingDemand; - lock.lock(); - try { - this.subscription = s; - pendingDemand = this.pendingDemand; - } finally { - lock.unlock(); - } - if (pendingDemand) { - s.request(1); - } - } - - @Override - public void onNext(T o) { - lock.lock(); - try { - if (closed) { - ReferenceCountUtil.release(o); - return; - } - swap = o; - newDataCondition.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void onError(Throwable t) { - lock.lock(); - try { - if (swap != null) { - ReferenceCountUtil.release(swap); - swap = null; - } - failure = t; - done = true; - newDataCondition.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void onComplete() { - lock.lock(); - try { - done = true; - newDataCondition.signalAll(); - } finally { - lock.unlock(); - } - } - - /** - * Get the next object. - * - * @return The next object, or {@code null} if the stream is done - */ - @Nullable - public T take() throws InterruptedException { - boolean demanded = false; - while (true) { - Subscription subscription; - lock.lock(); - try { - T swap = this.swap; - if (swap != null) { - this.swap = null; - return swap; - } - if (done) { - return null; - } - if (demanded) { - newDataCondition.await(); - } - subscription = this.subscription; - if (subscription == null) { - pendingDemand = true; - } - } finally { - lock.unlock(); - } - if (!demanded) { - demanded = true; - if (subscription != null) { - subscription.request(1); - } - } - } - } - - @Override - public void close() { - lock.lock(); - try { - closed = true; - if (swap != null) { - ReferenceCountUtil.release(swap); - swap = null; - } - } finally { - lock.unlock(); - } - } - } - - private static final class PublisherAsStream extends InputStream { - private final PublisherAsBlocking publisherAsBlocking; - private ByteBuf buffer; - - private PublisherAsStream(PublisherAsBlocking publisherAsBlocking) { - this.publisherAsBlocking = publisherAsBlocking; - } - - @Override - public int read() throws IOException { - byte[] arr = new byte[1]; - int n = read(arr); - return n == -1 ? -1 : arr[0] & 0xff; - } - - @Override - public int read(@NonNull byte[] b, int off, int len) throws IOException { - while (buffer == null) { - try { - ByteBuf o = publisherAsBlocking.take(); - if (o == null) { - if (publisherAsBlocking.failure == null) { - return -1; - } else { - throw new IOException(publisherAsBlocking.failure); - } - } - if (!o.isReadable()) { - continue; - } - buffer = o; - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - - int toRead = Math.min(len, buffer.readableBytes()); - buffer.readBytes(b, off, toRead); - if (!buffer.isReadable()) { - buffer.release(); - buffer = null; - } - return toRead; - } - - @Override - public void close() throws IOException { - if (buffer != null) { - buffer.release(); - buffer = null; - } - publisherAsBlocking.close(); - } - } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java index 880d5a22014..958295a2c9d 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java @@ -16,7 +16,6 @@ package io.micronaut.http.server.netty.multipart; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.util.functional.ThrowingSupplier; import io.micronaut.http.MediaType; import io.micronaut.http.multipart.PartData; import io.netty.buffer.ByteBuf; @@ -39,13 +38,13 @@ public class NettyPartData implements PartData { private final Supplier> mediaTypeSupplier; - private final ThrowingSupplier byteBufSupplier; + private final Supplier byteBufSupplier; /** * @param mediaTypeSupplier The content type supplier * @param byteBufSupplier The byte buffer supplier */ - public NettyPartData(Supplier> mediaTypeSupplier, ThrowingSupplier byteBufSupplier) { + public NettyPartData(Supplier> mediaTypeSupplier, Supplier byteBufSupplier) { this.mediaTypeSupplier = mediaTypeSupplier; this.byteBufSupplier = byteBufSupplier; } @@ -96,9 +95,8 @@ public Optional getContentType() { /** * @return The native netty {@link ByteBuf} for this chunk - * @throws IOException If an error occurs retrieving the buffer */ - public ByteBuf getByteBuf() throws IOException { + public ByteBuf getByteBuf() { return byteBufSupplier.get(); } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java index 7fb642356b0..9328328aed7 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java @@ -17,13 +17,17 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.async.publisher.AsyncSingleResultPublisher; +import io.micronaut.core.async.subscriber.PublisherAsBlocking; +import io.micronaut.core.io.buffer.ReferenceCounted; import io.micronaut.core.naming.NameUtils; import io.micronaut.core.util.functional.ThrowingSupplier; import io.micronaut.http.MediaType; import io.micronaut.http.multipart.MultipartException; import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; +import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.server.HttpServerConfiguration; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.multipart.DiskFileUpload; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -36,6 +40,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.util.Optional; @@ -123,6 +128,20 @@ public Publisher delete() { }); } + @Override + public InputStream asInputStream() { + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>() { + @Override + protected void release(ByteBuf item) { + if (item instanceof ReferenceCounted rc) { + rc.release(); + } + } + }; + subject.map(pd -> ((NettyPartData) pd).getByteBuf()).subscribe(publisherAsBlocking); + return new PublisherAsStream(publisherAsBlocking); + } + /** * @param location The location for the temp file * @return The temporal file diff --git a/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java b/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java index 55e20d94684..575d69109f2 100644 --- a/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java +++ b/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java @@ -18,6 +18,7 @@ import org.reactivestreams.Publisher; import java.io.File; +import java.io.InputStream; import java.io.OutputStream; /** @@ -79,4 +80,14 @@ default Publisher transferTo(OutputStream outputStream) { */ Publisher delete(); + /** + * Create an {@link InputStream} that reads this file. The returned stream must be closed after + * use. The stream may block when data isn't yet available. + * + * @return An {@link InputStream} that reads this file's contents + * @since 4.2.0 + */ + default InputStream asInputStream() { + throw new UnsupportedOperationException("StreamingFileUpload doesn't support asInputStream"); + } } diff --git a/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy b/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy index 13048f7f3cb..2ab6e4b14a2 100644 --- a/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy +++ b/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy @@ -127,6 +127,26 @@ class StreamUploadSpec extends AbstractMicronautSpec { file.text == data } + void "test upload big FileUpload object via asInputStream"() { + given: + def val = 'Big ' + 'xxxx' * 500 + + MultipartBody requestBody = MultipartBody.builder() + .addPart("data", "val", MediaType.TEXT_PLAIN_TYPE, val.bytes) + .build() + + when: + Flux> flowable = Flux.from(client.exchange( + HttpRequest.POST("/upload/receive-file-upload-input-stream", requestBody) + .contentType(MediaType.MULTIPART_FORM_DATA) + .accept(MediaType.TEXT_PLAIN_TYPE), + String + )) + HttpResponse response = flowable.blockFirst() + then: + response.getBody().get() == val + } + void "test non-blocking upload with publisher receiving bytes"() { given: def data = 'some data ' * 500 diff --git a/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java b/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java index c9226a848e8..f2cbfa32b95 100644 --- a/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java +++ b/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java @@ -15,6 +15,7 @@ */ package io.micronaut.upload; +import io.micronaut.core.async.annotation.SingleResult; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; @@ -28,17 +29,16 @@ import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; import io.micronaut.http.server.multipart.MultipartBody; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.annotation.ExecuteOn; +import jakarta.inject.Singleton; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.micronaut.core.async.annotation.SingleResult; -import jakarta.inject.Singleton; import reactor.core.Exceptions; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; -import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import java.io.IOException; @@ -88,6 +88,14 @@ public Publisher> receiveFileUpload(StreamingFileUpload d .onErrorReturn((MutableHttpResponse) HttpResponse.status(HttpStatus.INTERNAL_SERVER_ERROR, "Something bad happened")); } + @Post(value = "/receive-file-upload-input-stream", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN) + @ExecuteOn(TaskExecutors.BLOCKING) + public String receiveFileUploadInputStream(StreamingFileUpload data) throws IOException { + try (InputStream stream = data.asInputStream()) { + return new String(stream.readAllBytes(), StandardCharsets.UTF_8); + } + } + @Post(value = "/receive-completed-file-upload", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN) public String receiveCompletedFileUpload(CompletedFileUpload data) { try { From f58cfe6f81b45d8f5dd25ab51ac7e53395edfd24 Mon Sep 17 00:00:00 2001 From: yawkat Date: Mon, 28 Aug 2023 15:50:08 +0200 Subject: [PATCH 2/3] move PublisherAsBlocking, make it non-abstract --- .../http/netty}/PublisherAsBlocking.java | 27 +++++++++---------- .../http/netty/PublisherAsStream.java | 1 - .../netty/body/StreamingMultiObjectBody.java | 9 ++----- .../multipart/NettyStreamingFileUpload.java | 12 ++------- 4 files changed, 17 insertions(+), 32 deletions(-) rename {core-reactive/src/main/java/io/micronaut/core/async/subscriber => http-netty/src/main/java/io/micronaut/http/netty}/PublisherAsBlocking.java (88%) diff --git a/core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java similarity index 88% rename from core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java rename to http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java index ecab5c73852..13cbba2387e 100644 --- a/core-reactive/src/main/java/io/micronaut/core/async/subscriber/PublisherAsBlocking.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java @@ -13,10 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.core.async.subscriber; +package io.micronaut.http.netty; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.Nullable; +import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -33,7 +34,7 @@ * @author Jonas Konrad */ @Internal -public abstract class PublisherAsBlocking implements Subscriber, Closeable { +public final class PublisherAsBlocking implements Subscriber, Closeable { private final Lock lock = new ReentrantLock(); private final Condition newDataCondition = lock.newCondition(); /** @@ -70,12 +71,12 @@ public abstract class PublisherAsBlocking implements Subscriber, Closeable * completed successfully. */ @Nullable - public final Throwable getFailure() { + public Throwable getFailure() { return failure; } @Override - public final void onSubscribe(Subscription s) { + public void onSubscribe(Subscription s) { boolean pendingDemand; lock.lock(); try { @@ -90,11 +91,11 @@ public final void onSubscribe(Subscription s) { } @Override - public final void onNext(T o) { + public void onNext(T o) { lock.lock(); try { if (closed) { - release(o); + ReferenceCountUtil.release(o); return; } swap = o; @@ -105,11 +106,11 @@ public final void onNext(T o) { } @Override - public final void onError(Throwable t) { + public void onError(Throwable t) { lock.lock(); try { if (swap != null) { - release(swap); + ReferenceCountUtil.release(swap); swap = null; } failure = t; @@ -121,7 +122,7 @@ public final void onError(Throwable t) { } @Override - public final void onComplete() { + public void onComplete() { lock.lock(); try { done = true; @@ -137,7 +138,7 @@ public final void onComplete() { * @return The next object, or {@code null} if the stream is done */ @Nullable - public final T take() throws InterruptedException { + public T take() throws InterruptedException { boolean demanded = false; while (true) { Subscription subscription; @@ -171,18 +172,16 @@ public final T take() throws InterruptedException { } @Override - public final void close() { + public void close() { lock.lock(); try { closed = true; if (swap != null) { - release(swap); + ReferenceCountUtil.release(swap); swap = null; } } finally { lock.unlock(); } } - - protected abstract void release(T item); } diff --git a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java index f4c2024ec6d..27a9641e3d7 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java @@ -17,7 +17,6 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.async.subscriber.PublisherAsBlocking; import io.netty.buffer.ByteBuf; import java.io.IOException; diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java index d1cf65174ae..1167d26e6b2 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java @@ -16,7 +16,7 @@ package io.micronaut.http.server.netty.body; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.async.subscriber.PublisherAsBlocking; +import io.micronaut.http.netty.PublisherAsBlocking; import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.netty.reactive.HotObservable; import io.micronaut.http.server.netty.FormRouteCompleter; @@ -49,12 +49,7 @@ void release(Publisher value) { @Override public InputStream coerceToInputStream(ByteBufAllocator alloc) { - PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>() { - @Override - protected void release(ByteBuf item) { - item.release(); - } - }; + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>(); //noinspection unchecked ((Publisher) claim()).subscribe(publisherAsBlocking); return new PublisherAsStream(publisherAsBlocking); diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java index 9328328aed7..b2397167cc0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java @@ -17,14 +17,13 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.async.publisher.AsyncSingleResultPublisher; -import io.micronaut.core.async.subscriber.PublisherAsBlocking; -import io.micronaut.core.io.buffer.ReferenceCounted; import io.micronaut.core.naming.NameUtils; import io.micronaut.core.util.functional.ThrowingSupplier; import io.micronaut.http.MediaType; import io.micronaut.http.multipart.MultipartException; import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; +import io.micronaut.http.netty.PublisherAsBlocking; import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.server.HttpServerConfiguration; import io.netty.buffer.ByteBuf; @@ -130,14 +129,7 @@ public Publisher delete() { @Override public InputStream asInputStream() { - PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>() { - @Override - protected void release(ByteBuf item) { - if (item instanceof ReferenceCounted rc) { - rc.release(); - } - } - }; + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>(); subject.map(pd -> ((NettyPartData) pd).getByteBuf()).subscribe(publisherAsBlocking); return new PublisherAsStream(publisherAsBlocking); } From b8871ab1957c0d18101ec7581d7f2b70e836fe1e Mon Sep 17 00:00:00 2001 From: yawkat Date: Mon, 28 Aug 2023 17:15:28 +0200 Subject: [PATCH 3/3] nonnull --- .../java/io/micronaut/http/multipart/StreamingFileUpload.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java b/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java index 575d69109f2..66c4cb9d464 100644 --- a/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java +++ b/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java @@ -15,6 +15,7 @@ */ package io.micronaut.http.multipart; +import io.micronaut.core.annotation.NonNull; import org.reactivestreams.Publisher; import java.io.File; @@ -87,6 +88,7 @@ default Publisher transferTo(OutputStream outputStream) { * @return An {@link InputStream} that reads this file's contents * @since 4.2.0 */ + @NonNull default InputStream asInputStream() { throw new UnsupportedOperationException("StreamingFileUpload doesn't support asInputStream"); }