From 37cadc426e6379e05983fa84cfd8cdb139228cf1 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 24 Mar 2021 09:47:41 -0400 Subject: [PATCH 01/23] feat: add missing Closeable interface --- .../src/software/aws/clientrt/io/Closeable.kt | 37 +++++++++++++++++++ .../software/aws/clientrt/io/CloseableJVM.kt | 23 ++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/Closeable.kt create mode 100644 client-runtime/io/jvm/src/software/aws/clientrt/io/CloseableJVM.kt diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/Closeable.kt b/client-runtime/io/common/src/software/aws/clientrt/io/Closeable.kt new file mode 100644 index 000000000..f8e73f644 --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/Closeable.kt @@ -0,0 +1,37 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +// this really should live in the stdlib... +// https://youtrack.jetbrains.com/issue/KT-31066 + +public expect interface Closeable { + public fun close() +} + +public inline fun C.use(block: (C) -> R): R { + var closed = false + + return try { + block(this) + } catch (first: Throwable) { + try { + closed = true + close() + } catch (second: Throwable) { + first.addSuppressedInternal(second) + } + + throw first + } finally { + if (!closed) { + close() + } + } +} + +@PublishedApi +internal expect fun Throwable.addSuppressedInternal(other: Throwable) diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/CloseableJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/CloseableJVM.kt new file mode 100644 index 000000000..dff64d3b6 --- /dev/null +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/CloseableJVM.kt @@ -0,0 +1,23 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import java.lang.reflect.Method + +public actual typealias Closeable = java.io.Closeable + +@PublishedApi +internal actual fun Throwable.addSuppressedInternal(other: Throwable) { + AddSuppressedMethod?.invoke(this, other) +} + +private val AddSuppressedMethod: Method? by lazy { + try { + Throwable::class.java.getMethod("addSuppressed", Throwable::class.java) + } catch (t: Throwable) { + null + } +} From 62642732591dfdbd14ad9c82b1c8d10923754cfb Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 24 Mar 2021 09:51:30 -0400 Subject: [PATCH 02/23] refactor: rename Source --- .../aws/clientrt/content/ByteStream.kt | 6 +-- client-runtime/io/build.gradle.kts | 3 ++ .../aws/clientrt/io/ByteReadChannel.kt | 45 +++++++++++++++++++ .../src/software/aws/clientrt/io/Source.kt | 38 ++-------------- .../{SourceJVM.kt => ByteReadChannelJVM.kt} | 2 +- .../http/engine/ktor/KtorRequestAdapter.kt | 7 +-- .../clientrt/http/engine/ktor/KtorUtils.kt | 9 ++-- .../software/aws/clientrt/http/HttpBody.kt | 10 ++--- 8 files changed, 68 insertions(+), 52 deletions(-) create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt rename client-runtime/io/jvm/src/software/aws/clientrt/io/{SourceJVM.kt => ByteReadChannelJVM.kt} (98%) diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt index eaa323110..a0c43953e 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt @@ -4,7 +4,7 @@ */ package software.aws.clientrt.content -import software.aws.clientrt.io.Source +import software.aws.clientrt.io.ByteReadChannel /** * Represents an abstract stream of bytes @@ -31,9 +31,9 @@ sealed class ByteStream { */ abstract class Reader : ByteStream() { /** - * Provides [Source] to read from/consume + * Provides [ByteReadChannel] to read from/consume */ - abstract fun readFrom(): Source + abstract fun readFrom(): ByteReadChannel } companion object { diff --git a/client-runtime/io/build.gradle.kts b/client-runtime/io/build.gradle.kts index 54c02f243..bee5b95e8 100644 --- a/client-runtime/io/build.gradle.kts +++ b/client-runtime/io/build.gradle.kts @@ -9,11 +9,14 @@ description = "IO primitives for Smithy services generated by smithy-kotlin" extra["displayName"] = "Smithy :: Kotlin :: IO" extra["moduleName"] = "software.aws.clientrt.io" +val ktorVersion: String by project + kotlin { sourceSets { commonMain { dependencies { implementation(project(":client-runtime:utils")) + implementation("io.ktor:ktor-io:$ktorVersion") } } diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt new file mode 100644 index 000000000..7651e8bd1 --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt @@ -0,0 +1,45 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.aws.clientrt.io + +/** + * Supplies a stream of bytes. Use this interface to read data from wherever it’s located: from the network, storage, or a buffer in memory. + * + * This interface is functionally equivalent to an asynchronous coroutine compatible [java.io.InputStream] + */ +expect interface ByteReadChannel { + /** + * Returns number of bytes that can be read without suspension. Read operations do no suspend and return immediately when this number is at least the number of bytes requested for read. + */ + val availableForRead: Int + + /** + * Returns true if the channel is closed and no remaining bytes are available for read. It implies that availableForRead is zero. + */ + val isClosedForRead: Boolean + + val isClosedForWrite: Boolean + + /** + * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream into memory. + */ + suspend fun readAll(): ByteArray + + /** + * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough bytes available. + */ + suspend fun readFully(sink: ByteArray, offset: Int, length: Int) + + /** + * Reads all available bytes to [sink] buffer and returns immediately or suspends if no bytes available + */ + suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int + + /** + * Close channel with optional cause cancellation. + * This is an idempotent operation — subsequent invocations of this function have no effect and return false + */ + fun cancel(cause: Throwable?): Boolean +} diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt b/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt index 28012f8bd..db075d137 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt @@ -5,41 +5,11 @@ package software.aws.clientrt.io /** - * Supplies a stream of bytes. Use this interface to read data from wherever it’s located: from the network, storage, or a buffer in memory. - * - * This interface is functionally equivalent to an asynchronous coroutine compatible [java.io.InputStream] + * A synchronous read only stream of bytes (similar to java.io.InputStream) */ -expect interface Source { - /** - * Returns number of bytes that can be read without suspension. Read operations do no suspend and return immediately when this number is at least the number of bytes requested for read. - */ - val availableForRead: Int +public interface Source : Closeable { - /** - * Returns true if the channel is closed and no remaining bytes are available for read. It implies that availableForRead is zero. - */ - val isClosedForRead: Boolean + fun read(sink: Buffer, byteCount: Int): Int - val isClosedForWrite: Boolean - - /** - * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream into memory. - */ - suspend fun readAll(): ByteArray - - /** - * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough bytes available. - */ - suspend fun readFully(sink: ByteArray, offset: Int, length: Int) - - /** - * Reads all available bytes to [sink] buffer and returns immediately or suspends if no bytes available - */ - suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int - - /** - * Close channel with optional cause cancellation. - * This is an idempotent operation — subsequent invocations of this function have no effect and return false - */ - fun cancel(cause: Throwable?): Boolean +// fun cursor(): Cursor? } diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SourceJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteReadChannelJVM.kt similarity index 98% rename from client-runtime/io/jvm/src/software/aws/clientrt/io/SourceJVM.kt rename to client-runtime/io/jvm/src/software/aws/clientrt/io/ByteReadChannelJVM.kt index 7210fd78b..da10699d9 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/SourceJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteReadChannelJVM.kt @@ -13,7 +13,7 @@ import java.nio.ByteBuffer * * This interface is functionally equivalent to an asynchronous coroutine compatible [java.io.InputStream] */ -actual interface Source { +actual interface ByteReadChannel { /** * Returns number of bytes that can be read without suspension. Read operations do no suspend and return immediately when this number is at least the number of bytes requested for read. */ diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt index 551482d6f..f9a2512ed 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt @@ -8,9 +8,7 @@ import io.ktor.client.utils.EmptyContent import io.ktor.http.ContentType import io.ktor.http.content.ByteArrayContent import io.ktor.http.content.OutgoingContent -import io.ktor.utils.io.ByteChannel -import io.ktor.utils.io.ByteReadChannel -import io.ktor.utils.io.close +import io.ktor.utils.io.* import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope @@ -18,7 +16,6 @@ import kotlinx.coroutines.launch import software.aws.clientrt.http.HttpBody import software.aws.clientrt.http.request.HttpRequest import software.aws.clientrt.http.request.HttpRequestBuilder -import software.aws.clientrt.io.Source import java.nio.ByteBuffer import kotlin.coroutines.CoroutineContext import io.ktor.client.request.HttpRequestBuilder as KtorRequestBuilder @@ -87,7 +84,7 @@ internal class KtorRequestAdapter( return channel } - private suspend fun forwardSource(dst: ByteChannel, source: Source) { + private suspend fun forwardSource(dst: ByteChannel, source: software.aws.clientrt.io.ByteReadChannel) { // TODO - consider a buffer pool here val buffer = ByteBuffer.allocate(BUFFER_SIZE) while (!source.isClosedForRead) { diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt index 4948967ec..a0ea318a5 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt @@ -7,15 +7,15 @@ package software.aws.clientrt.http.engine.ktor import io.ktor.client.statement.HttpResponse import io.ktor.http.* import io.ktor.utils.io.* -import io.ktor.utils.io.core.readBytes +import io.ktor.utils.io.core.* import software.aws.clientrt.http.HttpBody import software.aws.clientrt.http.HttpStatusCode import software.aws.clientrt.http.request.HttpRequest import software.aws.clientrt.http.request.HttpRequestBuilder -import software.aws.clientrt.io.Source import java.nio.ByteBuffer import io.ktor.client.request.HttpRequestBuilder as KtorHttpRequestBuilder import software.aws.clientrt.http.response.HttpResponse as SdkHttpResponse +import software.aws.clientrt.io.ByteReadChannel as SdkByteReadChannel // convert everything **except** the body from an Sdk HttpRequestBuilder to equivalent Ktor abstraction internal fun HttpRequest.toKtorRequestBuilder(): KtorHttpRequestBuilder { @@ -61,7 +61,8 @@ internal class KtorHeaders(private val headers: Headers) : software.aws.clientrt } // wrapper around ByteReadChannel that implements the [Source] interface -internal class KtorContentStream(private val channel: ByteReadChannel, private val onClose: (() -> Unit)? = null) : Source { +internal class KtorContentStream(private val channel: ByteReadChannel, private val onClose: (() -> Unit)? = null) : + SdkByteReadChannel { override val availableForRead: Int get() = channel.availableForRead @@ -112,7 +113,7 @@ internal class KtorContentStream(private val channel: ByteReadChannel, private v // wrapper around a ByteReadChannel that implements the content as an SDK (streaming) HttpBody internal class KtorHttpBody(channel: ByteReadChannel, onClose: (() -> Unit)? = null) : HttpBody.Streaming() { private val source = KtorContentStream(channel, onClose) - override fun readFrom(): Source = source + override fun readFrom(): SdkByteReadChannel = source } // convert ktor Http response to an (SDK) Http response diff --git a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt index dfac2eb62..6bf6d86cf 100644 --- a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt +++ b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt @@ -5,7 +5,7 @@ package software.aws.clientrt.http import software.aws.clientrt.content.ByteStream -import software.aws.clientrt.io.Source +import software.aws.clientrt.io.ByteReadChannel /** * HTTP payload to be sent to a peer @@ -42,9 +42,9 @@ sealed class HttpBody { */ abstract class Streaming : HttpBody() { /** - * Provides [Source] for the content + * Provides [ByteReadChannel] for the content */ - abstract fun readFrom(): Source + abstract fun readFrom(): ByteReadChannel } } @@ -59,7 +59,7 @@ fun ByteStream.toHttpBody(): HttpBody { } is ByteStream.Reader -> object : HttpBody.Streaming() { override val contentLength: Long? = bytestream.contentLength - override fun readFrom(): Source = bytestream.readFrom() + override fun readFrom(): ByteReadChannel = bytestream.readFrom() } } } @@ -87,7 +87,7 @@ fun HttpBody.toByteStream(): ByteStream? { } is HttpBody.Streaming -> object : ByteStream.Reader() { override val contentLength: Long? = body.contentLength - override fun readFrom(): Source = body.readFrom() + override fun readFrom(): ByteReadChannel = body.readFrom() } } } From 6217b1fb7941990dd640cb8cd7ef3f9b06906692 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 24 Mar 2021 10:05:46 -0400 Subject: [PATCH 03/23] fix nullability --- .../src/software/aws/clientrt/content/ByteArrayContent.kt | 2 +- .../common/src/software/aws/clientrt/content/StringContent.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteArrayContent.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteArrayContent.kt index a83d52570..32a8c624e 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteArrayContent.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteArrayContent.kt @@ -8,6 +8,6 @@ package software.aws.clientrt.content * Container for wrapping a ByteArray as a [ByteStream] */ class ByteArrayContent(private val bytes: ByteArray) : ByteStream.Buffer() { - override val contentLength: Long? = bytes.size.toLong() + override val contentLength: Long = bytes.size.toLong() override fun bytes(): ByteArray = bytes } diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/StringContent.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/StringContent.kt index d9a39f3b0..1e8bacf9e 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/StringContent.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/StringContent.kt @@ -11,7 +11,7 @@ class StringContent(str: String) : ByteStream.Buffer() { @OptIn(ExperimentalStdlibApi::class) private val asBytes: ByteArray = str.encodeToByteArray() - override val contentLength: Long? = asBytes.size.toLong() + override val contentLength: Long = asBytes.size.toLong() override fun bytes(): ByteArray = asBytes } From 6dd0f09b1a07db138a0ac38993a0abb5d02a6451 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 24 Mar 2021 15:12:56 -0400 Subject: [PATCH 04/23] feat(io): wrap ktor byte channels --- .../client-rt-core/build.gradle.kts | 3 +- .../aws/clientrt/content/LocalFileContent.kt | 23 ++++ client-runtime/io/build.gradle.kts | 11 ++ .../software/aws/clientrt/io/ByteChannel.kt | 35 ++++++ .../aws/clientrt/io/ByteReadChannel.kt | 5 +- .../aws/clientrt/io/ByteWriteChannel.kt | 57 ++++++++++ .../software/aws/clientrt/io/KtorAdapters.kt | 103 ++++++++++++++++++ .../aws/clientrt/io/ByteWriteChannelJVM.kt | 57 ++++++++++ .../software/aws/clientrt/io/FileChannels.kt | 59 ++++++++++ .../aws/clientrt/io/KtorAdaptersJVM.kt | 21 ++++ gradle/jvm.gradle | 1 - 11 files changed, 372 insertions(+), 3 deletions(-) create mode 100644 client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt create mode 100644 client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt create mode 100644 client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt create mode 100644 client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt diff --git a/client-runtime/client-rt-core/build.gradle.kts b/client-runtime/client-rt-core/build.gradle.kts index 0b51047ab..6eebec5bb 100644 --- a/client-runtime/client-rt-core/build.gradle.kts +++ b/client-runtime/client-rt-core/build.gradle.kts @@ -11,7 +11,8 @@ kotlin { sourceSets { commonMain { dependencies { - implementation(project(":client-runtime:io")) + // io types are exposed as part of content/* + api(project(":client-runtime:io")) // Attributes property bag is exposed as client options api(project(":client-runtime:utils")) } diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt new file mode 100644 index 000000000..11a02ed8d --- /dev/null +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt @@ -0,0 +1,23 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.content + +import software.aws.clientrt.io.ByteReadChannel +import software.aws.clientrt.io.readChannel +import java.io.File + +/** + * ByteStream backed by a local [file] + */ +public class LocalFileContent( + public val file: File, +) : ByteStream.Reader() { + + override val contentLength: Long + get() = file.length() + + override fun readFrom(): ByteReadChannel = file.readChannel() +} diff --git a/client-runtime/io/build.gradle.kts b/client-runtime/io/build.gradle.kts index bee5b95e8..b07ad8c3d 100644 --- a/client-runtime/io/build.gradle.kts +++ b/client-runtime/io/build.gradle.kts @@ -10,6 +10,7 @@ extra["displayName"] = "Smithy :: Kotlin :: IO" extra["moduleName"] = "software.aws.clientrt.io" val ktorVersion: String by project +val coroutinesVersion: String by project kotlin { sourceSets { @@ -17,6 +18,9 @@ kotlin { dependencies { implementation(project(":client-runtime:utils")) implementation("io.ktor:ktor-io:$ktorVersion") + + // Dispatchers.IO + api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") } } @@ -25,5 +29,12 @@ kotlin { implementation(project(":client-runtime:testing")) } } + + jvmMain { + dependencies { + // file channel utils + implementation("io.ktor:ktor-utils:$ktorVersion") + } + } } } diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt new file mode 100644 index 000000000..09dc0c651 --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt @@ -0,0 +1,35 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import io.ktor.utils.io.ByteChannel as KtorByteChannel +import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel + +/** + * Channel for asynchronous reading and writing sequences of bytes. + * + * This is a buffered **single-reader single writer channel**. + * + * Read operations can be invoked concurrently with write operations, but multiple reads or multiple writes + * cannot be invoked concurrently with themselves. Exceptions are [close] and [flush] which can be invoked + * concurrently with other operations including between themselves at any time. + */ +public interface ByteChannel : ByteReadChannel, ByteWriteChannel + +/** + * Create a buffered channel for asynchronous reading and writing of bytes + */ +public fun ByteChannel(autoFlush: Boolean = true): ByteChannel = + KtorByteChannel(autoFlush).toSdkChannel() + +/** + * Creates a channel for reading from the given byte array. + */ +public fun ByteReadChannel( + content: ByteArray, + offset: Int = 0, + length: Int = content.size - offset +): ByteReadChannel = KtorByteReadChannel(content, offset, length).toSdkChannel() diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt index 7651e8bd1..0fa369342 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt @@ -16,10 +16,13 @@ expect interface ByteReadChannel { val availableForRead: Int /** - * Returns true if the channel is closed and no remaining bytes are available for read. It implies that availableForRead is zero. + * Returns `true` if the channel is closed and no remaining bytes are available for read. It implies that availableForRead is zero. */ val isClosedForRead: Boolean + /** + * Returns `true` if the channel is closed from the writer side. [availableForRead] may be > 0 + */ val isClosedForWrite: Boolean /** diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt new file mode 100644 index 000000000..585dea7b5 --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt @@ -0,0 +1,57 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +public expect interface ByteWriteChannel { + + /** + * Returns the number of bytes that can be written without suspension. Write operations do not + * suspend and return immediately when this number is at least the number of bytes requested for + * write. + */ + val availableForWrite: Int + + /** + * Returns true if channel has been closed. Attempting to write to the channel will throw an exception + */ + val isClosedForWrite: Boolean + + /** + * Total number of bytes written to the channel. + * + * NOTE: not guaranteed to be atomic and may be updated in middle of a write operation + */ + val totalBytesWritten: Long + + /** + * Returns `true` if the channel flushes automatically all pending bytes after every write operation. + * If `false` then flush only happens when [flush] is explicitly called or when the buffer is full. + */ + val autoFlush: Boolean + + /** + * Writes all [src] bytes and suspends until all bytes written. + */ + suspend fun writeFully(src: ByteArray, offset: Int = 0, length: Int = src.size - offset): Unit + + /** + * Writes as much as possible and only suspends if buffer is full + * Returns the byte count written. + */ + suspend fun writeAvailable(src: ByteArray, offset: Int = 0, length: Int = src.size - offset): Int + + /** + * Closes this channel with an optional exceptional [cause]. All pending bytes are flushed. + * This is an idempotent operation — subsequent invocations of this function have no effect and return false + */ + suspend fun close(cause: Throwable?): Boolean + + /** + * Flushes all pending write bytes making them available for read. + * Thread safe and can be invoked at any time. It does nothing when invoked on a closed channel. + */ + fun flush(): Unit +} diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt new file mode 100644 index 000000000..9ad8ea8dd --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt @@ -0,0 +1,103 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import io.ktor.utils.io.* +import io.ktor.utils.io.core.* +import io.ktor.utils.io.ByteChannel as KtorByteChannel +import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel +import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel + +/** + * Wrap ktor's ByteReadChannel as our own. This implements the common API of [ByteReadChannel]. Only + * platform specific differences in interfaces need be implemented in inheritors. + */ +internal abstract class KtorReadChannelAdapterBase( + val chan: KtorByteReadChannel +) : ByteReadChannel { + + override val availableForRead: Int + get() = chan.availableForRead + + override val isClosedForRead: Boolean + get() = chan.isClosedForRead + + override val isClosedForWrite: Boolean + get() = chan.isClosedForWrite + + override suspend fun readAll(): ByteArray { + return chan.readRemaining().readBytes() + } + + override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) { + chan.readFully(sink, offset, length) + } + + override suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int { + return chan.readAvailable(sink, offset, length) + } + + override fun cancel(cause: Throwable?): Boolean { + return chan.cancel(cause) + } +} + +/** + * Wrap ktor's ByteWriteChannel as our own. This implements the common API of [ByteWriteChannel]. Only + * platform specific differences in interfaces need be implemented in inheritors. + */ +internal abstract class KtorWriteChannelAdapterBase( + val chan: KtorByteWriteChannel +) : ByteWriteChannel { + override val availableForWrite: Int + get() = chan.availableForWrite + + override val isClosedForWrite: Boolean + get() = chan.isClosedForWrite + + override val totalBytesWritten: Long + get() = chan.totalBytesWritten + + override val autoFlush: Boolean + get() = chan.autoFlush + + override suspend fun writeFully(src: ByteArray, offset: Int, length: Int) { + chan.writeFully(src, offset, length) + } + + override suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int { + return chan.writeAvailable(src, offset, length) + } + + override suspend fun close(cause: Throwable?): Boolean { + return chan.close(cause) + } + + override fun flush() { + chan.flush() + } +} + +/** + * Wrap ktor's ByteChannel as our own. This implements the common API of [ByteChannel]. Only + * platform specific differences in interfaces need be implemented in inheritors. + */ + +internal class KtorByteChannelAdapter( + val chan: KtorByteChannel +) : ByteChannel, + ByteReadChannel by KtorReadChannelAdapter(chan), + ByteWriteChannel by KtorWriteChannelAdapter(chan) { + override val isClosedForWrite: Boolean + get() = chan.isClosedForWrite +} + +internal expect class KtorReadChannelAdapter(chan: KtorByteReadChannel) : ByteReadChannel +internal expect class KtorWriteChannelAdapter(chan: KtorByteWriteChannel) : ByteWriteChannel + +internal fun KtorByteReadChannel.toSdkChannel(): ByteReadChannel = KtorReadChannelAdapter(this) +internal fun KtorByteWriteChannel.toSdkChannel(): ByteWriteChannel = KtorWriteChannelAdapter(this) +internal fun KtorByteChannel.toSdkChannel(): ByteChannel = KtorByteChannelAdapter(this) diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt new file mode 100644 index 000000000..8dd3f8e3e --- /dev/null +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt @@ -0,0 +1,57 @@ +// ktlint-disable filename +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +public actual interface ByteWriteChannel { + /** + * Returns the number of bytes that can be written without suspension. Write operations do not + * suspend and return immediately when this number is at least the number of bytes requested for + * write. + */ + actual val availableForWrite: Int + + /** + * Returns true if channel has been closed. Attempting to write to the channel will throw an exception + */ + actual val isClosedForWrite: Boolean + + /** + * Total number of bytes written to the channel. + * + * NOTE: not guaranteed to be atomic and may be updated in middle of a write operation + */ + actual val totalBytesWritten: Long + + /** + * Returns `true` if the channel flushes automatically all pending bytes after every write operation. + * If `false` then flush only happens when [flush] is explicitly called or when the buffer is full. + */ + actual val autoFlush: Boolean + + /** + * Writes all [src] bytes and suspends until all bytes written. + */ + actual suspend fun writeFully(src: ByteArray, offset: Int, length: Int) + + /** + * Writes as much as possible and only suspends if buffer is full + * Returns the byte count written. + */ + actual suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int + + /** + * Closes this channel with an optional exceptional [cause]. All pending bytes are flushed. + * This is an idempotent operation — subsequent invocations of this function have no effect and return false + */ + actual suspend fun close(cause: Throwable?): Boolean + + /** + * Flushes all pending write bytes making them available for read. + * Thread safe and can be invoked at any time. It does nothing when invoked on a closed channel. + */ + actual fun flush() +} diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt new file mode 100644 index 000000000..967b918ea --- /dev/null +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt @@ -0,0 +1,59 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import kotlinx.coroutines.Dispatchers +import software.aws.clientrt.util.InternalApi +import java.io.File +import java.nio.file.Path +import kotlin.coroutines.CoroutineContext +import io.ktor.util.cio.readChannel as cioReadChannel +import io.ktor.util.cio.writeChannel as cioWriteChannel + +/** + * Open a read channel for file and launch a coroutine to fill it. + * Please note that file reading is blocking so if you are starting it on [Dispatchers.Unconfined] it may block + * your async code and freeze the whole application when runs on a pool that is not intended for blocking operations. + * This is why [coroutineContext] should have [Dispatchers.IO] or + * a coroutine dispatcher that is properly configured for blocking IO. + */ +@InternalApi +public fun File.readChannel( + start: Long = 0, + endInclusive: Long = -1, + coroutineContext: CoroutineContext = Dispatchers.IO +): ByteReadChannel = cioReadChannel(start, endInclusive, coroutineContext).toSdkChannel() + +/** + * Open a read channel for file and launch a coroutine to fill it. + * Please note that file reading is blocking so if you are starting it on [Dispatchers.Unconfined] it may block + * your async code + */ +@InternalApi +public fun Path.readChannel(start: Long, endInclusive: Long): ByteReadChannel = + toFile().readChannel(start, endInclusive) + +/** + * Open a read channel for file and launch a coroutine to fill it. + * Please note that file reading is blocking so if you are starting it on [Dispatchers.Unconfined] it may block + * your async code + */ +@InternalApi +public fun Path.readChannel(): ByteReadChannel = toFile().readChannel() + +// FIXME - CoroutineContext makes coroutines-core an API dependency + +/** + * Open a write channel for the file and launch a coroutine to read from it. + * Please note that file writing is blocking so if you are starting it on [Dispatchers.Unconfined] it may block + * your async code and freeze the whole application when runs on a pool that is not intended for blocking operations. + * This is why [coroutineContext] should have [Dispatchers.IO] or + * a coroutine dispatcher that is properly configured for blocking IO. + */ +@InternalApi +public fun File.writeChannel( + coroutineContext: CoroutineContext = Dispatchers.IO +): ByteWriteChannel = cioWriteChannel(coroutineContext).toSdkChannel() diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt new file mode 100644 index 000000000..a108f8ab0 --- /dev/null +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt @@ -0,0 +1,21 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io +import java.nio.ByteBuffer +import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel +import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel + +internal actual class KtorReadChannelAdapter actual constructor( + chan: KtorByteReadChannel +) : ByteReadChannel, KtorReadChannelAdapterBase(chan) { + override suspend fun readAvailable(sink: ByteBuffer): Int { + return chan.readAvailable(sink) + } +} + +internal actual class KtorWriteChannelAdapter actual constructor( + chan: KtorByteWriteChannel +) : ByteWriteChannel, KtorWriteChannelAdapterBase(chan) diff --git a/gradle/jvm.gradle b/gradle/jvm.gradle index d19a23ab9..63d3efa64 100644 --- a/gradle/jvm.gradle +++ b/gradle/jvm.gradle @@ -11,7 +11,6 @@ kotlin { sourceSets { jvmMain.dependencies { api group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib', version: kotlinVersion - implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion" } jvmTest.dependencies { From 8b186387892565323e22bf8b2177ad665f55dffd Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 24 Mar 2021 15:16:47 -0400 Subject: [PATCH 05/23] docs --- .../software/aws/clientrt/io/ByteReadChannel.kt | 17 ++++++++++------- .../aws/clientrt/io/ByteWriteChannel.kt | 3 +++ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt index 0fa369342..5d4b5b23a 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt @@ -5,18 +5,19 @@ package software.aws.clientrt.io /** - * Supplies a stream of bytes. Use this interface to read data from wherever it’s located: from the network, storage, or a buffer in memory. - * - * This interface is functionally equivalent to an asynchronous coroutine compatible [java.io.InputStream] + * Supplies an asynchronous stream of bytes. Use this interface to read data from wherever it’s located: + * from the network, storage, or a buffer in memory. This is a **single-reader channel**. */ expect interface ByteReadChannel { /** - * Returns number of bytes that can be read without suspension. Read operations do no suspend and return immediately when this number is at least the number of bytes requested for read. + * Returns number of bytes that can be read without suspension. Read operations do no suspend and + * return immediately when this number is at least the number of bytes requested for read. */ val availableForRead: Int /** - * Returns `true` if the channel is closed and no remaining bytes are available for read. It implies that availableForRead is zero. + * Returns `true` if the channel is closed and no remaining bytes are available for read. It implies + * that availableForRead is zero. */ val isClosedForRead: Boolean @@ -26,12 +27,14 @@ expect interface ByteReadChannel { val isClosedForWrite: Boolean /** - * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream into memory. + * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream + * into memory. */ suspend fun readAll(): ByteArray /** - * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough bytes available. + * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough + * bytes available. */ suspend fun readFully(sink: ByteArray, offset: Int, length: Int) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt index 585dea7b5..9d1106a90 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt @@ -5,6 +5,9 @@ package software.aws.clientrt.io +/** + * A channel for writing a sequence of bytes asynchronously. This is a **single writer channel**. + */ public expect interface ByteWriteChannel { /** From bd7f020d1b0eab252d36fe7921b040390ced6a64 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 24 Mar 2021 15:30:14 -0400 Subject: [PATCH 06/23] refactor: rename sdk channel types --- .../aws/clientrt/content/ByteStream.kt | 6 ++--- .../aws/clientrt/content/LocalFileContent.kt | 4 +-- .../software/aws/clientrt/io/KtorAdapters.kt | 26 +++++++++---------- .../io/{ByteChannel.kt => SdkByteChannel.kt} | 8 +++--- ...teReadChannel.kt => SdkByteReadChannel.kt} | 2 +- ...WriteChannel.kt => SdkByteWriteChannel.kt} | 2 +- .../software/aws/clientrt/io/FileChannels.kt | 8 +++--- .../aws/clientrt/io/KtorAdaptersJVM.kt | 4 +-- ...ChannelJVM.kt => SdkByteReadChannelJVM.kt} | 2 +- ...hannelJVM.kt => SdkByteWriteChannelJVM.kt} | 2 +- .../http/engine/ktor/KtorRequestAdapter.kt | 6 +++-- .../clientrt/http/engine/ktor/KtorUtils.kt | 2 +- .../software/aws/clientrt/http/HttpBody.kt | 10 +++---- 13 files changed, 42 insertions(+), 40 deletions(-) rename client-runtime/io/common/src/software/aws/clientrt/io/{ByteChannel.kt => SdkByteChannel.kt} (79%) rename client-runtime/io/common/src/software/aws/clientrt/io/{ByteReadChannel.kt => SdkByteReadChannel.kt} (97%) rename client-runtime/io/common/src/software/aws/clientrt/io/{ByteWriteChannel.kt => SdkByteWriteChannel.kt} (97%) rename client-runtime/io/jvm/src/software/aws/clientrt/io/{ByteReadChannelJVM.kt => SdkByteReadChannelJVM.kt} (97%) rename client-runtime/io/jvm/src/software/aws/clientrt/io/{ByteWriteChannelJVM.kt => SdkByteWriteChannelJVM.kt} (97%) diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt index a0c43953e..104f3c180 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt @@ -4,7 +4,7 @@ */ package software.aws.clientrt.content -import software.aws.clientrt.io.ByteReadChannel +import software.aws.clientrt.io.SdkByteReadChannel /** * Represents an abstract stream of bytes @@ -31,9 +31,9 @@ sealed class ByteStream { */ abstract class Reader : ByteStream() { /** - * Provides [ByteReadChannel] to read from/consume + * Provides [SdkByteReadChannel] to read from/consume */ - abstract fun readFrom(): ByteReadChannel + abstract fun readFrom(): SdkByteReadChannel } companion object { diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt index 11a02ed8d..054b17b81 100644 --- a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt @@ -5,7 +5,7 @@ package software.aws.clientrt.content -import software.aws.clientrt.io.ByteReadChannel +import software.aws.clientrt.io.SdkByteReadChannel import software.aws.clientrt.io.readChannel import java.io.File @@ -19,5 +19,5 @@ public class LocalFileContent( override val contentLength: Long get() = file.length() - override fun readFrom(): ByteReadChannel = file.readChannel() + override fun readFrom(): SdkByteReadChannel = file.readChannel() } diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt index 9ad8ea8dd..b4819d7b3 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt @@ -12,12 +12,12 @@ import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel /** - * Wrap ktor's ByteReadChannel as our own. This implements the common API of [ByteReadChannel]. Only + * Wrap ktor's ByteReadChannel as our own. This implements the common API of [SdkByteReadChannel]. Only * platform specific differences in interfaces need be implemented in inheritors. */ internal abstract class KtorReadChannelAdapterBase( val chan: KtorByteReadChannel -) : ByteReadChannel { +) : SdkByteReadChannel { override val availableForRead: Int get() = chan.availableForRead @@ -46,12 +46,12 @@ internal abstract class KtorReadChannelAdapterBase( } /** - * Wrap ktor's ByteWriteChannel as our own. This implements the common API of [ByteWriteChannel]. Only + * Wrap ktor's ByteWriteChannel as our own. This implements the common API of [SdkByteWriteChannel]. Only * platform specific differences in interfaces need be implemented in inheritors. */ internal abstract class KtorWriteChannelAdapterBase( val chan: KtorByteWriteChannel -) : ByteWriteChannel { +) : SdkByteWriteChannel { override val availableForWrite: Int get() = chan.availableForWrite @@ -82,22 +82,22 @@ internal abstract class KtorWriteChannelAdapterBase( } /** - * Wrap ktor's ByteChannel as our own. This implements the common API of [ByteChannel]. Only + * Wrap ktor's ByteChannel as our own. This implements the common API of [SdkByteChannel]. Only * platform specific differences in interfaces need be implemented in inheritors. */ internal class KtorByteChannelAdapter( val chan: KtorByteChannel -) : ByteChannel, - ByteReadChannel by KtorReadChannelAdapter(chan), - ByteWriteChannel by KtorWriteChannelAdapter(chan) { +) : SdkByteChannel, + SdkByteReadChannel by KtorReadChannelAdapter(chan), + SdkByteWriteChannel by KtorWriteChannelAdapter(chan) { override val isClosedForWrite: Boolean get() = chan.isClosedForWrite } -internal expect class KtorReadChannelAdapter(chan: KtorByteReadChannel) : ByteReadChannel -internal expect class KtorWriteChannelAdapter(chan: KtorByteWriteChannel) : ByteWriteChannel +internal expect class KtorReadChannelAdapter(chan: KtorByteReadChannel) : SdkByteReadChannel +internal expect class KtorWriteChannelAdapter(chan: KtorByteWriteChannel) : SdkByteWriteChannel -internal fun KtorByteReadChannel.toSdkChannel(): ByteReadChannel = KtorReadChannelAdapter(this) -internal fun KtorByteWriteChannel.toSdkChannel(): ByteWriteChannel = KtorWriteChannelAdapter(this) -internal fun KtorByteChannel.toSdkChannel(): ByteChannel = KtorByteChannelAdapter(this) +internal fun KtorByteReadChannel.toSdkChannel(): SdkByteReadChannel = KtorReadChannelAdapter(this) +internal fun KtorByteWriteChannel.toSdkChannel(): SdkByteWriteChannel = KtorWriteChannelAdapter(this) +internal fun KtorByteChannel.toSdkChannel(): SdkByteChannel = KtorByteChannelAdapter(this) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt similarity index 79% rename from client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt rename to client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt index 09dc0c651..4c3fffc7c 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/ByteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt @@ -17,19 +17,19 @@ import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel * cannot be invoked concurrently with themselves. Exceptions are [close] and [flush] which can be invoked * concurrently with other operations including between themselves at any time. */ -public interface ByteChannel : ByteReadChannel, ByteWriteChannel +public interface SdkByteChannel : SdkByteReadChannel, SdkByteWriteChannel /** * Create a buffered channel for asynchronous reading and writing of bytes */ -public fun ByteChannel(autoFlush: Boolean = true): ByteChannel = +public fun SdkByteChannel(autoFlush: Boolean = true): SdkByteChannel = KtorByteChannel(autoFlush).toSdkChannel() /** * Creates a channel for reading from the given byte array. */ -public fun ByteReadChannel( +public fun SdkByteReadChannel( content: ByteArray, offset: Int = 0, length: Int = content.size - offset -): ByteReadChannel = KtorByteReadChannel(content, offset, length).toSdkChannel() +): SdkByteReadChannel = KtorByteReadChannel(content, offset, length).toSdkChannel() diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt similarity index 97% rename from client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt rename to client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index 5d4b5b23a..914cb57c6 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/ByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -8,7 +8,7 @@ package software.aws.clientrt.io * Supplies an asynchronous stream of bytes. Use this interface to read data from wherever it’s located: * from the network, storage, or a buffer in memory. This is a **single-reader channel**. */ -expect interface ByteReadChannel { +expect interface SdkByteReadChannel { /** * Returns number of bytes that can be read without suspension. Read operations do no suspend and * return immediately when this number is at least the number of bytes requested for read. diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt similarity index 97% rename from client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt rename to client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt index 9d1106a90..06c04b040 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/ByteWriteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt @@ -8,7 +8,7 @@ package software.aws.clientrt.io /** * A channel for writing a sequence of bytes asynchronously. This is a **single writer channel**. */ -public expect interface ByteWriteChannel { +public expect interface SdkByteWriteChannel { /** * Returns the number of bytes that can be written without suspension. Write operations do not diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt index 967b918ea..9fcc2e9e0 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt @@ -25,7 +25,7 @@ public fun File.readChannel( start: Long = 0, endInclusive: Long = -1, coroutineContext: CoroutineContext = Dispatchers.IO -): ByteReadChannel = cioReadChannel(start, endInclusive, coroutineContext).toSdkChannel() +): SdkByteReadChannel = cioReadChannel(start, endInclusive, coroutineContext).toSdkChannel() /** * Open a read channel for file and launch a coroutine to fill it. @@ -33,7 +33,7 @@ public fun File.readChannel( * your async code */ @InternalApi -public fun Path.readChannel(start: Long, endInclusive: Long): ByteReadChannel = +public fun Path.readChannel(start: Long, endInclusive: Long): SdkByteReadChannel = toFile().readChannel(start, endInclusive) /** @@ -42,7 +42,7 @@ public fun Path.readChannel(start: Long, endInclusive: Long): ByteReadChannel = * your async code */ @InternalApi -public fun Path.readChannel(): ByteReadChannel = toFile().readChannel() +public fun Path.readChannel(): SdkByteReadChannel = toFile().readChannel() // FIXME - CoroutineContext makes coroutines-core an API dependency @@ -56,4 +56,4 @@ public fun Path.readChannel(): ByteReadChannel = toFile().readChannel() @InternalApi public fun File.writeChannel( coroutineContext: CoroutineContext = Dispatchers.IO -): ByteWriteChannel = cioWriteChannel(coroutineContext).toSdkChannel() +): SdkByteWriteChannel = cioWriteChannel(coroutineContext).toSdkChannel() diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt index a108f8ab0..497391623 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/KtorAdaptersJVM.kt @@ -10,7 +10,7 @@ import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel internal actual class KtorReadChannelAdapter actual constructor( chan: KtorByteReadChannel -) : ByteReadChannel, KtorReadChannelAdapterBase(chan) { +) : SdkByteReadChannel, KtorReadChannelAdapterBase(chan) { override suspend fun readAvailable(sink: ByteBuffer): Int { return chan.readAvailable(sink) } @@ -18,4 +18,4 @@ internal actual class KtorReadChannelAdapter actual constructor( internal actual class KtorWriteChannelAdapter actual constructor( chan: KtorByteWriteChannel -) : ByteWriteChannel, KtorWriteChannelAdapterBase(chan) +) : SdkByteWriteChannel, KtorWriteChannelAdapterBase(chan) diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteReadChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt similarity index 97% rename from client-runtime/io/jvm/src/software/aws/clientrt/io/ByteReadChannelJVM.kt rename to client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt index da10699d9..8c8b5a41a 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteReadChannelJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt @@ -13,7 +13,7 @@ import java.nio.ByteBuffer * * This interface is functionally equivalent to an asynchronous coroutine compatible [java.io.InputStream] */ -actual interface ByteReadChannel { +actual interface SdkByteReadChannel { /** * Returns number of bytes that can be read without suspension. Read operations do no suspend and return immediately when this number is at least the number of bytes requested for read. */ diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt similarity index 97% rename from client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt rename to client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt index 8dd3f8e3e..4427c55df 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/ByteWriteChannelJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt @@ -6,7 +6,7 @@ package software.aws.clientrt.io -public actual interface ByteWriteChannel { +public actual interface SdkByteWriteChannel { /** * Returns the number of bytes that can be written without suspension. Write operations do not * suspend and return immediately when this number is at least the number of bytes requested for diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt index f9a2512ed..cf9bdb6a1 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt @@ -16,6 +16,7 @@ import kotlinx.coroutines.launch import software.aws.clientrt.http.HttpBody import software.aws.clientrt.http.request.HttpRequest import software.aws.clientrt.http.request.HttpRequestBuilder +import software.aws.clientrt.io.SdkByteReadChannel import java.nio.ByteBuffer import kotlin.coroutines.CoroutineContext import io.ktor.client.request.HttpRequestBuilder as KtorRequestBuilder @@ -67,6 +68,8 @@ internal class KtorRequestAdapter( override val contentLength: Long? = body.contentLength override fun readFrom(): ByteReadChannel { + // FIXME - instead of reading and writing bytes we could probably proxy the underlying channel + // and/or since we use ktor under the hood if we could access the underlying channel that would be best // we want to read values off the incoming source and write them to this channel val channel = ByteChannel() @@ -84,8 +87,7 @@ internal class KtorRequestAdapter( return channel } - private suspend fun forwardSource(dst: ByteChannel, source: software.aws.clientrt.io.ByteReadChannel) { - // TODO - consider a buffer pool here + private suspend fun forwardSource(dst: ByteChannel, source: SdkByteReadChannel) { val buffer = ByteBuffer.allocate(BUFFER_SIZE) while (!source.isClosedForRead) { // fill the buffer by reading chunks from the underlying source diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt index a0ea318a5..623a41e3a 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt @@ -12,10 +12,10 @@ import software.aws.clientrt.http.HttpBody import software.aws.clientrt.http.HttpStatusCode import software.aws.clientrt.http.request.HttpRequest import software.aws.clientrt.http.request.HttpRequestBuilder +import software.aws.clientrt.io.SdkByteReadChannel import java.nio.ByteBuffer import io.ktor.client.request.HttpRequestBuilder as KtorHttpRequestBuilder import software.aws.clientrt.http.response.HttpResponse as SdkHttpResponse -import software.aws.clientrt.io.ByteReadChannel as SdkByteReadChannel // convert everything **except** the body from an Sdk HttpRequestBuilder to equivalent Ktor abstraction internal fun HttpRequest.toKtorRequestBuilder(): KtorHttpRequestBuilder { diff --git a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt index 6bf6d86cf..3e263ca59 100644 --- a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt +++ b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt @@ -5,7 +5,7 @@ package software.aws.clientrt.http import software.aws.clientrt.content.ByteStream -import software.aws.clientrt.io.ByteReadChannel +import software.aws.clientrt.io.SdkByteReadChannel /** * HTTP payload to be sent to a peer @@ -42,9 +42,9 @@ sealed class HttpBody { */ abstract class Streaming : HttpBody() { /** - * Provides [ByteReadChannel] for the content + * Provides [SdkByteReadChannel] for the content */ - abstract fun readFrom(): ByteReadChannel + abstract fun readFrom(): SdkByteReadChannel } } @@ -59,7 +59,7 @@ fun ByteStream.toHttpBody(): HttpBody { } is ByteStream.Reader -> object : HttpBody.Streaming() { override val contentLength: Long? = bytestream.contentLength - override fun readFrom(): ByteReadChannel = bytestream.readFrom() + override fun readFrom(): SdkByteReadChannel = bytestream.readFrom() } } } @@ -87,7 +87,7 @@ fun HttpBody.toByteStream(): ByteStream? { } is HttpBody.Streaming -> object : ByteStream.Reader() { override val contentLength: Long? = body.contentLength - override fun readFrom(): ByteReadChannel = body.readFrom() + override fun readFrom(): SdkByteReadChannel = body.readFrom() } } } From 8a680c1aa0662697d3e5764b1415119a697eb6b7 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Thu, 25 Mar 2021 11:01:31 -0400 Subject: [PATCH 07/23] add JVM extensions for reading and writing bytestream as a file --- .../aws/clientrt/content/ByteStream.kt | 1 - .../aws/clientrt/content/ByteStreamJVM.kt | 57 +++++++++++++++++++ .../software/aws/clientrt/io/KtorAdapters.kt | 31 ++++++---- .../aws/clientrt/io/SdkByteChannel.kt | 6 +- .../aws/clientrt/io/SdkByteReadChannel.kt | 31 +++++++++- .../aws/clientrt/io/SdkByteWriteChannel.kt | 11 +++- .../aws/clientrt/io/SdkByteReadChannelJVM.kt | 4 +- .../aws/clientrt/io/SdkByteWriteChannelJVM.kt | 6 +- 8 files changed, 129 insertions(+), 18 deletions(-) create mode 100644 client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt index 104f3c180..8e1a6983c 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt @@ -56,7 +56,6 @@ suspend fun ByteStream.toByteArray(): ByteArray { } } -@OptIn(ExperimentalStdlibApi::class) suspend fun ByteStream.decodeToString(): String = toByteArray().decodeToString() fun ByteStream.cancel() { diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt new file mode 100644 index 000000000..61020aea1 --- /dev/null +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt @@ -0,0 +1,57 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.content + +import software.aws.clientrt.io.SdkByteReadChannel +import software.aws.clientrt.io.copyTo +import software.aws.clientrt.io.writeChannel +import java.io.File +import java.nio.file.Path + +// JVM specific extensions for dealing with ByteStream's + +/** + * Create a [ByteStream] from a file + */ +fun ByteStream.Companion.fromFile(file: File): ByteStream = file.asByteStream() + +/** + * Create a [ByteStream] from a file + */ +fun File.asByteStream(): ByteStream = LocalFileContent(this) + +/** + * Create a [ByteStream] from a path + */ +fun Path.asByteStream(): ByteStream { + val f = toFile() + require(f.isFile) { "cannot create a ByteStream from a directory: $this" } + return f.asByteStream() +} + +/** + * Write the contents of this ByteStream to file and close it + */ +suspend fun ByteStream.toFile(file: File): Long { + require(file.isFile) { "cannot write contents of ByteStream to a directory: ${file.absolutePath}" } + val writer = file.writeChannel() + val src = when (this) { + is ByteStream.Buffer -> SdkByteReadChannel(bytes()) + is ByteStream.Reader -> readFrom() + } + + try { + return src.copyTo(writer) + } finally { + writer.close() + src.close() + } +} + +/** + * Write the contents of this ByteStream to file at the given path + */ +suspend fun ByteStream.toFile(path: Path): Long = toFile(path.toFile()) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt index b4819d7b3..58a7781cb 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt @@ -11,13 +11,22 @@ import io.ktor.utils.io.ByteChannel as KtorByteChannel import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel +// marker interfaces used internally for accessing the underlying ktor impl +internal interface IsKtorReadChannel { + val chan: KtorByteReadChannel +} + +internal interface IsKtorWriteChannel { + val chan: KtorByteWriteChannel +} + /** * Wrap ktor's ByteReadChannel as our own. This implements the common API of [SdkByteReadChannel]. Only * platform specific differences in interfaces need be implemented in inheritors. */ internal abstract class KtorReadChannelAdapterBase( - val chan: KtorByteReadChannel -) : SdkByteReadChannel { + override val chan: KtorByteReadChannel +) : SdkByteReadChannel, IsKtorReadChannel { override val availableForRead: Int get() = chan.availableForRead @@ -50,8 +59,8 @@ internal abstract class KtorReadChannelAdapterBase( * platform specific differences in interfaces need be implemented in inheritors. */ internal abstract class KtorWriteChannelAdapterBase( - val chan: KtorByteWriteChannel -) : SdkByteWriteChannel { + override val chan: KtorByteWriteChannel +) : SdkByteWriteChannel, IsKtorWriteChannel { override val availableForWrite: Int get() = chan.availableForWrite @@ -72,7 +81,7 @@ internal abstract class KtorWriteChannelAdapterBase( return chan.writeAvailable(src, offset, length) } - override suspend fun close(cause: Throwable?): Boolean { + override fun close(cause: Throwable?): Boolean { return chan.close(cause) } @@ -82,17 +91,19 @@ internal abstract class KtorWriteChannelAdapterBase( } /** - * Wrap ktor's ByteChannel as our own. This implements the common API of [SdkByteChannel]. Only - * platform specific differences in interfaces need be implemented in inheritors. + * Wrap ktor's ByteChannel as our own */ - internal class KtorByteChannelAdapter( - val chan: KtorByteChannel + override val chan: KtorByteChannel ) : SdkByteChannel, SdkByteReadChannel by KtorReadChannelAdapter(chan), - SdkByteWriteChannel by KtorWriteChannelAdapter(chan) { + SdkByteWriteChannel by KtorWriteChannelAdapter(chan), + IsKtorWriteChannel, + IsKtorReadChannel { override val isClosedForWrite: Boolean get() = chan.isClosedForWrite + + override fun close() { chan.close(null) } } internal expect class KtorReadChannelAdapter(chan: KtorByteReadChannel) : SdkByteReadChannel diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt index 4c3fffc7c..6cc1b0ec7 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt @@ -17,7 +17,11 @@ import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel * cannot be invoked concurrently with themselves. Exceptions are [close] and [flush] which can be invoked * concurrently with other operations including between themselves at any time. */ -public interface SdkByteChannel : SdkByteReadChannel, SdkByteWriteChannel +public interface SdkByteChannel : SdkByteReadChannel, SdkByteWriteChannel { + override fun close() { + (this as SdkByteWriteChannel).close() + } +} /** * Create a buffered channel for asynchronous reading and writing of bytes diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index 914cb57c6..ab1615937 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -4,11 +4,13 @@ */ package software.aws.clientrt.io +import io.ktor.utils.io.* + /** * Supplies an asynchronous stream of bytes. Use this interface to read data from wherever it’s located: * from the network, storage, or a buffer in memory. This is a **single-reader channel**. */ -expect interface SdkByteReadChannel { +expect interface SdkByteReadChannel : Closeable { /** * Returns number of bytes that can be read without suspension. Read operations do no suspend and * return immediately when this number is at least the number of bytes requested for read. @@ -49,3 +51,30 @@ expect interface SdkByteReadChannel { */ fun cancel(cause: Throwable?): Boolean } + +/** + * Reads up to [limit] bytes from receiver channel and writes them to [dst] channel. + * Closes [dst] channel if fails to read or write with cause exception. + * @return a number of bytes copied + */ +public suspend fun SdkByteReadChannel.copyTo(dst: SdkByteWriteChannel, limit: Long = Long.MAX_VALUE): Long { + require(this !== dst) + if (limit == 0L) return 0L + + // delegate to ktor-io if possible which may have further optimizations based on impl + if (this is IsKtorReadChannel && dst is IsKtorWriteChannel) { + return chan.copyTo(dst.chan) + } + + TODO("not implemented") +} + +private suspend fun SdkByteReadChannel.copyToImpl(dst: SdkByteWriteChannel, limit: Long): Long { + try { + } catch (t: Throwable) { + dst.close(t) + throw t + } finally { + } + TODO("not implemented") +} diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt index 06c04b040..5d4ce57ac 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt @@ -8,7 +8,7 @@ package software.aws.clientrt.io /** * A channel for writing a sequence of bytes asynchronously. This is a **single writer channel**. */ -public expect interface SdkByteWriteChannel { +public expect interface SdkByteWriteChannel : Closeable { /** * Returns the number of bytes that can be written without suspension. Write operations do not @@ -50,7 +50,7 @@ public expect interface SdkByteWriteChannel { * Closes this channel with an optional exceptional [cause]. All pending bytes are flushed. * This is an idempotent operation — subsequent invocations of this function have no effect and return false */ - suspend fun close(cause: Throwable?): Boolean + fun close(cause: Throwable?): Boolean /** * Flushes all pending write bytes making them available for read. @@ -58,3 +58,10 @@ public expect interface SdkByteWriteChannel { */ fun flush(): Unit } + +/** + * Write the UTF-8 bytes of [str] fully to the channel + */ +public suspend fun SdkByteWriteChannel.writeUtf8(str: String) { + writeFully(str.encodeToByteArray()) +} diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt index 8c8b5a41a..f5ab7d476 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt @@ -13,7 +13,7 @@ import java.nio.ByteBuffer * * This interface is functionally equivalent to an asynchronous coroutine compatible [java.io.InputStream] */ -actual interface SdkByteReadChannel { +actual interface SdkByteReadChannel : Closeable { /** * Returns number of bytes that can be read without suspension. Read operations do no suspend and return immediately when this number is at least the number of bytes requested for read. */ @@ -47,4 +47,6 @@ actual interface SdkByteReadChannel { * Close channel with optional cause cancellation */ actual fun cancel(cause: Throwable?): Boolean + + override fun close() { cancel(null) } } diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt index 4427c55df..2b097d820 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteWriteChannelJVM.kt @@ -6,7 +6,7 @@ package software.aws.clientrt.io -public actual interface SdkByteWriteChannel { +public actual interface SdkByteWriteChannel : Closeable { /** * Returns the number of bytes that can be written without suspension. Write operations do not * suspend and return immediately when this number is at least the number of bytes requested for @@ -47,11 +47,13 @@ public actual interface SdkByteWriteChannel { * Closes this channel with an optional exceptional [cause]. All pending bytes are flushed. * This is an idempotent operation — subsequent invocations of this function have no effect and return false */ - actual suspend fun close(cause: Throwable?): Boolean + actual fun close(cause: Throwable?): Boolean /** * Flushes all pending write bytes making them available for read. * Thread safe and can be invoked at any time. It does nothing when invoked on a closed channel. */ actual fun flush() + + override fun close() { close(null) } } From 9ef7b21c2631432cc50b83bf2b41331587b9efd5 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Thu, 25 Mar 2021 15:32:10 -0400 Subject: [PATCH 08/23] bootstrap some sanity tests --- .../aws/clientrt/io/SdkByteReadChannel.kt | 19 ++- .../aws/clientrt/io/SdkByteWriteChannel.kt | 11 ++ .../clientrt/io/SdkByteChannelSmokeTest.kt | 146 ++++++++++++++++++ .../clientrt/io/middleware/MiddlewareTest.kt | 26 ++++ .../software/aws/clientrt/testing/runTest.kt | 2 + 5 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index ab1615937..aee52a8ce 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -28,6 +28,10 @@ expect interface SdkByteReadChannel : Closeable { */ val isClosedForWrite: Boolean + // FIXME - replace with readRemaining(limit: Long): ByteArray + // this blocks until EOF which means you can only invoke this on a closed channel currently. + // Without a limit it will _always_ block when channel isn't closed + /** * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream * into memory. @@ -38,12 +42,12 @@ expect interface SdkByteReadChannel : Closeable { * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough * bytes available. */ - suspend fun readFully(sink: ByteArray, offset: Int, length: Int) + suspend fun readFully(sink: ByteArray, offset: Int = 0, length: Int = sink.size - offset) /** * Reads all available bytes to [sink] buffer and returns immediately or suspends if no bytes available */ - suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int + suspend fun readAvailable(sink: ByteArray, offset: Int = 0, length: Int = sink.size - offset): Int /** * Close channel with optional cause cancellation. @@ -78,3 +82,14 @@ private suspend fun SdkByteReadChannel.copyToImpl(dst: SdkByteWriteChannel, limi } TODO("not implemented") } + +/** + * Reads a single byte from the channel and suspends until available + */ +public suspend fun SdkByteReadChannel.readByte(): Byte { + if (this is IsKtorReadChannel) return chan.readByte() + // TODO - we could pool these + val out = ByteArray(1) + readFully(out) + return out[0] +} diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt index 5d4ce57ac..d49fd2be4 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt @@ -65,3 +65,14 @@ public expect interface SdkByteWriteChannel : Closeable { public suspend fun SdkByteWriteChannel.writeUtf8(str: String) { writeFully(str.encodeToByteArray()) } + +/** + * Writes byte and suspends until written + */ +public suspend fun SdkByteWriteChannel.writeByte(value: Byte) { + if (this is IsKtorWriteChannel) { + chan.writeByte(value) + return + } + writeFully(byteArrayOf(value)) +} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt new file mode 100644 index 000000000..515d5aac6 --- /dev/null +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt @@ -0,0 +1,146 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +package software.aws.clientrt.io + +import io.ktor.utils.io.core.* +import software.aws.clientrt.testing.runSuspendTest +import kotlin.test.* + +open class SdkByteChannelSmokeTest { + + @Test + fun testCreateAndClose() { + val chan = SdkByteChannel(false) + chan.close() + } + + @Test + fun testAutoFlush() = runSuspendTest { + SdkByteChannel(false).use { chan -> + assertFalse(chan.autoFlush) + chan.writeByte(1) + chan.writeByte(2) + assertEquals(0, chan.availableForRead) + assertEquals(2, chan.totalBytesWritten) + chan.flush() + assertEquals(2, chan.availableForRead) + } + + SdkByteChannel(true).use { chan -> + assertTrue(chan.autoFlush) + chan.writeByte(1) + chan.writeByte(2) + assertEquals(2, chan.totalBytesWritten) + assertEquals(2, chan.availableForRead) + } + } + + @Test + fun testClose() = runSuspendTest { + val chan = SdkByteChannel(false) + chan.writeByte(1) + chan.writeByte(2) + chan.writeByte(3) + chan.flush() + + assertEquals(3, chan.availableForRead) + assertFalse(chan.isClosedForRead) + assertFalse(chan.isClosedForWrite) + + assertEquals(1, chan.readByte()) + chan.close() + + assertEquals(3, chan.totalBytesWritten) + assertEquals(2, chan.readByte()) + assertEquals(3, chan.readByte()) + assertEquals(0, chan.availableForRead) + assertTrue(chan.isClosedForRead) + + try { + chan.readByte() + fail("reading on an empty closed channel should have thrown") + } catch (expected: EOFException) { + } catch (expected: NoSuchElementException) { + } + } + + @Test + fun testReadAndWriteFully() = runSuspendTest { + val src = byteArrayOf(1, 2, 3, 4, 5) + val sink = ByteArray(5) + val chan = SdkByteChannel(false) + + chan.writeFully(src) + chan.flush() + assertEquals(5, chan.availableForRead) + chan.readFully(sink) + assertTrue { sink.contentEquals(src) } + + // split full read + chan.writeFully(src) + chan.flush() + val partial = ByteArray(4) + chan.readFully(partial) + assertEquals(1, chan.availableForRead) + assertEquals(5, chan.readByte()) + chan.close() + + try { + chan.readByte() + fail("reading on an empty closed channel should have thrown") + } catch (expected: EOFException) { + } catch (expected: NoSuchElementException) { + } + } + + @Test + fun testWriteString() = runSuspendTest { + val chan = SdkByteChannel(false) + val content = "I meant what I said. And said what I meant. An elephant's faithful. One hundred percent!" + chan.writeUtf8(content) + chan.close() + val actual = chan.readAll().decodeToString() + assertEquals(content, actual) + } + + @Test + fun testReadChannelByteArrayCtor() = runSuspendTest { + val src = byteArrayOf(1, 2, 3, 4, 5) + val chan = SdkByteReadChannel(src) + assertTrue(chan.isClosedForWrite) + assertFalse(chan.isClosedForRead) + + assertEquals(5, chan.availableForRead) + val sink = ByteArray(5) + var rc = chan.readAvailable(sink, 0, 4) + assertEquals(4, rc) + + assertEquals(1, chan.availableForRead) + rc = chan.readAvailable(sink, 4) + assertEquals(1, rc) + + assertTrue { sink.contentEquals(src) } + assertTrue(chan.isClosedForRead) + } + + @Test + fun testCloseableUse() = runSuspendTest { + val chan = SdkByteChannel(true) + chan.writeFully(byteArrayOf(1, 2, 3, 4, 5)) + val rc = chan.use { + assertFalse(it.isClosedForWrite) + assertFalse(it.isClosedForRead) + println(it.availableForRead) + val sink = ByteArray(4) + it.readAvailable(sink) + } + assertTrue(chan.isClosedForWrite) + assertFalse(chan.isClosedForRead) + assertEquals(4, rc) + chan.readByte() + // should only flip after all bytes read + assertTrue(chan.isClosedForRead) + } +} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/middleware/MiddlewareTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/middleware/MiddlewareTest.kt index c238baaa9..096111611 100644 --- a/client-runtime/io/common/test/software/aws/clientrt/io/middleware/MiddlewareTest.kt +++ b/client-runtime/io/common/test/software/aws/clientrt/io/middleware/MiddlewareTest.kt @@ -42,4 +42,30 @@ class MiddlewareTest { } assertEquals("Foo", handler.call("foo")) } + + @Test + fun testMapRequest() = runSuspendTest { + val handler = HandlerLambda { + it + } + + val mr = MapRequest(handler) { r1: Int -> + r1.toString() + } + + assertEquals("12", mr.call(12)) + } + + @Test + fun testMapResponse() = runSuspendTest { + val handler = HandlerLambda { + it + } + + val mr = MapResponse(handler) { r: String -> + r.toInt() + } + + assertEquals(22, mr.call("22")) + } } diff --git a/client-runtime/testing/common/src/software/aws/clientrt/testing/runTest.kt b/client-runtime/testing/common/src/software/aws/clientrt/testing/runTest.kt index 842f9e68b..506f54d34 100644 --- a/client-runtime/testing/common/src/software/aws/clientrt/testing/runTest.kt +++ b/client-runtime/testing/common/src/software/aws/clientrt/testing/runTest.kt @@ -8,6 +8,8 @@ import kotlinx.coroutines.CoroutineScope import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext +// FIXME - this can go away if/when this lands: https://github.com/Kotlin/kotlinx.coroutines/issues/1996 + /** * MPP compatible runBlocking to run suspend tests in common modules */ From ff0e32b3c08bbb0eb4ed93933fc6979969c68bb7 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Fri, 26 Mar 2021 16:38:16 -0400 Subject: [PATCH 09/23] add a fallback copy to method --- .../aws/clientrt/io/SdkByteChannel.kt | 3 + .../aws/clientrt/io/SdkByteReadChannel.kt | 46 ++++++++++--- .../src/software/aws/clientrt/io/Source.kt | 15 ---- .../aws/clientrt/io/SdkByteChannelOpsTest.kt | 69 +++++++++++++++++++ .../clientrt/io/SdkByteChannelSmokeTest.kt | 21 ++++++ 5 files changed, 131 insertions(+), 23 deletions(-) delete mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/Source.kt create mode 100644 client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt index 6cc1b0ec7..97673bffb 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteChannel.kt @@ -5,6 +5,7 @@ package software.aws.clientrt.io +import software.aws.clientrt.util.InternalApi import io.ktor.utils.io.ByteChannel as KtorByteChannel import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel @@ -26,12 +27,14 @@ public interface SdkByteChannel : SdkByteReadChannel, SdkByteWriteChannel { /** * Create a buffered channel for asynchronous reading and writing of bytes */ +@InternalApi public fun SdkByteChannel(autoFlush: Boolean = true): SdkByteChannel = KtorByteChannel(autoFlush).toSdkChannel() /** * Creates a channel for reading from the given byte array. */ +@InternalApi public fun SdkByteReadChannel( content: ByteArray, offset: Int = 0, diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index aee52a8ce..b8941f68f 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -58,29 +58,59 @@ expect interface SdkByteReadChannel : Closeable { /** * Reads up to [limit] bytes from receiver channel and writes them to [dst] channel. - * Closes [dst] channel if fails to read or write with cause exception. + * + * Closes [dst] channel when copy completes if [close] is `true`. + * NOTE: Always closes [dst] channel if fails to read or write with cause exception. + * * @return a number of bytes copied */ -public suspend fun SdkByteReadChannel.copyTo(dst: SdkByteWriteChannel, limit: Long = Long.MAX_VALUE): Long { +public suspend fun SdkByteReadChannel.copyTo( + dst: SdkByteWriteChannel, + limit: Long = Long.MAX_VALUE, + close: Boolean = true +): Long { require(this !== dst) if (limit == 0L) return 0L // delegate to ktor-io if possible which may have further optimizations based on impl - if (this is IsKtorReadChannel && dst is IsKtorWriteChannel) { - return chan.copyTo(dst.chan) + val cnt = if (this is IsKtorReadChannel && dst is IsKtorWriteChannel) { + chan.copyTo(dst.chan, limit) + } else { + copyToFallback(dst, limit) } - TODO("not implemented") + if (close) dst.close() + + return cnt } -private suspend fun SdkByteReadChannel.copyToImpl(dst: SdkByteWriteChannel, limit: Long): Long { +internal suspend fun SdkByteReadChannel.copyToFallback(dst: SdkByteWriteChannel, limit: Long): Long { + val flushDst = !dst.autoFlush + val buffer = ByteArray(4096) + try { + var copied = 0L + + while (true) { + val remaining = limit - copied + if (remaining == 0L) break + + val rc = readAvailable(buffer, 0, minOf(buffer.size.toLong(), remaining).toInt()) + if (rc == -1) break + + dst.writeFully(buffer, 0, rc) + copied += rc + + if (flushDst && availableForRead == 0) { + dst.flush() + } + } + + return copied } catch (t: Throwable) { dst.close(t) throw t - } finally { } - TODO("not implemented") } /** diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt b/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt deleted file mode 100644 index db075d137..000000000 --- a/client-runtime/io/common/src/software/aws/clientrt/io/Source.kt +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0. - */ -package software.aws.clientrt.io - -/** - * A synchronous read only stream of bytes (similar to java.io.InputStream) - */ -public interface Source : Closeable { - - fun read(sink: Buffer, byteCount: Int): Int - -// fun cursor(): Cursor? -} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt new file mode 100644 index 000000000..19e9939fa --- /dev/null +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt @@ -0,0 +1,69 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import software.aws.clientrt.testing.runSuspendTest +import kotlin.test.* + +class SdkByteChannelOpsTest { + + @Test + fun testCopyTo() = runSuspendTest { + val dst = SdkByteChannel(false) + + val contents = byteArrayOf(1, 2, 3, 4, 5) + val src1 = SdkByteReadChannel(contents) + val copied = src1.copyTo(dst, close = false) + assertEquals(5, copied) + + val buffer = ByteArray(5) + dst.readAvailable(buffer) + assertTrue { contents.contentEquals(buffer) } + assertFalse(dst.isClosedForWrite) + + val src2 = SdkByteReadChannel(contents) + val rc = src2.copyTo(dst, limit = 3) + assertTrue(dst.isClosedForWrite) + assertEquals(3, rc) + dst.readAvailable(buffer) + val expected = byteArrayOf(1, 2, 3) + assertTrue { expected.contentEquals(buffer.sliceArray(0..2)) } + } + + @Test + fun testCopyToFallback() = runSuspendTest { + val dst = SdkByteChannel(false) + + val contents = byteArrayOf(1, 2, 3, 4, 5) + val src1 = SdkByteReadChannel(contents) + val copied = src1.copyToFallback(dst, Long.MAX_VALUE) + assertEquals(5, copied) + + val buffer = ByteArray(5) + dst.readAvailable(buffer) + assertTrue { contents.contentEquals(buffer) } + assertFalse(dst.isClosedForWrite) + + val src2 = SdkByteReadChannel(contents) + val rc = src2.copyToFallback(dst, limit = 3) + dst.close() + assertTrue(dst.isClosedForWrite) + assertEquals(3, rc) + dst.readAvailable(buffer) + val expected = byteArrayOf(1, 2, 3) + assertTrue { expected.contentEquals(buffer.sliceArray(0..2)) } + } + + @Test + fun testCopyToSameOrZero() = runSuspendTest { + val chan = SdkByteChannel(false) + assertFailsWith { + chan.copyTo(chan) + } + val dst = SdkByteChannel(false) + assertEquals(0, chan.copyTo(dst, limit = 0)) + } +} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt index 515d5aac6..1990fb5e7 100644 --- a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt @@ -4,6 +4,7 @@ */ package software.aws.clientrt.io +import io.kotest.matchers.string.shouldContain import io.ktor.utils.io.core.* import software.aws.clientrt.testing.runSuspendTest import kotlin.test.* @@ -95,6 +96,26 @@ open class SdkByteChannelSmokeTest { } } + @Test + fun testReadAndWritePartial(): Unit = runSuspendTest { + val src = byteArrayOf(1, 2, 3, 4, 5) + val chan = SdkByteChannel(false) + chan.writeFully(src) + chan.flush() + + val buf1 = ByteArray(3) + val rc1 = chan.readAvailable(buf1) + assertEquals(3, rc1) + + chan.close() + // requested full read size is larger than what's left after close + val buf2 = ByteArray(16) + val ex = assertFails { + chan.readFully(buf2) + } + ex.message.shouldContain("expected 14 more bytes") + } + @Test fun testWriteString() = runSuspendTest { val chan = SdkByteChannel(false) From ebf574a262fd4c5894889c8cd1f187fa50598d94 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Fri, 26 Mar 2021 16:55:56 -0400 Subject: [PATCH 10/23] docs --- .../common/src/software/aws/clientrt/content/ByteStream.kt | 2 +- .../io/jvm/src/software/aws/clientrt/io/FileChannels.kt | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt index 8e1a6983c..b88330442 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt @@ -7,7 +7,7 @@ package software.aws.clientrt.content import software.aws.clientrt.io.SdkByteReadChannel /** - * Represents an abstract stream of bytes + * Represents an abstract read-only stream of bytes */ sealed class ByteStream { diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt index 9fcc2e9e0..faccbf0fd 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt @@ -44,8 +44,6 @@ public fun Path.readChannel(start: Long, endInclusive: Long): SdkByteReadChannel @InternalApi public fun Path.readChannel(): SdkByteReadChannel = toFile().readChannel() -// FIXME - CoroutineContext makes coroutines-core an API dependency - /** * Open a write channel for the file and launch a coroutine to read from it. * Please note that file writing is blocking so if you are starting it on [Dispatchers.Unconfined] it may block From 8fac87c85bf131ea4dbc3ace4bb2713d9a84ac03 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 29 Mar 2021 08:35:56 -0400 Subject: [PATCH 11/23] define buffer size as constant --- .../common/src/software/aws/clientrt/io/SdkByteReadChannel.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index b8941f68f..e2f1fd638 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -6,6 +6,8 @@ package software.aws.clientrt.io import io.ktor.utils.io.* +internal const val DEFAULT_BUFFER_SIZE: Int = 4096 + /** * Supplies an asynchronous stream of bytes. Use this interface to read data from wherever it’s located: * from the network, storage, or a buffer in memory. This is a **single-reader channel**. @@ -86,7 +88,7 @@ public suspend fun SdkByteReadChannel.copyTo( internal suspend fun SdkByteReadChannel.copyToFallback(dst: SdkByteWriteChannel, limit: Long): Long { val flushDst = !dst.autoFlush - val buffer = ByteArray(4096) + val buffer = ByteArray(DEFAULT_BUFFER_SIZE) try { var copied = 0L From 7bdbc535eb79c948ea6cd40343458fdf77149ca9 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 31 Mar 2021 08:18:49 -0400 Subject: [PATCH 12/23] port over a reasonable buffer abstraction --- .../src/software/aws/clientrt/io/Allocator.kt | 49 ++++ .../src/software/aws/clientrt/io/SdkBuffer.kt | 238 +++++++++++++++++ .../software/aws/clientrt/io/AllocatorTest.kt | 39 +++ .../software/aws/clientrt/io/SdkBufferTest.kt | 240 ++++++++++++++++++ .../aws/clientrt/io/SdkByteChannelOpsTest.kt | 12 + .../software/aws/clientrt/io/AllocatorJVM.kt | 15 ++ .../software/aws/clientrt/io/SdkBufferJVM.kt | 15 ++ .../aws/clientrt/io/SdkByteReadChannelJVM.kt | 6 + .../http/engine/ktor/KtorRequestAdapter.kt | 1 + 9 files changed, 615 insertions(+) create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/Allocator.kt create mode 100644 client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt create mode 100644 client-runtime/io/common/test/software/aws/clientrt/io/AllocatorTest.kt create mode 100644 client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt create mode 100644 client-runtime/io/jvm/src/software/aws/clientrt/io/AllocatorJVM.kt create mode 100644 client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/Allocator.kt b/client-runtime/io/common/src/software/aws/clientrt/io/Allocator.kt new file mode 100644 index 000000000..835c044d2 --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/Allocator.kt @@ -0,0 +1,49 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import io.ktor.utils.io.bits.* +import io.ktor.utils.io.core.* + +@OptIn(ExperimentalIoApi::class) +internal interface Allocator { + fun alloc(size: Int): Memory + // FIXME - we should revisit this - Kotlin/Native is only place where we would actually be manually managing memory + // and that story may change to the point where a free() function isn't even necessary + fun free(instance: Memory) +} + +// allocate using the most appropriate underlying platform type (e.g. ByteBuffer on JVM, ArrayBuffer on JS, etc) +internal expect object DefaultAllocator : Allocator + +/** + * Round up to the next power of 2. [size] should be non-negative + */ +internal fun ceilp2(size: Int): Int { + require(size >= 0) { "must be positive integer" } + var x = size - 1 + x = x or (x shr 1) + x = x or (x shr 2) + x = x or (x shr 4) + x = x or (x shr 8) + x = x or (x shr 16) + return x + 1 +} + +/** + * Allocate new memory of size [newSize], copy the contents of [instance] into it and free [instance] + * and return the newly allocated memory. + * + * The memory of [instance] should no longer be used after calling. + */ +@OptIn(ExperimentalIoApi::class) +internal fun Allocator.realloc(instance: Memory, newSize: Int): Memory { + require(newSize >= instance.size32) + val newInstance = alloc(newSize) + instance.copyTo(newInstance, 0, instance.size32, 0) + free(instance) + return newInstance +} diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt new file mode 100644 index 000000000..3678956a8 --- /dev/null +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt @@ -0,0 +1,238 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import io.ktor.utils.io.bits.* +import io.ktor.utils.io.core.* +import software.aws.clientrt.util.InternalApi + +private class SdkBufferState { + var writeHead: Int = 0 + var readHead: Int = 0 +} + +/** + * A buffer with read and write positions + * + * **concurrent unsafe**: Do not read/write using the same [SdkBuffer] instance from different threads. + */ +@OptIn(ExperimentalIoApi::class) +@InternalApi +class SdkBuffer( + capacity: Int +) { + internal var memory = DefaultAllocator.alloc(capacity) + private val state = SdkBufferState() + + /** + * The total capacity of the buffer + */ + val capacity: Int + get() = memory.size32 + + /** + * The current read position. Always non-negative and <= [writePosition]. + */ + val readPosition: Int + get() = state.readHead + + /** + * The current write position. Will never run ahead of [capacity] and always greater than [readPosition] + */ + val writePosition: Int + get() = state.writeHead + + /** + * Number of bytes available for reading + */ + val readRemaining: Int + get() = writePosition - readPosition + + /** + * Free space left for writing + */ + val writeRemaining: Int + get() = capacity - writePosition + + /** + * ensure there is enough write capacity for [minBytes] + */ + private fun ensureWriteCapacity(minBytes: Int) { + if (writeRemaining >= minBytes) return + + val newSize = (memory.size32 * 3 + 1) / 2 + memory = DefaultAllocator.realloc(memory, newSize) + } + + /** + * Discard [count] readable bytes + */ + fun discard(count: Int): Int { + require(count >= 0) { "cannot discard $count bytes; amount must be positive" } + val size = minOf(count, readRemaining) + state.readHead += size + return size + } + + /** + * Rewind [readPosition] making [count] bytes available for reading again + */ + fun rewind(count: Int = readPosition) { + val size = minOf(count, readPosition) + if (size <= 0) return + + state.readHead -= size + } + + /** + * Reset [readPosition] and [writePosition] making all bytes available for write and no bytes available to read + */ + fun reset() { + state.readHead = 0 + state.writeHead = 0 + } + + /** + * mark [count] bytes written and advance the [writePosition] by the same amount + */ + internal fun commitWritten(count: Int) { + if (count <= 0) return + require(count <= writeRemaining) { "Unable to write $count bytes; only $writeRemaining write capacity left" } + state.writeHead += count + } +} + +/** + * @return `true` if there are bytes to be read + */ +public inline val SdkBuffer.canRead: Boolean + get() = writePosition > readPosition + +/** + * @return `true` if there is any free space to write + */ +public inline val SdkBuffer.canWrite: Boolean + get() = writePosition < capacity + +/** + * Read from this buffer exactly [length] bytes and write to [dest] starting at [offset] + * @throws IllegalArgumentException if there are not enough bytes available for read or the offset/length combination is invalid + */ +fun SdkBuffer.readFully(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset) { + require(readRemaining >= length) { "Not enough bytes to read a ByteArray of size $length" } + require(offset >= 0) { "Invalid read offset, must be positive: $offset" } + require(offset + length <= dest.size) { "Invalid read: offset + length should be less than the destination size: $offset + $length < ${dest.size}" } + read { memory, readStart, _ -> + memory.loadByteArray(readStart, dest, offset, length) + length + } +} + +/** + * Read all available bytes from this buffer into [dest] starting at [offset] up to the destination buffers size. + * If the total bytes available is less than [length] then as many bytes as are available will be read. + * The total bytes read is returned or `-1` if no data is available. + */ +fun SdkBuffer.readAvailable(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset): Int { + if (!canRead) return -1 + + val rc = minOf(length, readRemaining) + readFully(dest, offset, rc) + return rc +} + +/** + * Write [length] bytes of [src] to this buffer starting at [offset] + * @throws IllegalArgumentException if there is insufficient space or the offset/length combination is invalid + */ +fun SdkBuffer.writeFully(src: ByteArray, offset: Int = 0, length: Int = src.size - offset) { + require(writeRemaining > length) { "Insufficient space to write $length bytes; capacity available: $writeRemaining" } + require(offset >= 0) { "Invalid write offset, must be positive" } + require(offset + length <= src.size) { "Invalid write: offset + length should be less than the source size: $offset + $length < ${src.size}" } + write { memory, writeStart, _ -> + memory.storeByteArray(writeStart, src, offset, length) + length + } +} + +/** + * Reads [length] bytes from this buffer into the [dst] buffer + * @return the number of bytes read + */ +@OptIn(ExperimentalIoApi::class) +fun SdkBuffer.readFully(dst: SdkBuffer, length: Int = dst.writeRemaining): Int { + require(length >= 0) + require(length <= dst.writeRemaining) + return read { memory, readStart, _ -> + memory.copyTo(dst.memory, readStart, length, dst.writePosition) + dst.commitWritten(length) + length + } +} + +/** + * Reads at most [length] bytes from this buffer or `-1` if no bytes are available for read. + * @return the number of bytes read or -1 if the buffer is empty + */ +@OptIn(ExperimentalIoApi::class) +fun SdkBuffer.readAvailable(dst: SdkBuffer, length: Int = dst.writeRemaining): Int { + if (!canRead) return -1 + val rc = minOf(dst.writeRemaining, readRemaining, length) + return readFully(dst, rc) +} + +/** + * Write at most [length] bytes from [src] to this buffer + */ +@OptIn(ExperimentalIoApi::class) +fun SdkBuffer.writeFully(src: SdkBuffer, length: Int = src.readRemaining) { + require(length >= 0) { "length must be positive: $length" } + require(length <= src.readRemaining) { + "not enough bytes in source buffer to read $length bytes (${src.readRemaining} remaining)" + } + require(length <= writeRemaining) { + "Insufficient space to write $length bytes; capacity available: $writeRemaining" + } + + write { memory, writeStart, _ -> + src.memory.copyTo(memory, src.readPosition, length, writeStart) + src.discard(length) + } +} + +/** + * Write the bytes of [str] as UTF-8 + */ +fun SdkBuffer.write(str: String) = writeFully(str.encodeToByteArray()) + +/** + * Read the available (unread) contents as a UTF-8 string + */ +fun SdkBuffer.decodeToString() = bytes().decodeToString() + +/** + * Get the available (unread) contents as a ByteArray. + * + * NOTE: This may or may not create a new backing array and perform a copy depending on the platform + * and current buffer status. + */ +expect fun SdkBuffer.bytes(): ByteArray + +@OptIn(ExperimentalIoApi::class) +private inline fun SdkBuffer.read(block: (memory: Memory, readStart: Int, endExclusive: Int) -> Int): Int { + val rc = block(memory, readPosition, writePosition) + discard(rc) + return rc +} + +@OptIn(ExperimentalIoApi::class) +private inline fun SdkBuffer.write(block: (memory: Memory, writeStart: Int, endExclusive: Int) -> Int): Int { + val wc = block(memory, writePosition, capacity) + commitWritten(wc) + return wc +} + +// FIXME - are we allowing this buffer to grow on it's own? diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/AllocatorTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/AllocatorTest.kt new file mode 100644 index 000000000..effffe19d --- /dev/null +++ b/client-runtime/io/common/test/software/aws/clientrt/io/AllocatorTest.kt @@ -0,0 +1,39 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import io.ktor.utils.io.bits.* +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class AllocatorTest { + @Test + fun testRealloc() { + val contents = byteArrayOf(5, 6, 7) + val m1 = DefaultAllocator.alloc(128) + m1.storeByteArray(2, contents) + val m2 = DefaultAllocator.realloc(m1, 512) + val buf = ByteArray(3) + m2.loadByteArray(2, buf) + assertTrue { buf.contentEquals(contents) } + DefaultAllocator.free(m2) + } + + @Test + fun testCeilPower2() { + val tests = listOf( + 0 to 0, + 1 to 1, + 2 to 2, + 3 to 4, + 4 to 4, + 5 to 8, + 17 to 32 + ) + tests.forEach { (input, expected) -> assertEquals(expected, ceilp2(input)) } + } +} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt new file mode 100644 index 000000000..8a88ca6c1 --- /dev/null +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt @@ -0,0 +1,240 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class SdkBufferTest { + @Test + fun testCtor() { + val buf = SdkBuffer(128) + assertEquals(128, buf.capacity) + assertEquals(128, buf.writeRemaining) + assertEquals(0, buf.readPosition) + assertEquals(0, buf.writePosition) + assertEquals(0, buf.readRemaining) + } + + @Test + fun testDiscard() { + val buf = SdkBuffer(128) + assertFailsWith("cannot discard -12 bytes; amount must be positive") { + buf.discard(-12) + } + + assertEquals(0, buf.discard(12)) + assertEquals(0, buf.readPosition) + + buf.commitWritten(30) + assertEquals(12, buf.discard(12)) + assertEquals(12, buf.readPosition) + assertEquals(18, buf.readRemaining) + } + + @Test + fun testCommitWritten() { + val buf = SdkBuffer(128) + buf.commitWritten(30) + buf.discard(2) + assertEquals(2, buf.readPosition) + assertEquals(28, buf.readRemaining) + assertEquals(98, buf.writeRemaining) + assertEquals(30, buf.writePosition) + + assertFailsWith("Unable to write 212 bytes; only 98 write capacity left") { + buf.commitWritten(212) + } + } + + @Test + fun testRewind() { + val buf = SdkBuffer(128) + buf.commitWritten(30) + buf.discard(30) + assertEquals(0, buf.readRemaining) + assertEquals(30, buf.readPosition) + buf.rewind(10) + assertEquals(10, buf.readRemaining) + assertEquals(20, buf.readPosition) + + // past the beginning + buf.rewind(1024) + assertEquals(30, buf.readRemaining) + assertEquals(0, buf.readPosition) + + // test full rewind (default) + buf.reset() + buf.commitWritten(30) + buf.discard(20) + assertEquals(10, buf.readRemaining) + assertEquals(20, buf.readPosition) + buf.rewind() + assertEquals(30, buf.readRemaining) + assertEquals(0, buf.readPosition) + } + + @Test + fun testReset() { + val buf = SdkBuffer(128) + buf.commitWritten(30) + buf.discard(30) + assertEquals(0, buf.readRemaining) + assertEquals(30, buf.readPosition) + + buf.reset() + assertEquals(0, buf.writePosition) + assertEquals(0, buf.readPosition) + assertEquals(128, buf.writeRemaining) + assertEquals(0, buf.readRemaining) + } + + @Test + fun testReadFullyNotEnoughRemaining() { + val buf = SdkBuffer(16) + buf.commitWritten(12) + val sink = ByteArray(32) + assertFailsWith("Not enough bytes to read a ByteArray of size 32") { + buf.readFully(sink) + } + } + + @Test + fun testReadFullyInvalidOffset() { + val buf = SdkBuffer(16) + buf.commitWritten(12) + val sink = ByteArray(32) + assertFailsWith("Invalid read offset, must be positive: -2") { + buf.readFully(sink, offset = -2) + } + } + + @Test + fun testReadFullyInvalidLengthAndOffset() { + val buf = SdkBuffer(16) + buf.commitWritten(12) + val sink = ByteArray(8) + assertFailsWith( + "Invalid read: offset + length should be less than the destination size: 7 + 4 < 8" + ) { + buf.readFully(sink, offset = 7, length = 4) + } + } + + @Test + fun testReadFully() { + val buf = SdkBuffer(8) + val contents = "Mad dog" + buf.write(contents) + val sink = ByteArray(8) + buf.readFully(sink, length = 7) + assertEquals(0, buf.readRemaining) + assertEquals(7, buf.readPosition) + assertEquals(7, buf.writePosition) + assertEquals(1, buf.writeRemaining) + + assertEquals(contents, sink.sliceArray(0..6).decodeToString()) + + // write at an offset + buf.reset() + buf.write(contents) + buf.readFully(sink, offset = 2, length = 5) + assertEquals("Mad d", sink.sliceArray(2..6).decodeToString()) + } + + @Test + fun testReadAvailable() { + val buf = SdkBuffer(8) + val contents = "Mad dog" + buf.writeFully(contents.encodeToByteArray()) + val sink = ByteArray(8) + val rc = buf.readAvailable(sink) + assertEquals(7, rc) + + // nothing left to read + assertEquals(-1, buf.readAvailable(sink)) + + buf.reset() + buf.write(contents) + val sink2 = ByteArray(16) + assertEquals(7, buf.readAvailable(sink2, offset = 2, length = 12)) + assertEquals(contents, sink2.sliceArray(2..8).decodeToString()) + } + + @Test + fun testWriteFully() { + val buf = SdkBuffer(128) + val contents = "is it morning or is it night, the software engineer doesn't know anymore" + buf.writeFully(contents.encodeToByteArray()) + val sink = ByteArray(buf.readRemaining) + buf.readFully(sink) + assertEquals(contents, sink.decodeToString()) + } + + @Test + fun testWriteFullyInsufficientSpace() { + val buf = SdkBuffer(16) + val contents = "is it morning or is it night, the software engineer doesn't know anymore" + assertFailsWith ( + "Insufficient space to write ${contents.length} bytes; capacity available: 16" + ) { + buf.writeFully(contents.encodeToByteArray()) + } + } + + @Test + fun testWriteFullyPastDestSize() { + val buf = SdkBuffer(16) + val contents = byteArrayOf(1, 2, 3, 4, 5) + assertFailsWith( + "Invalid write: offset + length should be less than the source size: 2 + 4 < 5" + ) { + buf.writeFully(contents, offset = 2, length = 4) + } + } + + @Test + fun testReadWriteString() { + val buf = SdkBuffer(128) + val contents = "foo bar baz" + buf.write(contents) + assertEquals(contents.length, buf.readRemaining) + val actual = buf.decodeToString() + assertEquals(contents, actual) + } + + @Test + fun testReadAvailableSdkBuffer() { + val buf = SdkBuffer(8) + val contents = "Mad dog" + buf.writeFully(contents.encodeToByteArray()) + val sink = SdkBuffer(8) + val rc = buf.readAvailable(sink) + assertEquals(7, rc) + + // nothing left to read + assertEquals(-1, buf.readAvailable(sink)) + } + + @Test + fun testReadFullySdkBuffer() { + val buf = SdkBuffer(8) + val contents = "Mad dog" + buf.write(contents) + val sink = SdkBuffer(8) + buf.readFully(sink, length = 7) + assertEquals(0, buf.readRemaining) + assertEquals(7, buf.readPosition) + assertEquals(7, buf.writePosition) + assertEquals(1, buf.writeRemaining) + + assertEquals(1, sink.writeRemaining) + assertEquals(7, sink.writePosition) + assertEquals(7, sink.readRemaining) + assertEquals(0, sink.readPosition) + } +} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt index 19e9939fa..1be78fff1 100644 --- a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelOpsTest.kt @@ -66,4 +66,16 @@ class SdkByteChannelOpsTest { val dst = SdkByteChannel(false) assertEquals(0, chan.copyTo(dst, limit = 0)) } + + @Test + fun testReadFromClosedChannel() = runSuspendTest { + val chan = SdkByteReadChannel(byteArrayOf(1, 2, 3, 4, 5)) + val buffer = ByteArray(3) + var rc = chan.readAvailable(buffer) + assertEquals(3, rc) + chan.close() + + rc = chan.readAvailable(buffer) + assertEquals(2, rc) + } } diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/AllocatorJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/AllocatorJVM.kt new file mode 100644 index 000000000..e28438c7e --- /dev/null +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/AllocatorJVM.kt @@ -0,0 +1,15 @@ +// ktlint-disable filename +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +import io.ktor.utils.io.bits.* +import java.nio.ByteBuffer + +internal actual object DefaultAllocator : Allocator { + override fun alloc(size: Int): Memory = Memory.of(ByteBuffer.allocate(size)) + override fun free(instance: Memory) {} +} diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt new file mode 100644 index 000000000..bab9b9200 --- /dev/null +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt @@ -0,0 +1,15 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package software.aws.clientrt.io + +internal fun SdkBuffer.hasArray() = memory.buffer.hasArray() && !memory.buffer.isReadOnly + +actual fun SdkBuffer.bytes(): ByteArray { + return when (hasArray()) { + true -> memory.buffer.array() + false -> ByteArray(readRemaining).apply { readFully(this) } + } +} diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt index f5ab7d476..253b3f0a5 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt @@ -7,6 +7,7 @@ package software.aws.clientrt.io import java.nio.ByteBuffer +import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel /** * Supplies a stream of bytes. Use this interface to read data from wherever it’s located: from the network, storage, or a buffer in memory. @@ -50,3 +51,8 @@ actual interface SdkByteReadChannel : Closeable { override fun close() { cancel(null) } } + +/** + * Creates a channel for reading from the given buffer + */ +fun SdkByteReadChannel(content: ByteBuffer): SdkByteReadChannel = KtorByteReadChannel(content).toSdkChannel() diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt index cf9bdb6a1..87f219bf4 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorRequestAdapter.kt @@ -66,6 +66,7 @@ internal class KtorRequestAdapter( return object : OutgoingContent.ReadChannelContent() { override val contentType: ContentType? = contentType override val contentLength: Long? = body.contentLength + // FIXME - ensure the `source` is closed? override fun readFrom(): ByteReadChannel { // FIXME - instead of reading and writing bytes we could probably proxy the underlying channel From 9b477e7968a6252d908a326f49da2a5eacfe398c Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 31 Mar 2021 09:31:10 -0400 Subject: [PATCH 13/23] implicitly grow buffer as needed --- .../src/software/aws/clientrt/io/SdkBuffer.kt | 67 +++++++++++-------- .../software/aws/clientrt/io/SdkBufferTest.kt | 46 +++++++++++-- 2 files changed, 79 insertions(+), 34 deletions(-) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt index 3678956a8..ad575e111 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt @@ -15,16 +15,23 @@ private class SdkBufferState { } /** - * A buffer with read and write positions + * A buffer with read and write positions. Similar in spirit to `java.nio.ByteBuffer` but for use + * in Kotlin Multiplatform. + * + * Unlike `ByteBuffer`, this buffer will implicitly grow as needed to fulfill write requests. + * However, explicitly reserving the required space up-front before a series of writes will be + * more efficient. * * **concurrent unsafe**: Do not read/write using the same [SdkBuffer] instance from different threads. */ @OptIn(ExperimentalIoApi::class) @InternalApi -class SdkBuffer( - capacity: Int -) { - internal var memory = DefaultAllocator.alloc(capacity) +class SdkBuffer(initialCapacity: Int) { + + // we make use of ktor-io's `Memory` type which already implements most of the functionality in a platform + // agnostic way. We just need to wrap some methods around it + internal var memory = DefaultAllocator.alloc(initialCapacity) + private val state = SdkBufferState() /** @@ -58,12 +65,16 @@ class SdkBuffer( get() = capacity - writePosition /** - * ensure there is enough write capacity for [minBytes] + * Reserve capacity for at least [count] bytes to be written. + * + * More than [count] bytes may be allocated in order to avoid frequent re-allocations. */ - private fun ensureWriteCapacity(minBytes: Int) { - if (writeRemaining >= minBytes) return + fun reserve(count: Int) { + if (writeRemaining >= count) return - val newSize = (memory.size32 * 3 + 1) / 2 + val minp2 = ceilp2(count) + val currp2 = ceilp2(memory.size32 + 1) + val newSize = maxOf(minp2, currp2) memory = DefaultAllocator.realloc(memory, newSize) } @@ -111,12 +122,6 @@ class SdkBuffer( public inline val SdkBuffer.canRead: Boolean get() = writePosition > readPosition -/** - * @return `true` if there is any free space to write - */ -public inline val SdkBuffer.canWrite: Boolean - get() = writePosition < capacity - /** * Read from this buffer exactly [length] bytes and write to [dest] starting at [offset] * @throws IllegalArgumentException if there are not enough bytes available for read or the offset/length combination is invalid @@ -149,27 +154,28 @@ fun SdkBuffer.readAvailable(dest: ByteArray, offset: Int = 0, length: Int = dest * @throws IllegalArgumentException if there is insufficient space or the offset/length combination is invalid */ fun SdkBuffer.writeFully(src: ByteArray, offset: Int = 0, length: Int = src.size - offset) { - require(writeRemaining > length) { "Insufficient space to write $length bytes; capacity available: $writeRemaining" } require(offset >= 0) { "Invalid write offset, must be positive" } require(offset + length <= src.size) { "Invalid write: offset + length should be less than the source size: $offset + $length < ${src.size}" } - write { memory, writeStart, _ -> + writeSized(length) { memory, writeStart -> memory.storeByteArray(writeStart, src, offset, length) length } } /** - * Reads [length] bytes from this buffer into the [dst] buffer + * Reads at most [length] bytes from this buffer into the [dst] buffer * @return the number of bytes read */ @OptIn(ExperimentalIoApi::class) fun SdkBuffer.readFully(dst: SdkBuffer, length: Int = dst.writeRemaining): Int { require(length >= 0) - require(length <= dst.writeRemaining) + val rc = minOf(readRemaining, length) + if (rc == 0) return 0 return read { memory, readStart, _ -> - memory.copyTo(dst.memory, readStart, length, dst.writePosition) - dst.commitWritten(length) - length + dst.reserve(rc) + memory.copyTo(dst.memory, readStart, rc, dst.writePosition) + dst.commitWritten(rc) + rc } } @@ -180,7 +186,7 @@ fun SdkBuffer.readFully(dst: SdkBuffer, length: Int = dst.writeRemaining): Int { @OptIn(ExperimentalIoApi::class) fun SdkBuffer.readAvailable(dst: SdkBuffer, length: Int = dst.writeRemaining): Int { if (!canRead) return -1 - val rc = minOf(dst.writeRemaining, readRemaining, length) + val rc = minOf(readRemaining, length) return readFully(dst, rc) } @@ -193,11 +199,8 @@ fun SdkBuffer.writeFully(src: SdkBuffer, length: Int = src.readRemaining) { require(length <= src.readRemaining) { "not enough bytes in source buffer to read $length bytes (${src.readRemaining} remaining)" } - require(length <= writeRemaining) { - "Insufficient space to write $length bytes; capacity available: $writeRemaining" - } - write { memory, writeStart, _ -> + writeSized(length) { memory, writeStart -> src.memory.copyTo(memory, src.readPosition, length, writeStart) src.discard(length) } @@ -211,7 +214,7 @@ fun SdkBuffer.write(str: String) = writeFully(str.encodeToByteArray()) /** * Read the available (unread) contents as a UTF-8 string */ -fun SdkBuffer.decodeToString() = bytes().decodeToString() +fun SdkBuffer.decodeToString() = bytes().decodeToString(0, readRemaining) /** * Get the available (unread) contents as a ByteArray. @@ -235,4 +238,10 @@ private inline fun SdkBuffer.write(block: (memory: Memory, writeStart: Int, endE return wc } -// FIXME - are we allowing this buffer to grow on it's own? +@OptIn(ExperimentalIoApi::class) +private inline fun SdkBuffer.writeSized(count: Int, block: (memory: Memory, writeStart: Int) -> Int): Int { + reserve(count) + return write { memory, writeStart, _ -> + block(memory, writeStart) + } +} diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt index 8a88ca6c1..731aa543d 100644 --- a/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt @@ -179,11 +179,31 @@ class SdkBufferTest { fun testWriteFullyInsufficientSpace() { val buf = SdkBuffer(16) val contents = "is it morning or is it night, the software engineer doesn't know anymore" - assertFailsWith ( - "Insufficient space to write ${contents.length} bytes; capacity available: 16" - ) { - buf.writeFully(contents.encodeToByteArray()) - } + assertEquals(16, buf.capacity) + buf.writeFully(contents.encodeToByteArray()) + // content is 72 bytes. next power of 2 is greater than exp growth of current buffer + assertEquals(128, buf.capacity) + + val buf2 = SdkBuffer(16) + assertEquals(16, buf2.capacity) + buf2.commitWritten(12) + val smallContent = byteArrayOf(1, 2, 3, 4, 5) + buf2.writeFully(smallContent) + // doubling the current capacity is greater + assertEquals(32, buf2.capacity) + } + + @Test + fun testReserve() { + val buf = SdkBuffer(8) + assertEquals(8, buf.capacity) + buf.reserve(5) + assertEquals(8, buf.capacity) + buf.reserve(12) + assertEquals(16, buf.capacity) + + buf.reserve(72) + assertEquals(128, buf.capacity) } @Test @@ -237,4 +257,20 @@ class SdkBufferTest { assertEquals(7, sink.readRemaining) assertEquals(0, sink.readPosition) } + + @Test + fun testWriteFullySdkBuffer() { + val src = SdkBuffer(16) + src.write("buffers are fun!") + assertEquals(16, src.readRemaining) + assertEquals(16, src.capacity) + + val dest = SdkBuffer(8) + assertEquals(8, dest.capacity) + + dest.writeFully(src) + assertEquals(0, src.readRemaining) + assertEquals(16, dest.readRemaining) + assertEquals(16, dest.capacity) + } } From 94f2e1358fb445a52b0fb530c4044b85d639c552 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 31 Mar 2021 10:14:22 -0400 Subject: [PATCH 14/23] minor cleanup --- .../io/common/src/software/aws/clientrt/io/SdkBuffer.kt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt index ad575e111..3abac6802 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt @@ -6,6 +6,7 @@ package software.aws.clientrt.io import io.ktor.utils.io.bits.* +import io.ktor.utils.io.charsets.* import io.ktor.utils.io.core.* import software.aws.clientrt.util.InternalApi @@ -27,6 +28,8 @@ private class SdkBufferState { @OptIn(ExperimentalIoApi::class) @InternalApi class SdkBuffer(initialCapacity: Int) { + // TODO - we could implement Appendable but we would need to deal with Char as UTF-16 character + // (e.g. convert code points to number of bytes and write the correct utf bytes 1..4) // we make use of ktor-io's `Memory` type which already implements most of the functionality in a platform // agnostic way. We just need to wrap some methods around it @@ -72,9 +75,9 @@ class SdkBuffer(initialCapacity: Int) { fun reserve(count: Int) { if (writeRemaining >= count) return - val minp2 = ceilp2(count) - val currp2 = ceilp2(memory.size32 + 1) - val newSize = maxOf(minp2, currp2) + val minP2 = ceilp2(count) + val currP2 = ceilp2(memory.size32 + 1) + val newSize = maxOf(minP2, currP2) memory = DefaultAllocator.realloc(memory, newSize) } From dbc6b51795c72e4ecabcd9f7c48fe17466152fb1 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 31 Mar 2021 10:37:03 -0400 Subject: [PATCH 15/23] cleanup potential for overflow --- .../software/aws/clientrt/content/ByteStream.kt | 2 +- .../src/software/aws/clientrt/io/KtorAdapters.kt | 4 ++-- .../src/software/aws/clientrt/io/SdkBuffer.kt | 1 + .../software/aws/clientrt/io/SdkByteReadChannel.kt | 14 +++++++------- .../aws/clientrt/io/SdkByteChannelSmokeTest.kt | 2 +- .../aws/clientrt/io/SdkByteReadChannelJVM.kt | 7 +++++-- .../aws/clientrt/http/engine/ktor/KtorUtilsTest.kt | 2 +- .../src/software/aws/clientrt/http/HttpBody.kt | 2 +- 8 files changed, 19 insertions(+), 15 deletions(-) diff --git a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt index b88330442..c0dc02958 100644 --- a/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt +++ b/client-runtime/client-rt-core/common/src/software/aws/clientrt/content/ByteStream.kt @@ -52,7 +52,7 @@ sealed class ByteStream { suspend fun ByteStream.toByteArray(): ByteArray { return when (val stream = this) { is ByteStream.Buffer -> stream.bytes() - is ByteStream.Reader -> stream.readFrom().readAll() + is ByteStream.Reader -> stream.readFrom().readRemaining() } } diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt index 58a7781cb..979dab870 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt @@ -37,8 +37,8 @@ internal abstract class KtorReadChannelAdapterBase( override val isClosedForWrite: Boolean get() = chan.isClosedForWrite - override suspend fun readAll(): ByteArray { - return chan.readRemaining().readBytes() + override suspend fun readRemaining(limit: Int): ByteArray { + return chan.readRemaining(limit.toLong()).readBytes() } override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) { diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt index 3abac6802..3d4a01f43 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkBuffer.kt @@ -212,6 +212,7 @@ fun SdkBuffer.writeFully(src: SdkBuffer, length: Int = src.readRemaining) { /** * Write the bytes of [str] as UTF-8 */ +// TODO - remove in favor of implementing Appendable in such a way as to not allocate an entire new byte array fun SdkBuffer.write(str: String) = writeFully(str.encodeToByteArray()) /** diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index e2f1fd638..d409cd285 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -30,15 +30,15 @@ expect interface SdkByteReadChannel : Closeable { */ val isClosedForWrite: Boolean - // FIXME - replace with readRemaining(limit: Long): ByteArray - // this blocks until EOF which means you can only invoke this on a closed channel currently. - // Without a limit it will _always_ block when channel isn't closed - /** - * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream - * into memory. + * Read up to [limit] bytes into a [ByteArray] suspending until [limit] is reached or the channel + * is closed. + * + * NOTE: Be careful as this will potentially read the entire byte stream into memory (up to limit) + * + * Check [availableForRead] and/or [isClosedForRead] to see if there is additional data left */ - suspend fun readAll(): ByteArray + suspend fun readRemaining(limit: Int = Int.MAX_VALUE): ByteArray /** * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt index 1990fb5e7..dffd36f14 100644 --- a/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkByteChannelSmokeTest.kt @@ -122,7 +122,7 @@ open class SdkByteChannelSmokeTest { val content = "I meant what I said. And said what I meant. An elephant's faithful. One hundred percent!" chan.writeUtf8(content) chan.close() - val actual = chan.readAll().decodeToString() + val actual = chan.readRemaining().decodeToString() assertEquals(content, actual) } diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt index 253b3f0a5..726569c96 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkByteReadChannelJVM.kt @@ -28,9 +28,12 @@ actual interface SdkByteReadChannel : Closeable { actual val isClosedForWrite: Boolean /** - * Read the entire content into a [ByteArray]. NOTE: Be careful this will read the entire byte stream into memory. + * Read up to [limit] bytes into a [ByteArray] suspending until [limit] is reached or the channel + * is closed. + * + * NOTE: Be careful as this will potentially read the entire byte stream into memory (up to limit) */ - actual suspend fun readAll(): ByteArray + actual suspend fun readRemaining(limit: Int): ByteArray /** * Reads all length bytes to [sink] buffer or fails if source has been closed. Suspends if not enough bytes available. diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/test/software/aws/clientrt/http/engine/ktor/KtorUtilsTest.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/test/software/aws/clientrt/http/engine/ktor/KtorUtilsTest.kt index 8e46102c3..98252209d 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/test/software/aws/clientrt/http/engine/ktor/KtorUtilsTest.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/test/software/aws/clientrt/http/engine/ktor/KtorUtilsTest.kt @@ -130,7 +130,7 @@ class KtorUtilsTest { channel.close() val content = KtorContentStream(channel, notify) - val actual = content.readAll() + val actual = content.readRemaining() assertEquals(bytes.size, actual.size) called.shouldBeTrue() } diff --git a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt index 3e263ca59..31f67f620 100644 --- a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt +++ b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt @@ -72,7 +72,7 @@ fun ByteStream.toHttpBody(): HttpBody { suspend fun HttpBody.readAll(): ByteArray? = when (this) { is HttpBody.Empty -> null is HttpBody.Bytes -> this.bytes() - is HttpBody.Streaming -> this.readFrom().readAll() + is HttpBody.Streaming -> this.readFrom().readRemaining() } /** From cfc2756b137d4938c3eca8c6e97acb68db0629da Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Wed, 31 Mar 2021 10:52:27 -0400 Subject: [PATCH 16/23] notes for posterity --- .../src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt | 4 ++-- .../codegen/integration/HttpBindingProtocolGenerator.kt | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt index 623a41e3a..40d54bbda 100644 --- a/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt +++ b/client-runtime/protocol/http-client-engines/http-client-engine-ktor/jvm/src/software/aws/clientrt/http/engine/ktor/KtorUtils.kt @@ -72,8 +72,8 @@ internal class KtorContentStream(private val channel: ByteReadChannel, private v override val isClosedForWrite: Boolean get() = channel.isClosedForWrite - override suspend fun readAll(): ByteArray { - val packet = channel.readRemaining() + override suspend fun readRemaining(limit: Int): ByteArray { + val packet = channel.readRemaining(limit.toLong()) notifyIfExhausted() return packet.readBytes() } diff --git a/smithy-kotlin-codegen/src/main/kotlin/software/amazon/smithy/kotlin/codegen/integration/HttpBindingProtocolGenerator.kt b/smithy-kotlin-codegen/src/main/kotlin/software/amazon/smithy/kotlin/codegen/integration/HttpBindingProtocolGenerator.kt index 9830182c4..eb8202d2b 100644 --- a/smithy-kotlin-codegen/src/main/kotlin/software/amazon/smithy/kotlin/codegen/integration/HttpBindingProtocolGenerator.kt +++ b/smithy-kotlin-codegen/src/main/kotlin/software/amazon/smithy/kotlin/codegen/integration/HttpBindingProtocolGenerator.kt @@ -609,6 +609,8 @@ abstract class HttpBindingProtocolGenerator : ProtocolGenerator { .map { it.member } if (documentMembers.isNotEmpty()) { + // FIXME - we should not be slurping the entire contents into memory, instead our deserializers + // should work off of an SdkByteReadChannel writer.write("val payload = response.body.readAll()") writer.withBlock("if (payload != null) {", "}") { writer.write("val deserializer = context.deserializer(payload)") From ead19c7afa200d03243c18c0c18b9123dd4f33d4 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Thu, 1 Apr 2021 09:20:30 -0400 Subject: [PATCH 17/23] rename toFile() -> writeToFile() --- .../jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt index 61020aea1..be62b94a0 100644 --- a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt @@ -34,8 +34,9 @@ fun Path.asByteStream(): ByteStream { /** * Write the contents of this ByteStream to file and close it + * @return the number of bytes written */ -suspend fun ByteStream.toFile(file: File): Long { +suspend fun ByteStream.writeToFile(file: File): Long { require(file.isFile) { "cannot write contents of ByteStream to a directory: ${file.absolutePath}" } val writer = file.writeChannel() val src = when (this) { @@ -53,5 +54,6 @@ suspend fun ByteStream.toFile(file: File): Long { /** * Write the contents of this ByteStream to file at the given path + * @return the number of bytes written */ -suspend fun ByteStream.toFile(path: Path): Long = toFile(path.toFile()) +suspend fun ByteStream.writeToFile(path: Path): Long = writeToFile(path.toFile()) From 79c308b06d73df68c4077a7ba168c75df9d15b96 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 5 Apr 2021 10:08:04 -0400 Subject: [PATCH 18/23] remove api dependency requirement by hard coding to Dispatchers.IO --- client-runtime/io/build.gradle.kts | 4 +--- .../src/software/aws/clientrt/io/FileChannels.kt | 14 ++++---------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/client-runtime/io/build.gradle.kts b/client-runtime/io/build.gradle.kts index b07ad8c3d..7247a8c01 100644 --- a/client-runtime/io/build.gradle.kts +++ b/client-runtime/io/build.gradle.kts @@ -18,9 +18,7 @@ kotlin { dependencies { implementation(project(":client-runtime:utils")) implementation("io.ktor:ktor-io:$ktorVersion") - - // Dispatchers.IO - api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") } } diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt index faccbf0fd..ddf8edc12 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/FileChannels.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.Dispatchers import software.aws.clientrt.util.InternalApi import java.io.File import java.nio.file.Path -import kotlin.coroutines.CoroutineContext import io.ktor.util.cio.readChannel as cioReadChannel import io.ktor.util.cio.writeChannel as cioWriteChannel @@ -17,15 +16,13 @@ import io.ktor.util.cio.writeChannel as cioWriteChannel * Open a read channel for file and launch a coroutine to fill it. * Please note that file reading is blocking so if you are starting it on [Dispatchers.Unconfined] it may block * your async code and freeze the whole application when runs on a pool that is not intended for blocking operations. - * This is why [coroutineContext] should have [Dispatchers.IO] or - * a coroutine dispatcher that is properly configured for blocking IO. + * NOTE: Always runs on [Dispatchers.IO] */ @InternalApi public fun File.readChannel( start: Long = 0, endInclusive: Long = -1, - coroutineContext: CoroutineContext = Dispatchers.IO -): SdkByteReadChannel = cioReadChannel(start, endInclusive, coroutineContext).toSdkChannel() +): SdkByteReadChannel = cioReadChannel(start, endInclusive).toSdkChannel() /** * Open a read channel for file and launch a coroutine to fill it. @@ -48,10 +45,7 @@ public fun Path.readChannel(): SdkByteReadChannel = toFile().readChannel() * Open a write channel for the file and launch a coroutine to read from it. * Please note that file writing is blocking so if you are starting it on [Dispatchers.Unconfined] it may block * your async code and freeze the whole application when runs on a pool that is not intended for blocking operations. - * This is why [coroutineContext] should have [Dispatchers.IO] or - * a coroutine dispatcher that is properly configured for blocking IO. + * NOTE: Always runs on [Dispatchers.IO] */ @InternalApi -public fun File.writeChannel( - coroutineContext: CoroutineContext = Dispatchers.IO -): SdkByteWriteChannel = cioWriteChannel(coroutineContext).toSdkChannel() +public fun File.writeChannel(): SdkByteWriteChannel = cioWriteChannel().toSdkChannel() From a515957649d8422fbc8fc577971cd200eb360ce6 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 5 Apr 2021 10:10:36 -0400 Subject: [PATCH 19/23] guard file exists --- .../jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt index be62b94a0..efb72d750 100644 --- a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt @@ -29,6 +29,7 @@ fun File.asByteStream(): ByteStream = LocalFileContent(this) fun Path.asByteStream(): ByteStream { val f = toFile() require(f.isFile) { "cannot create a ByteStream from a directory: $this" } + require(f.exists()) { "cannot create ByteStream, invalid file: $this" } return f.asByteStream() } From 920840f44afaa6c3dbe611bbc5014e7b701fe299 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 5 Apr 2021 10:12:02 -0400 Subject: [PATCH 20/23] refactor: rename LocalFileContent -> FileContent --- .../jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt | 2 +- .../clientrt/content/{LocalFileContent.kt => FileContent.kt} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/{LocalFileContent.kt => FileContent.kt} (94%) diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt index efb72d750..8d7d8607d 100644 --- a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/ByteStreamJVM.kt @@ -21,7 +21,7 @@ fun ByteStream.Companion.fromFile(file: File): ByteStream = file.asByteStream() /** * Create a [ByteStream] from a file */ -fun File.asByteStream(): ByteStream = LocalFileContent(this) +fun File.asByteStream(): ByteStream = FileContent(this) /** * Create a [ByteStream] from a path diff --git a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/FileContent.kt similarity index 94% rename from client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt rename to client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/FileContent.kt index 054b17b81..0dba1b630 100644 --- a/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/LocalFileContent.kt +++ b/client-runtime/client-rt-core/jvm/src/software/aws/clientrt/content/FileContent.kt @@ -12,7 +12,7 @@ import java.io.File /** * ByteStream backed by a local [file] */ -public class LocalFileContent( +public class FileContent( public val file: File, ) : ByteStream.Reader() { From 8784db71aea1f72d02787a4931622fb42f67c0d1 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 5 Apr 2021 10:14:41 -0400 Subject: [PATCH 21/23] refactor: rename ktor marker interface --- .../src/software/aws/clientrt/io/KtorAdapters.kt | 12 ++++++------ .../software/aws/clientrt/io/SdkByteReadChannel.kt | 4 ++-- .../software/aws/clientrt/io/SdkByteWriteChannel.kt | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt index 979dab870..9b3701d9d 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt @@ -12,11 +12,11 @@ import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel // marker interfaces used internally for accessing the underlying ktor impl -internal interface IsKtorReadChannel { +internal interface KtorReadChannel { val chan: KtorByteReadChannel } -internal interface IsKtorWriteChannel { +internal interface KtorWriteChannel { val chan: KtorByteWriteChannel } @@ -26,7 +26,7 @@ internal interface IsKtorWriteChannel { */ internal abstract class KtorReadChannelAdapterBase( override val chan: KtorByteReadChannel -) : SdkByteReadChannel, IsKtorReadChannel { +) : SdkByteReadChannel, KtorReadChannel { override val availableForRead: Int get() = chan.availableForRead @@ -60,7 +60,7 @@ internal abstract class KtorReadChannelAdapterBase( */ internal abstract class KtorWriteChannelAdapterBase( override val chan: KtorByteWriteChannel -) : SdkByteWriteChannel, IsKtorWriteChannel { +) : SdkByteWriteChannel, KtorWriteChannel { override val availableForWrite: Int get() = chan.availableForWrite @@ -98,8 +98,8 @@ internal class KtorByteChannelAdapter( ) : SdkByteChannel, SdkByteReadChannel by KtorReadChannelAdapter(chan), SdkByteWriteChannel by KtorWriteChannelAdapter(chan), - IsKtorWriteChannel, - IsKtorReadChannel { + KtorWriteChannel, + KtorReadChannel { override val isClosedForWrite: Boolean get() = chan.isClosedForWrite diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt index d409cd285..8aaa850d5 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteReadChannel.kt @@ -75,7 +75,7 @@ public suspend fun SdkByteReadChannel.copyTo( if (limit == 0L) return 0L // delegate to ktor-io if possible which may have further optimizations based on impl - val cnt = if (this is IsKtorReadChannel && dst is IsKtorWriteChannel) { + val cnt = if (this is KtorReadChannel && dst is KtorWriteChannel) { chan.copyTo(dst.chan, limit) } else { copyToFallback(dst, limit) @@ -119,7 +119,7 @@ internal suspend fun SdkByteReadChannel.copyToFallback(dst: SdkByteWriteChannel, * Reads a single byte from the channel and suspends until available */ public suspend fun SdkByteReadChannel.readByte(): Byte { - if (this is IsKtorReadChannel) return chan.readByte() + if (this is KtorReadChannel) return chan.readByte() // TODO - we could pool these val out = ByteArray(1) readFully(out) diff --git a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt index d49fd2be4..64067b8a5 100644 --- a/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt +++ b/client-runtime/io/common/src/software/aws/clientrt/io/SdkByteWriteChannel.kt @@ -70,7 +70,7 @@ public suspend fun SdkByteWriteChannel.writeUtf8(str: String) { * Writes byte and suspends until written */ public suspend fun SdkByteWriteChannel.writeByte(value: Byte) { - if (this is IsKtorWriteChannel) { + if (this is KtorWriteChannel) { chan.writeByte(value) return } From 244d075a3ed9c08637bc4d7d8f6efec373068b85 Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 5 Apr 2021 10:23:25 -0400 Subject: [PATCH 22/23] fix: bytes() should only return valid bytes --- .../common/test/software/aws/clientrt/io/SdkBufferTest.kt | 8 ++++++++ .../io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt b/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt index 731aa543d..9d3839d8c 100644 --- a/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt +++ b/client-runtime/io/common/test/software/aws/clientrt/io/SdkBufferTest.kt @@ -273,4 +273,12 @@ class SdkBufferTest { assertEquals(16, dest.readRemaining) assertEquals(16, dest.capacity) } + + @Test + fun testBytes() { + val buf = SdkBuffer(32) + buf.commitWritten(16) + val bytes = buf.bytes() + assertEquals(16, bytes.size) + } } diff --git a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt index bab9b9200..21e283003 100644 --- a/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt +++ b/client-runtime/io/jvm/src/software/aws/clientrt/io/SdkBufferJVM.kt @@ -9,7 +9,7 @@ internal fun SdkBuffer.hasArray() = memory.buffer.hasArray() && !memory.buffer.i actual fun SdkBuffer.bytes(): ByteArray { return when (hasArray()) { - true -> memory.buffer.array() + true -> memory.buffer.array().sliceArray(readPosition until readRemaining) false -> ByteArray(readRemaining).apply { readFully(this) } } } From d42c1830343ab2b334abbcebf9b21a1df63f21db Mon Sep 17 00:00:00 2001 From: Aaron J Todd Date: Mon, 5 Apr 2021 10:30:23 -0400 Subject: [PATCH 23/23] add state check --- .../src/software/aws/clientrt/http/HttpBody.kt | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt index 31f67f620..ac0c75ba4 100644 --- a/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt +++ b/client-runtime/protocol/http/common/src/software/aws/clientrt/http/HttpBody.kt @@ -22,7 +22,7 @@ sealed class HttpBody { * Variant of a [HttpBody] without a payload */ object Empty : HttpBody() { - override val contentLength: Long? = 0 + override val contentLength: Long = 0 } /** @@ -72,7 +72,16 @@ fun ByteStream.toHttpBody(): HttpBody { suspend fun HttpBody.readAll(): ByteArray? = when (this) { is HttpBody.Empty -> null is HttpBody.Bytes -> this.bytes() - is HttpBody.Streaming -> this.readFrom().readRemaining() + is HttpBody.Streaming -> { + val readChan = readFrom() + val bytes = readChan.readRemaining() + // readRemaining will read up to `limit` bytes (which is defaulted to Int.MAX_VALUE) or until + // the stream is closed and no more bytes remain. + // This is usually sufficient to consume the stream but technically that's not what it's doing. + // Save us a painful debug session later in the very rare chance this were to occur.. + check(readChan.isClosedForRead) { "failed to read all HttpBody bytes from stream" } + bytes + } } /**