-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(logger): start turning logger into a reactor
- Loading branch information
1 parent
88b9307
commit e6f2e5d
Showing
4 changed files
with
271 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,263 @@ | ||
package logger | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"log" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/symbiont-io/detsys-testkit/src/lib" | ||
) | ||
|
||
// Tuning knobs for the logger's queue and batching behaviour.
// NOTE(review): Go convention is MixedCaps (QueueSize) rather than
// SCREAMING_SNAKE; renaming would touch every use site in this package.
const (
	QUEUE_SIZE int = 1024 // Capacity of the in-memory log queue (entries).
	BUFFER_LEN int = 512  // Entries accumulated before the worker commits a batch.
	PIPE_BUF   int = 512  // POSIX: writes of up to PIPE_BUF bytes to a pipe are atomic.
)
|
||
// Logger is a reactor that buffers incoming log entries on a channel.
// A background worker goroutine (started by Init) drains the channel
// into an SQLite event log in batches.
type Logger struct {
	// Queue holds raw log entries; each entry is a tab-separated
	// event/meta/data triple (see parse).
	Queue chan []byte
}
|
||
func NewLogger() *Logger { | ||
return &Logger{ | ||
Queue: make(chan []byte, QUEUE_SIZE), | ||
} | ||
} | ||
|
||
// Log is a client request carrying one raw log entry.
type Log struct {
	Data []byte
}

// Request marks Log as a request type.
func (Log) Request() {}

// RequestEvent names the event this request corresponds to.
func (Log) RequestEvent() string { return "log" }
|
||
func (l *Logger) Receive(at time.Time, from string, event lib.InEvent) []lib.OutEvent { | ||
switch ev := event.(type) { | ||
case *lib.ClientRequest: | ||
switch req := (*ev).Request.(type) { | ||
case Log: | ||
enqueue(l.Queue, req.Data) | ||
default: | ||
panic(fmt.Errorf("Unknown request type: %s\n", req)) | ||
} | ||
default: | ||
panic(fmt.Errorf("Unknown event type: %s\n", ev)) | ||
} | ||
return nil | ||
} | ||
|
||
func (l *Logger) Init() []lib.OutEvent { | ||
db := OpenDB() | ||
go worker(db, l.Queue) | ||
return nil | ||
} | ||
|
||
func (_ *Logger) Tick(_ time.Time) []lib.OutEvent { | ||
return nil | ||
} | ||
|
||
func (_ *Logger) Timer(_ time.Time) []lib.OutEvent { | ||
return nil | ||
} | ||
|
||
func newBuffer() [][]byte { | ||
return make([][]byte, 0, BUFFER_LEN) | ||
} | ||
|
||
func worker(db *sql.DB, queue chan []byte) { | ||
buffer := newBuffer() | ||
for { | ||
if len(buffer) >= BUFFER_LEN { | ||
commit(db, buffer) | ||
buffer = newBuffer() | ||
} else { | ||
if len(buffer) == 0 { | ||
entry := <-queue // Blocking. | ||
buffer = append(buffer, entry) | ||
} else { | ||
entry, ok := dequeue(queue) | ||
if ok { | ||
buffer = append(buffer, entry) | ||
} else { | ||
commit(db, buffer) | ||
buffer = newBuffer() | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// enqueue adds entry to queue, preferring a non-blocking send. If the
// queue is full it falls back to a blocking send and logs how long the
// caller was stalled.
func enqueue(queue chan []byte, entry []byte) {
	select {
	case queue <- entry:
		return
	default:
	}
	// Queue is full: block, and report how long we were held up.
	start := time.Now()
	queue <- entry
	log.Printf("The main thread was blocked for %v due to the queue being full!\n",
		time.Since(start))
}
|
||
// dequeue receives the next entry from queue, giving up after 200ms.
// The boolean result reports whether an entry was received before the
// timeout fired.
func dequeue(queue chan []byte) ([]byte, bool) {
	select {
	case entry := <-queue:
		return entry, true
	case <-time.After(200 * time.Millisecond):
		return nil, false
	}
}
|
||
func commit(db *sql.DB, buffer [][]byte) { | ||
start := time.Now() | ||
ctx := context.Background() | ||
tx, err := db.BeginTx(ctx, nil) | ||
if err != nil { | ||
panic(err) | ||
} | ||
for _, entry := range buffer { | ||
event, meta, data := parse(entry) | ||
_, err := tx.ExecContext(ctx, | ||
`INSERT INTO event_log(event, meta, data) VALUES(?, ?, ?)`, | ||
event, meta, data) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
if err := tx.Commit(); err != nil { | ||
panic(err) | ||
} | ||
duration := time.Since(start) | ||
log.Printf("The worker thread committed %d entries in %v!\n", len(buffer), duration) | ||
} | ||
|
||
// parse splits one raw log entry into its event, meta, and data fields.
// Fields are tab-separated; tabs cannot appear unescaped inside JSON,
// so a well-formed entry has exactly three fields. Anything else is a
// malformed entry and panics.
func parse(entry []byte) ([]byte, []byte, []byte) {
	fields := bytes.Split(entry, []byte("\t"))
	if len(fields) != 3 {
		panic(fmt.Sprintf("parse: failed to split entry: '%s'\n", string(entry)))
	}
	return fields[0], fields[1], fields[2]
}
|
||
// OpenDB opens the SQLite database at DBPath with connection options
// tuned for concurrent write throughput (shared cache, WAL journal,
// NORMAL synchronous mode — see the linked discussion), and caps the
// pool at a single connection.
// NOTE(review): no sqlite3 driver import (e.g. mattn/go-sqlite3) is
// visible in this file; sql.Open("sqlite3", ...) requires one to be
// registered elsewhere — confirm. Also note the DSN starts with "?&",
// which the driver presumably tolerates — verify.
func OpenDB() *sql.DB {
	path := DBPath()
	// https://stackoverflow.com/questions/35804884/sqlite-concurrent-writing-performance
	db, err := sql.Open("sqlite3",
		path+"?&cache=shared&_journal_mode=WAL&_synchronous=NORMAL")
	if err != nil {
		panic(err)
	}
	db.SetMaxOpenConns(1)
	return db
}
|
||
// DBPath returns the database file location: $DETSYS_DB if set,
// otherwise ~/.detsys.db.
func DBPath() string {
	if override, ok := os.LookupEnv("DETSYS_DB"); ok {
		return override
	}
	return os.Getenv("HOME") + "/.detsys.db"
}
|
||
// Marshaler decodes wire-format requests into this package's request types.
type Marshaler struct{}

// NewMarshaler returns a fresh Marshaler.
func NewMarshaler() *Marshaler {
	var m Marshaler
	return &m
}
|
||
func (_ *Marshaler) UnmarshalRequest(request string, raw json.RawMessage, req *lib.Request) error { | ||
switch strings.ToLower(request) { | ||
case "log": | ||
var op struct { | ||
Data []byte `json:"data"` | ||
} | ||
if err := json.Unmarshal(raw, &op); err != nil { | ||
panic(err) | ||
} | ||
*req = Log{ | ||
Data: op.Data, | ||
} | ||
default: | ||
panic(fmt.Errorf("Unknown request type: %s\n%s", request, raw)) | ||
} | ||
return nil | ||
} | ||
|
||
func deployReadOnlyPipe(pipePath string, reactor lib.Reactor, m lib.Marshaler) { | ||
|
||
fh := openPipe(pipePath) | ||
r := bufio.NewReaderSize(fh, PIPE_BUF) | ||
|
||
for { | ||
// TODO(stevan): If we want or need to support linearisable | ||
// reads rather than eventual consist ant reads, we could do it | ||
// as follows. Upon opening the db, save the highest index of | ||
// the event log table. When reading a line, increment the | ||
// index, parse the line to determine if it's a write or a read. | ||
// If it's a write, proceed like below. If it's a read then | ||
// spawn a new goroutine and pass it the index and the read | ||
// query. This reader goroutine should then wait until the index | ||
// is persisted in the db and then perform the read query. This | ||
// way the reads happen as fast they can while being | ||
// linearisable and not holding up writes. The efficiency of | ||
// this solution relies on the assumption that there are many | ||
// writes between each read. | ||
line, err := r.ReadBytes('\n') | ||
for err == nil { | ||
// TODO(stevan): how can we not only try to parse `log`? | ||
// The client could prepend the line with the operation | ||
// and then add a tab to separate it from the data? | ||
var req lib.Request | ||
err := m.UnmarshalRequest("log", line, &req) | ||
if err != nil { | ||
panic(err) | ||
} | ||
from := "client" // TODO(stevan): make this part of the request? | ||
var clientId uint64 | ||
clientId = 0 | ||
oevs := reactor.Receive(time.Now(), from, lib.ClientRequest{clientId, req}) | ||
if oevs != nil { | ||
panic("read only pipe") | ||
} | ||
line, err = r.ReadBytes('\n') | ||
} | ||
if err != io.EOF { | ||
panic(err) | ||
} else { | ||
// Avoid getting into a 100% CPU loop if there's nothing | ||
// to read from the pipe. | ||
time.Sleep(50 * time.Millisecond) | ||
} | ||
} | ||
} | ||
|
||
// openPipe ensures a named pipe exists at pipePath under the temp
// directory and opens it for reading. Per POSIX, opening a FIFO
// read-only blocks until a writer opens the other end.
func openPipe(pipePath string) *os.File {
	namedPipe := filepath.Join(os.TempDir(), pipePath)

	// Create the FIFO if needed. A pre-existing pipe (EEXIST) is fine;
	// any other failure was previously ignored and would only surface
	// confusingly at open time, so fail fast here instead.
	if err := syscall.Mkfifo(namedPipe, 0600); err != nil && err != syscall.EEXIST {
		panic(err)
	}

	fh, err := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
	if err != nil {
		panic(err)
	}
	return fh
}