Skip to content

Commit

Permalink
feat(executor): Executor now sends the ExecutionStep event
Browse files Browse the repository at this point in the history
This event unifies the old `log_trace` table and `heap_trace` table
in one event. This pr now makes it so the executor only calls one function
and it will emit this event, and also append to the old tables for now.
In the future we will be able to remove those appends.

This also allowed some more refactoring of the handlers, we no longer use the
`EventLogEmitter` so this could be removed. This also simplied the `Executor`
type.
  • Loading branch information
symbiont-daniel-gustafsson committed Jan 19, 2021
1 parent 239bd2a commit 47a2b75
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 116 deletions.
74 changes: 74 additions & 0 deletions src/executor/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package executor

import (
"database/sql"
"encoding/json"
"time"

"github.com/symbiont-io/detsys-testkit/src/lib"
)

// We should probably have what kind of step this is, message vs. timer
type ExecutionStepEvent struct {
Meta lib.MetaInfo
Reactor string
SimulatedTime time.Time
LogLines []string
HeapDiff json.RawMessage
}

// This should be removed when are not using the old events anymore
func emitOldEvents(db *sql.DB, event ExecutionStepEvent) {
appendHeapTrace(db, event.Meta.TestId, event.Meta.RunId, event.Reactor, event.HeapDiff, event.SimulatedTime)

for _, p := range event.LogLines {
lib.AddLogStamp(db, event.Meta.TestId, event.Meta.RunId, event.Reactor, []byte(p), event.SimulatedTime)
}
}

func EmitExecutionStepEvent(db *sql.DB, event ExecutionStepEvent) {
metaBlob, err := json.Marshal(struct {
Component string `json:"component"`
RunId lib.RunId `json:"run-id"`
TestId lib.TestId `json:"test-id"`
}{
Component: "executor",
RunId: event.Meta.RunId,
TestId: event.Meta.TestId,
})
if err != nil {
panic(err)
}

dataBlob, err := json.Marshal(struct {
Reactor string `json:"reactor"`
LogicalTime int `json:"logical-time"`
SimulatedTime time.Time `json:"simulated-time"`
LogLines []string `json:"log-lines"`
HeapDiff json.RawMessage `json:"diff"`
}{
Reactor: event.Reactor,
LogicalTime: event.Meta.LogicalTime,
SimulatedTime: event.SimulatedTime,
LogLines: event.LogLines,
HeapDiff: event.HeapDiff,
})
if err != nil {
panic(err)
}

stmt, err := db.Prepare(`INSERT INTO event_log(event, meta, data) VALUES(?,?,?)`)
if err != nil {
panic(err)
}
defer stmt.Close()

_, err = stmt.Exec("ExecutionStep", metaBlob, dataBlob)

if err != nil {
panic(err)
}

// Remove when we no longer use old events
emitOldEvents(db, event)
}
86 changes: 40 additions & 46 deletions src/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,13 @@ func jsonError(s string) string {
}

type StepInfo struct {
LogLines [][]byte
LogLines []string
}

type ComponentUpdate = func(component string) StepInfo
type Topology = map[string]lib.Reactor

func flushLog(si StepInfo, component string, at time.Time, mi lib.MetaInfo) {
// Inefficient for now, but this will be removed later
for _, p := range si.LogLines {
lib.AddLogStamp(mi.TestId, mi.RunId, component, p, at)
}
}

func handler(db *sql.DB, eventLog lib.EventLogEmitter, topology Topology, m lib.Marshaler, cu ComponentUpdate) http.HandlerFunc {
func handler(db *sql.DB, topology Topology, m lib.Marshaler, cu ComponentUpdate) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if r.Method != "POST" {
Expand All @@ -56,14 +49,21 @@ func handler(db *sql.DB, eventLog lib.EventLogEmitter, topology Topology, m lib.
oevs := topology[sev.To].Receive(sev.At, sev.From, sev.Event)
heapAfter := dumpHeapJson(topology[sev.To])
heapDiff := jsonDiff(heapBefore, heapAfter)
appendHeapTrace(db, sev.Meta.TestId, sev.Meta.RunId, sev.To, heapDiff, sev.At)
flushLog(si, sev.To, sev.At, sev.Meta)

EmitExecutionStepEvent(db, ExecutionStepEvent{
Meta: sev.Meta,
Reactor: sev.To,
SimulatedTime: sev.At,
LogLines: si.LogLines,
HeapDiff: heapDiff,
})

bs := lib.MarshalUnscheduledEvents(sev.To, oevs)
fmt.Fprint(w, string(bs))
}
}

func handleTick(eventLog lib.EventLogEmitter, topology Topology, m lib.Marshaler, cu ComponentUpdate) http.HandlerFunc {
func handleTick(topology Topology, m lib.Marshaler, cu ComponentUpdate) http.HandlerFunc {
type TickRequest struct {
Component string `json:"component"`
At time.Time `json:"at"`
Expand Down Expand Up @@ -91,7 +91,7 @@ func handleTick(eventLog lib.EventLogEmitter, topology Topology, m lib.Marshaler
}
}

func handleTimer(db *sql.DB, eventLog lib.EventLogEmitter, topology Topology, m lib.Marshaler, cu ComponentUpdate) http.HandlerFunc {
func handleTimer(db *sql.DB, topology Topology, m lib.Marshaler, cu ComponentUpdate) http.HandlerFunc {
type TimerRequest struct {
Component string `json:"to"`
At time.Time `json:"at"`
Expand Down Expand Up @@ -120,8 +120,14 @@ func handleTimer(db *sql.DB, eventLog lib.EventLogEmitter, topology Topology, m
heapAfter := dumpHeapJson(topology[req.Component])
heapDiff := jsonDiff(heapBefore, heapAfter)

appendHeapTrace(db, req.Meta.TestId, req.Meta.RunId, req.Component, heapDiff, req.At)
flushLog(si, req.Component, req.At, req.Meta)
EmitExecutionStepEvent(db, ExecutionStepEvent{
Meta: req.Meta,
Reactor: req.Component,
SimulatedTime: req.At,
LogLines: si.LogLines,
HeapDiff: heapDiff,
})

bs := lib.MarshalUnscheduledEvents(req.Component, oevs)
fmt.Fprint(w, string(bs))
}
Expand Down Expand Up @@ -158,15 +164,15 @@ func handleInits(topology Topology, m lib.Marshaler) http.HandlerFunc {
}
}

func DeployWithComponentUpdate(srv *http.Server, eventLog lib.EventLogEmitter, topology Topology, m lib.Marshaler, cu ComponentUpdate) {
func DeployWithComponentUpdate(srv *http.Server, topology Topology, m lib.Marshaler, cu ComponentUpdate) {
mux := http.NewServeMux()

db := lib.OpenDB()
defer db.Close()

mux.HandleFunc("/api/v1/event", handler(db, eventLog, topology, m, cu))
mux.HandleFunc("/api/v1/tick", handleTick(eventLog, topology, m, cu))
mux.HandleFunc("/api/v1/timer", handleTimer(db, eventLog, topology, m, cu))
mux.HandleFunc("/api/v1/event", handler(db, topology, m, cu))
mux.HandleFunc("/api/v1/tick", handleTick(topology, m, cu))
mux.HandleFunc("/api/v1/timer", handleTimer(db, topology, m, cu))
mux.HandleFunc("/api/v1/inits", handleInits(topology, m))

srv.Addr = ":3001"
Expand All @@ -176,8 +182,8 @@ func DeployWithComponentUpdate(srv *http.Server, eventLog lib.EventLogEmitter, t
}
}

func Deploy(srv *http.Server, eventLog lib.EventLogEmitter, topology Topology, m lib.Marshaler) {
DeployWithComponentUpdate(srv, eventLog, topology, m, func(string) StepInfo { return StepInfo{} })
func Deploy(srv *http.Server, topology Topology, m lib.Marshaler) {
DeployWithComponentUpdate(srv, topology, m, func(string) StepInfo { return StepInfo{} })
}

func topologyFromDeployment(testId lib.TestId, constructor func(string) lib.Reactor) (Topology, error) {
Expand Down Expand Up @@ -215,13 +221,13 @@ func topologyFromDeployment(testId lib.TestId, constructor func(string) lib.Reac
return topologyCooked, nil
}

func DeployRaw(srv *http.Server, testId lib.TestId, eventLog lib.EventLogEmitter, topology map[string]string, m lib.Marshaler, constructor func(string) lib.Reactor) {
func DeployRaw(srv *http.Server, testId lib.TestId, topology map[string]string, m lib.Marshaler, constructor func(string) lib.Reactor) {
topologyCooked, err := topologyFromDeployment(testId, constructor)
if err != nil {
panic(err)
}
fmt.Printf("Deploying topology: %+v\n", topologyCooked)
Deploy(srv, eventLog, topologyCooked, m)
Deploy(srv, topologyCooked, m)
}

type LogWriter struct {
Expand Down Expand Up @@ -255,9 +261,7 @@ type Executor struct {
buffers map[string]*LogWriter
marshaler lib.Marshaler
testId lib.TestId
runId lib.RunId
constructor func(name string, logger *zap.Logger) lib.Reactor
eventLog lib.EventLogEmitter
logger *zap.Logger
}

Expand All @@ -269,48 +273,37 @@ func (e *Executor) SetTestId(testId lib.TestId) {
e.testId = testId
}

func NewExecutor(testId lib.TestId, marshaler lib.Marshaler, logger *zap.Logger, components []string, constructor func(name string, logger *zap.Logger) lib.Reactor) *Executor {
runId := lib.RunId{0}

func NewExecutor(testId lib.TestId, marshaler lib.Marshaler, logger *zap.Logger, reactorNames []string, constructor func(name string, logger *zap.Logger) lib.Reactor) *Executor {
topology := make(map[string]lib.Reactor)
buffers := make(map[string]*LogWriter)

for _, component := range components {
for _, reactorName := range reactorNames {
buffer := &LogWriter{
current: [][]byte{},
}
topology[component] = constructor(component, buffer.AppendToLogger(logger))
buffers[component] = buffer
topology[reactorName] = constructor(reactorName, buffer.AppendToLogger(logger))
buffers[reactorName] = buffer
}

executor := &Executor{
topology: topology,
buffers: buffers,
marshaler: marshaler,
testId: testId,
runId: runId,
constructor: constructor,
eventLog: lib.EventLogEmitter{
Component: "Executor",
TestId: nil,
RunId: nil,
},
logger: logger,
logger: logger,
}

executor.eventLog.TestId = &executor.testId
executor.eventLog.RunId = &executor.runId

return executor
}

func (e *Executor) Deploy(srv *http.Server) {
DeployWithComponentUpdate(srv, e.eventLog, e.topology, e.marshaler, func(name string) StepInfo {
DeployWithComponentUpdate(srv, e.topology, e.marshaler, func(name string) StepInfo {
buffer, ok := e.buffers[name]
logs := make([][]byte, 0)
logs := make([]string, 0)
if ok {
for _, l := range buffer.current {
logs = append(logs, l)
logs = append(logs, string(l))
}
buffer.current = make([][]byte, 0)
} else {
Expand All @@ -323,11 +316,12 @@ func (e *Executor) Deploy(srv *http.Server) {
}

func (e *Executor) Register() {
// Should probably separate the loading of the database to get deployment and register
// if so we could remove the need for `Executor` to know the test-id
lib.Register(e.testId)
}

func (e *Executor) Reset(runId lib.RunId) {
e.runId = runId
func (e *Executor) Reset() {
for c, b := range e.buffers {
e.topology[c] = e.constructor(c, b.AppendToLogger(e.logger))
}
Expand Down
52 changes: 0 additions & 52 deletions src/lib/event.go

This file was deleted.

7 changes: 3 additions & 4 deletions src/lib/log.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package lib

import (
"database/sql"
"time"
)

func AddLogStamp(testId TestId, runId RunId, component string, message []byte, at time.Time) {
db := OpenDB()
defer db.Close()

// Deprecated: will be removed in the future
func AddLogStamp(db *sql.DB, testId TestId, runId RunId, component string, message []byte, at time.Time) {
stmt, err := db.Prepare(`INSERT INTO log_trace (test_id, run_id, id, component, log, simulated_time)
VALUES (?, ?, (SELECT IFNULL(MAX(id), -1) + 1 FROM log_trace WHERE test_id = ? AND run_id = ?), ?, ?, ?)
`)
Expand Down
8 changes: 1 addition & 7 deletions src/sut/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@ func once(round Round, testId lib.TestId, t *testing.T) (lib.RunId, bool) {
"C": NewNode(round, "B"),
}
marshaler := NewMarshaler()
eventLog := lib.EventLogEmitter{
Component: "Broadcast test",
TestId: &testId,
RunId: nil,
}
var srv http.Server
lib.Setup(func() {
executor.Deploy(&srv, eventLog, topology, marshaler)
executor.Deploy(&srv, topology, marshaler)
})
qs := lib.LoadTest(testId)
log.Printf("Loaded test of size: %d\n", qs.QueueSize)
lib.Register(testId)
log.Printf("Registered executor")
runId := lib.CreateRun(testId)
eventLog.RunId = &runId
log.Printf("Created run id: %v", runId)
lib.Run()
log.Printf("Finished run id: %d\n", runId.RunId)
Expand Down
8 changes: 1 addition & 7 deletions src/sut/register/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,15 @@ func createTopology(newFrontEnd func() lib.Reactor) map[string]lib.Reactor {
func once(newFrontEnd func() lib.Reactor, testId lib.TestId, t *testing.T) (lib.RunId, bool) {
topology := createTopology(newFrontEnd)
marshaler := NewMarshaler()
eventLog := lib.EventLogEmitter{
Component: "Broadcast test",
TestId: &testId,
RunId: nil,
}
var srv http.Server
lib.Setup(func() {
executor.Deploy(&srv, eventLog, topology, marshaler)
executor.Deploy(&srv, topology, marshaler)
})
qs := lib.LoadTest(testId)
lib.SetSeed(lib.Seed{4})
log.Printf("Loaded test of size: %d\n", qs.QueueSize)
lib.Register(testId)
runId := lib.CreateRun(testId)
eventLog.RunId = &runId
lib.Run()
log.Printf("Finished run id: %d\n", runId.RunId)
lib.Teardown(&srv)
Expand Down

0 comments on commit 47a2b75

Please sign in to comment.