
Consider using structured concurrency instead of being able to spawn processes at any point #759

Closed
yorickpeterse opened this issue Sep 5, 2024 · 18 comments
Labels
  • compiler: Changes related to the compiler
  • feature: New things to add to Inko, such as a new standard library module
  • runtime: Changes related to the Rust-based runtime library
  • std: Changes related to the standard library

Comments

yorickpeterse commented Sep 5, 2024

Description

The premise of structured concurrency is simple: you have something that signals a join scope, and a construct to spawn processes in that scope. When the end of the scope is reached, all processes are joined. Conceptually this is the same as having a channel with space for N messages, N processes that use the channel and send it a message when they're done, and N calls to Channel.receive to wait for the processes.

Structured concurrency makes reasoning about concurrency easier, because it's clearer where asynchronous work starts and ends. In addition, by making this part of the compiler/type system, you can allow for interesting patterns such as sharing read-only access to data. This article is a good reference on the subject.

A hypothetical setup would be the following:

let nums = [10, 20, 30]

async {
  let handles = nums.iter.map(fn (v) { spawn { v * v } }).to_array

  Int.sum(handles.into_iter.map(fn (v) { v.value }))
}

The idea is that async signals a scope in which asynchronous operations can happen. spawn in turn spawns a new process in the innermost async block. Variables defined outside an async scope are exposed as immutable references. Values of type uni T are moved into an async scope upon capture by it or a spawn expression.

spawn expressions in turn can only capture variables that are value types (e.g. Int) or defined outside the async expression. If a spawn captures a variable of type uni T, the variable is moved into the spawn. This means such variables need to be assigned a new value if captured inside a spawn inside a loop.
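
As a sketch of that loop rule, using the hypothetical async/spawn syntax from above: declaring a fresh uni value inside the loop body gives every spawn its own value to move.

async {
  let mut i = 0

  while i < 4 {
    # Each iteration produces a fresh `uni Array[Int]`, so moving it into
    # the spawn is fine; reusing the variable after the move would be a
    # compile-time error.
    let data = recover [i]

    spawn { data.size }
    i += 1
  }
}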

When reaching the end of the async expression, we discard any unused values as usual, then join all the spawned processes. The return value of an async is the last expression, just as with regular scopes.

Processes in turn are given a value method. When called, it joins the process and returns whatever value the spawn expression returned. This is done by generating a class for each spawn with the appropriate fields (one for each capture), and by generating said value method. We also generate a dropper that does the same thing, but discards the value. The value is stored by generating a dedicated field (= the first one), writing the return value into that field, and setting a bit somewhere to indicate that we've in fact written a value (we can't use NULL / 0 because then returning integer 0 wouldn't work).
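
A rough sketch of what the compiler could generate for spawn { v * v }; the names below are made up for illustration:

class Spawn1 {
  let @v: Int            # one field per captured variable
  let @result: Int       # the dedicated first field for the return value
  let @has_result: Bool  # the "value was written" bit

  # Joins the process, then reads @result once @has_result is set. The
  # generated dropper would do the same, but discard the value.
  fn move value -> Int {
    @result
  }
}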

If the value is a T it can be lifted into a uni T through a recover. This is safe because upon termination any T returned can't have any references pointing to it in the old process.
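
For example (hypothetical syntax):

async {
  let handle = spawn { [10, 20, 30] }

  # The process can't retain references to its return value after it
  # terminates, so lifting the result into a `uni` value is safe:
  let nums: uni Array[Int] = recover handle.value
}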

Process messages are removed (i.e. no more fn async). Channels remain and would be used instead if some form of communication protocol is necessary. The concept of sendability remains (i.e. you still can't stick a ref User in a Channel). uni T values in turn are used if you want to transfer ownership of some complex data from one process to another, either via a spawn capturing the variable or through a Channel.
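
Transferring ownership through a Channel would work as it does today, e.g.:

let chan: Channel[uni Array[Int]] = Channel.new(size: 1)

chan.send(recover [10, 20])

let owned = chan.receive # => uni Array[Int]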

Compared to the current process setup, structured concurrency would allow for more efficient fork-join workflows as processes can capture immutable data, something that's not possible with the class async setup (as immutable borrows aren't sendable).

For long-lived background processes, you simply create a top-level async of sorts, spawn the necessary processes in there, and include the rest of the logic in the async block, i.e.:

async {
  spawn { background_work() }
  ... rest of the code ...
}

Related work

yorickpeterse commented Sep 5, 2024

This approach has a challenge: if spawn expressions can capture immutable borrows, then those borrows need to use atomic reference counting, as multiple processes may now mutate the reference count of the same value. Since we can't somehow automatically switch between the two, in practice that means always using atomic reference counting for borrows.

yorickpeterse commented:

The problem here is related to #750. I think we could do something like this to make things better:

ref T and mut T become compile-time borrows, and don't use reference counting. These borrows can't be returned or stored in other data, nor are they compatible with generic type parameters (unless they too use borrows). In practice this means you can only pass them down the stack through arguments. These borrows are created using a borrow expression that takes a list of variables and exposes them as borrows. Through shadowing, the variable we're borrowing can't be moved:

let a = [10, 20]
let b = [30]

borrow a, mut b {
  a # => ref Array[Int]
  b # => mut Array[Int]
}

Such borrows can be created for any type, though borrowing Int, Float, String and other value types should just be a compile-time error as that's likely a mistake.

If we introduce structured concurrency, we can allow sharing ref T values with different processes, though this technically requires violating the "you can't store them" rule, as the borrow has to be stored in the process. However, due to the structured nature this should be sound.

For more complex cases you could use borrows that use runtime borrow counting, let's call those rc T and rc mut T. These behave exactly the same way as ref T and mut T currently do, i.e. creating and disposing of them mutates the borrow count. These RC borrows can only be created from owned values or other RC borrows. RC borrows can only be created for types stored in a stable location (= the heap). Compile-time borrows can't be created from RC borrows though, as this would allow you to drop the borrowed data while the compile-time borrow is still used.
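
A sketch of those creation rules, with rc as a stand-in syntax for creating an RC borrow:

let a = [10, 20] # owned, heap-allocated
let b = rc a     # OK: RC borrows can be created from owned values...
let c = rc b     # ...and from other RC borrows

borrow a {
  let d = rc a   # error: `a` is a compile-time borrow inside this scope
}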

This basic setup is interesting, but introduces a problem: if a ref T can't be created from an rc T because we can't guarantee the borrowed data isn't dropped, then passing rc T values around becomes really annoying. What we could do instead: if you borrow an rc T value using borrow, we still increment and decrement the reference count, but only at the start and end of the scope. Because this behaviour is transitive, if at any point inside the borrow we drop the borrowed data, the increment done at the start of the borrow causes the borrow check to panic at runtime.
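
Sketched out, using the same hypothetical rc syntax and assuming std.drop's drop:

import std.drop (drop)

let owned = [10, 20]
let shared = rc owned

borrow shared {  # borrow count is incremented once, here
  drop(owned)    # => the borrow check panics: the count is non-zero
  shared.size
}                # borrow count is decremented once, here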

In addition, we can make things easier by introducing implicit borrow expressions when passing non-compile-time borrows to arguments that expect them, similar to our current auto borrow logic. That is, foo(data) turns into borrow data { foo(data) } if data is expected to be a compile-time borrow.

RC borrows can't be shared between processes, since they use regular (non-atomic) reference counting. We could still allow their use by exposing rc T values as ref T inside spawn expressions, provided the values are defined outside the async:

let nums = [10, 20, 30]

async {
  nums # => ref Array[Int]

  spawn { 
    nums.size # => OK, since ref Array[Int] is sendable and defined outside the async
  }
}

rc mut T values are never sendable, nor are ref T/mut T values defined inside the async.

yorickpeterse commented:

A further enhancement of the above idea: not being able to store ref T and mut T is annoying, as you can't do things like this:

fn eq(a: ref Foo, b: ref Foo) -> Bool {
  match (a, b) {
    ...
  }
} 

To make such patterns possible, we could allow storing borrows in data if and only if the borrow doesn't escape the surrounding scope/function. This likely requires more complicated escape analysis though, so a simplification could be this:

A borrow can be stored in another value, but only if the value is not assigned to a variable, and does not receive calls to fn mut and fn move methods.

The assignment restriction means we can keep the check local to the expression, while the call restriction prevents the borrow from escaping through instance methods that either mutate or move the receiver. This may be overly strict though.
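
Under that simplified rule the eq example above works, while escapes are still rejected:

match (a, b) {    # OK: the tuple isn't assigned to a variable
  case (x, y) -> ...
}

let pair = (a, b) # error: the containing value is assigned to a variable
(a, b).consume    # error: the containing value receives an `fn move`
                  # call (`consume` is a made-up method name)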

yorickpeterse commented Sep 5, 2024

The store restriction for compile-time borrows would be enforced as follows:

  • They can't be captured by a closure
  • They can't be the value assigned to a field
  • They're not compatible with T (a type parameter) and move T
  • They can't be written to pointers
  • They can't occur in a return type or as a generic type argument (i.e. Option[ref Something] is invalid)
  • They can't be used to create RC borrows

Not being able to capture borrows is annoying, but allowing it would require escape analysis on the closure to determine whether it outlives the borrow.

yorickpeterse commented:

The borrowing proposal has a flaw: if a spawn expression can only capture variables defined outside the async scope, then a spawn can never capture the values yielded by an iterator. That is, code such as this would not compile:

let users = [User(...), User(...)]

async {
  users.iter.each(fn (u) {
    spawn { do_something(u) }
  })
}

Even if we did allow this somehow, there's the problem of borrowing: an iterator can't yield compile-time borrows because that would violate the "you can't store them" invariant. This means they'd have to produce RC borrows, which you can't capture in a spawn (and also wouldn't allow iterating over value types).

Of course we can still introduce structured concurrency and tackle the borrowing issue separately.

yorickpeterse commented:

Here's another example of where structured concurrency is easier/requires less code:

class async Worker {
  fn async run(number: Int, out: Channel[Int]) {
    out.send(number * number)
  }
}

class async Main {
  fn async main {
    let numbers = [10, 20, 30]
    let chan = Channel.new(size: 3)

    numbers.into_iter.each(fn (num) { Worker().run(num, chan) })

    let mut total = 0
    let mut i = 0

    while i < 3 {
      total += chan.receive
      i += 1
    }
  }
}

This just illustrates a case where you have a bunch of values and want to compute something asynchronously, then collect the results in some way. This requires a bit of boilerplate (e.g. the class async definition), and manual fiddling with channels and loops. Of course you can abstract over that, but compare it to structured concurrency:

class async Main {
  fn async main {
    let numbers = [10, 20, 30]

    async {
      let handles = numbers
        .into_iter
        .map(fn (num) { spawn { num * num } })
        .to_array

      handles.into_iter.reduce(0, fn (acc, proc) { acc + proc.join })
    }
  }
}

This approach has several benefits:

  • No need to define a class async just to run some computation, though you can still push the computation into a dedicated class if necessary
  • Collecting the results is much easier and doesn't require channels, as we can rely on joining a process returning the value of the last expression
  • The usual benefits of structured concurrency (i.e. all processes are implicitly joined if not done so already)
  • We can build on top of this type-system wise. For example, if a spawn returns a T we can automatically recover it into a uni T similar to recover. This can't be done with class async because there's no clear point of termination and thus no clear idea as to whether and for how long the process might retain references to the result. In the future we could also allow sharing of data in a limited fashion.
  • We can get rid of the message allocation process, fixing #755 ("Using C types smaller than 8 bytes in a Channel results in a segfault")
  • This also means we can get rid of mailboxes, reducing the size of processes by 32 bytes (from 104B to 72B on Linux). This is not significant, but we get it for free because of the above

yorickpeterse commented Sep 9, 2024

Openflow uses class async a fair bit to expose shared state/mutation safely. At its core is a State type that's a class async, used for diffing and updating the state of the ventilation system. The classes for the different types of inputs (e.g. a CO2 sensor or a motion sensor) hold references to this State process and use it to apply their changes.

A heavily condensed version of the State class would be something like this:

class Room {}

class async State {
  fn pub async mut add_room(room: uni Room) {}

  fn pub async mut prepare {}

  fn pub async mut update(block: uni fn (mut Map[String, Room]) -> Bool) {}
}

If we throw class async out the window and use structured concurrency, State becomes a regular class initialized in a spawn somewhere, and all clients use a Channel. The resulting setup would be something like this:

class enum Message {
  case AddRoom(uni Room)
  case Prepare
  case Update(uni fn (mut Map[String, Room]) -> Bool)
}

class State {
  let @chan: Channel[uni Message]

  fn mut run {
    loop {
      match @chan.receive {
        case AddRoom(r) -> add_room(r)
        case Prepare -> prepare
        case Update(f) -> update(f)
      }
    }
  }

  fn mut add_room(room: uni Room) {}

  fn mut prepare {}

  fn mut update(block: uni fn (mut Map[String, Room]) -> Bool) {}
}

The main loop here is what I wanted to avoid by adding class async to the language, and it's a pattern also seen in e.g. Erlang where it's handled using gen_server.

With that said, the class async setup does start to break down a little the moment you need to expose the output to another process. In such a case, you need to pass a Channel around somehow so the result can be sent over the channel.

yorickpeterse commented Sep 9, 2024

To put it differently, there are some interesting opposites at play here:

Structured concurrency using the proposed async / spawn reduces boilerplate for fork-join style workloads, but requires a bit of extra boilerplate for long-lived processes. The class async setup suffers from the exact opposite: it requires more boilerplate for short-lived tasks, while for long-lived tasks it roughly evens out.

yorickpeterse commented:

Another thing worth noting: when looking at code, async/spawn makes it clear we're starting a bunch of processes and waiting for them to finish. With class async this isn't clear, because the syntax for spawning processes is the same as that for creating instances of regular classes. It's also not clear when the process finishes running.

yorickpeterse commented Sep 9, 2024

Another thought: structured concurrency is nice, but if the only benefit is that after an async all processes are joined, then I'm not sure that's that much of a benefit. Or to put it differently, runaway processes/threads aren't that common of a problem.

In addition, channels were introduced to make fork-join workflows easier, but this sort of violates the general idea of actors. An alternative would be to (re)introduce the Future type, but also introduce a Futures type that allows for efficient "polling" of multiple futures.
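
Such a Futures type might look something like this; the names and signatures below (Futures, add, each_resolved, handle) are made up rather than a settled design:

let futures = Futures.new

futures.add(future_a)
futures.add(future_b)

# Yield each value as its future resolves, without polling the futures
# one at a time:
futures.each_resolved(fn (value) { handle(value) })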

A fully async model would be nicer, but this won't work well for fork-join workflows as the capabilities/messages of the calling process aren't necessarily known, so there's no way for child processes to communicate back their results.

yorickpeterse commented:

I'm currently leaning towards keeping class async, and perhaps reintroducing futures but allowing for more efficient polling. This would allow for bi-directional communication, but without the need for multi-producer multi-consumer channels, though we may want to keep those to allow e.g. distribution of jobs (#681).

yorickpeterse commented:

Using futures and processes, we can in fact implement something like Channel efficiently:

import std.clone (Clone)
import std.stdio (Stdout)

class Future[T] {
  # This would of course use something else in reality.
  let @inner: Channel[T]

  fn static new -> Future[T] {
    Future(Channel.new(size: 1))
  }

  fn move set(value: uni T) {
    @inner.send(value)
  }

  fn move get -> uni T {
    @inner.receive
  }
}

impl Clone[Future[T]] for Future {
  fn pub clone -> Future[T] {
    Future(@inner.clone)
  }
}

# How one could implement a Channel-like construct but using futures.
#
# This implementation uses LIFO, but if we used a Deque we'd be able to
# turn it into a FIFO channel.
class async Chan[T] {
  let @values: Array[uni T]
  let @futures: Array[Future[uni T]]

  fn static new -> Chan[T] {
    Chan(values: recover [], futures: recover [])
  }

  fn async mut send(value: uni T) {
    match @futures.pop {
      case Some(f) -> f.set(value)
      case _ -> @values.push(value)
    }
  }

  fn async mut receive(future: uni Future[uni T]) {
    match @values.pop {
      case Some(v) -> future.set(v)
      case _ -> @futures.push(future)
    }
  }
}

class async Main {
  fn async main {
    let out = Stdout.new
    let chan = Chan.new

    chan.send(42)

    let fut = Future.new

    # This part is a little weird since we have to explicitly pass in the
    # futures.
    chan.receive(recover fut.clone)
    out.print(fut.get.to_string)
  }
}

The caveat is that making it a fixed-size channel is probably a little more tricky, and that there would be some extra overhead due to the message sending.

The way you'd use this for e.g. std.test is you'd create a Chan and pass it to each test runner. Then for each test you'd create a future and wait for it to be resolved. This works because we don't really care which test resolves what future, as long as we don't have to wait for all tests to finish before we can show anything.
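
Sketched out, with runners, tests, TestResult and report as made-up stand-ins:

let chan: Chan[TestResult] = Chan.new

# Each runner receives the channel and sends one result per finished test.
runners.iter.each(fn (runner) { runner.run(chan) })

tests.iter.each(fn (_) {
  let fut = Future.new

  chan.receive(recover fut.clone)
  report(fut.get)
})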

yorickpeterse commented:

Also worth adding: futures would need a shared internal state. This would require making them use atomic reference counting. This in turn means that values can be written and received multiple times, meaning they're technically a promise instead of a future. This in turn also means they're just channels with a capacity of 1, though this allows them to be a bit more efficient.

yorickpeterse commented:

Futures/promises allowing multiple reads and writes might actually not be so bad: in the above example it means you can reuse the same future/promise for every test, instead of having to allocate one for each test.

yorickpeterse commented:

With futures, using processes is a bit like using external iterators: every time you want a process to produce a result, you have to "advance" it by sending a message. Similar to external iterators, this can make certain implementations tricky. For example, if you have a process that walks through the files in a directory, you have two choices (see the sketch after this list):

  • Advance one file at a time, writing the results to a future. This requires maintaining internal state such that the iteration process can be resumed.
  • Advance through all files at once, sending the results over a Channel. This removes the need for internal mutable state, similar to internal iterators. The channel being of a fixed size also prevents us from buffering too much data at once.
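
A sketch of the first option, with DirWalker and its field as made-up names:

# The walker keeps its own cursor so iteration can resume between
# messages; `@pending` stands in for that internal state.
class async DirWalker {
  let @pending: Array[String]

  fn async mut next(out: uni Future[uni Option[String]]) {
    out.set(recover @pending.pop)
  }
}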

If we compare just futures with channels, channels are strictly superior in terms of flexibility because:

  1. They allow for multiple readers and writers
  2. You can send multiple values instead of just a single one
  3. You can use them for balancing workloads fairly across processes

The latter is also worth highlighting: if you have a bunch of work that needs to be performed across processes, a channel allows for that work to be balanced automatically (since it's just a shared FIFO queue). If all we had were futures, we'd need some sort of round-robin approach. This can result in worse performance if one process is performing a big job, because other processes can't steal the still pending messages.

yorickpeterse commented:

A benefit in favour of class async over anonymous processes and channels: using class async, each message can have its own generic type parameters. This is something I use in Openflow:

class async State {
  fn pub async mut update_with[T](
    data: uni T,
    block: uni fn (mut Map[String, Room], T) -> Bool,
  ) {
    ...
  }
}

Using channels we'd have to define a Message[T] type, then assign T some dummy type whenever we create other messages (e.g. Message[Nil]). This can get out of hand quickly when many messages need their own type parameters.
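
For example, a sketch matching the earlier State example; the Message type and its cases are made up:

class enum Message[T] {
  case UpdateWith(uni T, uni fn (mut Map[String, Room], T) -> Bool)
  case Prepare
}

# `Prepare` doesn't use `T` at all, yet sending it still forces us to
# pick a dummy type:
let chan: Channel[uni Message[Nil]] = Channel.new(size: 10)

chan.send(recover Message.Prepare)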

yorickpeterse commented Sep 12, 2024

Regarding load balancing: this can be achieved by spawning a process for each job, i.e. M processes for M jobs, instead of mapping M jobs onto N processes where N < M. In the case of std.test this would simplify the implementation a bit as well.

EDIT: actually, this isn't the case. The test suite has 1079 tests, and a bunch of these spawn Inko sub-processes. Each of those uses a bunch of threads, which seems to trigger a thread/process count limit on my machine. In other words, there are times when you do need to limit the amount of concurrency and use channels to balance the load.

yorickpeterse commented:

I'm going to leave this be for the time being. At this stage I think it's premature to replace our concurrency setup with something else, and I can't really think of anything better either. We'll likely revisit this at some point in the future.
