-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add a new module
channel
similar to Golang's channel
See example `examples/channel.lua`. Signed-off-by: Jianhui Zhao <[email protected]>
- Loading branch information
Showing
5 changed files
with
154 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
-- SPDX-License-Identifier: MIT | ||
-- Author: Jianhui Zhao <[email protected]> | ||
|
||
local sync = require 'eco.sync' | ||
|
||
local M = {} | ||
|
||
local methods = {} | ||
|
||
function methods:length() | ||
return #self.buf | ||
end | ||
|
||
function methods:close() | ||
if self.closed then | ||
return | ||
end | ||
|
||
self.closed = true | ||
self.cond_recv:signal() | ||
end | ||
|
||
-- On success, true is returned | ||
-- On error, false is returned with a string describing the error | ||
function methods:send(v, timeout) | ||
assert(not self.closed, 'sending on closed channel') | ||
|
||
local buf = self.buf | ||
|
||
if #buf == self.capacity then | ||
local ok, err = self.cond_send:wait(timeout) | ||
if not ok then | ||
return false, err | ||
end | ||
end | ||
|
||
buf[#buf + 1] = v | ||
|
||
self.cond_recv:signal() | ||
|
||
return true | ||
end | ||
|
||
-- On success, the value received is returned | ||
-- On closed, nil is returned | ||
-- On error, nil is returned with a string describing the error | ||
function methods:recv(timeout) | ||
local buf = self.buf | ||
|
||
if #buf < 1 then | ||
if self.closed then | ||
return nil | ||
end | ||
|
||
local ok, err = self.cond_recv:wait(timeout) | ||
if not ok then | ||
return nil, err | ||
end | ||
end | ||
|
||
if #buf < 1 then | ||
return nil | ||
end | ||
|
||
local v = buf[1] | ||
|
||
table.remove(buf, 1) | ||
|
||
self.cond_send:signal() | ||
|
||
return v | ||
end | ||
|
||
local metatable = { | ||
__index = methods | ||
} | ||
|
||
--[[ | ||
A channel provides a mechanism for communication between coroutines by sending and receiving values. | ||
When a coroutine sends data to a channel, if the channel is full, the sending operation is blocked | ||
until another coroutine has taken data from the channel. | ||
Similarly, when a coroutine receives data from a channel, if there is no data available in the channel, | ||
the receiving operation will be blocked until another coroutine has sent data to the channel. | ||
Closing a channel notifies other coroutines that the channel is no longer in use. After a channel is closed, | ||
other coroutines can still receive data from it, but they can no longer send data to it. | ||
A channel can have a buffer, default is 1. | ||
--]] | ||
function M.new(capacity) | ||
assert(capacity == nil or math.type(capacity) == 'integer', 'capacity must be an integer') | ||
|
||
if not capacity or capacity < 1 then | ||
capacity = 1 | ||
end | ||
|
||
local ch = { | ||
cond_send = sync.cond(), | ||
cond_recv = sync.cond(), | ||
capacity = capacity, | ||
closed = false, | ||
buf = {} | ||
} | ||
|
||
return setmetatable(ch, metatable) | ||
end | ||
|
||
return M |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
#!/usr/bin/env eco | ||
|
||
local channel = require 'eco.channel' | ||
local time = require 'eco.time' | ||
|
||
-- Create a channel with 5 buffers. | ||
local ch = channel.new(5) | ||
|
||
eco.run(function() | ||
local i = 1 | ||
while true do | ||
ch:send(i) | ||
|
||
print(os.time(), 'send', i) | ||
|
||
if i == 5 then | ||
ch:close() | ||
break | ||
end | ||
|
||
i = i + 1 | ||
time.sleep(1) | ||
end | ||
end) | ||
|
||
eco.run(function() | ||
time.sleep(2) | ||
while true do | ||
local v = ch:recv() | ||
if v then | ||
print(os.time(), 'recv:', v) | ||
else | ||
print(os.time(), 'closed') | ||
break | ||
end | ||
end | ||
end) | ||
|
||
while true do | ||
time.sleep(1) | ||
end |