diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt index 25cc041..b4a58ed 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt @@ -155,8 +155,10 @@ abstract class BrokerClient( return topics[topic]?.keys?.get(key) } - internal fun createResponseTopic(topic: String): String = "$topic.responses" - internal fun createResponseKey(key: String): String = "$key.response" + internal fun toResponseTopic(topic: String): String = + if (connection.supportsTopicHotSwap) "$topic.responses" else topic + + internal fun toResponseKey(key: String): String = "$key.response" private fun onTopicMessage( topic: String, diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt index 05ffb65..f2e7398 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt @@ -6,4 +6,9 @@ data class BrokerMessage( val key: String, val value: T, val headers: BaseBrokerMessageHeaders -) +) { + + val messageId: String + get() = headers.messageId + +} diff --git a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt index 15d2f4f..86a0248 100644 --- a/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt +++ b/latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt @@ -186,8 +186,8 @@ class RpcClient( private val requestConsumer = client.consumer(topic, key, options, requestType, responseIsNullable) { val result = callback(it) val responseProducer = client.producer( - client.createResponseTopic(topic), - client.createResponseKey(key), + client.toResponseTopic(topic), + client.toResponseKey(key), options, responseType, responseIsNullable, @@ -197,7 +197,7 @@ class RpcClient( result, services = setOf(it.headers.sourceService), instances = setOf(it.headers.sourceInstance), - inReplyTo = it.headers.messageId, + inReplyTo = it.messageId, ) responseProducer.destroy() } @@ -230,8 +230,8 @@ class RpcClient( val messageId = AtomicReference(null) val responseConsumer = client.consumer( - client.createResponseTopic(topic), - client.createResponseKey(key), + client.toResponseTopic(topic), + client.toResponseKey(key), options, responseType, responseIsNullable,