Skip to content

Commit

Permalink
Hide JsonIterator and provide LazyStreamingMode
Browse files Browse the repository at this point in the history
  • Loading branch information
sandwwraith committed Oct 8, 2021
1 parent d41ca9a commit 0354812
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 59 deletions.
44 changes: 15 additions & 29 deletions formats/json/jvmMain/src/kotlinx/serialization/json/JvmStreams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,38 +81,24 @@ public fun <T> Json.decodeFromStream(
// Reified convenience overload: looks up the serializer for T in this Json's
// serializersModule and forwards to the overload that takes an explicit deserializer.
public inline fun <reified T> Json.decodeFromStream(stream: InputStream): T {
    val deserializer = serializersModule.serializer<T>()
    return decodeFromStream(deserializer, stream)
}

public sealed interface JsonIterator {
public fun <T> next(deserializer: DeserializationStrategy<T>): T

public fun hasNext(): Boolean
/**
 * Tells `decodeToSequence` how the individual elements are laid out in the input stream.
 */
public enum class LazyStreamingFormat {
    /** Elements are independent top-level values; a leading `[` is left for the element decoder. */
    WHITESPACE_SEPARATED,

    /** Elements are the items of one top-level JSON array; the stream must start with `[`. */
    ARRAY_WRAPPED,

    /** Chosen from the first token: `[` selects [ARRAY_WRAPPED], anything else [WHITESPACE_SEPARATED]. */
    AUTO_DETECT, // Open naming question: or camel case?
}

public fun <T> JsonIterator.asIterator(deserializer: DeserializationStrategy<T>): Iterator<T> =
object : Iterator<T> {
override fun hasNext(): Boolean = this@asIterator.hasNext()

override fun next(): T = this@asIterator.next(deserializer)
}

public fun <T> JsonIterator.asSequence(deserializer: DeserializationStrategy<T>): Sequence<T> = asIterator(deserializer).asSequence()

public fun Json.iterateOverStream(stream: InputStream): JsonIterator {
public fun <T> Json.decodeToSequence(
stream: InputStream,
deserializer: DeserializationStrategy<T>,
format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> {
val lexer = ReaderJsonLexer(stream)
return JsonIteratorImpl(this, lexer)
val iter = JsonIterator(format, this, lexer, deserializer)
return Sequence { iter }.constrainOnce()
}

public fun <T> Json.decodeToSequence(stream: InputStream, deserializer: DeserializationStrategy<T>): Sequence<T> =
Sequence { iterateOverStream(stream).asIterator(deserializer) }.constrainOnce() // or just iterateOverStream().asSequence(deserializer)

/**
 * Reified convenience overload of `decodeToSequence`: the deserializer for [T] is taken from
 * this Json's `serializersModule`, and [stream] and [format] are forwarded unchanged to the
 * overload that accepts an explicit deserializer.
 */
public inline fun <reified T> Json.decodeToSequence(
    stream: InputStream,
    format: LazyStreamingFormat = LazyStreamingFormat.AUTO_DETECT
): Sequence<T> {
    val deserializer = serializersModule.serializer<T>()
    return decodeToSequence(stream, deserializer, format)
}




internal class JsonIteratorImpl(private val json: Json, private val lexer: ReaderJsonLexer): JsonIterator {
override fun <T> next(deserializer: DeserializationStrategy<T>): T {
val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
return input.decodeSerializableValue(deserializer)
}

override fun hasNext(): Boolean = lexer.isNotEof()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2017-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("FunctionName")

package kotlinx.serialization.json.internal

import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.json.*

/**
 * Factory for the iterator that backs lazy stream decoding.
 *
 * The requested [mode] is first resolved against the actual input via `determineFormat`
 * (which may consume a leading `[` from [lexer]); the resolved format then selects the
 * concrete iterator implementation.
 */
internal fun <T> JsonIterator(
    mode: LazyStreamingFormat,
    json: Json,
    lexer: ReaderJsonLexer,
    deserializer: DeserializationStrategy<T>
): Iterator<T> {
    val resolvedFormat = lexer.determineFormat(mode)
    return when (resolvedFormat) {
        LazyStreamingFormat.ARRAY_WRAPPED ->
            JsonIteratorArrayWrapped(json, lexer, deserializer)
        // Can be many WS-separated independent arrays
        LazyStreamingFormat.WHITESPACE_SEPARATED ->
            JsonIteratorWsSeparated(json, lexer, deserializer)
        // determineFormat never returns AUTO_DETECT, so reaching this branch is a programming error.
        LazyStreamingFormat.AUTO_DETECT ->
            error("AbstractJsonLexer.determineFormat must be called beforehand.")
    }
}


/**
 * Resolves the [suggested] format to a concrete one, consuming the opening `[` when the
 * result is [LazyStreamingFormat.ARRAY_WRAPPED].
 *
 * Throws [JsonDecodingException] when an array was explicitly requested but the input
 * does not start with one.
 */
private fun AbstractJsonLexer.determineFormat(suggested: LazyStreamingFormat): LazyStreamingFormat {
    // Do not probe for '[' here so we don't confuse the parser with a stream of lists.
    if (suggested == LazyStreamingFormat.WHITESPACE_SEPARATED) return LazyStreamingFormat.WHITESPACE_SEPARATED

    val arrayOpened = tryConsumeStartArray()
    return when {
        arrayOpened -> LazyStreamingFormat.ARRAY_WRAPPED
        suggested == LazyStreamingFormat.ARRAY_WRAPPED -> throw JsonDecodingException("Expected array start")
        else -> LazyStreamingFormat.WHITESPACE_SEPARATED // AUTO_DETECT without a leading '['
    }
}

/**
 * Consumes the next token only if it is an array start.
 *
 * @return `true` if a `[` token was found and consumed, `false` if the lexer was left untouched.
 */
private fun AbstractJsonLexer.tryConsumeStartArray(): Boolean {
    val startsWithArray = peekNextToken() == TC_BEGIN_LIST
    if (startsWithArray) consumeNextToken(TC_BEGIN_LIST)
    return startsWithArray
}

/**
 * Iterator over independent top-level JSON values read one after another from [lexer].
 * Iteration continues for as long as the lexer reports that it is not at EOF.
 */
private class JsonIteratorWsSeparated<T>(
    private val json: Json,
    private val lexer: ReaderJsonLexer,
    private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
    override fun hasNext(): Boolean = lexer.isNotEof()

    override fun next(): T {
        // A fresh decoder is created per element; all of them share the same lexer.
        val decoder = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
        return decoder.decodeSerializableValue(deserializer)
    }
}

/**
 * Iterator over the items of a single top-level JSON array.
 *
 * The opening `[` is expected to have been consumed already (`determineFormat` does this
 * before this class is instantiated). The closing `]` is consumed by [hasNext], which also
 * rejects any data that follows it.
 *
 * Note: [hasNext] does not verify the comma between items. If a separator is missing,
 * [hasNext] returns `true` and the subsequent [next] is the call expected to fail.
 */
private class JsonIteratorArrayWrapped<T>(
    private val json: Json,
    private val lexer: ReaderJsonLexer,
    private val deserializer: DeserializationStrategy<T>
) : Iterator<T> {
    // True until the first item has been decoded; only the following items are preceded by a comma.
    private var first = true

    // Set once the closing bracket has been consumed. Without it, a repeated hasNext() call
    // would look at the lexer again, find EOF and wrongly report a missing closing bracket.
    private var finished = false

    override fun next(): T {
        if (first) {
            first = false
        } else {
            lexer.consumeNextToken(COMMA)
        }
        val input = StreamingJsonDecoder(json, WriteMode.OBJ, lexer, deserializer.descriptor)
        return input.decodeSerializableValue(deserializer)
    }

    // todo: should we check comma here?
    override fun hasNext(): Boolean {
        // Keep hasNext() idempotent after the end of the array has been reached.
        if (finished) return false
        if (lexer.peekNextToken() == TC_END_LIST) {
            lexer.consumeNextToken(TC_END_LIST)
            finished = true
            if (lexer.isNotEof()) {
                // allow unclosed? depend on isLenient?
                if (lexer.peekNextToken() == TC_BEGIN_LIST) throw JsonDecodingException("Looks like you need another mode")
                throw JsonDecodingException("Dangling data after last bracket")
            } // todo: replace with lexer.fail?
            return false
        }
        if (!lexer.isNotEof()) throw JsonDecodingException("Unexpected EOF: no closing bracket")
        return true
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.*
import kotlinx.serialization.builtins.serializer
import kotlinx.serialization.json.*
import kotlinx.serialization.json.internal.JsonDecodingException
import kotlinx.serialization.test.assertFailsWithMessage
import org.junit.Test
import java.io.*
Expand All @@ -23,15 +25,16 @@ class JsonStreamFlowTest {
}
}

suspend inline fun <reified T> Json.readFromStream(iss: InputStream): Flow<T> = flow {
val iter = iterateOverStream(iss)
private suspend inline fun <reified T> Json.readFromStream(iss: InputStream): Flow<T> = flow {
val serial = serializer<T>()
val iter = iterateOverStream(iss, serial)
while (iter.hasNext()) {
emit(iter.next(serial))
emit(iter.next())
}
}.flowOn(Dispatchers.IO)

val inputString = """{"data":"a"}{"data":"b"}{"data":"c"}"""
val inputStringWsSeparated = """{"data":"a"}{"data":"b"}{"data":"c"}"""
val inputStringWrapped = """[{"data":"a"},{"data":"b"},{"data":"c"}]"""
val inputList = listOf(StringData("a"), StringData("b"), StringData("c"))

@Test
Expand All @@ -43,68 +46,102 @@ class JsonStreamFlowTest {
f.writeToStream(os)
}

assertEquals(inputString, os.toString(Charsets.UTF_8.name()))
assertEquals(inputStringWsSeparated, os.toString(Charsets.UTF_8.name()))
}

@Test
fun testDecodeSeveralItems() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
val ins = ByteArrayInputStream(inputStringWsSeparated.encodeToByteArray())
assertFailsWithMessage<SerializationException>("EOF") {
json.decodeFromStream<StringData>(ins)
}
}

inline fun <reified T> JsonIterator.assertNext(expected: T) {
private inline fun <reified T> Iterator<T>.assertNext(expected: T) {
assertTrue(hasNext())
assertEquals(expected, next(serializer()))
assertEquals(expected, next())
}

@Test
fun testIterateSeveralItems() {
// Test-only shim: exposes the sequence returned by decodeToSequence as a plain Iterator.
private fun <T> Json.iterateOverStream(stream: InputStream, deserializer: DeserializationStrategy<T>): Iterator<T> {
    val elements = decodeToSequence(stream, deserializer)
    return elements.iterator()
}

// Runs [block] once per input string (by default: the whitespace-separated and the
// array-wrapped sample). Any failure is rethrown as an AssertionError that names the input.
private fun withInputs(vararg inputs: String = arrayOf(inputStringWsSeparated, inputStringWrapped), block: (InputStream) -> Unit) {
    for (input in inputs) {
        try {
            block(input.asInputStream())
        } catch (cause: Throwable) {
            throw AssertionError("Failed test with input $input", cause)
        }
    }
}

val ins = ByteArrayInputStream(inputString.encodeToByteArray())
val iter = json.iterateOverStream(ins)
// Wraps the encoded bytes of this string in an in-memory stream.
private fun String.asInputStream(): ByteArrayInputStream {
    val bytes = encodeToByteArray()
    return ByteArrayInputStream(bytes)
}

@Test
fun testIterateSeveralItems() = withInputs { ins ->
val iter = json.iterateOverStream(ins, StringData.serializer())
iter.assertNext(StringData("a"))
iter.assertNext(StringData("b"))
iter.assertNext(StringData("c"))
assertFalse(iter.hasNext())
assertFailsWithMessage<SerializationException>("EOF") {
iter.next(StringData.serializer())
iter.next()
}
}

@Test
fun testDecodeToSequence() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
assertEquals(inputList, json.decodeToSequence(ins, StringData.serializer()).toList())
fun testDecodeToSequence() = withInputs { ins ->
val sequence = json.decodeToSequence(ins, StringData.serializer())
assertEquals(inputList, sequence.toList(), "For input $inputStringWsSeparated")
assertFailsWith<IllegalStateException> { sequence.toList() } // assert constrained once
}

@Test
fun testDecodeAsFlow() {
val ins = ByteArrayInputStream(inputString.encodeToByteArray())
fun testDecodeAsFlow() = withInputs { ins ->
val list = runBlocking {
buildList { json.readFromStream<StringData>(ins).toCollection(this) }
}
assertEquals(inputList, list)
}

@Test
fun testDecodeDifferentItems() {
val input = """{"data":"a"}{"intV":10}null{"data":"b"}"""
val ins = ByteArrayInputStream(input.encodeToByteArray())
val iter = json.iterateOverStream(ins)
iter.assertNext(StringData("a"))
iter.assertNext(IntData(10))
iter.assertNext<String?>(null)
iter.assertNext(StringData("b"))
assertFalse(iter.hasNext())
}

@Test
fun testItemsSeparatedByWs() {
    // Objects separated by a single space and by a newline followed by a tab.
    val input = "{\"data\":\"a\"} {\"data\":\"b\"}\n\t{\"data\":\"c\"}"
    val decoded = json.decodeToSequence(input.asInputStream(), StringData.serializer()).toList()
    assertEquals(inputList, decoded)
}

@Test
fun testMalformedArray() {
    val malformed = arrayOf(
        """[1, 2, 3""",       // no closing bracket
        """[1, 2, 3]qwert""", // data after the closing bracket
        """[1,2 3]"""         // missing comma between items
    )
    withInputs(*malformed) { stream ->
        assertFailsWith<JsonDecodingException> {
            json.decodeToSequence(stream, Int.serializer()).toList()
        }
    }
}

@Test
fun testMultilineArrays() {
    val input = "[1,2,3]\n[4,5,6]\n[7,8,9]"

    // AUTO_DETECT sees the leading '[' and switches to array-wrapped mode, so the
    // stream cannot be read as a series of lists...
    assertFailsWith<JsonDecodingException> {
        json.decodeToSequence<List<Int>>(
            stream = input.asInputStream(),
            format = LazyStreamingFormat.AUTO_DETECT
        ).toList()
    }
    // ...nor as one flat list of ints, because a second array follows the first ']'.
    assertFailsWith<JsonDecodingException> {
        json.decodeToSequence<Int>(
            stream = input.asInputStream(),
            format = LazyStreamingFormat.AUTO_DETECT
        ).toList()
    }
    // we do not merge lists
    assertFailsWith<JsonDecodingException> {
        json.decodeToSequence<Int>(
            stream = input.asInputStream(),
            format = LazyStreamingFormat.ARRAY_WRAPPED
        ).toList()
    }

    // Only the explicit whitespace-separated mode treats each line as its own list.
    val expected = (1..9).chunked(3)
    val parsed = json.decodeToSequence<List<Int>>(
        stream = input.asInputStream(),
        format = LazyStreamingFormat.WHITESPACE_SEPARATED
    ).toList()
    assertEquals(expected, parsed)
}

@Test
fun testStrictArrayCheck() {
    val notAnArray = inputStringWsSeparated.asInputStream()
    // No terminal operation is invoked on the sequence: the failure is expected from the
    // decodeToSequence call itself when the input does not start with '['.
    assertFailsWith<JsonDecodingException> {
        json.decodeToSequence<StringData>(notAnArray, LazyStreamingFormat.ARRAY_WRAPPED)
    }
}

}

0 comments on commit 0354812

Please sign in to comment.