Skip to content

Commit

Permalink
Fix Channel dropping values in certain conditions
Browse files Browse the repository at this point in the history
The type std.sync.Channel is built on top of the Future and Promise
types. The implementation of Promise.set was such that if it's
disconnected prior to a write, the value would be dropped silently.
Combined with how Channel was implemented, this could result in sent
values being discarded.

To resolve this, Promise.set now returns the value to set if its
corresponding Future is already dropped, allowing callers to act
accordingly. The implementation of Channel in turn is changed to take
advantage of this, and to ensure values are always resolved in FIFO
order.

Changelog: fixed
  • Loading branch information
yorickpeterse committed Jan 2, 2025
1 parent 888832e commit 98c7f1e
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 10 deletions.
85 changes: 76 additions & 9 deletions std/src/std/sync.inko
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type pub Future[T] {
# cases as doing so is notoriously difficult.
#
# To avoid a deadlock, make sure to always write a value to a `Promise`
# _before_ discarding it.
# _before_ discarding it, or use `Future.get_until` to wait using a deadline.
#
# # Examples
#
Expand Down Expand Up @@ -367,23 +367,53 @@ type pub Promise[T] {
#
# This method never blocks the calling process.
#
# # Disconnected writes
#
# If the corresponding `Future` is dropped, this method returns the value
# wrapped in an `Option.Some`, otherwise an `Option.None` is returned. This
# allows callers to detect a disconnected `Promise` and act accordingly, such
# as by storing the value elsewhere.
#
# # Examples
#
# Resolving a `Future` using a `Promise`:
#
# ```inko
# import std.sync (Future)
#
# match Future.new {
# case (future, promise) -> {
# promise.set(42)
# future.get # => 42
# promise.set(42) # => Option.None
# future.get # => 42
# }
# }
# ```
#
# Trying to resolve a dropped `Future`:
#
# ```inko
# import std.sync (Future)
#
# match Future.new {
# case (future, promise) -> {
# drop(future)
# promise.set(42) # => Option.Some(42)
# }
# }
# ```
fn pub move set(value: uni T) {
fn pub move set(value: uni T) -> Option[uni T] {
let val = Option.Some(value)
let fut = lock

if fut.connected.false? {
fut.unlock
_INKO.moved(fut)
return val
}

let waiter = fut.waiter := NO_WAITER as Pointer[UInt8]

fut.value = Option.Some(value)
fut.value = val
fut.unlock

# Ensure we don't drop the shared state.
Expand All @@ -393,6 +423,8 @@ type pub Promise[T] {
if waiter as Int != NO_WAITER {
inko_process_reschedule_for_value(_INKO.state, _INKO.process, waiter)
}

Option.None
}

fn lock -> FutureState[uni T] {
Expand Down Expand Up @@ -431,15 +463,50 @@ type async ChannelState[T] {
}

fn async mut send(value: uni T) {
match @promises.pop_front {
case Some(p) -> p.set(value)
case _ -> @values.push_back(value)
@values.push_back(value)

# Now that we have at least one (but possibly multiple) values, we can start
# flushing them to the queued up Promises (if any). This ensures that both
# Promises and values are processed in FIFO order, and as soon as possible.
#
# An important detail here is the handling of disconnected Promises.
# Consider the following case: we have 3 Promises, and the first two are
# disconnected. When sending a value, we want to skip the first two and
# resolve the third Promise.
loop {
match (@promises.pop_front, @values.pop_front) {
case (Some(p), Some(v)) -> {
# If the Promise is disconnected we'll try the value again with the
# next Promise.
match p.set(v) {
case Some(v) -> @values.push_front(v)
case _ -> {}
}

next
}
case (Some(p), _) -> @promises.push_front(p)
case (_, Some(v)) -> @values.push_front(v)
case _ -> {}
}

# At this point we either only have Promises or values, meaning there's
# nothing left to do.
break
}
}

fn async mut receive(promise: uni Promise[uni T]) {
match @values.pop_front {
case Some(v) -> promise.set(v)
case Some(v) -> {
# If the Promise is disconnected at this point we push the value back to
# the _start_ such that the next Promise will resolve to it, instead of
# the last Promise.
match promise.set(v) {
case Some(v) -> @values.push_front(v)
case _ -> {}
}
}
case _ -> @promises.push_back(promise)
}
}
Expand Down
31 changes: 30 additions & 1 deletion std/test/std/test_sync.inko
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,20 @@ fn pub tests(t: mut Tests) {
}
})

t.test('Promise.set', fn (t) {
match int_future {
case (r, w) -> {
t.equal(w.set(42), Option.None)
t.equal(r.get, 42)
}
}
})

t.test('Promise.set with a dropped Future', fn (t) {
match int_future {
case (r, w) -> {
drop(r)
w.set(42)
t.equal(w.set(42), Option.Some(42))
}
}
})
Expand Down Expand Up @@ -117,6 +126,26 @@ fn pub tests(t: mut Tests) {
t.equal(chan.receive_until(deadline), Option.None)
})

t.test('Channel.receive_until after a previous receive timed out', fn (t) {
let chan: Channel[Int] = Channel.new

t.equal(chan.receive_until(Duration.from_millis(5)), Option.None)
chan.send(10)
chan.send(20)
t.equal(chan.receive_until(Duration.from_millis(5)), Option.Some(10))
t.equal(chan.receive_until(Duration.from_millis(5)), Option.Some(20))
})

t.test('Channel.receive after a previous receive timed out', fn (t) {
let chan: Channel[Int] = Channel.new

t.equal(chan.receive_until(Duration.from_millis(5)), Option.None)
chan.send(10)
chan.send(20)
t.equal(chan.receive, 10)
t.equal(chan.receive, 20)
})

t.test('Channel.clone', fn (t) {
let chan1 = Channel.new
let chan2 = chan1.clone
Expand Down

0 comments on commit 98c7f1e

Please sign in to comment.