diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 5e5c73835..5e54f53fd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -32,6 +32,7 @@ ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" } ktor-client-okhttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktor" } ktor-client-darwin = { module = "io.ktor:ktor-client-darwin", version.ref = "ktor" } ktor-server-core = { module = "io.ktor:ktor-server-core", version.ref = "ktor" } +ktor-server-host-common = { module = "io.ktor:ktor-server-host-common", version.ref = "ktor" } ktor-server-websockets = { module = "io.ktor:ktor-server-websockets", version.ref = "ktor" } ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" } ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/Routing.kt b/rsocket-ktor/build.gradle.kts similarity index 55% rename from rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/Routing.kt rename to rsocket-ktor/build.gradle.kts index 7959dab3f..b2a4b0b51 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/Routing.kt +++ b/rsocket-ktor/build.gradle.kts @@ -14,21 +14,23 @@ * limitations under the License. */ -package io.rsocket.kotlin.transport.ktor.websocket.server - -import io.ktor.server.application.* -import io.ktor.server.routing.* -import io.rsocket.kotlin.* -import kotlinx.coroutines.* - -@OptIn(DelicateCoroutinesApi::class) -public fun Route.rSocket( - path: String? = null, - protocol: String? = null, - acceptor: ConnectionAcceptor, -) { - val serverTransport = serverTransport(path, protocol) - val server = application.plugin(RSocketSupport).server +plugins { + rsocket.template.library +} - server.bind(serverTransport, acceptor) +kotlin { + configureCommon { + main { + dependencies { + api(projects.rsocketCore) + api(projects.rsocketTransportKtor.rsocketTransportKtorWebsocket) + //TODO ContentNegotiation will be here later + } + } + } + configureJvm() + configureJs() + configureNative() } + +description = "Ktor RSocket integration" diff --git a/rsocket-ktor/rsocket-ktor-client/build.gradle.kts b/rsocket-ktor/rsocket-ktor-client/build.gradle.kts new file mode 100644 index 000000000..e14e43067 --- /dev/null +++ b/rsocket-ktor/rsocket-ktor-client/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + rsocket.template.library +} + +kotlin { + configureCommon { + main { + dependencies { + api(projects.rsocketKtor) + api(libs.ktor.client.websockets) + } + } + } + configureJvm() + configureJs() + configureNative() +} + +description = "Ktor Client RSocket plugin" diff --git a/rsocket-ktor/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt b/rsocket-ktor/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt new file mode 100644 index 000000000..41576f14c --- /dev/null +++ b/rsocket-ktor/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/Builders.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.ktor.client + +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.plugins.websocket.* +import io.ktor.client.request.* +import io.ktor.http.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.websocket.* +import kotlin.coroutines.* + +public suspend fun HttpClient.rSocket( + request: HttpRequestBuilder.() -> Unit, +): RSocket = plugin(RSocketSupport).run { + connector.connect(KtorClientTransport(this@rSocket, request, bufferPool)) +} + +public suspend fun HttpClient.rSocket( + urlString: String, + secure: Boolean = false, + request: HttpRequestBuilder.() -> Unit = {}, +): RSocket = rSocket { + url { + this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS + this.port = protocol.defaultPort + takeFrom(urlString) + } + request() +} + +public suspend fun HttpClient.rSocket( + host: String? = null, + port: Int? = null, + path: String? = null, + secure: Boolean = false, + request: HttpRequestBuilder.() -> Unit = {}, +): RSocket = rSocket { + url { + this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS + this.port = protocol.defaultPort + set(host = host, port = port, path = path) + } + request() +} + +private class KtorClientTransport( + private val client: HttpClient, + private val request: HttpRequestBuilder.() -> Unit, + private val pool: ObjectPool +) : ClientTransport { + override val coroutineContext: CoroutineContext get() = client.coroutineContext + + @TransportApi + override suspend fun connect(): Connection = WebSocketConnection(client.webSocketSession(request), pool) +} diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/RSocketSupport.kt b/rsocket-ktor/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt similarity index 69% rename from rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/RSocketSupport.kt rename to rsocket-ktor/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt index 48d2e26ba..5289294d9 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/RSocketSupport.kt +++ b/rsocket-ktor/rsocket-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/ktor/client/RSocketSupport.kt @@ -14,27 +14,34 @@ * limitations under the License. */ -package io.rsocket.kotlin.transport.ktor.websocket.client +package io.rsocket.kotlin.ktor.client import io.ktor.client.* import io.ktor.client.plugins.* import io.ktor.client.plugins.websocket.* import io.ktor.util.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.core.* -public class RSocketSupport( +public class RSocketSupport private constructor( internal val connector: RSocketConnector, + internal val bufferPool: ObjectPool ) { public class Config internal constructor() { + public var bufferPool: ObjectPool = ChunkBuffer.Pool public var connector: RSocketConnector = RSocketConnector() + public fun connector(block: RSocketConnectorBuilder.() -> Unit) { + connector = RSocketConnector(block) + } } - public companion object Feature : HttpClientPlugin { + public companion object Plugin : HttpClientPlugin { override val key: AttributeKey = AttributeKey("RSocket") - override fun prepare(block: Config.() -> Unit): RSocketSupport { - val connector = Config().apply(block).connector - return RSocketSupport(connector) + override fun prepare(block: Config.() -> Unit): RSocketSupport = Config().run { + block() + RSocketSupport(connector, bufferPool) } override fun install(plugin: RSocketSupport, scope: HttpClient) { diff --git a/rsocket-ktor/rsocket-ktor-server/build.gradle.kts b/rsocket-ktor/rsocket-ktor-server/build.gradle.kts new file mode 100644 index 000000000..df0feaaa1 --- /dev/null +++ b/rsocket-ktor/rsocket-ktor-server/build.gradle.kts @@ -0,0 +1,42 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + rsocket.template.library +} + +kotlin { + configureCommon { + main { + dependencies { + api(projects.rsocketKtor) + api(libs.ktor.server.websockets) + } + } + test { + dependencies { + implementation(projects.rsocketKtor.rsocketKtorClient) + implementation(projects.rsocketTransportTests) //port provider + implementation(libs.ktor.client.cio) + implementation(libs.ktor.server.cio) + } + } + } + configureJvm() + configureNative(NativeTargets.Nix) +} + +description = "Ktor Server RSocket Plugin" diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/RSocketSupport.kt b/rsocket-ktor/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt similarity index 71% rename from rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/RSocketSupport.kt rename to rsocket-ktor/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt index 93f478764..09a1b3ad2 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/RSocketSupport.kt +++ b/rsocket-ktor/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/RSocketSupport.kt @@ -14,18 +14,25 @@ * limitations under the License. */ -package io.rsocket.kotlin.transport.ktor.websocket.server +package io.rsocket.kotlin.ktor.server import io.ktor.server.application.* import io.ktor.server.websocket.* import io.ktor.util.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.core.* -public class RSocketSupport( +public class RSocketSupport private constructor( internal val server: RSocketServer, + internal val bufferPool: ObjectPool ) { public class Config internal constructor() { + public var bufferPool: ObjectPool = ChunkBuffer.Pool public var server: RSocketServer = RSocketServer() + public fun server(block: RSocketServerBuilder.() -> Unit) { + server = RSocketServer(block) + } } public companion object Feature : BaseApplicationPlugin { @@ -33,8 +40,11 @@ public class RSocketSupport( override fun install(pipeline: Application, configure: Config.() -> Unit): RSocketSupport { pipeline.pluginOrNull(WebSockets) ?: error("RSocket require WebSockets to work. You must install WebSockets plugin first.") - val server = Config().apply(configure).server - return RSocketSupport(server) + + return Config().run { + configure() + RSocketSupport(server, bufferPool) + } } } } diff --git a/rsocket-ktor/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt b/rsocket-ktor/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt new file mode 100644 index 000000000..d07e9d880 --- /dev/null +++ b/rsocket-ktor/rsocket-ktor-server/src/commonMain/kotlin/io/rsocket/kotlin/ktor/server/Routing.kt @@ -0,0 +1,54 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.ktor.server + +import io.ktor.server.application.* +import io.ktor.server.routing.* +import io.ktor.server.websocket.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.websocket.* +import kotlinx.coroutines.* + +public fun Route.rSocket( + path: String? = null, + protocol: String? = null, + acceptor: ConnectionAcceptor, +): Unit = application.plugin(RSocketSupport).run { + server.bindIn(application, KtorServerTransport(this@rSocket, path, protocol, bufferPool), acceptor) +} + +private class KtorServerTransport( + private val route: Route, + private val path: String?, + private val protocol: String?, + private val pool: ObjectPool +) : ServerTransport { + @TransportApi + override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit) { + val handler: suspend DefaultWebSocketServerSession.() -> Unit = { + val connection = WebSocketConnection(this, pool) + accept(connection) + } + when (path) { + null -> route.webSocket(protocol, handler) + else -> route.webSocket(path, protocol, handler) + } + } +} diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketConnectionTest.kt b/rsocket-ktor/rsocket-ktor-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketConnectionTest.kt similarity index 92% rename from rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketConnectionTest.kt rename to rsocket-ktor/rsocket-ktor-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketConnectionTest.kt index 842c02dbc..096752590 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketConnectionTest.kt +++ b/rsocket-ktor/rsocket-ktor-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketConnectionTest.kt @@ -22,9 +22,10 @@ import io.ktor.server.engine.* import io.ktor.server.routing.* import io.rsocket.kotlin.* import io.rsocket.kotlin.keepalive.* +import io.rsocket.kotlin.ktor.client.* +import io.rsocket.kotlin.ktor.server.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* -import io.rsocket.kotlin.transport.ktor.websocket.client.* import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* @@ -33,8 +34,8 @@ import io.ktor.client.engine.cio.CIO as ClientCIO import io.ktor.client.plugins.websocket.WebSockets as ClientWebSockets import io.ktor.server.cio.CIO as ServerCIO import io.ktor.server.websocket.WebSockets as ServerWebSockets -import io.rsocket.kotlin.transport.ktor.websocket.client.RSocketSupport as ClientRSocketSupport -import io.rsocket.kotlin.transport.ktor.websocket.server.RSocketSupport as ServerRSocketSupport +import io.rsocket.kotlin.ktor.client.RSocketSupport as ClientRSocketSupport +import io.rsocket.kotlin.ktor.server.RSocketSupport as ServerRSocketSupport class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { private val port = PortProvider.next() diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/Builders.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/Builders.kt deleted file mode 100644 index f4d108732..000000000 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/Builders.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.ktor.websocket.client - -import io.ktor.client.* -import io.ktor.client.plugins.* -import io.ktor.client.request.* -import io.ktor.http.* -import io.rsocket.kotlin.* -import io.rsocket.kotlin.transport.* - -public suspend fun HttpClient.rSocket( - request: HttpRequestBuilder.() -> Unit, -): RSocket = rSocket(WebSocketClientTransport(this, request)) - -public suspend fun HttpClient.rSocket( - urlString: String, - secure: Boolean = false, - request: HttpRequestBuilder.() -> Unit = {}, -): RSocket = rSocket(WebSocketClientTransport(this, urlString, secure, request)) - -public suspend fun HttpClient.rSocket( - host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/", - secure: Boolean = false, -): RSocket = rSocket(WebSocketClientTransport(this, host, port, path, secure)) - -private suspend fun HttpClient.rSocket( - transport: ClientTransport, -): RSocket = plugin(RSocketSupport).connector.connect(transport) diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt index 41e084e18..e08c1de33 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/WebSocketClientTransport.kt @@ -20,47 +20,83 @@ package io.rsocket.kotlin.transport.ktor.websocket.client import io.ktor.client.* +import io.ktor.client.engine.* import io.ktor.client.plugins.websocket.* import io.ktor.client.request.* import io.ktor.http.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.websocket.* import kotlinx.coroutines.* +import kotlin.coroutines.* -public fun WebSocketClientTransport( - httpClient: HttpClient, - request: HttpRequestBuilder.() -> Unit, -): ClientTransport = ClientTransport(httpClient.coroutineContext + SupervisorJob(httpClient.coroutineContext[Job])) { - val session = httpClient.webSocketSession(request) - WebSocketConnection(session) +//TODO: will be reworked later with transport API rework + +public fun WebSocketClientTransport( + engineFactory: HttpClientEngineFactory, + context: CoroutineContext = EmptyCoroutineContext, + pool: ObjectPool = ChunkBuffer.Pool, + engine: T.() -> Unit = {}, + webSockets: WebSockets.Config.() -> Unit = {}, + request: HttpRequestBuilder.() -> Unit +): ClientTransport { + val clientEngine = engineFactory.create(engine) + + val transportJob = SupervisorJob(context[Job]) + val transportContext = clientEngine.coroutineContext + context + transportJob + + val httpClient = HttpClient(clientEngine) { + WebSockets(webSockets) + } + + Job(transportJob).invokeOnCompletion { + httpClient.close() + httpClient.cancel() + clientEngine.close() + clientEngine.cancel() + } + + return ClientTransport(transportContext) { + val session = httpClient.webSocketSession(request) + WebSocketConnection(session, pool) + } } -public fun WebSocketClientTransport( - httpClient: HttpClient, - urlString: String, - secure: Boolean = false, - request: HttpRequestBuilder.() -> Unit = {}, -): ClientTransport = WebSocketClientTransport(httpClient) { +public fun WebSocketClientTransport( + engineFactory: HttpClientEngineFactory, + urlString: String, secure: Boolean = false, + context: CoroutineContext = EmptyCoroutineContext, + pool: ObjectPool = ChunkBuffer.Pool, + engine: HttpClientEngineConfig.() -> Unit = {}, + webSockets: WebSockets.Config.() -> Unit = {}, + request: HttpRequestBuilder.() -> Unit = {} +): ClientTransport = WebSocketClientTransport(engineFactory, context, pool, engine, webSockets) { url { - protocol = if (secure) URLProtocol.WSS else URLProtocol.WS - port = url.protocol.defaultPort + this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS + this.port = protocol.defaultPort takeFrom(urlString) } request() } -public fun WebSocketClientTransport( - httpClient: HttpClient, - host: String = "localhost", port: Int = DEFAULT_PORT, path: String = "/", +public fun WebSocketClientTransport( + engineFactory: HttpClientEngineFactory, + host: String? = null, + port: Int? = null, + path: String? = null, secure: Boolean = false, - request: HttpRequestBuilder.() -> Unit = {}, -): ClientTransport = WebSocketClientTransport(httpClient) { + context: CoroutineContext = EmptyCoroutineContext, + pool: ObjectPool = ChunkBuffer.Pool, + engine: HttpClientEngineConfig.() -> Unit = {}, + webSockets: WebSockets.Config.() -> Unit = {}, + request: HttpRequestBuilder.() -> Unit = {} +): ClientTransport = WebSocketClientTransport(engineFactory, context, pool, engine, webSockets) { url { this.protocol = if (secure) URLProtocol.WSS else URLProtocol.WS - this.port = port - this.host = host - this.encodedPath = path + this.port = protocol.defaultPort + set(host = host, port = port, path = path) } request() } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/ClientWebSocketTransportTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/ClientWebSocketTransportTest.kt index c187fde18..fec400693 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/ClientWebSocketTransportTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/client/ClientWebSocketTransportTest.kt @@ -16,26 +16,14 @@ package io.rsocket.kotlin.transport.ktor.websocket.client -import io.ktor.client.* import io.ktor.client.engine.js.* -import io.ktor.client.plugins.websocket.* +import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.tests.* -import kotlinx.coroutines.* class ClientWebSocketTransportTest : TransportTest() { - - private val httpClient = HttpClient(Js) { - install(WebSockets) - install(RSocketSupport) { connector = CONNECTOR } - } - override suspend fun before() { - client = httpClient.rSocket(port = 9000) + client = connectClient( + WebSocketClientTransport(Js, port = 9000, context = testContext, pool = InUseTrackingPool) + ) } - - override suspend fun after() { - super.after() - httpClient.coroutineContext.job.cancelAndJoin() - } - } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts index db3a8c441..bab44fc55 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts @@ -24,7 +24,7 @@ kotlin { dependencies { api(projects.rsocketCore) api(projects.rsocketTransportKtor.rsocketTransportKtorWebsocket) - api(libs.ktor.server.core) + api(libs.ktor.server.host.common) api(libs.ktor.server.websockets) } } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt index 026333524..f19af41f6 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketServerTransport.kt @@ -16,25 +16,62 @@ package io.rsocket.kotlin.transport.ktor.websocket.server +import io.ktor.server.application.* +import io.ktor.server.engine.* import io.ktor.server.routing.* import io.ktor.server.websocket.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.websocket.* +//TODO: will be reworked later with transport API rework + +@Suppress("FunctionName") +public fun WebSocketServerTransport( + engineFactory: ApplicationEngineFactory, + port: Int = 80, host: String = "0.0.0.0", + path: String? = null, protocol: String? = null, + pool: ObjectPool = ChunkBuffer.Pool, + engine: T.() -> Unit = {}, + webSockets: WebSockets.WebSocketOptions.() -> Unit = {}, +): ServerTransport = WebSocketServerTransport( + engineFactory, + EngineConnectorBuilder().apply { + this.port = port + this.host = host + } as EngineConnectorConfig, + path = path, + protocol = protocol, + pool = pool, + engine = engine, + webSockets = webSockets, +) + +@Suppress("FunctionName") @OptIn(TransportApi::class) -internal fun Route.serverTransport( - path: String?, - protocol: String?, -): ServerTransport = ServerTransport { acceptor -> - when (path) { - null -> webSocket(protocol) { - val connection = WebSocketConnection(this) - acceptor(connection) - } - else -> webSocket(path, protocol) { - val connection = WebSocketConnection(this) - acceptor(connection) +public fun WebSocketServerTransport( + engineFactory: ApplicationEngineFactory, + vararg connectors: EngineConnectorConfig, + path: String? = null, protocol: String? = null, + pool: ObjectPool = ChunkBuffer.Pool, + engine: T.() -> Unit = {}, + webSockets: WebSockets.WebSocketOptions.() -> Unit = {}, +): ServerTransport = ServerTransport { acceptor -> + val handler: suspend DefaultWebSocketServerSession.() -> Unit = { + val connection = WebSocketConnection(this, pool) + acceptor(connection) + } + embeddedServer(engineFactory, *connectors, configure = engine) { + install(WebSockets, webSockets) + routing { + when (path) { + null -> webSocket(protocol, handler) + else -> webSocket(path, protocol, handler) + } } + }.also { + it.start(wait = false) } } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt index 237993c32..beb1db5ea 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/server/WebSocketTransportTest.kt @@ -16,41 +16,23 @@ package io.rsocket.kotlin.transport.ktor.websocket.server -import io.ktor.client.* import io.ktor.client.engine.* -import io.ktor.server.application.* import io.ktor.server.engine.* -import io.ktor.server.routing.* +import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.ktor.websocket.client.* import io.rsocket.kotlin.transport.tests.* -import kotlinx.coroutines.* -import io.ktor.client.plugins.websocket.WebSockets as ClientWebSockets -import io.ktor.server.websocket.WebSockets as ServerWebSockets -import io.rsocket.kotlin.transport.ktor.websocket.client.RSocketSupport as ClientRSocketSupport -import io.rsocket.kotlin.transport.ktor.websocket.server.RSocketSupport as ServerRSocketSupport abstract class WebSocketTransportTest( - clientEngine: HttpClientEngineFactory<*>, + private val clientEngine: HttpClientEngineFactory<*>, private val serverEngine: ApplicationEngineFactory<*, *>, ) : TransportTest() { - private val port = PortProvider.next() - - private val httpClient = HttpClient(clientEngine) { - install(ClientWebSockets) - install(ClientRSocketSupport) { connector = CONNECTOR } - } - override suspend fun before() { - testScope.embeddedServer(serverEngine, port) { - install(ServerWebSockets) - install(ServerRSocketSupport) { server = SERVER } - install(Routing) { rSocket(acceptor = ACCEPTOR) } - }.start() - client = httpClient.rSocket(port = port) - } - - override suspend fun after() { - super.after() - httpClient.coroutineContext.job.cancelAndJoin() + val port = PortProvider.next() + startServer( + WebSocketServerTransport(serverEngine, port = port, pool = InUseTrackingPool) + ) + client = connectClient( + WebSocketClientTransport(clientEngine, port = port, context = testContext, pool = InUseTrackingPool) + ) } } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt index 5fa7c9c88..bcaf6d8d1 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnection.kt @@ -17,12 +17,17 @@ package io.rsocket.kotlin.transport.ktor.websocket import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.ktor.websocket.* import io.rsocket.kotlin.* import kotlinx.coroutines.* @TransportApi -public class WebSocketConnection(private val session: WebSocketSession) : Connection, CoroutineScope by session { +public class WebSocketConnection( + private val session: WebSocketSession, + override val pool: ObjectPool +) : Connection, CoroutineScope by session { override suspend fun send(packet: ByteReadPacket) { session.send(packet.readBytes()) } diff --git a/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt b/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt index af2dba805..0e08214c6 100644 --- a/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt +++ b/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt @@ -16,11 +16,7 @@ package io.rsocket.kotlin.transport.tests.server -import io.ktor.server.application.* import io.ktor.server.cio.* -import io.ktor.server.engine.* -import io.ktor.server.routing.* -import io.ktor.server.websocket.* import io.rsocket.kotlin.transport.ktor.tcp.* import io.rsocket.kotlin.transport.ktor.websocket.server.* import io.rsocket.kotlin.transport.tests.* @@ -39,11 +35,11 @@ fun start(): Closeable { ).serverSocket.await() //await server start } - scope.embeddedServer(CIO, port = PortProvider.testServerWebSocket) { - install(WebSockets) - install(RSocketSupport) { server = TransportTest.SERVER } - install(Routing) { rSocket(acceptor = TransportTest.ACCEPTOR) } - }.start() + TransportTest.SERVER.bindIn( + scope, + WebSocketServerTransport(CIO, port = PortProvider.testServerWebSocket), + TransportTest.ACCEPTOR + ) Thread.sleep(1000) //await start diff --git a/settings.gradle.kts b/settings.gradle.kts index a203c9cdc..e947eaba5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -62,3 +62,10 @@ include( ) include("rsocket-transport-nodejs-tcp") + +//deep ktor integration module +include( + "rsocket-ktor", + "rsocket-ktor:rsocket-ktor-client", + "rsocket-ktor:rsocket-ktor-server", +)