#+title: The event loop
#+author: Stevan Andjelkovic

* Motivation
** Top-down
** Bottom-up
** Questions
*** Convenience
*** Performance
*** "real" vs "simulation" implementation
*** Deployment
*** Operator experience or observability
*** Upgrades
*** Security
*** HA
*** Scalability
*** Faults, filesystem I/O, or MITM

* The event loop
#+begin_src
el1 := makeEventLoop(address1)
el2 := makeEventLoop(address2)
r1 := deploy(someReactor, el1)
r2 := deploy(anotherReactor(r1), el2)
#+end_src
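
The pseudocode above only names the operations. As a rough sketch of what the
types and signatures behind makeEventLoop and deploy could look like in Go;
all names here are assumptions, not the actual API:

#+begin_src go
package eventloop

type Address string

// Ref identifies a reactor deployed on some event loop, local or remote.
type Ref struct {
    Loop Address
    Name string
}

type Message struct {
    To      Ref
    Payload []byte
}

// Reactor handles an incoming message and may produce outgoing messages.
type Reactor interface {
    Receive(msg Message) []Message
}

// EventLoop owns a queue of events and the reactors deployed on it.
type EventLoop struct {
    addr     Address
    queue    chan Message
    reactors map[string]Reactor
}

// MakeEventLoop corresponds to makeEventLoop(address) above.
func MakeEventLoop(addr Address) *EventLoop {
    return &EventLoop{
        addr:     addr,
        queue:    make(chan Message, 1024),
        reactors: make(map[string]Reactor),
    }
}

// Deploy corresponds to deploy(someReactor, el) above; the returned
// reference is what other reactors get parameterised by.
func (el *EventLoop) Deploy(name string, r Reactor) Ref {
    el.reactors[name] = r
    return Ref{Loop: el.addr, Name: name}
}
#+end_src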

* Convenience
** Local calls (sync)
anotherReactor(refToSomeReactor, msg) :=
  reply := invoke(refToSomeReactor, msg2)
  return {reply}

** Remote calls with reply (async)
yetAnotherReactor(refToRemoteReactor, msg) :=
  if msg == foo then
    promise := send(refToRemoteReactor, msg2)
    await(promise, fn (reply) { return reply })

** mapReduce(ref1, ref2, ref3) :=
  promise1 := send(ref1, task1)
  promise2 := send(ref2, task2)
  promise3 := send(ref3, task3)
  await([promise1, promise2, promise3],
        fn ([reply1, reply2, reply3])
          return {reduce reply1, reply2, reply3})

** Async I/O
promise := dbRead/Write
await promise ...

** Pipelining
promise := send(ref, msg)
if promise.reply == bar then
  send(ref, msg2)
* "real" vs "simulation" impl and performance | ||
|
||
stack heap | ||
($) (actormap) | ||
.-------.----------------------. -. | ||
| | | | | ||
| | .-. | | | ||
| | (obj) .-. | | | ||
| | '-' (obj) | | | ||
| __ | '-' | | | ||
| |__>* | .-. | |- actormap | ||
| __ | (obj) | | territory | ||
| |__>* | '-' | | | ||
| __ | | | | ||
| |__>* | | | | ||
:-------'----------------------: -' | ||
queue | __ __ __ | -. | ||
(<-) | |__>* |__>* |__>* | |- event loop | ||
'------------------------------' -' territory | ||
|
||
** Main event loop is CPU core | ||
** Worker threads on separate CPU cores doing | ||
async stuff | ||

* Operator experience or observability
** log of network traffic and state diffs
** detsys debugger

* Deployment and HA
** Supervisor trees
** struct Sup = RestartStrat & List Children
rootSup := Sup restartStrat
             [reactor1, Sup restart2 [reactor2, reactor3]]
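
The same shape as a Go sketch; the type names and the concrete restart
strategies (OneForOne, OneForAll, borrowed from Erlang-style supervisors) are
assumptions, not the actual implementation:

#+begin_src go
package supervisor

type RestartStrategy int

const (
    OneForOne RestartStrategy = iota // restart only the crashed child
    OneForAll                        // restart all children together
)

type Ref struct{ Name string }

// Child is either a reactor reference or a nested supervisor.
type Child struct {
    Reactor *Ref        // non-nil for a leaf reactor
    Sup     *Supervisor // non-nil for a nested supervisor
}

// Supervisor is a restart strategy plus a list of children.
type Supervisor struct {
    Strategy RestartStrategy
    Children []Child
}

// rootSup := Sup restartStrat [reactor1, Sup restart2 [reactor2, reactor3]]
var rootSup = Supervisor{
    Strategy: OneForOne,
    Children: []Child{
        {Reactor: &Ref{Name: "reactor1"}},
        {Sup: &Supervisor{
            Strategy: OneForAll,
            Children: []Child{
                {Reactor: &Ref{Name: "reactor2"}},
                {Reactor: &Ref{Name: "reactor3"}},
            },
        }},
    },
}
#+end_src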

** "let it crash"
** deploy(someReactor, el)
   deploy(someSupervisor, el)

* Upgrades
** Assume we can send a reactor over the wire
** deploy(newReactor, el)

* Security
** Reference
** spawn or ref passed in a message

* The middle
** async messages and I/O and the scheduler?

* Summary
** Convenience of programming the SUT
** Foundation for performance
** Potentially helpful for other difficult problems
*** fault injection around disk I/O
*** deployment and operability, including observability
*** scalability
*** HA
*** upgrades

---

#+title: The event loop (part 2)
#+author: Stevan Andjelkovic
#+date: Wed Jul 21 15:00:00 CEST 2021

* Recap: motivation
** First implementation of detsys
*** slow communication between scheduler and executor
*** scheduler/executor event logging is synchronous and slow
*** scheduler implementation is complicated
** Convenience
*** useful for the non-async parts of Smartlog
*** sync communication between reactors that are "near"
*** async disk I/O?
**** leveldb async flag means no fsync...
** Potentially helpful for other difficult problems
*** performance / scalability
*** deployment and operability, including observability
*** HA
*** upgrades
*** fault injection around disk I/O

* Implementation so far
** Most of the "convenience" features
** Even though the idea was pretty clear, it was difficult to get right
*** 3 complete rewrites; fine, because the implementation is small
*** currently Haskell, but should be easy to port to Go
*** more code than the executor, but not much more

* Benchmarking
** single process test / built-in profiler
** histogram
*** measure(value: Double, h: Histogram)
*** percentile(p: Double, h: Histogram): Double
*** E.g.:
#+begin_src go
h := newHistogram()
measure(1, h)
measure(1, h)
measure(2, h)
measure(2, h)
measure(3, h)
assert(percentile(0.0, h), 1)   // min
assert(percentile(40.0, h), 1)
assert(percentile(40.1, h), 2)
assert(percentile(80.0, h), 2)
assert(percentile(80.1, h), 3)
assert(percentile(100.0, h), 3) // max
#+end_src
*** Implementation idea
Any double can be compressed into one of 2^16 buckets with less than 1%
compression loss, using the natural logarithm for compression and
exponentiation for decompression (hence the name: logarithmic bucketing).
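
A minimal Go sketch of that bucketing idea; the exact spacing (powers of
1.005, offset into the middle of 2^16 buckets) is an assumption chosen to
stay within the stated 1% loss, not necessarily what the real histogram does:

#+begin_src go
package histogram

import "math"

const numBuckets = 1 << 16

type Histogram struct {
    buckets [numBuckets]uint64
    total   uint64
}

// compress maps a positive value to a bucket index; consecutive bucket
// boundaries differ by 0.5%, so decompressing loses well under 1%.
func compress(v float64) int {
    if v <= 0 {
        return 0
    }
    i := int(math.Round(math.Log(v)/math.Log(1.005))) + numBuckets/2
    if i < 0 {
        i = 0
    }
    if i >= numBuckets {
        i = numBuckets - 1
    }
    return i
}

// decompress returns a representative value for a bucket index.
func decompress(i int) float64 {
    return math.Pow(1.005, float64(i-numBuckets/2))
}

func (h *Histogram) Measure(v float64) {
    h.buckets[compress(v)]++
    h.total++
}

// Percentile returns the value below which p percent of the measurements fall.
func (h *Histogram) Percentile(p float64) float64 {
    if h.total == 0 {
        return 0
    }
    need := uint64(math.Ceil(p / 100 * float64(h.total)))
    if need == 0 {
        need = 1 // percentile(0) is the minimum
    }
    var seen uint64
    for i, c := range h.buckets {
        seen += c
        if seen >= need {
            return decompress(i)
        }
    }
    return decompress(numBuckets - 1)
}
#+end_src

Against the example above this returns 1 at the 0th and 40th percentiles,
roughly 2 at 40.1 and 80, and roughly 3 at 80.1 and 100.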

** metrics (histograms and counters)
*** What to collect? Following Brendan Gregg's USE method (see the sketch below):
*** utilisation (proportion of time the system is busy, as opposed to waiting)
*** saturation (how long does a request have to wait? the queue depth when a request arrives)
*** errors
*** latency (time that an operation takes)
*** throughput (how many operations can be performed in some unit of time)
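
A hypothetical grouping of these metrics in Go, reusing the Histogram type
from the bucketing sketch above; all names here are assumptions:

#+begin_src go
// In the same (assumed) package as the histogram sketch above.

type Counter uint64

func (c *Counter) Inc() { *c++ }

// Metrics groups, per event loop, the quantities listed above.
type Metrics struct {
    Utilisation Histogram // fraction of time spent doing work rather than waiting
    Saturation  Histogram // queue depth observed when a request arrives
    Errors      Counter   // failed handler runs
    Latency     Histogram // time each operation takes
    Throughput  Counter   // operations completed (divide by elapsed time)
}
#+end_src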
** event loop level vs SUT level vs OS level?

* Next steps
** reimplement the scheduler on top of the event loop
*** gives us some experience with this style of programming before adopting it for Smartlog
*** solves the problem with slow sync disk I/O
*** and the slow communication between executor and scheduler
*** reduces scheduler complexity, makes it testable and debuggable using detsys
** extend benchmarking (more workloads and collecting more metrics)
** client request throughput via the event loop
*** `wrk` uses an event loop implemented using epoll
*** select/epoll/io_uring
*** same approach as Chuck's event loop, but for the client side

* Longer term
** put the Smartlog reactors on top of the event loop
*** timers?
** merge efforts with progress made by others on:
*** scalability / benchmarking
*** deployment and operability, including observability
*** HA
*** upgrades

---

#+title: The event loop (part 3)
#+author: Stevan Andjelkovic
#+date: ?

* Requirements
** Convenience
** Performance
** Testability
** Observability
** (Deployment)
*** for development
**** whole cluster running in a single process
**** fake networking between nodes
**** simulation testing
*** for production
**** one node per computer
**** real networking
** (HA)
** (Upgrades)

* Reactors
** Local sync invoke
** Remote async send
** (A)sync disk I/O
** Timers
** Install callbacks on completion of async actions
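
A sketch of the capabilities listed above as a Go interface handed to each
reactor; every name and signature here is an assumption rather than the
actual API:

#+begin_src go
package reactor

import "time"

type Ref struct{ Loop, Name string }

type Message struct{ Payload []byte }

// Promise is completed later by the event loop; OnComplete installs the
// callback to run when the async action finishes.
type Promise interface {
    OnComplete(func(reply Message))
}

// Capabilities is what a reactor can ask its event loop to do.
type Capabilities interface {
    // Invoke is a local synchronous call to a reactor on the same event loop.
    Invoke(to Ref, msg Message) Message

    // Send is a remote asynchronous send; the reply arrives via the promise.
    Send(to Ref, msg Message) Promise

    // Asynchronous disk I/O, also completed via promises.
    DiskRead(path string) Promise
    DiskWrite(path string, data []byte) Promise

    // SetTimer fires a timeout event after d has elapsed.
    SetTimer(d time.Duration) Promise
}

// Reactor handles one incoming message at a time using the capabilities.
type Reactor interface {
    Receive(msg Message, caps Capabilities)
}
#+end_src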

* Event loop
** reactors are spawned on event loops
** each event loop has a "queue" of events
** event producers
*** incoming requests (remote async sends from reactors on remote event loops)
*** incoming responses (to remote async sends made by reactors running on this event loop)
*** async disk I/O completed
*** timeouts
** event consumer
** the trace of processing the events is the basis for testability and observability
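
As a sketch of the event kinds just listed and of how consuming them builds
up a trace; the concrete shapes of Event, TraceEntry, and Loop are
assumptions, not the real types:

#+begin_src go
package loop

import "time"

type EventKind int

const (
    Request     EventKind = iota // remote async send from a reactor on another loop
    Response                     // reply to a remote send made from this loop
    IOCompleted                  // async disk I/O finished
    Timeout                      // a timer fired
)

type Event struct {
    Kind    EventKind
    Payload []byte
}

// TraceEntry records one processed event together with when it was handled;
// the accumulated trace is what testing and observability are built on.
type TraceEntry struct {
    At    time.Time
    Event Event
}

type Loop struct {
    queue chan Event
    trace []TraceEntry
}

// consume is the single event consumer: dequeue, record, dispatch.
func (l *Loop) consume(handle func(Event)) {
    for e := range l.queue {
        l.trace = append(l.trace, TraceEntry{At: time.Now(), Event: e})
        handle(e)
    }
}
#+end_src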

* Event loop threading
** Single-threaded polling
*** run each non-blocking handler in turn
*** inefficient, as we might be running handlers that have nothing to do
*** pseudo code
#+begin_src go
func main() {
    ...
    ls := ... // the event loop state, contains the event queue etc.
    for {
        runHandlers(ls) // poll every handler, even if it has no work
    }
}
#+end_src

** Multi-threaded
*** run each, possibly blocking, handler in a separate thread
*** needs synchronisation, which is expensive
*** closest to idiomatic Go
*** pseudo code
#+begin_src go
func networkProducer(ls loopState, ch chan event) {
    ...
}

func timeoutProducer(ls loopState, ch chan event) {
    ...
}

func asyncIOProducer(ls loopState, ch chan event) {
    ...
}

func eventConsumer(ls loopState, ch chan event) {
    for {
        e := ls.events.dequeue()
        ch <- e
    }
}

func main() {
    ls := ... // the event loop state, contains the event queue etc.
    ch1 := make(chan event)
    ch2 := make(chan event)
    ch3 := make(chan event)
    ch4 := make(chan event)
    go networkProducer(ls, ch1)
    go timeoutProducer(ls, ch2)
    go asyncIOProducer(ls, ch3)
    go eventConsumer(ls, ch4)

    for {
        select {
        case e1 := <-ch1:
            ls.events.enqueue(e1)
        case e2 := <-ch2:
            ls.events.enqueue(e2)
        case e3 := <-ch3:
            ls.events.enqueue(e3)
        case e4 := <-ch4:
            ... // process the dequeued event e4
        }
    }
}
#+end_src
** Single-threaded notified
*** Use kernel-level notifications: select/poll/epoll/kqueue/io_uring
*** basically wait for some file handle to become readable/writable or for a timer to fire
**** sockets for networking
**** io_uring can handle any filesystem file
*** avoids the inefficiency of running handlers unnecessarily
*** pseudo code
#+begin_src go
func main() {
    fds := ... // file descriptors to watch
    for {
        r := posix.select(fds, ...)
        // one of the fds is ready: figure out which from r and enqueue the
        // appropriate event onto the queue
        // when do we run eventConsumer?
    }
}
#+end_src
* Thread pools

* Batching
** Automatic low latency / high throughput reconfiguration
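
A sketch of one way this self-reconfiguration can fall out naturally: drain
whatever is already queued into a batch, up to a cap. Under low load batches
stay small (low latency); under high load they fill up (high throughput). The
names and the cap are assumptions:

#+begin_src go
package batching

const maxBatch = 256

type Event struct{ Payload []byte }

// nextBatch blocks for the first event, then greedily takes whatever else is
// already queued, without waiting for more.
func nextBatch(queue chan Event) []Event {
    batch := []Event{<-queue}
    for len(batch) < maxBatch {
        select {
        case e := <-queue:
            batch = append(batch, e)
        default:
            return batch // queue drained: ship a small batch promptly
        }
    }
    return batch
}
#+end_src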

* Zero-deserialisation
** fixed-size data structures
** no parsing
*** cast incoming bytestrings directly into said data structures
*** or use functions to read fields straight from the bytestring
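
A sketch of the second option: accessor functions that read fixed-offset
fields straight out of the received byte slice, with no intermediate parse
step. The wire layout shown (an 8-byte sequence number followed by a 4-byte
body length) is made up for illustration:

#+begin_src go
package wire

import "encoding/binary"

// Message is just a view over the received bytes; nothing is copied or parsed.
type Message []byte

func (m Message) SequenceNumber() uint64 {
    return binary.LittleEndian.Uint64(m[0:8])
}

func (m Message) BodyLength() uint32 {
    return binary.LittleEndian.Uint32(m[8:12])
}

func (m Message) Body() []byte {
    return m[12 : 12+m.BodyLength()]
}
#+end_src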

* Zero-syscalls
** io_uring allows us to batch syscalls, effectively amortising their cost
** increasingly important post Meltdown/Spectre
*** https://www.brendangregg.com/blog/2018-02-09/kpti-kaiser-meltdown-performance.html

* Zero-allocation
** Allocate at start up, not while running
** Disk
** Memory
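
A sketch of the memory half of this: a fixed pool of buffers allocated once
at start up and recycled afterwards, so the steady state allocates nothing.
Pool and buffer sizes are assumptions:

#+begin_src go
package pool

type BufferPool struct {
    free chan []byte
}

// NewBufferPool does all the allocation up front.
func NewBufferPool(n, size int) *BufferPool {
    p := &BufferPool{free: make(chan []byte, n)}
    for i := 0; i < n; i++ {
        p.free <- make([]byte, size)
    }
    return p
}

// Get blocks until a buffer is free; nothing is allocated here.
func (p *BufferPool) Get() []byte { return <-p.free }

// Put returns a buffer for reuse.
func (p *BufferPool) Put(b []byte) { p.free <- b[:cap(b)] }
#+end_src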

* Zero-copy
** Direct I/O

* Resources
** http://ithare.com/five-myths-used-in-golang-vs-node-js-debate/