Skip to content

Commit 342e338

Browse files
authored
Implement cluster subscription API (#31057)
1 parent 9310a37 commit 342e338

File tree

116 files changed

+106024
-96
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

116 files changed

+106024
-96
lines changed

examples/kotlin-matter-controller/java/src/com/matter/controller/commands/pairing/PairOnNetworkLongImSubscribeCommand.kt

+62-24
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import kotlinx.coroutines.runBlocking
88
import matter.controller.MatterController
99
import matter.controller.SubscribeRequest
1010
import matter.controller.SubscriptionState
11+
import matter.controller.UShortSubscriptionState
12+
import matter.controller.cluster.clusters.IdentifyCluster
1113
import matter.controller.model.AttributePath
1214
import matter.controller.model.EventPath
1315

@@ -24,26 +26,6 @@ class PairOnNetworkLongImSubscribeCommand(
2426
DiscoveryFilterType.LONG_DISCRIMINATOR
2527
) {
2628
override fun runCommand() {
27-
val attributePaths =
28-
listOf(
29-
AttributePath(
30-
endpointId = WILDCARD_ENDPOINT_ID,
31-
clusterId = WILDCARD_CLUSTER_ID,
32-
attributeId = WILDCARD_EVENT_ID,
33-
)
34-
)
35-
36-
val eventPaths =
37-
listOf(
38-
EventPath(
39-
endpointId = WILDCARD_ENDPOINT_ID,
40-
clusterId = WILDCARD_CLUSTER_ID,
41-
eventId = WILDCARD_EVENT_ID
42-
)
43-
)
44-
45-
val subscribeRequest: SubscribeRequest = SubscribeRequest(eventPaths, attributePaths)
46-
4729
currentCommissioner()
4830
.pairDevice(
4931
getNodeId(),
@@ -57,7 +39,11 @@ class PairOnNetworkLongImSubscribeCommand(
5739

5840
runBlocking {
5941
try {
60-
startSubscription(subscribeRequest)
42+
// Verify Wildcard subscription
43+
startWildcardSubscription()
44+
45+
// Verify IdentifyTime attribute subscription
46+
subscribeIdentifyTimeAttribute()
6147
} catch (ex: Exception) {
6248
logger.log(Level.WARNING, "General subscribe failure occurred with error ${ex.message}")
6349
setFailure("subscribe failure")
@@ -69,8 +55,28 @@ class PairOnNetworkLongImSubscribeCommand(
6955
setSuccess()
7056
}
7157

72-
private suspend fun startSubscription(request: SubscribeRequest) {
73-
logger.log(Level.INFO, "Starting subscription")
58+
private suspend fun startWildcardSubscription() {
59+
logger.log(Level.INFO, "Starting wildcard subscription")
60+
61+
val attributePaths =
62+
listOf(
63+
AttributePath(
64+
endpointId = WILDCARD_ENDPOINT_ID,
65+
clusterId = WILDCARD_CLUSTER_ID,
66+
attributeId = WILDCARD_EVENT_ID,
67+
)
68+
)
69+
70+
val eventPaths =
71+
listOf(
72+
EventPath(
73+
endpointId = WILDCARD_ENDPOINT_ID,
74+
clusterId = WILDCARD_CLUSTER_ID,
75+
eventId = WILDCARD_EVENT_ID
76+
)
77+
)
78+
79+
val request: SubscribeRequest = SubscribeRequest(eventPaths, attributePaths)
7480

7581
currentCommissioner()
7682
.subscribe(request)
@@ -92,7 +98,39 @@ class PairOnNetworkLongImSubscribeCommand(
9298
)
9399
}
94100
is SubscriptionState.SubscriptionEstablished -> {
95-
logger.log(Level.INFO, "Subscription is established")
101+
logger.log(Level.INFO, "Wildcard Subscription is established")
102+
}
103+
else -> {
104+
logger.log(Level.SEVERE, "Unexpected subscription state: $subscriptionState")
105+
}
106+
}
107+
}
108+
}
109+
110+
private suspend fun subscribeIdentifyTimeAttribute() {
111+
logger.log(Level.INFO, "Subscribe IdentifyTime attribute")
112+
113+
val identifyCluster = IdentifyCluster(controller = currentCommissioner(), endpointId = 0u)
114+
115+
identifyCluster
116+
.subscribeIdentifyTimeAttribute(minInterval = 0, maxInterval = 5)
117+
.takeWhile { subscriptionState ->
118+
// Keep collecting as long as it's not SubscriptionEstablished
119+
subscriptionState !is UShortSubscriptionState.SubscriptionEstablished
120+
}
121+
.collect { subscriptionState ->
122+
when (subscriptionState) {
123+
is UShortSubscriptionState.Success -> {
124+
logger.log(Level.INFO, "Received IdentifyTime Update: ${subscriptionState.value}")
125+
}
126+
is UShortSubscriptionState.Error -> {
127+
logger.log(
128+
Level.WARNING,
129+
"Received SubscriptionErrorNotification with terminationCause: ${subscriptionState.exception}"
130+
)
131+
}
132+
is UShortSubscriptionState.SubscriptionEstablished -> {
133+
logger.log(Level.INFO, "IdentifyTime Subscription is established")
96134
}
97135
else -> {
98136
logger.log(Level.SEVERE, "Unexpected subscription state: $subscriptionState")

scripts/py_matter_idl/matter_idl/generators/kotlin/MatterClusters.jinja

+85-1
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,29 @@ package matter.controller.cluster.clusters
126126
import java.util.logging.Level
127127
import java.util.logging.Logger
128128
import java.time.Duration
129+
import kotlinx.coroutines.flow.Flow
130+
import kotlinx.coroutines.flow.transform
129131
import matter.controller.MatterController
130132
import matter.controller.ReadRequest
131133
import matter.controller.ReadData
132134
import matter.controller.ReadFailure
133135
import matter.controller.ReadResponse
134136
import matter.controller.SubscribeRequest
135137
import matter.controller.SubscriptionState
138+
import matter.controller.ByteSubscriptionState
139+
import matter.controller.ShortSubscriptionState
140+
import matter.controller.IntSubscriptionState
141+
import matter.controller.LongSubscriptionState
142+
import matter.controller.FloatSubscriptionState
143+
import matter.controller.DoubleSubscriptionState
144+
import matter.controller.CharSubscriptionState
145+
import matter.controller.BooleanSubscriptionState
146+
import matter.controller.UByteSubscriptionState
147+
import matter.controller.UShortSubscriptionState
148+
import matter.controller.UIntSubscriptionState
149+
import matter.controller.ULongSubscriptionState
150+
import matter.controller.StringSubscriptionState
151+
import matter.controller.ByteArraySubscriptionState
136152
import matter.controller.WriteRequest
137153
import matter.controller.WriteRequests
138154
import matter.controller.WriteResponse
@@ -173,9 +189,20 @@ class {{cluster.name}}Cluster(private val controller: MatterController, private
173189
{%- set encodable = attribute.definition | asEncodable(typeLookup) -%}
174190
{%- set interfaceName = attribute | javaAttributeCallbackName(typeLookup) -%}
175191
{%- if interfaceName not in already_handled_attribute %}
192+
{%- set valueType = encode_value(cluster, encodable, 0) -%}
176193
class {{interfaceName}}(
177-
val value: {{encode_value(cluster, encodable, 0)}}
194+
val value: {{valueType}}
178195
)
196+
197+
sealed class {{interfaceName}}SubscriptionState {
198+
data class Success(
199+
val value: {{valueType}}
200+
) : {{interfaceName}}SubscriptionState()
201+
202+
data class Error(val exception: Exception) : {{interfaceName}}SubscriptionState()
203+
204+
object SubscriptionEstablished : {{interfaceName}}SubscriptionState()
205+
}
179206
{% if already_handled_attribute.append(interfaceName) -%}
180207
{#- This block does nothing, it only exists to append to already_handled_attribute. -#}
181208
{%- endif -%}
@@ -387,6 +414,63 @@ class {{cluster.name}}Cluster(private val controller: MatterController, private
387414
}
388415
}
389416
{% endif %}
417+
{%- if attribute.is_subscribable %}
418+
{%- set encodable = attribute.definition | asEncodable(typeLookup) %}
419+
{%- set encodable_was_optional = encodable.is_optional or encodable.is_nullable %}
420+
suspend fun subscribe{{ attribute.definition.name | upfirst }}Attribute(
421+
minInterval: Int,
422+
maxInterval: Int
423+
): Flow<{{interfaceName}}SubscriptionState> {
424+
val ATTRIBUTE_ID: UInt = {{attribute.definition.code}}u
425+
val attributePaths = listOf(
426+
AttributePath(
427+
endpointId = endpointId,
428+
clusterId = CLUSTER_ID,
429+
attributeId = ATTRIBUTE_ID
430+
)
431+
)
432+
433+
val subscribeRequest: SubscribeRequest = SubscribeRequest(
434+
eventPaths = emptyList(),
435+
attributePaths = attributePaths,
436+
minInterval = Duration.ofSeconds(minInterval.toLong()),
437+
maxInterval = Duration.ofSeconds(maxInterval.toLong())
438+
)
439+
440+
return controller.subscribe(subscribeRequest).transform { subscriptionState ->
441+
when (subscriptionState) {
442+
is SubscriptionState.SubscriptionErrorNotification -> {
443+
emit({{interfaceName}}SubscriptionState.Error(Exception("Subscription terminated with error code: ${subscriptionState.terminationCause}")))
444+
}
445+
is SubscriptionState.NodeStateUpdate -> {
446+
val attributeData =
447+
subscriptionState.updateState.successes.filterIsInstance<ReadData.Attribute>().firstOrNull {
448+
it.path.attributeId == ATTRIBUTE_ID
449+
}
450+
451+
requireNotNull(attributeData) {
452+
"{{ attribute.definition.name | capitalize }} attribute not found in Node State update"
453+
}
454+
455+
// Decode the TLV data into the appropriate type
456+
val tlvReader = TlvReader(attributeData.data)
457+
val decodedValue: {{encode_value(cluster, encodable, 0)}} = {{decode_tlv(cluster, attribute.definition | asEncodable(typeLookup), "AnonymousTag", 0)}}
458+
459+
{% if encodable_was_optional-%}
460+
decodedValue?.let {
461+
emit({{interfaceName}}SubscriptionState.Success(it))
462+
}
463+
{% else -%}
464+
emit({{interfaceName}}SubscriptionState.Success(decodedValue))
465+
{%- endif %}
466+
}
467+
SubscriptionState.SubscriptionEstablished -> {
468+
emit({{interfaceName}}SubscriptionState.SubscriptionEstablished)
469+
}
470+
}
471+
}
472+
}
473+
{% endif -%}
390474
{%- endfor %}
391475
companion object {
392476
private val logger = Logger.getLogger({{cluster.name}}Cluster::class.java.name)

src/controller/java/BUILD.gn

+1
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ kotlin_library("kotlin_matter_controller") {
371371
"src/matter/controller/MatterControllerImpl.kt",
372372
"src/matter/controller/Messages.kt",
373373
"src/matter/controller/OperationalKeyConfig.kt",
374+
"src/matter/controller/SubscriptionStates.kt",
374375
"src/matter/controller/model/Paths.kt",
375376
"src/matter/controller/model/States.kt",
376377
]

0 commit comments

Comments
 (0)