-
Notifications
You must be signed in to change notification settings - Fork 427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
jdkhttp-server: Write multipart parts bigger than threshold to files #3227
Conversation
recursivelyParseState(ParseData(empty, List.empty, Default)).completedParts | ||
} | ||
|
||
private val TempFileThreshold = 52_428_800 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this threshold be configurable somehow?
I got this number from http4s, but input is appreciated 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So we'll read 52MB of data before falling back to a file? Seems quite a lot ;) We could make this configurable via JdkHttpServerOptions
, but I'm not sure if it's worth the effort. A reasonable default might do for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't seem to be too much work if i can pass it the same way createFile
is passed through the JdkHttpRequestBody
constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe let's try, then we can also quite easily add a test which would try to send large parts
@tailrec | ||
def recursivelyParseBodyBytes(outputStream: PartStream, lastXBytes: Array[Byte], numReadBytes: Int): InputStream = { | ||
val currentByte = is.read() | ||
if (currentByte == -1) throw new IllegalArgumentException("Parsing multipart failed, ran out of bytes before finding boundary") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed to pass the tests, but i guess some infinite loop could occur if some invalid body was passed. Is this an okay way to handle that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exceptions here are fine. I'm not sure if IAE is the correct one here, though, it's not that an invalid argument was passed, but rather that the body is malformed. Maybe simply a RuntimeException?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would be the resulting error of a RuntimeException?
Wouldn't it make most sense if the resulting error is a 400 or 422 if its because of a malformed body?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm yeah currently any exception will end up returning a 500. Maybe let's create a separate issue to provide a built-in exception for signalling a specific result, in case it's not possible to communicate this in another way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure there is another way? I assume the http4s and others use the failure part of the effect, but the "effect" in jdkhttp-server
doesn't really offer a way we could handle this as far as i could tell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we could do is add a RespondWithException
, which
(a) would contain the status code and message to return to the user
(b) would be handled by the standard ExceptionInterceptor
then we could use it in situations as those
Iterator | ||
.continually(reader.readLine()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this previous way felt more readable, i couldn't figure out a good way to "chunk" the data up when determining whether the threshold was passed or not when reading lines (What is the body doesn't have any newlines), so i think the new way is better(?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here a good-old for loop with two mutable vars for last two bytes + a ByteArrayOutputStream
for accumulating the values would be much more performant
new FileInputStream(f) | ||
} | ||
|
||
private def readUntilNewline(inputStream: InputStream): Array[Byte] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm just starting to read through the code, but doesn't this mean that if we get a line which is over 52MB, we'll read it into memory anyway? What if there are no newlines in the part data for a long time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The readUntilNewline
and readStringUntilNewline
functions are only called in places where there is expected to be a newline in a few bytes if the body is correctly formed.
When parsing headers, when
We could have a problem with a malformed body i suppose, I'll see if i can figure out a way to either throw an error in those cases or rework the parsing somehow 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, maybe simply a hard limit - something reasonable that would cover the max boundary / header size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Max boundary size is 70 characters (+ a few for delimiters) according to the rfc.
The headers doesn't really have a specced max-size as far as i could tell, but i saw a few defaulting to 4kb and 8kb max sizes for normal http headers, so i don't think something like that for a max size sounds like a terrible idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah 8KB sounds very reasonable
val partStream = outputStream.convertToFileIfThresholdMet(numReadBytes) | ||
partStream.write(currentByte) | ||
|
||
val updatedLastXBytes = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this is quite elegant in code, it might not perform that well: for each byte read, we'll allocate a new byte array with the last boundary bytes. I think a better solution would be to keep a circular buffer with the boundary bytes - moving the index where we store the next byte, and comparing the contents from the subsequent index (modulo size).
Another problem might be comparing the boundary with the current boundary buffer for each byte - it might also be a performance penalty. So I think we could keep track of how many bytes matched so far, and only check if the next one matches? One problem here would be that in case of a mismatch, we would have to try restarting the process for bytes that have already been put in the buffer. Definitely non-trivial logic, and would need some good unit-test coverage.
Going a bit further, we might delay writing bytes to the output until we are sure they are not part of the boundary, avoiding the need of truncating the streams in the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds like a good idea. I'll try to implement all of these suggestions. It'll probably take me some time, but you're obviously right about the performance benefits 😄
def convertToFileIfThresholdMet(numReadBytes: Int): PartStream = this match { | ||
case ByteStream(os) if numReadBytes >= TempFileThreshold => | ||
val newFile = createTempFile() | ||
val fileOutputStream = new FileOutputStream(newFile) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use a BufferedOutputStream
to avoid going to the disk for each byte?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like a good idea as well 👍
Nicely written - code was easy to follow :) I left some suggestions in the comments |
Just pushed some changes doing mostly everything we talked about, except the exception returning errors, but that could probably be a separate PR? I'm not too sure about the factoring. Tried to keep the mutable parts to the Also wondering some git stuff: Would you like me to keep pushing to this with commits like this, and for you to handle squash/merge however you like. Or would you like me to rebase the PR to a single commit? |
|
||
import scala.collection.IndexedSeqView | ||
|
||
class CircularBuffer(bufferSize: Int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not super sure if this needed to be a class or if i should just do something inline. Or even if i should use the circular buffer for everything like i did.
|
||
val b1 = | ||
if (readBytes >= bufferSize) underlying.view.slice(idx, bufferSize) | ||
else underlying.view.slice(0, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels a little weird.
But not sure if i love Array.empty[Byte].view
either.
What do you think?
Sorry to nag, but anymore feedback on this @adamw ? 😄 |
@jnatten I'm very sorry, got swamped by a long queue of work, looking now :) |
} | ||
|
||
private def lookForBoundary(currentByte: Int, boundary: Array[Byte]): Boolean = { | ||
if (currentByte == boundary(numMatchedBoundaryChars)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this might be problematic if the boundary has some common substring with the body. E.g. let's say the boundary is AAB...
The body goes: AAAB...
, so we have one A
byte from the "proper" body, and then the boundary. We read A
, A
, advancing numMatchedBoundaryChars
, but then we get a mismatch, which resets the counter.
So now we'll continue on reading AB...
, and we'll miss the boundary.
Instead, we should have retreated our steps and checked if some other prefix didn't match. I think it's a well known problem in CS ... maybe https://en.wikipedia.org/wiki/Knuth–Morris–Pratt_algorithm ?
Either way, we'll need tests for this :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats true! Good catch 👍
I think I'll get KMP working and try to add some tests.
The best place to add a test would be here: tapir/server/tests/src/main/scala/sttp/tapir/server/tests/ServerMultipartTests.scala Line 22 in c608feb
This will then be included in tests for all server interpreters, including this one: tapir/server/jdkhttp-server/src/test/scala/sttp/tapir/server/jdkhttp/JdkHttpServerTest.scala Line 17 in 33a1dbd
Alternatively, you can add a stand-alone test in |
As for commits/squashes, I wouldn't be concerned about it. Please just add commits as is most convenient for you, I usually do normal merges :) |
stream = ByteStream() | ||
bodySize = 0 | ||
} else if (numMatchedBoundaryChars == 0) { | ||
circularBuffer.getBytes.foreach(byte => stream.underlying.write(byte.toInt)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm are you really using CircularBuffer
as a circular buffer, not only as a buffer? It seems you are buffering bytes only when there's some agreement on the boundary bytes, but as soon as there's a mismatch the bytes are copied to the sink, and the buffer is reset (by setting its currentIndex
to -1). So it never wraps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems you are correct 😄
I went through a few iterations, so i don't think we need the wrapping anymore. I'll remove it and rewrite with a normal arraybuffer.
val foundFinalBoundary = endMatcher.matchByte(currentByte.toByte) | ||
val foundBoundary = bodyBoundaryMatcher.matchByte(currentByte.toByte) || foundFinalBoundary |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this could be done with another state that looks for --
/ \r\n
instead.
Not sure which one i prefer. I guess adding the state would be marginally more performant. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that's good as-is :)
|
||
import scala.collection.mutable | ||
|
||
class KMPMatcher(delimiter: Array[Byte]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one final comment here, sorry :) maybe it would be worth adding some unit tests for KMPMatcher
to check for some corner cases and "happy paths"? I know there's an additional multipart test, but there might be some data combinations which aren't covered. And I'm not that familiar with the algorithm to see that it's correct ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't be sorry to have requirements that makes the code better 😄
I'll add some tests!
completePart(bodyInputStream) | ||
stream = ByteStream() | ||
bodySize = 0 | ||
} else if (bodyBoundaryMatcher.getMatches == 0 && endMatcher.getMatches == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't it possible that we'll overflow the buffer, if we constantly get a match? e.g. the body is only-As and the boundary starts with an A?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, that could make us drop bytes.
I'll try to avoid that 😄
override def tests: Resource[IO, List[Test]] = Resource.eval( | ||
IO.pure( | ||
List( | ||
Test("That matching over a set of bytes works and does not allow writing of any bytes if only matching") { | ||
Future { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the correct way to add individual tests that have "nothing" to do with the servers?
Felt a bit weird, but I'm not familiar enough to be sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the TestSuite
is more aimed at writing server/client interpreter tests. For normal unit tests, you can simply use what ScalaTest offers, see e.g.
class CodecTest extends AnyFlatSpec with Matchers with Checkers with Inside { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aha, makes sense. I'll fix 👍
This patch makes the multipart body parsing byte by byte to introduce writing big multipart parts to files to conserve memory.
This patch improves the parsing in `ParsedMultiPart` by: - Introducing mutability to improve performance by avoiding to allocate the `lastXBytes` array on every iteration. - Seek for the boundary match on character level rather than doing the entire `sameElements` comparison each iteration. - Avoid truncating file/stream on the end by not writing bytes until we know that they are not part of the boundary. - Make temp file threshold for multipart configurable via `JdkHttpServerOptions`
According to the spec the boundary should never be longer than 70 chars and that also avoids some potential problems while parsing a body.
Took some time, but this looks good - thanks for your work! :) |
Like @adamw predicted in this #3132 (comment) it would be smart to write big parts to temporary files rather than keeping it in memory no matter what (We just ran into some heap oom's on a small application which handles some uploading 😄)
This PR is an attempt at implementing this.
Not the greatest at this kind of stuff, so feel free to go hard on the feedback 😄