-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathOccupancy.kt
207 lines (173 loc) · 5.84 KB
/
Occupancy.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
@file:Suppress("StringLiteralDuplication", "NotImplementedDeclaration")
package com.ably.chat
import com.google.gson.JsonObject
import com.google.gson.JsonPrimitive
import io.ably.lib.realtime.Channel
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
/**
* This interface is used to interact with occupancy in a chat room: subscribing to occupancy updates and
* fetching the current room occupancy metrics.
*
* Get an instance via {@link Room.occupancy}.
*/
interface Occupancy : EmitsDiscontinuities {
/**
* Get underlying Ably channel for occupancy events.
*
* @returns The underlying Ably channel for occupancy events.
*/
val channel: Channel
/**
* Subscribe a given listener to occupancy updates of the chat room.
*
* @param listener A listener to be called when the occupancy of the room changes.
*/
fun subscribe(listener: Listener): Subscription
/**
* Get the current occupancy of the chat room.
*
* @returns the current occupancy of the chat room.
*/
suspend fun get(): OccupancyEvent
/**
* An interface for listening to new occupancy event
*/
fun interface Listener {
/**
* A function that can be called when the new occupancy event happens.
* @param event The event that happened.
*/
fun onEvent(event: OccupancyEvent)
}
}
/**
* Represents the occupancy of a chat room.
*
* (CHA-O2)
*/
data class OccupancyEvent(
/**
* The number of connections to the chat room.
*/
val connections: Int,
/**
* The number of presence members in the chat room - members who have entered presence.
*/
val presenceMembers: Int,
)
internal class DefaultOccupancy(
private val room: DefaultRoom,
) : Occupancy, ContributesToRoomLifecycleImpl(room.roomLogger) {
override val featureName: String = "occupancy"
override val attachmentErrorCode: ErrorCode = ErrorCode.OccupancyAttachmentFailed
override val detachmentErrorCode: ErrorCode = ErrorCode.OccupancyDetachmentFailed
private val logger = room.roomLogger.withContext(tag = "Occupancy")
override val channel: Channel = room.messages.channel
private val listeners: MutableList<Occupancy.Listener> = CopyOnWriteArrayList()
private val eventBus = MutableSharedFlow<OccupancyEvent>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
private val occupancyScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + SupervisorJob())
private val occupancySubscription: Subscription
init {
occupancyScope.launch {
eventBus.collect { occupancyEvent ->
listeners.forEach {
it.onEvent(occupancyEvent)
}
}
}
val occupancyListener = PubSubMessageListener {
internalChannelListener(it)
}
channel.subscribe(occupancyListener)
occupancySubscription = Subscription {
channel.unsubscribe(occupancyListener)
}
}
// (CHA-O4)
override fun subscribe(listener: Occupancy.Listener): Subscription {
logger.trace("Occupancy.subscribe()")
listeners.add(listener)
return Subscription {
logger.trace("Occupancy.unsubscribe()")
// (CHA-04b)
listeners.remove(listener)
}
}
// (CHA-O3)
override suspend fun get(): OccupancyEvent {
logger.trace("Occupancy.get()")
return room.chatApi.getOccupancy(room.roomId)
}
override fun release() {
occupancySubscription.unsubscribe()
occupancyScope.cancel()
}
/**
* An internal listener that listens for occupancy events from the underlying channel and translates them into
* occupancy events for the public API.
*/
@Suppress("ReturnCount")
private fun internalChannelListener(message: PubSubMessage) {
val data = message.data as? JsonObject
if (data == null) {
logger.error(
"invalid occupancy event received; data is not an object",
staticContext = mapOf(
"message" to message.toString(),
),
)
// (CHA-04d)
return
}
val metrics = data.get("metrics") as? JsonObject
if (metrics == null) {
logger.error(
"invalid occupancy event received; metrics is missing",
staticContext = mapOf(
"data" to data.toString(),
),
)
// (CHA-04d)
return
}
val connections = metrics.get("connections") as? JsonPrimitive
if (connections == null) {
logger.error(
"invalid occupancy event received; connections is missing",
staticContext = mapOf(
"data" to data.toString(),
),
)
// (CHA-04d)
return
}
val presenceMembers = metrics.get("presenceMembers") as? JsonPrimitive
if (presenceMembers == null) {
logger.error(
"invalid occupancy event received; presenceMembers is missing",
staticContext = mapOf(
"data" to data.toString(),
),
)
// (CHA-04d)
return
}
eventBus.tryEmit(
// (CHA-04c)
OccupancyEvent(
connections = connections.asInt,
presenceMembers = presenceMembers.asInt,
),
)
}
}