Skip to content

Commit

Permalink
[ new ] Work stealing thread pools and posix bindings (#23)
Browse files Browse the repository at this point in the history
* [ wip ] work stealing

* [ wip ] new work-stealing thread pool

* [ refactor ] use posix and linux libraries

* [ test ] fix tests

* [ fix ] epoll imports
  • Loading branch information
stefan-hoeck authored Jan 6, 2025
1 parent 4d247f1 commit 10ae85b
Show file tree
Hide file tree
Showing 22 changed files with 780 additions and 834 deletions.
54 changes: 28 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ module README
import Data.List
import IO.Async.Loop.Epoll
import IO.Async.Signal
import IO.Async.Posix
import System
import System.Linux.SignalFD
import System.Linux.Signalfd
import System.Posix.File
%default total
```
Expand Down Expand Up @@ -75,17 +77,17 @@ we define two countdowns: One for counting down seconds,
the other counting down milliseconds (in 100 ms steps):

```idris
countSeconds : TimerH e => Nat -> Async e [] ()
countSeconds 0 = putStrLn "Second counter done."
countSeconds : TimerH e => Nat -> Async e [Errno] ()
countSeconds 0 = stdoutLn "Second counter done."
countSeconds (S k) = do
putStrLn "\{show $ S k} s left"
stdoutLn "\{show $ S k} s left"
sleep 1.s
countSeconds k
countMillis : TimerH e => Nat -> Async e [] ()
countMillis 0 = putStrLn "Millisecond counter done."
countMillis : TimerH e => Nat -> Async e [Errno] ()
countMillis 0 = stdoutLn "Millisecond counter done."
countMillis (S k) = do
putStrLn "\{show $ S k * 100} ms left"
stdoutLn "\{show $ S k * 100} ms left"
sleep 100.ms
countMillis k
```
Expand All @@ -110,9 +112,9 @@ used to run our computations to provide the `TimerH` capability.
Let's try and run the two countdowns sequentially:

```idris
countSequentially : Async WorkST [] ()
countSequentially : Async EpollST [Errno] ()
countSequentially = do
putStrLn "Sequential countdown:"
stdoutLn "Sequential countdown:"
countSeconds 2
countMillis 10
```
Expand Down Expand Up @@ -151,9 +153,9 @@ to finish using `wait`. Here's the code:


```idris
countParallel : TimerH e => Async e [] ()
countParallel : TimerH e => Async e [Errno] ()
countParallel = do
putStrLn "Concurrent countdown"
stdoutLn "Concurrent countdown"
f1 <- start $ countSeconds 2
f2 <- start $ countMillis 10
wait f1
Expand Down Expand Up @@ -193,7 +195,7 @@ stores their results again in a heterogeneous list (use `"par2"` as the
command-line argument to run the next example):

```idris
countParallel2 : TimerH e => Async e [] ()
countParallel2 : TimerH e => Async e [Errno] ()
countParallel2 = ignore $ par [ countSeconds 2, countMillis 10 ]
```

Expand All @@ -208,17 +210,14 @@ we also add a signal handler so we can abort the program by
entering `Ctrl-c` at the terminal:

```idris
onSigErr : SignalError -> Async e [] ()
onSigErr (Error n) = putStrLn "Encountered signal error: \{show n}"
covering
raceParallel : TimerH e => SignalH e => Async e [] ()
raceParallel : TimerH e => SignalH e => Async e [Errno] ()
raceParallel = do
putStrLn "Racing countdowns"
stdoutLn "Racing countdowns"
ignore $ race
[ countSeconds 10000
, countMillis 200
, onSignal SigINT (putStrLn "\nInterrupted by SigINT")
, onSignal SIGINT (stdoutLn "\nInterrupted by SIGINT")
]
```

Expand Down Expand Up @@ -256,7 +255,7 @@ fibo 0 = 1
fibo 1 = 1
fibo (S $ S k) = fibo k + fibo (S k)
sumFibos : Nat -> Nat -> Async WorkST [] ()
sumFibos : Nat -> Nat -> Async EpollST [Errno] ()
sumFibos nr fib = do
vs <- parTraverse (\n => lazy (fibo n)) (replicate nr fib)
printLn (maybe 0 sum vs)
Expand Down Expand Up @@ -288,12 +287,12 @@ an event to occur. As such, we want to be able to run *a lot* such
computations in parallel:

```idris
sleepMany : TimerH e => Nat -> Async e [] ()
sleepMany : TimerH e => Nat -> Async e [Errno] ()
sleepMany 0 = pure ()
sleepMany (S k) =
ignore $
parTraverse
(\n => sleep 100.ms >> putStrLn "fiber \{show n} done")
(\n => sleep 100.ms >> stdoutLn "fiber \{show n} done")
[0 .. k]
```

Expand Down Expand Up @@ -349,13 +348,13 @@ large enough to take hundreds of milliseconds at the least. In addition,
we print ever result to get an idea of the runtime behavior:

```idris
sumVisFibos : Nat -> Nat -> Async WorkST [] ()
sumVisFibos : Nat -> Nat -> Async EpollST [Errno] ()
sumVisFibos nr fib = do
vs <- parTraverse visFibo (replicate nr fib)
printLn (maybe 0 sum vs)
where
visFibo : Nat -> Async WorkST [] Nat
visFibo : Nat -> Async EpollST [Errno] Nat
visFibo n = lazy (fibo n) >>= \x => printLn x $> x
```

Expand Down Expand Up @@ -386,7 +385,7 @@ loop from `IO.Async.Loop.Epoll` is used to run this. This makes use of

```idris
covering
act : List String -> Async WorkST [] ()
act : List String -> Async EpollST [Errno] ()
act ("par" :: _) = countParallel
act ("par2" :: _) = countParallel2
act ("race" :: _) = raceParallel
Expand All @@ -401,13 +400,16 @@ act _ = countSequentially
-- `sigs` is used to block the default handling of the listed signals.
covering
run : (threads : Nat) -> {auto 0 _ : IsSucc threads} -> List String -> IO ()
run threads args = app threads sigs $ act args
run threads args = app threads sigs $ handle handlers (act args)
where
sigs : List Signal
sigs = case args of
"race"::_ => [SigINT]
"race"::_ => [SIGINT]
_ => []
handlers : All (Handler () e) [Errno]
handlers = [\x => stderrLn "Error: \{errorText x} (\{errorName x})"]
covering
main : IO ()
main = do
Expand Down
3 changes: 1 addition & 2 deletions async-epoll/async-epoll.ipkg
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ authors = "stefan-hoeck"
brief = "An epoll-based event loop for asynchronous computations on Linux"

depends = async
, epoll
, linux

modules = IO.Async.Loop.Epoll
, IO.Async.Loop.Poller

sourcedir = "src"
Loading

0 comments on commit 10ae85b

Please sign in to comment.