Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async await using libuv #83

Merged
merged 61 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
a096073
started branch
oberblastmeister Feb 23, 2021
b363ee0
added bench
oberblastmeister Feb 23, 2021
621bfb9
added plenary. ahead
oberblastmeister Feb 23, 2021
b226549
changed naming
oberblastmeister Feb 24, 2021
eea0b24
added work future and test
oberblastmeister Feb 24, 2021
3fe213a
fixed await_all, added more benches and tests
oberblastmeister Feb 24, 2021
db2f958
ntoes
oberblastmeister Feb 24, 2021
02e938e
more notes
oberblastmeister Feb 24, 2021
7b9c2ac
added doc
oberblastmeister Feb 24, 2021
2ccda6b
added M
oberblastmeister Feb 24, 2021
4fe5f84
added some more uv functions
oberblastmeister Feb 24, 2021
23553f3
start of counting semaphore
oberblastmeister Feb 26, 2021
7062489
more docs
oberblastmeister Feb 26, 2021
dab2d92
use join in run_all
oberblastmeister Feb 26, 2021
de8ac26
started branch
oberblastmeister Feb 27, 2021
430a2d0
fixed tests
oberblastmeister Feb 27, 2021
1bc875a
removed unneeded
oberblastmeister Feb 27, 2021
68af03b
small changes
oberblastmeister Feb 28, 2021
8915094
async: refactor futures without object
bfredl Feb 28, 2021
e1e44b1
maded naming more consistent
oberblastmeister Feb 28, 2021
9234048
added argc
oberblastmeister Feb 28, 2021
6906da0
added argc for wrap
oberblastmeister Mar 5, 2021
60368ff
added argc for all functions
oberblastmeister Mar 6, 2021
53ab866
put in main loop
oberblastmeister Mar 9, 2021
b682e78
made timeout work
oberblastmeister Mar 9, 2021
7fc1214
added runned
oberblastmeister Mar 9, 2021
02fe675
removed convert
oberblastmeister Mar 13, 2021
1c4559b
added nvim future to be able to call api
oberblastmeister Mar 18, 2021
0fd0f1a
added select
oberblastmeister Mar 18, 2021
dc68752
fixed wrong argc in select function
oberblastmeister Mar 18, 2021
c5efd4d
added block on
oberblastmeister Mar 20, 2021
83378a0
updated waiting time for blockon
oberblastmeister Mar 20, 2021
2e3ef66
Merge remote-tracking branch 'upstream/master' into async
oberblastmeister Mar 20, 2021
bab7509
added protect and block_on
oberblastmeister Mar 20, 2021
578cd63
added api helper
oberblastmeister Mar 20, 2021
0c96921
updated benchs for api
oberblastmeister Mar 20, 2021
8384d3d
fixed protected
oberblastmeister Mar 20, 2021
fbd402c
validate sender
oberblastmeister Mar 20, 2021
1666282
add in_fast_event check
oberblastmeister Mar 20, 2021
e24a703
removed unneeded asset file
oberblastmeister Mar 21, 2021
f74b094
removed comment
oberblastmeister Mar 21, 2021
522a7b8
change name to scheduler
oberblastmeister Mar 21, 2021
ffcf332
removed idle and work related stuff for now
oberblastmeister Mar 21, 2021
2c9f3a9
removed work tests and changed name to util
oberblastmeister Mar 21, 2021
58ac59d
added scope and void
oberblastmeister Mar 21, 2021
73c4c81
added check to condvar
oberblastmeister Mar 21, 2021
e3e3e8f
removed unnecesary concats
oberblastmeister Mar 22, 2021
14c734d
removed long bench file
oberblastmeister Mar 22, 2021
79640e9
added better errors
oberblastmeister Mar 22, 2021
92afbb9
added many docs
oberblastmeister Mar 22, 2021
fc4557f
moved block_on and fixed oneshot channel
oberblastmeister Mar 23, 2021
4767627
added async tests
oberblastmeister Mar 23, 2021
60ea7d9
updated tests and added describe it
oberblastmeister Mar 23, 2021
1752ae5
fixed channel and added more tests
oberblastmeister Mar 23, 2021
df921db
more tests
oberblastmeister Mar 23, 2021
374d7cd
added counter channel
oberblastmeister Mar 24, 2021
981a291
changed counter api and added tests
oberblastmeister Mar 27, 2021
2765ee5
added more deque methods and tests
oberblastmeister Mar 27, 2021
6dfa6fc
added mspc channel
oberblastmeister Mar 27, 2021
5f2d6e5
woops forgot to commit
oberblastmeister Mar 28, 2021
5233604
remove runned
oberblastmeister Mar 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lua/plenary/async_lib/api.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local a = require('plenary.async_lib.async')
local async, await = a.async, a.await

return setmetatable({}, {__index = function(t, k)
return async(function(...)
-- if we are in a fast event await the scheduler
if vim.in_fast_event() then
await(a.scheduler())
end

vim.api[k](...)
end)
end})
201 changes: 201 additions & 0 deletions lua/plenary/async_lib/async.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
local co = coroutine
local errors = require('plenary.errors')
local traceback_error = errors.traceback_error

local M = {}

---@class Future
---Something that will give a value when run

---Executes a future with a callback when it is done
---@param future Future: the future to execute
---@param callback function: the callback to call when done
local execute = function(future, callback)
assert(type(future) == "function", "type error :: expected func")
local thread = co.create(future)

local step
step = function(...)
local res = {co.resume(thread, ...)}
local stat = res[1]
local ret = {select(2, unpack(res))}
oberblastmeister marked this conversation as resolved.
Show resolved Hide resolved

if not stat then
error(string.format("The coroutine failed with this message: %s", ret[1]))
end

if co.status(thread) == "dead" then
(callback or function() end)(unpack(ret))
else
assert(#ret == 1, "expected a single return value")
local returned_future = ret[1]
assert(type(returned_future) == "function", "type error :: expected func")
returned_future(step)
end
end

step()
end

---Creates an async function with a callback style function.
---@param func function: A callback style function to be converted. The last argument must be the callback.
---@param argc number: The number of arguments of func. Must be included.
---@return function: Returns an async function
M.wrap = function(func, argc)
if type(func) ~= "function" then
traceback_error("type error :: expected func, got " .. type(func))
end

if type(argc) ~= "number" and argc ~= "vararg" then
traceback_error("expected argc to be a number or string literal 'vararg'")
end

return function(...)
local params = {...}

local function future(step)
if step then
if type(argc) == "number" then
params[argc] = step
else
table.insert(params, step) -- change once not optional
end
oberblastmeister marked this conversation as resolved.
Show resolved Hide resolved
return func(unpack(params))
else
return co.yield(future)
end
end
return future
end
end

---Return a new future that when run will run all futures concurrently.
---@param futures table: the futures that you want to join
---@return Future: returns a future
M.join = M.wrap(function(futures, step)
local len = #futures
local results = {}
local done = 0

if len == 0 then
return step(results)
end

for i, future in ipairs(futures) do
assert(type(future) == "function", "type error :: future must be function")

local callback = function(...)
results[i] = {...}
done = done + 1
if done == len then
step(results)
end
end

future(callback)
end
end, 2)

---Returns a future that when run will select the first future that finishes
---@param futures table: The future that you want to select
---@return Future
M.select = M.wrap(function(futures, step)
local selected = false

for _, future in ipairs(futures) do
assert(type(future) == "function", "type error :: future must be function")

local callback = function(...)
if not selected then
selected = true
step(...)
end
end

future(callback)
end
end, 2)

---Use this to either run a future concurrently and then do something else
---or use it to run a future with a callback in a non async context
---@param future Future
---@param callback function
M.run = function(future, callback)
future(callback or function() end)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about making callback required and making a new function like start or something? Idk, I'm not sure I like having it optional callback (haven't thought a lot about it yet tho)

end

---Same as run but runs multiple futures
---@param futures table
---@param callback function
M.run_all = function(futures, callback)
M.run(M.join(futures), callback)
end

---Await a future, yielding the current function
---@param future Future
---@return any: returns the result of the future when it is done
M.await = function(future)
assert(type(future) == "function", "type error :: expected function to await")
return future(nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a bit weird that here we pass nil and above (line 119) we pass callback or function() end -- I'd like to make it more consistent perhaps. Seems fine though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are supposed to be completely opposite functions which is why they are not consistent. await will yield the function and run a future asynchronously and then resume when it is done. Passing in nil to any future will await it in an async context. callback or function() end is used because passing a function into a future will run it in a none async function meaning it does not yield. Not passing in a function will still run it, hence the or. Using the or basically forces it to be a function. Futures are created using the wrap function and async function. In those functions you can see that there are if statements checking if we passed in nil or a function. What type of argument we passed in controls the behavior.

end

---Same as await but can await multiple futures.
---If the futures have libuv leaf futures they will be run concurrently
---@param futures table
---@return table: returns a table of results that each future returned. Note that if the future returns multiple values they will be packed into a table.
M.await_all = function(futures)
assert(type(futures) == "table", "type error :: expected table")
return M.await(M.join(futures))
end

---suspend a coroutine
M.suspend = co.yield

---create a async scope
M.scope = function(func)
M.run(M.future(func))
end

--- Future a :: a -> (a -> ())
--- turns this signature
--- ... -> Future a
--- into this signature
--- ... -> ()
M.void = function(async_func)
return function(...)
async_func(...)(function() end)
end
end

---creates an async function
---@param func function
---@return function: returns an async function
M.async = function(func)
if type(func) ~= "function" then
traceback_error("type error :: expected func, got " .. type(func))
end

return function(...)
local args = {...}
local function future(step)
if step == nil then
return func(unpack(args))
else
execute(future, step)
end
end
return future
end
end

---creates a future
---@param func function
---@return Future
M.future = function(func)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this create a future? I'd like to rename this maybe to to_future ? idk, what are your thoughts?

return M.async(func)()
end

---An async function that when awaited will await the scheduler to be able to call the api.
M.scheduler = M.wrap(vim.schedule, 1)

return M
36 changes: 36 additions & 0 deletions lua/plenary/async_lib/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
local exports = require('plenary.async_lib.async')
exports.uv = require('plenary.async_lib.uv_async')
exports.util = require('plenary.async_lib.util')
exports.lsp = require('plenary.async_lib.lsp')
exports.api = require('plenary.async_lib.api')
exports.tests = require('plenary.async_lib.tests')

exports.tests.add_globals = function()
a = exports
async = exports.async
await = exports.await
await_all = exports.await_all

-- must prefix with a or stack overflow, plenary.test harness already added it
a.describe = exports.tests.describe
-- must prefix with a or stack overflow
a.it = exports.tests.it
end

exports.tests.add_to_env = function()
local env = getfenv(2)

env.a = exports
env.async = exports.async
env.await = exports.await
env.await_all = exports.await_all

-- must prefix with a or stack overflow, plenary.test harness already added it
env.a.describe = exports.tests.describe
-- must prefix with a or stack overflow
env.a.it = exports.tests.it

setfenv(2, env)
end

return exports
8 changes: 8 additions & 0 deletions lua/plenary/async_lib/lsp.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
local a = require('plenary.async_lib.async')

local M = {}

---Same as vim.lsp.buf_request but works with async await
M.buf_request = a.wrap(vim.lsp.buf_request, 4)

return M
112 changes: 112 additions & 0 deletions lua/plenary/async_lib/structs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
local M = {}

Deque = {}
Deque.__index = Deque

---@class Deque
---A double ended queue
---
---@return Deque
function Deque.new()
-- the indexes are created with an offset so that the indices are consequtive
-- otherwise, when both pushleft and pushright are used, the indices will have a 1 length hole in the middle
return setmetatable({first = 0, last = -1}, Deque)
end

---push to the left of the deque
---@param value any
function Deque:pushleft(value)
local first = self.first - 1
self.first = first
self[first] = value
end

---push to the right of the deque
---@param value any
function Deque:pushright(value)
local last = self.last + 1
self.last = last
self[last] = value
end

---pop from the left of the deque
---@return any
function Deque:popleft()
local first = self.first
if first > self.last then return nil end
local value = self[first]
self[first] = nil -- to allow garbage collection
self.first = first + 1
return value
end

---pops from the right of the deque
---@return any
function Deque:popright()
local last = self.last
if self.first > last then return nil end
local value = self[last]
self[last] = nil -- to allow garbage collection
self.last = last - 1
return value
end

---checks if the deque is empty
---@return boolean
function Deque:is_empty()
return self:len() == 0
end

---returns the number of elements of the deque
---@return number
function Deque:len()
return self.last - self.first + 1
end

---returns and iterator of the indices and values starting from the left
---@return function
function Deque:ipairs_left()
local i = self.first

return function()
local res = self[i]
local idx = i

if res then
i = i + 1

return idx, res
end
end
end

---returns and iterator of the indices and values starting from the right
---@return function
function Deque:ipairs_right()
local i = self.last

return function()
local res = self[i]
local idx = i

if res then
i = i - 1 -- advance the iterator before we return

return idx, res
end
end
end

---removes all values from the deque
---@return nil
function Deque:clear()
for i, _ in self:ipairs_left() do
self[i] = nil
end
self.first = 0
self.last = -1
end

M.Deque = Deque

return M
14 changes: 14 additions & 0 deletions lua/plenary/async_lib/tests.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
local a = require('plenary.async_lib.async')
local util = require('plenary.async_lib.util')

local M = {}

M.describe = function(s, func)
describe(s, util.will_block(a.future(func)))
end

M.it = function(s, func)
it(s, util.will_block(a.future(func)))
end

return M
Loading