-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add StreamingFileUpload.asInputStream (#9787)
* Add StreamingFileUpload.asInputStream Repurpose some existing publisher->stream code to implement StreamingFileUpload.asInputStream. This will be useful for micronaut-projects/micronaut-object-storage#113 * move PublisherAsBlocking, make it non-abstract * nonnull
- Loading branch information
Showing
8 changed files
with
336 additions
and
220 deletions.
There are no files selected for viewing
187 changes: 187 additions & 0 deletions
187
http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
/* | ||
* Copyright 2017-2023 original authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.micronaut.http.netty; | ||
|
||
import io.micronaut.core.annotation.Internal; | ||
import io.micronaut.core.annotation.Nullable; | ||
import io.netty.util.ReferenceCountUtil; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
|
||
import java.io.Closeable; | ||
import java.util.concurrent.locks.Condition; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
|
||
/** | ||
* A subscriber that allows blocking reads from a publisher. Handles resource cleanup properly. | ||
* | ||
* @param <T> Stream type | ||
* @since 4.2.0 | ||
* @author Jonas Konrad | ||
*/ | ||
@Internal | ||
public final class PublisherAsBlocking<T> implements Subscriber<T>, Closeable { | ||
private final Lock lock = new ReentrantLock(); | ||
private final Condition newDataCondition = lock.newCondition(); | ||
/** | ||
* Set when {@link #take()} is called before {@link #onSubscribe}. {@link #onSubscribe} will | ||
* immediately request some input. | ||
*/ | ||
private boolean pendingDemand; | ||
/** | ||
* Pending object, this field is used to transfer from {@link #onNext} to {@link #take}. | ||
*/ | ||
private T swap; | ||
/** | ||
* The upstream subscription. | ||
*/ | ||
private Subscription subscription; | ||
/** | ||
* Set by {@link #onComplete} and {@link #onError}. | ||
*/ | ||
private boolean done; | ||
/** | ||
* Set by {@link #close}. Further objects will be discarded. | ||
*/ | ||
private boolean closed; | ||
/** | ||
* Failure from {@link #onError}. | ||
*/ | ||
private Throwable failure; | ||
|
||
/** | ||
* The failure from {@link #onError(Throwable)}. When {@link #take()} returns {@code null}, this | ||
* may be set if the reactive stream ended in failure. | ||
* | ||
* @return The failure, or {@code null} if either the stream is not done, or the stream | ||
* completed successfully. | ||
*/ | ||
@Nullable | ||
public Throwable getFailure() { | ||
return failure; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Subscription s) { | ||
boolean pendingDemand; | ||
lock.lock(); | ||
try { | ||
this.subscription = s; | ||
pendingDemand = this.pendingDemand; | ||
} finally { | ||
lock.unlock(); | ||
} | ||
if (pendingDemand) { | ||
s.request(1); | ||
} | ||
} | ||
|
||
@Override | ||
public void onNext(T o) { | ||
lock.lock(); | ||
try { | ||
if (closed) { | ||
ReferenceCountUtil.release(o); | ||
return; | ||
} | ||
swap = o; | ||
newDataCondition.signalAll(); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
lock.lock(); | ||
try { | ||
if (swap != null) { | ||
ReferenceCountUtil.release(swap); | ||
swap = null; | ||
} | ||
failure = t; | ||
done = true; | ||
newDataCondition.signalAll(); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
lock.lock(); | ||
try { | ||
done = true; | ||
newDataCondition.signalAll(); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
/** | ||
* Get the next object. | ||
* | ||
* @return The next object, or {@code null} if the stream is done | ||
*/ | ||
@Nullable | ||
public T take() throws InterruptedException { | ||
boolean demanded = false; | ||
while (true) { | ||
Subscription subscription; | ||
lock.lock(); | ||
try { | ||
T swap = this.swap; | ||
if (swap != null) { | ||
this.swap = null; | ||
return swap; | ||
} | ||
if (done) { | ||
return null; | ||
} | ||
if (demanded) { | ||
newDataCondition.await(); | ||
} | ||
subscription = this.subscription; | ||
if (subscription == null) { | ||
pendingDemand = true; | ||
} | ||
} finally { | ||
lock.unlock(); | ||
} | ||
if (!demanded) { | ||
demanded = true; | ||
if (subscription != null) { | ||
subscription.request(1); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
lock.lock(); | ||
try { | ||
closed = true; | ||
if (swap != null) { | ||
ReferenceCountUtil.release(swap); | ||
swap = null; | ||
} | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
} |
88 changes: 88 additions & 0 deletions
88
http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright 2017-2023 original authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.micronaut.http.netty; | ||
|
||
import io.micronaut.core.annotation.Internal; | ||
import io.micronaut.core.annotation.NonNull; | ||
import io.netty.buffer.ByteBuf; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.InterruptedIOException; | ||
|
||
/** | ||
* Transform a {@link PublisherAsBlocking} of buffers into a {@link InputStream}. | ||
* | ||
* @author Jonas Konrad | ||
* @since 4.2.0 | ||
*/ | ||
@Internal | ||
public final class PublisherAsStream extends InputStream { | ||
private final PublisherAsBlocking<ByteBuf> publisherAsBlocking; | ||
private ByteBuf buffer; | ||
|
||
public PublisherAsStream(PublisherAsBlocking<ByteBuf> publisherAsBlocking) { | ||
this.publisherAsBlocking = publisherAsBlocking; | ||
} | ||
|
||
@Override | ||
public int read() throws IOException { | ||
byte[] arr = new byte[1]; | ||
int n = read(arr); | ||
return n == -1 ? -1 : arr[0] & 0xff; | ||
} | ||
|
||
@Override | ||
public int read(@NonNull byte[] b, int off, int len) throws IOException { | ||
while (buffer == null) { | ||
try { | ||
ByteBuf o = publisherAsBlocking.take(); | ||
if (o == null) { | ||
Throwable failure = publisherAsBlocking.getFailure(); | ||
if (failure == null) { | ||
return -1; | ||
} else { | ||
throw new IOException(failure); | ||
} | ||
} | ||
if (o.readableBytes() == 0) { | ||
o.release(); | ||
continue; | ||
} | ||
buffer = o; | ||
} catch (InterruptedException e) { | ||
throw new InterruptedIOException(); | ||
} | ||
} | ||
|
||
int toRead = Math.min(len, buffer.readableBytes()); | ||
buffer.readBytes(b, off, toRead); | ||
if (buffer.readableBytes() == 0) { | ||
buffer.release(); | ||
buffer = null; | ||
} | ||
return toRead; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
if (buffer != null) { | ||
buffer.release(); | ||
buffer = null; | ||
} | ||
publisherAsBlocking.close(); | ||
} | ||
} |
Oops, something went wrong.