Skip to content

Commit

Permalink
Future.get_until is only usable once
Browse files Browse the repository at this point in the history
  • Loading branch information
yorickpeterse committed Jan 3, 2025
1 parent 22dcd9b commit ddf2eeb
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 57 deletions.
156 changes: 109 additions & 47 deletions std/src/std/sync.inko
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ let NO_WAITER = 0
let UNLOCKED = 0
let LOCKED = 1

type copy enum Status {
case Connected
case NoFuture
case NoPromise
}

# The state shared between a `Future` and a `Promise`.
type FutureState[T] {
# A spinlock used to restrict access to the state to a single thread/process
Expand All @@ -76,11 +82,8 @@ type FutureState[T] {
# them to begin with for this particular workload.
let @locked: UInt8

# A flag indicating if both the `Future` and `Promise` still exist.
#
# When either the `Future` or `Promise` is dropped, it sets this flag to
# `false` and the other half is responsible for cleaning up the shared state.
let @connected: Bool
# The status of the `Future` and `Promise`.
let @status: Status

# The process waiting for a value to be written to the future.
#
Expand Down Expand Up @@ -146,7 +149,7 @@ type pub Future[T] {
let fut: FutureState[uni T] = FutureState(
waiter: NO_WAITER as Pointer[UInt8],
locked: UNLOCKED as UInt8,
connected: true,
status: Status.Connected,
value: Option.None,
)

Expand Down Expand Up @@ -210,8 +213,8 @@ type pub Future[T] {
inko_process_wait_for_value(
_INKO.process,
mut fut.locked,
1 as UInt8,
0 as UInt8,
LOCKED as UInt8,
UNLOCKED as UInt8,
)

# Ensure the shared state isn't dropped.
Expand All @@ -224,9 +227,23 @@ type pub Future[T] {
# Returns the value of the future, blocking the calling process until a value
# is available or the given deadline is exceeded.
#
# If a value is resolved within the deadline, a `Result.Ok` containing the
# value is returned. If the timeout expired, a `Result.Error` is returned
# containing a new `Future` to use for resolving the value.
# If a value is resolved within the deadline, an `Option.Some` containing the
# value is returned. If the timeout expired, an `Option.None` is returned.
#
# In both cases `self` is consumed. This is because trying to wait for a
# result is inherently racy, and may result in unexpected results. For
# example, if a value were to be written using `Promise.set` _just_ after we
# return from this method, we wouldn't observe it unless the operation is
# retried. If we don't do so, the value would be dropped.
#
# However, it's more often than not clear how often the operation should be
# retried, as the time waited might not necessarily be the same or longer as
# the time it takes before `Promise.set` is called.
#
# Always consuming `self` instead forces the caller to create a new `Promise`
# and `Future` pair _if_ a retry is desired, and ensures that _if_
# `Promise.set` is called _after_ returning from this method the value passed
# to `Promise.set` is returned to its caller.
#
# # Deadlocks
#
Expand All @@ -244,13 +261,11 @@ type pub Future[T] {
# match Future.new {
# case (future, promise) -> {
# promise.set(42)
# future.get_until(Duration.from_secs(1)) # => Result.Ok(42)
# future.get_until(Duration.from_secs(1)) # => Option.Some(42)
# }
# }
# ```
fn pub move get_until[D: ToInstant](
deadline: ref D,
) -> Result[uni T, Future[T]] {
fn pub move get_until[D: ToInstant](deadline: ref D) -> Option[uni T] {
let nanos = deadline.to_instant.to_int as UInt64

loop {
Expand All @@ -262,7 +277,7 @@ type pub Future[T] {

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
return Result.Ok(val)
return Option.Some(val)
}
case _ -> {
fut.waiter = _INKO.process
Expand All @@ -273,18 +288,42 @@ type pub Future[T] {
_INKO.state,
_INKO.process,
mut fut.locked,
1 as UInt8,
0 as UInt8,
LOCKED as UInt8,
UNLOCKED as UInt8,
nanos,
)

# Ensure the shared state isn't dropped.
_INKO.moved(fut)

if timed_out { return Result.Error(self) }
if timed_out { break }
}
}
}

# It's possible for a write to happen _just_ after we time out. We don't
# want to silently discard the value in that case. In addition, it's
# possible for a value to be written after returning from this method, which
# would result in the value also being lost.
#
# To prevent this from happening we disconnect the future immediately and
# perform a final check to see if a value is present. This ensures that
# beyond this point any values written using `Promise.set` are returned to
# the caller, instead of just being dropped.
let fut = lock

match fut.status {
case Connected -> fut.status = Status.NoFuture
case _ -> {}
}

let val = fut.value := Option.None

fut.unlock

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
val
}

# Returns the value of the future if one is present, without blocking the
Expand Down Expand Up @@ -337,18 +376,27 @@ impl Drop for Future {
fn mut drop {
let fut = lock

if fut.connected {
# The `Promise` is still present, so it will be tasked with cleaning up
# the shared state.
fut.connected = false
fut.unlock

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
} else {
# The `Promise` is already dropped, so it's our job to clean up the shared
# state.
drop_value(fut)
match fut.status {
case Connected -> {
fut.status = Status.NoFuture
fut.unlock

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
}
case NoPromise -> {
# The `Promise` is already dropped, so it's our job to clean up the
# shared state.
drop_value(fut)
}
case _ -> {
# We can encounter this branch if Future.get_until times out because it
# sets the status to NoFuture.
fut.unlock

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
}
}
}
}
Expand Down Expand Up @@ -405,18 +453,23 @@ type pub Promise[T] {
let val = Option.Some(value)
let fut = lock

if fut.connected.false? {
fut.unlock
_INKO.moved(fut)
return val
match fut.status {
case NoFuture -> {
fut.unlock

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
return val
}
case _ -> {}
}

let waiter = fut.waiter := NO_WAITER as Pointer[UInt8]

fut.value = val
fut.unlock

# Ensure we don't drop the shared state.
# Ensure the shared state isn't dropped.
_INKO.moved(fut)

# If the waiter is waiting for a value, we have to reschedule it.
Expand All @@ -439,16 +492,25 @@ impl Drop for Promise {
fn mut drop {
let fut = lock

if fut.connected {
# The `Future` is still present, so it will be tasked with cleaning up the
# shared state.
fut.connected = false
fut.unlock
_INKO.moved(fut)
} else {
# The `Future` is already dropped, so it's our job to clean up the shared
# state.
drop_value(fut)
match fut.status {
case Connected -> {
fut.status = Status.NoPromise
fut.unlock

# Ensure the shared state isn't dropped.
_INKO.moved(fut)
return
}
case NoFuture -> {
# The `Future` is already dropped, so it's our job to clean up the
# shared state.
drop_value(fut)
}
case _ -> {
# This ensures `fut` is moved in all branches, such that we don't try to
# double drop it outside the match.
_INKO.moved(fut)
}
}
}
}
Expand Down Expand Up @@ -622,7 +684,7 @@ type pub inline Channel[T] {
match Future.new {
case (future, promise) -> {
@state.receive(promise)
future.get_until(deadline).ok
future.get_until(deadline)
}
}
}
Expand Down
17 changes: 7 additions & 10 deletions std/test/std/test_sync.inko
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,17 @@ fn pub tests(t: mut Tests) {
}
})

t.ok('Future.get_until', fn (t) {
t.test('Future.get_until', fn (t) {
match int_future {
case (r, w) -> {
let r = match r.get_until(Duration.from_millis(1)) {
case Ok(_) -> throw 'expected an Error'
case Error(r) -> r
}
case (r, _w) -> t.equal(r.get_until(Duration.from_millis(1)), Option.None)
}

match int_future {
case (r, w) -> {
w.set(42)
t.equal(r.get_until(Duration.from_millis(1)).ok, Option.Some(42))
t.equal(r.get_until(Duration.from_millis(1)), Option.Some(42))
}
}

Result.Ok(nil)
})

t.ok('Future.try_get', fn (t) {
Expand All @@ -63,7 +60,7 @@ fn pub tests(t: mut Tests) {
match int_future {
case (r, w) -> {
drop(w)
t.true(r.get_until(Duration.from_millis(1)).error?)
t.true(r.get_until(Duration.from_millis(1)).none?)
}
}
})
Expand Down

0 comments on commit ddf2eeb

Please sign in to comment.