Skip to content

Commit

Permalink
feat(io): implement missing IO runtime primitives (#264)
Browse files Browse the repository at this point in the history
* (refactor): rename `Source` -> `SdkByteReadChannel`
* (feat): add wrappers around `ktor-io` implementation of read/write channels. 
    * NOTE: we are marking the creation of these types as internal but the interfaces are public. This allow us to make use of them in the runtime but customers can only be given an instance of one (we aren't trying to implement a general purpose IO library here for others to use).
    * We are only exposing a _very_ minimal subset of ktor's equivalent `ByteRead/ByteWrite` channels. ktor has lots of extension methods for doing all sorts of things. We can add as needed. Our primary use case is reading/writing to/from sockets/files (usually in larger chunks)
* (feat): Added extensions for `File` / `Path` on JVM to read/write to/from files as a channel (ktor does the heavy lifting here of course)
    * Customers will likely only ever interact with supplying a file as `FileContent` or `ByteStream.writeToFile(...)`
* (feat): Added an `SdkBuffer` type that we can use internally. Similar to `ByteBuffer` but grows as needed.
  • Loading branch information
aajtodd authored Apr 6, 2021
1 parent f6f6f7f commit b083bd9
Show file tree
Hide file tree
Showing 34 changed files with 1,631 additions and 78 deletions.
3 changes: 2 additions & 1 deletion client-runtime/client-rt-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ kotlin {
sourceSets {
commonMain {
dependencies {
implementation(project(":client-runtime:io"))
// io types are exposed as part of content/*
api(project(":client-runtime:io"))
// Attributes property bag is exposed as client options
api(project(":client-runtime:utils"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ package software.aws.clientrt.content
* Container for wrapping a ByteArray as a [ByteStream]
*/
class ByteArrayContent(private val bytes: ByteArray) : ByteStream.Buffer() {
override val contentLength: Long? = bytes.size.toLong()
override val contentLength: Long = bytes.size.toLong()
override fun bytes(): ByteArray = bytes
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
*/
package software.aws.clientrt.content

import software.aws.clientrt.io.Source
import software.aws.clientrt.io.SdkByteReadChannel

/**
* Represents an abstract stream of bytes
* Represents an abstract read-only stream of bytes
*/
sealed class ByteStream {

Expand All @@ -31,9 +31,9 @@ sealed class ByteStream {
*/
abstract class Reader : ByteStream() {
/**
* Provides [Source] to read from/consume
* Provides [SdkByteReadChannel] to read from/consume
*/
abstract fun readFrom(): Source
abstract fun readFrom(): SdkByteReadChannel
}

companion object {
Expand All @@ -52,11 +52,10 @@ sealed class ByteStream {
suspend fun ByteStream.toByteArray(): ByteArray {
return when (val stream = this) {
is ByteStream.Buffer -> stream.bytes()
is ByteStream.Reader -> stream.readFrom().readAll()
is ByteStream.Reader -> stream.readFrom().readRemaining()
}
}

@OptIn(ExperimentalStdlibApi::class)
suspend fun ByteStream.decodeToString(): String = toByteArray().decodeToString()

fun ByteStream.cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class StringContent(str: String) : ByteStream.Buffer() {
@OptIn(ExperimentalStdlibApi::class)
private val asBytes: ByteArray = str.encodeToByteArray()

override val contentLength: Long? = asBytes.size.toLong()
override val contentLength: Long = asBytes.size.toLong()

override fun bytes(): ByteArray = asBytes
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.aws.clientrt.content

import software.aws.clientrt.io.SdkByteReadChannel
import software.aws.clientrt.io.copyTo
import software.aws.clientrt.io.writeChannel
import java.io.File
import java.nio.file.Path

// JVM specific extensions for dealing with ByteStream's

/**
* Create a [ByteStream] from a file
*/
fun ByteStream.Companion.fromFile(file: File): ByteStream = file.asByteStream()

/**
* Create a [ByteStream] from a file
*/
fun File.asByteStream(): ByteStream = FileContent(this)

/**
* Create a [ByteStream] from a path
*/
fun Path.asByteStream(): ByteStream {
val f = toFile()
require(f.isFile) { "cannot create a ByteStream from a directory: $this" }
require(f.exists()) { "cannot create ByteStream, invalid file: $this" }
return f.asByteStream()
}

/**
* Write the contents of this ByteStream to file and close it
* @return the number of bytes written
*/
suspend fun ByteStream.writeToFile(file: File): Long {
require(file.isFile) { "cannot write contents of ByteStream to a directory: ${file.absolutePath}" }
val writer = file.writeChannel()
val src = when (this) {
is ByteStream.Buffer -> SdkByteReadChannel(bytes())
is ByteStream.Reader -> readFrom()
}

try {
return src.copyTo(writer)
} finally {
writer.close()
src.close()
}
}

/**
* Write the contents of this ByteStream to file at the given path
* @return the number of bytes written
*/
suspend fun ByteStream.writeToFile(path: Path): Long = writeToFile(path.toFile())
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.aws.clientrt.content

import software.aws.clientrt.io.SdkByteReadChannel
import software.aws.clientrt.io.readChannel
import java.io.File

/**
* ByteStream backed by a local [file]
*/
public class FileContent(
public val file: File,
) : ByteStream.Reader() {

override val contentLength: Long
get() = file.length()

override fun readFrom(): SdkByteReadChannel = file.readChannel()
}
12 changes: 12 additions & 0 deletions client-runtime/io/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ description = "IO primitives for Smithy services generated by smithy-kotlin"
extra["displayName"] = "Smithy :: Kotlin :: IO"
extra["moduleName"] = "software.aws.clientrt.io"

val ktorVersion: String by project
val coroutinesVersion: String by project

kotlin {
sourceSets {
commonMain {
dependencies {
implementation(project(":client-runtime:utils"))
implementation("io.ktor:ktor-io:$ktorVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
}
}

Expand All @@ -22,5 +27,12 @@ kotlin {
implementation(project(":client-runtime:testing"))
}
}

jvmMain {
dependencies {
// file channel utils
implementation("io.ktor:ktor-utils:$ktorVersion")
}
}
}
}
49 changes: 49 additions & 0 deletions client-runtime/io/common/src/software/aws/clientrt/io/Allocator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.aws.clientrt.io

import io.ktor.utils.io.bits.*
import io.ktor.utils.io.core.*

@OptIn(ExperimentalIoApi::class)
internal interface Allocator {
fun alloc(size: Int): Memory
// FIXME - we should revisit this - Kotlin/Native is only place where we would actually be manually managing memory
// and that story may change to the point where a free() function isn't even necessary
fun free(instance: Memory)
}

// allocate using the most appropriate underlying platform type (e.g. ByteBuffer on JVM, ArrayBuffer on JS, etc)
internal expect object DefaultAllocator : Allocator

/**
* Round up to the next power of 2. [size] should be non-negative
*/
internal fun ceilp2(size: Int): Int {
require(size >= 0) { "must be positive integer" }
var x = size - 1
x = x or (x shr 1)
x = x or (x shr 2)
x = x or (x shr 4)
x = x or (x shr 8)
x = x or (x shr 16)
return x + 1
}

/**
* Allocate new memory of size [newSize], copy the contents of [instance] into it and free [instance]
* and return the newly allocated memory.
*
* The memory of [instance] should no longer be used after calling.
*/
@OptIn(ExperimentalIoApi::class)
internal fun Allocator.realloc(instance: Memory, newSize: Int): Memory {
require(newSize >= instance.size32)
val newInstance = alloc(newSize)
instance.copyTo(newInstance, 0, instance.size32, 0)
free(instance)
return newInstance
}
37 changes: 37 additions & 0 deletions client-runtime/io/common/src/software/aws/clientrt/io/Closeable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.aws.clientrt.io

// this really should live in the stdlib...
// https://youtrack.jetbrains.com/issue/KT-31066

public expect interface Closeable {
public fun close()
}

public inline fun <C : Closeable, R> C.use(block: (C) -> R): R {
var closed = false

return try {
block(this)
} catch (first: Throwable) {
try {
closed = true
close()
} catch (second: Throwable) {
first.addSuppressedInternal(second)
}

throw first
} finally {
if (!closed) {
close()
}
}
}

@PublishedApi
internal expect fun Throwable.addSuppressedInternal(other: Throwable)
114 changes: 114 additions & 0 deletions client-runtime/io/common/src/software/aws/clientrt/io/KtorAdapters.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

package software.aws.clientrt.io

import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import io.ktor.utils.io.ByteChannel as KtorByteChannel
import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel
import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel

// marker interfaces used internally for accessing the underlying ktor impl
internal interface KtorReadChannel {
val chan: KtorByteReadChannel
}

internal interface KtorWriteChannel {
val chan: KtorByteWriteChannel
}

/**
* Wrap ktor's ByteReadChannel as our own. This implements the common API of [SdkByteReadChannel]. Only
* platform specific differences in interfaces need be implemented in inheritors.
*/
internal abstract class KtorReadChannelAdapterBase(
override val chan: KtorByteReadChannel
) : SdkByteReadChannel, KtorReadChannel {

override val availableForRead: Int
get() = chan.availableForRead

override val isClosedForRead: Boolean
get() = chan.isClosedForRead

override val isClosedForWrite: Boolean
get() = chan.isClosedForWrite

override suspend fun readRemaining(limit: Int): ByteArray {
return chan.readRemaining(limit.toLong()).readBytes()
}

override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) {
chan.readFully(sink, offset, length)
}

override suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int {
return chan.readAvailable(sink, offset, length)
}

override fun cancel(cause: Throwable?): Boolean {
return chan.cancel(cause)
}
}

/**
* Wrap ktor's ByteWriteChannel as our own. This implements the common API of [SdkByteWriteChannel]. Only
* platform specific differences in interfaces need be implemented in inheritors.
*/
internal abstract class KtorWriteChannelAdapterBase(
override val chan: KtorByteWriteChannel
) : SdkByteWriteChannel, KtorWriteChannel {
override val availableForWrite: Int
get() = chan.availableForWrite

override val isClosedForWrite: Boolean
get() = chan.isClosedForWrite

override val totalBytesWritten: Long
get() = chan.totalBytesWritten

override val autoFlush: Boolean
get() = chan.autoFlush

override suspend fun writeFully(src: ByteArray, offset: Int, length: Int) {
chan.writeFully(src, offset, length)
}

override suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int {
return chan.writeAvailable(src, offset, length)
}

override fun close(cause: Throwable?): Boolean {
return chan.close(cause)
}

override fun flush() {
chan.flush()
}
}

/**
* Wrap ktor's ByteChannel as our own
*/
internal class KtorByteChannelAdapter(
override val chan: KtorByteChannel
) : SdkByteChannel,
SdkByteReadChannel by KtorReadChannelAdapter(chan),
SdkByteWriteChannel by KtorWriteChannelAdapter(chan),
KtorWriteChannel,
KtorReadChannel {
override val isClosedForWrite: Boolean
get() = chan.isClosedForWrite

override fun close() { chan.close(null) }
}

internal expect class KtorReadChannelAdapter(chan: KtorByteReadChannel) : SdkByteReadChannel
internal expect class KtorWriteChannelAdapter(chan: KtorByteWriteChannel) : SdkByteWriteChannel

internal fun KtorByteReadChannel.toSdkChannel(): SdkByteReadChannel = KtorReadChannelAdapter(this)
internal fun KtorByteWriteChannel.toSdkChannel(): SdkByteWriteChannel = KtorWriteChannelAdapter(this)
internal fun KtorByteChannel.toSdkChannel(): SdkByteChannel = KtorByteChannelAdapter(this)
Loading

0 comments on commit b083bd9

Please sign in to comment.