Skip to content

Commit

Permalink
Split ktor transports implementation (#219)
Browse files Browse the repository at this point in the history
* make `rsocket-transport-ktor-*` modules just provide standalone transport implementations
* introduce `rsocket-ktor-client/server` modules with plugins and future more deep ktor integrations
  • Loading branch information
olme04 authored Apr 13, 2022
1 parent ea22902 commit 9198a98
Show file tree
Hide file tree
Showing 18 changed files with 394 additions and 160 deletions.
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }
ktor-client-okhttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktor" }
ktor-client-darwin = { module = "io.ktor:ktor-client-darwin", version.ref = "ktor" }
ktor-server-core = { module = "io.ktor:ktor-server-core", version.ref = "ktor" }
ktor-server-host-common = { module = "io.ktor:ktor-server-host-common", version.ref = "ktor" }
ktor-server-websockets = { module = "io.ktor:ktor-server-websockets", version.ref = "ktor" }
ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.server

import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.rsocket.kotlin.*
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
public fun Route.rSocket(
path: String? = null,
protocol: String? = null,
acceptor: ConnectionAcceptor,
) {
val serverTransport = serverTransport(path, protocol)
val server = application.plugin(RSocketSupport).server
plugins {
rsocket.template.library
}

server.bind(serverTransport, acceptor)
kotlin {
configureCommon {
main {
dependencies {
api(projects.rsocketCore)
api(projects.rsocketTransportKtor.rsocketTransportKtorWebsocket)
//TODO ContentNegotiation will be here later
}
}
}
configureJvm()
configureJs()
configureNative()
}

description = "Ktor RSocket integration"
35 changes: 35 additions & 0 deletions rsocket-ktor/rsocket-ktor-client/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
rsocket.template.library
}

kotlin {
configureCommon {
main {
dependencies {
api(projects.rsocketKtor)
api(libs.ktor.client.websockets)
}
}
}
configureJvm()
configureJs()
configureNative()
}

description = "Ktor Client RSocket plugin"
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.ktor.client

import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlin.coroutines.*

public suspend fun HttpClient.rSocket(
request: HttpRequestBuilder.() -> Unit,
): RSocket = plugin(RSocketSupport).run {
connector.connect(KtorClientTransport(this@rSocket, request, bufferPool))
}

public suspend fun HttpClient.rSocket(
urlString: String,
secure: Boolean = false,
request: HttpRequestBuilder.() -> Unit = {},
): RSocket = rSocket {
url {
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
this.port = protocol.defaultPort
takeFrom(urlString)
}
request()
}

public suspend fun HttpClient.rSocket(
host: String? = null,
port: Int? = null,
path: String? = null,
secure: Boolean = false,
request: HttpRequestBuilder.() -> Unit = {},
): RSocket = rSocket {
url {
this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS
this.port = protocol.defaultPort
set(host = host, port = port, path = path)
}
request()
}

private class KtorClientTransport(
private val client: HttpClient,
private val request: HttpRequestBuilder.() -> Unit,
private val pool: ObjectPool<ChunkBuffer>
) : ClientTransport {
override val coroutineContext: CoroutineContext get() = client.coroutineContext

@TransportApi
override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request), pool)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,34 @@
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.client
package io.rsocket.kotlin.ktor.client

import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.plugins.websocket.*
import io.ktor.util.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.core.*

public class RSocketSupport(
public class RSocketSupport private constructor(
internal val connector: RSocketConnector,
internal val bufferPool: ObjectPool<ChunkBuffer>
) {

public class Config internal constructor() {
public var bufferPool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool
public var connector: RSocketConnector = RSocketConnector()
public fun connector(block: RSocketConnectorBuilder.() -> Unit) {
connector = RSocketConnector(block)
}
}

public companion object Feature : HttpClientPlugin<Config, RSocketSupport> {
public companion object Plugin : HttpClientPlugin<Config, RSocketSupport> {
override val key: AttributeKey<RSocketSupport> = AttributeKey("RSocket")
override fun prepare(block: Config.() -> Unit): RSocketSupport {
val connector = Config().apply(block).connector
return RSocketSupport(connector)
override fun prepare(block: Config.() -> Unit): RSocketSupport = Config().run {
block()
RSocketSupport(connector, bufferPool)
}

override fun install(plugin: RSocketSupport, scope: HttpClient) {
Expand Down
42 changes: 42 additions & 0 deletions rsocket-ktor/rsocket-ktor-server/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
rsocket.template.library
}

kotlin {
configureCommon {
main {
dependencies {
api(projects.rsocketKtor)
api(libs.ktor.server.websockets)
}
}
test {
dependencies {
implementation(projects.rsocketKtor.rsocketKtorClient)
implementation(projects.rsocketTransportTests) //port provider
implementation(libs.ktor.client.cio)
implementation(libs.ktor.server.cio)
}
}
}
configureJvm()
configureNative(NativeTargets.Nix)
}

description = "Ktor Server RSocket Plugin"
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,37 @@
* limitations under the License.
*/

package io.rsocket.kotlin.transport.ktor.websocket.server
package io.rsocket.kotlin.ktor.server

import io.ktor.server.application.*
import io.ktor.server.websocket.*
import io.ktor.util.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.core.*

public class RSocketSupport(
public class RSocketSupport private constructor(
internal val server: RSocketServer,
internal val bufferPool: ObjectPool<ChunkBuffer>
) {
public class Config internal constructor() {
public var bufferPool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool
public var server: RSocketServer = RSocketServer()
public fun server(block: RSocketServerBuilder.() -> Unit) {
server = RSocketServer(block)
}
}

public companion object Feature : BaseApplicationPlugin<Application, Config, RSocketSupport> {
override val key: AttributeKey<RSocketSupport> = AttributeKey("RSocket")
override fun install(pipeline: Application, configure: Config.() -> Unit): RSocketSupport {
pipeline.pluginOrNull(WebSockets)
?: error("RSocket require WebSockets to work. You must install WebSockets plugin first.")
val server = Config().apply(configure).server
return RSocketSupport(server)

return Config().run {
configure()
RSocketSupport(server, bufferPool)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.ktor.server

import io.ktor.server.application.*
import io.ktor.server.routing.*
import io.ktor.server.websocket.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.ktor.websocket.*
import kotlinx.coroutines.*

public fun Route.rSocket(
path: String? = null,
protocol: String? = null,
acceptor: ConnectionAcceptor,
): Unit = application.plugin(RSocketSupport).run {
server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol, bufferPool), acceptor)
}

private class KtorServerTransport(
private val route: Route,
private val path: String?,
private val protocol: String?,
private val pool: ObjectPool<ChunkBuffer>
) : ServerTransport<Unit> {
@TransportApi
override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) {
val handler: suspend DefaultWebSocketServerSession.() -> Unit = {
val connection = WebSocketConnection(this, pool)
accept(connection)
}
when (path) {
null -> route.webSocket(protocol, handler)
else -> route.webSocket(path, protocol, handler)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import io.ktor.server.engine.*
import io.ktor.server.routing.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.ktor.client.*
import io.rsocket.kotlin.ktor.server.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.ktor.websocket.client.*
import io.rsocket.kotlin.transport.tests.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand All @@ -33,8 +34,8 @@ import io.ktor.client.engine.cio.CIO as ClientCIO
import io.ktor.client.plugins.websocket.WebSockets as ClientWebSockets
import io.ktor.server.cio.CIO as ServerCIO
import io.ktor.server.websocket.WebSockets as ServerWebSockets
import io.rsocket.kotlin.transport.ktor.websocket.client.RSocketSupport as ClientRSocketSupport
import io.rsocket.kotlin.transport.ktor.websocket.server.RSocketSupport as ServerRSocketSupport
import io.rsocket.kotlin.ktor.client.RSocketSupport as ClientRSocketSupport
import io.rsocket.kotlin.ktor.server.RSocketSupport as ServerRSocketSupport

class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {
private val port = PortProvider.next()
Expand Down
Loading

0 comments on commit 9198a98

Please sign in to comment.