Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout enforcement to ProtocolClient #276

Merged
merged 4 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import com.connectrpc.protocols.NetworkProtocol
import java.net.URI
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration

typealias TimeoutOracle = (MethodSpec<*, *>) -> Duration?

Expand Down Expand Up @@ -81,7 +83,10 @@ class ProtocolClientConfig @JvmOverloads constructor(
// is returned, the entire call must complete before it elapses. If the
// call is still active at the end of the timeout period, it is cancelled
// and will result in an exception with a Code.DEADLINE_EXCEEDED code.
val timeoutOracle: TimeoutOracle = { null },
//
// The default oracle, if not configured, returns a 10 second timeout for
// all operations.
val timeoutOracle: TimeoutOracle = { 10.toDuration(DurationUnit.SECONDS) },
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to account for removal of the timeouts in the configuration helper below. So if users use ConnectOkHttpClient.configureClient and forget to set timeouts in this config, they get the same default behavior as the OkHttpClient was providing (except that this default applies to bidirectional streams, whereas the OkHttpClient timeouts do not).

// Schedules timeout actions.
val timeoutScheduler: Timeout.Scheduler = Timeout.DEFAULT_SCHEDULER,
) {
Expand Down
42 changes: 27 additions & 15 deletions okhttp/src/main/kotlin/com/connectrpc/okhttp/ConnectOkHttpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,34 @@ import okio.BufferedSink
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.time.Duration

/**
* The OkHttp implementation of HTTPClientInterface.
*/
class ConnectOkHttpClient @JvmOverloads constructor(
unaryClient: OkHttpClient = OkHttpClient(),
private val unaryClient: OkHttpClient = OkHttpClient(),
// TODO: remove this; a separate client is only useful for configuring separate timeouts,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not just for timeouts IIRC - you may also want to configure pings on clients for long lived streams but not on ones for unary calls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's some background on the two clients (aside from timeouts which will be much better behaved with this PR): #13

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear why you'd want PINGs on one client and not the other. If you have created a connection that uses PINGs, just use that same client for both unary and stream traffic. I don't see any downside to that. It wouldn't increase the PING activity since you're not creating two clients that both use PING -- but instead using the PINGing client for all RPCs.

As far as the read timeout being different between unary and streams, that is now even more configurable (based on MethodSpec) via the timeout oracle. So you can set the read timeout to zero (or use ConnectOkHttpClient.configureClient) and it should be appropriate for both unary and stream traffic.

@kohenkatz, please take a look at this thread and PR and let me know what you think. Note that this PR is not removing the second client -- not yet. But I don't think it will be needed anymore and would appreciate your input, like if you see other reasons to keep the second client in future versions.

// but that can be done in the ProtocolClientConfig instead of the HTTP client.
streamClient: OkHttpClient = unaryClient,
private val streamClient: OkHttpClient = unaryClient,
) : HTTPClientInterface {
private val unaryClient = applyNetworkInterceptor(unaryClient)
private val streamClient = applyNetworkInterceptor(streamClient)
companion object {
// Configures the given OkHttpClient builder to be more appropriate for
// use with RPC. This removes the read and write timeouts (which both default
// to 10 seconds in OkHttp) so that timeouts can instead be enforced by the
// protocol implementation, via the timeout oracle supplied in ProtocolClientConfig.
// This also disables OkHttp's auto-retry handling of 408 status codes since older
// Connect servers may use that code with error responses where the error code is
// "canceled" or "deadline exceeded".
fun configureClient(client: OkHttpClient.Builder): OkHttpClient.Builder {
return applyNetworkInterceptor(client)
.callTimeout(Duration.ZERO) // defaults to zero already, but just in case it's set...
.readTimeout(Duration.ZERO)
.writeTimeout(Duration.ZERO)
}

private fun applyNetworkInterceptor(client: OkHttpClient): OkHttpClient {
return client.newBuilder()
.addNetworkInterceptor(object : Interceptor {
private fun applyNetworkInterceptor(client: OkHttpClient.Builder): OkHttpClient.Builder {
return client.addNetworkInterceptor(object : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
val resp = chain.proceed(chain.request())
// The Connect protocol spec currently suggests 408 as the HTTP status code
Expand All @@ -83,15 +95,15 @@ class ConnectOkHttpClient @JvmOverloads constructor(
return resp
}
})
.build()
}
}

private fun isConnectUnary(req: Request): Boolean {
return when (req.method) {
"POST" -> req.headers[CONNECT_PROTOCOL_VERSION_KEY].orEmpty() == CONNECT_PROTOCOL_VERSION_VALUE &&
req.headers["Content-Type"].orEmpty().startsWith("application/")
"GET" -> req.url.queryParameter(GETConstants.CONNECT_VERSION_QUERY_PARAM_KEY) == GETConstants.CONNECT_VERSION_QUERY_PARAM_VALUE
else -> false
private fun isConnectUnary(req: Request): Boolean {
return when (req.method) {
"POST" -> req.headers[CONNECT_PROTOCOL_VERSION_KEY].orEmpty() == CONNECT_PROTOCOL_VERSION_VALUE &&
req.headers["Content-Type"].orEmpty().startsWith("application/")
"GET" -> req.url.queryParameter(GETConstants.CONNECT_VERSION_QUERY_PARAM_KEY) == GETConstants.CONNECT_VERSION_QUERY_PARAM_VALUE
else -> false
}
}
}

Expand Down