Skip to content

Commit

Permalink
Fix LinkedListChannel.onUndeliveredElement call on channel cancel
Browse files Browse the repository at this point in the history
Fixes #2435
  • Loading branch information
elizarov committed Dec 11, 2020
1 parent fa30140 commit 0f21190
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 3 deletions.
16 changes: 15 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,14 @@ internal abstract class AbstractSendChannel<E>(
override val pollResult: Any? get() = element
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeSend() {}
override fun resumeSendClosed(closed: Closed<*>) {}

/**
* This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList].
*/
override fun resumeSendClosed(closed: Closed<*>) {
assert { false }
}

override fun toString(): String = "SendBuffered@$hexAddress($element)"
}
}
Expand Down Expand Up @@ -669,6 +676,13 @@ internal abstract class AbstractChannel<E>(
// Add to the list only **after** successful removal
list += previous as Send
}
onCancelIdempotentList(list, closed)
}

/**
* This method is overriden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list.
*/
protected open fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
list.forEachReversed { it.resumeSendClosed(closed) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ internal open class ArrayChannel<E>(
}
// then clean all queued senders
super.onCancelIdempotent(wasClosed)
undeliveredElementException?.let { throw it } // throw cancel exception at the end if there was one
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
}

// ------ debug ------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredEleme
undeliveredElementException = updateValueLocked(EMPTY)
}
super.onCancelIdempotent(wasClosed)
undeliveredElementException?.let { throw it } // throw exception at the end if there was one
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
}

private fun updateValueLocked(element: Any?): UndeliveredElementException? {
Expand Down
14 changes: 14 additions & 0 deletions kotlinx-coroutines-core/common/src/channels/LinkedListChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,19 @@ internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElem
}
}
}

override fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
var undeliveredElementException: UndeliveredElementException? = null
list.forEachReversed {
when (it) {
is SendBuffered<*> -> {
@Suppress("UNCHECKED_CAST")
undeliveredElementException = onUndeliveredElement?.callUndeliveredElementCatchingException(it.element as E, undeliveredElementException)
}
else -> it.resumeSendClosed(closed)
}
}
undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ class ChannelUndeliveredElementTest : TestBase() {
assertTrue(resA.isCancelled) // now cancelled in buffer
}

@Test
fun testUnlimitedChannelCancelled() = runTest {
val channel = Channel<Resource>(Channel.UNLIMITED) { it.cancel() }
val resA = Resource("A")
val resB = Resource("B")
channel.send(resA) // goes to buffer
channel.send(resB) // goes to buffer
assertFalse(resA.isCancelled) // it is in buffer, not cancelled
assertFalse(resB.isCancelled) // it is in buffer, not cancelled
channel.cancel() // now cancel the channel
assertTrue(resA.isCancelled) // now cancelled in buffer
assertTrue(resB.isCancelled) // now cancelled in buffer
}

@Test
fun testConflatedResourceCancelled() = runTest {
val channel = Channel<Resource>(Channel.CONFLATED) { it.cancel() }
Expand Down

0 comments on commit 0f21190

Please sign in to comment.