Skip to content

Commit

Permalink
feat: add callback when mqtt connection is lost (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
andreasgrill committed Aug 12, 2024
1 parent c021a68 commit 9661e8b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class XesarConnect(private val client: IXesarMqttClient, val config: Config) {
private val coroutineScopeForSendCommand =
CoroutineScope(config.dispatcherForCommandsAndCleanUp)
private val coroutineScopeForCleanUp = CoroutineScope(config.dispatcherForCommandsAndCleanUp)

/** Callback for when the mqtt connection is lost. */
var onConnectionLost: ((ConnectionFailedException) -> Unit)? = null
lateinit var token: Token

/**
Expand All @@ -48,7 +51,12 @@ class XesarConnect(private val client: IXesarMqttClient, val config: Config) {
}

init {
client.onDisconnect = { connectionChannel.trySend(ConnectionEvent.DISCONNECTED) }
client.onDisconnect = { ex ->
connectionChannel.trySend(ConnectionEvent.DISCONNECTED)
if (ex != null) {
onConnectionLost?.let { it(ex) }
}
}
client.onMessage = { topic, message ->
// call all listeners on the topic
val decodedMessage = message.decodeToString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class XesarMqttClient(private val client: MqttAsyncClient) : IXesarMqttClient {

override fun connectionLost(cause: Throwable?) {
log.error("lost connection: $cause")
val exception = ConnectionFailedException("lost connection: $cause")
val exception =
cause?.let { ConnectionFailedException("lost connection", it) }
?: ConnectionFailedException("lost connection")
onDisconnect(exception)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.open200.xesar.connect.it

import com.open200.xesar.connect.XesarConnect
import com.open200.xesar.connect.XesarMqttClient
import io.kotest.common.runBlocking
import io.kotest.core.spec.style.FunSpec
import io.kotest.extensions.testcontainers.perTest
import java.util.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout

class MqttConnectionLostTest :
FunSpec({
val container = MosquittoContainer.container()
val config = MosquittoContainer.config(container)
listener(container.perTest())

test("connect with custom mqtt client id") {
runBlocking {
val connected = CompletableDeferred<Unit>()
val connectionLost = CompletableDeferred<Unit>()
launch {
val xesarMqttClient = XesarMqttClient.connectAsync(config).await()
connected.complete(Unit)

val xesarConnect = XesarConnect(xesarMqttClient, config)
xesarConnect.onConnectionLost = { cause -> connectionLost.complete(Unit) }
try {
xesarConnect.delay()
} catch (e: Exception) {
// ignore
}
}
launch {
withTimeout(1000) { connected.await() }
container.stop()
withTimeout(1000) { connectionLost.await() }
}
}
}
})

0 comments on commit 9661e8b

Please sign in to comment.