Skip to content

Commit

Permalink
Refactored TaskScheduler, renamed to AtomicCoroutineScope which can only
Browse files Browse the repository at this point in the history
be initialized from static method
  • Loading branch information
sacOO7 committed Oct 18, 2024
1 parent 25759b0 commit c3bf5e0
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 75 deletions.
84 changes: 84 additions & 0 deletions chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.ably.chat

import java.util.PriorityQueue
import java.util.concurrent.Executors
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

/**
* Each ChatRoomLifecycleManager is supposed to have it's own AtomicCoroutineScope.
* AtomicCoroutineScope makes sure all operations are atomic and run with given priority.
* Uses single threaded dispatcher to avoid thread synchronization issues.
*/
class AtomicCoroutineScope private constructor(private val scope: CoroutineScope) {

private class Job(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val deferredResult: CompletableDeferred<Any>,
) :
Comparable<Job> {
override fun compareTo(other: Job): Int = this.priority.compareTo(other.priority)
}

private var isRunning = false
private val jobs: PriorityQueue<Job> = PriorityQueue()

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
scope.launch {
jobs.add(Job(priority, coroutineBlock, deferredResult))
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
try {
it.deferredResult.complete(scope.async(block = it.coroutineBlock).await())
} catch (t: Throwable) {
it.deferredResult.completeExceptionally(t)
}
}
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return deferredResult as CompletableDeferred<T>
}

/**
* Cancels all jobs along along with it's children.
* This includes cancelling queued jobs and current retry timers.
*/
fun cancel(message: String, cause: Throwable? = null) {
jobs.clear()
scope.cancel(message, cause)
}

companion object {
private var _singleThreadedDispatcher : ExecutorCoroutineDispatcher? = null

fun create(): AtomicCoroutineScope {
if (_singleThreadedDispatcher == null) {
_singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher();
}
return AtomicCoroutineScope(CoroutineScope(singleThreadedDispatcher))
}

val singleThreadedDispatcher: ExecutorCoroutineDispatcher
get() {
return _singleThreadedDispatcher?: error("Call SingleThreadedExecutor.create() method to initialize SingleThreadedDispatcher")
}
}
}
55 changes: 0 additions & 55 deletions chat-android/src/main/java/com/ably/chat/TaskScheduler.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,53 @@ package com.ably.chat

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import java.util.concurrent.Executors
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert
import org.junit.Test

class TaskSchedulerTest {
class AtomicCoroutineScopeTest {

@Test
fun `should perform given operation`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResult = taskScheduler.schedule {
val atomicCoroutineScope = AtomicCoroutineScope.create()
val deferredResult = atomicCoroutineScope.async {
delay(3000)
return@schedule "Operation Success!"
return@async "Operation Success!"
}
val result = taskResult.await()
val result = deferredResult.await()
Assert.assertEquals("Operation Success!", result)
}

@Test
fun `should capture failure of the given operation`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResult = taskScheduler.schedule {
val atomicCoroutineScope = AtomicCoroutineScope.create()
val deferredResult = atomicCoroutineScope.async {
delay(2000)
throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400))
}
Assert.assertThrows("Error performing operation", AblyException::class.java) {
runBlocking {
taskResult.await()
deferredResult.await()
}
}
}

@Test
fun `should perform mutually exclusive operations with given priority`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResults = mutableListOf<Deferred<Int>>()
val atomicCoroutineScope = AtomicCoroutineScope.create()
val deferredResults = mutableListOf<Deferred<Int>>()
var operationInProgress = false
var counter = 0
val threadIds = mutableSetOf<Long>()

repeat(20) {
val result = taskScheduler.schedule(it) {
val result = atomicCoroutineScope.async(it) {
threadIds.add(Thread.currentThread().id)
if (operationInProgress) {
error("Can't perform operation when other operation is going on")
Expand All @@ -63,12 +57,12 @@ class TaskSchedulerTest {
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
val returnValue = counter++
return@schedule returnValue
return@async returnValue
}
taskResults.add(result)
deferredResults.add(result)
}

val results = taskResults.awaitAll()
val results = deferredResults.awaitAll()
repeat(20) {
Assert.assertEquals(it, results[it])
}
Expand Down

0 comments on commit c3bf5e0

Please sign in to comment.