feat(io): implement missing IO runtime primitives #264

Merged
merged 23 commits into from
Apr 6, 2021

Contributor

@aajtodd aajtodd commented Mar 31, 2021

Issue #, if available:
closes #130

Description of changes:

  • (refactor): rename Source -> SdkByteReadChannel
  • (feat): add wrappers around ktor-io implementation of read/write channels.
    • NOTE: we are marking the creation of these types as internal but the interfaces are public. This allows us to make use of them in the runtime, but customers can only be given an instance of one (we aren't trying to implement a general purpose IO library here for others to use).
    • We are only exposing a very minimal subset of ktor's equivalent ByteRead/ByteWrite channels. ktor has lots of extension methods for doing all sorts of things; we can add more as needed. Our primary use case is reading/writing to/from sockets/files (usually in larger chunks).
  • (feat): Added extensions for File / Path on JVM to read/write to/from files as a channel (ktor does the heavy lifting here of course)
    • Customers will likely only ever interact with supplying a file as LocalFileContent or ByteStream.toFile(...)
    • We will also likely not use the channel when interacting with CRT and instead special case LocalFileContent (CRT has implementations of reading a file already and supplying it as HTTP body/signing, etc).
  • (feat): Added an SdkBuffer type that we can use internally. Similar to ByteBuffer but grows as needed.
    • I struggled to figure out what I wanted from this type and ultimately landed on this. I looked closely at the ktor-io, kotlinx-io, and okio implementations. Both ktor-io and okio have a buffer abstraction that allows writing an arbitrary number of bytes to it without knowing the size up front. They both pool buffers internally and release them as they are read (okio differs in that its Buffer can both read and write, whereas ktor-io provides separate abstractions: BytePacketBuilder/ByteReadPacket). This is neat and makes total sense, but we need the ability to rewind and read content multiple times for signing. Pooling also complicates the lifetimes of those types, as you have to be sure to release the contents back to the pool (usually done internally by requiring a call to close()).
    • In the end I decided to start with something simpler albeit maybe less efficient (depending on number of re-allocations). I expect we'll use this type for implementing the protocol serializers (awsQuery first, later json and xml when we implement our own). This ended up being similar to ktor-io's Buffer type but grows on demand and can be instantiated explicitly.
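
For readers who want a concrete picture of "grows on demand and can be rewound", here is a minimal sketch. It is not the SdkBuffer from this PR: the class name `GrowableBuffer`, the doubling growth policy, and the plain `ByteArray` backing store are all assumptions made for illustration (the real type sits on top of ktor-io's Memory).

```kotlin
// Hypothetical illustration only; names and growth policy are assumptions.
class GrowableBuffer(initialCapacity: Int = 16) {
    private var data = ByteArray(initialCapacity)

    var readPosition = 0
        private set
    var writePosition = 0
        private set

    /** Number of written bytes that have not been read yet. */
    val readRemaining: Int get() = writePosition - readPosition

    /** Append all of [src], reallocating the backing array if needed. */
    fun write(src: ByteArray) {
        ensureCapacity(writePosition + src.size)
        src.copyInto(data, destinationOffset = writePosition)
        writePosition += src.size
    }

    /** Read exactly [count] bytes and advance the read position. */
    fun read(count: Int): ByteArray {
        require(count in 0..readRemaining) { "not enough bytes available" }
        val out = data.copyOfRange(readPosition, readPosition + count)
        readPosition += count
        return out
    }

    /** Move the read position back by up to [count] bytes (everything by default). */
    fun rewind(count: Int = readPosition) {
        readPosition -= minOf(count, readPosition).coerceAtLeast(0)
    }

    // Double the capacity until the required size fits (one re-allocation per growth step).
    private fun ensureCapacity(required: Int) {
        if (required <= data.size) return
        var newSize = maxOf(data.size, 1)
        while (newSize < required) newSize *= 2
        data = data.copyOf(newSize)
    }
}
```

Because nothing is returned to a pool on read, the same content can be read, rewound, and read again, which is the property the signing use case needs. The cost is the copy on each re-allocation.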

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.


/**
* Represents an abstract stream of bytes
* Represents an abstract read-only stream of bytes
Contributor

One thing that might be interesting on the comments or docs is - is this a "single read" stream or can I seek up and down / read it multiple times?

Contributor Author

That actually depends on the variant. If it's just a ByteArray then of course it's "seekable". If it's a Reader (aka SdkByteReadChannel) then it is not seekable.

/**
* Create a [ByteStream] from a file
*/
fun ByteStream.Companion.fromFile(file: File): ByteStream = file.asByteStream()
Contributor

extension function on the Companion object is that even a thing? Why have this and the File based extension function below? Is this a discoverability thing?

Contributor Author

@aajtodd aajtodd Mar 31, 2021

> extension function on the Companion object is that even a thing?

is that a real question or were you just surprised?

> Why have this and the File based extension function below? Is this a discoverability thing?

Consistency. We provide ByteStream.fromXYZ() functions for a few other types.
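
A small sketch of the pattern under discussion, for anyone who has not seen an extension on a companion object before. `Payload` is a hypothetical stand-in for ByteStream, and `asPayload` is an assumed helper name; only the shape of the two declarations mirrors the PR.

```kotlin
import java.io.File

// Hypothetical stand-in for the real ByteStream type.
class Payload(val bytes: ByteArray) {
    // A companion must be declared (it may be empty) so that
    // extensions can use Payload.Companion as their receiver.
    companion object
}

// Extension on File: reads as "convert this file".
fun File.asPayload(): Payload = Payload(readBytes())

// Extension on the companion: reads like a static factory, Payload.fromFile(file),
// and sits next to any other Payload.fromXyz(...) factories.
fun Payload.Companion.fromFile(file: File): Payload = file.asPayload()
```

Both call styles end up in the same code path; the companion extension exists so that the factory-style `fromXyz` family stays uniform across source types.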

Contributor

k

/**
* Write the contents of this ByteStream to file and close it
*/
suspend fun ByteStream.toFile(file: File): Long {
Contributor

naming nit: might want to name this writeToFile to make it more obvious this is going to consume the stream - rather than just convert it to some File object.

import io.ktor.utils.io.core.*

@OptIn(ExperimentalIoApi::class)
internal interface Allocator {
Contributor

wtf is all this?

Contributor Author

So this has to do with ktor-io's Memory type, which abstracts away dealing with various platform "memory". You can't instantiate the type directly in common code; it has to be done per platform.

This followed in ktor-io's footsteps a bit. On JVM and JS there is no free() function but on native there is. I suspect this will go away though since they are changing the memory model.

/**
* Writes all [src] bytes and suspends until all bytes written.
*/
suspend fun writeFully(src: ByteArray, offset: Int = 0, length: Int = src.size - offset): Unit
Contributor

omg - how awesome are default parameters? previously this would have been 3 different methods :)

Contributor Author

right?

@Test
fun testWriteFullyInsufficientSpace() {
val buf = SdkBuffer(16)
val contents = "is it morning or is it night, the software engineer doesn't know anymore"
Contributor

dark


private val AddSuppressedMethod: Method? by lazy {
try {
Throwable::class.java.getMethod("addSuppressed", Throwable::class.java)
Contributor

Is this done by reflection because we don't support JDK8 universally?

Contributor Author

this is literally copied from ktor-io which is copied into like 3 other kotlin repositories (including kotlinx-io) and I don't understand why it isn't in the stdlib (there has been a ticket opened for a while that I linked to).

So to answer your question it's not clear to me why it's done this way but I chose to not modify it. This all seems related to if you hit a second exception while handling the first. Alternatively we could just ignore the secondary exception and only propagate the original.
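
To make the "second exception while handling the first" case concrete, here is a hedged sketch. It assumes a JVM target on Java 7 or later, where `Throwable.addSuppressed` can be called directly, so it does not reproduce the reflection lookup from the PR. `useAndClose` and `FailingResource` are hypothetical names.

```kotlin
// Hypothetical resource whose close() always fails, to trigger the secondary exception.
class FailingResource : AutoCloseable {
    override fun close() {
        throw IllegalStateException("close failed")
    }
}

// Run [block], then close the receiver. If both the block and close() throw,
// the close failure is attached to the original exception instead of replacing it.
fun <T : AutoCloseable, R> T.useAndClose(block: (T) -> R): R {
    val result = try {
        block(this)
    } catch (primary: Throwable) {
        try {
            close()
        } catch (secondary: Throwable) {
            // Assumes Java 7+; the PR instead looks this method up via reflection.
            primary.addSuppressed(secondary)
        }
        throw primary
    }
    close()
    return result
}
```

The alternative mentioned above, ignoring the secondary exception, would amount to leaving the inner `catch` body empty.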

AddSuppressedMethod?.invoke(this, other)
}

private val AddSuppressedMethod: Method? by lazy {
Contributor

nit: why is this capitalized?

@rcoh rcoh left a comment

Didn't see any glaring bugs or anything. Left some thoughts for improvements inline.

/**
* Write the contents of this ByteStream to file and close it
*/
suspend fun ByteStream.toFile(file: File): Long {

nit: return value should probably be documented, which I assume is the number of bytes written

*/
fun rewind(count: Int = readPosition) {
val size = minOf(count, readPosition)
if (size <= 0) return

I might consider adding internal APIs to modify the readHead/writeHead that take the beta unsigned int types to ensure that you never accidentally increase when you mean to decrease

Contributor Author

I sort of considered this. I really wish they would stabilize unsigned types, it's weird not having them in your arsenal. I'll think about it some more and maybe play with it.

* If the total bytes available is less than [length] then as many bytes as are available will be read.
* The total bytes read is returned or `-1` if no data is available.
*/
fun SdkBuffer.readAvailable(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset): Int {

nit: Other methods that return the number of bytes written return Long

Contributor Author

Yeah so this is sort of deliberate and also something that bugs me about ByteArray. Basically a ByteArray's size is an Int, so it can never hold more than Int.MAX_VALUE bytes. There are no constructors taking a Long. This is why there is a bit of a difference in the API in various spots. I'm not sure it makes sense to return Long if it's impossible to ever read/write more than Int bytes.

* Read from this buffer exactly [length] bytes and write to [dest] starting at [offset]
* @throws IllegalArgumentException if there are not enough bytes available for read or the offset/length combination is invalid
*/
fun SdkBuffer.readFully(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset) {

nit/docs: even after reading some code, I think I know, but I'm not totally sure if offset pertains to dest or this

Contributor Author

I'll try and update the documentation but it applies to dest. It will write length bytes to dest starting at offset

* Write [length] bytes of [src] to this buffer starting at [offset]
* @throws IllegalArgumentException if there is insufficient space or the offset/length combination is invalid
*/
fun SdkBuffer.writeFully(src: ByteArray, offset: Int = 0, length: Int = src.size - offset) {

nit/naming: I personally prefer readToEnd and writeAll

Contributor Author

I like writeAll but I'm not sure on readToEnd. readToEnd implies to me that you'll be reading to the end of the buffer but what readFully does is try and populate the destination buffer "fully" (i.e. exactly of size length bytes). Maybe readExact would be better?

Contributor Author

Although if we go that route maybe writeExact for symmetry....
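
The naming question hinges on the difference between "read what is there" and "read exactly this much". A minimal sketch of the two contracts follows, assuming a plain `ByteArray` source. `ArrayReader` is a hypothetical class, not the SdkBuffer API, although the parameter names and the `-1` / `IllegalArgumentException` behaviour follow the doc comments quoted in this review.

```kotlin
// Hypothetical reader used only to contrast the two read contracts.
class ArrayReader(private val data: ByteArray) {
    private var position = 0
    val remaining: Int get() = data.size - position

    /**
     * Copy up to [length] bytes into [dest] starting at dest[offset].
     * Returns the number of bytes copied, or -1 if no data is available.
     */
    fun readAvailable(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset): Int {
        checkBounds(dest, offset, length)
        if (remaining == 0) return -1
        val n = minOf(length, remaining)
        data.copyInto(dest, destinationOffset = offset, startIndex = position, endIndex = position + n)
        position += n
        return n
    }

    /**
     * Copy exactly [length] bytes into [dest] starting at dest[offset],
     * or throw if fewer than [length] bytes are available.
     */
    fun readFully(dest: ByteArray, offset: Int = 0, length: Int = dest.size - offset) {
        checkBounds(dest, offset, length)
        require(length <= remaining) { "requested $length bytes but only $remaining available" }
        data.copyInto(dest, destinationOffset = offset, startIndex = position, endIndex = position + length)
        position += length
    }

    // offset and length always describe a region of the destination array.
    private fun checkBounds(dest: ByteArray, offset: Int, length: Int) {
        require(offset >= 0 && length >= 0 && offset + length <= dest.size) {
            "invalid offset/length: offset=$offset, length=$length, dest.size=${dest.size}"
        }
    }
}
```

In both functions `offset` indexes into `dest`, never into the source, which is the point clarified earlier in this thread. "Fully" refers to filling the requested region of the destination, not to draining the source.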

/**
* Read the available (unread) contents as a UTF-8 string
*/
fun SdkBuffer.decodeToString() = bytes().decodeToString(0, readRemaining)

should this accept an encoding argument that defaults to UTF-8 but admits other options?

Contributor Author

Probably but we don't have a great way to deal with different charsets in multiplatform. ByteArray.decodeToString() is UTF-8 only. The JVM side of things yes we could probably do that but I think we'll have to punt for now. We can always go back and add it in as a default argument.
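
On the JVM alone, the "default argument" idea could look roughly like the sketch below. `decodeWith` is a hypothetical name, and this is JVM-only code: it relies on `java.nio.charset.Charset`, which is exactly what is unavailable in common multiplatform code.

```kotlin
import java.nio.charset.Charset

// JVM-only sketch: UTF-8 by default, any other Charset on request.
fun ByteArray.decodeWith(charset: Charset = Charsets.UTF_8): String = String(this, charset)
```

Existing callers that pass no argument would keep the UTF-8 behaviour, which is why adding the parameter later is a compatible change at the source level.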

* This is a buffered **single-reader single writer channel**.
*
* Read operations can be invoked concurrently with write operations, but multiple reads or multiple writes
* cannot be invoked concurrently with themselves. Exceptions are [close] and [flush] which can be invoked

this seems like maybe a not-so-easy to maintain invariant? To me, this seems much more like a ring buffer than a channel (which to me implies at least MPSC if not MPMC)

Contributor Author

These invariants come from ktor's implementation. I'd be happy to just describe it as a SPSC channel though and leave out the rest of this description. I can't really think of a scenario at the moment where I need multiple writers or readers and if I did there are other options that could be built on top of this type to handle that.

If you dive into the implementation your intuition isn't too far off though. It's not really a channel per se in the MPSC/MPMC sense. It's a buffer that coordinates suspension. Think of it as a buffer with a condition variable. Read requests suspend only if the read operation requested can't be fulfilled and get notified when it can. Writers suspend if the buffer is full and get notified when more room is available.
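
The "buffer with a condition variable" analogy can be sketched with plain JVM locks. This is an analogy only: the real channel suspends coroutines, whereas this sketch blocks threads, and `BlockingByteBuffer` is a hypothetical class that appears nowhere in ktor or in this PR.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical bounded byte buffer: one lock, two conditions.
class BlockingByteBuffer(capacity: Int) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val data = ByteArray(capacity)
    private var head = 0 // index of the next byte to read
    private var size = 0 // number of bytes currently buffered
    private val lock = ReentrantLock()
    private val notEmpty = lock.newCondition()
    private val notFull = lock.newCondition()

    /** Blocks while the buffer is full, then stores one byte and wakes a waiting reader. */
    fun write(b: Byte): Unit = lock.withLock {
        while (size == data.size) notFull.await()
        data[(head + size) % data.size] = b
        size++
        notEmpty.signal()
    }

    /** Blocks while the buffer is empty, then removes one byte and wakes a waiting writer. */
    fun read(): Byte = lock.withLock {
        while (size == 0) notEmpty.await()
        val b = data[head]
        head = (head + 1) % data.size
        size--
        notFull.signal()
        b
    }
}
```

With one reader and one writer, each side only ever waits on the other, which matches the single-reader single-writer contract described in the doc comment.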

Contributor Author

Actually I read this documentation comment just now again and the description makes sense to me.

It's just saying that you can have a single reader and a single writer invoking read/write operations at the same time, but that you can't have multiple readers or multiple writers invoking read/write operations at the same time (which is just a long-winded way of saying SPSC).

What would improve it for you?

@rcoh rcoh Apr 1, 2021

Is this exposed to customers or is this just internal? I'm mostly just worried about a customer causing a race in their own code because of the thread safety implied by the name "channel"

Contributor Author

Only the SdkByteReadChannel type is exposed to customers (through the ByteStream type).

Although I expect most customers to do one of two things (1) write it to file or (2) read it completely into memory (both of which we have provided convenience functions for).

Contributor

@kggilmer kggilmer left a comment

mainly looking to see if we must api coroutines, otherwise some questions and nits.

*/
fun Path.asByteStream(): ByteStream {
val f = toFile()
require(f.isFile) { "cannot create a ByteStream from a directory: $this" }
Contributor

suggestion

If we're guarding dir/file why not also guard if the file exists? File.exists() I think.

* Write the contents of this ByteStream to file and close it
*/
suspend fun ByteStream.toFile(file: File): Long {
require(file.isFile) { "cannot write contents of ByteStream to a directory: ${file.absolutePath}" }
Contributor

same as above

Contributor Author

in this case the file doesn't need to exist since we may be creating it

@@ -0,0 +1,57 @@
/*
Contributor

question

In my experience InputStream/OutputStream is the most common way of dealing w/ I/O in Java. They are more flexible than working directly with file types as they can be composed, among other features. Is there a reason you don't provide mappings to those in this file?

I realize we don't use these types in our SDK due to concurrency constraints but seems like at this level those concerns are not valid.

Contributor Author

> I realize we don't use these types in our SDK due to concurrency constraints but seems like at this level those concerns are not valid.

I'm not sure I follow what you mean, can you clarify?

I'll double check but I don't recall seeing any utilities for dealing with Input/Output stream. The reason we can for files is asynchronous utilities have been built on top of it.

Contributor

I mean, this logic is for customers to work with data in or out of the SDK. As such we should consider their general utility. To rephrase, in my experience in Javaland, Java libraries often use Input/Output streams for handling this data. It's likely that customers will want to use JavaLibraryX which has a function doSomethingWith(data: InputStream). Unless there is a simple way of performing this mapping that I've missed, it may be of limited utility.

It's not something that needs to be addressed as part of this PR, and customer feedback is certainly warranted here.

/**
* ByteStream backed by a local [file]
*/
public class LocalFileContent(
Contributor

nit/style

IMO the nature of type java.io.File is sufficient to express the 'locality' of the behaviour of this class, and think 'Local' could be dropped. Maybe something like FileByteStreamReader or FileReader?

Contributor Author

We can drop the Local, but the rest of the types are all XyzContent, so to be consistent it should probably be FileContent.

implementation("io.ktor:ktor-io:$ktorVersion")

// Dispatchers.IO
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
Contributor

question

As I understand it this provides access to the jetbrains coroutine library from within our SDK for customers to depend on from their programs. It would be safer perhaps to require customers to depend on that directly. Is there a reason why this cannot be implementation?

Contributor Author

I would love to make this implementation, however I think it has to be api. The comment above indicates why: the file readers expose Dispatchers.IO, which is from kotlinx-coroutines-core.

You are correct that it technically allows others to access the jetbrains library from our SDK but that's not the real purpose of api:

> The api configuration should be used to declare dependencies which are exported by the library API, whereas the implementation configuration should be used to declare dependencies which are internal to the component.

https://docs.gradle.org/current/userguide/java_library_plugin.html#sec:java_library_separation

So because we use a type in our API from the library it's an api dependency to us.

Contributor

Thanks, appreciate the details

Contributor Author

I was able to make this implementation for now by just hard coding the dispatcher to Dispatchers.IO. We can revisit as needed.

if (limit == 0L) return 0L

// delegate to ktor-io if possible which may have further optimizations based on impl
val cnt = if (this is IsKtorReadChannel && dst is IsKtorWriteChannel) {
Contributor

nit/style

I prefer count to cnt

import software.aws.clientrt.testing.runSuspendTest
import kotlin.test.*

class SdkByteChannelOpsTest {
Contributor

question

what's ops?

Contributor Author

operations


package software.aws.clientrt.io

internal fun SdkBuffer.hasArray() = memory.buffer.hasArray() && !memory.buffer.isReadOnly
Contributor

nice

@@ -609,6 +609,8 @@ abstract class HttpBindingProtocolGenerator : ProtocolGenerator {
.map { it.member }

if (documentMembers.isNotEmpty()) {
// FIXME - we should not be slurping the entire contents into memory, instead our deserializers
// should work off of an SdkByteReadChannel
Contributor

yesss

@@ -42,4 +42,30 @@ class MiddlewareTest {
}
assertEquals("Foo", handler.call("foo"))
}

@Test
fun testMapRequest() = runSuspendTest {
Contributor

question

It's not clear to me what these tests are exercising. can you briefly explain?

Contributor Author

They are covering the utility middleware components MapRequest/MapResponse. I noticed our code coverage missed these and they were easy to add so I added them.

Contributor Author

aajtodd commented Apr 1, 2021

> mainly looking to see if we must api coroutines

Yeah I totally get this concern. I have the same concern / intuition but I'm not sure if we can do better unless we hard code the dispatcher to Dispatchers.IO internally (exposed here). I believe that was the only type exposed but I'd have to double check again at this point.

Contributor

@kggilmer kggilmer left a comment

minor nits here and there, nothing to block for.

* Wrap ktor's ByteReadChannel as our own. This implements the common API of [SdkByteReadChannel]. Only
* platform specific differences in interfaces need be implemented in inheritors.
*/
internal abstract class KtorReadChannelAdapterBase(
Contributor

yeah it was just a nit. If the long form provides more safety, that is better.

@aajtodd aajtodd merged commit b083bd9 into main Apr 6, 2021
@aajtodd aajtodd deleted the feat-io branch April 6, 2021 14:53
Successfully merging this pull request may close these issues.

Implement missing IO primitives for binary streaming
4 participants