Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CharArray caching for InputStream decoding #2100

Merged
merged 4 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.serialization.json.okio.encodeToBufferedSink
import okio.blackholeSink
import okio.buffer
import org.openjdk.jmh.annotations.*
import java.io.ByteArrayInputStream
import java.io.OutputStream
import java.util.concurrent.*

Expand Down Expand Up @@ -75,6 +76,7 @@ open class JacksonComparisonBenchmark {
}

private val stringData = Json.encodeToString(DefaultPixelEvent.serializer(), data)
private val utf8BytesData = stringData.toByteArray()

@Serializable
private class SmallDataClass(val id: Int, val name: String)
Expand All @@ -96,6 +98,9 @@ open class JacksonComparisonBenchmark {
@Benchmark
fun kotlinToStream() = Json.encodeToStream(DefaultPixelEvent.serializer(), data, devNullStream)

@Benchmark
fun kotlinFromStream() = Json.decodeFromStream(DefaultPixelEvent.serializer(), ByteArrayInputStream(utf8BytesData))

@Benchmark
fun kotlinToOkio() = Json.encodeToBufferedSink(DefaultPixelEvent.serializer(), data, devNullSink)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package kotlinx.serialization.json

import kotlinx.coroutines.*
import kotlinx.serialization.Serializable
import kotlinx.serialization.builtins.*
import org.junit.Test
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import kotlin.random.*
import kotlin.test.*

// Stresses out that JSON decoded in parallel does not interfere (mostly via caching of various buffers)
class JsonConcurrentStressTest : JsonTestBase() {
private val charset = "ABCDEFGHIJKLMNOPQRSTUVWXTZabcdefghiklmnopqrstuvwxyz0123456789"

@Test
fun testDecodeInParallelSimpleList() = doTest(100) { mode ->
val value = (1..10000).map { Random.nextDouble() }
val string = Json.encodeToString(ListSerializer(Double.serializer()), value, mode)
assertEquals(value, Json.decodeFromString(ListSerializer(Double.serializer()), string, mode))
}

@Serializable
data class Foo(val s: String, val f: Foo?)

@Test
fun testDecodeInParallelListOfPojo() = doTest(1_000) { mode ->
val value = (1..100).map {
val randomString = getRandomString()
val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null)
Foo(getRandomString(), nestedFoo)
}
val string = Json.encodeToString(ListSerializer(Foo.serializer()), value, mode)
assertEquals(value, Json.decodeFromString(ListSerializer(Foo.serializer()), string, mode))
}

@Test
fun testDecodeInParallelPojo() = doTest(100_000) { mode ->
val randomString = getRandomString()
val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null)
val randomFoo = Foo(getRandomString(), nestedFoo)
val string = Json.encodeToString(Foo.serializer(), randomFoo, mode)
assertEquals(randomFoo, Json.decodeFromString(Foo.serializer(), string, mode))
}

@Test
fun testDecodeInParallelSequencePojo() = runBlocking<Unit> {
for (i in 1 until 1_000) {
launch(Dispatchers.Default) {
val values = (1..100).map {
val randomString = getRandomString()
val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null)
Foo(getRandomString(), nestedFoo)
}
val baos = ByteArrayOutputStream()
for (value in values) {
Json.encodeToStream(Foo.serializer(), value, baos)
}
val bais = ByteArrayInputStream(baos.toByteArray())
assertEquals(values, Json.decodeToSequence(bais, Foo.serializer()).toList())
}
}
}

private fun getRandomString() = (1..Random.nextInt(0, charset.length)).map { charset[it] }.joinToString(separator = "")

private fun doTest(iterations: Int, block: (JsonTestingMode) -> Unit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JsonTestingMode seems to be not used anywhere.

I'd also add test for decodeFromStream

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used in the test bodies, streams are tested as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I hadn't noticed, because it uses the default lambda parameter name `it`. Maybe add an explicit name for better readability?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

runBlocking<Unit> {
for (i in 1 until iterations) {
launch(Dispatchers.Default) {
parametrizedTest {
block(it)
}
}
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.serialization.json.internal

internal expect object CharArrayPoolBatchSize {
fun take(): CharArray
fun release(array: CharArray)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ internal fun <T> Json.decodeByReader(
reader: SerialReader
): T {
val lexer = ReaderJsonLexer(reader)
val input = StreamingJsonDecoder(this, WriteMode.OBJ, lexer, deserializer.descriptor, null)
val result = input.decodeSerializableValue(deserializer)
lexer.expectEof()
return result
try {
val input = StreamingJsonDecoder(this, WriteMode.OBJ, lexer, deserializer.descriptor, null)
val result = input.decodeSerializableValue(deserializer)
lexer.expectEof()
return result
} finally {
lexer.release()
}
}

@PublishedApi
Expand All @@ -47,7 +51,7 @@ internal fun <T> Json.decodeToSequenceByReader(
deserializer: DeserializationStrategy<T>,
format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT
): Sequence<T> {
val lexer = ReaderJsonLexer(reader)
val lexer = ReaderJsonLexer(reader, CharArray(BATCH_SIZE)) // Unpooled buffer due to lazy nature of sequence
val iter = JsonIterator(format, this, lexer, deserializer)
return Sequence { iter }.constrainOnce()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private const val DEFAULT_THRESHOLD = 128
* For some reason this hand-rolled implementation is faster than
* fun ArrayAsSequence(s: CharArray): CharSequence = java.nio.CharBuffer.wrap(s, 0, length)
*/
internal class ArrayAsSequence(val buffer: CharArray) : CharSequence {
internal class ArrayAsSequence(private val buffer: CharArray) : CharSequence {
override var length: Int = buffer.size

override fun get(index: Int): Char = buffer[index]
Expand All @@ -34,11 +34,11 @@ internal class ArrayAsSequence(val buffer: CharArray) : CharSequence {

internal class ReaderJsonLexer(
private val reader: SerialReader,
charsBuffer: CharArray = CharArray(BATCH_SIZE)
private val buffer: CharArray = CharArrayPoolBatchSize.take()
) : AbstractJsonLexer() {
private var threshold: Int = DEFAULT_THRESHOLD // chars

override val source: ArrayAsSequence = ArrayAsSequence(charsBuffer)
override val source: ArrayAsSequence = ArrayAsSequence(buffer)

init {
preload(0)
Expand Down Expand Up @@ -177,4 +177,8 @@ internal class ReaderJsonLexer(

// Can be carefully implemented but postponed for now
override fun consumeLeadingMatchingValue(keyToMatch: String, isLenient: Boolean): String? = null

fun release() {
CharArrayPoolBatchSize.release(buffer)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.serialization.json.internal


internal actual object CharArrayPoolBatchSize {

actual fun take(): CharArray = CharArray(BATCH_SIZE)

actual fun release(array: CharArray) = Unit
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.serialization.json.internal

import java.util.concurrent.*

internal object CharArrayPool {
internal open class CharArrayPoolBase {
private val arrays = ArrayDeque<CharArray>()
private var charsTotal = 0

/*
* Not really documented kill switch as a workaround for potential
* (unlikely) problems with memory consumptions.
Expand All @@ -13,20 +17,38 @@ internal object CharArrayPool {
System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull()
}.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars)

fun take(): CharArray {
protected fun take(size: Int): CharArray {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to take the size in the class constructor rather than in the function call.

In fact, this is the case for each instance of the pool right now.
Another reason is a potential error if I call like this

val a = take(100)
release(a)
val b = take(100)
release(b)
val c = take(200)
c[101] // !!!

The API does not advise in any way that you should not do this.

If you add a size to the constructor, then you can also add a check for the size of the array inside the releaseImpl

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good remark for any general-purpose API, but in this situation, we are dealing with well-tailored internal pools for one specific purpose. Also, it will force us to introduce a few more bytecodes -- for class with ctor, for outer class that will store the instance etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the bytecode, only a new field of type int will be added (the parameter moves from the take method to the init method), and in the child class only the value from the parameter will be set via the child constructor.

I think this is a good tradeoff so that if someone else refactor the code after a while, they don't add an unobvious bug.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is a protected method, I don't see that it is necessary for now, but the idea overall is a valid concern.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is indeed protected and used as a base for the implementation.
None of the implementations are prone to the potential bug described above and I prefer to avoid overcomplications in a 50 lines of fully internal code that has a single purpose

/*
* Initially the pool is empty, so an instance will be allocated
* and the pool will be populated in the 'release'
*/
val candidate = synchronized(this) {
arrays.removeLastOrNull()?.also { charsTotal -= it.size }
}
return candidate ?: CharArray(128)
return candidate ?: CharArray(size)
}

fun release(array: CharArray) = synchronized(this) {
protected fun releaseImpl(array: CharArray): Unit = synchronized(this) {
if (charsTotal + array.size >= MAX_CHARS_IN_POOL) return@synchronized
charsTotal += array.size
arrays.addLast(array)
}
}

internal object CharArrayPool : CharArrayPoolBase() {
fun take(): CharArray = super.take(128)

// Can release array of an arbitrary size
fun release(array: CharArray) = releaseImpl(array)
}

// Pools char arrays of size 16K
internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() {

actual fun take(): CharArray = super.take(BATCH_SIZE)

actual fun release(array: CharArray) {
require(array.size == BATCH_SIZE) { "Inconsistent internal invariant: unexpected array size ${array.size}" }
releaseImpl(array)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
0.toByte() -> {
charArray[sz++] = ch.toChar()
}

1.toByte() -> {
val escapedString = ESCAPE_STRINGS[ch]!!
sz = ensureTotalCapacity(sz, escapedString.length)
escapedString.toCharArray(charArray, sz, 0, escapedString.length)
sz += escapedString.length
}

else -> {
charArray[sz] = '\\'
charArray[sz + 1] = marker.toInt().toChar()
Expand Down Expand Up @@ -204,34 +206,38 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
}
}

/*
Sources taken from okio library with minor changes, see https://github.com/square/okio
*/
/**
* Sources taken from okio library with minor changes, see https://github.com/square/okio
*/
private fun writeUtf8CodePoint(codePoint: Int) {
when {
codePoint < 0x80 -> {
// Emit a 7-bit code point with 1 byte.
ensure(1)
write(codePoint)
}

codePoint < 0x800 -> {
// Emit a 11-bit code point with 2 bytes.
ensure(2)
write(codePoint shr 6 or 0xc0) // 110xxxxx
write(codePoint and 0x3f or 0x80) // 10xxxxxx
}

codePoint in 0xd800..0xdfff -> {
// Emit a replacement character for a partial surrogate.
ensure(1)
write('?'.code)
}

codePoint < 0x10000 -> {
// Emit a 16-bit code point with 3 bytes.
ensure(3)
write(codePoint shr 12 or 0xe0) // 1110xxxx
write(codePoint shr 6 and 0x3f or 0x80) // 10xxxxxx
write(codePoint and 0x3f or 0x80) // 10xxxxxx
}

codePoint <= 0x10ffff -> {
// Emit a 21-bit code point with 4 bytes.
ensure(4)
Expand All @@ -240,18 +246,16 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr
write(codePoint shr 6 and 0x3f or 0x80) // 10xxyyyy
write(codePoint and 0x3f or 0x80) // 10yyyyyy
}

else -> {
throw JsonEncodingException("Unexpected code point: $codePoint")
}
}
}
}

internal class JavaStreamSerialReader(
stream: InputStream,
charset: Charset = Charsets.UTF_8
) : SerialReader {
private val reader = stream.reader(charset)
internal class JavaStreamSerialReader(stream: InputStream) : SerialReader {
private val reader = stream.reader(Charsets.UTF_8)

override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int {
return reader.read(buffer, bufferOffset, count)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.serialization.json.internal

internal actual object CharArrayPoolBatchSize {
actual fun take(): CharArray = CharArray(BATCH_SIZE)
actual fun release(array: CharArray) = Unit
}
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ dokka_version=1.7.0
native.deploy=
validator_version=0.11.0
knit_version=0.4.0
coroutines_version=1.3.9
# Only for tests
coroutines_version=1.6.4
kover_version=0.4.2
okio_version=3.1.0

Expand Down