Skip to content

Commit

Permalink
auto-dispatch to I/O context
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed Feb 6, 2024
1 parent 3f69420 commit 897ec70
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,12 @@ internal class TracingHTTPClient(
return res
}

override fun sendClose() {
override suspend fun sendClose() {
printer.printlnWithStackTrace("Half-closing stream")
delegate.sendClose()
}

override fun receiveClose() {
override suspend fun receiveClose() {
printer.printlnWithStackTrace("Closing stream")
delegate.receiveClose()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ interface BidirectionalStreamInterface<Input, Output> {
/**
* Close the send stream. No calls to [send] are valid after calling [sendClose].
*/
fun sendClose()
suspend fun sendClose()

/**
* Close the receive stream.
*/
fun receiveClose()
suspend fun receiveClose()

/**
* Determine if the underlying client send stream is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ interface ClientOnlyStreamInterface<Input, Output> {
/**
* Close the stream. No calls to [send] are valid after calling [sendClose].
*/
fun sendClose()
suspend fun sendClose()

/**
* Cancels the stream. This closes both send and receive sides of the stream
* without awaiting any server reply.
*/
fun cancel()
suspend fun cancel()

/**
* Determine if the underlying client send stream is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.connectrpc.protocols.GRPCInterceptor
import com.connectrpc.protocols.GRPCWebInterceptor
import com.connectrpc.protocols.NetworkProtocol
import java.net.URI
import kotlin.coroutines.CoroutineContext

/**
* Set of configuration used to set up clients.
Expand All @@ -45,6 +46,14 @@ class ProtocolClientConfig @JvmOverloads constructor(
// Compression pools that provide support for the provided `compressionName`, as well as any
// other compression methods that need to be supported for inbound responses.
compressionPools: List<CompressionPool> = listOf(GzipCompressionPool),
// The coroutine context to use for I/O, such as sending RPC messages.
// If null, the current/calling coroutine context is used. So the caller
// may need explicitly dispatch send calls using contexts where I/O is
// appropriate (using the withContext extension function). If non-null
// (such as Dispatchers.IO), operations that involve I/O pr other
// blocking will automatically be dispatched using the biven context,
// so the caller does not need to worry about it.
val ioCoroutineContext: CoroutineContext? = null,
) {
private val internalInterceptorFactoryList = mutableListOf<(ProtocolClientConfig) -> Interceptor>()
private val compressionPools = mutableMapOf<String, CompressionPool>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ interface ServerOnlyStreamInterface<Input, Output> {
/**
* Close the receive stream.
*/
fun receiveClose()
suspend fun receiveClose()

/**
* Determine if the underlying client receive stream is closed.
Expand Down
90 changes: 0 additions & 90 deletions library/src/main/kotlin/com/connectrpc/http/HTTPClientInterface.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package com.connectrpc.http

import com.connectrpc.StreamResult
import okio.Buffer
import java.util.concurrent.atomic.AtomicBoolean

typealias Cancelable = () -> Unit

Expand Down Expand Up @@ -46,92 +45,3 @@ interface HTTPClientInterface {
*/
fun stream(request: HTTPRequest, duplex: Boolean, onResult: suspend (StreamResult<Buffer>) -> Unit): Stream
}

interface Stream {
suspend fun send(buffer: Buffer): Result<Unit>

fun sendClose()

fun receiveClose()

fun isSendClosed(): Boolean

fun isReceiveClosed(): Boolean
}

fun Stream(
onSend: suspend (Buffer) -> Result<Unit>,
onSendClose: () -> Unit = {},
onReceiveClose: () -> Unit = {},
): Stream {
val isSendClosed = AtomicBoolean()
val isReceiveClosed = AtomicBoolean()
return object : Stream {
override suspend fun send(buffer: Buffer): Result<Unit> {
if (isSendClosed()) {
return Result.failure(IllegalStateException("cannot send. underlying stream is closed"))
}
return try {
onSend(buffer)
} catch (e: Throwable) {
Result.failure(e)
}
}

override fun sendClose() {
if (isSendClosed.compareAndSet(false, true)) {
onSendClose()
}
}

override fun receiveClose() {
if (isReceiveClosed.compareAndSet(false, true)) {
try {
onReceiveClose()
} finally {
// When receive side is closed, the send side is
// implicitly closed as well.
// We don't use sendClose() because we don't want to
// invoke onSendClose() since that will try to actually
// half-close the HTTP stream, which will fail since
// closing the receive side cancels the entire thing.
isSendClosed.set(true)
}
}
}

override fun isSendClosed(): Boolean {
return isSendClosed.get()
}

override fun isReceiveClosed(): Boolean {
return isReceiveClosed.get()
}
}
}

/**
* Returns a new stream that applies the given function to each
* buffer when send is called. The result of that function is
* what is passed along to the original stream.
*/
fun Stream.transform(apply: (Buffer) -> Buffer): Stream {
val delegate = this
return object : Stream {
override suspend fun send(buffer: Buffer): Result<Unit> {
return delegate.send(apply(buffer))
}
override fun sendClose() {
delegate.sendClose()
}
override fun receiveClose() {
delegate.receiveClose()
}
override fun isSendClosed(): Boolean {
return delegate.isSendClosed()
}
override fun isReceiveClosed(): Boolean {
return delegate.isReceiveClosed()
}
}
}
145 changes: 145 additions & 0 deletions library/src/main/kotlin/com/connectrpc/http/Stream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2022-2023 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.connectrpc.http

import kotlinx.coroutines.withContext
import okio.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext

/**
* Stream represents the communications for a single streaming RPC.
* It can be used to send messages and to close the stream. Receiving
* messages is done via callbacks provided when the stream is created.
*
* See HTTPClientInterface#stream.
*/
interface Stream {
suspend fun send(buffer: Buffer): Result<Unit>

suspend fun sendClose()

suspend fun receiveClose()

fun isSendClosed(): Boolean

fun isReceiveClosed(): Boolean
}

/**
* Creates a new stream whose implementation of sending and
* closing is delegated to the given lambdas.
*/
fun Stream(
onSend: suspend (Buffer) -> Result<Unit>,
onSendClose: suspend () -> Unit = {},
onReceiveClose: suspend () -> Unit = {},
): Stream {
val isSendClosed = AtomicBoolean()
val isReceiveClosed = AtomicBoolean()
return object : Stream {
override suspend fun send(buffer: Buffer): Result<Unit> {
if (isSendClosed()) {
return Result.failure(IllegalStateException("cannot send. underlying stream is closed"))
}
return try {
onSend(buffer)
} catch (e: Throwable) {
Result.failure(e)
}
}

override suspend fun sendClose() {
if (isSendClosed.compareAndSet(false, true)) {
onSendClose()
}
}

override suspend fun receiveClose() {
if (isReceiveClosed.compareAndSet(false, true)) {
try {
onReceiveClose()
} finally {
// When receive side is closed, the send side is
// implicitly closed as well.
// We don't use sendClose() because we don't want to
// invoke onSendClose() since that will try to actually
// half-close the HTTP stream, which will fail since
// closing the receive side cancels the entire thing.
isSendClosed.set(true)
}
}
}

override fun isSendClosed(): Boolean {
return isSendClosed.get()
}

override fun isReceiveClosed(): Boolean {
return isReceiveClosed.get()
}
}
}

/**
* Returns a new stream that applies the given function to each
* buffer when send is called. The result of that function is
* what is passed along to the original stream.
*/
fun Stream.transform(apply: (Buffer) -> Buffer): Stream {
val delegate = this
return object : Stream {
override suspend fun send(buffer: Buffer): Result<Unit> {
return delegate.send(apply(buffer))
}
override suspend fun sendClose() {
delegate.sendClose()
}
override suspend fun receiveClose() {
delegate.receiveClose()
}
override fun isSendClosed(): Boolean {
return delegate.isSendClosed()
}
override fun isReceiveClosed(): Boolean {
return delegate.isReceiveClosed()
}
}
}

/**
* Returns a new stream that dispatches suspending operations
* (sending and closing) using the given coroutine context.
*/
fun Stream.dispatchIn(context: CoroutineContext): Stream {
val delegate = this
return object : Stream {
override suspend fun send(buffer: Buffer): Result<Unit> = withContext(context) {
delegate.send(buffer)
}
override suspend fun sendClose() = withContext(context) {
delegate.sendClose()
}
override suspend fun receiveClose() = withContext(context) {
delegate.receiveClose()
}
override fun isSendClosed(): Boolean {
return delegate.isSendClosed()
}
override fun isReceiveClosed(): Boolean {
return delegate.isReceiveClosed()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ internal class BidirectionalStream<Input, Output>(
return stream.isReceiveClosed()
}

override fun sendClose() {
override suspend fun sendClose() {
stream.sendClose()
}

override fun receiveClose() {
override suspend fun receiveClose() {
stream.receiveClose()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ internal class ClientOnlyStream<Input, Output>(
return messageStream.responseTrailers()
}

override fun sendClose() {
override suspend fun sendClose() {
return messageStream.sendClose()
}

override fun cancel() {
override suspend fun cancel() {
return messageStream.receiveClose()
}

Expand Down
Loading

0 comments on commit 897ec70

Please sign in to comment.