Allow to decode more than one object from InputStream #1662

Closed
sandwwraith opened this issue Sep 6, 2021 · 9 comments

Comments

@sandwwraith
Member

This is a meta issue for collecting various use cases for designing an API that would allow actually 'streaming' objects from an InputStream, instead of decoding a single one with Json.decodeFromStream.

Please describe your use case in detail: whether it is an HTTP long-polling stream or a WebSocket, a first-party or third-party API, or just a very large file.
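For context, the existing stream API decodes exactly one value per call; a minimal sketch of today's behavior (the Item class here is purely illustrative):

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromStream
import java.io.InputStream

@Serializable
data class Item(val id: Int)

@OptIn(ExperimentalSerializationApi::class)
fun readOne(stream: InputStream): Item =
    // Decodes a single complete JSON object from the stream; there is currently
    // no built-in way to keep pulling further objects from the same stream.
    Json.decodeFromStream<Item>(stream)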

@chris-hatton

chris-hatton commented Sep 6, 2021

Really happy to see this question being asked. I can provide a concrete example:

I'm implementing an app where a mobile or desktop client scans food product barcodes, and I look up some basic product info from several upstream, third-party services. The general state of food product barcodes is that no single 'source of truth' API exists; it's necessary to integrate with multiple third-party services to achieve 'good enough' coverage across even a typical mix of domestic groceries.

So, my own back-end takes the barcode request from the client and broadcasts it to multiple upstream APIs concurrently. These services each have their own varied response times, but I want to update my client as each response comes in, as soon as possible, to achieve a well-performing user interface. It feels like a pragmatic solution for the response (from my back-end server to the client) to be a long-lived HTTP stream, on which I 'stream' back all the upstream responses, one after the other, until all upstream backends have provided their answers; then I'll close the response to the client. I expect this process to take around 2-8 seconds overall.

I appreciate that was a verbose explanation, but maybe the context helps.

TL;DR
In more concise technical language: I want to be able to issue a request and have several asynchronous processes provide their own response objects to the requestor within the same HTTP request, in the most timely manner possible. That means streaming several complete JSON objects on the same stream and closing the stream after the last one is transmitted.
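A minimal sketch of the producing side under those assumptions, writing each upstream answer to the long-lived response body as soon as it arrives (ProductInfo and the upstream Flow are made up for illustration, and newline-delimited objects are just one possible framing; the consuming side is exactly what this issue asks for):

import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.io.OutputStream

@Serializable
data class ProductInfo(val source: String, val name: String? = null)

suspend fun streamResponses(upstream: Flow<ProductInfo>, response: OutputStream) {
    val writer = response.bufferedWriter()
    upstream.collect { info ->
        // Write one complete JSON object per upstream answer and flush immediately,
        // so the client can render it without waiting for the remaining services.
        writer.write(Json.encodeToString(info))
        writer.newLine()
        writer.flush()
    }
    writer.flush()
}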

@rgmz

rgmz commented Sep 7, 2021

Our use case is ingesting >1GB JSON files provided by a vendor, read from the file system. For example:

{
    "items": [
        { ... }
    ]
}

Presently, we're using Vert.x's JsonParser utility; however, being able to do this with Kotlin Serialization would be great.

Vert.x JsonParser example
package foo

import io.vertx.core.json.JsonObject
import io.vertx.core.parsetools.JsonEventType
import io.vertx.core.parsetools.JsonParser
import io.vertx.kotlin.core.file.openOptionsOf
import io.vertx.kotlin.coroutines.CoroutineVerticle
import io.vertx.kotlin.coroutines.await

/**
 * Whether the [JsonParser] has begun parsing the "items" field.
 */
enum class ParseState { NOT_STARTED, STARTED, FINISHED }

class MainVerticle2 : CoroutineVerticle() {
    override suspend fun start() {
        var state = ParseState.NOT_STARTED

        // Open a ReadStream of the large JSON file.
        val asyncFile = vertx.fileSystem()
            .open("/path/to/large.json", openOptionsOf())
            .await()

        // Parse the file's ReadStream.
        val parser = JsonParser.newParser(asyncFile)
            // Parse JsonObjects as a single value, rather than separate field/token events.
            // e.g. VALUE->{"foo": ["bar"]} vs [START_OBJECT,VALUE->"foo",START_ARRAY...].
            .objectValueMode()

        parser.handler { event ->
            when (event.type()) {
                JsonEventType.START_ARRAY -> {
                    if (state == ParseState.NOT_STARTED && event.fieldName() == "items") {
                        // Indicate that we're parsing the "items".
                        state = ParseState.STARTED
                    }
                }
                JsonEventType.END_ARRAY -> {
                    if (state == ParseState.STARTED) {
                        // Stop the parser once all items have been read.
                        state = ParseState.FINISHED
                        parser.end()
                    }
                }
                JsonEventType.VALUE -> {
                    if (state == ParseState.STARTED) {
                        // Consume individual items.
                        val item = event.value() as JsonObject
                        // TODO: do something with item
                    }
                }
            }
        }
    }
}

@psh

psh commented Sep 7, 2021

Our app interfaces with external (third-party) hardware that sends multiple JSON messages over a TCP socket connection: polymorphic JSON over a streaming connection that we want to expose to the rest of our app as a Flow.
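A rough sketch of how this can be approximated today with line-delimited framing and a sealed hierarchy (DeviceMessage and its subtypes are invented for illustration); a real streaming decode API would remove the manual line handling:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import java.net.Socket

// Hypothetical message hierarchy; a sealed class gives polymorphic JSON
// with a "type" discriminator out of the box.
@Serializable
sealed class DeviceMessage {
    @Serializable
    data class Status(val battery: Int) : DeviceMessage()

    @Serializable
    data class Reading(val value: Double) : DeviceMessage()
}

// Assumes the hardware happens to send one JSON message per line; with a
// streaming decode API the framing could be handled by the parser instead.
fun messages(socket: Socket): Flow<DeviceMessage> = flow {
    socket.getInputStream().bufferedReader().useLines { lines ->
        lines.forEach { line -> emit(Json.decodeFromString<DeviceMessage>(line)) }
    }
}.flowOn(Dispatchers.IO)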

@lion7
Contributor

lion7 commented Sep 7, 2021

I'd like to tail a JSON Lines file from a potentially infinite stream. Examples of this format can be found at https://jsonlines.org/examples/ and look like this:

["Name", "Session", "Score", "Completed"]
["Gilbert", "2013", 24, true]
["Alexa", "2013", 29, true]
["May", "2012B", 14, false]
["Deloise", "2012A", 19, true] 

Note that this format is somewhat different from the JSON spec: instead of one really large array containing the objects, multiple 'root' values are used, each separated by a line separator.

In practice we often use Logback with https://github.com/logstash/logstash-logback-encoder for our logging, which writes each log event as single-line JSON to some output such as a file or socket.
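A minimal sketch of consuming such a file with the current string-based API, decoding one event per line (LogEvent is a made-up stand-in for the logstash-logback-encoder event shape; tailing a growing file would additionally need to wait for appended lines):

import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json
import java.io.File

@Serializable
data class LogEvent(val message: String, val level: String? = null)

// Unknown fields are ignored because real log events carry many more properties.
private val json = Json { ignoreUnknownKeys = true }

fun readLog(file: File): List<LogEvent> =
    file.useLines { lines ->
        lines.filter { it.isNotBlank() }
            .map { json.decodeFromString<LogEvent>(it) }
            .toList()
    }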

@OliverO2

OliverO2 commented Sep 7, 2021

My use case is using WebSocket streaming to drive the entire user interface.

  • Backend -> frontend: The backend knows or tries to predict the object tree required by the frontend and sends it. The backend also sends updates to existing objects and additional objects in response to frontend requests.
  • Frontend -> backend: The frontend sends requests for additional objects as necessary (e.g. lists rendering more elements than expected, additional content made visible by scrolling). The frontend also sends updates to objects resulting from user interaction.

The current implementation uses streams of Event objects (one stream per direction) and serializes/deserializes them into/from WebSocket frames via ProtoBuf. It also uses special synchronization events for backpressure (e.g. throttling the backend when it sends updates faster than the frontend can process them).

So although this is a streaming scenario, serialization is currently dealing with single objects only (events enveloped in frames). A drawback of the current approach is that a large event might have to be split into (artificially created) smaller chunks before serialization, to allow for fine-grained backpressure (in-flight synchronization events) and stuff like progress indicators. Chunking can occur at the binary level, though.
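A minimal sketch of the per-frame approach described above, assuming kotlinx.serialization's experimental ProtoBuf format and an invented event type (the real hierarchy is application-specific):

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromByteArray
import kotlinx.serialization.encodeToByteArray
import kotlinx.serialization.protobuf.ProtoBuf

@Serializable
data class ObjectUpdate(val id: Long, val payload: String)

// Each event is serialized into its own WebSocket frame and decoded frame by frame;
// a large payload has to be chunked manually before reaching this point.
@OptIn(ExperimentalSerializationApi::class)
fun toFrame(event: ObjectUpdate): ByteArray = ProtoBuf.encodeToByteArray(event)

@OptIn(ExperimentalSerializationApi::class)
fun fromFrame(frame: ByteArray): ObjectUpdate = ProtoBuf.decodeFromByteArray<ObjectUpdate>(frame)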

@jeffdcamp

Our use case is that we call an HTTPS JSON REST service that retrieves all items for a user on a mobile phone. We save the full response to a local JSON file (we don't hold the stream open or process data while we are downloading from the REST call), then we use Gson to open the saved JSON file as a stream, read each item individually, and save each item to a local database (we never have more than one read item in memory at a time).

  • We cannot load the full JSON file into memory, because we would get an out-of-memory exception
  • We have some calls that would return more than 100,000 items
  • We also do the same thing when writing changes back to the REST service. Example:
    • Read IDs for all changes from the database
    • For each item ID, we individually load a single full item record
    • Then stream the item to a local outgoing JSON file
    • Once all items have been written to the outgoing JSON file, we then HTTPS stream/send the JSON file contents to the service (a sketch of this writing side follows the reading example below)

Example code for reading/saving JSON data

    // example code using gson (at this point we know the next character is a '[')
    private suspend fun itemsToDb(reader: JsonReader) {
        reader.beginArray()

        while (reader.peek() == JsonToken.BEGIN_OBJECT) {
            val itemDto = gson.fromJson<ItemDto>(reader, ItemDto::class.java)
            saveToDb(itemDto)
        }

        reader.endArray()
    }
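The writing direction described in the list above could look roughly like this with kotlinx.serialization's current string-based API (ItemDto's fields and the loadItem callback are placeholders for the real database access):

import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import java.io.File

@Serializable
data class ItemDto(val id: Long, val name: String)

// Writes the outgoing file as one JSON array while holding only a single item
// in memory at a time; loadItem stands in for loading one full record by ID.
fun writeOutgoing(file: File, ids: List<Long>, loadItem: (Long) -> ItemDto) {
    file.bufferedWriter().use { writer ->
        writer.write("[")
        ids.forEachIndexed { index, id ->
            if (index > 0) writer.write(",")
            writer.write(Json.encodeToString(loadItem(id)))
        }
        writer.write("]")
    }
}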

@CharlieTap

CharlieTap commented Sep 21, 2021

My use case is that I have a large JSON file which I want to parse object by object, accumulating the results into sensible chunks for a memory-constrained system. Ideally the API would provide both low-level parser hooks like Gson does, i.e.

    public List<Message> readJsonStream(InputStream in) throws IOException {
        JsonReader reader = new JsonReader(new InputStreamReader(in, "UTF-8"));
        List<Message> messages = new ArrayList<Message>();
        reader.beginArray();
        while (reader.hasNext()) {
            Message message = gson.fromJson(reader, Message.class);
            messages.add(message);
        }
        reader.endArray();
        reader.close();
        return messages;
    }

Similarly, it would be nice to see a high-level coroutines wrapper exposing a Flow (in the above case a Flow<Message>), which would allow me to use the built-in combinators to batch and transform the stream in a concurrent manner.
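Assuming a future sequence-style decode function (decodeItems below is purely hypothetical), the Flow wrapper and batching could be layered on top along these lines:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import java.io.InputStream

// decodeItems is a stand-in for whatever low-level streaming hook the library ends
// up exposing; it just needs to yield one decoded object at a time.
fun <T> batches(
    stream: InputStream,
    batchSize: Int,
    decodeItems: (InputStream) -> Sequence<T>,
): Flow<List<T>> = flow {
    val batch = ArrayList<T>(batchSize)
    decodeItems(stream).forEach { item ->
        batch += item
        if (batch.size == batchSize) {
            emit(batch.toList()) // hand a full chunk downstream
            batch.clear()
        }
    }
    if (batch.isNotEmpty()) emit(batch.toList())
}.flowOn(Dispatchers.IO)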

@GrahamBorland

GrahamBorland commented Sep 27, 2021

My use case is streaming a huge (hundreds of MB) JSON payload, containing potentially hundreds of thousands of entities, from a REST API call into a local SQLite database. We cannot afford the memory cost of inflating this payload into a single huge model object.

Currently we do this with Gson's low-level JsonReader.

@sandwwraith
Member Author

sandwwraith commented Oct 14, 2021

There's a prototype in #1691. It supports 'plain' objects in the stream, one after another, as well as objects wrapped in a top-level array. You can check it out and tell us whether it fits your needs.
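For anyone landing here later, usage of the proposed API could look roughly like this, assuming a decodeToSequence-style extension along the lines of that prototype (exact names and parameters may differ):

import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeToSequence
import java.io.InputStream

@Serializable
data class Item(val id: Int)

@OptIn(ExperimentalSerializationApi::class)
fun processAll(stream: InputStream) {
    // Lazily decodes either whitespace-separated objects or the elements of a
    // top-level array, one item at a time, without loading the whole payload.
    Json.decodeToSequence<Item>(stream).forEach { item ->
        println(item) // handle each item as it is decoded
    }
}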
