Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StreamingFileUpload.asInputStream #9787

Merged
merged 3 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.io.Closeable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* A subscriber that allows blocking reads from a publisher. Handles resource cleanup properly.
*
* @param <T> Stream type
* @since 4.2.0
* @author Jonas Konrad
*/
@Internal
public final class PublisherAsBlocking<T> implements Subscriber<T>, Closeable {
yawkat marked this conversation as resolved.
Show resolved Hide resolved
private final Lock lock = new ReentrantLock();
private final Condition newDataCondition = lock.newCondition();
/**
* Set when {@link #take()} is called before {@link #onSubscribe}. {@link #onSubscribe} will
* immediately request some input.
*/
private boolean pendingDemand;
/**
* Pending object, this field is used to transfer from {@link #onNext} to {@link #take}.
*/
private T swap;
/**
* The upstream subscription.
*/
private Subscription subscription;
/**
* Set by {@link #onComplete} and {@link #onError}.
*/
private boolean done;
/**
* Set by {@link #close}. Further objects will be discarded.
*/
private boolean closed;
/**
* Failure from {@link #onError}.
*/
private Throwable failure;

/**
* The failure from {@link #onError(Throwable)}. When {@link #take()} returns {@code null}, this
* may be set if the reactive stream ended in failure.
*
* @return The failure, or {@code null} if either the stream is not done, or the stream
* completed successfully.
*/
@Nullable
public Throwable getFailure() {
return failure;
}

@Override
public void onSubscribe(Subscription s) {
boolean pendingDemand;
lock.lock();
try {
this.subscription = s;
pendingDemand = this.pendingDemand;
} finally {
lock.unlock();
}
if (pendingDemand) {
s.request(1);
}
}

@Override
public void onNext(T o) {
lock.lock();
try {
if (closed) {
ReferenceCountUtil.release(o);
return;
}
swap = o;
newDataCondition.signalAll();
} finally {
lock.unlock();
}
}

@Override
public void onError(Throwable t) {
lock.lock();
try {
if (swap != null) {
ReferenceCountUtil.release(swap);
swap = null;
}
failure = t;
done = true;
newDataCondition.signalAll();
} finally {
lock.unlock();
}
}

@Override
public void onComplete() {
lock.lock();
try {
done = true;
newDataCondition.signalAll();
} finally {
lock.unlock();
}
}

/**
* Get the next object.
*
* @return The next object, or {@code null} if the stream is done
*/
@Nullable
public T take() throws InterruptedException {
boolean demanded = false;
while (true) {
Subscription subscription;
lock.lock();
try {
T swap = this.swap;
if (swap != null) {
this.swap = null;
return swap;
}
if (done) {
return null;
}
if (demanded) {
newDataCondition.await();
}
subscription = this.subscription;
if (subscription == null) {
pendingDemand = true;
}
} finally {
lock.unlock();
}
if (!demanded) {
demanded = true;
if (subscription != null) {
subscription.request(1);
}
}
}
}

@Override
public void close() {
lock.lock();
try {
closed = true;
if (swap != null) {
ReferenceCountUtil.release(swap);
swap = null;
}
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2017-2023 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;

/**
* Transform a {@link PublisherAsBlocking} of buffers into a {@link InputStream}.
*
* @author Jonas Konrad
* @since 4.2.0
*/
@Internal
public final class PublisherAsStream extends InputStream {
yawkat marked this conversation as resolved.
Show resolved Hide resolved
private final PublisherAsBlocking<ByteBuf> publisherAsBlocking;
private ByteBuf buffer;

public PublisherAsStream(PublisherAsBlocking<ByteBuf> publisherAsBlocking) {
this.publisherAsBlocking = publisherAsBlocking;
}

@Override
public int read() throws IOException {
byte[] arr = new byte[1];
int n = read(arr);
return n == -1 ? -1 : arr[0] & 0xff;
}

@Override
public int read(@NonNull byte[] b, int off, int len) throws IOException {
while (buffer == null) {
try {
ByteBuf o = publisherAsBlocking.take();
if (o == null) {
Throwable failure = publisherAsBlocking.getFailure();
if (failure == null) {
return -1;
} else {
throw new IOException(failure);
}
}
if (o.readableBytes() == 0) {
o.release();
continue;
}
buffer = o;
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}

int toRead = Math.min(len, buffer.readableBytes());
buffer.readBytes(b, off, toRead);
if (buffer.readableBytes() == 0) {
buffer.release();
buffer = null;
}
return toRead;
}

@Override
public void close() throws IOException {
if (buffer != null) {
buffer.release();
buffer = null;
}
publisherAsBlocking.close();
}
}
Loading