Skip to content

Commit

Permalink
Added Fiber.cancel support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 20, 2024
1 parent b579984 commit 121d287
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 4 deletions.
2 changes: 2 additions & 0 deletions core/js/src/main/scala/rapid/Platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ import scala.concurrent.ExecutionContext
object Platform extends RapidPlatform {
override def executionContext: ExecutionContext = org.scalajs.macrotaskexecutor.MacrotaskExecutor.Implicits.global

override def supportsCancel: Boolean = false

override def createFiber[Return](task: Task[Return]): Fiber[Return] = new FutureFiber[Return](task)
}
2 changes: 2 additions & 0 deletions core/jvm/src/main/scala/rapid/Platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ import scala.concurrent.ExecutionContext
object Platform extends RapidPlatform {
override def executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

override def supportsCancel: Boolean = true

override def createFiber[Return](task: Task[Return]): Fiber[Return] = new VirtualThreadFiber[Return](task)
}
25 changes: 21 additions & 4 deletions core/jvm/src/main/scala/rapid/VirtualThreadFiber.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
package rapid

import java.util.concurrent.CancellationException
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
import scala.util.{Failure, Try}

class VirtualThreadFiber[Return](val task: Task[Return]) extends Blockable[Return] with Fiber[Return] {
private var result: Try[Return] = _
@volatile private var result: Try[Return] = _
@volatile private var cancelled = false

private val thread = Thread.startVirtualThread(() => {
result = Try(task.sync())
if (!cancelled) {
result = Try(task.sync())
}
})

override protected def invoke(): Return = {
thread.join()
if (result == null && cancelled) {
result = Failure(new CancellationException())
}
result.get
}

override def cancel(): Task[Boolean] = Task {
if (!cancelled) {
cancelled = true
thread.interrupt()
true
} else {
false
}
}

override def await(duration: FiniteDuration): Option[Return] = if (thread.join(java.time.Duration.ofMillis(duration.toMillis))) {
Some(result.get)
Option(result).flatMap(_.toOption)
} else {
None
}
Expand Down
2 changes: 2 additions & 0 deletions core/native/src/main/scala/rapid/Platform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ import scala.concurrent.ExecutionContext
object Platform extends RapidPlatform {
override def executionContext: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

override def supportsCancel: Boolean = false

override def createFiber[Return](task: Task[Return]): Fiber[Return] = new FutureBlockableFiber[Return](task)
}
5 changes: 5 additions & 0 deletions core/shared/src/main/scala/rapid/Fiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import scala.concurrent.{Await, Future}
trait Fiber[Return] extends Task[Return] {
override def start(): Fiber[Return] = this

/**
* Attempts to cancel the Fiber. Returns true if successful.
*/
def cancel(): Task[Boolean] = Task.pure(false)

override def await(): Return = invoke()
}

Expand Down
12 changes: 12 additions & 0 deletions core/shared/src/main/scala/rapid/RapidApp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package rapid

/**
* Provides convenience functionality for running an application as a Task
*/
trait RapidApp {
def main(args: Array[String]): Unit = {
run(args.toList).sync()
}

def run(args: List[String]): Task[Unit]
}
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/rapid/RapidPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ import scala.concurrent.ExecutionContext
trait RapidPlatform {
def executionContext: ExecutionContext

def supportsCancel: Boolean

def createFiber[Return](task: Task[Return]): Fiber[Return]
}
12 changes: 12 additions & 0 deletions core/shared/src/test/scala/spec/BasicsSyncSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatest.wordspec.AnyWordSpec
import rapid._

import scala.concurrent.CancellationException

class BasicsSyncSpec extends AnyWordSpec with Matchers {
"Basics sync" should {
"handle a simple task" in {
Expand Down Expand Up @@ -43,5 +45,15 @@ class BasicsSyncSpec extends AnyWordSpec with Matchers {
)
list.tasks.sync() should be(List("One", "Two", "Three"))
}
// TODO: Re-enable once this can work with JS
/*"cancel a running task" in {
if (Platform.supportsCancel) {
val start = System.currentTimeMillis()
val fiber = Task.sleep(1.hour).map(_ => "Never").start()
fiber.cancel().sync()
a[CancellationException] should be thrownBy fiber.sync()
(System.currentTimeMillis() - start) should be < 1000L
}
}*/
}
}

0 comments on commit 121d287

Please sign in to comment.