# Coroutines (async/await/promise) language support #727
See also #174. I don't think it's a duplicate, because supporting coroutines is only one component of a greater concurrency story. And I do think we should support coroutines. My plan is to add experimental coroutines support as soon as 0.2.0 is released (1 week after LLVM 6.0.0 is released). That way I can work against llvm 6 instead of 5, which I believe has iterated on the coroutines support a bit. The first implementation of coroutines in zig will likely be very low level, little more than exposing the LLVM coroutine primitives. Once myself and others have a chance to play with these tools, I'll do another design iteration on it.
As a first step towards coroutines, I submitted a patch to LLVM: https://reviews.llvm.org/D43368 In the meantime we can create an API that pretends we have this patch, and generates less efficient code (allocating too much memory) until this patch is merged and available.
## Coroutines Proposal

There are two main use cases for coroutines:

- Async I/O
- Generators

The former is easy to specify how it should work. The latter is difficult. So this proposal is only for async I/O, and we can solve generators later.

### How to use coroutines to do async I/O

```zig
pub fn main() !void {
    var loop = try EventLoop.init(std.debug.global_allocator);

    // async keyword expects a FunctionCall syntax node next.
    // async optionally can be passed `(` + allocator + `)`
    // allocator does not have to be std.mem.allocator; it can be any struct T that exposes
    //     fn alloc(self: T, byte_count: usize, alignment: u29) ![]u8
    //     fn free(self: T, old_mem: []u8) void
    // Calling an `async` function can fail with the same error set as `alloc`.
    // Here we add alloc and free to EventLoop for convenience.
    const amain_promise = try async(&loop) amain(&loop);

    // Calling an async function creates and returns a promise. A promise is a linear type,
    // which means that it cannot be copied. It can be consumed with `cancel` or `await`,
    // and it *must* be consumed.
    defer cancel amain_promise;

    // @typeOf(amain_promise) == promise->InferredErrorSet!void
    // `promise->T` can be implicitly casted to `promise`.
    // `promise->T` can be used with `await`; `cancel` and `resume` can be
    // used on `promise` as well as `promise->T`.

    // Most async I/O implementations will use an event loop to interact with the Operating
    // System's non-blocking I/O facilities.

    // `await` can only be used from an async function, and this main function is not
    // async, so we start an event loop which bootstraps async functions.
    try loop.run();
}

// A function that has `async` in front of `fn` must be called with the async keyword.
// Here there are no parentheses after `async`, so the allocator type is inferred.
async fn amain(loop: &EventLoop) !void {
    // Here we start 2 async functions to load 2 different files in a non-blocking manner.
    // Async functions may call other async functions with no `(` allocator `)` after
    // the async keyword. In this case the async function call uses the same allocator
    // that the current function used.
    const config_promise = try async loop.allocReadFile("config.json");

    // The `alive` property of a promise starts out `true` when an `async` function is called.
    // It is set to `false` under these conditions:
    //  * When you `await` the promise.
    //  * When you copy (move) the value to another promise (such as passing it as a parameter,
    //    or assigning it to another variable).
    //  * When you `cancel` the promise.
    // `cancel` asserts `the_promise.alive`.
    defer if (config_promise.alive) cancel config_promise;

    // When you call an async function, it executes until the first suspend, which could
    // be an `await` or a `suspend` call.
    const dict_promise = try async loop.allocReadFile("dictionary.txt");

    // Canceling the promise on error return of a function is important because it
    // runs the defers (and possibly errdefers) of the async function associated with
    // the promise. By using `cancel` on any
    // promise (or suspended async fn), it properly and cleanly cancels the entire chain of
    // suspended coroutines. When an async function is canceled while suspended at an await point,
    // it runs the defers and errdefers in scope.
    // One could note that, only in the case of error return does this if statement return true,
    // and therefore we could use `errdefer` here for better performance and code size.
    // Then again, we could let the optimizer perform that note and associated code change,
    // and then a future code modification could insert an early return before `await`
    // without introducing a bug.
    defer if (dict_promise.alive) cancel dict_promise;

    // When you call `await` on a promise, it does the following things:
    //  0. Sets `the_promise.alive` to `false`.
    //  1. The caller populates a pointer to the await expression result in the async function's
    //     coroutine frame.
    //  2. The caller populates a pointer to the caller's coroutine handle in the async function's
    //     coroutine frame.
    //  3. The caller creates a save point and is now considered suspended, but can still execute code.
    //  4. The caller checks if the suspended coroutine is at the final suspend point.
    //     If so, the caller resumes the suspended coroutine, which populates the await expression
    //     value, destroys itself, and then resumes the awaiter.
    //  5. Otherwise, the caller finishes suspending. When the async function completes, it populates
    //     the await expression value, destroys itself, and then resumes the awaiter.
    const config_contents = try await config_promise;
    defer allocator.free(config_contents);

    // `await` cannot fail. The `try` here is from the result type of allocReadFile, which is `![]u8`.
    // However, `await` *can* cause an early exit from a function. `await`
    // causes the current async function to suspend, and if it is canceled
    // instead of resumed, much like `try`, it runs the defers and errdefers in scope.
    const dict_contents = try await dict_promise;
    defer allocator.free(dict_contents);

    if (mem.eql(u8, dict_contents, config_contents)) {
        std.debug.warn("config.json and dictionary.txt are the same");
    }
}

// Here's an example that dispatches multiple async functions and then
// collects all the information together.
// We also give an example of using an explicit allocator type using `async(T)` in the
// function prototype.
async(EventLoop) fn concatFiles(loop: &EventLoop, filenames: []const []const u8) ![]u8 {
    const promises = try loop.allocator.alloc(promise->[]u8, filenames.len);
    errdefer loop.allocator.free(promises);

    var started_promise_index: usize = 0;
    var done_promise_index: usize = 0;
    errdefer {
        var i: usize = done_promise_index;
        while (i < started_promise_index) : (i += 1) {
            cancel promises[i];
        }
    }

    while (started_promise_index < filenames.len) : (started_promise_index += 1) {
        promises[started_promise_index] = try async loop.allocReadFile(filenames[started_promise_index]);
    }

    var buf = std.Buffer.init(loop.allocator);
    errdefer buf.deinit();

    while (done_promise_index < promises.len) {
        const data = try await promises[done_promise_index];
        done_promise_index += 1;
        defer loop.allocator.free(data);
        try buf.append(data);
    }

    return buf.toOwnedSlice();
}

const EventLoop = struct {
    allocator: &mem.Allocator,
    queue: std.LinkedList(coro),
    epollfd: i32,

    fn alloc(self: &EventLoop, byte_count: usize, alignment: u29) (error{OutOfMemory}![]u8) {
        return self.allocator.allocFn(self.allocator, byte_count, alignment);
    }

    fn free(self: &EventLoop, old_mem: []u8) void {
        return self.allocator.freeFn(self.allocator, old_mem);
    }

    async fn allocReadFile(loop: &EventLoop, path: []const u8) ![]u8 {
        const fd = std.os.linux.open(path, O_NONBLOCK, perm);
        const err = std.os.linux.getErrno(fd);
        if (err != 0) return error.OpenFailed;

        const file_size_bytes = getFileSizeBytes(fd); // assume this is implemented
        const bytes = try loop.allocator.alloc(u8, file_size_bytes);
        // In addition to errdefers running when returning an error from a function,
        // they also run when the async function is canceled from a non-final
        // suspend point, such as `await` or `suspend`.
        errdefer loop.allocator.free(bytes);

        // This construct is valid and has no possibility of a resource leak.
        try await try async loop.read(bytes, fd);

        // Because this coroutine ends with a `return`, this is marked as the
        // llvm.coro.suspend(final=true) and no implicit one is inserted.
        return bytes;
    }

    async fn read(self: &EventLoop, bytes: []u8, fd: i32) !usize {
        var dest_index: usize = 0;
        while (true) {
            const rc = std.os.posixRead(fd, &bytes[dest_index], bytes.len - dest_index);
            const read_err = std.os.linux.getErrno(rc);
            if (read_err == E_WOULDBLOCK) {
                try await try async self.waitFd(fd);
                continue;
            }
            if (read_err != 0) return error.ReadFailed;
            if (rc == 0) {
                return dest_index;
            }
            dest_index += rc;
        }
    }

    async fn waitFd(self: &EventLoop, fd: i32) !void {
        // A suspend block should always contain logic that will cause the
        // current async function to get resumed somehow. In this case,
        // the matching `resume` is in the `run` loop, after the `epoll_wait`.
        // We put the promise handle into epoll and this suspend point will get
        // resumed when epoll_wait says that this fd has data ready.
        defer epoll_ctl(self.epollfd, EPOLL_CTL_DEL, fd, &ev);
        suspend |the_promise| {
            var ev: epoll_event = undefined;
            ev.events = EPOLLIN|EPOLLET;
            ev.data.ptr = @ptrToInt(the_promise);
            epoll_ctl(self.epollfd, EPOLL_CTL_ADD, fd, &ev);
        }
    }

    fn init(allocator: &mem.Allocator) !EventLoop {
        const rc = std.os.linux.epoll_create1(std.os.linux.EPOLL_CLOEXEC);
        const err = std.os.linux.getErrno(rc);
        if (err != 0) return error.EpollFailed;
        return EventLoop {
            .allocator = allocator,
            .queue = std.LinkedList(coro).init(),
            .epollfd = rc,
        };
    }

    fn run(self: &EventLoop, main: var) !void {
        while (true) {
            var events: [10]epoll_event = undefined;
            const rc = epoll_wait(self.epollfd, &events[0], events.len, -1);
            const err = std.os.linux.getErrno(rc);
            if (err != 0) return error.EpollFailed;
            for (events[0..rc]) |ev| {
                const prom = @intToPtr(promise, ev.data.ptr);
                resume prom;
            }
        }
    }
};
```
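The `run`/`waitFd` pattern above (register the fd, suspend, resume on readiness) maps closely onto readiness loops in other languages. Below is a rough present-day Python sketch using `selectors` and plain generators as stand-ins for the proposed coroutines; `WaitFd`, `run_loop`, and `read_all` are illustrative names invented for this sketch, not part of the proposal.

```python
import selectors
import socket

class WaitFd:
    """Yielded by a coroutine to ask the loop to resume it when fd is readable."""
    def __init__(self, fd):
        self.fd = fd

def run_loop(sel, coro):
    # Drive one generator-based coroutine to completion. Each yielded WaitFd
    # parks the coroutine in the selector -- the analog of the suspend block
    # that calls epoll_ctl with EPOLL_CTL_ADD.
    try:
        req = coro.send(None)  # run until the first suspend point
    except StopIteration as stop:
        return stop.value
    while True:
        sel.register(req.fd, selectors.EVENT_READ, coro)
        for key, _ in sel.select():
            sel.unregister(key.fileobj)  # analog of the EPOLL_CTL_DEL defer
            try:
                req = key.data.send(None)  # resume the parked coroutine
            except StopIteration as stop:
                return stop.value

def read_all(sock):
    # Analog of allocReadFile/read: wait for readiness, then read until EOF.
    chunks = []
    while True:
        yield WaitFd(sock)  # suspend until the loop says data is ready
        data = sock.recv(4096)
        if not data:
            return b"".join(chunks)
        chunks.append(data)

a, b = socket.socketpair()
b.sendall(b"config contents")
b.close()
result = run_loop(selectors.DefaultSelector(), read_all(a))
print(result)
```

Here a single coroutine is driven to completion; the proposal's `resume prom` inside `run` corresponds to the `key.data.send(None)` call.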
cc @bnoordhuis I know you did (do?) a lot of libuv work, so I'd be curious to get your take on this.
Proposal edit: Renaming these builtins to keywords:
Proposal edit:

```zig
suspend |the_promise| {
    // do something with the_promise that will cause it to
    // be resumed appropriately
}
```

This corresponds to the distinct save and suspend points in the LLVM docs. So:

```zig
async fn waitFd(self: &EventLoop, fd: i32) !void {
    // A suspend block should always contain logic that will cause the
    // current async function to get resumed somehow. In this case,
    // the matching `resume` is in the `run` loop, after the `epoll_wait`.
    // We put the promise handle into epoll and this suspend point will get
    // resumed when epoll_wait says that this fd has data ready.
    defer epoll_ctl(self.epollfd, EPOLL_CTL_DEL, fd, &ev);
    suspend |the_promise| {
        var ev: epoll_event = undefined;
        ev.events = EPOLLIN|EPOLLET;
        ev.data.ptr = @ptrToInt(the_promise);
        epoll_ctl(self.epollfd, EPOLL_CTL_ADD, fd, &ev);
    }
}
```

What this allows us to do is multiplex coroutines onto threads:

```zig
suspend |prom| {
    // Consider if N threads are waiting on this job queue.
    // They would call resume right away, so we must be already
    // suspended when we add the promise to the queue. This is
    // exactly what the suspend block provides.
    atomic_push_to_job_queue(prom);
}
```
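The ordering constraint in that last block (the coroutine must already be suspended before another thread can see its handle) can be demonstrated with Python generators and a thread-safe queue. `jobs`, `worker`, and `task` are names invented for this sketch.

```python
import queue
import threading

jobs = queue.Queue()

def worker():
    # A worker thread pops a parked continuation and resumes it immediately,
    # like the N threads waiting on the job queue in the comment above.
    handle = jobs.get()
    try:
        handle.send(None)
    except StopIteration:
        pass

def task(results):
    results.append("before suspend")
    yield  # save point: the generator is fully suspended here
    results.append("resumed on worker thread")

results = []
t = threading.Thread(target=worker)
t.start()
coro = task(results)
coro.send(None)  # run to the yield; only now is the handle safe to publish
jobs.put(coro)   # the analog of atomic_push_to_job_queue(prom)
t.join()
print(results)
```

If the handle were published before the generator reached its `yield`, the worker could call `send` on a still-running generator — exactly the race the suspend block rules out.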
Proposal edit:

This matters because, although most async functions will probably be generic, if you want to use an async function as a function pointer, e.g. call an async function whose value is not known at compile time, you will have to explicitly specify the allocator type.
Thanks for the ping, Andrew. Yep, I still do a lot of libuv work.

I'm intrigued by the comment on promises being linear types. Is that a new concept in zig? Does linearity mean the promise can be used only inside the function that creates it? Or do you plan to introduce implicit or explicit lifetimes so that it can be returned to the caller?

Will `try await try async self.waitFd(fd)` also be expressible as:

```zig
var promise = try async self.waitFd(fd);
try await promise;
```

Will you be able to use `await` in expression position, like:

```zig
if (foo()) bar();
if (await afoo()) bar();
```

Or:

```zig
for (foo()) |x| bar(x);
for (await afoo()) |x| bar(x);
for await (foo()) |x| bar(x); // async iterator - probably a project of itself
```

I think the answer is yes, but I thought I'd ask anyway.

Does the promise start out suspended, or does the async function run until the first suspend point?

```zig
async fn everyNthPiDigit(nth: int) int { ... }
var promise = async everyNthPiDigit(1e9); // returns immediately or after computing the billionth digit?
runInNewThread(promise);
```

I'm still a little unclear on how generators (multiple returns) would work with this proposal.

All in all, I like where this is going!
I'll answer these questions for the proposal as it stands now, but these answers are not necessarily set in stone.
Yes, it is a new concept in zig.
The linearity is a requirement of the language but would not be entirely enforced at compile-time. You would be able to assign the promise to another variable, return it from a function, pass it as a parameter, any of these things. Each of these things would set `alive` to `false` on the source:

```zig
var promise1 = try async foo();
assert(promise1.alive);
var copy = promise1;
assert(copy.alive);
assert(!promise1.alive);
```

Under the hood, a promise is a pointer. The alive property is actually implemented by setting it to 0 to make it not alive.

So it's a linear type in the sense of what the responsibility is for the programmer - one must consume every promise exactly once, with `await` or `cancel`.

In practice, if one fails to do anything with a promise, then the async function's defers, if any, will not run. I haven't thought of a way to provide any runtime safety checks for this situation. It's pretty similar to not calling free, which is difficult to catch in the same way.

It's also possible to cause a promise to have an incorrect value for `alive`. Because of this, I would consider actually removing the ability to query the `alive` property. Either way, the caller can, in fact, return the promise.
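The move semantics of `alive` described above can be modeled at runtime with a small wrapper class. `Promise`, `move`, and `consume` are hypothetical names for this sketch, and the checks are runtime assertions rather than the compile-time enforcement the proposal leaves open.

```python
class Promise:
    """Move-only handle: moving or consuming it clears `alive` on the source."""
    def __init__(self, value):
        self._value = value
        self.alive = True

    def move(self):
        # Assignment is a move: the source promise is dead afterwards.
        assert self.alive, "promise already consumed"
        self.alive = False
        return Promise(self._value)

    def consume(self):
        # Stand-in for `await`/`cancel`: consumes the promise exactly once.
        assert self.alive, "promise already consumed"
        self.alive = False
        return self._value

promise1 = Promise(42)
assert promise1.alive
copy = promise1.move()
assert copy.alive and not promise1.alive
assert copy.consume() == 42
assert not copy.alive
```

A second `copy.consume()` here would trip the assertion, mirroring how `cancel` asserts `the_promise.alive` in the proposal.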
That's related to the code review I sent to LLVM above. I actually abandoned it, because the proposal I ended up coming up with didn't necessarily need it. However, now that you bring it up again, it would be nice to have the ability to query the coroutine frame size. Unfortunately, without significant reorganization of how zig works, and tightly coupling zig to LLVM instead of loose coupling as it currently stands, the size would not be available to use at compile-time. Also, currently when you reference a function, it's actually a comptime known pointer to a function.
Yes.
It's a semantic analysis error, and in fact is already implemented. You would get a compile error for:

```zig
test "aoeu" {
    amain();
}
async fn amain() void {
}
```
No - with this proposal the async function executes until suspension. You would have to intentionally offload CPU-bound code to a thread like this:

```zig
async fn everyNthPiDigit(nth: int) int {
    suspend |my_promise| {
        runInNewThread(my_promise);
    }
    // we are now running in the other thread
    // do computation...
    suspend |my_promise| {
        runInOriginalThread(my_promise);
    }
    // back in the original thread
    return result;
}
```

Some options here:
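The thread-hopping sketch above has a direct analog in Python's asyncio, where awaiting an executor future performs both hops (suspend onto a worker thread, resume back on the event loop when done). `nth_pi_digit_stub` is a placeholder for the real computation, not a pi-digit algorithm.

```python
import asyncio

def nth_pi_digit_stub(n):
    # Placeholder for the CPU-bound work; returns a dummy digit.
    return (n * 3) % 10

async def every_nth_pi_digit(n):
    loop = asyncio.get_running_loop()
    # Suspend, run the heavy work on a worker thread, and resume back on the
    # event loop thread -- the analog of the two suspend blocks above.
    return await loop.run_in_executor(None, nth_pi_digit_stub, n)

digit = asyncio.run(every_nth_pi_digit(10**9))
print(digit)
```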
The eager execution of async functions does the following things:
To be determined. As is, this proposal does not support multiple returns.
Here's an example of what it would look like to do the bookkeeping manually, without the `alive` property:

```zig
async fn amain(loop: &EventLoop) !void {
    const config_promise = try async loop.allocReadFile("config.json");
    var config_promise_alive = true;
    defer if (config_promise_alive) cancel config_promise;

    const dict_promise = try async loop.allocReadFile("dictionary.txt");
    var dict_promise_alive = true;
    defer if (dict_promise_alive) cancel dict_promise;

    config_promise_alive = false;
    const config_contents = try await config_promise;
    defer allocator.free(config_contents);

    dict_promise_alive = false;
    const dict_contents = try await dict_promise;
    defer allocator.free(dict_contents);

    if (mem.eql(u8, dict_contents, config_contents)) {
        std.debug.warn("config.json and dictionary.txt are the same");
    }
}
```

It's a bit verbose, but it does not introduce nesting, it's pretty straightforward, and it does not introduce any new language features. My biggest concern would be that it would be easier to not do this than to do it, thus leading to incorrect cleanup code. I have a more general dont-forget-to-cleanup-resources proposal coming up soon that could be integrated into this.
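The same manual-flag pattern translates to today's asyncio almost line for line, with `None`-ing a task variable standing in for the alive flags and `try`/`finally` standing in for `defer`. In this sketch, `alloc_read_file` just fabricates contents rather than doing real I/O.

```python
import asyncio

async def alloc_read_file(name):
    await asyncio.sleep(0)  # pretend non-blocking I/O
    return ("contents of " + name).encode()

async def amain():
    config_task = asyncio.create_task(alloc_read_file("config.json"))
    try:
        dict_task = asyncio.create_task(alloc_read_file("dictionary.txt"))
        try:
            config_contents = await config_task
            config_task = None  # consumed: the cleanup below must not cancel it
            dict_contents = await dict_task
            dict_task = None
            return config_contents == dict_contents
        finally:
            if dict_task is not None:
                dict_task.cancel()
    finally:
        if config_task is not None:
            config_task.cancel()

same = asyncio.run(amain())
print(same)
```

On the success path both task variables are `None` by the time the `finally` blocks run, so nothing is canceled — mirroring how the alive flags are flipped just before each `await`.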
With #782 the example above would become:

```zig
async fn amain(loop: &EventLoop) !void {
    const config_promise = clean try async loop.allocReadFile("config.json");
    const dict_promise = clean try async loop.allocReadFile("dictionary.txt");
    const config_contents = clean try await config_promise;
    const dict_contents = clean try await dict_promise;
    if (mem.eql(u8, dict_contents, config_contents)) {
        std.debug.warn("config.json and dictionary.txt are the same");
    }
}
```
Referenced commit messages:

- by making alloc and free functions be parameters to async functions instead of using getelementptr in the DynAlloc block (see #727)
- maybe this can be reverted, but it seems to be related to llvm's coro transformations crashing (see #727)
- my simple coro test program builds now (see #727)
In case this is helpful to someone: some C++ async/await/resumable proposals
I don't see how async/await fits into zig's philosophy of not hiding function calls. There is now an entire thread pool (right?!) hidden inside a keyword, with complex stack rebuilding and whatnot. I can't tell how to be thread-safe and write performant code while there are countless threads doing God knows what.
There's no thread pool in the zig language. Coroutines work in freestanding mode, with no OS. Here's the thread pool: line 16 in fd575fe. It's in userland; if you don't import it, it's not in your program.
Hi,

Will Zig have language support for coroutines or a similar construct? Rust is going to support async/await, C++20 will get language support for coroutines, and other mainstream languages (like C#, JS, etc.) already have or will have support too. There is also already support for coroutines in LLVM (https://llvm.org/docs/Coroutines.html).

Any thoughts?