diff --git a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java new file mode 100644 index 00000000000..13cbba2387e --- /dev/null +++ b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java @@ -0,0 +1,187 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.netty; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.Nullable; +import io.netty.util.ReferenceCountUtil; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.Closeable; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A subscriber that allows blocking reads from a publisher. Handles resource cleanup properly. + * + * @param Stream type + * @since 4.2.0 + * @author Jonas Konrad + */ +@Internal +public final class PublisherAsBlocking implements Subscriber, Closeable { + private final Lock lock = new ReentrantLock(); + private final Condition newDataCondition = lock.newCondition(); + /** + * Set when {@link #take()} is called before {@link #onSubscribe}. {@link #onSubscribe} will + * immediately request some input. + */ + private boolean pendingDemand; + /** + * Pending object, this field is used to transfer from {@link #onNext} to {@link #take}. + */ + private T swap; + /** + * The upstream subscription. + */ + private Subscription subscription; + /** + * Set by {@link #onComplete} and {@link #onError}. + */ + private boolean done; + /** + * Set by {@link #close}. Further objects will be discarded. + */ + private boolean closed; + /** + * Failure from {@link #onError}. + */ + private Throwable failure; + + /** + * The failure from {@link #onError(Throwable)}. When {@link #take()} returns {@code null}, this + * may be set if the reactive stream ended in failure. + * + * @return The failure, or {@code null} if either the stream is not done, or the stream + * completed successfully. + */ + @Nullable + public Throwable getFailure() { + return failure; + } + + @Override + public void onSubscribe(Subscription s) { + boolean pendingDemand; + lock.lock(); + try { + this.subscription = s; + pendingDemand = this.pendingDemand; + } finally { + lock.unlock(); + } + if (pendingDemand) { + s.request(1); + } + } + + @Override + public void onNext(T o) { + lock.lock(); + try { + if (closed) { + ReferenceCountUtil.release(o); + return; + } + swap = o; + newDataCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public void onError(Throwable t) { + lock.lock(); + try { + if (swap != null) { + ReferenceCountUtil.release(swap); + swap = null; + } + failure = t; + done = true; + newDataCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public void onComplete() { + lock.lock(); + try { + done = true; + newDataCondition.signalAll(); + } finally { + lock.unlock(); + } + } + + /** + * Get the next object. + * + * @return The next object, or {@code null} if the stream is done + */ + @Nullable + public T take() throws InterruptedException { + boolean demanded = false; + while (true) { + Subscription subscription; + lock.lock(); + try { + T swap = this.swap; + if (swap != null) { + this.swap = null; + return swap; + } + if (done) { + return null; + } + if (demanded) { + newDataCondition.await(); + } + subscription = this.subscription; + if (subscription == null) { + pendingDemand = true; + } + } finally { + lock.unlock(); + } + if (!demanded) { + demanded = true; + if (subscription != null) { + subscription.request(1); + } + } + } + } + + @Override + public void close() { + lock.lock(); + try { + closed = true; + if (swap != null) { + ReferenceCountUtil.release(swap); + swap = null; + } + } finally { + lock.unlock(); + } + } +} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java new file mode 100644 index 00000000000..27a9641e3d7 --- /dev/null +++ b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java @@ -0,0 +1,88 @@ +/* + * Copyright 2017-2023 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.netty; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +/** + * Transform a {@link PublisherAsBlocking} of buffers into a {@link InputStream}. + * + * @author Jonas Konrad + * @since 4.2.0 + */ +@Internal +public final class PublisherAsStream extends InputStream { + private final PublisherAsBlocking publisherAsBlocking; + private ByteBuf buffer; + + public PublisherAsStream(PublisherAsBlocking publisherAsBlocking) { + this.publisherAsBlocking = publisherAsBlocking; + } + + @Override + public int read() throws IOException { + byte[] arr = new byte[1]; + int n = read(arr); + return n == -1 ? -1 : arr[0] & 0xff; + } + + @Override + public int read(@NonNull byte[] b, int off, int len) throws IOException { + while (buffer == null) { + try { + ByteBuf o = publisherAsBlocking.take(); + if (o == null) { + Throwable failure = publisherAsBlocking.getFailure(); + if (failure == null) { + return -1; + } else { + throw new IOException(failure); + } + } + if (o.readableBytes() == 0) { + o.release(); + continue; + } + buffer = o; + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + + int toRead = Math.min(len, buffer.readableBytes()); + buffer.readBytes(b, off, toRead); + if (buffer.readableBytes() == 0) { + buffer.release(); + buffer = null; + } + return toRead; + } + + @Override + public void close() throws IOException { + if (buffer != null) { + buffer.release(); + buffer = null; + } + publisherAsBlocking.close(); + } +} diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java index e9c778b568d..1167d26e6b2 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/body/StreamingMultiObjectBody.java @@ -16,25 +16,16 @@ package io.micronaut.http.server.netty.body; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.annotation.NonNull; -import io.micronaut.core.annotation.Nullable; +import io.micronaut.http.netty.PublisherAsBlocking; +import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.netty.reactive.HotObservable; import io.micronaut.http.server.netty.FormRouteCompleter; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; -import java.io.Closeable; -import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; /** @@ -79,204 +70,4 @@ public void handleForm(FormRouteCompleter formRouteCompleter) { prepareClaim().subscribe(formRouteCompleter); next(formRouteCompleter); } - - /** - * A subscriber that allows blocking reads from a publisher. Handles resource cleanup properly. - * - * @param Stream type - */ - private static final class PublisherAsBlocking implements Subscriber, Closeable { - private final Lock lock = new ReentrantLock(); - private final Condition newDataCondition = lock.newCondition(); - /** - * Set when {@link #take()} is called before {@link #onSubscribe}. {@link #onSubscribe} will - * immediately request some input. - */ - private boolean pendingDemand; - /** - * Pending object, this field is used to transfer from {@link #onNext} to {@link #take}. - */ - private T swap; - /** - * The upstream subscription. - */ - private Subscription subscription; - /** - * Set by {@link #onComplete} and {@link #onError}. - */ - private boolean done; - /** - * Set by {@link #close}. Further objects will be discarded. - */ - private boolean closed; - /** - * Failure from {@link #onError}. - */ - private Throwable failure; - - @Override - public void onSubscribe(Subscription s) { - boolean pendingDemand; - lock.lock(); - try { - this.subscription = s; - pendingDemand = this.pendingDemand; - } finally { - lock.unlock(); - } - if (pendingDemand) { - s.request(1); - } - } - - @Override - public void onNext(T o) { - lock.lock(); - try { - if (closed) { - ReferenceCountUtil.release(o); - return; - } - swap = o; - newDataCondition.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void onError(Throwable t) { - lock.lock(); - try { - if (swap != null) { - ReferenceCountUtil.release(swap); - swap = null; - } - failure = t; - done = true; - newDataCondition.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void onComplete() { - lock.lock(); - try { - done = true; - newDataCondition.signalAll(); - } finally { - lock.unlock(); - } - } - - /** - * Get the next object. - * - * @return The next object, or {@code null} if the stream is done - */ - @Nullable - public T take() throws InterruptedException { - boolean demanded = false; - while (true) { - Subscription subscription; - lock.lock(); - try { - T swap = this.swap; - if (swap != null) { - this.swap = null; - return swap; - } - if (done) { - return null; - } - if (demanded) { - newDataCondition.await(); - } - subscription = this.subscription; - if (subscription == null) { - pendingDemand = true; - } - } finally { - lock.unlock(); - } - if (!demanded) { - demanded = true; - if (subscription != null) { - subscription.request(1); - } - } - } - } - - @Override - public void close() { - lock.lock(); - try { - closed = true; - if (swap != null) { - ReferenceCountUtil.release(swap); - swap = null; - } - } finally { - lock.unlock(); - } - } - } - - private static final class PublisherAsStream extends InputStream { - private final PublisherAsBlocking publisherAsBlocking; - private ByteBuf buffer; - - private PublisherAsStream(PublisherAsBlocking publisherAsBlocking) { - this.publisherAsBlocking = publisherAsBlocking; - } - - @Override - public int read() throws IOException { - byte[] arr = new byte[1]; - int n = read(arr); - return n == -1 ? -1 : arr[0] & 0xff; - } - - @Override - public int read(@NonNull byte[] b, int off, int len) throws IOException { - while (buffer == null) { - try { - ByteBuf o = publisherAsBlocking.take(); - if (o == null) { - if (publisherAsBlocking.failure == null) { - return -1; - } else { - throw new IOException(publisherAsBlocking.failure); - } - } - if (!o.isReadable()) { - continue; - } - buffer = o; - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - - int toRead = Math.min(len, buffer.readableBytes()); - buffer.readBytes(b, off, toRead); - if (!buffer.isReadable()) { - buffer.release(); - buffer = null; - } - return toRead; - } - - @Override - public void close() throws IOException { - if (buffer != null) { - buffer.release(); - buffer = null; - } - publisherAsBlocking.close(); - } - } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java index 880d5a22014..958295a2c9d 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyPartData.java @@ -16,7 +16,6 @@ package io.micronaut.http.server.netty.multipart; import io.micronaut.core.annotation.Internal; -import io.micronaut.core.util.functional.ThrowingSupplier; import io.micronaut.http.MediaType; import io.micronaut.http.multipart.PartData; import io.netty.buffer.ByteBuf; @@ -39,13 +38,13 @@ public class NettyPartData implements PartData { private final Supplier> mediaTypeSupplier; - private final ThrowingSupplier byteBufSupplier; + private final Supplier byteBufSupplier; /** * @param mediaTypeSupplier The content type supplier * @param byteBufSupplier The byte buffer supplier */ - public NettyPartData(Supplier> mediaTypeSupplier, ThrowingSupplier byteBufSupplier) { + public NettyPartData(Supplier> mediaTypeSupplier, Supplier byteBufSupplier) { this.mediaTypeSupplier = mediaTypeSupplier; this.byteBufSupplier = byteBufSupplier; } @@ -96,9 +95,8 @@ public Optional getContentType() { /** * @return The native netty {@link ByteBuf} for this chunk - * @throws IOException If an error occurs retrieving the buffer */ - public ByteBuf getByteBuf() throws IOException { + public ByteBuf getByteBuf() { return byteBufSupplier.get(); } } diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java index 7fb642356b0..b2397167cc0 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java @@ -23,7 +23,10 @@ import io.micronaut.http.multipart.MultipartException; import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; +import io.micronaut.http.netty.PublisherAsBlocking; +import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.server.HttpServerConfiguration; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.multipart.DiskFileUpload; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -36,6 +39,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.util.Optional; @@ -123,6 +127,13 @@ public Publisher delete() { }); } + @Override + public InputStream asInputStream() { + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>(); + subject.map(pd -> ((NettyPartData) pd).getByteBuf()).subscribe(publisherAsBlocking); + return new PublisherAsStream(publisherAsBlocking); + } + /** * @param location The location for the temp file * @return The temporal file diff --git a/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java b/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java index 55e20d94684..66c4cb9d464 100644 --- a/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java +++ b/http/src/main/java/io/micronaut/http/multipart/StreamingFileUpload.java @@ -15,9 +15,11 @@ */ package io.micronaut.http.multipart; +import io.micronaut.core.annotation.NonNull; import org.reactivestreams.Publisher; import java.io.File; +import java.io.InputStream; import java.io.OutputStream; /** @@ -79,4 +81,15 @@ default Publisher transferTo(OutputStream outputStream) { */ Publisher delete(); + /** + * Create an {@link InputStream} that reads this file. The returned stream must be closed after + * use. The stream may block when data isn't yet available. + * + * @return An {@link InputStream} that reads this file's contents + * @since 4.2.0 + */ + @NonNull + default InputStream asInputStream() { + throw new UnsupportedOperationException("StreamingFileUpload doesn't support asInputStream"); + } } diff --git a/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy b/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy index 13048f7f3cb..2ab6e4b14a2 100644 --- a/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy +++ b/test-suite/src/test/groovy/io/micronaut/upload/StreamUploadSpec.groovy @@ -127,6 +127,26 @@ class StreamUploadSpec extends AbstractMicronautSpec { file.text == data } + void "test upload big FileUpload object via asInputStream"() { + given: + def val = 'Big ' + 'xxxx' * 500 + + MultipartBody requestBody = MultipartBody.builder() + .addPart("data", "val", MediaType.TEXT_PLAIN_TYPE, val.bytes) + .build() + + when: + Flux> flowable = Flux.from(client.exchange( + HttpRequest.POST("/upload/receive-file-upload-input-stream", requestBody) + .contentType(MediaType.MULTIPART_FORM_DATA) + .accept(MediaType.TEXT_PLAIN_TYPE), + String + )) + HttpResponse response = flowable.blockFirst() + then: + response.getBody().get() == val + } + void "test non-blocking upload with publisher receiving bytes"() { given: def data = 'some data ' * 500 diff --git a/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java b/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java index c9226a848e8..f2cbfa32b95 100644 --- a/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java +++ b/test-suite/src/test/groovy/io/micronaut/upload/UploadController.java @@ -15,6 +15,7 @@ */ package io.micronaut.upload; +import io.micronaut.core.async.annotation.SingleResult; import io.micronaut.http.HttpResponse; import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; @@ -28,17 +29,16 @@ import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; import io.micronaut.http.server.multipart.MultipartBody; +import io.micronaut.scheduling.TaskExecutors; +import io.micronaut.scheduling.annotation.ExecuteOn; +import jakarta.inject.Singleton; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import io.micronaut.core.async.annotation.SingleResult; -import jakarta.inject.Singleton; import reactor.core.Exceptions; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; -import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import java.io.IOException; @@ -88,6 +88,14 @@ public Publisher> receiveFileUpload(StreamingFileUpload d .onErrorReturn((MutableHttpResponse) HttpResponse.status(HttpStatus.INTERNAL_SERVER_ERROR, "Something bad happened")); } + @Post(value = "/receive-file-upload-input-stream", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN) + @ExecuteOn(TaskExecutors.BLOCKING) + public String receiveFileUploadInputStream(StreamingFileUpload data) throws IOException { + try (InputStream stream = data.asInputStream()) { + return new String(stream.readAllBytes(), StandardCharsets.UTF_8); + } + } + @Post(value = "/receive-completed-file-upload", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN) public String receiveCompletedFileUpload(CompletedFileUpload data) { try {