Skip to content

Commit

Permalink
feat(executor): Make admin interface use unix domain sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Nov 8, 2021
1 parent a6d396e commit 42d5ca6
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 43 deletions.
57 changes: 28 additions & 29 deletions src/executor-event-loop/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ package executorEL
import (
"bufio"
"fmt"
"net"
"os"
)

type AdminCommand int
type AdminCommandType int

const (
AdminQuit = iota
Expand All @@ -19,9 +20,14 @@ const (
AdminUnknown
)

type AdminCommand struct {
Type AdminCommandType
Response func(string)
}

// panic galore if we don't have it
func parseCommandFromString(command string) AdminCommand {
v, ok := map[string]AdminCommand{
func parseCommandFromString(command string) AdminCommandType {
v, ok := map[string]AdminCommandType{
"AdminQuit": AdminQuit,
"AdminDumpLog": AdminDumpLog,
"AdminResetLog": AdminResetLog,
Expand All @@ -35,46 +41,46 @@ func parseCommandFromString(command string) AdminCommand {

type AdminInterface struct {
Started bool
Incoming string
Outgoing string
Domain string
ListenChannel chan AdminCommand
}

func NewAdmin(input string, output string) (*AdminInterface, *error) {
err := createPipe(input)
if err != nil {
return nil, err
}

err = createPipe(output)
if err != nil {
return nil, err
}

func NewAdmin(domain string) *AdminInterface {
com := make(chan AdminCommand)
return &AdminInterface{
Started: false,
Incoming: input,
Outgoing: output,
Domain: domain,
ListenChannel: com,
}, nil
}
}

func (a *AdminInterface) findIncoming() {
if err := os.RemoveAll(a.Domain); err != nil {
panic(err)
}
l, err := net.Listen("unix", a.Domain)
if err != nil {
panic(err)
}
defer l.Close()
for {
file, err := openPipe(a.Incoming, os.O_RDONLY)
conn, err := l.Accept()
if err != nil {
panic(err)
}
buf := bufio.NewScanner(file)
defer conn.Close()
buf := bufio.NewScanner(conn)
for buf.Scan() {
line := buf.Text()
if line == "" {
continue
}
command := parseCommandFromString(line)
if command != AdminUnknown {
a.ListenChannel <- command
a.ListenChannel <- AdminCommand{command, func(line string) {
conn.Write([]byte(line))
conn.Close()
}}
} else {
fmt.Printf("Unknown command: %s\n", line)
}
Expand All @@ -89,10 +95,3 @@ func (a *AdminInterface) Listen() <-chan AdminCommand {
}
return a.ListenChannel
}

func (a *AdminInterface) Respond(line string) {
if !a.Started {
panic("Can't send on admin interface before you Listen()")
}
writePipe(a.Outgoing, line)
}
20 changes: 10 additions & 10 deletions src/executor-event-loop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (el *EventLoop) AddToLog(logDirection LogDirection, me LocalRef, env Messag
}

func (el *EventLoop) AddToAdminLog(cmd AdminCommand) {
el.AddToLog(LogResumeContinuation, LocalRef{0}, Message{"AdminCommand", []byte(fmt.Sprintf("Got command %d\n", cmd))})
el.AddToLog(LogResumeContinuation, LocalRef{0}, Message{"AdminCommand", []byte(fmt.Sprintf("\"Got command %d\\n\"", cmd))})
}

func (el *EventLoop) toSchedulerEnvelope(me RemoteRef, msg Message, correlationId CorrelationId) Envelope {
Expand All @@ -56,22 +56,25 @@ func (el *EventLoop) toSchedulerEnvelope(me RemoteRef, msg Message, correlationI
}
}

func (el *EventLoop) processAdmin(cmd AdminCommand) (bool, []string) {
switch cmd {
func (el *EventLoop) processAdmin(cmd AdminCommand) bool {
switch cmd.Type {
case AdminQuit:
fmt.Printf("Shutting down....\n")
return true, []string{}
cmd.Response("ok\n")
return true
case AdminDumpLog:
fmt.Printf("dumping log\n")
log := make([]string, len(el.Log))
for _, e := range el.Log {
log = append(log, e.Serialise())
}
return false, log
cmd.Response(fmt.Sprintf("%v\n", log))
return false
case AdminResetLog:
fmt.Printf("resetting log\n")
el.Log = make([]TimestampedLogEntry, 0)
return false, []string{}
cmd.Response("Log reseted\n")
return false
default:
fmt.Printf("Unknown admin command: %#v\n", cmd)
panic("Unhandled admin command")
Expand All @@ -88,13 +91,10 @@ func (el *EventLoop) Run() {
fmt.Printf("Found admin command\n")
el.LogicalTime.Incr()
el.AddToAdminLog(cmd)
quit, output := el.processAdmin(cmd)
quit := el.processAdmin(cmd)
if quit {
return
}
for _, entry := range output {
el.Admin.Respond(entry)
}
case envelope := <-commands:
fmt.Printf("Found message\n")
me := envelope.Receiver.ToLocal()
Expand Down
5 changes: 1 addition & 4 deletions src/sut/register/executor/executorcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ import (
func main() {

fmt.Printf("Starting up executor\n")
adminI, err := executorEL.NewAdmin("/tmp/executor-admin", "/tmp/executor-admin-response")
if err != nil {
panic(err)
}
adminI := executorEL.NewAdmin("/tmp/executor-admin.sock")
fmt.Printf("Created admin\n")
commandT, err := executorEL.NewCommandTransport("/tmp/executor.sock")
if err != nil {
Expand Down

0 comments on commit 42d5ca6

Please sign in to comment.