diff --git a/benchmark/src/jmh/kotlin/kotlinx/benchmarks/json/JacksonComparisonBenchmark.kt b/benchmark/src/jmh/kotlin/kotlinx/benchmarks/json/JacksonComparisonBenchmark.kt index c192fc03a2..d162418c90 100644 --- a/benchmark/src/jmh/kotlin/kotlinx/benchmarks/json/JacksonComparisonBenchmark.kt +++ b/benchmark/src/jmh/kotlin/kotlinx/benchmarks/json/JacksonComparisonBenchmark.kt @@ -8,6 +8,7 @@ import kotlinx.serialization.json.okio.encodeToBufferedSink import okio.blackholeSink import okio.buffer import org.openjdk.jmh.annotations.* +import java.io.ByteArrayInputStream import java.io.OutputStream import java.util.concurrent.* @@ -75,6 +76,7 @@ open class JacksonComparisonBenchmark { } private val stringData = Json.encodeToString(DefaultPixelEvent.serializer(), data) + private val utf8BytesData = stringData.toByteArray() @Serializable private class SmallDataClass(val id: Int, val name: String) @@ -96,6 +98,9 @@ open class JacksonComparisonBenchmark { @Benchmark fun kotlinToStream() = Json.encodeToStream(DefaultPixelEvent.serializer(), data, devNullStream) + @Benchmark + fun kotlinFromStream() = Json.decodeFromStream(DefaultPixelEvent.serializer(), ByteArrayInputStream(utf8BytesData)) + @Benchmark fun kotlinToOkio() = Json.encodeToBufferedSink(DefaultPixelEvent.serializer(), data, devNullSink) diff --git a/formats/json-tests/jvmTest/src/kotlinx/serialization/json/JsonConcurrentStressTest.kt b/formats/json-tests/jvmTest/src/kotlinx/serialization/json/JsonConcurrentStressTest.kt new file mode 100644 index 0000000000..cdcbab792e --- /dev/null +++ b/formats/json-tests/jvmTest/src/kotlinx/serialization/json/JsonConcurrentStressTest.kt @@ -0,0 +1,78 @@ +package kotlinx.serialization.json + +import kotlinx.coroutines.* +import kotlinx.serialization.Serializable +import kotlinx.serialization.builtins.* +import org.junit.Test +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import kotlin.random.* +import kotlin.test.* + +// Stresses out that JSON decoded in parallel does not interfere (mostly via caching of various buffers) +class JsonConcurrentStressTest : JsonTestBase() { + private val charset = "ABCDEFGHIJKLMNOPQRSTUVWXTZabcdefghiklmnopqrstuvwxyz0123456789" + + @Test + fun testDecodeInParallelSimpleList() = doTest(100) { mode -> + val value = (1..10000).map { Random.nextDouble() } + val string = Json.encodeToString(ListSerializer(Double.serializer()), value, mode) + assertEquals(value, Json.decodeFromString(ListSerializer(Double.serializer()), string, mode)) + } + + @Serializable + data class Foo(val s: String, val f: Foo?) + + @Test + fun testDecodeInParallelListOfPojo() = doTest(1_000) { mode -> + val value = (1..100).map { + val randomString = getRandomString() + val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null) + Foo(getRandomString(), nestedFoo) + } + val string = Json.encodeToString(ListSerializer(Foo.serializer()), value, mode) + assertEquals(value, Json.decodeFromString(ListSerializer(Foo.serializer()), string, mode)) + } + + @Test + fun testDecodeInParallelPojo() = doTest(100_000) { mode -> + val randomString = getRandomString() + val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null) + val randomFoo = Foo(getRandomString(), nestedFoo) + val string = Json.encodeToString(Foo.serializer(), randomFoo, mode) + assertEquals(randomFoo, Json.decodeFromString(Foo.serializer(), string, mode)) + } + + @Test + fun testDecodeInParallelSequencePojo() = runBlocking { + for (i in 1 until 1_000) { + launch(Dispatchers.Default) { + val values = (1..100).map { + val randomString = getRandomString() + val nestedFoo = Foo("null抢\u000E鋽윝䑜厼\uF70A紲ᢨ䣠null⛾䉻嘖緝ᯧnull쎶\u0005null" + randomString, null) + Foo(getRandomString(), nestedFoo) + } + val baos = ByteArrayOutputStream() + for (value in values) { + Json.encodeToStream(Foo.serializer(), value, baos) + } + val bais = ByteArrayInputStream(baos.toByteArray()) + assertEquals(values, Json.decodeToSequence(bais, Foo.serializer()).toList()) + } + } + } + + private fun getRandomString() = (1..Random.nextInt(0, charset.length)).map { charset[it] }.joinToString(separator = "") + + private fun doTest(iterations: Int, block: (JsonTestingMode) -> Unit) { + runBlocking { + for (i in 1 until iterations) { + launch(Dispatchers.Default) { + parametrizedTest { + block(it) + } + } + } + } + } +} diff --git a/formats/json-tests/jvmTest/src/kotlinx/serialization/json/ParallelJsonStressTest.kt b/formats/json-tests/jvmTest/src/kotlinx/serialization/json/ParallelJsonStressTest.kt deleted file mode 100644 index 3901aabecf..0000000000 --- a/formats/json-tests/jvmTest/src/kotlinx/serialization/json/ParallelJsonStressTest.kt +++ /dev/null @@ -1,22 +0,0 @@ -package kotlinx.serialization.json - -import kotlinx.coroutines.* -import kotlinx.serialization.builtins.* -import kotlinx.serialization.test.* -import org.junit.* -import kotlin.concurrent.* -import kotlin.random.* - -class ParallelJsonStressTest : JsonTestBase() { - private val iterations = 1_000_000 - - @Test - fun testDecodeInParallel() = runBlocking { - for (i in 1..1000) { - launch(Dispatchers.Default) { - val value = (1..10000).map { Random.nextDouble() } - assertSerializedAndRestored(value, ListSerializer(Double.serializer())) - } - } - } -} diff --git a/formats/json/commonMain/src/kotlinx/serialization/json/internal/CharArrayPool.common.kt b/formats/json/commonMain/src/kotlinx/serialization/json/internal/CharArrayPool.common.kt new file mode 100644 index 0000000000..920b65b1e6 --- /dev/null +++ b/formats/json/commonMain/src/kotlinx/serialization/json/internal/CharArrayPool.common.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.serialization.json.internal + +internal expect object CharArrayPoolBatchSize { + fun take(): CharArray + fun release(array: CharArray) +} diff --git a/formats/json/commonMain/src/kotlinx/serialization/json/internal/JsonStreams.kt b/formats/json/commonMain/src/kotlinx/serialization/json/internal/JsonStreams.kt index c641f4fcc0..74dbe45d8a 100644 --- a/formats/json/commonMain/src/kotlinx/serialization/json/internal/JsonStreams.kt +++ b/formats/json/commonMain/src/kotlinx/serialization/json/internal/JsonStreams.kt @@ -34,10 +34,14 @@ internal fun Json.decodeByReader( reader: SerialReader ): T { val lexer = ReaderJsonLexer(reader) - val input = StreamingJsonDecoder(this, WriteMode.OBJ, lexer, deserializer.descriptor, null) - val result = input.decodeSerializableValue(deserializer) - lexer.expectEof() - return result + try { + val input = StreamingJsonDecoder(this, WriteMode.OBJ, lexer, deserializer.descriptor, null) + val result = input.decodeSerializableValue(deserializer) + lexer.expectEof() + return result + } finally { + lexer.release() + } } @PublishedApi @@ -47,7 +51,7 @@ internal fun Json.decodeToSequenceByReader( deserializer: DeserializationStrategy, format: DecodeSequenceMode = DecodeSequenceMode.AUTO_DETECT ): Sequence { - val lexer = ReaderJsonLexer(reader) + val lexer = ReaderJsonLexer(reader, CharArray(BATCH_SIZE)) // Unpooled buffer due to lazy nature of sequence val iter = JsonIterator(format, this, lexer, deserializer) return Sequence { iter }.constrainOnce() } diff --git a/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/JsonLexer.kt b/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/JsonLexer.kt index 83483eac48..fa4772c7d2 100644 --- a/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/JsonLexer.kt +++ b/formats/json/commonMain/src/kotlinx/serialization/json/internal/lexer/JsonLexer.kt @@ -14,7 +14,7 @@ private const val DEFAULT_THRESHOLD = 128 * For some reason this hand-rolled implementation is faster than * fun ArrayAsSequence(s: CharArray): CharSequence = java.nio.CharBuffer.wrap(s, 0, length) */ -internal class ArrayAsSequence(val buffer: CharArray) : CharSequence { +internal class ArrayAsSequence(private val buffer: CharArray) : CharSequence { override var length: Int = buffer.size override fun get(index: Int): Char = buffer[index] @@ -34,11 +34,11 @@ internal class ArrayAsSequence(val buffer: CharArray) : CharSequence { internal class ReaderJsonLexer( private val reader: SerialReader, - charsBuffer: CharArray = CharArray(BATCH_SIZE) + private val buffer: CharArray = CharArrayPoolBatchSize.take() ) : AbstractJsonLexer() { private var threshold: Int = DEFAULT_THRESHOLD // chars - override val source: ArrayAsSequence = ArrayAsSequence(charsBuffer) + override val source: ArrayAsSequence = ArrayAsSequence(buffer) init { preload(0) @@ -177,4 +177,8 @@ internal class ReaderJsonLexer( // Can be carefully implemented but postponed for now override fun consumeLeadingMatchingValue(keyToMatch: String, isLenient: Boolean): String? = null + + fun release() { + CharArrayPoolBatchSize.release(buffer) + } } diff --git a/formats/json/jsMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt b/formats/json/jsMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt new file mode 100644 index 0000000000..3f27896f05 --- /dev/null +++ b/formats/json/jsMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt @@ -0,0 +1,12 @@ +/* + * Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.serialization.json.internal + + +internal actual object CharArrayPoolBatchSize { + + actual fun take(): CharArray = CharArray(BATCH_SIZE) + + actual fun release(array: CharArray) = Unit +} diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt index e51b3de38b..08d22ec83d 100644 --- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt +++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt @@ -1,10 +1,14 @@ +/* + * Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ package kotlinx.serialization.json.internal import java.util.concurrent.* -internal object CharArrayPool { +internal open class CharArrayPoolBase { private val arrays = ArrayDeque() private var charsTotal = 0 + /* * Not really documented kill switch as a workaround for potential * (unlikely) problems with memory consumptions. @@ -13,7 +17,7 @@ internal object CharArrayPool { System.getProperty("kotlinx.serialization.json.pool.size").toIntOrNull() }.getOrNull() ?: 1024 * 1024 // 2 MB seems to be a reasonable constraint, (1M of chars) - fun take(): CharArray { + protected fun take(size: Int): CharArray { /* * Initially the pool is empty, so an instance will be allocated * and the pool will be populated in the 'release' @@ -21,12 +25,30 @@ internal object CharArrayPool { val candidate = synchronized(this) { arrays.removeLastOrNull()?.also { charsTotal -= it.size } } - return candidate ?: CharArray(128) + return candidate ?: CharArray(size) } - fun release(array: CharArray) = synchronized(this) { + protected fun releaseImpl(array: CharArray): Unit = synchronized(this) { if (charsTotal + array.size >= MAX_CHARS_IN_POOL) return@synchronized charsTotal += array.size arrays.addLast(array) } } + +internal object CharArrayPool : CharArrayPoolBase() { + fun take(): CharArray = super.take(128) + + // Can release array of an arbitrary size + fun release(array: CharArray) = releaseImpl(array) +} + +// Pools char arrays of size 16K +internal actual object CharArrayPoolBatchSize : CharArrayPoolBase() { + + actual fun take(): CharArray = super.take(BATCH_SIZE) + + actual fun release(array: CharArray) { + require(array.size == BATCH_SIZE) { "Inconsistent internal invariant: unexpected array size ${array.size}" } + releaseImpl(array) + } +} diff --git a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt index 17eac0a2ae..746c0441ab 100644 --- a/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt +++ b/formats/json/jvmMain/src/kotlinx/serialization/json/internal/JvmJsonStreams.kt @@ -68,12 +68,14 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr 0.toByte() -> { charArray[sz++] = ch.toChar() } + 1.toByte() -> { val escapedString = ESCAPE_STRINGS[ch]!! sz = ensureTotalCapacity(sz, escapedString.length) escapedString.toCharArray(charArray, sz, 0, escapedString.length) sz += escapedString.length } + else -> { charArray[sz] = '\\' charArray[sz + 1] = marker.toInt().toChar() @@ -204,9 +206,9 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr } } - /* - Sources taken from okio library with minor changes, see https://github.com/square/okio - */ + /** + * Sources taken from okio library with minor changes, see https://github.com/square/okio + */ private fun writeUtf8CodePoint(codePoint: Int) { when { codePoint < 0x80 -> { @@ -214,17 +216,20 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr ensure(1) write(codePoint) } + codePoint < 0x800 -> { // Emit a 11-bit code point with 2 bytes. ensure(2) write(codePoint shr 6 or 0xc0) // 110xxxxx write(codePoint and 0x3f or 0x80) // 10xxxxxx } + codePoint in 0xd800..0xdfff -> { // Emit a replacement character for a partial surrogate. ensure(1) write('?'.code) } + codePoint < 0x10000 -> { // Emit a 16-bit code point with 3 bytes. ensure(3) @@ -232,6 +237,7 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr write(codePoint shr 6 and 0x3f or 0x80) // 10xxxxxx write(codePoint and 0x3f or 0x80) // 10xxxxxx } + codePoint <= 0x10ffff -> { // Emit a 21-bit code point with 4 bytes. ensure(4) @@ -240,6 +246,7 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr write(codePoint shr 6 and 0x3f or 0x80) // 10xxyyyy write(codePoint and 0x3f or 0x80) // 10yyyyyy } + else -> { throw JsonEncodingException("Unexpected code point: $codePoint") } @@ -247,11 +254,8 @@ internal class JsonToJavaStreamWriter(private val stream: OutputStream) : JsonWr } } -internal class JavaStreamSerialReader( - stream: InputStream, - charset: Charset = Charsets.UTF_8 -) : SerialReader { - private val reader = stream.reader(charset) +internal class JavaStreamSerialReader(stream: InputStream) : SerialReader { + private val reader = stream.reader(Charsets.UTF_8) override fun read(buffer: CharArray, bufferOffset: Int, count: Int): Int { return reader.read(buffer, bufferOffset, count) diff --git a/formats/json/nativeMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt b/formats/json/nativeMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt new file mode 100644 index 0000000000..03c99262be --- /dev/null +++ b/formats/json/nativeMain/src/kotlinx/serialization/json/internal/CharArrayPool.kt @@ -0,0 +1,9 @@ +/* + * Copyright 2017-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ +package kotlinx.serialization.json.internal + +internal actual object CharArrayPoolBatchSize { + actual fun take(): CharArray = CharArray(BATCH_SIZE) + actual fun release(array: CharArray) = Unit +} diff --git a/gradle.properties b/gradle.properties index f1ea1fb117..03d74281d1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,8 @@ dokka_version=1.7.0 native.deploy= validator_version=0.11.0 knit_version=0.4.0 -coroutines_version=1.3.9 +# Only for tests +coroutines_version=1.6.4 kover_version=0.4.2 okio_version=3.1.0