-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(io): implement missing IO runtime primitives #264
Changes from all commits
37cadc4
6264273
6217b1f
6dd0f09
8b18638
bd7f020
8a680c1
9ef7b21
ff0e32b
ebf574a
8fac87c
7bdbc53
9b477e7
94f2e13
dbc6b51
cfc2756
ead19c7
79c308b
a515957
920840f
8784db7
244d075
d42c183
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question In my experience I realize we don't use these types in our SDK due to concurrency constraints but seems like at this level those concerns are not valid. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm not sure I follow what you mean, can you clarify? I'll double check but I don't recall seeing any utilities for dealing with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean, this logic is for customers to work with data in or out of the SDK. As such we should consider thier general utility. To rephrase, in my experience in Javaland, java libraries often use Input/Output streams for handling this data. It's likely that customers will want to use It's not something that needs to be addressed as part of this PR, and customer feedback is certainly warranted here. |
||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package software.aws.clientrt.content | ||
|
||
import software.aws.clientrt.io.SdkByteReadChannel | ||
import software.aws.clientrt.io.copyTo | ||
import software.aws.clientrt.io.writeChannel | ||
import java.io.File | ||
import java.nio.file.Path | ||
|
||
// JVM specific extensions for dealing with ByteStream's | ||
|
||
/** | ||
* Create a [ByteStream] from a file | ||
*/ | ||
fun ByteStream.Companion.fromFile(file: File): ByteStream = file.asByteStream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extension function on the Companion object is that even a thing? Why have this and the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
is that a real question or were you just surprised?
Consistency. We provide There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. k |
||
|
||
/** | ||
* Create a [ByteStream] from a file | ||
*/ | ||
fun File.asByteStream(): ByteStream = FileContent(this) | ||
|
||
/** | ||
* Create a [ByteStream] from a path | ||
*/ | ||
fun Path.asByteStream(): ByteStream { | ||
val f = toFile() | ||
require(f.isFile) { "cannot create a ByteStream from a directory: $this" } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion If we're guarding dir/file why not also guard if the file exists? |
||
require(f.exists()) { "cannot create ByteStream, invalid file: $this" } | ||
return f.asByteStream() | ||
} | ||
|
||
/** | ||
* Write the contents of this ByteStream to file and close it | ||
* @return the number of bytes written | ||
*/ | ||
suspend fun ByteStream.writeToFile(file: File): Long { | ||
require(file.isFile) { "cannot write contents of ByteStream to a directory: ${file.absolutePath}" } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this case the file doesn't need to exist since we may be creating it |
||
val writer = file.writeChannel() | ||
val src = when (this) { | ||
is ByteStream.Buffer -> SdkByteReadChannel(bytes()) | ||
is ByteStream.Reader -> readFrom() | ||
} | ||
|
||
try { | ||
return src.copyTo(writer) | ||
} finally { | ||
writer.close() | ||
src.close() | ||
} | ||
} | ||
|
||
/** | ||
* Write the contents of this ByteStream to file at the given path | ||
* @return the number of bytes written | ||
*/ | ||
suspend fun ByteStream.writeToFile(path: Path): Long = writeToFile(path.toFile()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package software.aws.clientrt.content | ||
|
||
import software.aws.clientrt.io.SdkByteReadChannel | ||
import software.aws.clientrt.io.readChannel | ||
import java.io.File | ||
|
||
/** | ||
* ByteStream backed by a local [file] | ||
*/ | ||
public class FileContent( | ||
public val file: File, | ||
) : ByteStream.Reader() { | ||
|
||
override val contentLength: Long | ||
get() = file.length() | ||
|
||
override fun readFrom(): SdkByteReadChannel = file.readChannel() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package software.aws.clientrt.io | ||
|
||
import io.ktor.utils.io.bits.* | ||
import io.ktor.utils.io.core.* | ||
|
||
@OptIn(ExperimentalIoApi::class) | ||
internal interface Allocator { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wtf is all this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so this has to do with This followed in |
||
fun alloc(size: Int): Memory | ||
// FIXME - we should revisit this - Kotlin/Native is only place where we would actually be manually managing memory | ||
// and that story may change to the point where a free() function isn't even necessary | ||
fun free(instance: Memory) | ||
} | ||
|
||
// allocate using the most appropriate underlying platform type (e.g. ByteBuffer on JVM, ArrayBuffer on JS, etc) | ||
internal expect object DefaultAllocator : Allocator | ||
|
||
/** | ||
* Round up to the next power of 2. [size] should be non-negative | ||
*/ | ||
internal fun ceilp2(size: Int): Int { | ||
require(size >= 0) { "must be positive integer" } | ||
var x = size - 1 | ||
x = x or (x shr 1) | ||
x = x or (x shr 2) | ||
x = x or (x shr 4) | ||
x = x or (x shr 8) | ||
x = x or (x shr 16) | ||
return x + 1 | ||
} | ||
|
||
/** | ||
* Allocate new memory of size [newSize], copy the contents of [instance] into it and free [instance] | ||
* and return the newly allocated memory. | ||
* | ||
* The memory of [instance] should no longer be used after calling. | ||
*/ | ||
@OptIn(ExperimentalIoApi::class) | ||
internal fun Allocator.realloc(instance: Memory, newSize: Int): Memory { | ||
require(newSize >= instance.size32) | ||
val newInstance = alloc(newSize) | ||
instance.copyTo(newInstance, 0, instance.size32, 0) | ||
free(instance) | ||
return newInstance | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package software.aws.clientrt.io | ||
|
||
// this really should live in the stdlib... | ||
// https://youtrack.jetbrains.com/issue/KT-31066 | ||
|
||
public expect interface Closeable { | ||
public fun close() | ||
} | ||
|
||
public inline fun <C : Closeable, R> C.use(block: (C) -> R): R { | ||
var closed = false | ||
|
||
return try { | ||
block(this) | ||
} catch (first: Throwable) { | ||
try { | ||
closed = true | ||
close() | ||
} catch (second: Throwable) { | ||
first.addSuppressedInternal(second) | ||
} | ||
|
||
throw first | ||
} finally { | ||
if (!closed) { | ||
close() | ||
} | ||
} | ||
} | ||
|
||
@PublishedApi | ||
internal expect fun Throwable.addSuppressedInternal(other: Throwable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. comment I don't understand what this function is for, maybe some docs? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* SPDX-License-Identifier: Apache-2.0. | ||
*/ | ||
|
||
package software.aws.clientrt.io | ||
|
||
import io.ktor.utils.io.* | ||
import io.ktor.utils.io.core.* | ||
import io.ktor.utils.io.ByteChannel as KtorByteChannel | ||
import io.ktor.utils.io.ByteReadChannel as KtorByteReadChannel | ||
import io.ktor.utils.io.ByteWriteChannel as KtorByteWriteChannel | ||
|
||
// marker interfaces used internally for accessing the underlying ktor impl | ||
internal interface KtorReadChannel { | ||
val chan: KtorByteReadChannel | ||
} | ||
|
||
internal interface KtorWriteChannel { | ||
val chan: KtorByteWriteChannel | ||
} | ||
|
||
/** | ||
* Wrap ktor's ByteReadChannel as our own. This implements the common API of [SdkByteReadChannel]. Only | ||
* platform specific differences in interfaces need be implemented in inheritors. | ||
*/ | ||
internal abstract class KtorReadChannelAdapterBase( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit/style i think you can just use the shorter form her and have every property and function be one line... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can but I have found in particular that when wrapping a 3P library type that you have to be very careful or else you end up with downstream issues depending on whether you specify e.g.
You may intend to have this function be
I chose to explicitly use the longer form so that type inference wasn't part of the equation and I could guarantee I wasn't accidentally exposing details in the API. There are also some minor type differences as well that the short form can't be used for all of them. I'll take a second look though since I have a better handle on some of the sharp corners. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah it was just a nit. If the long form provides more safety, that is better. |
||
override val chan: KtorByteReadChannel | ||
) : SdkByteReadChannel, KtorReadChannel { | ||
|
||
override val availableForRead: Int | ||
get() = chan.availableForRead | ||
|
||
override val isClosedForRead: Boolean | ||
get() = chan.isClosedForRead | ||
|
||
override val isClosedForWrite: Boolean | ||
get() = chan.isClosedForWrite | ||
|
||
override suspend fun readRemaining(limit: Int): ByteArray { | ||
return chan.readRemaining(limit.toLong()).readBytes() | ||
} | ||
|
||
override suspend fun readFully(sink: ByteArray, offset: Int, length: Int) { | ||
chan.readFully(sink, offset, length) | ||
} | ||
|
||
override suspend fun readAvailable(sink: ByteArray, offset: Int, length: Int): Int { | ||
return chan.readAvailable(sink, offset, length) | ||
} | ||
|
||
override fun cancel(cause: Throwable?): Boolean { | ||
return chan.cancel(cause) | ||
} | ||
} | ||
|
||
/** | ||
* Wrap ktor's ByteWriteChannel as our own. This implements the common API of [SdkByteWriteChannel]. Only | ||
* platform specific differences in interfaces need be implemented in inheritors. | ||
*/ | ||
internal abstract class KtorWriteChannelAdapterBase( | ||
override val chan: KtorByteWriteChannel | ||
) : SdkByteWriteChannel, KtorWriteChannel { | ||
override val availableForWrite: Int | ||
get() = chan.availableForWrite | ||
|
||
override val isClosedForWrite: Boolean | ||
get() = chan.isClosedForWrite | ||
|
||
override val totalBytesWritten: Long | ||
get() = chan.totalBytesWritten | ||
|
||
override val autoFlush: Boolean | ||
get() = chan.autoFlush | ||
|
||
override suspend fun writeFully(src: ByteArray, offset: Int, length: Int) { | ||
chan.writeFully(src, offset, length) | ||
} | ||
|
||
override suspend fun writeAvailable(src: ByteArray, offset: Int, length: Int): Int { | ||
return chan.writeAvailable(src, offset, length) | ||
} | ||
|
||
override fun close(cause: Throwable?): Boolean { | ||
return chan.close(cause) | ||
} | ||
|
||
override fun flush() { | ||
chan.flush() | ||
} | ||
} | ||
|
||
/** | ||
* Wrap ktor's ByteChannel as our own | ||
*/ | ||
internal class KtorByteChannelAdapter( | ||
override val chan: KtorByteChannel | ||
) : SdkByteChannel, | ||
SdkByteReadChannel by KtorReadChannelAdapter(chan), | ||
SdkByteWriteChannel by KtorWriteChannelAdapter(chan), | ||
KtorWriteChannel, | ||
KtorReadChannel { | ||
override val isClosedForWrite: Boolean | ||
get() = chan.isClosedForWrite | ||
|
||
override fun close() { chan.close(null) } | ||
} | ||
|
||
internal expect class KtorReadChannelAdapter(chan: KtorByteReadChannel) : SdkByteReadChannel | ||
internal expect class KtorWriteChannelAdapter(chan: KtorByteWriteChannel) : SdkByteWriteChannel | ||
|
||
internal fun KtorByteReadChannel.toSdkChannel(): SdkByteReadChannel = KtorReadChannelAdapter(this) | ||
internal fun KtorByteWriteChannel.toSdkChannel(): SdkByteWriteChannel = KtorWriteChannelAdapter(this) | ||
internal fun KtorByteChannel.toSdkChannel(): SdkByteChannel = KtorByteChannelAdapter(this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing that might be interesting on the comments or docs is - is this a "single read" stream or can I seek up and down / read it multiple times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That actually depends on the variant. If it's just a
ByteArray
then of course it's "seekable". If it's aReader
(akaSdkByteReadChannel
) then it is not seekable.