
okio-async module for Kotlin coroutines based asyncio #814

Closed
25 tasks
kevincianfarini opened this issue Nov 16, 2020 · 32 comments

@kevincianfarini
Contributor

kevincianfarini commented Nov 16, 2020

I'm aware that the Okio team has made attempts at integrating coroutines but ultimately dropped the issue in the hope that Loom would solve it. Seeing that Okio is becoming increasingly multiplatform, Loom would only solve part of the problem.

I'm proposing that this issue track progress on a multiplatform Okio artifact, okio-async, which implements non-blocking, asynchronous I/O for the following platforms:

  • JVM/Android (via java.nio)
  • Linux (via epoll or io_uring)
  • macOS/iOS/watchOS (via kqueue)
  • mingw (via IOCP)
  • node.js (not sure if this makes sense)

There has been prior work done on this. In particular, I think that okiox by @bnorm will be a good starting resource.

The following is a tracking list of functionality that should be made available.

Tracking

  • An AsyncBuffer that serves the same purpose as Buffer but implements both AsyncSource and AsyncSink.
  • Multiplexed TCP Socket I/O
    • JVM
    • Linux
    • macOS, iOS, watchOS
    • Windows
    • node.js
  • Multiplexed Server TCP Socket I/O
    • JVM
    • Linux
    • macOS, iOS, watchOS
    • Windows
    • node.js
  • Multiplexed UDP Socket I/O
    • JVM
    • Linux
    • macOS, iOS, watchOS
    • Windows
    • node.js
  • Asynchronous File I/O (see why this can't be non-blocking here)
    • JVM
    • Linux
    • macOS, iOS, watchOS
    • Windows
    • node.js
@kevincianfarini
Contributor Author

kevincianfarini commented Nov 16, 2020

I'm interested to hear thoughts on this approach. My rationale is that those using multiplatform Kotlin to interact with Okio should probably also reach for coroutines for async capabilities. Therefore it makes sense to break this out into its own module, which can only really work with Kotlin.

However, I do foresee a bit of code duplication. In particular, things like HashingSink/HashingSource must have async counterparts, which in reality do much of the same work. I don't see an easy way to bridge the two in order to de-duplicate that code.
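One possible shape for limiting that duplication, sketched under assumptions (it is not a design settled in this issue): keep the hashing in a plain, non-suspending core and make the blocking and suspending sinks thin wrappers around it. `HashingCore`, `BlockingHashingSink`, `SuspendingByteSink`, and `SuspendingHashingSink` are hypothetical names, and the JDK's `MessageDigest` stands in for Okio's hashing.

```kotlin
import java.io.OutputStream
import java.security.MessageDigest
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Shared, non-suspending core: the hashing logic lives here exactly once.
class HashingCore(algorithm: String) {
    private val digest = MessageDigest.getInstance(algorithm)

    fun update(bytes: ByteArray, count: Int) = digest.update(bytes, 0, count)

    fun hex(): String = digest.digest().joinToString("") { "%02x".format(it) }
}

// Blocking wrapper: hash the bytes, then pass them to an OutputStream.
class BlockingHashingSink(private val delegate: OutputStream, algorithm: String) {
    val core = HashingCore(algorithm)

    fun write(bytes: ByteArray, count: Int) {
        core.update(bytes, count)
        delegate.write(bytes, 0, count)
    }
}

// Hypothetical suspending sink interface, for illustration only.
interface SuspendingByteSink {
    suspend fun write(bytes: ByteArray, count: Int)
}

// Suspending wrapper: identical except that the delegate call may suspend.
class SuspendingHashingSink(
    private val delegate: SuspendingByteSink,
    algorithm: String,
) : SuspendingByteSink {
    val core = HashingCore(algorithm)

    override suspend fun write(bytes: ByteArray, count: Int) {
        core.update(bytes, count)
        delegate.write(bytes, count)
    }
}

// Helper: runs a suspend block that never actually suspends, on the current thread.
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}
```

Only the two thin wrappers are duplicated; anything more involved than a digest update would still live in the shared core.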

Additionally, the tracking approach above doesn't include other inherently async I/O functions. For example, inotify for file watching could likely be made asynchronous as well. Do you have anything in mind which should be added to the list above?

@kevincianfarini
Contributor Author

Additionally, rather than pooling our own coroutines and interacting with libc directly, there are a lot of preexisting libraries that solve part of this for us. For example, libuv, which Node.js uses for async I/O.

@kevincianfarini kevincianfarini changed the title okio-async module for Kotlin coroutines bases asyncio okio-async module for Kotlin coroutines based asyncio Nov 16, 2020
@swankjesse
Collaborator

Let's back up a bit... what's the goal? Supporting coroutine callers? Supporting 10K concurrent socket connections? Every time I look into async I/O, I compare it against blocking I/O for API and performance, and I find blocking I/O wins.

@yschimke
Collaborator

yschimke commented Nov 19, 2020

My 2c as an unrelated observer: Kotlin coroutines and JVM+MPP I/O are an unsolved problem. Consider this a problem statement by someone who just happened to be walking by.

At the moment I end up with a lot of utility methods like

suspend fun ResponseBody.readString() = withContext(Dispatchers.IO) { string() }

I'm not even sure this is correct or optimal.

I'd love this solved for me by Okio, in a coroutines-first way, for common file, socket, and stream/writer operations.

@swankjesse
Collaborator

One challenge with designing coroutines APIs is giving callers control over the boundaries. Consider this set of functions:

suspend fun BufferedSource.suspendingReadUtf8Line() = withContext(Dispatchers.IO) { readUtf8Line() }

suspend fun BufferedSource.readNLines(n: Int): List<String> {
  val result = mutableListOf<String>()
  for (i in 0 until n) {
    // Each call hops to Dispatchers.IO and back again.
    result += suspendingReadUtf8Line()!!
  }
  return result
}

The readNLines() function ping-pongs back and forth between dispatchers. It’s faster & simpler for the caller to scope what work happens on the I/O dispatcher, and do all the work there.

Adding suspending functions in Okio would potentially lead users to write needlessly slow code, because they’d introduce lots of invisible dispatcher switches.

For example, imagine a suspending Moshi that parsed JSON by just making all the functions in BufferedSource suspending. Though the code would be fully coroutines-friendly, it would be a performance disaster.
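A toy model of that cost, using only the standard library: `SwitchCounter.onIoDispatcher` is a hypothetical stand-in for `withContext(Dispatchers.IO)` that runs the block inline and merely counts the hop, so the two call patterns can be compared without kotlinx.coroutines.

```kotlin
// Hypothetical stand-in for withContext(Dispatchers.IO): it runs the block inline
// and only counts how many times the caller "switched" to the I/O dispatcher.
class SwitchCounter {
    var switches = 0
        private set

    fun <T> onIoDispatcher(block: () -> T): T {
        switches++
        return block()
    }
}

// Per-line switching, like calling suspendingReadUtf8Line() in a loop.
fun readNLinesPerCall(io: SwitchCounter, lines: Iterator<String>, n: Int): List<String> {
    val result = mutableListOf<String>()
    repeat(n) { result += io.onIoDispatcher { lines.next() } }
    return result
}

// Caller-scoped switching: one hop, then every blocking read happens there.
fun readNLinesScoped(io: SwitchCounter, lines: Iterator<String>, n: Int): List<String> =
    io.onIoDispatcher { List(n) { lines.next() } }

fun main() {
    val input = List(1_000) { "line $it" }
    val perCall = SwitchCounter()
    val scoped = SwitchCounter()
    check(readNLinesPerCall(perCall, input.iterator(), 1_000) == readNLinesScoped(scoped, input.iterator(), 1_000))
    // prints "per-call switches: 1000, scoped switches: 1"
    println("per-call switches: ${perCall.switches}, scoped switches: ${scoped.switches}")
}
```

Both versions return the same lines; only the number of dispatcher hops differs, and with a real dispatcher each hop can mean a thread handoff.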

@kevincianfarini
Contributor Author

kevincianfarini commented Nov 19, 2020

Let's back up a bit

Ah sorry. I thought the desire for non-blocking I/O in okio was already set in stone.

what's the goal?......I find blocking I/O wins.

I think this question can also be answered by asking the question "why should non-blocking I/O be used at all?"

I've done some research, and it's true: in terms of throughput, blocking I/O wins, especially with the thread-per-connection model. The numbers I saw quoted suggest that most event-based I/O achieves about 75% of the throughput of blocking I/O. Why is this?

Well, tools like epoll notify the user when I/O events are ready to be consumed. Consuming these events and deciding when and where to resume execution of, say, a coroutine is scheduling. And what are operating systems really, really good at? Scheduling. It's one of their core responsibilities.

So the effect is that I/O scheduling moves from kernel space to user space. The thread-per-connection model lets the (Linux) kernel perform scheduling, rather than an inferior scheduler like the one found in kotlinx.coroutines.

This isn't the whole story, though. Other confounding factors explain why blocking I/O throughput beats event-based I/O. For example, event-based I/O needs more CPU time to process events than blocking I/O. In a completely single-threaded program, non-blocking I/O spends most of its time bouncing between events that need to be processed and less time actually processing them.

So, again. Why use non-blocking I/O at all?

It's a trade-off. Non-blocking I/O requires less memory (because fewer threads are used) but needs more CPU time and relies on an inferior scheduler. On memory-constrained devices like a low-end Android phone or a Raspberry Pi, non-blocking I/O could prove beneficial.

To avoid the single-threaded non-blocking I/O problem, a well-established pattern is to have a single thread that observes the readiness of I/O events and one or more worker threads that process those events.

As a concrete example, I am (slowly) working on an implementation of the BitTorrent protocol. The thread-per-connection model is less suitable because of the protocol's nature: a single torrent can have hundreds of peers, and spawning (or pooling) a thread for each of those connections would have a huge memory footprint. If each thread takes 1-2 MB and a torrent has 200 connections, you're looking at 200-400 MB of memory just to download a file. That doesn't include the memory needed for in-memory transformations of the data, like SHA-1/SHA-256 block verification.
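A minimal JVM sketch of the single-thread-observes-readiness pattern, using only `java.nio`. Assumptions: `Pipe`s stand in for peer sockets so it runs without a network, and `readAllWithOneSelector` is a hypothetical name; a real client would hand each ready read to worker threads instead of processing it inline.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// One thread watches many channels for readiness. Returns the bytes received per "peer".
fun readAllWithOneSelector(channelCount: Int): Map<Int, String> {
    val selector = Selector.open()
    val pipes = List(channelCount) { Pipe.open() }
    pipes.forEachIndexed { index, pipe ->
        pipe.source().configureBlocking(false)
        // The attachment records which connection a ready key belongs to.
        pipe.source().register(selector, SelectionKey.OP_READ, index)
    }
    // Simulated peers: each sends one message, then closes its end (end of stream).
    pipes.forEachIndexed { index, pipe ->
        pipe.sink().write(ByteBuffer.wrap("hello from peer $index".toByteArray()))
        pipe.sink().close()
    }
    val received = mutableMapOf<Int, StringBuilder>()
    val buffer = ByteBuffer.allocate(64)
    var open = channelCount
    while (open > 0) {
        selector.select() // blocks until at least one channel is readable
        val ready = selector.selectedKeys().iterator()
        while (ready.hasNext()) {
            val key = ready.next()
            ready.remove()
            val index = key.attachment() as Int
            val channel = key.channel() as Pipe.SourceChannel
            buffer.clear()
            val n = channel.read(buffer)
            if (n == -1) {
                key.cancel()
                channel.close()
                open--
            } else {
                // A real client would dispatch this chunk to a worker thread here.
                received.getOrPut(index) { StringBuilder() }.append(String(buffer.array(), 0, n))
            }
        }
    }
    selector.close()
    return received.mapValues { it.value.toString() }
}
```

The thread count stays at one however many peers are registered; the memory cost per connection is a channel, a key, and whatever buffering the protocol needs.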

Because these trade-offs exist, I think it makes sense to offer okio-async for those who need it.

@yschimke
Collaborator

The readNLines() function ping-pongs back and forth between dispatchers. It’s faster & simpler for the caller to scope what work happens on the I/O dispatcher, and do all the work there.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html

Implementation note

This dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread. As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used) during operations over IO dispatcher.

@swankjesse
Collaborator

@yschimke if your code assumes that context-switching to the IO dispatcher is free, then you should just write blocking code.

@yschimke
Collaborator

@yschimke if your code assumes that context-switching to the IO dispatcher is free, then you should just write blocking code.

That seems like a harsh interpretation. By default it does what you were hoping it would. But if it's on an incompatible dispatcher it will switch, and if it needs to, it causes another thread to come alive. That seems nicer than ForkJoinPool.managedBlock.

@kevincianfarini
Contributor Author

I found this article interesting on the topic of non-blocking I/O, specifically the part about io_uring:

https://www.scylladb.com/2020/05/05/how-io_uring-and-ebpf-will-revolutionize-programming-in-linux/

@JakeWharton
Collaborator

JakeWharton commented Nov 27, 2020 via email

@swankjesse
Collaborator

So there are two trends that converge:

  • The I/O layers are going away from blocking toward callbacks: epoll, io_uring. Perhaps even HTTP/3 fits here.
  • The application layer is going from callbacks to blocking: Project loom, Kotlin Coroutines

What's amazing is when we have all of this stuff, developers will write obvious blocking code and the platform & libraries will transform it into fast IO events.

Where does Okio fit?! My ideal is that it's just Loom, and the VM has code to map blocking I/O calls to io_uring work automatically.

Without Loom, we do suspend functions and try to get the same effect via the compiler virtualizing threads rather than the runtime virtualizing threads.

The catch I'm hung up on is that suspend functions aren't as fast to call as blocking functions. So a Moshi or Wire that used a new suspending Okio would perform terribly.

@kevincianfarini
Contributor Author

@swankjesse

The catch I'm hung up on is that suspend functions aren't as fast to call as blocking functions. So a Moshi or Wire that used a new suspending Okio would perform terribly.

Can you elaborate?

@kevincianfarini
Contributor Author

kevincianfarini commented Nov 28, 2020

I've been thinking about this comment over the past few days.

The catch I'm hung up on is that suspend functions aren't as fast to call as blocking functions. So a Moshi or Wire that used a new suspending Okio would perform terribly.

My understanding of the situation is this: Kotlin suspend functions might be slower because at compile time they are transformed into finite state machines. Essentially this boils down to a giant switch statement, where the branches break apart the sequential flow of the function at each point where it could suspend via suspendCoroutine.

So this is inherently slower, because it might turn non-branchy code into branchy code. Code in a suspend function that never needs to suspend wouldn't have an impact, though, I think? If we have a suspend fun that doesn't call any other suspend functions, then I don't think any finite state machine would be inserted at compile time.

Furthermore, Loom (I think) achieves non-blocking I/O that looks like blocking I/O via its own continuation implementation. Although I only looked at this briefly, the logic seems to be the same: a finite state machine is generated that manages the state of the continuation.

Are there additional performance concerns that suspend functions have over Loom continuations that I'm not aware of?
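A stdlib-only experiment (no kotlinx.coroutines) that makes these mechanics concrete. `plusOne`, `readByte`, and `parked` are hypothetical names; `parked` plays the role of an event loop's list of coroutines waiting for I/O. As far as I understand the compiler, a suspend function with no suspension points like `plusOne` gets an extra continuation parameter but no state machine, while a lambda that calls `readByte()` twice is compiled into a state machine whose locals are saved in a heap-allocated continuation at each suspension.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Continuations waiting for a (simulated) I/O event, like an event loop's wait list.
val parked = ArrayDeque<Continuation<Int>>()

// No suspension points: completes immediately when called.
suspend fun plusOne(x: Int): Int = x + 1

// Really suspends: the caller's state is captured in a Continuation and parked.
suspend fun readByte(): Int = suspendCoroutine { continuation -> parked.addLast(continuation) }

fun main() {
    var quick: Int? = null
    val quickBlock: suspend () -> Int = { plusOne(41) }
    quickBlock.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { quick = it.getOrThrow() })
    check(quick == 42 && parked.isEmpty()) // finished inside startCoroutine(), nothing parked

    var sum: Int? = null
    val twoReads: suspend () -> Int = { readByte() + readByte() }
    twoReads.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { sum = it.getOrThrow() })
    check(sum == null && parked.size == 1) // suspended at the first readByte()
    parked.removeFirst().resume(40)        // simulated I/O event delivers the first byte
    check(sum == null && parked.size == 1) // resumed, then suspended again at the second readByte()
    parked.removeFirst().resume(2)
    println("sum = $sum")                  // prints "sum = 42"
}
```

Every real suspension here means an object handed to someone else to resume later, which is the per-call cost that matters when the call sits in a per-byte inner loop.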

@yschimke
Collaborator

Loom is JDK 17 at best; otherwise the next LTS is effectively 23. So it's years away from working with various other frameworks when you don't control the whole deployment, e.g. app servers or hosted setups. Android never?

Kotlin coroutines, on the other hand, are supported on older Android versions and JDKs.

So the reality is that users will adopt some solution that bridges these two worlds.

@swankjesse
Collaborator

swankjesse commented Nov 28, 2020

Here’s a gist that compares three equivalent functions:

  • Wire’s readVarint64() in the original Kotlin
  • That function converted to Java (via bytecode)
  • That function built on a naïve ‘suspend everything’ Okio change, then converted to Java (via bytecode)

This function is typical of the inner-loop stuff that Okio is best at. It is written to a different standard than conventional code. In particular, we’re trying to avoid allocations, function calls, polymorphic calls, and even member field access. The more work we can do on the CPU and the CPU alone, the happier we are.

Calling from this function to a suspending function 2x per byte processed is likely to have a significant cost! Plus there’s the cost of boxing every byte and the result.

I think coroutines are great! But unfortunately just adding suspend on everything in Okio isn’t going to work if we want to retain acceptable performance. Instead we have to figure out how to mix suspending calls that refill buffers with non-suspending calls to interpret those buffers.
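A sketch of that split, under assumptions: `ChunkSource` is a hypothetical stdlib-only interface (not Okio's API) whose only suspending call pulls a whole chunk, and `VarintReader` decodes protobuf-style varints (7 payload bits per byte, least-significant group first, high bit set on every byte but the last) with plain non-suspending code over the buffered bytes. Suspension happens once per refill instead of twice per byte, and the decode loop works on primitive `Long`s without boxing.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.EOFException
import java.io.IOException
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical async source: returns up to `max` bytes, or an empty array at end of stream.
interface ChunkSource {
    suspend fun read(max: Int): ByteArray
}

class VarintReader(private val source: ChunkSource) {
    private var buf = ByteArray(0)
    private var pos = 0

    // Suspends at most once per refilled chunk, never once per byte.
    suspend fun readVarint64(): Long {
        while (true) {
            val end = completeVarintEnd()
            if (end != -1) return decode(end)
            val chunk = source.read(8192)
            if (chunk.isEmpty()) throw EOFException("truncated varint")
            buf = buf.copyOfRange(pos, buf.size) + chunk // drop consumed bytes, append new ones
            pos = 0
        }
    }

    // Non-suspending: index just past the varint's final byte, or -1 if not fully buffered.
    private fun completeVarintEnd(): Int {
        var i = pos
        while (i < buf.size && i - pos < 10) {
            if ((buf[i].toInt() and 0x80) == 0) return i + 1
            i++
        }
        if (i - pos >= 10) throw IOException("malformed varint")
        return -1
    }

    // Non-suspending: 7 payload bits per byte, least-significant group first.
    private fun decode(end: Int): Long {
        var result = 0L
        var shift = 0
        for (i in pos until end) {
            result = result or ((buf[i].toLong() and 0x7FL) shl shift)
            shift += 7
        }
        pos = end
        return result
    }
}

fun encodeVarint64(value: Long): ByteArray {
    val out = ByteArrayOutputStream()
    var v = value
    while ((v and 0x7FL.inv()) != 0L) {
        out.write(((v and 0x7FL) or 0x80L).toInt())
        v = v ushr 7
    }
    out.write(v.toInt())
    return out.toByteArray()
}

// Test double that serves `data` in small chunks and never really suspends.
class ArrayChunkSource(private val data: ByteArray, private val chunkSize: Int) : ChunkSource {
    var reads = 0
        private set
    private var offset = 0

    override suspend fun read(max: Int): ByteArray {
        reads++
        val end = minOf(data.size, offset + minOf(max, chunkSize))
        return data.copyOfRange(offset, end).also { offset = end }
    }
}

// Helper: runs a suspend block that never actually suspends, on the current thread.
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}
```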

@kevincianfarini
Contributor Author

In particular, we’re trying to avoid allocations, function calls, polymorphic calls, and even member field access. The more work we can do on the CPU and the CPU alone, the happier we are.

So it seems that most of these problems are derived from the continuation itself.

  • member fields -> continuation holding the contents of the stack
  • polymorphic -> ContinuationImpl : Continuation
  • Allocations -> new ContinuationImpl
  • function calls -> not seeing too much here

I'm curious to know your thoughts on some of the differences between kotlinx.coroutines and Loom, and how they quell some of your concerns. As far as I'm aware, Loom continuations will store the stack frame as member variables as well, and the JVM will have to instantiate new continuations too. As for the other two, I'm not so sure.

Instead we have to figure out how to mix suspending calls that refill buffers with non-suspending calls to interpret those buffers.

While interpreting those buffers which are awaiting data...what would the caller do? We could have a non-suspend function that checks to see if data is available. The other option seems to be blocking.

@swankjesse
Collaborator

I expect Loom will be significantly faster than Kotlin coroutines because Loom can use the JVM’s existing callstack, whereas Kotlin coroutines need a separate mechanism to track the callstack.

@swankjesse
Collaborator

Going back to the BitTorrent motivating use case, what about a design that separates the CPU-bound parsing work that uses BufferedSource from the I/O-bound work that uses Sockets?

A sketch:

import java.net.Socket
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import okio.Buffer

sealed class AsyncRequest {
  abstract val tag: Any?

  class SocketReadRequest(
    override val tag: Any?,
    val socket: Socket,
    val byteCount: Long
  ) : AsyncRequest()

  // ... also write, flush, close
}

sealed class AsyncResponse {
  abstract val tag: Any?

  class SocketReadResponse(
    override val tag: Any?,
    val socket: Socket,
    val buffer: Buffer
  ) : AsyncResponse()

  // ... also failure, flush, close
}

interface AsyncDispatcher {
  // Callers submit I/O work here...
  val requests: SendChannel<AsyncRequest>
  // ...and receive completed results, matched up by tag.
  val results: ReceiveChannel<AsyncResponse>
}

This is a base primitive that we can implement with NIO, Native, and even blocking potentially. Callers don’t get high-granularity suspending (because that’s a performance trap), but protocol implementers get a way to manage many sockets on a small number of threads.

Protocol implementations would slice their protocol into frames, and write a small bit of tricky code to make sure an entire frame is ready before processing it.
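A sketch of that "small bit of tricky code" for a length-prefixed protocol. Assumptions: frames are a 4-byte big-endian length followed by the payload (the layout BitTorrent peer messages use, where a zero-length frame is a keep-alive), `FrameAssembler` and `frame` are hypothetical names, and the 1 MiB cap is an arbitrary guard against hostile lengths. It is plain, non-suspending code that could sit behind the buffers delivered in `SocketReadResponse`.

```kotlin
import java.nio.ByteBuffer

// Accumulates bytes from socket reads and releases only complete length-prefixed frames.
class FrameAssembler(private val maxFrameSize: Int = 1 shl 20) {
    private var pending = ByteArray(0)

    // Feed whatever a read returned; get back every frame that is now complete.
    fun accept(bytes: ByteArray): List<ByteArray> {
        pending += bytes
        val frames = mutableListOf<ByteArray>()
        var offset = 0
        while (pending.size - offset >= 4) {
            val length = ByteBuffer.wrap(pending, offset, 4).int // big-endian by default
            require(length in 0..maxFrameSize) { "bad frame length: $length" }
            if (pending.size - offset - 4 < length) break // payload hasn't fully arrived yet
            frames.add(pending.copyOfRange(offset + 4, offset + 4 + length))
            offset += 4 + length
        }
        pending = pending.copyOfRange(offset, pending.size) // keep the partial tail
        return frames
    }
}

// Encodes one frame: 4-byte big-endian length, then the payload.
fun frame(payload: ByteArray): ByteArray =
    ByteBuffer.allocate(4 + payload.size).putInt(payload.size).put(payload).array()
```

Because the parser only ever sees whole frames, the CPU-bound decoding can stay blocking-style and allocation-light while the socket side is event-driven.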

Heck, we could use this in OkHttp. We might even be able to borrow some code from SelectRunner, which was where I originally smashed into the difficulties of integrating coroutines with Okio.

@yschimke
Collaborator

@swankjesse really nice analysis. Makes sense.

Stepping back, is the idea that we'd solve both of these problems by having an efficient, infrastructure-optimized version internally, while at the key user abstraction (HTTP requests for OkHttp) we'd make sure bridging to coroutines works nicely and simply?

The I/O layers are going away from blocking toward callbacks: epoll, io_uring. Perhaps even HTTP/3 fits here. The application layer is going from callbacks to blocking: Project loom, Kotlin Coroutines

What's amazing is when we have all of this stuff, developers will write obvious blocking code and the platform & libraries will transform it into fast IO events.

What are the key user abstractions for File IO? File.readLines? socket.readMoshiObject? What is required in the filesystem abstraction to make that work nicely? Anything?

@swankjesse swankjesse self-assigned this Nov 29, 2020
@swankjesse
Collaborator

For files my first instinct is to treat random access and beginning-to-end as separate APIs.

Random access is much less frequently needed in my experience. It came up in LeakCanary and .dx merging. I'd like to defer designing this.

For beginning-to-end, programs alternate between two complementary tasks: I/O syscalls to move data between memory and disk, and computational work to encode or decode that data. Our goal is to saturate disk and CPU and to maximize throughput by limiting context switching.

I expect our opportunities are:

  • Encouraging better policy on how many threads to use in order to saturate both disk and CPU. For example, the Wire compiler only uses 1 thread.
  • Tuning the segment size; possibly decoupling syscall buffer size from segment size
  • Reading into direct buffers instead of heap arrays
  • Prefetching to do read I/O concurrently with decoding

I don't necessarily think we want to pursue these opportunities! The cost in API and implementation complexity is real.
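To give a feel for the prefetching item (and its complexity cost), here is one way to overlap read I/O with decoding using a single extra thread and a bounded queue. This is a sketch under assumptions: `prefetchingChunks` is a hypothetical name, the chunk size and queue depth are arbitrary, and it uses plain blocking `InputStream` reads rather than anything in Okio.

```kotlin
import java.io.InputStream
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Read-ahead: a background thread does the blocking reads and keeps up to `depth` chunks
// queued, so the consumer can decode one chunk while the next read is already in flight.
fun prefetchingChunks(input: InputStream, chunkSize: Int = 8192, depth: Int = 4): Sequence<ByteArray> {
    val endOfStream = ByteArray(0) // sentinel, compared by identity
    val queue = ArrayBlockingQueue<Result<ByteArray>>(depth)
    thread(isDaemon = true, name = "prefetch") {
        val outcome = runCatching {
            input.use { stream ->
                val buffer = ByteArray(chunkSize)
                while (true) {
                    val n = stream.read(buffer)
                    if (n == -1) break
                    queue.put(Result.success(buffer.copyOf(n)))
                }
            }
        }
        // Success becomes the end-of-stream sentinel; a failure is rethrown to the consumer.
        queue.put(outcome.map { endOfStream })
    }
    return generateSequence { queue.take().getOrThrow().takeIf { it !== endOfStream } }
}
```

The bounded queue caps memory at roughly `depth` chunks, and the returned sequence can be iterated only once. Even this small version needs a sentinel, error forwarding, and a daemon thread, which illustrates the complexity cost mentioned above.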

I don't think there's much need for accessing N files concurrently with less than N threads. Probably the best upside for async is an event-driven API like the above to avoid context switching. As with the sockets example, the events should be on big boundaries (entire file?!) and not per byte.

@swankjesse
Collaborator

Thinking about AsyncDispatcher, the design above isn’t coroutines-native. Here’s an ergonomic interface that’ll have equivalent performance:

import java.net.Socket
import okio.Buffer

interface AsyncDispatcher {
  fun source(socket: Socket): AsyncSource
  fun sink(socket: Socket): AsyncSink
}

interface AsyncSink {
  suspend fun write(source: Buffer, byteCount: Long)
  suspend fun flush()
  suspend fun close()
}

interface AsyncSource {
  suspend fun read(sink: Buffer, byteCount: Long): Long
  suspend fun close()
}

What if we build this, then measure the performance consequences of using it in OkHttp’s Http2Connection? In theory we could shrink the number of reader threads from one per Socket to one per ConnectionPool.
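A sketch of what calling such interfaces looks like, using hypothetical ByteArray-based stand-ins (`ByteAsyncSource`, `ByteAsyncSink`, `MemorySource`, `MemorySink`) instead of Okio's `Buffer` so it runs with the standard library alone. The point it illustrates is granularity: each suspend call moves a whole chunk, so copying 100,000 bytes in 8 KiB chunks takes 14 reads (13 with data plus one end-of-stream) and 13 writes, rather than a suspension per byte.

```kotlin
import java.io.ByteArrayOutputStream
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical ByteArray-based stand-ins for the AsyncSource/AsyncSink interfaces above.
interface ByteAsyncSource {
    // Reads up to sink.size bytes into sink; returns the count, or -1 at end of stream.
    suspend fun read(sink: ByteArray): Int
}

interface ByteAsyncSink {
    suspend fun write(source: ByteArray, byteCount: Int)
}

// One suspend call per chunk in each direction, never one per byte.
suspend fun ByteAsyncSource.copyTo(sink: ByteAsyncSink, chunkSize: Int = 8192): Long {
    val chunk = ByteArray(chunkSize)
    var total = 0L
    while (true) {
        val n = read(chunk)
        if (n == -1) return total
        sink.write(chunk, n)
        total += n
    }
}

// In-memory implementations that count calls (they complete without suspending).
class MemorySource(private val data: ByteArray) : ByteAsyncSource {
    var calls = 0
        private set
    private var pos = 0

    override suspend fun read(sink: ByteArray): Int {
        calls++
        if (pos == data.size) return -1
        val n = minOf(sink.size, data.size - pos)
        data.copyInto(sink, destinationOffset = 0, startIndex = pos, endIndex = pos + n)
        pos += n
        return n
    }
}

class MemorySink : ByteAsyncSink {
    val out = ByteArrayOutputStream()
    var calls = 0
        private set

    override suspend fun write(source: ByteArray, byteCount: Int) {
        calls++
        out.write(source, 0, byteCount)
    }
}

// Helper: runs a suspend block that never actually suspends, on the current thread.
fun <T> runSync(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended" }.getOrThrow()
}
```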

@kevincianfarini
Contributor Author

I like the AsyncDispatcher approach. I'm interested to play with it and see where complexities arise in a highly concurrent protocol like BitTorrent.

I don't think there's much need for accessing N files concurrently with less than N threads. Probably the best upside for async is an event-driven API like the above to avoid context switching. As with the sockets example, the events should be on big boundaries (entire file?!) and not per byte.

Something to think about: although Linux-specific, io_uring allows async I/O on both sockets and files, with the option to poll instead of making heavy kernel system calls. It's possible to perform I/O using io_uring with very few kernel-to-user-space context switches.

@yschimke
Collaborator

For JVM, I think this class could usefully bridge from coroutines-safe code to blocking regular Java I/O.

    fileSystem.read(journalFile) {
      val magic = readUtf8LineStrict()
      val version = readUtf8LineStrict()
      val appVersionString = readUtf8LineStrict()
      val valueCountString = readUtf8LineStrict()
      val blank = readUtf8LineStrict()
      // ... validate the header, then read the journal entries
    }

Rather than duplicating all the readX and writeX methods, switch to Dispatchers.IO and allow safe calls, potentially with a NonCancellable context thrown in for good measure?

@s949492225

I would like to know whether there has been any new progress on the okio-async module or any new discussion results, and whether okio-async will be supported in the future.

@swankjesse
Collaborator

@s949492225 what's your use case?

@s949492225

@swankjesse Just read or write files without blocking the thread

@swankjesse
Collaborator

Got it. Would you consider moving your I/O operation to another thread using Dispatchers.IO? That way control flow switches threads once (very efficient) and not once per byte (very inefficient).

@s949492225

s949492225 commented Jul 2, 2021

@swankjesse
I actually did this, but it blocks the IO thread.
I'd like a mechanism where the operating system handles the stream operation and then calls back to me with the result, preferably via a suspend function, so that the originating thread isn't blocked and can do something else while it waits.
Is such a scenario possible?

@swankjesse
Collaborator

Blocking the IO thread is the best we can do. All async frameworks keep an extra IO thread (or multiple) just to do blocking select() calls.
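For comparison with the question above: the JDK already offers a callback-style file API, `java.nio.channels.AsynchronousFileChannel`, and bridging its `CompletionHandler` into a suspend function takes only `suspendCoroutine`. The names `readSuspending`, `readFileText`, and `runAndWait` are hypothetical. As far as I know, OpenJDK implements this class on Linux and macOS with a thread pool that performs ordinary blocking reads (Windows uses I/O completion ports), which is consistent with the point that some thread still ends up blocking.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Bridges the callback API into a suspend function: the coroutine suspends here and is
// resumed from whichever thread invokes the CompletionHandler.
suspend fun AsynchronousFileChannel.readSuspending(dst: ByteBuffer, position: Long): Int =
    suspendCoroutine { continuation ->
        read(dst, position, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(result: Int, attachment: Unit) = continuation.resume(result)
            override fun failed(exc: Throwable, attachment: Unit) = continuation.resumeWithException(exc)
        })
    }

// Reads a small file fully, with one suspension per completed read.
suspend fun readFileText(path: Path): String =
    AsynchronousFileChannel.open(path, StandardOpenOption.READ).use { channel ->
        val buffer = ByteBuffer.allocate(channel.size().toInt())
        while (buffer.hasRemaining()) {
            // buffer.position() equals the number of bytes read so far.
            val n = channel.readSuspending(buffer, buffer.position().toLong())
            if (n == -1) break
        }
        String(buffer.array(), 0, buffer.position())
    }

// Demo driver: starts the coroutine and blocks the caller until it completes.
fun <T> runAndWait(block: suspend () -> T): T {
    val done = CountDownLatch(1)
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) {
        outcome = it
        done.countDown()
    })
    done.await()
    return checkNotNull(outcome).getOrThrow()
}
```

The caller's thread is free while a read is pending, but on the Unix implementations described above a pool thread is blocked in its place.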

We're a year or two out from Loom, which will dramatically lower the scalability cost of blocking a thread. When that lands, Okio’s blocking model will be both the simplest and most efficient.

@s949492225

@swankjesse I see. Thank you

@swankjesse
Collaborator

No action planned here. Eager to see what kotlinx.io does! Kotlin/kotlinx-io#163

5 participants