Skip to content

Commit

Permalink
Implement cluster subscription API (#31057)
Browse files Browse the repository at this point in the history
  • Loading branch information
yufengwangca authored Dec 18, 2023
1 parent 9310a37 commit 342e338
Show file tree
Hide file tree
Showing 116 changed files with 106,024 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import kotlinx.coroutines.runBlocking
import matter.controller.MatterController
import matter.controller.SubscribeRequest
import matter.controller.SubscriptionState
import matter.controller.UShortSubscriptionState
import matter.controller.cluster.clusters.IdentifyCluster
import matter.controller.model.AttributePath
import matter.controller.model.EventPath

Expand All @@ -24,26 +26,6 @@ class PairOnNetworkLongImSubscribeCommand(
DiscoveryFilterType.LONG_DISCRIMINATOR
) {
override fun runCommand() {
val attributePaths =
listOf(
AttributePath(
endpointId = WILDCARD_ENDPOINT_ID,
clusterId = WILDCARD_CLUSTER_ID,
attributeId = WILDCARD_EVENT_ID,
)
)

val eventPaths =
listOf(
EventPath(
endpointId = WILDCARD_ENDPOINT_ID,
clusterId = WILDCARD_CLUSTER_ID,
eventId = WILDCARD_EVENT_ID
)
)

val subscribeRequest: SubscribeRequest = SubscribeRequest(eventPaths, attributePaths)

currentCommissioner()
.pairDevice(
getNodeId(),
Expand All @@ -57,7 +39,11 @@ class PairOnNetworkLongImSubscribeCommand(

runBlocking {
try {
startSubscription(subscribeRequest)
// Verify Wildcard subscription
startWildcardSubscription()

// Verify IdentifyTime attribute subscription
subscribeIdentifyTimeAttribute()
} catch (ex: Exception) {
logger.log(Level.WARNING, "General subscribe failure occurred with error ${ex.message}")
setFailure("subscribe failure")
Expand All @@ -69,8 +55,28 @@ class PairOnNetworkLongImSubscribeCommand(
setSuccess()
}

private suspend fun startSubscription(request: SubscribeRequest) {
logger.log(Level.INFO, "Starting subscription")
private suspend fun startWildcardSubscription() {
logger.log(Level.INFO, "Starting wildcard subscription")

val attributePaths =
listOf(
AttributePath(
endpointId = WILDCARD_ENDPOINT_ID,
clusterId = WILDCARD_CLUSTER_ID,
attributeId = WILDCARD_EVENT_ID,
)
)

val eventPaths =
listOf(
EventPath(
endpointId = WILDCARD_ENDPOINT_ID,
clusterId = WILDCARD_CLUSTER_ID,
eventId = WILDCARD_EVENT_ID
)
)

val request: SubscribeRequest = SubscribeRequest(eventPaths, attributePaths)

currentCommissioner()
.subscribe(request)
Expand All @@ -92,7 +98,39 @@ class PairOnNetworkLongImSubscribeCommand(
)
}
is SubscriptionState.SubscriptionEstablished -> {
logger.log(Level.INFO, "Subscription is established")
logger.log(Level.INFO, "Wildcard Subscription is established")
}
else -> {
logger.log(Level.SEVERE, "Unexpected subscription state: $subscriptionState")
}
}
}
}

private suspend fun subscribeIdentifyTimeAttribute() {
logger.log(Level.INFO, "Subscribe IdentifyTime attribute")

val identifyCluster = IdentifyCluster(controller = currentCommissioner(), endpointId = 0u)

identifyCluster
.subscribeIdentifyTimeAttribute(minInterval = 0, maxInterval = 5)
.takeWhile { subscriptionState ->
// Keep collecting as long as it's not SubscriptionEstablished
subscriptionState !is UShortSubscriptionState.SubscriptionEstablished
}
.collect { subscriptionState ->
when (subscriptionState) {
is UShortSubscriptionState.Success -> {
logger.log(Level.INFO, "Received IdentifyTime Update: ${subscriptionState.value}")
}
is UShortSubscriptionState.Error -> {
logger.log(
Level.WARNING,
"Received SubscriptionErrorNotification with terminationCause: ${subscriptionState.exception}"
)
}
is UShortSubscriptionState.SubscriptionEstablished -> {
logger.log(Level.INFO, "IdentifyTime Subscription is established")
}
else -> {
logger.log(Level.SEVERE, "Unexpected subscription state: $subscriptionState")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,29 @@ package matter.controller.cluster.clusters
import java.util.logging.Level
import java.util.logging.Logger
import java.time.Duration
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.transform
import matter.controller.MatterController
import matter.controller.ReadRequest
import matter.controller.ReadData
import matter.controller.ReadFailure
import matter.controller.ReadResponse
import matter.controller.SubscribeRequest
import matter.controller.SubscriptionState
import matter.controller.ByteSubscriptionState
import matter.controller.ShortSubscriptionState
import matter.controller.IntSubscriptionState
import matter.controller.LongSubscriptionState
import matter.controller.FloatSubscriptionState
import matter.controller.DoubleSubscriptionState
import matter.controller.CharSubscriptionState
import matter.controller.BooleanSubscriptionState
import matter.controller.UByteSubscriptionState
import matter.controller.UShortSubscriptionState
import matter.controller.UIntSubscriptionState
import matter.controller.ULongSubscriptionState
import matter.controller.StringSubscriptionState
import matter.controller.ByteArraySubscriptionState
import matter.controller.WriteRequest
import matter.controller.WriteRequests
import matter.controller.WriteResponse
Expand Down Expand Up @@ -173,9 +189,20 @@ class {{cluster.name}}Cluster(private val controller: MatterController, private
{%- set encodable = attribute.definition | asEncodable(typeLookup) -%}
{%- set interfaceName = attribute | javaAttributeCallbackName(typeLookup) -%}
{%- if interfaceName not in already_handled_attribute %}
{%- set valueType = encode_value(cluster, encodable, 0) -%}
class {{interfaceName}}(
val value: {{encode_value(cluster, encodable, 0)}}
val value: {{valueType}}
)

sealed class {{interfaceName}}SubscriptionState {
data class Success(
val value: {{valueType}}
) : {{interfaceName}}SubscriptionState()

data class Error(val exception: Exception) : {{interfaceName}}SubscriptionState()

object SubscriptionEstablished : {{interfaceName}}SubscriptionState()
}
{% if already_handled_attribute.append(interfaceName) -%}
{#- This block does nothing, it only exists to append to already_handled_attribute. -#}
{%- endif -%}
Expand Down Expand Up @@ -387,6 +414,63 @@ class {{cluster.name}}Cluster(private val controller: MatterController, private
}
}
{% endif %}
{%- if attribute.is_subscribable %}
{%- set encodable = attribute.definition | asEncodable(typeLookup) %}
{%- set encodable_was_optional = encodable.is_optional or encodable.is_nullable %}
suspend fun subscribe{{ attribute.definition.name | upfirst }}Attribute(
minInterval: Int,
maxInterval: Int
): Flow<{{interfaceName}}SubscriptionState> {
val ATTRIBUTE_ID: UInt = {{attribute.definition.code}}u
val attributePaths = listOf(
AttributePath(
endpointId = endpointId,
clusterId = CLUSTER_ID,
attributeId = ATTRIBUTE_ID
)
)

val subscribeRequest: SubscribeRequest = SubscribeRequest(
eventPaths = emptyList(),
attributePaths = attributePaths,
minInterval = Duration.ofSeconds(minInterval.toLong()),
maxInterval = Duration.ofSeconds(maxInterval.toLong())
)

return controller.subscribe(subscribeRequest).transform { subscriptionState ->
when (subscriptionState) {
is SubscriptionState.SubscriptionErrorNotification -> {
emit({{interfaceName}}SubscriptionState.Error(Exception("Subscription terminated with error code: ${subscriptionState.terminationCause}")))
}
is SubscriptionState.NodeStateUpdate -> {
val attributeData =
subscriptionState.updateState.successes.filterIsInstance<ReadData.Attribute>().firstOrNull {
it.path.attributeId == ATTRIBUTE_ID
}

requireNotNull(attributeData) {
"{{ attribute.definition.name | capitalize }} attribute not found in Node State update"
}

// Decode the TLV data into the appropriate type
val tlvReader = TlvReader(attributeData.data)
val decodedValue: {{encode_value(cluster, encodable, 0)}} = {{decode_tlv(cluster, attribute.definition | asEncodable(typeLookup), "AnonymousTag", 0)}}

{% if encodable_was_optional-%}
decodedValue?.let {
emit({{interfaceName}}SubscriptionState.Success(it))
}
{% else -%}
emit({{interfaceName}}SubscriptionState.Success(decodedValue))
{%- endif %}
}
SubscriptionState.SubscriptionEstablished -> {
emit({{interfaceName}}SubscriptionState.SubscriptionEstablished)
}
}
}
}
{% endif -%}
{%- endfor %}
companion object {
private val logger = Logger.getLogger({{cluster.name}}Cluster::class.java.name)
Expand Down
1 change: 1 addition & 0 deletions src/controller/java/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ kotlin_library("kotlin_matter_controller") {
"src/matter/controller/MatterControllerImpl.kt",
"src/matter/controller/Messages.kt",
"src/matter/controller/OperationalKeyConfig.kt",
"src/matter/controller/SubscriptionStates.kt",
"src/matter/controller/model/Paths.kt",
"src/matter/controller/model/States.kt",
]
Expand Down
Loading

0 comments on commit 342e338

Please sign in to comment.