Skip to content

Commit

Permalink
feat(logger): add first iteration of logging component
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jan 26, 2021
1 parent 6064d1b commit 751b7a3
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/logger/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/symbiont-io/detsys-testkit/src/logger

go 1.15

require github.com/mattn/go-sqlite3 v1.14.5
2 changes: 2 additions & 0 deletions src/logger/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/mattn/go-sqlite3 v1.14.5 h1:1IdxlwTNazvbKJQSxoJ5/9ECbEeaTTyeU7sEAZ5KKTQ=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
150 changes: 150 additions & 0 deletions src/logger/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package logger

import (
"bufio"
"bytes"
"context"
"database/sql"
"fmt"
"log"
"os"
"path/filepath"
"syscall"
"time"

_ "github.com/mattn/go-sqlite3"
)

const (
QUEUE_SIZE int = 1024
BUFFER_LEN int = 128
MAX_STARVE_COUNT int = 32
)

func main() {
db := OpenDB()
fh := OpenPipe()
queue := make(chan []byte, QUEUE_SIZE)

go worker(db, queue)

for {
scanner := bufio.NewScanner(fh)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
enqueue(queue, scanner.Bytes())
}
}
}

func worker(db *sql.DB, queue chan []byte) {
var buffer [][]byte
starve := 0
for {
if len(buffer) <= BUFFER_LEN || starve >= MAX_STARVE_COUNT {
commit(db, buffer)
buffer = [][]byte{}
} else {
item, ok := dequeue(queue)
if ok {
buffer = append(buffer, item)
starve = 0
} else {
starve++
}
}
}
}

func enqueue(queue chan []byte, item []byte) {
var ok bool
select {
case queue <- item:
ok = true
default:
ok = false
}
if !ok {
start := time.Now()
queue <- item
duration := time.Since(start)
log.Println("The main thread was blocked for %v due to the queue being full!",
duration)
}
}

// Non-blocking dequeue.
func dequeue(queue chan []byte) ([]byte, bool) {
var item []byte
var ok bool
select {
case item = <-queue:
ok = true
default:
ok = false
}
return item, ok
}

func commit(db *sql.DB, buffer [][]byte) {
start := time.Now()
ctx := context.Background()
tx, err := db.BeginTx(ctx, nil)
if err != nil {
panic(err)
}
for _, item := range buffer {
event, meta, data := parse(item)
_, err := tx.ExecContext(ctx,
`INSERT INTO event_log(event, meta, data) VALUES(?, ?, ?)`,
event, meta, data)
if err != nil {
_ = tx.Rollback()
return
}
}
if err := tx.Commit(); err != nil {
panic(err)
}
duration := time.Since(start)
log.Println("The worker thread commited %d items in %v!", BUFFER_LEN, duration)
}

func parse(item []byte) ([]byte, []byte, []byte) {
split := bytes.Split(item, []byte(";"))
if len(split) != 3 {
panic(fmt.Sprintf("parse: failed to split item: %v", item))
}
return split[0], split[1], split[2]
}

func OpenDB() *sql.DB {
path := DBPath()
// https://stackoverflow.com/questions/35804884/sqlite-concurrent-writing-performance
db, err := sql.Open("sqlite3",
path+"?&cache=shared&_journal_mode=WAL&_synchronous=NORMAL")
if err != nil {
panic(err)
}
db.SetMaxOpenConns(1)
return db
}

func DBPath() string {
path, ok := os.LookupEnv("DETSYS_DB")
if !ok {
path = os.Getenv("HOME") + "/.detsys.db"
}
return path
}

func OpenPipe() *os.File {
namedPipe := filepath.Join(os.TempDir(), "detsys-logger")
syscall.Mkfifo(namedPipe, 0600)

fh, err := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
if err != nil {
panic(err)
}
return fh
}

0 comments on commit 751b7a3

Please sign in to comment.