Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout enforcement to ProtocolClient #276

Merged
merged 4 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
import com.connectrpc.conformance.v1.ServerStreamRequest
import com.connectrpc.conformance.v1.ServerStreamResponse
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

class JavaServerStreamClient(
private val client: ConformanceServiceClient,
Expand All @@ -33,6 +35,21 @@ class JavaServerStreamClient(
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
// It can't be because stream.sendClose was already closed. So the operation
// must have already failed. Extract the reason via a call to receive. But
// if something is awry, don't block forever on the receive call.
try {
withTimeout(50) {
// Waits up to 50 milliseconds.
stream.responseChannel().receive()
}
} catch (_: TimeoutCancellationException) {
// Receive did not complete :(
} catch (ex: Throwable) {
throw ex
}
// Either receive did not complete or it did not fail (which
// shouldn't actually be possible).
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.v1.ConformanceServiceClient
import com.connectrpc.conformance.v1.ServerStreamRequest
import com.connectrpc.conformance.v1.ServerStreamResponse
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

class JavaLiteServerStreamClient(
private val client: ConformanceServiceClient,
Expand All @@ -33,6 +35,21 @@ class JavaLiteServerStreamClient(
try {
sendResult = stream.sendAndClose(req)
if (sendResult.isFailure) {
// It can't be because stream.sendClose was already closed. So the operation
// must have already failed. Extract the reason via a call to receive. But
// if something is awry, don't block forever on the receive call.
try {
withTimeout(50) {
// Waits up to 50 milliseconds.
stream.responseChannel().receive()
}
} catch (_: TimeoutCancellationException) {
// Receive did not complete :(
} catch (ex: Throwable) {
throw ex
}
// Either receive did not complete or it did not fail (which
// shouldn't actually be possible).
throw sendResult.exceptionOrNull()!!
}
} catch (ex: Throwable) {
Expand Down
9 changes: 1 addition & 8 deletions conformance/client/known-failing-stream-cases.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1 @@
# We currently rely on OkHttp's "call timeout" to handle
# RPC deadlines, but that is not enforced when the request
# body is duplex. So timeouts don't currently work with
# bidi streams.
Timeouts/HTTPVersion:2/**/bidi-stream/**

# Deadline headers are not currently set.
Deadline Propagation/**
# Currently there are zero failing tests.
3 changes: 1 addition & 2 deletions conformance/client/known-failing-unary-cases.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
# Deadline headers are not currently set.
Deadline Propagation/**
# Currently there are zero failing tests.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import com.connectrpc.conformance.client.adapt.Invoker
import com.connectrpc.conformance.client.adapt.ResponseStream
import com.connectrpc.conformance.client.adapt.ServerStreamClient
import com.connectrpc.conformance.client.adapt.UnaryClient
import com.connectrpc.http.Cancelable
import com.connectrpc.http.HTTPClientInterface
import com.connectrpc.http.Timeout
import com.connectrpc.impl.ProtocolClient
import com.connectrpc.okhttp.ConnectOkHttpClient
import com.connectrpc.protocols.GETConfiguration
Expand All @@ -57,6 +59,8 @@ import java.security.spec.PKCS8EncodedKeySpec
import java.time.Duration
import java.util.Base64
import kotlin.reflect.cast
import kotlin.time.DurationUnit
import kotlin.time.toDuration

/**
* The conformance client. This contains the logic for invoking an
Expand Down Expand Up @@ -441,9 +445,6 @@ class Client(
val certs = certs(req)
clientBuilder = clientBuilder.sslSocketFactory(certs.sslSocketFactory(), certs.trustManager)
}
if (req.timeoutMs != 0) {
clientBuilder = clientBuilder.callTimeout(Duration.ofMillis(req.timeoutMs.toLong()))
}
// TODO: need to support max receive bytes and use req.receiveLimitBytes
val getConfig = if (req.useGetHttpMethod) GETConfiguration.Enabled else GETConfiguration.Disabled
val requestCompression =
Expand All @@ -458,11 +459,25 @@ class Client(
} else {
emptyList()
}
val httpClient = clientBuilder.build()
val httpClient = ConnectOkHttpClient.configureClient(clientBuilder).build()
var connectHttpClient: HTTPClientInterface = ConnectOkHttpClient(httpClient)
args.verbose.withPrefix("http client interface: ").verbosity(3) {
connectHttpClient = TracingHTTPClient(connectHttpClient, this)
}
var timeoutScheduler = Timeout.DEFAULT_SCHEDULER
args.verbose.verbosity(3) {
val verbosePrinter = this
timeoutScheduler = object : Timeout.Scheduler {
override fun scheduleTimeout(delay: kotlin.time.Duration, action: Cancelable): Timeout {
verbosePrinter.println("Scheduling timeout in $delay...")
val timeout = Timeout.DEFAULT_SCHEDULER.scheduleTimeout(delay) {
verbosePrinter.println("Timeout elapsed! Cancelling...")
action()
}
return timeout
}
}
}
return Pair(
httpClient,
ProtocolClient(
Expand All @@ -474,6 +489,14 @@ class Client(
getConfiguration = getConfig,
requestCompression = requestCompression,
compressionPools = compressionPools,
timeoutScheduler = timeoutScheduler,
timeoutOracle = {
if (req.timeoutMs == 0) {
null
} else {
req.timeoutMs.toDuration(DurationUnit.MILLISECONDS)
}
},
),
),
)
Expand Down
35 changes: 35 additions & 0 deletions library/src/main/kotlin/com/connectrpc/ProtocolClientConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,42 @@ package com.connectrpc

import com.connectrpc.compression.CompressionPool
import com.connectrpc.compression.GzipCompressionPool
import com.connectrpc.http.Timeout
import com.connectrpc.protocols.ConnectInterceptor
import com.connectrpc.protocols.GETConfiguration
import com.connectrpc.protocols.GRPCInterceptor
import com.connectrpc.protocols.GRPCWebInterceptor
import com.connectrpc.protocols.NetworkProtocol
import java.net.URI
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration

typealias TimeoutOracle = (MethodSpec<*, *>) -> Duration?

/**
* Returns an oracle that provides the given timeouts for unary or stream
* operations, respectively.
*/
fun simpleTimeouts(unaryTimeout: Duration?, streamTimeout: Duration?): TimeoutOracle {
return { methodSpec ->
when (methodSpec.streamType) {
StreamType.UNARY -> unaryTimeout
else -> streamTimeout
}
}
}

/**
* Set of configuration used to set up clients.
*/
class ProtocolClientConfig @JvmOverloads constructor(
// TODO: Use a block-based construction pattern instead of JvmOverloads
// so we can add new fields in the future without having to worry
// about their ordering or potentially breaking compatibility with
// already-compiled byte code.

// The host (e.g., https://connectrpc.com).
val host: String,
// The client to use for performing requests.
Expand All @@ -54,6 +78,17 @@ class ProtocolClientConfig @JvmOverloads constructor(
// blocking will automatically be dispatched using the given context,
// so the caller does not need to worry about it.
val ioCoroutineContext: CoroutineContext? = null,
// A function that is consulted to determine timeouts for each RPC. If
// the function returns null, no timeout is applied. If a non-null value
// is returned, the entire call must complete before it elapses. If the
// call is still active at the end of the timeout period, it is cancelled
// and will result in an exception with a Code.DEADLINE_EXCEEDED code.
//
// The default oracle, if not configured, returns a 10 second timeout for
// all operations.
val timeoutOracle: TimeoutOracle = { 10.toDuration(DurationUnit.SECONDS) },
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to account for removal of the timeouts in the configuration helper below. So if users use ConnectOkHttpClient.configureClient and forget to set timeouts in this config, they get the same default behavior as the OkHttpClient was providing (except that this default applies to bidirectional streams, whereas the OkHttpClient timeouts do not).

// Schedules timeout actions.
val timeoutScheduler: Timeout.Scheduler = Timeout.DEFAULT_SCHEDULER,
) {
private val internalInterceptorFactoryList = mutableListOf<(ProtocolClientConfig) -> Interceptor>()
private val compressionPools = mutableMapOf<String, CompressionPool>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.connectrpc.http
import com.connectrpc.StreamResult
import okio.Buffer

/** A function that cancels an operation when called. */
typealias Cancelable = () -> Unit

/**
Expand Down
13 changes: 12 additions & 1 deletion library/src/main/kotlin/com/connectrpc/http/HTTPRequest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.connectrpc.Headers
import com.connectrpc.MethodSpec
import okio.Buffer
import java.net.URL
import kotlin.time.Duration

enum class HTTPMethod(
val string: String,
Expand All @@ -34,6 +35,8 @@ open class HTTPRequest internal constructor(
val url: URL,
// Value to assign to the `content-type` header.
val contentType: String,
// The optional timeout for this request.
val timeout: Duration?,
// Additional outbound headers for the request.
val headers: Headers,
// The method spec associated with the request.
Expand All @@ -51,6 +54,8 @@ fun HTTPRequest.clone(
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// The optional timeout for this request.
timeout: Duration? = this.timeout,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// The method spec associated with the request.
Expand All @@ -59,6 +64,7 @@ fun HTTPRequest.clone(
return HTTPRequest(
url,
contentType,
timeout,
headers,
methodSpec,
)
Expand All @@ -73,6 +79,8 @@ class UnaryHTTPRequest(
url: URL,
// Value to assign to the `content-type` header.
contentType: String,
// The optional timeout for this request.
timeout: Duration?,
// Additional outbound headers for the request.
headers: Headers,
// The method spec associated with the request.
Expand All @@ -82,13 +90,15 @@ class UnaryHTTPRequest(
// HTTP method to use with the request.
// Almost always POST, but side effect free unary RPCs may be made with GET.
val httpMethod: HTTPMethod = HTTPMethod.POST,
) : HTTPRequest(url, contentType, headers, methodSpec)
) : HTTPRequest(url, contentType, timeout, headers, methodSpec)

fun UnaryHTTPRequest.clone(
// The URL for the request.
url: URL = this.url,
// Value to assign to the `content-type` header.
contentType: String = this.contentType,
// The optional timeout for this request.
timeout: Duration? = this.timeout,
// Additional outbound headers for the request.
headers: Headers = this.headers,
// The method spec associated with the request.
Expand All @@ -101,6 +111,7 @@ fun UnaryHTTPRequest.clone(
return UnaryHTTPRequest(
url,
contentType,
timeout,
headers,
methodSpec,
message,
Expand Down
86 changes: 86 additions & 0 deletions library/src/main/kotlin/com/connectrpc/http/Timeout.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2022-2023 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.connectrpc.http

import kotlinx.coroutines.delay
import java.util.Timer
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.concurrent.timerTask
import kotlin.time.Duration

/**
* Represents the timeout state for an RPC.
*/
class Timeout private constructor(
private val timeoutAction: Cancelable,
) {
private val done = AtomicBoolean(false)

@Volatile private var triggered: Boolean = false
private var onCancel: Cancelable? = null

/** Returns true if this timeout has lapsed and the associated RPC canceled. */
val timedOut: Boolean
get() = triggered

/**
* Cancels the timeout. Should only be called when the RPC completes before the
* timeout elapses. Returns true if the timeout was canceled or false if either
* it was already previously canceled or has already timed out. The `timedOut`
* property can be queried to distinguish between these two possibilities.
*/
fun cancel(): Boolean {
if (done.compareAndSet(false, true)) {
onCancel?.invoke()
return true
}
return false
}

private fun trigger() {
if (done.compareAndSet(false, true)) {
triggered = true
timeoutAction()
}
}

/** Schedules timeouts for RPCs. */
interface Scheduler {
/**
* Schedules a timeout that should invoke the given action to cancel
* an RPC after the given delay.
*/

fun scheduleTimeout(delay: Duration, action: Cancelable): Timeout
}

companion object {
/**
* A default implementation that a Timer backed by a single daemon thread.
* The thread isn't started until the first cancelation is scheduled.
*/
val DEFAULT_SCHEDULER = object : Scheduler {
override fun scheduleTimeout(delay: Duration, action: Cancelable): Timeout {
val timeout = Timeout(action)
val task = timerTask { timeout.trigger() }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works but we might consider tradeoffs vs. ScheduledThreadPoolExecutor. Probably not a big deal for timeout scheduling.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change it. When I was searching, trying to figure out the idiomatic way to do this in Kotlin and Android apps, this was cited more than use of a ScheduledExecutorService. There were also a couple of Android-specific ways to do it, and then there was the coroutine way (that I couldn't get to work correctly -- to just create a coroutine and then delay(...) in the coroutine before executing the action).

For this, there's not really much of a tradeoff -- both solutions are roughly equivalent. Both approaches require creation of a heavyweight thread. Both implementations are similar, using a thread that polls a priority queue and then executes the tasks. The only potentially meaningful difference is that ScheduledExecutorService impls use a DelayQueue and the stuff in java.util.concurrent whereas the Timer just uses intrinsic locks and Object.wait and Object.notify.

But it's super easy to switch if you think that's more appropriate.

timer.value.schedule(task, delay.inWholeMilliseconds)
timeout.onCancel = { task.cancel() }
return timeout
}
}

private val timer = lazy { Timer(Scheduler::class.qualifiedName, true) }
}
}
Loading