From 4cb9d148bc305cd3ca463691ad54e74557ff5e1c Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Mon, 5 Feb 2024 21:41:32 -0500 Subject: [PATCH 1/3] auto-dispatch to I/O context --- .../conformance/client/TracingHTTPClient.kt | 4 +- .../BidirectionalStreamInterface.kt | 4 +- .../connectrpc/ClientOnlyStreamInterface.kt | 4 +- .../com/connectrpc/ProtocolClientConfig.kt | 9 ++ .../connectrpc/ServerOnlyStreamInterface.kt | 2 +- .../connectrpc/http/HTTPClientInterface.kt | 90 ----------- .../main/kotlin/com/connectrpc/http/Stream.kt | 145 ++++++++++++++++++ .../connectrpc/impl/BidirectionalStream.kt | 4 +- .../com/connectrpc/impl/ClientOnlyStream.kt | 4 +- .../com/connectrpc/impl/ProtocolClient.kt | 65 +++++--- .../com/connectrpc/impl/ServerOnlyStream.kt | 2 +- 11 files changed, 207 insertions(+), 126 deletions(-) create mode 100644 library/src/main/kotlin/com/connectrpc/http/Stream.kt diff --git a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/TracingHTTPClient.kt b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/TracingHTTPClient.kt index f20acd70..a1824313 100644 --- a/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/TracingHTTPClient.kt +++ b/conformance/client/src/main/kotlin/com/connectrpc/conformance/client/TracingHTTPClient.kt @@ -97,12 +97,12 @@ internal class TracingHTTPClient( return res } - override fun sendClose() { + override suspend fun sendClose() { printer.printlnWithStackTrace("Half-closing stream") delegate.sendClose() } - override fun receiveClose() { + override suspend fun receiveClose() { printer.printlnWithStackTrace("Closing stream") delegate.receiveClose() } diff --git a/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt index 87778622..7703aada 100644 --- a/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/BidirectionalStreamInterface.kt @@ -67,12 +67,12 @@ interface BidirectionalStreamInterface { /** * Close the send stream. No calls to [send] are valid after calling [sendClose]. */ - fun sendClose() + suspend fun sendClose() /** * Close the receive stream. */ - fun receiveClose() + suspend fun receiveClose() /** * Determine if the underlying client send stream is closed. diff --git a/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt index 9fe150a5..7975d0e2 100644 --- a/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/ClientOnlyStreamInterface.kt @@ -57,13 +57,13 @@ interface ClientOnlyStreamInterface { /** * Close the stream. No calls to [send] are valid after calling [sendClose]. */ - fun sendClose() + suspend fun sendClose() /** * Cancels the stream. This closes both send and receive sides of the stream * without awaiting any server reply. */ - fun cancel() + suspend fun cancel() /** * Determine if the underlying client send stream is closed. diff --git a/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt b/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt index ae38bca6..84b6836c 100644 --- a/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt +++ b/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt @@ -22,6 +22,7 @@ import com.connectrpc.protocols.GRPCInterceptor import com.connectrpc.protocols.GRPCWebInterceptor import com.connectrpc.protocols.NetworkProtocol import java.net.URI +import kotlin.coroutines.CoroutineContext /** * Set of configuration used to set up clients. @@ -45,6 +46,14 @@ class ProtocolClientConfig @JvmOverloads constructor( // Compression pools that provide support for the provided `compressionName`, as well as any // other compression methods that need to be supported for inbound responses. compressionPools: List = listOf(GzipCompressionPool), + // The coroutine context to use for I/O, such as sending RPC messages. + // If null, the current/calling coroutine context is used. So the caller + // may need explicitly dispatch send calls using contexts where I/O is + // appropriate (using the withContext extension function). If non-null + // (such as Dispatchers.IO), operations that involve I/O pr other + // blocking will automatically be dispatched using the biven context, + // so the caller does not need to worry about it. + val ioCoroutineContext: CoroutineContext? = null, ) { private val internalInterceptorFactoryList = mutableListOf<(ProtocolClientConfig) -> Interceptor>() private val compressionPools = mutableMapOf() diff --git a/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt b/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt index f7adbfae..abf9b5b7 100644 --- a/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/ServerOnlyStreamInterface.kt @@ -62,7 +62,7 @@ interface ServerOnlyStreamInterface { /** * Close the receive stream. */ - fun receiveClose() + suspend fun receiveClose() /** * Determine if the underlying client receive stream is closed. diff --git a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt index 0ae96877..2424b86f 100644 --- a/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt +++ b/library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt @@ -16,7 +16,6 @@ package com.connectrpc.http import com.connectrpc.StreamResult import okio.Buffer -import java.util.concurrent.atomic.AtomicBoolean typealias Cancelable = () -> Unit @@ -46,92 +45,3 @@ interface HTTPClientInterface { */ fun stream(request: HTTPRequest, duplex: Boolean, onResult: suspend (StreamResult) -> Unit): Stream } - -interface Stream { - suspend fun send(buffer: Buffer): Result - - fun sendClose() - - fun receiveClose() - - fun isSendClosed(): Boolean - - fun isReceiveClosed(): Boolean -} - -fun Stream( - onSend: suspend (Buffer) -> Result, - onSendClose: () -> Unit = {}, - onReceiveClose: () -> Unit = {}, -): Stream { - val isSendClosed = AtomicBoolean() - val isReceiveClosed = AtomicBoolean() - return object : Stream { - override suspend fun send(buffer: Buffer): Result { - if (isSendClosed()) { - return Result.failure(IllegalStateException("cannot send. underlying stream is closed")) - } - return try { - onSend(buffer) - } catch (e: Throwable) { - Result.failure(e) - } - } - - override fun sendClose() { - if (isSendClosed.compareAndSet(false, true)) { - onSendClose() - } - } - - override fun receiveClose() { - if (isReceiveClosed.compareAndSet(false, true)) { - try { - onReceiveClose() - } finally { - // When receive side is closed, the send side is - // implicitly closed as well. - // We don't use sendClose() because we don't want to - // invoke onSendClose() since that will try to actually - // half-close the HTTP stream, which will fail since - // closing the receive side cancels the entire thing. - isSendClosed.set(true) - } - } - } - - override fun isSendClosed(): Boolean { - return isSendClosed.get() - } - - override fun isReceiveClosed(): Boolean { - return isReceiveClosed.get() - } - } -} - -/** - * Returns a new stream that applies the given function to each - * buffer when send is called. The result of that function is - * what is passed along to the original stream. - */ -fun Stream.transform(apply: (Buffer) -> Buffer): Stream { - val delegate = this - return object : Stream { - override suspend fun send(buffer: Buffer): Result { - return delegate.send(apply(buffer)) - } - override fun sendClose() { - delegate.sendClose() - } - override fun receiveClose() { - delegate.receiveClose() - } - override fun isSendClosed(): Boolean { - return delegate.isSendClosed() - } - override fun isReceiveClosed(): Boolean { - return delegate.isReceiveClosed() - } - } -} diff --git a/library/src/main/kotlin/com/connectrpc/http/Stream.kt b/library/src/main/kotlin/com/connectrpc/http/Stream.kt new file mode 100644 index 00000000..f8a92b9a --- /dev/null +++ b/library/src/main/kotlin/com/connectrpc/http/Stream.kt @@ -0,0 +1,145 @@ +// Copyright 2022-2023 The Connect Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.connectrpc.http + +import kotlinx.coroutines.withContext +import okio.Buffer +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.CoroutineContext + +/** + * Stream represents the communications for a single streaming RPC. + * It can be used to send messages and to close the stream. Receiving + * messages is done via callbacks provided when the stream is created. + * + * See HTTPClientInterface#stream. + */ +interface Stream { + suspend fun send(buffer: Buffer): Result + + suspend fun sendClose() + + suspend fun receiveClose() + + fun isSendClosed(): Boolean + + fun isReceiveClosed(): Boolean +} + +/** + * Creates a new stream whose implementation of sending and + * closing is delegated to the given lambdas. + */ +fun Stream( + onSend: suspend (Buffer) -> Result, + onSendClose: suspend () -> Unit = {}, + onReceiveClose: suspend () -> Unit = {}, +): Stream { + val isSendClosed = AtomicBoolean() + val isReceiveClosed = AtomicBoolean() + return object : Stream { + override suspend fun send(buffer: Buffer): Result { + if (isSendClosed()) { + return Result.failure(IllegalStateException("cannot send. underlying stream is closed")) + } + return try { + onSend(buffer) + } catch (e: Throwable) { + Result.failure(e) + } + } + + override suspend fun sendClose() { + if (isSendClosed.compareAndSet(false, true)) { + onSendClose() + } + } + + override suspend fun receiveClose() { + if (isReceiveClosed.compareAndSet(false, true)) { + try { + onReceiveClose() + } finally { + // When receive side is closed, the send side is + // implicitly closed as well. + // We don't use sendClose() because we don't want to + // invoke onSendClose() since that will try to actually + // half-close the HTTP stream, which will fail since + // closing the receive side cancels the entire thing. + isSendClosed.set(true) + } + } + } + + override fun isSendClosed(): Boolean { + return isSendClosed.get() + } + + override fun isReceiveClosed(): Boolean { + return isReceiveClosed.get() + } + } +} + +/** + * Returns a new stream that applies the given function to each + * buffer when send is called. The result of that function is + * what is passed along to the original stream. + */ +fun Stream.transform(apply: (Buffer) -> Buffer): Stream { + val delegate = this + return object : Stream { + override suspend fun send(buffer: Buffer): Result { + return delegate.send(apply(buffer)) + } + override suspend fun sendClose() { + delegate.sendClose() + } + override suspend fun receiveClose() { + delegate.receiveClose() + } + override fun isSendClosed(): Boolean { + return delegate.isSendClosed() + } + override fun isReceiveClosed(): Boolean { + return delegate.isReceiveClosed() + } + } +} + +/** + * Returns a new stream that dispatches suspending operations + * (sending and closing) using the given coroutine context. + */ +fun Stream.dispatchIn(context: CoroutineContext): Stream { + val delegate = this + return object : Stream { + override suspend fun send(buffer: Buffer): Result = withContext(context) { + delegate.send(buffer) + } + override suspend fun sendClose() = withContext(context) { + delegate.sendClose() + } + override suspend fun receiveClose() = withContext(context) { + delegate.receiveClose() + } + override fun isSendClosed(): Boolean { + return delegate.isSendClosed() + } + override fun isReceiveClosed(): Boolean { + return delegate.isReceiveClosed() + } + } +} diff --git a/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt b/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt index df2d01b8..45b56eba 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/BidirectionalStream.kt @@ -59,11 +59,11 @@ internal class BidirectionalStream( return stream.isReceiveClosed() } - override fun sendClose() { + override suspend fun sendClose() { stream.sendClose() } - override fun receiveClose() { + override suspend fun receiveClose() { stream.receiveClose() } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt index 583793f6..0edbf07e 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ClientOnlyStream.kt @@ -54,11 +54,11 @@ internal class ClientOnlyStream( return messageStream.responseTrailers() } - override fun sendClose() { + override suspend fun sendClose() { return messageStream.sendClose() } - override fun cancel() { + override suspend fun cancel() { return messageStream.receiveClose() } diff --git a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt index 07807730..909b5be8 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ProtocolClient.kt @@ -31,11 +31,14 @@ import com.connectrpc.http.Cancelable import com.connectrpc.http.HTTPClientInterface import com.connectrpc.http.HTTPRequest import com.connectrpc.http.UnaryHTTPRequest +import com.connectrpc.http.dispatchIn import com.connectrpc.http.transform import com.connectrpc.protocols.GETConfiguration import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withContext import java.net.URI import java.util.concurrent.CountDownLatch import kotlin.coroutines.resume @@ -138,6 +141,19 @@ class ProtocolClient( request: Input, headers: Headers, methodSpec: MethodSpec, + ): ResponseMessage { + if (config.ioCoroutineContext != null) { + return withContext(config.ioCoroutineContext) { + suspendUnary(request, headers, methodSpec) + } + } + return suspendUnary(request, headers, methodSpec) + } + + private suspend fun suspendUnary( + request: Input, + headers: Headers, + methodSpec: MethodSpec, ): ResponseMessage { return suspendCancellableCoroutine { continuation -> val cancelable = unary(request, headers, methodSpec) { responseMessage -> @@ -168,18 +184,11 @@ class ProtocolClient( return call } - override suspend fun stream( - headers: Headers, - methodSpec: MethodSpec, - ): BidirectionalStreamInterface { - return bidirectionalStream(methodSpec, headers) - } - override suspend fun serverStream( headers: Headers, methodSpec: MethodSpec, ): ServerOnlyStreamInterface { - val stream = bidirectionalStream(methodSpec, headers) + val stream = stream(headers, methodSpec) return ServerOnlyStream(stream) } @@ -191,10 +200,10 @@ class ProtocolClient( return ClientOnlyStream(stream) } - private suspend fun bidirectionalStream( - methodSpec: MethodSpec, + override suspend fun stream( headers: Headers, - ): BidirectionalStream = suspendCancellableCoroutine { continuation -> + methodSpec: MethodSpec, + ): BidirectionalStreamInterface { val channel = Channel(1) val responseHeaders = CompletableDeferred() val responseTrailers = CompletableDeferred() @@ -209,10 +218,13 @@ class ProtocolClient( val streamFunc = config.createStreamingInterceptorChain() val finalRequest = streamFunc.requestFunction(request) var isComplete = false - val httpStream = httpClient.stream(finalRequest, methodSpec.streamType == StreamType.BIDI) { initialResult -> + val httpStream = httpClient.stream( + finalRequest, + methodSpec.streamType == StreamType.BIDI, + ) httpStream@{ initialResult -> if (isComplete) { // No-op on remaining handlers after a completion. - return@stream + return@httpStream } // Pass through the interceptor chain. when (val streamResult = streamFunc.streamResultFunction(initialResult)) { @@ -257,22 +269,27 @@ class ProtocolClient( } } } - continuation.invokeOnCancellation { - httpStream.receiveClose() - } - val stream = httpStream.transform { streamFunc.requestBodyFunction(it) } - channel.invokeOnClose { - stream.receiveClose() - } - continuation.resume( - BidirectionalStream( + try { + channel.invokeOnClose { + runBlocking { httpStream.receiveClose() } + } + var stream = httpStream.transform { streamFunc.requestBodyFunction(it) } + if (config.ioCoroutineContext != null) { + stream = stream.dispatchIn(config.ioCoroutineContext) + } + return BidirectionalStream( stream, requestCodec, channel, responseHeaders, responseTrailers, - ), - ) + ) + } catch (ex: Throwable) { + // If something in these last steps prevents us + // from returning, don't leak the stream. + httpStream.receiveClose() + throw ex + } } private fun urlFromMethodSpec(methodSpec: MethodSpec<*, *>) = baseURIWithTrailingSlash.resolve(methodSpec.path).toURL() diff --git a/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt b/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt index d1fa9b3e..e04eb38c 100644 --- a/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt +++ b/library/src/main/kotlin/com/connectrpc/impl/ServerOnlyStream.kt @@ -46,7 +46,7 @@ internal class ServerOnlyStream( } } - override fun receiveClose() { + override suspend fun receiveClose() { messageStream.receiveClose() } From 7e6334e15c0fd6ba8eb03a90901744caefee378a Mon Sep 17 00:00:00 2001 From: Josh Humphries <2035234+jhump@users.noreply.github.com> Date: Mon, 5 Feb 2024 22:08:09 -0500 Subject: [PATCH 2/3] update examples to use new feature --- .../examples/android/ElizaChatActivity.kt | 23 ++++++++++--------- .../com/connectrpc/examples/kotlin/Main.kt | 16 ++++++------- .../com/connectrpc/examples/kotlin/Main.kt | 16 ++++++------- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/examples/android/src/main/kotlin/com/connectrpc/examples/android/ElizaChatActivity.kt b/examples/android/src/main/kotlin/com/connectrpc/examples/android/ElizaChatActivity.kt index e4f82802..0acd934c 100644 --- a/examples/android/src/main/kotlin/com/connectrpc/examples/android/ElizaChatActivity.kt +++ b/examples/android/src/main/kotlin/com/connectrpc/examples/android/ElizaChatActivity.kt @@ -86,6 +86,9 @@ class ElizaChatActivity : AppCompatActivity() { host = host, serializationStrategy = GoogleJavaLiteProtobufStrategy(), networkProtocol = selectedNetworkProtocolOption, + // RPC operations that involve network I/O will + // use this coroutine context. + ioCoroutineContext = Dispatchers.IO, ), ) // Create the Eliza service client. @@ -113,7 +116,7 @@ class ElizaChatActivity : AppCompatActivity() { adapter.add(MessageData(sentence, false)) editTextView.setText("") // Ensure IO context for unary requests. - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch { // Make a unary request to Eliza. val response = elizaServiceClient.say(SayRequest.newBuilder().setSentence(sentence).build()) response.success { success -> @@ -133,7 +136,7 @@ class ElizaChatActivity : AppCompatActivity() { private fun setupStreamingChat(elizaServiceClient: ElizaServiceClient) { // On stream result, this callback can be called multiple times. - lifecycleScope.launch(Dispatchers.IO) { + lifecycleScope.launch { // Initialize a bidi stream with Eliza. val stream = elizaServiceClient.converse() try { @@ -156,15 +159,13 @@ class ElizaChatActivity : AppCompatActivity() { } catch (e: ConnectException) { adapter.add(MessageData("Session failed with code ${e.code}", true)) } - lifecycleScope.launch(Dispatchers.Main) { - buttonView.setOnClickListener { - val sentence = editTextView.text.toString() - adapter.add(MessageData(sentence, false)) - editTextView.setText("") - // Send will be streaming a message to Eliza. - lifecycleScope.launch(Dispatchers.IO) { - stream.send(ConverseRequest.newBuilder().setSentence(sentence).build()) - } + buttonView.setOnClickListener { + val sentence = editTextView.text.toString() + adapter.add(MessageData(sentence, false)) + editTextView.setText("") + // Send will be streaming a message to Eliza. + lifecycleScope.launch { + stream.send(ConverseRequest.newBuilder().setSentence(sentence).build()) } } } diff --git a/examples/kotlin-google-java/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt b/examples/kotlin-google-java/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt index 3d3181da..e2508537 100644 --- a/examples/kotlin-google-java/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt +++ b/examples/kotlin-google-java/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt @@ -22,7 +22,6 @@ import com.connectrpc.impl.ProtocolClient import com.connectrpc.okhttp.ConnectOkHttpClient import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import okhttp3.OkHttpClient import java.time.Duration @@ -44,6 +43,9 @@ class Main { ProtocolClientConfig( host = host, serializationStrategy = GoogleJavaProtobufStrategy(), + // RPC operations that involve network I/O will + // use this coroutine context. + ioCoroutineContext = Dispatchers.IO, ), ) val elizaServiceClient = ElizaServiceClient(client) @@ -57,13 +59,11 @@ class Main { private suspend fun connectStreaming(elizaServiceClient: ElizaServiceClient) { val stream = elizaServiceClient.converse() - withContext(Dispatchers.IO) { - // Add the message the user is sending to the views. - stream.send(converseRequest { sentence = "hello" }) - stream.sendClose() - for (response in stream.responseChannel()) { - println(response.sentence) - } + // Add the message the user is sending to the views. + stream.send(converseRequest { sentence = "hello" }) + stream.sendClose() + for (response in stream.responseChannel()) { + println(response.sentence) } } } diff --git a/examples/kotlin-google-javalite/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt b/examples/kotlin-google-javalite/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt index 8205d4ad..7e6179ee 100644 --- a/examples/kotlin-google-javalite/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt +++ b/examples/kotlin-google-javalite/src/main/kotlin/com/connectrpc/examples/kotlin/Main.kt @@ -22,7 +22,6 @@ import com.connectrpc.impl.ProtocolClient import com.connectrpc.okhttp.ConnectOkHttpClient import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext import okhttp3.OkHttpClient import java.time.Duration @@ -44,6 +43,9 @@ class Main { ProtocolClientConfig( host = host, serializationStrategy = GoogleJavaLiteProtobufStrategy(), + // RPC operations that involve network I/O will + // use this coroutine context. + ioCoroutineContext = Dispatchers.IO, ), ) val elizaServiceClient = ElizaServiceClient(client) @@ -57,13 +59,11 @@ class Main { private suspend fun connectStreaming(elizaServiceClient: ElizaServiceClient) { val stream = elizaServiceClient.converse() - withContext(Dispatchers.IO) { - // Add the message the user is sending to the views. - stream.send(converseRequest { sentence = "hello" }) - stream.sendClose() - for (response in stream.responseChannel()) { - println(response.sentence) - } + // Add the message the user is sending to the views. + stream.send(converseRequest { sentence = "hello" }) + stream.sendClose() + for (response in stream.responseChannel()) { + println(response.sentence) } } } From 759b015163a41c391c79ec6da13e85baa38faf4f Mon Sep 17 00:00:00 2001 From: Joshua Humphries <2035234+jhump@users.noreply.github.com> Date: Thu, 8 Feb 2024 16:42:38 -0500 Subject: [PATCH 3/3] Fix typos in comment --- .../main/kotlin/com/connectrpc/ProtocolClientConfig.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt b/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt index 84b6836c..87cc6bb3 100644 --- a/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt +++ b/library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt @@ -48,10 +48,10 @@ class ProtocolClientConfig @JvmOverloads constructor( compressionPools: List = listOf(GzipCompressionPool), // The coroutine context to use for I/O, such as sending RPC messages. // If null, the current/calling coroutine context is used. So the caller - // may need explicitly dispatch send calls using contexts where I/O is - // appropriate (using the withContext extension function). If non-null - // (such as Dispatchers.IO), operations that involve I/O pr other - // blocking will automatically be dispatched using the biven context, + // may need to explicitly dispatch send calls using contexts where I/O + // is appropriate (using the withContext extension function). If non-null + // (such as Dispatchers.IO), operations that involve I/O or other + // blocking will automatically be dispatched using the given context, // so the caller does not need to worry about it. val ioCoroutineContext: CoroutineContext? = null, ) {