-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathRoom.kt
268 lines (229 loc) · 8.8 KB
/
Room.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
@file:Suppress("StringLiteralDuplication", "NotImplementedDeclaration")
package com.ably.chat
import io.ably.lib.types.ErrorInfo
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
/**
* Represents a chat room.
*/
interface Room {
/**
* The unique identifier of the room.
* @returns The room identifier.
*/
val roomId: String
/**
* Allows you to send, subscribe-to and query messages in the room.
*
* @returns The messages instance for the room.
*/
val messages: Messages
/**
* Allows you to subscribe to presence events in the room.
*
* @throws {@link ErrorInfo}} if presence is not enabled for the room.
* @returns The presence instance for the room.
*/
val presence: Presence
/**
* Allows you to interact with room-level reactions.
*
* @throws {@link ErrorInfo} if reactions are not enabled for the room.
* @returns The room reactions instance for the room.
*/
val reactions: RoomReactions
/**
* Allows you to interact with typing events in the room.
*
* @throws {@link ErrorInfo} if typing is not enabled for the room.
* @returns The typing instance for the room.
*/
val typing: Typing
/**
* Allows you to interact with occupancy metrics for the room.
*
* @throws {@link ErrorInfo} if occupancy is not enabled for the room.
* @returns The occupancy instance for the room.
*/
val occupancy: Occupancy
/**
* Returns the room options.
*
* @returns A copy of the options used to create the room.
*/
val options: RoomOptions
/**
* (CHA-RS2)
* The current status of the room.
*
* @returns The current status.
*/
val status: RoomStatus
/**
* The current error, if any, that caused the room to enter the current status.
*/
val error: ErrorInfo?
/**
* Registers a listener that will be called whenever the room status changes.
* @param listener The function to call when the status changes.
* @returns An object that can be used to unregister the listener.
*/
fun onStatusChange(listener: RoomLifecycle.Listener): Subscription
/**
* Removes all listeners that were added by the `onStatusChange` method.
*/
fun offAllStatusChange()
/**
* Attaches to the room to receive events in realtime.
*
* If a room fails to attach, it will enter either the {@link RoomLifecycle.Suspended} or {@link RoomLifecycle.Failed} state.
*
* If the room enters the failed state, then it will not automatically retry attaching and intervention is required.
*
* If the room enters the suspended state, then the call to attach will reject with the {@link ErrorInfo} that caused the suspension. However,
* the room will automatically retry attaching after a delay.
*/
suspend fun attach()
/**
* Detaches from the room to stop receiving events in realtime.
*/
suspend fun detach()
}
internal class DefaultRoom(
override val roomId: String,
override val options: RoomOptions,
internal val realtimeClient: RealtimeClient,
internal val chatApi: ChatApi,
internal val clientId: String,
logger: Logger,
) : Room {
internal val roomLogger = logger.withContext("Room", mapOf("roomId" to roomId))
/**
* RoomScope is a crucial part of the Room lifecycle. It manages sequential and atomic operations.
* Parallelism is intentionally limited to 1 to ensure that only one coroutine runs at a time,
* preventing concurrency issues. Every operation within Room must be performed through this scope.
*/
private val roomScope =
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(roomId) + SupervisorJob())
override val messages = DefaultMessages(room = this)
private var _presence: Presence? = null
override val presence: Presence
get() {
if (_presence == null) { // CHA-RC2b
throw ablyException("Presence is not enabled for this room", ErrorCode.BadRequest)
}
return _presence as Presence
}
private var _reactions: RoomReactions? = null
override val reactions: RoomReactions
get() {
if (_reactions == null) { // CHA-RC2b
throw ablyException("Reactions are not enabled for this room", ErrorCode.BadRequest)
}
return _reactions as RoomReactions
}
private var _typing: Typing? = null
override val typing: Typing
get() {
if (_typing == null) { // CHA-RC2b
throw ablyException("Typing is not enabled for this room", ErrorCode.BadRequest)
}
return _typing as Typing
}
private var _occupancy: Occupancy? = null
override val occupancy: Occupancy
get() {
if (_occupancy == null) { // CHA-RC2b
throw ablyException("Occupancy is not enabled for this room", ErrorCode.BadRequest)
}
return _occupancy as Occupancy
}
private val statusLifecycle = DefaultRoomLifecycle(roomLogger)
override val status: RoomStatus
get() = statusLifecycle.status
override val error: ErrorInfo?
get() = statusLifecycle.error
private var lifecycleManager: RoomLifecycleManager
init {
options.validateRoomOptions() // CHA-RC2a
val roomFeatures = mutableListOf<ContributesToRoomLifecycle>(messages)
options.presence?.let {
val presenceContributor = DefaultPresence(room = this)
roomFeatures.add(presenceContributor)
_presence = presenceContributor
}
options.typing?.let {
val typingContributor = DefaultTyping(room = this)
roomFeatures.add(typingContributor)
_typing = typingContributor
}
options.reactions?.let {
val reactionsContributor = DefaultRoomReactions(room = this)
roomFeatures.add(reactionsContributor)
_reactions = reactionsContributor
}
options.occupancy?.let {
val occupancyContributor = DefaultOccupancy(room = this)
roomFeatures.add(occupancyContributor)
_occupancy = occupancyContributor
}
lifecycleManager = RoomLifecycleManager(roomScope, statusLifecycle, roomFeatures, roomLogger)
}
override fun onStatusChange(listener: RoomLifecycle.Listener): Subscription =
statusLifecycle.onChange(listener)
override fun offAllStatusChange() {
statusLifecycle.offAll()
}
override suspend fun attach() {
lifecycleManager.attach()
}
override suspend fun detach() {
lifecycleManager.detach()
}
/**
* Releases the room, underlying channels are removed from the core SDK to prevent leakage.
* This is an internal method and only called from Rooms interface implementation.
*/
internal suspend fun release() {
lifecycleManager.release()
}
/**
* Ensures that the room is attached before performing any realtime room operation.
* @throws roomInvalidStateException if room is not in ATTACHING/ATTACHED state.
* Spec: CHA-RL9
*/
internal suspend fun ensureAttached() {
// CHA-PR3d, CHA-PR10d, CHA-PR6c, CHA-PR6c
if (statusLifecycle.status == RoomStatus.Attached) {
return
}
if (statusLifecycle.status == RoomStatus.Attaching) { // CHA-RL9
val attachDeferred = CompletableDeferred<Unit>()
roomScope.launch {
when (statusLifecycle.status) {
RoomStatus.Attached -> attachDeferred.complete(Unit)
RoomStatus.Attaching -> statusLifecycle.onChangeOnce {
if (it.current == RoomStatus.Attached) {
attachDeferred.complete(Unit)
} else {
val exception = roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.InternalServerError)
attachDeferred.completeExceptionally(exception)
}
}
else -> {
val exception = roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.InternalServerError)
attachDeferred.completeExceptionally(exception)
}
}
}
attachDeferred.await()
return
}
// CHA-PR3h, CHA-PR10h, CHA-PR6h, CHA-T2g
throw roomInvalidStateException(roomId, statusLifecycle.status, HttpStatusCode.BadRequest)
}
}