Introduce Async API #163
Another option based on combining ktor + okio:
We found the key to okio working well in practice was the use of single read/write methods taking a concrete buffer type (which is what allows the implementation to change owners rather than copy). The channel above is similar to ktor byte channels but goes through a concrete buffer type. Effectively it's one extra copy/move between buffers allowing a single producer and consumer to sync. The downside is it's a new type.
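The "channel through a concrete buffer type" idea can be sketched roughly as follows. This is a toy illustration, not okio or ktor code: the `ByteChunkChannel` name and the use of a rendezvous coroutine channel are assumptions, chosen to show the single-producer/single-consumer sync with one extra copy between buffers.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Toy illustration: producer and consumer exchange data through a rendezvous
// channel of byte-array chunks. Writing copies the caller's buffer into the
// channel (the "one extra copy/move" mentioned above), so the writer may
// immediately reuse its own buffer; reading hands ownership of a chunk to
// the consumer without further copying.
class ByteChunkChannel {
    private val chunks = Channel<ByteArray>(Channel.RENDEZVOUS)

    suspend fun write(source: ByteArray, byteCount: Int) {
        chunks.send(source.copyOf(byteCount)) // copy so the writer can reuse its buffer
    }

    fun close() {
        chunks.close()
    }

    // Returns the next chunk, or null when the channel is closed and drained.
    suspend fun read(): ByteArray? = chunks.receiveCatching().getOrNull()
}

fun main() = runBlocking {
    val channel = ByteChunkChannel()
    launch {
        channel.write("hello".encodeToByteArray(), 5)
        channel.close()
    }
    val received = channel.read()
    println(received?.decodeToString()) // prints "hello"
}
```

A rendezvous channel makes the producer suspend until the consumer is ready, which is the synchronization point the comment describes; a real implementation would move buffer segments instead of copying arrays.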
Foreword

While good old blocking IO remains the winner when it comes to throughput, asynchronous IO is often preferable for scalability. Below are some thoughts and observations that directed the design of the proposed async API.

Design direction

There are several approaches to asynchronous IO APIs that I would split into two major categories:
- APIs mirroring the synchronous API, with nearly every operation made suspendable;
- APIs providing only bulk asynchronous operations, with fine-grained reads and writes done synchronously over buffers.
The first category seems to provide a very convenient API. From a user perspective, such APIs encourage writing code having many asynchronous calls. From a library developer perspective, having an asynchronous API mirroring the synchronous API is a burden. On the contrary, asynchronous APIs providing only bulk operations reduce the scope of asynchronous code. Yet another observation regarding the data formats being actively used for data exchange is that the size of a message is usually known upfront or encoded in its header, so a consumer can await a whole message and then parse it synchronously.

Described above was the main reasoning behind the proposed API.

Asynchronous kotlinx-io API

As a foundation for the asynchronous IO, there are two interfaces mirroring their blocking counterparts in terms of naming:

public interface AsyncRawSink {
public suspend fun write(buffer: Buffer, bytesCount: Long)
public suspend fun flush()
public suspend fun close()
}
public interface AsyncRawSource : AutoCloseable {
public suspend fun readAtMostTo(buffer: Buffer, bytesCount: Long): Long
override fun close()
}

These interfaces aim to support implementing asynchronous sources and sinks with the same semantics as the synchronous ones. The API specifies neither when an implementation may suspend nor how a suspended call is resumed.

On AsyncRawSource and AsyncRawSink implementations

There might be different reasons to implement your own asynchronous sinks or sources, but the two most probable are:
- decorating another asynchronous sink/source;
- wrapping a platform-specific asynchronous API.
If a sink/source decorates another sink/source, most likely it'll suspend only when waiting for incoming data to arrive or for output data to be written downstream. Here's how an asynchronous CRC32 sink could look:

@OptIn(ExperimentalUnsignedTypes::class)
class AsyncCRC32Sink(private val upstream: AsyncRawSink): AsyncRawSink {
private val tempBuffer = Buffer()
private val crc32Table = generateCrc32Table()
private var crc32: UInt = 0xffffffffU
private fun update(value: Byte) {
val index = value.xor(crc32.toByte()).toUByte()
crc32 = crc32Table[index.toInt()].xor(crc32.shr(8))
}
fun crc32(): UInt = crc32.xor(0xffffffffU)
override suspend fun write(source: Buffer, byteCount: Long) {
source.copyTo(tempBuffer, 0, byteCount)
while (!tempBuffer.exhausted()) {
update(tempBuffer.readByte())
}
upstream.write(source, byteCount)
}
override suspend fun flush() = upstream.flush()
override suspend fun close() = upstream.close()
}

The only difference compared to the blocking CRC32Sink is the suspend modifier on write, flush, and close. If both blocking and asynchronous versions of such classes need to exist, the common logic could be extracted into a base class:

@OptIn(ExperimentalUnsignedTypes::class)
abstract class Crc32SinkCommon {
internal val tempBuffer = Buffer()
internal val crc32Table = generateCrc32Table()
internal var crc32Val: UInt = 0xffffffffU
fun update(value: Byte) {
val index = value.xor(crc32Val.toByte()).toUByte()
crc32Val = crc32Table[index.toInt()].xor(crc32Val.shr(8))
}
fun crc32(): UInt = crc32Val.xor(0xffffffffU)
internal inline fun onWrite(source: Buffer, byteCount: Long, write: (Buffer, Long) -> Unit) {
source.copyTo(tempBuffer, 0, byteCount)
while (!tempBuffer.exhausted()) {
update(tempBuffer.readByte())
}
write(source, byteCount)
}
}
class AsyncCrc32Sink(private val sink: AsyncRawSink): Crc32SinkCommon(), AsyncRawSink {
override suspend fun write(source: Buffer, byteCount: Long) {
onWrite(source, byteCount) { buf, count ->
sink.write(buf, count)
}
}
override suspend fun flush() = sink.flush()
override suspend fun close() = sink.close()
}
class BlockingCrc32Sink(private val sink: RawSink): Crc32SinkCommon(), RawSink {
override fun write(source: Buffer, byteCount: Long) {
onWrite(source, byteCount) { buf, count ->
sink.write(buf, count)
}
}
override fun flush() = sink.flush()
override fun close() = sink.close()
}

When it comes to implementing a truly asynchronous sink/source, there are different types of platform-specific sinks/sources and different ways to work with them.
As an example of integration with a poll-based API, here's a source wrapping a selectable JVM NIO channel:

// Source is a channel that will be registered in a selector for OP_READ,
// for example - a SocketChannel created after ServerSocketChannel accepted an incoming connection.
// It's up to a caller to register a channel in selector.
class SelectableChannelSource<T>(private val source: T) : AsyncRawSource, Selectable
where T : ReadableByteChannel, T : SelectableChannel {
private val internalBuffer = ByteBuffer.allocate(8192)
private val continuation = atomic<Continuation<Unit>?>(null)
init {
require(source.validOps() and SelectionKey.OP_READ != 0) {
"Source channel ($source) does not support OP_READ operation."
}
}
override suspend fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
internalBuffer.clear()
internalBuffer.limit(min(byteCount, internalBuffer.capacity().toLong()).toInt())
var bytesRead = source.read(internalBuffer)
while (bytesRead == 0) {
// Let's suspend the coroutine until there is data to read
suspend()
// Try to read once again after resuming
bytesRead = source.read(internalBuffer)
}
// The source is exhausted
if (bytesRead == -1) return -1L
internalBuffer.flip()
return sink.write(internalBuffer).toLong()
}
override fun close() = source.close()
// Reactor should call this method once a key corresponding to this source is selected
override fun select(ops: Int) {
require(ops and SelectionKey.OP_READ != 0)
// Resume a previously suspended source
resume()
}
suspend fun suspend() {
suspendCoroutine<Unit> { cont ->
// Store the continuation so that select() can resume this coroutine later
continuation.value = cont
}
}
fun resume() {
val cont = continuation.getAndUpdate { null }
cont?.resume(Unit)
}
}
interface Selectable {
fun select(ops: Int)
}

If a call to read returns no data, readAtMostTo suspends until the channel is selected for reading. Usually, there is a reactor thread that polls operations that could be performed on a set of registered channels:

val selector: Selector = Selector.open()
...
while (true) {
if (selector.select(1000L) == 0) continue
val keys = selector.selectedKeys().iterator()
while (keys.hasNext()) {
val key = keys.next()
keys.remove()
val attachment = key.attachment() as Selectable
attachment.select(key.readyOps())
}
}

And here's a simple example of a wrapper around a blocking API:

@OptIn(ExperimentalForeignApi::class)
class AsyncRandomSource : AsyncRawSource {
private val fd: Int = open("/dev/random", O_RDONLY).apply {
if (this < 0) {
throw IOException("Failed to open /dev/random: ${strerror(errno)?.toKString()}")
}
}
private val internalBuffer = ByteArray(1024)
@OptIn(UnsafeNumber::class)
override suspend fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
val capacity = min(byteCount, internalBuffer.size.toLong())
val bytesRead: Int = withContext(Dispatchers.IO) {
internalBuffer.usePinned {
val x = read(fd, it.addressOf(0), capacity.convert())
return@withContext x.convert()
}
}
if (bytesRead > 0) {
sink.write(internalBuffer, 0, bytesRead)
}
return bytesRead.toLong()
}
override fun close() {
close(fd)
}
}

For the buffered sinks and sources, there is no one-to-one mapping between the synchronous and asynchronous API:

public class AsyncSource(private val source: AsyncRawSource) : AsyncRawSource {
public val buffer: Buffer
/**
* Throws an exception when the source is exhausted before fulfilling the predicate.
*/
public suspend fun await(until: AwaitPredicate): Unit
/**
* Returns `true` if the predicate was fulfilled, `false` otherwise.
*/
public suspend fun tryAwait(until: AwaitPredicate): Boolean
override suspend fun readAtMostTo(buffer: Buffer, bytesCount: Long): Long
override fun close()
}
public class AsyncSink(private val sink: AsyncRawSink) : AsyncRawSink {
public val buffer: Buffer = Buffer()
override suspend fun write(buffer: Buffer, bytesCount: Long)
override suspend fun flush()
override suspend fun close()
}

These classes don't provide the same feature-rich interface as Sink or Source; instead, data is read from and written to their buffers directly, with readiness expressed through await predicates:
public interface AwaitPredicate {
public suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean
}

There will be some predefined predicates checking for the minimum number of bytes available, the presence of particular values, and so on. In the simplest form, a predicate might look like this:

public class MinNumberOfBytesAvailable(private val bytesCount: Long) : AwaitPredicate {
override suspend fun apply(buffer: Buffer, fetchMore: suspend () -> Boolean): Boolean {
while (buffer.size < bytesCount && fetchMore()) { /* do nothing */ }
return buffer.size >= bytesCount
}
}

Here's how the BSON reading/writing example from the documentation looks with the async API:

suspend fun Message.toBson(sink: AsyncSink) {
val buffer = Buffer()
with (buffer) {
writeByte(0x9) // UTC-timestamp field
writeString("timestamp") // field name
writeByte(0)
writeLongLe(timestamp) // field value
writeByte(0x2) // string field
writeString("text") // field name
writeByte(0)
writeIntLe(text.utf8Size().toInt() + 1) // field value: length followed by the string
writeString(text)
writeByte(0)
writeByte(0) // end of BSON document
}
// Write document length and then its body
sink.buffer.writeIntLe(buffer.size.toInt() + 4)
buffer.transferTo(sink.buffer)
sink.flush()
}
suspend fun Message.Companion.fromBson(source: AsyncSource): Message {
source.await(AwaitPredicate.dataAvailable(4)) // check if the source contains length
val buffer = source.buffer
val length = buffer.readIntLe() - 4L
source.await(AwaitPredicate.dataAvailable(length)) // check if the source contains the whole message
fun readFieldName(source: Buffer): String {
val delimiterOffset = source.indexOf(0) // find offset of the 0-byte terminating the name
check(delimiterOffset >= 0) // indexOf returns -1 if the value is not found
val fieldName = source.readString(delimiterOffset) // read the string until terminator
source.skip(1) // skip the terminator
return fieldName
}
// for simplicity, let's assume that the order of fields matches serialization order
var tag = buffer.readByte().toInt() // read the field type
check(tag == 0x9 && readFieldName(buffer) == "timestamp")
val timestamp = buffer.readLongLe() // read long value
tag = buffer.readByte().toInt()
check(tag == 0x2 && readFieldName(buffer) == "text")
val textLen = buffer.readIntLe() - 1L // read string length (it includes the terminator)
val text = buffer.readString(textLen) // read value
buffer.skip(1) // skip terminator
buffer.skip(1) // skip end of the document
return Message(timestamp, text)
}

The only suspending calls are bulk reading and flushing, while almost all the parsing and all the marshaling are done synchronously.
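Putting the pieces together, a round trip over these APIs might look like this. This is a sketch against the proposed interfaces; it assumes AsyncSource and AsyncSink are constructed over some concrete AsyncRawSource/AsyncRawSink implementations such as the ones shown above, and the `roundTrip` name is illustrative only.

```kotlin
// Sketch: wire a raw asynchronous source/sink pair into the buffered
// AsyncSource/AsyncSink and run the BSON example over them.
suspend fun roundTrip(rawIn: AsyncRawSource, rawOut: AsyncRawSink) {
    val source = AsyncSource(rawIn)
    val sink = AsyncSink(rawOut)

    val message = Message.fromBson(source) // suspends only while awaiting buffered data
    message.toBson(sink)                   // suspends only on the final flush
}
```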
This is brilliant. Great design, easy to understand, and comprehensive examples. Yay! I can’t yet say how it’ll work in practice, but I’m optimistic.
One small nitpick! I think the Source.close function ought to suspend. In OkHttp’s cache we have a Source implementation that writes its content to a file. When this source is closed, it closes both the upstream source and the file sink that now contains the cached data. That file close may need to flush, and that operation is suspending.
In Moshi + Retrofit we’ve found it’s efficient to start decoding a JSON response before the last byte is received. I’m very interested in learning the fastest way to decode JSON with this API. You could put suspends everywhere, or you could put ‘em on the beginnings of objects, arrays, and strings, or you could defer decoding until the entire document is returned.
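For illustration, the middle option (suspending at the beginnings of objects, arrays, and strings) could be expressed with await predicates roughly like this. This is a hypothetical sketch: the `contains` predicate factory and `readJsonString` helper are assumed names, not part of the proposal as written, and escape sequences are ignored for brevity.

```kotlin
// Hypothetical sketch: suspend until the next complete JSON string value has
// arrived, then parse it synchronously from the buffer. A hypothetical
// AwaitPredicate.contains(byte) waits until a given byte is buffered.
suspend fun readJsonString(source: AsyncSource): String {
    source.await(AwaitPredicate.dataAvailable(1))
    check(source.buffer.readByte() == '"'.code.toByte()) // opening quote
    source.await(AwaitPredicate.contains('"'.code.toByte())) // closing quote buffered
    val end = source.buffer.indexOf('"'.code.toByte())
    val value = source.buffer.readString(end) // synchronous parsing
    source.buffer.skip(1) // skip the closing quote
    return value
}
```

With helpers like this, decoding can start as soon as each token's bytes arrive, instead of waiting for the entire document.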
At Google, our thinking has largely been as follows: if asynchrony is necessary at all for I/O, then it is usually caused by RPCs and the like whose overhead exceeds the cost of memory management for buffers etc. As a result, we've adopted a thin wrapper around synchronous I/O.

To be clear, this is not necessarily in opposition to the above API -- but I do think it's worth considering what sorts of scenarios we might be using async I/O in, and whether performance sacrifices in the API design might produce clearer code without significant cost relative to the overhead already happening.
Update on async IO API. After processing the feedback, the API was slightly reworked.
There will be two extension functions for wrapping raw asynchronous sinks and sources into their buffered counterparts.
And less related but still somewhat relevant: is there a plan to introduce an (async) file system type?

(Asking because I have an implementation of the Java 8 NIO File API in my file manager Material Files (including local, archive and some network FS), and I'm also looking to create a more generic Kotlin file system API from that.)
Such an interface, most likely, will be introduced.
That's a good question, thanks! |
Thanks for the link to the discussion! Another question: why do we need the suspending close method?
I understand this part. But my question was: if a class implements the original blocking interface as well, are the use cases worth requiring everyone to put effort into implementing both methods when they implement this interface?
In the context of this proposal, the fundamental difference between synchronous and asynchronous sinks/sources, as I see it, is that operations on the latter may require some non-trivial interactions between multiple agents, some of which may be non-local. It applies not only to read/write operations, but also to close. That, in general, may not be the case if we're talking about asynchronous file I/O, but it is the case for asynchronous sinks/sources over logical HTTP/2.0 or WebSocket channels, where a graceful close may imply notifying a reactor thread, which then has to mux the logical stream into a "physical" one, send it, and finally notify the caller that the close is complete.
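A suspending close in that setting might look roughly like the following. Everything here is hypothetical: `Reactor`, `streamId`, and the notification methods are assumed names used only to illustrate the hand-off between the caller and a reactor thread.

```kotlin
// Hypothetical sketch of a graceful suspending close for a logical stream
// multiplexed over a single physical connection.
class LogicalStreamSink(
    private val reactor: Reactor, // assumed reactor abstraction
    private val streamId: Int
) : AsyncRawSink {
    override suspend fun write(buffer: Buffer, bytesCount: Long) { /* elided */ }
    override suspend fun flush() { /* elided */ }

    override suspend fun close() {
        // Hand the "end of stream" frame off to the reactor thread, which
        // muxes the logical stream into the physical connection...
        reactor.enqueueEndOfStream(streamId)
        // ...and suspend until the reactor confirms the frame was sent.
        reactor.awaitStreamClosed(streamId)
    }
}
```

A blocking close cannot express the final wait without stalling a thread, which is the motivation for making close suspendable.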
Nobody asked for it so far, thus it's not a part of the interface. :)
We can have a default implementation for it, though.
Ktor migrated from okio to kotlinx-io, but the migration resulted in them only using Sink/Source internally, while the exposed interface is a non-blocking API (ktor's byte channels).

If something like that was provided in the common io library (i.e. kotlinx-io), it would allow parser libraries such as kotlinx-serialization-json to offer an API for consuming async streams (use case e.g. parsing a really large JSON while the bytes are being received over the wire).
One use case where this would be helpful is when reading large files in JS/WASM in the browser. All of the file reading APIs are Promise based, and since the main event loop is single threaded there is no way to force file reading to be synchronous. Maybe when WASM fully supports threads there will be some more possibilities. For now, in order to use Sink/Source interfaces I have to read the whole file into memory, which has obvious downsides. The other option is to create new interfaces entirely, but then I lose out on the convenience functions built into kotlinx-io.
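For illustration, a browser Blob (e.g. a File) could be adapted to the proposed AsyncRawSource roughly like this. This is a Kotlin/JS sketch: AsyncRawSource and Buffer are the proposal's types, the chunk-by-chunk slicing strategy is an assumption, and escape hatches like error handling are omitted.

```kotlin
import kotlinx.io.Buffer
import org.khronos.webgl.ArrayBuffer
import org.khronos.webgl.Int8Array
import org.khronos.webgl.get
import org.w3c.files.Blob
import org.w3c.files.FileReader
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Sketch: adapt an event/Promise-based browser Blob to the proposed
// AsyncRawSource, reading it chunk by chunk instead of loading the
// whole file into memory.
class BlobAsyncSource(private val blob: Blob) : AsyncRawSource {
    private var position = 0

    override suspend fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
        val total = blob.size.toInt()
        if (position >= total) return -1L // the blob is exhausted
        val end = minOf(position + byteCount.toInt(), total)
        val chunk = blob.slice(position, end)
        // FileReader is callback-based; bridge it into a suspending call.
        val arrayBuffer = suspendCoroutine<ArrayBuffer> { cont ->
            val reader = FileReader()
            reader.onload = { cont.resume(reader.result as ArrayBuffer) }
            reader.readAsArrayBuffer(chunk)
        }
        val bytes = Int8Array(arrayBuffer)
        for (i in 0 until bytes.length) {
            sink.writeByte(bytes[i])
        }
        val read = end - position
        position = end
        return read.toLong()
    }

    override fun close() { /* nothing to release */ }
}
```

The byte-by-byte copy keeps the sketch short; a real implementation would bulk-copy the chunk into the buffer.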
kotlinx-io provides only a synchronous API at the moment. In some scenarios, an async API could be more convenient and useful, so it is worth supporting. There is no particular plan right now; this issue rather states the intent and provides a place to log different possible approaches to async support.
Some issues/discussions and libraries showing how the problem could be approached (or how and why it shouldn't):