From 395c5f05ba825ab00ec0fe3c02311eb2401ce963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Szynkiewicz?= Date: Mon, 29 Nov 2021 09:58:30 +0100 Subject: [PATCH] Sending Multi as files in multipart in REST Client Reactive --- .../JaxrsClientReactiveProcessor.java | 78 +- .../reactive/multipart/MultiByteFileTest.java | 293 ++++ .../MultiByteWithRemoteErrorTest.java | 87 + .../MultipartFilenameTest.java | 6 +- .../devconsole/RestClientsContainer.java | 2 +- .../handlers/ClientSendRequestHandler.java | 9 +- .../impl/multipart/MultiByteHttpData.java | 371 +++++ .../PausableHttpPostRequestEncoder.java | 1423 +++++++++++++++++ .../multipart/QuarkusHttpPostBodyUtil.java | 273 ++++ .../multipart/QuarkusInternalAttribute.java | 157 ++ .../impl/multipart/QuarkusMultipartForm.java | 13 + .../QuarkusMultipartFormDataPart.java | 31 + .../multipart/QuarkusMultipartFormUpload.java | 82 +- .../client/multipart/MultipartClient.java | 18 + .../client/multipart/MultipartResource.java | 23 + .../multipart/MultipartResourceTest.java | 12 + 16 files changed, 2824 insertions(+), 54 deletions(-) create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteFileTest.java create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteWithRemoteErrorTest.java rename extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/{ => multipart}/MultipartFilenameTest.java (88%) create mode 100644 independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java create mode 100644 independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/PausableHttpPostRequestEncoder.java create mode 100644 independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusHttpPostBodyUtil.java create mode 100644 independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusInternalAttribute.java diff --git a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java index 45b556c80be7c..67dacc9029692 100644 --- a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java @@ -26,6 +26,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.ws.rs.ProcessingException; import javax.ws.rs.RuntimeType; @@ -165,6 +166,7 @@ public class JaxrsClientReactiveProcessor { private static final DotName BUFFER = DotName.createSimple(Buffer.class.getName()); private static final Set ASYNC_RETURN_TYPES = Set.of(COMPLETION_STAGE, UNI, MULTI); + public static final DotName BYTE = DotName.createSimple(Byte.class.getName()); @BuildStep void addFeature(BuildProducer features) { @@ -1070,12 +1072,14 @@ private ResultHandle createMultipartForm(MethodCreator methodCreator, ResultHand Type fieldType = field.type(); + BytecodeCreator ifValueNotNull = methodCreator.ifNotNull(fieldValue).trueBranch(); + switch (fieldType.kind()) { case CLASS: // we support string, and send it as an attribute ClassInfo fieldClass = index.getClassByName(fieldType.name()); if (DotNames.STRING.equals(fieldClass.name())) { - addString(methodCreator, multipartForm, formParamName, fieldValue); + addString(ifValueNotNull, multipartForm, formParamName, fieldValue); } else if (is(FILE, fieldClass, index)) { // file is sent as file :) if (partType == null) { @@ -1083,10 +1087,9 @@ private ResultHandle createMultipartForm(MethodCreator methodCreator, ResultHand "No @PartType annotation found on multipart form field of type File: " + formClass.name() + "." + field.name()); } - BytecodeCreator ifFileNotNull = methodCreator.ifNotNull(fieldValue).trueBranch(); - ResultHandle filePath = ifFileNotNull.invokeVirtualMethod( + ResultHandle filePath = ifValueNotNull.invokeVirtualMethod( MethodDescriptor.ofMethod(File.class, "toPath", Path.class), fieldValue); - addFile(ifFileNotNull, multipartForm, formParamName, partType, filePath); + addFile(ifValueNotNull, multipartForm, formParamName, partType, filePath); } else if (is(PATH, fieldClass, index)) { // and so is path if (partType == null) { @@ -1094,14 +1097,12 @@ private ResultHandle createMultipartForm(MethodCreator methodCreator, ResultHand "No @PartType annotation found on multipart form field of type Path: " + formClass.name() + "." + field.name()); } - BytecodeCreator ifPathNotNull = methodCreator.ifNotNull(fieldValue).trueBranch(); - addFile(ifPathNotNull, multipartForm, formParamName, partType, fieldValue); + addFile(ifValueNotNull, multipartForm, formParamName, partType, fieldValue); } else if (is(BUFFER, fieldClass, index)) { // and buffer - BytecodeCreator ifBufferNotNull = methodCreator.ifNotNull(fieldValue).trueBranch(); - addBuffer(ifBufferNotNull, multipartForm, formParamName, partType, fieldValue, field); + addBuffer(ifValueNotNull, multipartForm, formParamName, partType, fieldValue, field); } else { // assume POJO: - addPojo(methodCreator, multipartForm, formParamName, partType, fieldValue, field); + addPojo(ifValueNotNull, multipartForm, formParamName, partType, fieldValue, field); } break; case ARRAY: @@ -1112,22 +1113,30 @@ private ResultHandle createMultipartForm(MethodCreator methodCreator, ResultHand throw new IllegalArgumentException("Array of unsupported type: " + componentType.name() + " on " + formClassType.name() + "." + field.name()); } - BytecodeCreator ifArrayNotNull = methodCreator.ifNotNull(fieldValue).trueBranch(); - ResultHandle buffer = ifArrayNotNull.invokeStaticInterfaceMethod( + ResultHandle buffer = ifValueNotNull.invokeStaticInterfaceMethod( MethodDescriptor.ofMethod(Buffer.class, "buffer", Buffer.class, byte[].class), fieldValue); - addBuffer(ifArrayNotNull, multipartForm, formParamName, partType, buffer, field); + addBuffer(ifValueNotNull, multipartForm, formParamName, partType, buffer, field); break; case PRIMITIVE: // primitives are converted to text and sent as attribute - ResultHandle string = primitiveToString(methodCreator, fieldValue, field); - addString(methodCreator, multipartForm, formParamName, string); + ResultHandle string = primitiveToString(ifValueNotNull, fieldValue, field); + addString(ifValueNotNull, multipartForm, formParamName, string); break; + case PARAMETERIZED_TYPE: + ParameterizedType parameterizedType = fieldType.asParameterizedType(); + List args = parameterizedType.arguments(); + if (parameterizedType.name().equals(MULTI) && args.size() == 1 && args.get(0).name().equals(BYTE)) { + addMultiAsFile(ifValueNotNull, multipartForm, formParamName, partType, field, fieldValue); + break; + } + throw new IllegalArgumentException("Unsupported multipart form field type: " + parameterizedType + "<" + + args.stream().map(a -> a.name().toString()).collect(Collectors.joining(",")) + + "> in field class " + formClassType.name()); case VOID: case TYPE_VARIABLE: case UNRESOLVED_TYPE_VARIABLE: case WILDCARD_TYPE: - case PARAMETERIZED_TYPE: throw new IllegalArgumentException("Unsupported multipart form field type: " + fieldType + " in " + "field class " + formClassType.name()); } @@ -1136,7 +1145,7 @@ private ResultHandle createMultipartForm(MethodCreator methodCreator, ResultHand return multipartForm; } - private void addPojo(MethodCreator methodCreator, AssignableResultHandle multipartForm, String formParamName, + private void addPojo(BytecodeCreator methodCreator, AssignableResultHandle multipartForm, String formParamName, String partType, ResultHandle fieldValue, FieldInfo field) { methodCreator.assign(multipartForm, methodCreator.invokeVirtualMethod(MethodDescriptor.ofMethod(QuarkusMultipartForm.class, "entity", @@ -1177,7 +1186,7 @@ private void addFile(BytecodeCreator methodCreator, AssignableResultHandle multi } } - private ResultHandle primitiveToString(MethodCreator methodCreator, ResultHandle fieldValue, FieldInfo field) { + private ResultHandle primitiveToString(BytecodeCreator methodCreator, ResultHandle fieldValue, FieldInfo field) { PrimitiveType primitiveType = field.type().asPrimitiveType(); switch (primitiveType.primitive()) { case BOOLEAN: @@ -1204,7 +1213,7 @@ private ResultHandle primitiveToString(MethodCreator methodCreator, ResultHandle } } - private void addString(MethodCreator methodCreator, AssignableResultHandle multipartForm, String formParamName, + private void addString(BytecodeCreator methodCreator, AssignableResultHandle multipartForm, String formParamName, ResultHandle fieldValue) { methodCreator.assign(multipartForm, methodCreator.invokeVirtualMethod( @@ -1213,6 +1222,37 @@ private void addString(MethodCreator methodCreator, AssignableResultHandle multi multipartForm, methodCreator.load(formParamName), fieldValue)); } + private void addMultiAsFile(BytecodeCreator methodCreator, AssignableResultHandle multipartForm, String formParamName, + String partType, FieldInfo field, + ResultHandle multi) { + if (partType == null) { + throw new IllegalArgumentException( + "No @PartType annotation found on multipart form field " + + field.declaringClass().name() + "." + field.name()); + } + if (partType.equalsIgnoreCase(MediaType.APPLICATION_OCTET_STREAM)) { + methodCreator.assign(multipartForm, + // MultipartForm#binaryFileUpload(String name, String filename, Multi content, String mediaType); + // filename = name + methodCreator.invokeVirtualMethod( + MethodDescriptor.ofMethod(QuarkusMultipartForm.class, "multiAsBinaryFileUpload", + QuarkusMultipartForm.class, String.class, String.class, Multi.class, + String.class), + multipartForm, methodCreator.load(formParamName), methodCreator.load(formParamName), + multi, methodCreator.load(partType))); + } else { + methodCreator.assign(multipartForm, + // MultipartForm#multiAsTextFileUpload(String name, String filename, Multi content, String mediaType) + // filename = name + methodCreator.invokeVirtualMethod( + MethodDescriptor.ofMethod(QuarkusMultipartForm.class, "multiAsTextFileUpload", + QuarkusMultipartForm.class, String.class, String.class, Multi.class, + String.class), + multipartForm, methodCreator.load(formParamName), methodCreator.load(formParamName), + multi, methodCreator.load(partType))); + } + } + private void addBuffer(BytecodeCreator methodCreator, AssignableResultHandle multipartForm, String formParamName, String partType, ResultHandle buffer, FieldInfo field) { if (partType == null) { @@ -1222,7 +1262,7 @@ private void addBuffer(BytecodeCreator methodCreator, AssignableResultHandle mul } if (partType.equalsIgnoreCase(MediaType.APPLICATION_OCTET_STREAM)) { methodCreator.assign(multipartForm, - // MultipartForm#binaryFileUpload(String name, String filename, String pathname, String mediaType); + // MultipartForm#binaryFileUpload(String name, String filename, io.vertx.mutiny.core.buffer.Buffer content, String mediaType); // filename = name methodCreator.invokeVirtualMethod( MethodDescriptor.ofMethod(QuarkusMultipartForm.class, "binaryFileUpload", diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteFileTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteFileTest.java new file mode 100644 index 0000000000000..5d470e36c27dc --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteFileTest.java @@ -0,0 +1,293 @@ +package io.quarkus.rest.client.reactive.multipart; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.File; +import java.io.FileInputStream; +import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.FormParam; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.jboss.logging.Logger; +import org.jboss.resteasy.reactive.MultipartForm; +import org.jboss.resteasy.reactive.PartType; +import org.jboss.resteasy.reactive.multipart.FileUpload; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.vertx.core.Vertx; + +public class MultiByteFileTest { + public static final int BYTES_SENT = 5_000_000; // 5 megs + + private static final Logger log = Logger.getLogger(MultiByteFileTest.class); + + @TestHTTPResource + URI baseUri; + + @Inject + Vertx vertx; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(Resource.class, FormData.class, Client.class, ClientForm.class)); + + @Test + void shouldUploadBiggishFile() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientForm form = new ClientForm(); + AtomicLong i = new AtomicLong(); + form.file = Multi.createBy().repeating().supplier( + () -> (byte) ((i.getAndIncrement() + 1) % 123)).atMost(BYTES_SENT); + String result = client.postMultipart(form); + assertThat(result).isEqualTo("myFile"); + } + + @Test + void shouldUploadTwoSmallFiles() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientFormWithTwoFiles form = new ClientFormWithTwoFiles(); + + form.file1 = Multi.createBy().repeating().supplier(() -> (byte) 4).atMost(100); + form.file2 = Multi.createBy().repeating().supplier(() -> (byte) 7).atMost(100); + + String result = client.postMultipartWithTwoFiles(form); + assertThat(result).isEqualTo("myFile1myFile2"); + } + + @Test + void shouldUploadSlowlyProducedData() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientForm form = new ClientForm(); + AtomicLong i = new AtomicLong(); + form.file = Multi.createFrom().emitter(em -> delayedEmit(i, em)); + String result = client.postMultipartFromSlowMulti(form); + assertThat(result).isEqualTo("myFile"); + } + + @Test + void shouldWorkOnNullMulti() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientForm form = new ClientForm(); + form.file = null; + form.param = "some-param"; + String result = client.postNull(form); + assertThat(result).isEqualTo("NULL_FILE"); + } + + @Test + void shouldWorkOnEmptyMulti() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientForm form = new ClientForm(); + form.file = Multi.createFrom().items(); + String result = client.postEmpty(form); + assertThat(result).isEqualTo("myFile"); + } + + @Test + @Timeout(10) + void shouldBehaveWellWithErrorOnMulti() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientForm form = new ClientForm(); + AtomicLong i = new AtomicLong(); + form.file = Multi.createBy().repeating().supplier( + () -> { + long iteration = i.getAndIncrement(); + if (iteration > BYTES_SENT / 2) { + throw new RuntimeException("forced"); + } + return (byte) ((iteration + 1) % 123); + }).atMost(BYTES_SENT); + assertThatThrownBy(() -> client.postMultipart(form)).isInstanceOf(Exception.class); + } + + @Test + void shouldSendFromMultiEmittingOutsideEventLoop() { + Client client = RestClientBuilder.newBuilder().baseUri(baseUri).build(Client.class); + + ClientForm form = new ClientForm(); + AtomicLong i = new AtomicLong(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + form.file = Multi.createBy().repeating().supplier( + () -> (byte) ((i.getAndIncrement() + 1) % 123)).atMost(BYTES_SENT) + .emitOn(executor); + String result = client.postMultipart(form); + assertThat(result).isEqualTo("myFile"); + } + + private void delayedEmit(AtomicLong i, MultiEmitter em) { + vertx.setTimer(100, id -> { + long index = i.getAndIncrement(); + if (index < 10) { + em.emit((byte) (12 + index)); // should emit bytes from 12 to 22 + delayedEmit(i, em); + } else { + em.complete(); + } + }); + } + + @Path("/multipart") + @ApplicationScoped + public static class Resource { + @Path("/from-slow") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String uploadFromSlow(@MultipartForm FormData form) { + return verifyFile(form.myFile, 10, position -> (byte) (12 + position)); + } + + @Path("/null") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String uploadNull(@MultipartForm FormDataWithFile form) { + return form.myFile != null ? "NON_NULL_FILE_FROM_NULL_MULTI" : "NULL_FILE"; + } + + @Path("/") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String upload(@MultipartForm FormData form) { + return verifyFile(form.myFile, BYTES_SENT, position -> (byte) (((1 + position) % 123))); + } + + @Path("/empty") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String uploadEmpty(@MultipartForm FormData form) { + return verifyFile(form.myFile, 0, b -> (byte) 0); + } + + @Path("/two-files") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String uploadEmpty(@MultipartForm FormWithTwoFiles form) { + return verifyFile(form.file1, 100, position -> (byte) 4) + + verifyFile(form.file2, 100, position -> (byte) 7); + } + + private String verifyFile(FileUpload upload, int expectedSize, Function expectedByte) { + var uploadedFile = upload.uploadedFile(); + int size; + + try (FileInputStream inputStream = new FileInputStream(uploadedFile.toFile())) { + int position = 0; + int b; + while ((b = inputStream.read()) != -1) { + long expected = expectedByte.apply(position); + position++; + if (expected != b) { + throw new RuntimeException( + "WRONG_BYTE_READ at pos " + (position - 1) + " expected: " + expected + " got: " + b); + } + } + size = position; + } catch (RuntimeException e) { + return e.getMessage(); + } catch (Exception e) { + log.error("Unexpected error in the test resource", e); + return "UNEXPECTED ERROR"; + } + + if (size != expectedSize) { + return "READ_WRONG_AMOUNT_OF_BYTES " + size; + } + return upload.fileName(); + } + } + + public static class FormData { + @FormParam("myFile") + public FileUpload myFile; + @FormParam("myParam") + @PartType(MediaType.TEXT_PLAIN) + public String myParam; + } + + public static class FormDataWithFile { + @FormParam("myFile") + public File myFile; + @FormParam("myParam") + @PartType(MediaType.TEXT_PLAIN) + public String myParam; + } + + public static class FormWithTwoFiles { + @FormParam("myFile1") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public FileUpload file1; + @FormParam("myFile2") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public FileUpload file2; + } + + @Path("/multipart") + public interface Client { + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + String postMultipart(@MultipartForm ClientForm clientForm); + + @Path("/two-files") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + String postMultipartWithTwoFiles(@MultipartForm ClientFormWithTwoFiles twoFiles); + + @Path("/from-slow") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + String postMultipartFromSlowMulti(@MultipartForm ClientForm clientForm); + + @Path("/null") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + String postNull(@MultipartForm ClientForm clientForm); + + @Path("/empty") + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + String postEmpty(@MultipartForm ClientForm form); + } + + public static class ClientForm { + + @FormParam("myParam") + @PartType(MediaType.TEXT_PLAIN) + public String param; + @FormParam("myFile") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public Multi file; + } + + public static class ClientFormWithTwoFiles { + @FormParam("myFile1") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public Multi file1; + @FormParam("myFile2") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public Multi file2; + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteWithRemoteErrorTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteWithRemoteErrorTest.java new file mode 100644 index 0000000000000..62ec116ddd5cb --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultiByteWithRemoteErrorTest.java @@ -0,0 +1,87 @@ +package io.quarkus.rest.client.reactive.multipart; + +import static org.awaitility.Awaitility.await; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.FormParam; +import javax.ws.rs.POST; +import javax.ws.rs.core.MediaType; + +import org.assertj.core.api.Assertions; +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.jboss.resteasy.reactive.MultipartForm; +import org.jboss.resteasy.reactive.PartType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Vertx; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; + +public class MultiByteWithRemoteErrorTest { + public static final int BYTES_SENT = 5_000_000; // 5 megs + + @Inject + Vertx vertx; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(Client.class, Form.class)); + + /* + * try to send 5MB file, server closes the connection after 1MB + * verify that the client didn't hang and got some exception + */ + @Test + @Timeout(10) + void shouldFailGracefullyOnRemoteError() throws ExecutionException, InterruptedException { + NetServerOptions options = new NetServerOptions() + .setHost("localhost"); + AtomicInteger counter = new AtomicInteger(); + NetServer netServer = vertx.createNetServer(options).connectHandler( + socket -> socket.handler( + data -> { + if (counter.addAndGet(data.length()) > 1_000_000) { + socket.close(); + } + })); + + CompletableFuture port = new CompletableFuture<>(); + netServer.listen(server -> port.complete(server.result().actualPort())); + + await().atMost(Duration.ofSeconds(5)).until(port::isDone); + + String uri = String.format("http://localhost:%s", port.get()); + + Client client = RestClientBuilder.newBuilder() + .baseUri(URI.create(uri)) + .build(Client.class); + + Form form = new Form(); + form.file = Multi.createBy().repeating().supplier(() -> (byte) 13).atMost(BYTES_SENT); + Assertions.assertThatThrownBy(() -> client.post(form)).isInstanceOf(Exception.class); + } + + public interface Client { + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + String post(@MultipartForm Form clientForm); + } + + public static class Form { + @FormParam("file") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public Multi file; + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/MultipartFilenameTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultipartFilenameTest.java similarity index 88% rename from extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/MultipartFilenameTest.java rename to extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultipartFilenameTest.java index 2cbc100586460..c77b541c0d366 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/MultipartFilenameTest.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/multipart/MultipartFilenameTest.java @@ -1,4 +1,4 @@ -package io.quarkus.rest.client.reactive; +package io.quarkus.rest.client.reactive.multipart; import static org.assertj.core.api.Assertions.assertThat; @@ -30,9 +30,7 @@ public class MultipartFilenameTest { @RegisterExtension static final QuarkusUnitTest TEST = new QuarkusUnitTest() - .withApplicationRoot((jar) -> jar - .addClasses(Resource.class, FormData.class, Client.class, ClientForm.class)) - .withConfigurationResource("mp-global-scope-test-application.properties"); + .withApplicationRoot(jar -> jar.addClasses(Resource.class, FormData.class, Client.class, ClientForm.class)); @Test void shouldPassOriginalFileName() throws IOException { diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/devconsole/RestClientsContainer.java b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/devconsole/RestClientsContainer.java index dba40d767fe1e..982aabccd0d09 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/devconsole/RestClientsContainer.java +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/devconsole/RestClientsContainer.java @@ -68,7 +68,7 @@ public static class RestClientData { public RestClientData(List clients, List possibleClients) { this.clients = clients; - this.possibleClients = possibleClients; + this.possibleClients = possibleClients; // TODO: present this info } } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java index ba579b9bc3da5..7909b823f22e7 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java @@ -1,6 +1,5 @@ package org.jboss.resteasy.reactive.client.handlers; -import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; import io.smallrye.mutiny.Uni; import io.smallrye.stork.ServiceInstance; import io.smallrye.stork.Stork; @@ -36,6 +35,7 @@ import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties; import org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl; import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext; +import org.jboss.resteasy.reactive.client.impl.multipart.PausableHttpPostRequestEncoder; import org.jboss.resteasy.reactive.client.impl.multipart.QuarkusMultipartForm; import org.jboss.resteasy.reactive.client.impl.multipart.QuarkusMultipartFormUpload; import org.jboss.resteasy.reactive.client.spi.ClientRestHandler; @@ -285,13 +285,12 @@ private QuarkusMultipartFormUpload setMultipartHeadersAndPrepareBody(HttpClientR multipartForm.preparePojos(state); Object property = state.getConfiguration().getProperty(QuarkusRestClientProperties.MULTIPART_ENCODER_MODE); - HttpPostRequestEncoder.EncoderMode mode = HttpPostRequestEncoder.EncoderMode.RFC1738; + PausableHttpPostRequestEncoder.EncoderMode mode = PausableHttpPostRequestEncoder.EncoderMode.RFC1738; if (property != null) { - mode = (HttpPostRequestEncoder.EncoderMode) property; + mode = (PausableHttpPostRequestEncoder.EncoderMode) property; } QuarkusMultipartFormUpload multipartFormUpload = new QuarkusMultipartFormUpload(Vertx.currentContext(), multipartForm, - true, - mode); + true, mode); setEntityRelatedHeaders(headerMap, state.getEntity()); // multipart has its own headers: diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java new file mode 100644 index 0000000000000..a47dfa367f024 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/MultiByteHttpData.java @@ -0,0 +1,371 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.resteasy.reactive.client.impl.multipart; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.multipart.AbstractHttpData; +import io.netty.handler.codec.http.multipart.FileUpload; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; +import io.netty.util.internal.ObjectUtil; +import io.smallrye.mutiny.Multi; +import io.vertx.core.Context; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.impl.VertxByteBufAllocator; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.concurrent.Executor; +import java.util.function.Consumer; +import org.jboss.logging.Logger; +import org.reactivestreams.Subscription; + +/** + * A FileUpload implementation that is responsible for sending Multi<Byte> as a file in a multipart message. + * It is meant to be used by the {@link PausableHttpPostRequestEncoder} + * + * When created, MultiByteHttpData will subscribe to the underlying Multi and request {@link MultiByteHttpData#BUFFER_SIZE} + * of bytes. + * + * Before reading the next chunk of data with {@link #getChunk(int)}, the post encoder checks if data {@link #isReady(int)} + * and if not, triggers {@link #suspend(int)}. That's because a chunk smaller than requested is treated as the end of input. + * Then, when the requested amount of bytes is ready, or the underlying Multi is completed, `resumption` is executed. + * + */ +public class MultiByteHttpData extends AbstractHttpData implements FileUpload { + private static final Logger log = Logger.getLogger(MultiByteHttpData.class); + + public static final int DEFAULT_BUFFER_SIZE = 16384; + private static final int BUFFER_SIZE; + + private Subscription subscription; + private String filename; + + private String contentType; + + private String contentTransferEncoding; + + // TODO: replace with a simple array based? + // TODO: we do `discardReadBytes` on it which is not optimal - copies array every time + private final ByteBuf buffer = VertxByteBufAllocator.DEFAULT.heapBuffer(BUFFER_SIZE, BUFFER_SIZE); + + private final Context context; + + private volatile boolean done = false; + + private boolean paused = false; + private int awaitedBytes; + + static { + BUFFER_SIZE = Integer + .parseInt(System.getProperty("quarkus.rest.client.multipart-buffer-size", String.valueOf(DEFAULT_BUFFER_SIZE))); + if (BUFFER_SIZE < DEFAULT_BUFFER_SIZE) { + throw new IllegalStateException( + "quarkus.rest.client.multipart-buffer-size cannot be lower than " + DEFAULT_BUFFER_SIZE); + } + } + + /** + * + * @param name name of the parameter + * @param filename file name + * @param contentType content type + * @param contentTransferEncoding "binary" for sending binary files + * @param charset the charset + * @param content the Multi to send + * @param errorHandler error handler invoked when the Multi emits an exception + * @param context Vertx context on which the data is sent + * @param resumption the action to execute when the requested amount of bytes is ready, or the Multi is completed + */ + public MultiByteHttpData(String name, String filename, String contentType, + String contentTransferEncoding, Charset charset, Multi content, + Consumer errorHandler, Context context, Runnable resumption) { + super(name, charset, 0); + this.context = context; + setFilename(filename); + setContentType(contentType); + setContentTransferEncoding(contentTransferEncoding); + + var contextualExecutor = new ExecutorWithContext(context); + content.emitOn(contextualExecutor).runSubscriptionOn(contextualExecutor).subscribe().with( + subscription -> { + MultiByteHttpData.this.subscription = subscription; + subscription.request(BUFFER_SIZE); + }, + b -> { + buffer.writeByte(b); + if (paused && (done || buffer.readableBytes() >= awaitedBytes)) { + paused = false; + awaitedBytes = 0; + resumption.run(); + } + }, + th -> { + log.error("Multi used to send a multipart message failed", th); + done = true; + errorHandler.accept(th); + }, + () -> { + done = true; + if (paused) { + paused = false; + resumption.run(); + } + }); + } + + void suspend(int awaitedBytes) { + this.awaitedBytes = awaitedBytes; + this.paused = true; + } + + @Override + public void setContent(ByteBuf buffer) throws IOException { + throw new IllegalStateException("setting content of MultiByteHttpData is not supported"); + } + + @Override + public void addContent(ByteBuf buffer, boolean last) throws IOException { + throw new IllegalStateException("adding content to MultiByteHttpData is not supported"); + } + + @Override + public void setContent(File file) throws IOException { + throw new IllegalStateException("setting content of MultiByteHttpData is not supported"); + } + + @Override + public void setContent(InputStream inputStream) throws IOException { + throw new IllegalStateException("setting content of MultiByteHttpData is not supported"); + } + + @Override + public void delete() { + // do nothing + } + + @Override + public byte[] get() throws IOException { + throw new IllegalStateException("getting all the contents of a MultiByteHttpData is not supported"); + } + + @Override + public ByteBuf getByteBuf() { + throw new IllegalStateException("getting all the contents of a MultiByteHttpData is not supported"); + } + + /** + * check if it is possible to read the next chunk of data of a given size + * + * @param chunkSize amount of bytes + * @return true if the requested amount of bytes is ready to be read or the Multi is completed, i.e. there will be + * no more bytes to read + */ + public boolean isReady(int chunkSize) { + return done || buffer.readableBytes() >= chunkSize; + } + + /** + * {@inheritDoc} + *
+ * NOTE: should only be invoked when {@link #isReady(int)} returns true + * + * @param toRead amount of bytes to read + * @return ByteBuf with the requested bytes + */ + @Override + public ByteBuf getChunk(int toRead) { + if (Vertx.currentContext() != context) { + throw new IllegalStateException("MultiByteHttpData invoked on an invalid context : " + Vertx.currentContext() + + ", thread: " + Thread.currentThread()); + } + if (buffer.readableBytes() == 0 && done) { + return Unpooled.EMPTY_BUFFER; + } + + ByteBuf result = VertxByteBufAllocator.DEFAULT.heapBuffer(toRead, toRead); + + // finish if the whole buffer is filled + // or we hit the end, `done` && buffer.readableBytes == 0 + while (toRead > 0 && !(buffer.readableBytes() == 0 && done)) { + int readBytes = Math.min(buffer.readableBytes(), toRead); + result.writeBytes(buffer.readBytes(readBytes)); + buffer.discardReadBytes(); + subscription.request(readBytes); + + toRead -= readBytes; + } + return result; + } + + @Override + public String getString() { + throw new IllegalStateException("Reading MultiByteHttpData as String is not supported"); + } + + @Override + public String getString(Charset encoding) { + throw new IllegalStateException("Reading MultiByteHttpData as String is not supported"); + } + + @Override + public boolean renameTo(File dest) { + throw new IllegalStateException("Renaming destination file for MultiByteHttpData is not supported"); + } + + @Override + public boolean isInMemory() { + return true; + } + + @Override + public File getFile() { + return null; + } + + @Override + public FileUpload copy() { + throw new IllegalStateException("Copying MultiByteHttpData is not supported"); + } + + @Override + public FileUpload duplicate() { + throw new IllegalStateException("Duplicating MultiByteHttpData is not supported"); + } + + @Override + public FileUpload retainedDuplicate() { + throw new IllegalStateException("Duplicating MultiByteHttpData is not supported"); + } + + @Override + public FileUpload replace(ByteBuf content) { + throw new IllegalStateException("Replacing MultiByteHttpData is not supported"); + } + + @Override + public FileUpload retain(int increment) { + super.retain(increment); + return this; + } + + @Override + public FileUpload retain() { + super.retain(); + return this; + } + + @Override + public FileUpload touch() { + touch(null); + return this; + } + + @Override + public FileUpload touch(Object hint) { + buffer.touch(hint); + return this; + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object o) { + return System.identityHashCode(this) == System.identityHashCode(o); + } + + @Override + public int compareTo(InterfaceHttpData o) { + if (!(o instanceof MultiByteHttpData)) { + throw new ClassCastException("Cannot compare " + getHttpDataType() + + " with " + o.getHttpDataType()); + } + return compareTo((MultiByteHttpData) o); + } + + public int compareTo(MultiByteHttpData o) { + return Integer.compare(System.identityHashCode(this), System.identityHashCode(o)); + } + + @Override + public HttpDataType getHttpDataType() { + return HttpDataType.FileUpload; + } + + @Override + public String getFilename() { + return filename; + } + + @Override + public void setFilename(String filename) { + this.filename = ObjectUtil.checkNotNull(filename, "filename"); + } + + @Override + public void setContentType(String contentType) { + this.contentType = ObjectUtil.checkNotNull(contentType, "contentType"); + } + + @Override + public String getContentType() { + return contentType; + } + + @Override + public String getContentTransferEncoding() { + return contentTransferEncoding; + } + + @Override + public void setContentTransferEncoding(String contentTransferEncoding) { + this.contentTransferEncoding = contentTransferEncoding; + } + + @Override + public String toString() { + return HttpHeaderNames.CONTENT_DISPOSITION + ": " + + HttpHeaderValues.FORM_DATA + "; " + HttpHeaderValues.NAME + "=\"" + getName() + + "\"; " + HttpHeaderValues.FILENAME + "=\"" + filename + "\"\r\n" + + HttpHeaderNames.CONTENT_TYPE + ": " + contentType + + (getCharset() != null ? "; " + HttpHeaderValues.CHARSET + '=' + getCharset().name() + "\r\n" : "\r\n") + + HttpHeaderNames.CONTENT_LENGTH + ": " + length() + "\r\n" + + "Completed: " + isCompleted(); + } + + static class ExecutorWithContext implements Executor { + Context context; + + public ExecutorWithContext(Context context) { + this.context = context; + } + + @Override + public void execute(Runnable command) { + if (Vertx.currentContext() == context) { + command.run(); + } else { + context.runOnContext(v -> command.run()); + } + } + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/PausableHttpPostRequestEncoder.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/PausableHttpPostRequestEncoder.java new file mode 100644 index 0000000000000..a98522fc97d51 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/PausableHttpPostRequestEncoder.java @@ -0,0 +1,1423 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.resteasy.reactive.client.impl.multipart; + +import static io.netty.buffer.Unpooled.wrappedBuffer; +import static io.netty.util.internal.ObjectUtil.checkNotNull; +import static java.util.AbstractMap.SimpleImmutableEntry; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.EmptyHttpHeaders; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.multipart.Attribute; +import io.netty.handler.codec.http.multipart.FileUpload; +import io.netty.handler.codec.http.multipart.HttpData; +import io.netty.handler.codec.http.multipart.HttpDataFactory; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; +import io.netty.handler.stream.ChunkedInput; +import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * A copy of {@link io.netty.handler.codec.http.multipart.HttpPostRequestEncoder} that supports pause/resume + * + * This encoder will help to encode Request for a FORM as POST. + * + *

+ * According to RFC 7231, POST, PUT and OPTIONS allow to have a body. + * This encoder will support widely all methods except TRACE since the RFC notes + * for GET, DELETE, HEAD and CONNECT: (replaces XXX by one of these methods) + *

+ *

+ * "A payload within a XXX request message has no defined semantics; + * sending a payload body on a XXX request might cause some existing + * implementations to reject the request." + *

+ *

+ * On the contrary, for TRACE method, RFC says: + *

+ *

+ * "A client MUST NOT send a message body in a TRACE request." + *

+ */ +public class PausableHttpPostRequestEncoder implements ChunkedInput { + + public static final HttpContent WAIT_MARKER = new HttpContent() { + @Override + public HttpContent copy() { + return null; + } + + @Override + public HttpContent duplicate() { + return null; + } + + @Override + public HttpContent retainedDuplicate() { + return null; + } + + @Override + public HttpContent replace(ByteBuf content) { + return null; + } + + @Override + public HttpContent retain() { + return null; + } + + @Override + public HttpContent retain(int increment) { + return null; + } + + @Override + public HttpContent touch() { + return null; + } + + @Override + public HttpContent touch(Object hint) { + return null; + } + + @Override + public ByteBuf content() { + return null; + } + + @Override + public DecoderResult getDecoderResult() { + return null; + } + + @Override + public DecoderResult decoderResult() { + return null; + } + + @Override + public void setDecoderResult(DecoderResult decoderResult) { + + } + + @Override + public int refCnt() { + return 0; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int decrement) { + return false; + } + }; + + /** + * Different modes to use to encode form data. + */ + public enum EncoderMode { + /** + * Legacy mode which should work for most. It is known to not work with OAUTH. For OAUTH use + * {@link EncoderMode#RFC3986}. The W3C form recommendations this for submitting post form data. + */ + RFC1738, + + /** + * Mode which is more new and is used for OAUTH + */ + RFC3986, + + /** + * The HTML5 spec disallows mixed mode in multipart/form-data + * requests. More concretely this means that more files submitted + * under the same name will not be encoded using mixed mode, but + * will be treated as distinct fields. + * + * Reference: + * https://www.w3.org/TR/html5/forms.html#multipart-form-data + */ + HTML5 + } + + @SuppressWarnings("rawtypes") + private static final Map.Entry[] percentEncodings; + + static { + percentEncodings = new Map.Entry[] { + new SimpleImmutableEntry<>(Pattern.compile("\\*"), "%2A"), + new SimpleImmutableEntry<>(Pattern.compile("\\+"), "%20"), + new SimpleImmutableEntry<>(Pattern.compile("~"), "%7E") + }; + } + + /** + * Factory used to create InterfaceHttpData + */ + private final HttpDataFactory factory; + + /** + * Request to encode + */ + private final HttpRequest request; + + /** + * Default charset to use + */ + private final Charset charset; + + /** + * Chunked false by default + */ + private boolean isChunked; + + /** + * InterfaceHttpData for Body (without encoding) + */ + private final List bodyListDatas; + + /** + * The final Multipart List of InterfaceHttpData including encoding + */ + final List multipartHttpDatas; + /** + * Does this request is a Multipart request + */ + private final boolean isMultipart; + + /** + * If multipart, this is the boundary for the flobal multipart + */ + String multipartDataBoundary; + + /** + * If multipart, there could be internal multiparts (mixed) to the global multipart. Only one level is allowed. + */ + String multipartMixedBoundary; + + /** + * To check if the header has been finalized + */ + private boolean headerFinalized; + private final EncoderMode encoderMode; + + /** + * + * @param factory + * the factory used to create InterfaceHttpData + * @param request + * the request to encode + * @param multipart + * True if the FORM is a ENCTYPE="multipart/form-data" + * @param charset + * the charset to use as default + * @param encoderMode + * the mode for the encoder to use. See {@link EncoderMode} for the details. + * @throws NullPointerException + * for request or charset or factory + * @throws ErrorDataEncoderException + * if the request is a TRACE + */ + public PausableHttpPostRequestEncoder( + HttpDataFactory factory, HttpRequest request, boolean multipart, Charset charset, + EncoderMode encoderMode) + throws ErrorDataEncoderException { + this.request = checkNotNull(request, "request"); + this.charset = checkNotNull(charset, "charset"); + this.factory = checkNotNull(factory, "factory"); + if (HttpMethod.TRACE.equals(request.method())) { + throw new ErrorDataEncoderException("Cannot create a Encoder if request is a TRACE"); + } + // Fill default values + bodyListDatas = new ArrayList<>(); + // default mode + isLastChunk = false; + isLastChunkSent = false; + isMultipart = multipart; + multipartHttpDatas = new ArrayList<>(); + this.encoderMode = encoderMode; + if (isMultipart) { + initDataMultipart(); + } + } + + /** + * Clean all HttpDatas (on Disk) for the current request. + */ + public void cleanFiles() { + factory.cleanRequestHttpData(request); + } + + /** + * Does the last non empty chunk already encoded so that next chunk will be empty (last chunk) + */ + private boolean isLastChunk; + /** + * Last chunk already sent + */ + private boolean isLastChunkSent; + /** + * The current FileUpload that is currently in encode process + */ + private FileUpload currentFileUpload; + /** + * While adding a FileUpload, is the multipart currently in Mixed Mode + */ + private boolean duringMixedMode; + /** + * Global Body size + */ + private long globalBodySize; + /** + * Global Transfer progress + */ + private long globalProgress; + + /** + * True if this request is a Multipart request + * + * @return True if this request is a Multipart request + */ + public boolean isMultipart() { + return isMultipart; + } + + /** + * Init the delimiter for Global Part (Data). + */ + private void initDataMultipart() { + multipartDataBoundary = getNewMultipartDelimiter(); + } + + /** + * Init the delimiter for Mixed Part (Mixed). + */ + private void initMixedMultipart() { + multipartMixedBoundary = getNewMultipartDelimiter(); + } + + /** + * + * @return a newly generated Delimiter (either for DATA or MIXED) + */ + private static String getNewMultipartDelimiter() { + // construct a generated delimiter + return Long.toHexString(PlatformDependent.threadLocalRandom().nextLong()); + } + + /** + * This getMethod returns a List of all InterfaceHttpData from body part.
+ * + * @return the list of InterfaceHttpData from Body part + */ + public List getBodyListAttributes() { + return bodyListDatas; + } + + /** + * Set the Body HttpDatas list + * + * @throws NullPointerException + * for datas + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public void setBodyHttpDatas(List datas) throws ErrorDataEncoderException { + ObjectUtil.checkNotNull(datas, "datas"); + globalBodySize = 0; + bodyListDatas.clear(); + currentFileUpload = null; + duringMixedMode = false; + multipartHttpDatas.clear(); + for (InterfaceHttpData data : datas) { + addBodyHttpData(data); + } + } + + /** + * Add a simple attribute in the body as Name=Value + * + * @param name + * name of the parameter + * @param value + * the value of the parameter + * @throws NullPointerException + * for name + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public void addBodyAttribute(String name, String value) throws ErrorDataEncoderException { + String svalue = value != null ? value : StringUtil.EMPTY_STRING; + Attribute data = factory.createAttribute(request, checkNotNull(name, "name"), svalue); + addBodyHttpData(data); + } + + /** + * Add a file as a FileUpload + * + * @param name + * the name of the parameter + * @param file + * the file to be uploaded (if not Multipart mode, only the filename will be included) + * @param contentType + * the associated contentType for the File + * @param isText + * True if this file should be transmitted in Text format (else binary) + * @throws NullPointerException + * for name and file + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public void addBodyFileUpload(String name, File file, String contentType, boolean isText) + throws ErrorDataEncoderException { + addBodyFileUpload(name, file.getName(), file, contentType, isText); + } + + /** + * Add a file as a FileUpload + * + * @param name + * the name of the parameter + * @param file + * the file to be uploaded (if not Multipart mode, only the filename will be included) + * @param filename + * the filename to use for this File part, empty String will be ignored by + * the encoder + * @param contentType + * the associated contentType for the File + * @param isText + * True if this file should be transmitted in Text format (else binary) + * @throws NullPointerException + * for name and file + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public void addBodyFileUpload(String name, String filename, File file, String contentType, boolean isText) + throws ErrorDataEncoderException { + checkNotNull(name, "name"); + checkNotNull(file, "file"); + if (filename == null) { + filename = StringUtil.EMPTY_STRING; + } + String scontentType = contentType; + String contentTransferEncoding = null; + if (contentType == null) { + if (isText) { + scontentType = QuarkusHttpPostBodyUtil.DEFAULT_TEXT_CONTENT_TYPE; + } else { + scontentType = QuarkusHttpPostBodyUtil.DEFAULT_BINARY_CONTENT_TYPE; + } + } + if (!isText) { + contentTransferEncoding = QuarkusHttpPostBodyUtil.TransferEncodingMechanism.BINARY.value(); + } + FileUpload fileUpload = factory.createFileUpload(request, name, filename, scontentType, + contentTransferEncoding, null, file.length()); + try { + fileUpload.setContent(file); + } catch (IOException e) { + throw new ErrorDataEncoderException(e); + } + addBodyHttpData(fileUpload); + } + + /** + * Add a series of Files associated with one File parameter + * + * @param name + * the name of the parameter + * @param file + * the array of files + * @param contentType + * the array of content Types associated with each file + * @param isText + * the array of isText attribute (False meaning binary mode) for each file + * @throws IllegalArgumentException + * also throws if array have different sizes + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public void addBodyFileUploads(String name, File[] file, String[] contentType, boolean[] isText) + throws ErrorDataEncoderException { + if (file.length != contentType.length && file.length != isText.length) { + throw new IllegalArgumentException("Different array length"); + } + for (int i = 0; i < file.length; i++) { + addBodyFileUpload(name, file[i], contentType[i], isText[i]); + } + } + + /** + * Add the InterfaceHttpData to the Body list + * + * @throws NullPointerException + * for data + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public void addBodyHttpData(InterfaceHttpData data) throws ErrorDataEncoderException { + if (headerFinalized) { + throw new ErrorDataEncoderException("Cannot add value once finalized"); + } + bodyListDatas.add(checkNotNull(data, "data")); + if (!isMultipart) { + if (data instanceof Attribute) { + Attribute attribute = (Attribute) data; + try { + // name=value& with encoded name and attribute + String key = encodeAttribute(attribute.getName(), charset); + String value = encodeAttribute(attribute.getValue(), charset); + Attribute newattribute = factory.createAttribute(request, key, value); + multipartHttpDatas.add(newattribute); + globalBodySize += newattribute.getName().length() + 1 + newattribute.length() + 1; + } catch (IOException e) { + throw new ErrorDataEncoderException(e); + } + } else if (data instanceof FileUpload) { + // since not Multipart, only name=filename => Attribute + FileUpload fileUpload = (FileUpload) data; + // name=filename& with encoded name and filename + String key = encodeAttribute(fileUpload.getName(), charset); + String value = encodeAttribute(fileUpload.getFilename(), charset); + Attribute newattribute = factory.createAttribute(request, key, value); + multipartHttpDatas.add(newattribute); + globalBodySize += newattribute.getName().length() + 1 + newattribute.length() + 1; + } + return; + } + /* + * Logic: + * if not Attribute: + * add Data to body list + * if (duringMixedMode) + * add endmixedmultipart delimiter + * currentFileUpload = null + * duringMixedMode = false; + * add multipart delimiter, multipart body header and Data to multipart list + * reset currentFileUpload, duringMixedMode + * if FileUpload: take care of multiple file for one field => mixed mode + * if (duringMixedMode) + * if (currentFileUpload.name == data.name) + * add mixedmultipart delimiter, mixedmultipart body header and Data to multipart list + * else + * add endmixedmultipart delimiter, multipart body header and Data to multipart list + * currentFileUpload = data + * duringMixedMode = false; + * else + * if (currentFileUpload.name == data.name) + * change multipart body header of previous file into multipart list to + * mixedmultipart start, mixedmultipart body header + * add mixedmultipart delimiter, mixedmultipart body header and Data to multipart list + * duringMixedMode = true + * else + * add multipart delimiter, multipart body header and Data to multipart list + * currentFileUpload = data + * duringMixedMode = false; + * Do not add last delimiter! Could be: + * if duringmixedmode: endmixedmultipart + endmultipart + * else only endmultipart + */ + if (data instanceof Attribute) { + if (duringMixedMode) { + QuarkusInternalAttribute internal = new QuarkusInternalAttribute(charset); + internal.addValue("\r\n--" + multipartMixedBoundary + "--"); + multipartHttpDatas.add(internal); + multipartMixedBoundary = null; + currentFileUpload = null; + duringMixedMode = false; + } + QuarkusInternalAttribute internal = new QuarkusInternalAttribute(charset); + if (!multipartHttpDatas.isEmpty()) { + // previously a data field so CRLF + internal.addValue("\r\n"); + } + internal.addValue("--" + multipartDataBoundary + "\r\n"); + // content-disposition: form-data; name="field1" + Attribute attribute = (Attribute) data; + internal.addValue(HttpHeaderNames.CONTENT_DISPOSITION + ": " + HttpHeaderValues.FORM_DATA + "; " + + HttpHeaderValues.NAME + "=\"" + attribute.getName() + "\"\r\n"); + // Add Content-Length: xxx + internal.addValue(HttpHeaderNames.CONTENT_LENGTH + ": " + + attribute.length() + "\r\n"); + Charset localcharset = attribute.getCharset(); + if (localcharset != null) { + // Content-Type: text/plain; charset=charset + internal.addValue(HttpHeaderNames.CONTENT_TYPE + ": " + + QuarkusHttpPostBodyUtil.DEFAULT_TEXT_CONTENT_TYPE + "; " + + HttpHeaderValues.CHARSET + '=' + + localcharset.name() + "\r\n"); + } + // CRLF between body header and data + internal.addValue("\r\n"); + multipartHttpDatas.add(internal); + multipartHttpDatas.add(data); + globalBodySize += attribute.length() + internal.size(); + } else if (data instanceof FileUpload) { + FileUpload fileUpload = (FileUpload) data; + QuarkusInternalAttribute internal = new QuarkusInternalAttribute(charset); + if (!multipartHttpDatas.isEmpty()) { + // previously a data field so CRLF + internal.addValue("\r\n"); + } + boolean localMixed; + if (duringMixedMode) { + if (currentFileUpload != null && currentFileUpload.getName().equals(fileUpload.getName())) { + // continue a mixed mode + + localMixed = true; + } else { + // end a mixed mode + + // add endmixedmultipart delimiter, multipart body header + // and + // Data to multipart list + internal.addValue("--" + multipartMixedBoundary + "--"); + multipartHttpDatas.add(internal); + multipartMixedBoundary = null; + // start a new one (could be replaced if mixed start again + // from here + internal = new QuarkusInternalAttribute(charset); + internal.addValue("\r\n"); + localMixed = false; + // new currentFileUpload and no more in Mixed mode + currentFileUpload = fileUpload; + duringMixedMode = false; + } + } else { + if (encoderMode != EncoderMode.HTML5 && currentFileUpload != null + && currentFileUpload.getName().equals(fileUpload.getName())) { + // create a new mixed mode (from previous file) + + // change multipart body header of previous file into + // multipart list to + // mixedmultipart start, mixedmultipart body header + + // change Internal (size()-2 position in multipartHttpDatas) + // from (line starting with *) + // --AaB03x + // * Content-Disposition: form-data; name="files"; + // filename="file1.txt" + // Content-Type: text/plain + // to (lines starting with *) + // --AaB03x + // * Content-Disposition: form-data; name="files" + // * Content-Type: multipart/mixed; boundary=BbC04y + // * + // * --BbC04y + // * Content-Disposition: attachment; filename="file1.txt" + // Content-Type: text/plain + initMixedMultipart(); + QuarkusInternalAttribute pastAttribute = (QuarkusInternalAttribute) multipartHttpDatas + .get(multipartHttpDatas + .size() - 2); + // remove past size + globalBodySize -= pastAttribute.size(); + StringBuilder replacement = new StringBuilder( + 139 + multipartDataBoundary.length() + multipartMixedBoundary.length() * 2 + + fileUpload.getFilename().length() + fileUpload.getName().length()) + + .append("--") + .append(multipartDataBoundary) + .append("\r\n") + + .append(HttpHeaderNames.CONTENT_DISPOSITION) + .append(": ") + .append(HttpHeaderValues.FORM_DATA) + .append("; ") + .append(HttpHeaderValues.NAME) + .append("=\"") + .append(fileUpload.getName()) + .append("\"\r\n") + + .append(HttpHeaderNames.CONTENT_TYPE) + .append(": ") + .append(HttpHeaderValues.MULTIPART_MIXED) + .append("; ") + .append(HttpHeaderValues.BOUNDARY) + .append('=') + .append(multipartMixedBoundary) + .append("\r\n\r\n") + + .append("--") + .append(multipartMixedBoundary) + .append("\r\n") + + .append(HttpHeaderNames.CONTENT_DISPOSITION) + .append(": ") + .append(HttpHeaderValues.ATTACHMENT); + + if (!fileUpload.getFilename().isEmpty()) { + replacement.append("; ") + .append(HttpHeaderValues.FILENAME) + .append("=\"") + .append(currentFileUpload.getFilename()) + .append('"'); + } + + replacement.append("\r\n"); + + pastAttribute.setValue(replacement.toString(), 1); + pastAttribute.setValue("", 2); + + // update past size + globalBodySize += pastAttribute.size(); + + // now continue + // add mixedmultipart delimiter, mixedmultipart body header + // and + // Data to multipart list + localMixed = true; + duringMixedMode = true; + } else { + // a simple new multipart + // add multipart delimiter, multipart body header and Data + // to multipart list + localMixed = false; + currentFileUpload = fileUpload; + duringMixedMode = false; + } + } + + if (localMixed) { + // add mixedmultipart delimiter, mixedmultipart body header and + // Data to multipart list + internal.addValue("--" + multipartMixedBoundary + "\r\n"); + + if (fileUpload.getFilename().isEmpty()) { + // Content-Disposition: attachment + internal.addValue(HttpHeaderNames.CONTENT_DISPOSITION + ": " + + HttpHeaderValues.ATTACHMENT + "\r\n"); + } else { + // Content-Disposition: attachment; filename="file1.txt" + internal.addValue(HttpHeaderNames.CONTENT_DISPOSITION + ": " + + HttpHeaderValues.ATTACHMENT + "; " + + HttpHeaderValues.FILENAME + "=\"" + fileUpload.getFilename() + "\"\r\n"); + } + } else { + internal.addValue("--" + multipartDataBoundary + "\r\n"); + + if (fileUpload.getFilename().isEmpty()) { + // Content-Disposition: form-data; name="files"; + internal.addValue(HttpHeaderNames.CONTENT_DISPOSITION + ": " + HttpHeaderValues.FORM_DATA + "; " + + HttpHeaderValues.NAME + "=\"" + fileUpload.getName() + "\"\r\n"); + } else { + // Content-Disposition: form-data; name="files"; + // filename="file1.txt" + internal.addValue(HttpHeaderNames.CONTENT_DISPOSITION + ": " + HttpHeaderValues.FORM_DATA + "; " + + HttpHeaderValues.NAME + "=\"" + fileUpload.getName() + "\"; " + + HttpHeaderValues.FILENAME + "=\"" + fileUpload.getFilename() + "\"\r\n"); + } + } + // Add Content-Length: xxx + internal.addValue(HttpHeaderNames.CONTENT_LENGTH + ": " + + fileUpload.length() + "\r\n"); + // Content-Type: image/gif + // Content-Type: text/plain; charset=ISO-8859-1 + // Content-Transfer-Encoding: binary + internal.addValue(HttpHeaderNames.CONTENT_TYPE + ": " + fileUpload.getContentType()); + String contentTransferEncoding = fileUpload.getContentTransferEncoding(); + if (contentTransferEncoding != null + && contentTransferEncoding.equals(QuarkusHttpPostBodyUtil.TransferEncodingMechanism.BINARY.value())) { + internal.addValue("\r\n" + HttpHeaderNames.CONTENT_TRANSFER_ENCODING + ": " + + QuarkusHttpPostBodyUtil.TransferEncodingMechanism.BINARY.value() + "\r\n\r\n"); + } else if (fileUpload.getCharset() != null) { + internal.addValue("; " + HttpHeaderValues.CHARSET + '=' + fileUpload.getCharset().name() + "\r\n\r\n"); + } else { + internal.addValue("\r\n\r\n"); + } + multipartHttpDatas.add(internal); + multipartHttpDatas.add(data); + globalBodySize += fileUpload.length() + internal.size(); + } + } + + /** + * Iterator to be used when encoding will be called chunk after chunk + */ + private ListIterator iterator; + + /** + * Finalize the request by preparing the Header in the request and returns the request ready to be sent.
+ * Once finalized, no data must be added.
+ * If the request does not need chunk (isChunked() == false), this request is the only object to send to the remote + * server. + * + * @return the request object (chunked or not according to size of body) + * @throws ErrorDataEncoderException + * if the encoding is in error or if the finalize were already done + */ + public HttpRequest finalizeRequest() throws ErrorDataEncoderException { + // Finalize the multipartHttpDatas + if (!headerFinalized) { + if (isMultipart) { + QuarkusInternalAttribute internal = new QuarkusInternalAttribute(charset); + if (duringMixedMode) { + internal.addValue("\r\n--" + multipartMixedBoundary + "--"); + } + internal.addValue("\r\n--" + multipartDataBoundary + "--\r\n"); + multipartHttpDatas.add(internal); + multipartMixedBoundary = null; + currentFileUpload = null; + duringMixedMode = false; + globalBodySize += internal.size(); + } + headerFinalized = true; + } else { + throw new ErrorDataEncoderException("Header already encoded"); + } + + HttpHeaders headers = request.headers(); + List contentTypes = headers.getAll(HttpHeaderNames.CONTENT_TYPE); + List transferEncoding = headers.getAll(HttpHeaderNames.TRANSFER_ENCODING); + if (contentTypes != null) { + headers.remove(HttpHeaderNames.CONTENT_TYPE); + for (String contentType : contentTypes) { + // "multipart/form-data; boundary=--89421926422648" + String lowercased = contentType.toLowerCase(); + if (lowercased.startsWith(HttpHeaderValues.MULTIPART_FORM_DATA.toString()) || + lowercased.startsWith(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString())) { + // ignore + } else { + headers.add(HttpHeaderNames.CONTENT_TYPE, contentType); + } + } + } + if (isMultipart) { + String value = HttpHeaderValues.MULTIPART_FORM_DATA + "; " + HttpHeaderValues.BOUNDARY + '=' + + multipartDataBoundary; + headers.add(HttpHeaderNames.CONTENT_TYPE, value); + } else { + // Not multipart + headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED); + } + // Now consider size for chunk or not + long realSize = globalBodySize; + if (!isMultipart) { + realSize -= 1; // last '&' removed + } + iterator = multipartHttpDatas.listIterator(); + + headers.set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(realSize)); + if (realSize > QuarkusHttpPostBodyUtil.chunkSize || isMultipart) { + isChunked = true; + if (transferEncoding != null) { + headers.remove(HttpHeaderNames.TRANSFER_ENCODING); + for (CharSequence v : transferEncoding) { + if (HttpHeaderValues.CHUNKED.contentEqualsIgnoreCase(v)) { + // ignore + } else { + headers.add(HttpHeaderNames.TRANSFER_ENCODING, v); + } + } + } + HttpUtil.setTransferEncodingChunked(request, true); + + // wrap to hide the possible content + return new WrappedHttpRequest(request); + } else { + // get the only one body and set it to the request + HttpContent chunk = nextChunk(); + if (request instanceof FullHttpRequest) { + FullHttpRequest fullRequest = (FullHttpRequest) request; + ByteBuf chunkContent = chunk.content(); + if (fullRequest.content() != chunkContent) { + fullRequest.content().clear().writeBytes(chunkContent); + chunkContent.release(); + } + return fullRequest; + } else { + return new WrappedFullHttpRequest(request, chunk); + } + } + } + + /** + * @return True if the request is by Chunk + */ + public boolean isChunked() { + return isChunked; + } + + /** + * Encode one attribute + * + * @return the encoded attribute + * @throws ErrorDataEncoderException + * if the encoding is in error + */ + @SuppressWarnings("unchecked") + private String encodeAttribute(String s, Charset charset) throws ErrorDataEncoderException { + if (s == null) { + return ""; + } + try { + String encoded = URLEncoder.encode(s, charset.name()); + if (encoderMode == EncoderMode.RFC3986) { + for (Map.Entry entry : percentEncodings) { + String replacement = entry.getValue(); + encoded = entry.getKey().matcher(encoded).replaceAll(replacement); + } + } + return encoded; + } catch (UnsupportedEncodingException e) { + throw new ErrorDataEncoderException(charset.name(), e); + } + } + + /** + * The ByteBuf currently used by the encoder + */ + private ByteBuf currentBuffer; + /** + * The current InterfaceHttpData to encode (used if more chunks are available) + */ + private InterfaceHttpData currentData; + /** + * If not multipart, does the currentBuffer stands for the Key or for the Value + */ + private boolean isKey = true; + + /** + * + * @return the next ByteBuf to send as an HttpChunk and modifying currentBuffer accordingly + */ + private ByteBuf fillByteBuf() { + int length = currentBuffer.readableBytes(); + if (length > QuarkusHttpPostBodyUtil.chunkSize) { + return currentBuffer.readRetainedSlice(QuarkusHttpPostBodyUtil.chunkSize); + } else { + // to continue + ByteBuf slice = currentBuffer; + currentBuffer = null; + return slice; + } + } + + /** + * From the current context (currentBuffer and currentData), returns the next HttpChunk (if possible) trying to get + * sizeleft bytes more into the currentBuffer. This is the Multipart version. + * + * NOTE: this differs from the original (Netty) version of the method in that it returns {@link #WAIT_MARKER} if + * the data is not ready to be read + * + * @param sizeleft + * the number of bytes to try to get from currentData + * @return the next HttpChunk or null if not enough bytes were found + * @throws ErrorDataEncoderException + * if the encoding is in error + */ + private HttpContent encodeNextChunkMultipart(int sizeleft) throws ErrorDataEncoderException { + if (currentData == null) { + return null; + } + ByteBuf buffer; + if (currentData instanceof QuarkusInternalAttribute) { + buffer = ((QuarkusInternalAttribute) currentData).toByteBuf(); + currentData = null; + } else { + if (currentData instanceof MultiByteHttpData) { + MultiByteHttpData multiByteHttpData = (MultiByteHttpData) this.currentData; + if (!multiByteHttpData.isReady(sizeleft)) { + multiByteHttpData.suspend(sizeleft); + return WAIT_MARKER; // we'll invoke this method once more when the data is ready + } + } + try { + buffer = ((HttpData) currentData).getChunk(sizeleft); + } catch (IOException e) { + throw new ErrorDataEncoderException(e); + } + if (buffer.capacity() == 0) { + // end for current InterfaceHttpData, need more data + currentData = null; + return null; + } + } + if (currentBuffer == null) { + currentBuffer = buffer; + } else { + currentBuffer = wrappedBuffer(currentBuffer, buffer); + } + if (currentBuffer.readableBytes() < QuarkusHttpPostBodyUtil.chunkSize) { + currentData = null; + return null; + } + buffer = fillByteBuf(); + return new DefaultHttpContent(buffer); + } + + /** + * From the current context (currentBuffer and currentData), returns the next HttpChunk (if possible) trying to get + * sizeleft bytes more into the currentBuffer. This is the UrlEncoded version. + * + * @param sizeleft + * the number of bytes to try to get from currentData + * @return the next HttpChunk or null if not enough bytes were found + * @throws ErrorDataEncoderException + * if the encoding is in error + */ + private HttpContent encodeNextChunkUrlEncoded(int sizeleft) throws ErrorDataEncoderException { + if (currentData == null) { + return null; + } + int size = sizeleft; + ByteBuf buffer; + + // Set name= + if (isKey) { + String key = currentData.getName(); + buffer = wrappedBuffer(key.getBytes(charset)); + isKey = false; + if (currentBuffer == null) { + currentBuffer = wrappedBuffer(buffer, wrappedBuffer("=".getBytes(charset))); + } else { + currentBuffer = wrappedBuffer(currentBuffer, buffer, wrappedBuffer("=".getBytes(charset))); + } + // continue + size -= buffer.readableBytes() + 1; + if (currentBuffer.readableBytes() >= QuarkusHttpPostBodyUtil.chunkSize) { + buffer = fillByteBuf(); + return new DefaultHttpContent(buffer); + } + } + + // Put value into buffer + try { + buffer = ((HttpData) currentData).getChunk(size); + } catch (IOException e) { + throw new ErrorDataEncoderException(e); + } + + // Figure out delimiter + ByteBuf delimiter = null; + if (buffer.readableBytes() < size) { + isKey = true; + delimiter = iterator.hasNext() ? wrappedBuffer("&".getBytes(charset)) : null; + } + + // End for current InterfaceHttpData, need potentially more data + if (buffer.capacity() == 0) { + currentData = null; + if (currentBuffer == null) { + if (delimiter == null) { + return null; + } else { + currentBuffer = delimiter; + } + } else { + if (delimiter != null) { + currentBuffer = wrappedBuffer(currentBuffer, delimiter); + } + } + if (currentBuffer.readableBytes() >= QuarkusHttpPostBodyUtil.chunkSize) { + buffer = fillByteBuf(); + return new DefaultHttpContent(buffer); + } + return null; + } + + // Put it all together: name=value& + if (currentBuffer == null) { + if (delimiter != null) { + currentBuffer = wrappedBuffer(buffer, delimiter); + } else { + currentBuffer = buffer; + } + } else { + if (delimiter != null) { + currentBuffer = wrappedBuffer(currentBuffer, buffer, delimiter); + } else { + currentBuffer = wrappedBuffer(currentBuffer, buffer); + } + } + + // end for current InterfaceHttpData, need more data + if (currentBuffer.readableBytes() < QuarkusHttpPostBodyUtil.chunkSize) { + currentData = null; + isKey = true; + return null; + } + + buffer = fillByteBuf(); + return new DefaultHttpContent(buffer); + } + + @Override + public void close() throws Exception { + // NO since the user can want to reuse (broadcast for instance) + // cleanFiles(); + } + + @Deprecated + @Override + public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + /** + * Returns the next available HttpChunk. The caller is responsible to test if this chunk is the last one (isLast()), + * in order to stop calling this getMethod. + * + * @return the next available HttpChunk + * @throws ErrorDataEncoderException + * if the encoding is in error + */ + @Override + public HttpContent readChunk(ByteBufAllocator allocator) throws Exception { + if (isLastChunkSent) { + return null; + } else { + HttpContent nextChunk = nextChunk(); + if (nextChunk != WAIT_MARKER) { + globalProgress += nextChunk.content().readableBytes(); + } + return nextChunk; + } + } + + /** + * Returns the next available HttpChunk. The caller is responsible to test if this chunk is the last one (isLast()), + * in order to stop calling this getMethod. + * + * @return the next available HttpChunk + * @throws ErrorDataEncoderException + * if the encoding is in error + */ + private HttpContent nextChunk() throws ErrorDataEncoderException { + if (isLastChunk) { + isLastChunkSent = true; + return LastHttpContent.EMPTY_LAST_CONTENT; + } + // first test if previous buffer is not empty + int size = calculateRemainingSize(); + if (size <= 0) { + // NextChunk from buffer + ByteBuf buffer = fillByteBuf(); + return new DefaultHttpContent(buffer); + } + // size > 0 + if (currentData != null) { + // continue to read data + HttpContent chunk; + if (isMultipart) { + chunk = encodeNextChunkMultipart(size); + if (chunk == WAIT_MARKER) { + return WAIT_MARKER; // we'll come back + } + } else { + chunk = encodeNextChunkUrlEncoded(size); + } + if (chunk != null) { + // NextChunk from data + return chunk; + } + size = calculateRemainingSize(); + } + if (!iterator.hasNext()) { + return lastChunk(); + } + while (size > 0 && iterator.hasNext()) { + currentData = iterator.next(); + HttpContent chunk; + if (isMultipart) { + chunk = encodeNextChunkMultipart(size); + if (chunk == WAIT_MARKER) { + return WAIT_MARKER; + } + } else { + chunk = encodeNextChunkUrlEncoded(size); + } + if (chunk == null) { + // not enough + size = calculateRemainingSize(); + continue; + } + // NextChunk from data + return chunk; + } + // end since no more data + return lastChunk(); + } + + private int calculateRemainingSize() { + int size = QuarkusHttpPostBodyUtil.chunkSize; + if (currentBuffer != null) { + size -= currentBuffer.readableBytes(); + } + return size; + } + + private HttpContent lastChunk() { + isLastChunk = true; + if (currentBuffer == null) { + isLastChunkSent = true; + // LastChunk with no more data + return LastHttpContent.EMPTY_LAST_CONTENT; + } + // NextChunk as last non empty from buffer + ByteBuf buffer = currentBuffer; + currentBuffer = null; + return new DefaultHttpContent(buffer); + } + + @Override + public boolean isEndOfInput() { + return isLastChunkSent; + } + + @Override + public long length() { + return isMultipart ? globalBodySize : globalBodySize - 1; + } + + @Override + public long progress() { + return globalProgress; + } + + /** + * Exception when an error occurs while encoding + */ + public static class ErrorDataEncoderException extends Exception { + private static final long serialVersionUID = 5020247425493164465L; + + public ErrorDataEncoderException() { + } + + public ErrorDataEncoderException(String msg) { + super(msg); + } + + public ErrorDataEncoderException(Throwable cause) { + super(cause); + } + + public ErrorDataEncoderException(String msg, Throwable cause) { + super(msg, cause); + } + } + + private static class WrappedHttpRequest implements HttpRequest { + private final HttpRequest request; + + WrappedHttpRequest(HttpRequest request) { + this.request = request; + } + + @Override + public HttpRequest setProtocolVersion(HttpVersion version) { + request.setProtocolVersion(version); + return this; + } + + @Override + public HttpRequest setMethod(HttpMethod method) { + request.setMethod(method); + return this; + } + + @Override + public HttpRequest setUri(String uri) { + request.setUri(uri); + return this; + } + + @Override + public HttpMethod getMethod() { + return request.method(); + } + + @Override + public HttpMethod method() { + return request.method(); + } + + @Override + public String getUri() { + return request.uri(); + } + + @Override + public String uri() { + return request.uri(); + } + + @Override + public HttpVersion getProtocolVersion() { + return request.protocolVersion(); + } + + @Override + public HttpVersion protocolVersion() { + return request.protocolVersion(); + } + + @Override + public HttpHeaders headers() { + return request.headers(); + } + + @Override + public DecoderResult decoderResult() { + return request.decoderResult(); + } + + @Override + @Deprecated + public DecoderResult getDecoderResult() { + return request.getDecoderResult(); + } + + @Override + public void setDecoderResult(DecoderResult result) { + request.setDecoderResult(result); + } + } + + private static final class WrappedFullHttpRequest extends WrappedHttpRequest implements FullHttpRequest { + private final HttpContent content; + + private WrappedFullHttpRequest(HttpRequest request, HttpContent content) { + super(request); + this.content = content; + } + + @Override + public FullHttpRequest setProtocolVersion(HttpVersion version) { + super.setProtocolVersion(version); + return this; + } + + @Override + public FullHttpRequest setMethod(HttpMethod method) { + super.setMethod(method); + return this; + } + + @Override + public FullHttpRequest setUri(String uri) { + super.setUri(uri); + return this; + } + + @Override + public FullHttpRequest copy() { + return replace(content().copy()); + } + + @Override + public FullHttpRequest duplicate() { + return replace(content().duplicate()); + } + + @Override + public FullHttpRequest retainedDuplicate() { + return replace(content().retainedDuplicate()); + } + + @Override + public FullHttpRequest replace(ByteBuf content) { + DefaultFullHttpRequest duplicate = new DefaultFullHttpRequest(protocolVersion(), method(), uri(), content); + duplicate.headers().set(headers()); + duplicate.trailingHeaders().set(trailingHeaders()); + return duplicate; + } + + @Override + public FullHttpRequest retain(int increment) { + content.retain(increment); + return this; + } + + @Override + public FullHttpRequest retain() { + content.retain(); + return this; + } + + @Override + public FullHttpRequest touch() { + content.touch(); + return this; + } + + @Override + public FullHttpRequest touch(Object hint) { + content.touch(hint); + return this; + } + + @Override + public ByteBuf content() { + return content.content(); + } + + @Override + public HttpHeaders trailingHeaders() { + if (content instanceof LastHttpContent) { + return ((LastHttpContent) content).trailingHeaders(); + } else { + return EmptyHttpHeaders.INSTANCE; + } + } + + @Override + public int refCnt() { + return content.refCnt(); + } + + @Override + public boolean release() { + return content.release(); + } + + @Override + public boolean release(int decrement) { + return content.release(decrement); + } + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusHttpPostBodyUtil.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusHttpPostBodyUtil.java new file mode 100644 index 0000000000000..59b84ac780030 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusHttpPostBodyUtil.java @@ -0,0 +1,273 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.resteasy.reactive.client.impl.multipart; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.HttpConstants; + +/** + * A copy of Netty's HttpPostBodyUtil which is not public + * + * Shared Static object between HttpMessageDecoder, HttpPostRequestDecoder and HttpPostRequestEncoder + */ +final class QuarkusHttpPostBodyUtil { + + public static final int chunkSize = 8096; + + /** + * Default Content-Type in binary form + */ + public static final String DEFAULT_BINARY_CONTENT_TYPE = "application/octet-stream"; + + /** + * Default Content-Type in Text form + */ + public static final String DEFAULT_TEXT_CONTENT_TYPE = "text/plain"; + + /** + * Allowed mechanism for multipart + * mechanism := "7bit" + * / "8bit" + * / "binary" + * Not allowed: "quoted-printable" + * / "base64" + */ + public enum TransferEncodingMechanism { + /** + * Default encoding + */ + BIT7("7bit"), + /** + * Short lines but not in ASCII - no encoding + */ + BIT8("8bit"), + /** + * Could be long text not in ASCII - no encoding + */ + BINARY("binary"); + + private final String value; + + TransferEncodingMechanism(String value) { + this.value = value; + } + + public String value() { + return value; + } + + @Override + public String toString() { + return value; + } + } + + private QuarkusHttpPostBodyUtil() { + } + + /** + * This class intends to decrease the CPU in seeking ahead some bytes in + * HttpPostRequestDecoder + */ + static class SeekAheadOptimize { + byte[] bytes; + int readerIndex; + int pos; + int origPos; + int limit; + ByteBuf buffer; + + /** + * @param buffer buffer with a backing byte array + */ + SeekAheadOptimize(ByteBuf buffer) { + if (!buffer.hasArray()) { + throw new IllegalArgumentException("buffer hasn't backing byte array"); + } + this.buffer = buffer; + bytes = buffer.array(); + readerIndex = buffer.readerIndex(); + origPos = pos = buffer.arrayOffset() + readerIndex; + limit = buffer.arrayOffset() + buffer.writerIndex(); + } + + /** + * + * @param minus this value will be used as (currentPos - minus) to set + * the current readerIndex in the buffer. + */ + void setReadPosition(int minus) { + pos -= minus; + readerIndex = getReadPosition(pos); + buffer.readerIndex(readerIndex); + } + + /** + * + * @param index raw index of the array (pos in general) + * @return the value equivalent of raw index to be used in readerIndex(value) + */ + int getReadPosition(int index) { + return index - origPos + readerIndex; + } + } + + /** + * Find the first non whitespace + * + * @return the rank of the first non whitespace + */ + static int findNonWhitespace(String sb, int offset) { + int result; + for (result = offset; result < sb.length(); result++) { + if (!Character.isWhitespace(sb.charAt(result))) { + break; + } + } + return result; + } + + /** + * Find the end of String + * + * @return the rank of the end of string + */ + static int findEndOfString(String sb) { + int result; + for (result = sb.length(); result > 0; result--) { + if (!Character.isWhitespace(sb.charAt(result - 1))) { + break; + } + } + return result; + } + + /** + * Try to find first LF or CRLF as Line Breaking + * + * @param buffer the buffer to search in + * @param index the index to start from in the buffer + * @return a relative position from index > 0 if LF or CRLF is found + * or < 0 if not found + */ + static int findLineBreak(ByteBuf buffer, int index) { + int toRead = buffer.readableBytes() - (index - buffer.readerIndex()); + int posFirstChar = buffer.bytesBefore(index, toRead, HttpConstants.LF); + if (posFirstChar == -1) { + // No LF, so neither CRLF + return -1; + } + if (posFirstChar > 0 && buffer.getByte(index + posFirstChar - 1) == HttpConstants.CR) { + posFirstChar--; + } + return posFirstChar; + } + + /** + * Try to find last LF or CRLF as Line Breaking + * + * @param buffer the buffer to search in + * @param index the index to start from in the buffer + * @return a relative position from index > 0 if LF or CRLF is found + * or < 0 if not found + */ + static int findLastLineBreak(ByteBuf buffer, int index) { + int candidate = findLineBreak(buffer, index); + int findCRLF = 0; + if (candidate >= 0) { + if (buffer.getByte(index + candidate) == HttpConstants.CR) { + findCRLF = 2; + } else { + findCRLF = 1; + } + candidate += findCRLF; + } + int next; + while (candidate > 0 && (next = findLineBreak(buffer, index + candidate)) >= 0) { + candidate += next; + if (buffer.getByte(index + candidate) == HttpConstants.CR) { + findCRLF = 2; + } else { + findCRLF = 1; + } + candidate += findCRLF; + } + return candidate - findCRLF; + } + + /** + * Try to find the delimiter, with LF or CRLF in front of it (added as delimiters) if needed + * + * @param buffer the buffer to search in + * @param index the index to start from in the buffer + * @param delimiter the delimiter as byte array + * @param precededByLineBreak true if it must be preceded by LF or CRLF, else false + * @return a relative position from index > 0 if delimiter found designing the start of it + * (including LF or CRLF is asked) + * or a number < 0 if delimiter is not found + * @throws IndexOutOfBoundsException + * if {@code offset + delimiter.length} is greater than {@code buffer.capacity} + */ + static int findDelimiter(ByteBuf buffer, int index, byte[] delimiter, boolean precededByLineBreak) { + final int delimiterLength = delimiter.length; + final int readerIndex = buffer.readerIndex(); + final int writerIndex = buffer.writerIndex(); + int toRead = writerIndex - index; + int newOffset = index; + boolean delimiterNotFound = true; + while (delimiterNotFound && delimiterLength <= toRead) { + // Find first position: delimiter + int posDelimiter = buffer.bytesBefore(newOffset, toRead, delimiter[0]); + if (posDelimiter < 0) { + return -1; + } + newOffset += posDelimiter; + toRead -= posDelimiter; + // Now check for delimiter + if (toRead >= delimiterLength) { + delimiterNotFound = false; + for (int i = 0; i < delimiterLength; i++) { + if (buffer.getByte(newOffset + i) != delimiter[i]) { + newOffset++; + toRead--; + delimiterNotFound = true; + break; + } + } + } + if (!delimiterNotFound) { + // Delimiter found, find if necessary: LF or CRLF + if (precededByLineBreak && newOffset > readerIndex) { + if (buffer.getByte(newOffset - 1) == HttpConstants.LF) { + newOffset--; + // Check if CR before: not mandatory to be there + if (newOffset > readerIndex && buffer.getByte(newOffset - 1) == HttpConstants.CR) { + newOffset--; + } + } else { + // Delimiter with Line Break could be further: iterate after first char of delimiter + newOffset++; + toRead--; + delimiterNotFound = true; + continue; + } + } + return newOffset - readerIndex; + } + } + return -1; + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusInternalAttribute.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusInternalAttribute.java new file mode 100644 index 0000000000000..d1727380edb80 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusInternalAttribute.java @@ -0,0 +1,157 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.resteasy.reactive.client.impl.multipart; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.internal.ObjectUtil; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +/** + * A copy of Netty's InternalAttribute which is not public + * + * This Attribute is only for Encoder use to insert special command between object if needed + * (like Multipart Mixed mode) + */ +final class QuarkusInternalAttribute extends AbstractReferenceCounted implements InterfaceHttpData { + private final List value = new ArrayList<>(); + private final Charset charset; + private int size; + + QuarkusInternalAttribute(Charset charset) { + this.charset = charset; + } + + @Override + public HttpDataType getHttpDataType() { + return HttpDataType.InternalAttribute; + } + + public void addValue(String value) { + ObjectUtil.checkNotNull(value, "value"); + ByteBuf buf = Unpooled.copiedBuffer(value, charset); + this.value.add(buf); + size += buf.readableBytes(); + } + + public void addValue(String value, int rank) { + ObjectUtil.checkNotNull(value, "value"); + ByteBuf buf = Unpooled.copiedBuffer(value, charset); + this.value.add(rank, buf); + size += buf.readableBytes(); + } + + public void setValue(String value, int rank) { + ObjectUtil.checkNotNull(value, "value"); + ByteBuf buf = Unpooled.copiedBuffer(value, charset); + ByteBuf old = this.value.set(rank, buf); + if (old != null) { + size -= old.readableBytes(); + old.release(); + } + size += buf.readableBytes(); + } + + @Override + public int hashCode() { + return getName().hashCode(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof QuarkusInternalAttribute)) { + return false; + } + QuarkusInternalAttribute attribute = (QuarkusInternalAttribute) o; + return getName().equalsIgnoreCase(attribute.getName()); + } + + @Override + public int compareTo(InterfaceHttpData o) { + if (!(o instanceof QuarkusInternalAttribute)) { + throw new ClassCastException("Cannot compare " + getHttpDataType() + + " with " + o.getHttpDataType()); + } + return compareTo((QuarkusInternalAttribute) o); + } + + public int compareTo(QuarkusInternalAttribute o) { + return getName().compareToIgnoreCase(o.getName()); + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + for (ByteBuf elt : value) { + result.append(elt.toString(charset)); + } + return result.toString(); + } + + public int size() { + return size; + } + + public ByteBuf toByteBuf() { + return Unpooled.compositeBuffer().addComponents(value).writerIndex(size()).readerIndex(0); + } + + @Override + public String getName() { + return "InternalAttribute"; + } + + @Override + protected void deallocate() { + // Do nothing + } + + @Override + public InterfaceHttpData retain() { + for (ByteBuf buf : value) { + buf.retain(); + } + return this; + } + + @Override + public InterfaceHttpData retain(int increment) { + for (ByteBuf buf : value) { + buf.retain(increment); + } + return this; + } + + @Override + public InterfaceHttpData touch() { + for (ByteBuf buf : value) { + buf.touch(); + } + return this; + } + + @Override + public InterfaceHttpData touch(Object hint) { + for (ByteBuf buf : value) { + buf.touch(hint); + } + return this; + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartForm.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartForm.java index 394252966cd82..12ee1d958c0c1 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartForm.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartForm.java @@ -1,5 +1,6 @@ package org.jboss.resteasy.reactive.client.impl.multipart; +import io.smallrye.mutiny.Multi; import io.vertx.core.buffer.Buffer; import java.io.IOException; import java.lang.reflect.Type; @@ -75,6 +76,18 @@ public QuarkusMultipartForm binaryFileUpload(String name, String filename, Buffe return this; } + @SuppressWarnings("unused") + public QuarkusMultipartForm multiAsBinaryFileUpload(String name, String filename, Multi content, String mediaType) { + parts.add(new QuarkusMultipartFormDataPart(name, filename, content, mediaType, false)); + return this; + } + + @SuppressWarnings("unused") + public QuarkusMultipartForm multiAsTextFileUpload(String name, String filename, Multi content, String mediaType) { + parts.add(new QuarkusMultipartFormDataPart(name, filename, content, mediaType, true)); + return this; + } + @Override public Iterator iterator() { return parts.iterator(); diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormDataPart.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormDataPart.java index 61f07868d3d07..218e3d38222e0 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormDataPart.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormDataPart.java @@ -1,5 +1,6 @@ package org.jboss.resteasy.reactive.client.impl.multipart; +import io.smallrye.mutiny.Multi; import io.vertx.core.buffer.Buffer; /** @@ -16,12 +17,14 @@ public class QuarkusMultipartFormDataPart { private final boolean isObject; private final Class type; private final Buffer content; + private final Multi multiByteContent; public QuarkusMultipartFormDataPart(String name, Buffer content, String mediaType, Class type) { this.name = name; this.content = content; this.mediaType = mediaType; this.type = type; + this.multiByteContent = null; if (name == null) { throw new NullPointerException("Multipart field name cannot be null"); @@ -39,6 +42,27 @@ public QuarkusMultipartFormDataPart(String name, Buffer content, String mediaTyp this.text = false; } + public QuarkusMultipartFormDataPart(String name, String filename, Multi content, String mediaType, boolean text) { + if (name == null) { + throw new NullPointerException("Multipart field name cannot be null"); + } + if (mediaType == null) { + throw new NullPointerException("Multipart field media type cannot be null"); + } + + this.name = name; + this.multiByteContent = content; + this.mediaType = mediaType; + this.filename = filename; + this.text = text; + + this.isObject = false; + this.value = null; + this.pathname = null; + this.type = null; + this.content = null; + } + public QuarkusMultipartFormDataPart(String name, String value) { if (name == null) { throw new NullPointerException("Multipart field name cannot be null"); @@ -51,6 +75,7 @@ public QuarkusMultipartFormDataPart(String name, String value) { this.filename = null; this.pathname = null; this.content = null; + this.multiByteContent = null; this.mediaType = null; this.text = false; this.isObject = false; @@ -75,6 +100,7 @@ public QuarkusMultipartFormDataPart(String name, String filename, String pathnam this.filename = filename; this.pathname = pathname; this.content = null; + this.multiByteContent = null; this.mediaType = mediaType; this.text = text; this.isObject = false; @@ -99,6 +125,7 @@ public QuarkusMultipartFormDataPart(String name, String filename, Buffer content this.filename = filename; this.pathname = null; this.content = content; + this.multiByteContent = null; this.mediaType = mediaType; this.text = text; this.isObject = false; @@ -133,6 +160,10 @@ public Buffer content() { return content; } + public Multi multiByteContent() { + return multiByteContent; + } + public String mediaType() { return mediaType; } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java index 70f13fc378d4b..14af23304ae44 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java @@ -9,7 +9,6 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import io.netty.handler.codec.http.multipart.FileUpload; -import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; import io.netty.handler.codec.http.multipart.MemoryFileUpload; import io.vertx.core.Context; import io.vertx.core.Handler; @@ -21,16 +20,17 @@ import io.vertx.core.streams.impl.InboundBuffer; import java.io.File; import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; /** * based on {@link io.vertx.ext.web.client.impl.MultipartFormUpload} */ -public class QuarkusMultipartFormUpload implements ReadStream { +public class QuarkusMultipartFormUpload implements ReadStream, Runnable { private static final UnpooledByteBufAllocator ALLOC = new UnpooledByteBufAllocator(false); private DefaultFullHttpRequest request; - private HttpPostRequestEncoder encoder; + private PausableHttpPostRequestEncoder encoder; private Handler exceptionHandler; private Handler dataHandler; private Handler endHandler; @@ -41,7 +41,7 @@ public class QuarkusMultipartFormUpload implements ReadStream { public QuarkusMultipartFormUpload(Context context, QuarkusMultipartForm parts, boolean multipart, - HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception { + PausableHttpPostRequestEncoder.EncoderMode encoderMode) throws Exception { this.context = context; this.pending = new InboundBuffer<>(context) .handler(this::handleChunk) @@ -51,22 +51,18 @@ public QuarkusMultipartFormUpload(Context context, io.netty.handler.codec.http.HttpMethod.POST, "/"); Charset charset = parts.getCharset() != null ? parts.getCharset() : HttpConstants.DEFAULT_CHARSET; - this.encoder = new HttpPostRequestEncoder( - new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE, charset) { - @Override - public FileUpload createFileUpload(HttpRequest request, String name, String filename, String contentType, - String contentTransferEncoding, Charset _charset, long size) { - if (_charset == null) { - _charset = charset; - } - return super.createFileUpload(request, name, filename, contentType, contentTransferEncoding, _charset, - size); - } - }, - request, - multipart, - charset, - encoderMode); + DefaultHttpDataFactory httpDataFactory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE, charset) { + @Override + public FileUpload createFileUpload(HttpRequest request, String name, String filename, String contentType, + String contentTransferEncoding, Charset _charset, long size) { + if (_charset == null) { + _charset = charset; + } + return super.createFileUpload(request, name, filename, contentType, contentTransferEncoding, _charset, + size); + } + }; + this.encoder = new PausableHttpPostRequestEncoder(httpDataFactory, request, multipart, charset, encoderMode); for (QuarkusMultipartFormDataPart formDataPart : parts) { if (formDataPart.isAttribute()) { encoder.addBodyAttribute(formDataPart.name(), formDataPart.value()); @@ -75,6 +71,30 @@ public FileUpload createFileUpload(HttpRequest request, String name, String file formDataPart.isText() ? null : "binary", null, formDataPart.content().length()); data.setContent(formDataPart.content().getByteBuf()); encoder.addBodyHttpData(data); + } else if (formDataPart.multiByteContent() != null) { + String contentTransferEncoding = null; + String contentType = formDataPart.mediaType(); + if (contentType == null) { + if (formDataPart.isText()) { + contentType = "text/plain"; + } else { + contentType = "application/octet-stream"; + } + } + if (!formDataPart.isText()) { + contentTransferEncoding = "binary"; + } + + encoder.addBodyHttpData(new MultiByteHttpData( + formDataPart.name(), + formDataPart.filename(), + contentType, + contentTransferEncoding, + Charset.defaultCharset(), + formDataPart.multiByteContent(), + this::handleError, + context, + this)); } else { String pathname = formDataPart.pathname(); if (pathname != null) { @@ -121,16 +141,23 @@ private void handleChunk(Object item) { handler.handle(item); } + @Override public void run() { if (Vertx.currentContext() != context) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Wrong Vert.x context used for multipart upload. Expected: " + context + + ", actual: " + Vertx.currentContext()); } + AtomicInteger counter = new AtomicInteger(); while (!ended) { if (encoder.isChunked()) { try { HttpContent chunk = encoder.readChunk(ALLOC); + if (chunk == PausableHttpPostRequestEncoder.WAIT_MARKER) { + return; // resumption will be scheduled by encoder + } ByteBuf content = chunk.content(); Buffer buff = Buffer.buffer(content); + counter.incrementAndGet(); boolean writable = pending.write(buff); if (encoder.isEndOfInput()) { ended = true; @@ -141,10 +168,7 @@ public void run() { break; } } catch (Exception e) { - ended = true; - request = null; - encoder = null; - pending.write(e); + handleError(e); break; } } else { @@ -159,6 +183,13 @@ public void run() { } } + private void handleError(Throwable e) { + ended = true; + request = null; + encoder = null; + pending.write(e); + } + public MultiMap headers() { return new HeadersAdaptor(request.headers()); } @@ -188,6 +219,7 @@ public ReadStream fetch(long amount) { } @Override + @Deprecated public synchronized QuarkusMultipartFormUpload resume() { pending.resume(); return this; diff --git a/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartClient.java b/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartClient.java index e59cced458d9e..08542f443a73e 100644 --- a/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartClient.java +++ b/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartClient.java @@ -13,6 +13,7 @@ import org.jboss.resteasy.reactive.MultipartForm; import org.jboss.resteasy.reactive.PartType; +import io.smallrye.mutiny.Multi; import io.vertx.core.buffer.Buffer; @Path("/echo") @@ -25,6 +26,12 @@ public interface MultipartClient { @Path("/binary") String sendByteArrayAsBinaryFile(@MultipartForm WithByteArrayAsBinaryFile data); + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Produces(MediaType.TEXT_PLAIN) + @Path("/binary") + String sendMultiByteAsBinaryFile(@MultipartForm WithMultiByteAsBinaryFile data); + @POST @Consumes(MediaType.MULTIPART_FORM_DATA) @Produces(MediaType.TEXT_PLAIN) @@ -134,6 +141,17 @@ class WithByteArrayAsBinaryFile { public String fileName; } + class WithMultiByteAsBinaryFile { + + @FormParam("file") + @PartType(MediaType.APPLICATION_OCTET_STREAM) + public Multi file; + + @FormParam("fileName") + @PartType(MediaType.TEXT_PLAIN) + public String fileName; + } + class WithBufferAsBinaryFile { @FormParam("file") @PartType(MediaType.APPLICATION_OCTET_STREAM) diff --git a/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartResource.java b/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartResource.java index de36ac30f2fb3..92b7ea3d63464 100644 --- a/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartResource.java +++ b/integration-tests/rest-client-reactive-multipart/src/main/java/io/quarkus/it/rest/client/multipart/MultipartResource.java @@ -6,6 +6,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -32,9 +34,11 @@ import io.quarkus.it.rest.client.multipart.MultipartClient.WithByteArrayAsTextFile; import io.quarkus.it.rest.client.multipart.MultipartClient.WithFileAsBinaryFile; import io.quarkus.it.rest.client.multipart.MultipartClient.WithFileAsTextFile; +import io.quarkus.it.rest.client.multipart.MultipartClient.WithMultiByteAsBinaryFile; import io.quarkus.it.rest.client.multipart.MultipartClient.WithPathAsBinaryFile; import io.quarkus.it.rest.client.multipart.MultipartClient.WithPathAsTextFile; import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Multi; import io.vertx.core.buffer.Buffer; @Path("") @@ -85,6 +89,25 @@ public String sendByteArray(@QueryParam("nullFile") @DefaultValue("false") boole return client.sendByteArrayAsBinaryFile(data); } + @GET + @Path("/client/multi-byte-as-binary-file") + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.TEXT_PLAIN) + @Blocking + public String sendMultiByte(@QueryParam("nullFile") @DefaultValue("false") boolean nullFile) { + WithMultiByteAsBinaryFile data = new WithMultiByteAsBinaryFile(); + if (!nullFile) { + List bytes = new ArrayList<>(); + for (byte b : HELLO_WORLD.getBytes(UTF_8)) { + bytes.add(b); + } + + data.file = Multi.createFrom().iterable(bytes); + } + data.fileName = GREETING_TXT; + return client.sendMultiByteAsBinaryFile(data); + } + @GET @Path("/client/buffer-as-binary-file") @Consumes(MediaType.TEXT_PLAIN) diff --git a/integration-tests/rest-client-reactive-multipart/src/test/java/io/quarkus/it/rest/client/multipart/MultipartResourceTest.java b/integration-tests/rest-client-reactive-multipart/src/test/java/io/quarkus/it/rest/client/multipart/MultipartResourceTest.java index c347399bbe0b7..7e9e339baa449 100644 --- a/integration-tests/rest-client-reactive-multipart/src/test/java/io/quarkus/it/rest/client/multipart/MultipartResourceTest.java +++ b/integration-tests/rest-client-reactive-multipart/src/test/java/io/quarkus/it/rest/client/multipart/MultipartResourceTest.java @@ -82,6 +82,18 @@ public void shouldSendFileAsBinaryFile() { // @formatter:on } + @Test + public void shouldMultiAsBinaryFile() { + // @formatter:off + given() + .header("Content-Type", "text/plain") + .when().get("/client/multi-byte-as-binary-file") + .then() + .statusCode(200) + .body(equalTo("fileOk:true,nameOk:true")); + // @formatter:on + } + @Test public void shouldSendNullFileAsBinaryFile() { // @formatter:off