Skip to content

Commit

Permalink
Split stream close functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
buildbreaker committed Jul 31, 2023
1 parent fab9e07 commit d001fef
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ package build.buf.connect
* eventually receives a response) that can send request messages and initiate closes.
*/
interface ClientOnlyStreamInterface<Input, Output> {
/**
*
*/
suspend fun receiveAndClose(): StreamResult<Output>
/**
* Send a request to the server over the stream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ interface HTTPClientInterface {

class Stream(
private val onSend: (Buffer) -> Unit,
private val onClose: () -> Unit
private val onSendClose: () -> Unit = {},
private val onReceiveClose: () -> Unit = {}
) {
private val isClosed = AtomicReference(false)
private val isSendClosed = AtomicReference(false)
private val isReceiveClosed = AtomicReference(false)

fun send(buffer: Buffer): Result<Unit> {
if (isClosed()) {
Expand All @@ -65,13 +67,19 @@ class Stream(
}
}

fun close() {
if (!isClosed.getAndSet(true)) {
onClose()
fun sendClose() {
if (!isSendClosed.getAndSet(true)) {
onSendClose()
}
}

fun receiveClose() {
if (!isReceiveClosed.getAndSet(true)) {
onReceiveClose()
}
}

fun isClosed(): Boolean {
return isClosed.get()
return isSendClosed.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal class BidirectionalStream<Input, Output>(
}

override fun close() {
stream.close()
stream.sendClose()
}

override fun isClosed(): Boolean {
Expand Down
10 changes: 10 additions & 0 deletions library/src/main/kotlin/build/buf/connect/impl/ClientOnlyStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package build.buf.connect.impl

import build.buf.connect.BidirectionalStreamInterface
import build.buf.connect.ClientOnlyStreamInterface
import build.buf.connect.StreamResult

/**
* Concrete implementation of [ClientOnlyStreamInterface].
Expand All @@ -27,6 +28,15 @@ internal class ClientOnlyStream<Input, Output>(
return messageStream.send(input)
}

override suspend fun receiveAndClose(): StreamResult<Output> {
val resultChannel = messageStream.resultChannel()
try {
return resultChannel.receive()
} finally {
resultChannel.cancel()
}
}

override fun close() {
messageStream.close()
}
Expand Down
19 changes: 10 additions & 9 deletions library/src/main/kotlin/build/buf/connect/impl/ProtocolClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,19 @@ class ProtocolClient(
channel.send(result)
}
continuation.invokeOnCancellation {
httpStream.close()
httpStream.sendClose()
}
val stream = Stream(
onSend = { buffer ->
httpStream.send(streamFunc.requestBodyFunction(buffer))
}
)
channel.invokeOnClose {
stream.receiveClose()
}
continuation.resume(
BidirectionalStream(
Stream(
onSend = { buffer ->
httpStream.send(streamFunc.requestBodyFunction(buffer))
},
onClose = {
httpStream.close()
}
),
stream,
requestCodec,
channel
)
Expand Down
23 changes: 11 additions & 12 deletions okhttp/src/main/kotlin/build/buf/connect/okhttp/OkHttpStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ internal fun OkHttpClient.initializeStream(
request: HTTPRequest,
onResult: suspend (StreamResult<Buffer>) -> Unit
): Stream {
val isClosed = AtomicBoolean(false)
val isSendClosed = AtomicBoolean(false)
val isReceiveClosed = AtomicBoolean(false)
val duplexRequestBody = PipeDuplexRequestBody(request.contentType.toMediaType())
val builder = Request.Builder()
.url(request.url)
Expand All @@ -60,23 +61,21 @@ internal fun OkHttpClient.initializeStream(
}
val callRequest = builder.build()
val call = newCall(callRequest)
call.enqueue(ResponseCallback(onResult, isClosed))
call.enqueue(ResponseCallback(onResult, isSendClosed))
return Stream(
onSend = { buffer ->
if (!isClosed.get()) {
if (!isSendClosed.get()) {
duplexRequestBody.forConsume(buffer)
}
},
onClose = {
try {
isClosed.set(true)
call.cancel()
duplexRequestBody.close()
} catch (_: Throwable) {
// No-op
}
onSendClose = {
isSendClosed.set(true)
duplexRequestBody.close()
}
)
) {
isReceiveClosed.set(true)
call.cancel()
}
}

private class ResponseCallback(
Expand Down

0 comments on commit d001fef

Please sign in to comment.