Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problems when creating own implementation of RawSink/RawSource #235

Closed
yuroyami opened this issue Nov 19, 2023 · 2 comments
Closed

Problems when creating own implementation of RawSink/RawSource #235

yuroyami opened this issue Nov 19, 2023 · 2 comments

Comments

@yuroyami
Copy link

yuroyami commented Nov 19, 2023

Trying to bridge Ktor and Kotlinx.io together, which for the time being is not available via either libraries, so I had to implement my own workaround. These are the issues that I faced along the way:

Problem no.1 - Most of the properties for Buffer are internal/private : In order to create my own implementation to make sure efficiency is not an issue, I wanted to create something close to JVM InputStream.asSource() and OutputStream.asSink() extension functions, but most of buffer properties are internal or private (tail, head, etc).

Problem no.2 - Bridging is more problematic when the receiver needs suspend-able writing/reading: which is the case with Ktor's ByteReadChannel and ByteWriteChannel, where I end up forced to use runBlocking.

Here's my bridging workaround, the reading seems to work OK, but writing is faulty. I wrap both of them with buffered(). Any ideas ?

    private fun ByteReadChannel.toRawSource(): RawSource {
        val src = this

        return object: RawSource {
            override fun close() {
                runIgnoring { src.cancel() }
            }

            override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
                if (byteCount == 0L) return 0L

                require(byteCount >= 0) { "byteCount ($byteCount) < 0" }

                try {
                    val ba = runBlocking { src.toByteArray(byteCount.toInt()) }

                    for (b in ba) {
                        sink.writeByte(b)
                    }

                    return ba.size.toLong()
                } catch (e: AssertionError) {
                    throw e
                }
            }
        }
    }

    private fun ByteWriteChannel.toRawSink(): RawSink {
        val src = this

        return object: RawSink {
            override fun write(source: Buffer, byteCount: Long) {
                if (byteCount == 0L) return
                try {
                    val req = source.request(byteCount)
                    val array = if (req) {
                        source.readByteArray(byteCount.toInt())
                    } else {
                        source.readByteArray()
                    }

                    runBlocking { src.writeFully(array) }

                } catch (e: AssertionError) {
                    throw e
                }
            }
            override fun flush() = src.flush()
            override fun close() {
                runIgnoring { src.close(Exception("Explicit closing by Kotlinx.io")) }
            }
        }
    }
@fzhinkin
Copy link
Collaborator

@yuroyami thanks for sharing the problem you're tackling with!

Problem no.1 - Most of the properties for Buffer are internal/private

The work on publicly accessible API to iterate over buffer's segments and access its internal data is currently in progress, so I hope it will no longer be a problem soon.

Problem no.2 - Bridging is more problematic when the receiver needs suspend-able writing/reading: which is the case with Ktor's ByteReadChannel and ByteWriteChannel, where I end up forced to use runBlocking.

I'm not sure if anything significantly different could be done here as both RawSink and RawSource provide blocking API. There is a proposal to support asynchronous IO API in kotlinx-io (#163), but it not yet merged.

Here's my bridging workaround, the reading seems to work OK, but writing is faulty. I wrap both of them with buffered(). Any ideas ?

What exactly do you mean by "faulty"? Is there a test reproducing a particular issue?

@yuroyami
Copy link
Author

@fzhinkin Thank you for your prompt response.

The work on publicly accessible API to iterate over buffer's segments and access its internal data is currently in progress, so I hope it will no longer be a problem soon.

Sublime !

I'm not sure if anything significantly different could be done here as both RawSink and RawSource provide blocking API. There is a proposal to support asynchronous IO API in kotlinx-io (#163), but it not yet merged.

The potential integration of 'nio' support would be a valuable enhancement, especially in scenarios like Ktor where asynchronous operations are essential. Closing this issue since the planned points seem sufficient to address the current problem.

What exactly do you mean by "faulty"? Is there a test reproducing a particular issue?

I suppose I don't have a particular test to reproduce this issue, but it can be reproduced using Ktor by transmitting dummy data (of any size) from a device to another (server socket and a client socket, pretty basic infrastructure), seems like a Ktor issue when I read its channel blockingly, so it's not related to kotlinx-io. The latter does the job perfectly when I am not using a custom implementation for the RawSink.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants