Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented multipart support for netty #2560

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/src/main/scala/sttp/tapir/internal/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import scala.collection.immutable
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import sttp.monad.syntax._

package object internal {
// some definitions are intentionally left public as they are used in server/client interpreters
Expand Down Expand Up @@ -352,4 +353,15 @@ package object internal {
case null => true
case _ => false
}

/** Adds a `sequence` operation to a list of effects, combining them into a single
  * effect that yields the list of all results. Effects are sequenced from head to
  * tail, so their side effects (if any) are ordered like the list.
  */
implicit class SequenceSupport[F[_], T](lt: List[F[T]])(implicit me: MonadError[F]) {
  def sequence(): F[List[T]] =
    lt.foldRight((Nil: List[T]).unit)((fa, acc) => fa.flatMap(head => acc.map(head :: _)))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class NettyCatsServerTest extends TestSuite with EitherValues {
val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)

val tests = new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false).tests()
val tests = new AllServerTests(createServerTest, interpreter, backend, staticContent = false, multipart = false).tests() ++
new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false).tests()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we support the additional part headers? seems this might be a matter of adding the headers when serialising the part?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I checked, there is no way to get those headers while parsing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm it would be weird if the data just got lost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will check it once more

Copy link
Contributor Author

@Pask423 Pask423 Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some more digging I cannot see any way to get parts headers from netty - they are not included in overall request headers nor in HttpDataType's used in getParts method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it seems so; so we'll have to leave this as is, thanks for checking


IO.pure((tests, eventLoopGroup))
} { case (_, eventLoopGroup) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package sttp.tapir.server.netty.internal

import io.netty.buffer.{ByteBufInputStream, ByteBufUtil}
import io.netty.handler.codec.http.FullHttpRequest
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType
import io.netty.handler.codec.http.multipart.{Attribute, FileUpload, HttpData, HttpPostMultipartRequestDecoder}
import sttp.capabilities
import sttp.model.{MediaType, Part}
import sttp.monad.MonadError
import sttp.tapir.{FileRange, RawBodyType, TapirFile}
import sttp.tapir.model.ServerRequest
import sttp.monad.syntax._
import sttp.tapir.capabilities.NoStreams
import sttp.tapir.internal.SequenceSupport
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.interpreter.{RawValue, RequestBody}
import sttp.tapir.server.netty.NettyServerRequest
import sttp.tapir.{FileRange, RawBodyType, RawPart, TapirFile}

import java.nio.ByteBuffer
import java.nio.file.Files
import scala.collection.JavaConverters._

class NettyRequestBody[F[_]](createFile: ServerRequest => F[TapirFile])(implicit
monadError: MonadError[F]
Expand All @@ -36,11 +40,74 @@ class NettyRequestBody[F[_]](createFile: ServerRequest => F[TapirFile])(implicit
Files.write(file.toPath, requestContentAsByteArray)
RawValue(FileRange(file), Seq(FileRange(file)))
})
case _: RawBodyType.MultipartBody => ???
case m: RawBodyType.MultipartBody =>
monadError
.unit(new HttpPostMultipartRequestDecoder(nettyRequest(serverRequest)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a .unit syntax; or, if this can throw an exception, we should use .eval

.flatMap(decoder => {
getParts(serverRequest, m, decoder)
.sequence()
.map(RawValue.fromParts)
.map(a => {
decoder.destroy()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be in the "finally" clause using .ensure

a.asInstanceOf[RawValue[RAW]]
})
})
}
}

override def toStream(serverRequest: ServerRequest): streams.BinaryStream = throw new UnsupportedOperationException()

// Extracts the raw netty request; the netty interpreter passes a FullHttpRequest as the underlying value.
private def nettyRequest(serverRequest: ServerRequest): FullHttpRequest = serverRequest.underlying.asInstanceOf[FullHttpRequest]

/** Maps each decoded netty body part to an effect producing a tapir `Part`, keeping
  * only the parts that are declared in the endpoint's multipart body type (parts with
  * no matching `partType` are dropped).
  */
private def getParts(
    serverRequest: ServerRequest,
    m: RawBodyType.MultipartBody,
    decoder: HttpPostMultipartRequestDecoder
): List[F[Part[Any]]] =
  decoder.getBodyHttpDatas.asScala.toList.flatMap { httpData =>
    httpData.getHttpDataType match {
      case HttpDataType.Attribute =>
        m.partType(httpData.getName)
          .toList
          .map(partType => toPart(serverRequest, partType, httpData.asInstanceOf[Attribute], None))
      case HttpDataType.FileUpload =>
        // the cast to FileUpload (rather than plain HttpData) is needed to read the part's content type
        m.partType(httpData.getName)
          .toList
          .map { partType =>
            val upload = httpData.asInstanceOf[FileUpload]
            toPart(serverRequest, partType, upload, Some(upload.getContentType))
          }
      case HttpDataType.InternalAttribute => throw new UnsupportedOperationException("DataType not supported")
    }
  }

/** Converts a single decoded netty `HttpData` part into a tapir `Part`, reading the
  * part's content as required by the endpoint's raw body type.
  *
  * Netty may keep a part's content either in memory or in a temporary file on disk
  * (mixed storage), so every content read is wrapped in `monadError.blocking` instead
  * of being evaluated eagerly inside `unit`; this also captures any decoding
  * exceptions inside F rather than throwing them during effect construction.
  */
private def toPart(
    serverRequest: ServerRequest,
    m: RawBodyType[_],
    upload: HttpData,
    contentType: Option[String]
): F[Part[Any]] = {
  val mediaType = contentType.flatMap(c => MediaType.parse(c).toOption)
  m match {
    case RawBodyType.StringBody(charset) =>
      monadError.blocking(Part(name = upload.getName, body = upload.getString(charset), contentType = mediaType))
    case RawBodyType.ByteBufferBody =>
      // RawBodyType.ByteBufferBody declares a java.nio.ByteBuffer body, so expose the
      // netty buffer as an NIO view instead of passing the raw ByteBuf
      monadError.blocking(Part(name = upload.getName, body = upload.content().nioBuffer(), contentType = mediaType))
    case RawBodyType.InputStreamBody =>
      monadError.blocking(Part(name = upload.getName, body = new ByteBufInputStream(upload.content()), contentType = mediaType))
    case RawBodyType.ByteArrayBody =>
      monadError.blocking(Part(name = upload.getName, body = upload.get(), contentType = mediaType))
    case RawBodyType.FileBody =>
      createFile(serverRequest).flatMap(file =>
        monadError.blocking {
          // for disk-backed parts, copy the backing file directly instead of loading the
          // whole content into memory (review feedback); in-memory parts are written out
          if (upload.isInMemory) Files.write(file.toPath, ByteBufUtil.getBytes(upload.content()))
          else Files.copy(upload.getFile.toPath, file.toPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING)
          Part(
            name = upload.getName,
            body = FileRange(file),
            contentType = mediaType,
            fileName = Some(file.getName)
          )
        }
      )
    case _ => throw new UnsupportedOperationException("BodyType not supported as FileUpload type")
  }
}

override def toStream(serverRequest: ServerRequest): streams.BinaryStream = throw new UnsupportedOperationException()
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package sttp.tapir.server.netty.internal

import io.netty.buffer.Unpooled
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.stream.{ChunkedFile, ChunkedStream}
import sttp.capabilities
import sttp.model.HasHeaders
import sttp.model.{HasHeaders, Part}
import sttp.tapir.capabilities.NoStreams
import sttp.tapir.server.interpreter.ToResponseBody
import sttp.tapir.server.netty.NettyResponse
import sttp.tapir.server.netty.NettyResponseContent.{ByteBufNettyResponseContent, ChunkedFileNettyResponseContent, ChunkedStreamNettyResponseContent}
import sttp.tapir.{CodecFormat, FileRange, RawBodyType, WebSocketBodyOutput}
import sttp.tapir.server.netty.NettyResponseContent.{
ByteBufNettyResponseContent,
ChunkedFileNettyResponseContent,
ChunkedStreamNettyResponseContent
}
import sttp.tapir.{CodecFormat, FileRange, RawBodyType, RawPart, WebSocketBodyOutput}

import java.io.{InputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.charset.Charset
import java.nio.file.Files
import java.util.UUID

class NettyToResponseBody extends ToResponseBody[NettyResponse, NoStreams] {
override val streams: capabilities.Streams[NoStreams] = NoStreams
Expand All @@ -40,7 +46,14 @@ class NettyToResponseBody extends ToResponseBody[NettyResponse, NoStreams] {
val fileRange = v.asInstanceOf[FileRange]
(ctx: ChannelHandlerContext) => ChunkedFileNettyResponseContent(ctx.newPromise(), wrap(fileRange))

case _: RawBodyType.MultipartBody => throw new UnsupportedOperationException
case m: RawBodyType.MultipartBody =>
val buffers: List[ByteBuf] = v
.asInstanceOf[List[RawPart]]
.flatMap(part =>
m.partType(part.name)
.map(bodyType => convertToBuffs(bodyType, part))
)
(ctx: ChannelHandlerContext) => ByteBufNettyResponseContent(ctx.newPromise(), Unpooled.wrappedBuffer(buffers: _*))
}
}

Expand All @@ -57,14 +70,46 @@ class NettyToResponseBody extends ToResponseBody[NettyResponse, NoStreams] {
} yield (start, end + NettyToResponseBody.IncludingLastOffset)

maybeRange match {
case Some((start, end)) => {
case Some((start, end)) =>
val randomAccessFile = new RandomAccessFile(file, NettyToResponseBody.ReadOnlyAccessMode)
new ChunkedFile(randomAccessFile, start, end - start, NettyToResponseBody.DefaultChunkSize)
}
case None => new ChunkedFile(file)
}
}

/** Serialises one part's body into a ByteBuf, dispatching on the declared raw body type. */
private def convertToBuffs(bodyType: RawBodyType[_], part: Part[Any]): ByteBuf =
  bodyType match {
    // all in-memory body types are rendered identically — toPart stringifies the body value
    case RawBodyType.StringBody(_) | RawBodyType.ByteArrayBody | RawBodyType.ByteBufferBody | RawBodyType.InputStreamBody =>
      toPart(part.body, part.contentType, part.name, None)
    case RawBodyType.FileBody =>
      val fileRange = part.body.asInstanceOf[FileRange]
      // NOTE(review): Files.readString loads the whole file into memory; large files would
      // need a chunked (ChunkedFile-style) serialisation to stream the content
      toPart(Files.readString(fileRange.file.toPath), part.contentType, part.name, Some(fileRange.file.getName))
    case RawBodyType.MultipartBody(_, _) =>
      throw new UnsupportedOperationException("Nested multipart messages are not supported.")
  }

/** Renders one multipart part as text: a boundary line, an optional Content-Type
  * header, a Content-Disposition header, a blank line, the data, and a trailing
  * boundary line, wrapped into a ByteBuf.
  *
  * NOTE(review): a fresh UUID boundary is generated per call, so the parts of one
  * response do not share a boundary; the delimiters also lack the "--" prefix and
  * CRLF line endings that RFC 2046 requires, and the boundary is not declared in the
  * response's Content-Type header — TODO confirm clients can parse this output.
  */
private def toPart(data: Any, contentType: Option[String], name: String, filename: Option[String]): ByteBuf = {
  val boundary = UUID.randomUUID.toString
  // optional `filename="...";` fragment of the Content-Disposition header
  val fileNameStr = filename.map(name => s"""filename="$name";""").getOrElse("")
  // optional Content-Type header line; empty when the part carries no content type
  val contentTypeStr = contentType.map(ct => s"Content-Type: $ct").getOrElse("")
  val textPart =
    s"""
$boundary
$contentTypeStr
Content-Disposition: form-data; $fileNameStr name="$name"

$data
$boundary
"""
  Unpooled.wrappedBuffer(textPart.getBytes)
}
override def fromStreamValue(
v: streams.BinaryStream,
headers: HasHeaders,
Expand All @@ -76,6 +121,7 @@ class NettyToResponseBody extends ToResponseBody[NettyResponse, NoStreams] {
pipe: streams.Pipe[REQ, RESP],
o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, NoStreams]
): NettyResponse = throw new UnsupportedOperationException

}

object NettyToResponseBody {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class NettyFutureServerTest extends TestSuite with EitherValues {
val interpreter = new NettyFutureTestServerInterpreter(eventLoopGroup)
val createServerTest = new DefaultCreateServerTest(backend, interpreter)

val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests()
val tests = new AllServerTests(createServerTest, interpreter, backend, multipart = false).tests() ++
new ServerMultipartTests(createServerTest, partOtherHeaderSupport = false).tests()

(tests, eventLoopGroup)
}) { case (_, eventLoopGroup) =>
Expand Down