Skip to content

Commit

Permalink
async await using libuv (nvim-lua#83)
Browse files Browse the repository at this point in the history
* started branch

* added bench

* added plenary. ahead

* changed naming

* added work future and test

* fixed await_all, added more benches and tests

* ntoes

* more notes

* added doc

* added M

* added some more uv functions

* start of counting semaphore

* more docs

* use join in run_all

* started branch

* fixed tests

* removed unneeded

* small changes

* async: refactor futures without object

* maded naming more consistent

* added argc

* added argc for wrap

* added argc for all functions

* put in main loop

* made timeout work

* added runned

* removed convert

* added nvim future to be able to call api

* added select

* fixed wrong argc in select function

* added block on

* updated waiting time for blockon

* added protect and block_on

* added api helper

* updated benchs for api

* fixed protected

* validate sender

* add in_fast_event check

* removed unneeded asset file

* removed comment

* change name to scheduler

* removed idle and work related stuff for now

* removed work tests and changed name to util

* added scope and void

* added check to condvar

* removed unnecesary concats

* removed long bench file

* added better errors

* added many docs

* moved block_on and fixed oneshot channel

* added async tests

* updated tests and added describe it

* fixed channel and added more tests

* more tests

* added counter channel

* changed counter api and added tests

* added more deque methods and tests

* added mspc channel

* woops forgot to commit

* remove runned

Co-authored-by: Björn Linse <[email protected]>
  • Loading branch information
2 people authored and delphinus committed Apr 26, 2021
1 parent fdcb306 commit 48f0ad5
Show file tree
Hide file tree
Showing 16 changed files with 1,371 additions and 0 deletions.
13 changes: 13 additions & 0 deletions lua/plenary/async_lib/api.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local a = require('plenary.async_lib.async')
local async, await = a.async, a.await

return setmetatable({}, {__index = function(t, k)
return async(function(...)
-- if we are in a fast event await the scheduler
if vim.in_fast_event() then
await(a.scheduler())
end

vim.api[k](...)
end)
end})
201 changes: 201 additions & 0 deletions lua/plenary/async_lib/async.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
local co = coroutine
local errors = require('plenary.errors')
local traceback_error = errors.traceback_error

local M = {}

---@class Future
---Something that will give a value when run

---Executes a future with a callback when it is done
---@param future Future: the future to execute
---@param callback function: the callback to call when done
local execute = function(future, callback)
assert(type(future) == "function", "type error :: expected func")
local thread = co.create(future)

local step
step = function(...)
local res = {co.resume(thread, ...)}
local stat = res[1]
local ret = {select(2, unpack(res))}

if not stat then
error(string.format("The coroutine failed with this message: %s", ret[1]))
end

if co.status(thread) == "dead" then
(callback or function() end)(unpack(ret))
else
assert(#ret == 1, "expected a single return value")
local returned_future = ret[1]
assert(type(returned_future) == "function", "type error :: expected func")
returned_future(step)
end
end

step()
end

---Creates an async function with a callback style function.
---@param func function: A callback style function to be converted. The last argument must be the callback.
---@param argc number: The number of arguments of func. Must be included.
---@return function: Returns an async function
M.wrap = function(func, argc)
if type(func) ~= "function" then
traceback_error("type error :: expected func, got " .. type(func))
end

if type(argc) ~= "number" and argc ~= "vararg" then
traceback_error("expected argc to be a number or string literal 'vararg'")
end

return function(...)
local params = {...}

local function future(step)
if step then
if type(argc) == "number" then
params[argc] = step
else
table.insert(params, step) -- change once not optional
end
return func(unpack(params))
else
return co.yield(future)
end
end
return future
end
end

---Return a new future that when run will run all futures concurrently.
---@param futures table: the futures that you want to join
---@return Future: returns a future
M.join = M.wrap(function(futures, step)
local len = #futures
local results = {}
local done = 0

if len == 0 then
return step(results)
end

for i, future in ipairs(futures) do
assert(type(future) == "function", "type error :: future must be function")

local callback = function(...)
results[i] = {...}
done = done + 1
if done == len then
step(results)
end
end

future(callback)
end
end, 2)

---Returns a future that when run will select the first future that finishes
---@param futures table: The future that you want to select
---@return Future
M.select = M.wrap(function(futures, step)
local selected = false

for _, future in ipairs(futures) do
assert(type(future) == "function", "type error :: future must be function")

local callback = function(...)
if not selected then
selected = true
step(...)
end
end

future(callback)
end
end, 2)

---Use this to either run a future concurrently and then do something else
---or use it to run a future with a callback in a non async context
---@param future Future
---@param callback function
M.run = function(future, callback)
future(callback or function() end)
end

---Same as run but runs multiple futures
---@param futures table
---@param callback function
M.run_all = function(futures, callback)
M.run(M.join(futures), callback)
end

---Await a future, yielding the current function
---@param future Future
---@return any: returns the result of the future when it is done
M.await = function(future)
assert(type(future) == "function", "type error :: expected function to await")
return future(nil)
end

---Same as await but can await multiple futures.
---If the futures have libuv leaf futures they will be run concurrently
---@param futures table
---@return table: returns a table of results that each future returned. Note that if the future returns multiple values they will be packed into a table.
M.await_all = function(futures)
assert(type(futures) == "table", "type error :: expected table")
return M.await(M.join(futures))
end

---suspend a coroutine
M.suspend = co.yield

---create a async scope
M.scope = function(func)
M.run(M.future(func))
end

--- Future a :: a -> (a -> ())
--- turns this signature
--- ... -> Future a
--- into this signature
--- ... -> ()
M.void = function(async_func)
return function(...)
async_func(...)(function() end)
end
end

---creates an async function
---@param func function
---@return function: returns an async function
M.async = function(func)
if type(func) ~= "function" then
traceback_error("type error :: expected func, got " .. type(func))
end

return function(...)
local args = {...}
local function future(step)
if step == nil then
return func(unpack(args))
else
execute(future, step)
end
end
return future
end
end

---creates a future
---@param func function
---@return Future
M.future = function(func)
return M.async(func)()
end

---An async function that when awaited will await the scheduler to be able to call the api.
M.scheduler = M.wrap(vim.schedule, 1)

return M
36 changes: 36 additions & 0 deletions lua/plenary/async_lib/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
local exports = require('plenary.async_lib.async')
exports.uv = require('plenary.async_lib.uv_async')
exports.util = require('plenary.async_lib.util')
exports.lsp = require('plenary.async_lib.lsp')
exports.api = require('plenary.async_lib.api')
exports.tests = require('plenary.async_lib.tests')

exports.tests.add_globals = function()
a = exports
async = exports.async
await = exports.await
await_all = exports.await_all

-- must prefix with a or stack overflow, plenary.test harness already added it
a.describe = exports.tests.describe
-- must prefix with a or stack overflow
a.it = exports.tests.it
end

exports.tests.add_to_env = function()
local env = getfenv(2)

env.a = exports
env.async = exports.async
env.await = exports.await
env.await_all = exports.await_all

-- must prefix with a or stack overflow, plenary.test harness already added it
env.a.describe = exports.tests.describe
-- must prefix with a or stack overflow
env.a.it = exports.tests.it

setfenv(2, env)
end

return exports
8 changes: 8 additions & 0 deletions lua/plenary/async_lib/lsp.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
local a = require('plenary.async_lib.async')

local M = {}

---Same as vim.lsp.buf_request but works with async await
M.buf_request = a.wrap(vim.lsp.buf_request, 4)

return M
112 changes: 112 additions & 0 deletions lua/plenary/async_lib/structs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
local M = {}

Deque = {}
Deque.__index = Deque

---@class Deque
---A double ended queue
---
---@return Deque
function Deque.new()
-- the indexes are created with an offset so that the indices are consequtive
-- otherwise, when both pushleft and pushright are used, the indices will have a 1 length hole in the middle
return setmetatable({first = 0, last = -1}, Deque)
end

---push to the left of the deque
---@param value any
function Deque:pushleft(value)
local first = self.first - 1
self.first = first
self[first] = value
end

---push to the right of the deque
---@param value any
function Deque:pushright(value)
local last = self.last + 1
self.last = last
self[last] = value
end

---pop from the left of the deque
---@return any
function Deque:popleft()
local first = self.first
if first > self.last then return nil end
local value = self[first]
self[first] = nil -- to allow garbage collection
self.first = first + 1
return value
end

---pops from the right of the deque
---@return any
function Deque:popright()
local last = self.last
if self.first > last then return nil end
local value = self[last]
self[last] = nil -- to allow garbage collection
self.last = last - 1
return value
end

---checks if the deque is empty
---@return boolean
function Deque:is_empty()
return self:len() == 0
end

---returns the number of elements of the deque
---@return number
function Deque:len()
return self.last - self.first + 1
end

---returns and iterator of the indices and values starting from the left
---@return function
function Deque:ipairs_left()
local i = self.first

return function()
local res = self[i]
local idx = i

if res then
i = i + 1

return idx, res
end
end
end

---returns and iterator of the indices and values starting from the right
---@return function
function Deque:ipairs_right()
local i = self.last

return function()
local res = self[i]
local idx = i

if res then
i = i - 1 -- advance the iterator before we return

return idx, res
end
end
end

---removes all values from the deque
---@return nil
function Deque:clear()
for i, _ in self:ipairs_left() do
self[i] = nil
end
self.first = 0
self.last = -1
end

M.Deque = Deque

return M
14 changes: 14 additions & 0 deletions lua/plenary/async_lib/tests.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
local a = require('plenary.async_lib.async')
local util = require('plenary.async_lib.util')

local M = {}

M.describe = function(s, func)
describe(s, util.will_block(a.future(func)))
end

M.it = function(s, func)
it(s, util.will_block(a.future(func)))
end

return M
Loading

0 comments on commit 48f0ad5

Please sign in to comment.