Skip to content

Commit

Permalink
Added separate class TaskResult for accessing result, updated tests a…
Browse files Browse the repository at this point in the history
…ccordingly
  • Loading branch information
sacOO7 committed Oct 18, 2024
1 parent 7c05911 commit e4d0870
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
29 changes: 21 additions & 8 deletions chat-android/src/main/java/com/ably/chat/AtomicExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,26 @@ import kotlinx.coroutines.launch
private class Task(
private val priority: Int,
val coroutineBlock: suspend CoroutineScope.() -> Any,
val resultChannel: Channel<Result<Any>>)
val result: TaskResult<Any>)
: Comparable<Task> {
override fun compareTo(other: Task): Int {
return other.priority - this.priority
}
suspend fun setResult(result: Result<Any>) {
this.result.channel.send(result)
}
}

class TaskResult<T> {
// Size of channel is set to 1. This is to avoid sender getting blocked
// because receiver doesn't call receive on the channel
internal val channel = Channel<Result<T>>(1)

suspend fun await(): Result<T> {
val result = channel.receive()
channel.close()
return result
}
}

/**
Expand All @@ -30,26 +45,24 @@ class AtomicExecutor(private val scope: CoroutineScope) {
* This can also be set to negative number if operation needs higher priority than existing ones.
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
*/
suspend fun <T : Any>execute(priority:Int = 0, coroutineBlock: suspend CoroutineScope.() -> T) : Channel<Result<T>> {
// Size of resultChannel is set to 1 to keep while loop running.
// i.e. If caller doesn't explicitly receive on the channel, loop will be blocked.
val resultChannel = Channel<Result<Any>>(1)
tasks.add(Task(priority, coroutineBlock, resultChannel))
suspend fun <T : Any>execute(priority:Int = 0, coroutineBlock: suspend CoroutineScope.() -> T) : TaskResult<T> {
val taskResult = TaskResult<Any>()
tasks.add(Task(priority, coroutineBlock, taskResult))
scope.launch {
if (!isRunning) {
isRunning = true
while (tasks.isNotEmpty()) {
val task = tasks.poll()
task?.let {
val result = kotlin.runCatching { scope.async(block = it.coroutineBlock).await() }
it.resultChannel.send(result)
it.setResult(result)
}
}
isRunning = false
}
}

@Suppress("UNCHECKED_CAST")
return resultChannel as Channel<Result<T>>
return taskResult as TaskResult<T>
}
}
15 changes: 8 additions & 7 deletions chat-android/src/test/java/com/ably/chat/AtomicExecutorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Assert
import org.junit.Test

class AtomicExecutorTest {
Expand All @@ -14,12 +14,13 @@ class AtomicExecutorTest {
fun `should perform given operation`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val atomicExecutor = AtomicExecutor(CoroutineScope(singleThreadedDispatcher))
val deferred = atomicExecutor.execute {
delay(5000)
return@execute "Hello"
val taskResult = atomicExecutor.execute {
delay(3000)
return@execute "Operation Success!"
}
val result = deferred.receive()
assert(result.isSuccess)
assertEquals("Hello", result.getOrNull())
val result = taskResult.await()
Assert.assertTrue(result.isSuccess)
Assert.assertFalse(result.isFailure)
Assert.assertEquals("Operation Success!", result.getOrNull())
}
}

0 comments on commit e4d0870

Please sign in to comment.