Skip to content

Commit

Permalink
Refactored TaskScheduler, renamed to SingleThreadedExecutor which can…
Browse files Browse the repository at this point in the history
… only

be initialized from static method
  • Loading branch information
sacOO7 committed Oct 18, 2024
1 parent 25759b0 commit 4483533
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 75 deletions.
85 changes: 85 additions & 0 deletions chat-android/src/main/java/com/ably/chat/SingleThreadedExecutor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.ably.chat

import java.util.PriorityQueue
import java.util.concurrent.Executors
import kotlin.coroutines.cancellation.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch

/**
* Each ChatRoomLifecycleManager is supposed to have it's own SingleThreadedExecutor.
* SingleThreadedExecutor makes sure all operations are atomic and run with given priority.
* Uses single threaded dispatcher to avoid thread synchronization issues.
*/
class SingleThreadedExecutor private constructor(private val scope: CoroutineScope) {

private class Job(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val deferredResult: CompletableDeferred<Any>,
) :
Comparable<Job> {
override fun compareTo(other: Job): Int = this.priority.compareTo(other.priority)
}

private var isRunning = false
private val jobs: PriorityQueue<Job> = PriorityQueue()

/**
* @param priority Defines priority for the operation execution.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>run(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
val deferredResult = CompletableDeferred<Any>()
scope.launch {
jobs.add(Job(priority, coroutineBlock, deferredResult))
if (!isRunning) {
isRunning = true
while (jobs.isNotEmpty()) {
val job = jobs.poll()
job?.let {
try {
it.deferredResult.complete(scope.async(block = it.coroutineBlock).await())
} catch (t: Throwable) {
it.deferredResult.completeExceptionally(t)
}
}
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return deferredResult as CompletableDeferred<T>
}

/**
* Cancels all jobs along along with it's children.
* This includes cancelling queued jobs and current retry timers.
*/
fun close(message: String, cause: Throwable? = null) {
jobs.clear()
scope.cancel(message, cause)
}

companion object {
private var _singleThreadedDispatcher : ExecutorCoroutineDispatcher? = null

fun create(): SingleThreadedExecutor {
if (_singleThreadedDispatcher == null) {
_singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher();
}
return SingleThreadedExecutor(CoroutineScope(singleThreadedDispatcher))
}

val singleThreadedDispatcher: ExecutorCoroutineDispatcher
get() {
return _singleThreadedDispatcher?: error("Call SingleThreadedExecutor.create() method to initialize SingleThreadedDispatcher")
}
}
}
55 changes: 0 additions & 55 deletions chat-android/src/main/java/com/ably/chat/TaskScheduler.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,53 @@ package com.ably.chat

import io.ably.lib.types.AblyException
import io.ably.lib.types.ErrorInfo
import java.util.concurrent.Executors
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.Assert
import org.junit.Test

class TaskSchedulerTest {
class SingleThreadedExecutorTest {

@Test
fun `should perform given operation`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResult = taskScheduler.schedule {
val singleThreadedExecutor = SingleThreadedExecutor.create()
val deferredResult = singleThreadedExecutor.run {
delay(3000)
return@schedule "Operation Success!"
return@run "Operation Success!"
}
val result = taskResult.await()
val result = deferredResult.await()
Assert.assertEquals("Operation Success!", result)
}

@Test
fun `should capture failure of the given operation`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResult = taskScheduler.schedule {
val singleThreadedExecutor = SingleThreadedExecutor.create()
val deferredResult = singleThreadedExecutor.run {
delay(2000)
throw AblyException.fromErrorInfo(ErrorInfo("Error performing operation", 400))
}
Assert.assertThrows("Error performing operation", AblyException::class.java) {
runBlocking {
taskResult.await()
deferredResult.await()
}
}
}

@Test
fun `should perform mutually exclusive operations with given priority`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val taskScheduler = TaskScheduler(CoroutineScope(singleThreadedDispatcher))
val taskResults = mutableListOf<Deferred<Int>>()
val singleThreadedExecutor = SingleThreadedExecutor.create()
val deferredResults = mutableListOf<Deferred<Int>>()
var operationInProgress = false
var counter = 0
val threadIds = mutableSetOf<Long>()

repeat(20) {
val result = taskScheduler.schedule(it) {
val result = singleThreadedExecutor.run(it) {
threadIds.add(Thread.currentThread().id)
if (operationInProgress) {
error("Can't perform operation when other operation is going on")
Expand All @@ -63,12 +57,12 @@ class TaskSchedulerTest {
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
val returnValue = counter++
return@schedule returnValue
return@run returnValue
}
taskResults.add(result)
deferredResults.add(result)
}

val results = taskResults.awaitAll()
val results = deferredResults.awaitAll()
repeat(20) {
Assert.assertEquals(it, results[it])
}
Expand Down

0 comments on commit 4483533

Please sign in to comment.