Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
durban committed Nov 26, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent eb918fa commit 447b732
Showing 28 changed files with 205 additions and 176 deletions.
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ class AsyncBenchmark {
var size: Int = _

def evalAsync(n: Int): IO[Int] =
IO.async_(_(Right(n)))
IO.async_ { cb => cb(Right(n)); () }

def evalCancelable(n: Int): IO[Int] =
IO.async[Int] { cb =>
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ private[effect] sealed abstract class WorkStealingThreadPool[P] private ()
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
19 changes: 10 additions & 9 deletions core/js/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
@@ -20,17 +20,17 @@ import scala.scalajs.js

import CallbackStack.Handle

private trait CallbackStack[A] extends js.Object
private trait CallbackStack[A, B] extends js.Object

private final class CallbackStackOps[A](private val callbacks: js.Array[A => Unit])
private final class CallbackStackOps[A, B](private val callbacks: js.Array[A => B])
extends AnyVal {

@inline def push(next: A => Unit): Handle[A] = {
@inline def push(next: A => B): Handle[A] = {
callbacks.push(next)
callbacks.length - 1
}

@inline def unsafeSetCallback(cb: A => Unit): Unit = {
@inline def unsafeSetCallback(cb: A => B): Unit = {
callbacks(callbacks.length - 1) = cb
}

@@ -42,7 +42,7 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni
callbacks
.asInstanceOf[js.Dynamic]
.reduceRight( // skips deleted indices, but there can still be nulls
(acc: Boolean, cb: A => Unit) =>
(acc: Boolean, cb: A => B) =>
if (cb ne null) { cb(oc); true }
else acc,
false)
@@ -66,11 +66,12 @@ private final class CallbackStackOps[A](private val callbacks: js.Array[A => Uni
}

private object CallbackStack {
@inline def of[A](cb: A => Unit): CallbackStack[A] =
js.Array(cb).asInstanceOf[CallbackStack[A]]

@inline implicit def ops[A](stack: CallbackStack[A]): CallbackStackOps[A] =
new CallbackStackOps(stack.asInstanceOf[js.Array[A => Unit]])
@inline def of[A, B](cb: A => B): CallbackStack[A, B] =
js.Array(cb).asInstanceOf[CallbackStack[A, B]]

@inline implicit def ops[A, B](stack: CallbackStack[A, B]): CallbackStackOps[A, B] =
new CallbackStackOps(stack.asInstanceOf[js.Array[A => B]])

type Handle[A] = Int
}
36 changes: 17 additions & 19 deletions core/jvm-native/src/main/scala/cats/effect/CallbackStack.scala
Original file line number Diff line number Diff line change
@@ -24,8 +24,8 @@ import CallbackStack.Handle
import CallbackStack.Node
import Platform.static

private final class CallbackStack[A](private[this] var callback: A => Unit)
extends AtomicReference[Node[A]] {
private final class CallbackStack[A, B](private[this] var callback: A => B)
extends AtomicReference[Node[A, B]] {
head =>

private[this] val allowedToPack = new AtomicBoolean(true)
@@ -34,11 +34,11 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
* Pushes a callback to the top of the stack. Returns a handle that can be used with
* [[clearHandle]] to clear the callback.
*/
def push(cb: A => Unit): Handle[A] = {
def push(cb: A => B): Handle[A, B] = {
val newHead = new Node(cb)

@tailrec
def loop(): Handle[A] = {
def loop(): Handle[A, B] = {
val currentHead = head.get()
newHead.setNext(currentHead)

@@ -51,7 +51,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
loop()
}

def unsafeSetCallback(cb: A => Unit): Unit = {
def unsafeSetCallback(cb: A => B): Unit = {
callback = cb
}

@@ -103,7 +103,7 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
* Removes the callback referenced by a handle. Returns `true` if the data structure was
* cleaned up immediately, `false` if a subsequent call to [[pack]] is required.
*/
def clearHandle(handle: CallbackStack.Handle[A]): Boolean = {
def clearHandle(handle: CallbackStack.Handle[A, B]): Boolean = {
handle.clear()
false
}
@@ -156,23 +156,23 @@ private final class CallbackStack[A](private[this] var callback: A => Unit)
}

private object CallbackStack {
@static def of[A](cb: A => Unit): CallbackStack[A] =
@static def of[A, B](cb: A => B): CallbackStack[A, B] =
new CallbackStack(cb)

sealed abstract class Handle[A] {
sealed abstract class Handle[A, B] {
private[CallbackStack] def clear(): Unit
}

private[CallbackStack] final class Node[A](
private[this] var callback: A => Unit
) extends Handle[A] {
private[this] var next: Node[A] = _
private[CallbackStack] final class Node[A, B](
private[this] var callback: A => B
) extends Handle[A, B] {
private[this] var next: Node[A, B] = _

def getCallback(): A => Unit = callback
def getCallback(): A => B = callback

def getNext(): Node[A] = next
def getNext(): Node[A, B] = next

def setNext(next: Node[A]): Unit = {
def setNext(next: Node[A, B]): Unit = {
this.next = next
}

@@ -184,7 +184,7 @@ private object CallbackStack {
* Packs this head node
*/
@tailrec
def packHead(bound: Int, removed: Int, root: CallbackStack[A]): Int = {
def packHead(bound: Int, removed: Int, root: CallbackStack[A, B]): Int = {
val next = this.next // local copy

if (callback == null) {
@@ -224,7 +224,7 @@ private object CallbackStack {
* Packs this non-head node
*/
@tailrec
private def packTail(bound: Int, removed: Int, prev: Node[A]): Int = {
private def packTail(bound: Int, removed: Int, prev: Node[A, B]): Int = {
val next = this.next // local copy

if (callback == null) {
@@ -251,7 +251,5 @@ private object CallbackStack {
}
}
}

override def toString(): String = s"Node($callback, $next)"
}
}
3 changes: 2 additions & 1 deletion core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
IO.async[Any] { nextCb =>
for {
done <- IO(new AtomicBoolean(false))
cb <- IO(new AtomicReference[Either[Throwable, Unit] => Unit](null))
cb <- IO(new AtomicReference[Either[Throwable, Unit] => Boolean](null))

canInterrupt <- IO(new juc.Semaphore(0))
manyDone <- IO(new AtomicBoolean(false))
@@ -100,6 +100,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) {
val cb0 = cb.getAndSet(null)
if (cb0 != null) {
cb0(RightUnit)
()
}
}
}
Original file line number Diff line number Diff line change
@@ -130,7 +130,8 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
}

cb(Right(Some(cancel)))
} catch { case ex if NonFatal(ex) => cb(Left(ex)) }
()
} catch { case ex if NonFatal(ex) => cb(Left(ex)); () }
}
}
}
@@ -155,7 +156,7 @@ object SelectorSystem {
private var head: Node = null
private var last: Node = null

def append(interest: Int, callback: Either[Throwable, Int] => Unit): Node = {
def append(interest: Int, callback: Either[Throwable, Int] => Boolean): Node = {
val node = new Node(interest, callback)
if (last ne null) {
last.next = node
@@ -181,7 +182,7 @@ object SelectorSystem {

final class Node(
var interest: Int,
var callback: Either[Throwable, Int] => Unit
var callback: Either[Throwable, Int] => Boolean
) {
var prev: Node = null
var next: Node = null
@@ -200,5 +201,4 @@ object SelectorSystem {
}
}
}

}
16 changes: 8 additions & 8 deletions core/jvm/src/main/scala/cats/effect/unsafe/TimerHeap.scala
Original file line number Diff line number Diff line change
@@ -108,19 +108,19 @@ private final class TimerHeap extends AtomicInteger {
/**
* for testing
*/
def peekFirstQuiescent(): Right[Nothing, Unit] => Unit = {
def peekFirstQuiescent(): Right[Nothing, Unit] => Boolean = {
if (size > 0) heap(1).get()
else null
}

/**
* only called by owner thread
*/
def pollFirstIfTriggered(now: Long): Right[Nothing, Unit] => Unit = {
def pollFirstIfTriggered(now: Long): Right[Nothing, Unit] => Boolean = {
val heap = this.heap // local copy

@tailrec
def loop(): Right[Nothing, Unit] => Unit = if (size > 0) {
def loop(): Right[Nothing, Unit] => Boolean = if (size > 0) {
val root = heap(1)
val rootDeleted = root.isDeleted()
val rootExpired = !rootDeleted && isExpired(root, now)
@@ -176,8 +176,8 @@ private final class TimerHeap extends AtomicInteger {
def insert(
now: Long,
delay: Long,
callback: Right[Nothing, Unit] => Unit,
out: Array[Right[Nothing, Unit] => Unit]
callback: Right[Nothing, Unit] => Boolean,
out: Array[Right[Nothing, Unit] => Boolean]
): Function0[Unit] with Runnable = if (size > 0) {
val heap = this.heap // local copy
val triggerTime = computeTriggerTime(now, delay)
@@ -428,21 +428,21 @@ private final class TimerHeap extends AtomicInteger {

private final class Node(
val triggerTime: Long,
private[this] var callback: Right[Nothing, Unit] => Unit,
private[this] var callback: Right[Nothing, Unit] => Boolean,
var index: Int
) extends Function0[Unit]
with Runnable {

private[this] var canceled: Boolean = false

def getAndClear(): Right[Nothing, Unit] => Unit = {
def getAndClear(): Right[Nothing, Unit] => Boolean = {
val back = callback
if (back ne null) // only clear if we read something
callback = null
back
}

def get(): Right[Nothing, Unit] => Unit = callback
def get(): Right[Nothing, Unit] => Boolean = callback

/**
* Cancel this timer.
Original file line number Diff line number Diff line change
@@ -637,7 +637,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
*/
def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
val thread = Thread.currentThread()
if (thread.isInstanceOf[WorkerThread[_]]) {
val worker = thread.asInstanceOf[WorkerThread[P]]
@@ -658,7 +658,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
*/
private[this] final def sleepExternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
val scheduledAt = monotonicNanos()
val cancel = new ExternalSleepCancel

@@ -671,14 +671,18 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
}

override def sleep(delay: FiniteDuration, task: Runnable): Runnable = {
val cb = new AtomicBoolean with (Right[Nothing, Unit] => Unit) { // run at most once
val cb = new AtomicBoolean with (Right[Nothing, Unit] => Boolean) { // run at most once
def apply(ru: Right[Nothing, Unit]) = if (compareAndSet(false, true)) {
try {
task.run()
true
} catch {
case ex if NonFatal(ex) =>
reportFailure(ex)
true // FIXME
}
} else {
false
}
}

8 changes: 4 additions & 4 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
@@ -172,7 +172,7 @@ private[effect] final class WorkerThread[P <: AnyRef](

def sleep(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable =
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable =
sleepImpl(nanoTime(), delay.toNanos, callback)

/**
@@ -181,7 +181,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
def sleepLate(
scheduledAt: Long,
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
val _now = nanoTime()
val newDelay = delay.toNanos - (_now - scheduledAt)
if (newDelay > 0) {
@@ -195,8 +195,8 @@ private[effect] final class WorkerThread[P <: AnyRef](
private[this] def sleepImpl(
now: Long,
delay: Long,
callback: Right[Nothing, Unit] => Unit): Function0[Unit] with Runnable = {
val out = new Array[Right[Nothing, Unit] => Unit](1)
callback: Right[Nothing, Unit] => Boolean): Function0[Unit] with Runnable = {
val out = new Array[Right[Nothing, Unit] => Boolean](1)

// note that blockers aren't owned by the pool, meaning we only end up here if !blocking
val cancel = sleepers.insert(
12 changes: 8 additions & 4 deletions core/native/src/main/scala/cats/effect/unsafe/EpollSystem.scala
Original file line number Diff line number Diff line change
@@ -94,10 +94,10 @@ object EpollSystem extends PollingSystem {
) extends FileDescriptorPollHandle {

private[this] var readReadyCounter = 0
private[this] var readCallback: Either[Throwable, Int] => Unit = null
private[this] var readCallback: Either[Throwable, Int] => Boolean = null

private[this] var writeReadyCounter = 0
private[this] var writeCallback: Either[Throwable, Int] => Unit = null
private[this] var writeCallback: Either[Throwable, Int] => Boolean = null

def notify(events: Int): Unit = {
if ((events & EPOLLIN) != 0) {
@@ -112,7 +112,10 @@ object EpollSystem extends PollingSystem {
writeReadyCounter = counter
val cb = writeCallback
writeCallback = null
if (cb ne null) cb(Right(counter))
if (cb ne null) {
cb(Right(counter))
()
}
}
}

@@ -226,7 +229,7 @@ object EpollSystem extends PollingSystem {
reads: Boolean,
writes: Boolean,
handle: PollHandle,
cb: Either[Throwable, (PollHandle, IO[Unit])] => Unit
cb: Either[Throwable, (PollHandle, IO[Unit])] => Boolean
): Unit = {
val event = stackalloc[epoll_event]()
event.events =
@@ -247,6 +250,7 @@ object EpollSystem extends PollingSystem {
}

cb(result)
()
}

@alwaysinline private[this] def toPtr(handle: PollHandle): Ptr[Byte] =
Loading

0 comments on commit 447b732

Please sign in to comment.