Skip to content

Commit

Permalink
Merge pull request #4001 from durban/issue3998
Browse files Browse the repository at this point in the history
Fix issue #3998
  • Loading branch information
djspiewak authored Feb 16, 2024
2 parents 87349d1 + de8b5d4 commit dca88ec
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 43 deletions.
92 changes: 49 additions & 43 deletions std/shared/src/main/scala/cats/effect/std/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -820,52 +820,58 @@ object Queue {

val size: F[Int] = F.delay(buffer.size())

val take: F[A] = F defer {
try {
// attempt to take from the buffer. if it's empty, this will raise an exception
F.pure(buffer.take())
} catch {
case FailureSignal =>
// buffer was empty
// capture the fact that our retry succeeded and the value we were able to take
var received = false
var result: A = null.asInstanceOf[A]

// a latch to block until some offerer wakes us up
val wait = F.async[Unit] { k =>
F delay {
// register ourselves as a listener for offers
val clear = takers.put(k)

try {
// now that we're registered, retry the take
result = buffer.take()

// it worked! clear out our listener
clear()
// we got a result, so received should be true now
received = true

// complete our own callback. see notes in offer about raced redundant completion
k(EitherUnit)

// we *might* have negated a notification by succeeding here
// unnecessary wake-ups are mostly harmless (only slight fairness loss)
notifyOne()

// don't bother with a finalizer since we're already complete
None
} catch {
case FailureSignal =>
// println(s"failed take size = ${buffer.size()}")
// our retry failed, we're registered as a listener, so suspend
Some(F.delay(clear()))
val take: F[A] = F uncancelable { poll =>
F defer {
try {
// attempt to take from the buffer. if it's empty, this will raise an exception
F.pure(buffer.take())
} catch {
case FailureSignal =>
// buffer was empty
// capture the fact that our retry succeeded and the value we were able to take
var received = false
var result: A = null.asInstanceOf[A]

// a latch to block until some offerer wakes us up
val wait = F.asyncCheckAttempt[Unit] { k =>
F delay {
// register ourselves as a listener for offers
val clear = takers.put(k)

try {
// now that we're registered, retry the take
result = buffer.take()

// it worked! clear out our listener
clear()
// we got a result, so received should be true now
received = true

// we *might* have negated a notification by succeeding here
// unnecessary wake-ups are mostly harmless (only slight fairness loss)
notifyOne()

// don't bother with a finalizer since we're already complete
EitherUnit
} catch {
case FailureSignal =>
// println(s"failed take size = ${buffer.size()}")
// our retry failed, we're registered as a listener, so suspend
Left(Some(F.delay(clear())))
}
}
}
}

// suspend until an offerer wakes us or our retry succeeds, then return a result
wait *> F.defer(if (received) F.pure(result) else take)
val notifyAnyway = F delay {
// we might have been awakened and canceled simultaneously
// try waking up another taker just in case
notifyOne()
}

// suspend until an offerer wakes us or our retry succeeds, then return a result
(poll(wait) *> F.defer(if (received) F.pure(result) else poll(take)))
.onCancel(notifyAnyway)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ class UnboundedQueueSpec extends BaseSpec with QueueTests[Queue] {
commonTests(_ => constructor, _.offer(_), _.tryOffer(_), _.take, _.tryTake, _.size)
batchTakeTests(_ => constructor, _.offer(_), _.tryTakeN(_))
batchOfferTests(_ => constructor, _.tryOfferN(_), _.tryTakeN(_))
cancelableTakeTests(_ => constructor, _.offer(_), _.take)
}
}

Expand Down

0 comments on commit dca88ec

Please sign in to comment.