Skip to content

Commit

Permalink
feat(runtime): Add unix domain socket transport
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Nov 4, 2021
1 parent 93c74a2 commit b9cc0f7
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 14 deletions.
30 changes: 20 additions & 10 deletions src/executor-event-loop/communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package executorEL
import (
"bufio"
"encoding/json"
"fmt"
"net"
"os"
)

Expand All @@ -14,11 +14,6 @@ type CommandTransport struct {
}

func NewCommandTransport(input string) (*CommandTransport, *error) {
err := createPipe(input)
if err != nil {
return nil, err
}

com := make(chan Envelope)
return &CommandTransport{
Started: false,
Expand All @@ -33,18 +28,33 @@ func (ct *CommandTransport) Send(env Envelope) {
panic(err)
}

writePipe(env.Receiver.Address, string(j)+"\n")
c, err := net.Dial("unix", "/tmp/"+env.Receiver.Address+".sock")
if err != nil {
panic(err)
}
defer c.Close()

c.Write(j)
}

func (ct *CommandTransport) findIncoming() {
for {
if err := os.RemoveAll(ct.Incomming); err != nil {
panic(err)
}

file, err := openPipe(ct.Incomming, os.O_RDONLY)
l, err := net.Listen("unix", ct.Incomming)
if err != nil {
panic(err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
panic(err)
}
buf := bufio.NewScanner(conn)
defer conn.Close()

buf := bufio.NewScanner(file)
for buf.Scan() {
line := buf.Text() // is this correct?
if line == "" {
Expand Down
2 changes: 1 addition & 1 deletion src/executor-event-loop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewEventLoop(ai *AdminInterface, ct *CommandTransport, executor *Executor)
Log: []TimestampedLogEntry{},
LogicalTime: &lt,
Executor: executor,
SchedulerRef: RemoteRef{"/tmp/scheduler", 0},
SchedulerRef: RemoteRef{"scheduler", 0},
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/Scheduler/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ main version = do
schedulerPort = 3005
schedulerIncoming = "/tmp/"
fp <- getDbPath
el <- makeEventLoop realClock (makeSeed 0) (NamedPipeCodec schedulerIncoming) (AdminNamedPipe "/tmp/")
el <- makeEventLoop realClock (makeSeed 0) (UnixDomainSocket schedulerIncoming) (AdminNamedPipe "/tmp/")
executorCodec (RealDisk fp) (EventLoopName "scheduler")
now <- getCurrentTime realClock
lref <- spawn el (fakeScheduler executorRef) (initState now (makeSeed 0))
Expand Down
2 changes: 2 additions & 0 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import StuntDouble.Transport.Http
import StuntDouble.Transport.HttpSync
import StuntDouble.Transport.NamedPipe
import qualified StuntDouble.Transport.NamedPipeCodec as NPC
import qualified StuntDouble.Transport.UnixSocket as UDS
import StuntDouble.Transport.Stm

------------------------------------------------------------------------
Expand Down Expand Up @@ -619,6 +620,7 @@ makeEventLoopThreaded threaded threadpool clock seed tk atk codec dk name = do
t <- case tk of
NamedPipe fp -> namedPipeTransport fp name
NamedPipeCodec fp -> NPC.namedPipeTransport fp name codec
UnixDomainSocket fp -> UDS.unixSocketTransport fp name codec
Http port -> httpTransport port
HttpSync -> httpSyncTransport codec
Stm -> stmTransport
Expand Down
2 changes: 1 addition & 1 deletion src/runtime-prototype/src/StuntDouble/Transport.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import StuntDouble.Envelope

------------------------------------------------------------------------

data TransportKind = NamedPipe FilePath | NamedPipeCodec FilePath | Http Int | HttpSync | Stm
data TransportKind = NamedPipe FilePath | NamedPipeCodec FilePath | Http Int | HttpSync | Stm | UnixDomainSocket FilePath

data Transport m = Transport
{ transportSend :: Envelope -> m ()
Expand Down
68 changes: 68 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Transport/NamedPipeCodec.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{-# language OverloadedStrings #-}
module StuntDouble.Transport.NamedPipeCodec where

import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Lazy.Char8 as BSLC
import Control.Concurrent.Async
import Control.Exception
import System.Directory
import System.FilePath
import System.IO
import System.IO.Error
import System.Posix.Files
import System.Timeout

import StuntDouble.Codec
import StuntDouble.Envelope
import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.Transport

------------------------------------------------------------------------

namedPipeTransport :: FilePath -> EventLoopName -> Codec ->IO (Transport IO)
namedPipeTransport fp name (Codec encode decode) = do
safeCreateNamedPipe (fp </> getEventLoopName name)
h <- openFile (fp </> getEventLoopName name) ReadWriteMode
putStrLn $ "Listening on: " <> (fp </> getEventLoopName name)
hSetBuffering h LineBuffering
return Transport { transportSend = \e ->
let Encode addr _corrId payload = encode e in
withFile (fp </> addr) WriteMode $ \h' -> do
hSetBuffering h' LineBuffering
BSL.hPutStr h' (payload <> "\n")
, transportReceive = do
m <- hMaybeGetLine h
case m of
Nothing -> return Nothing
Just resp -> do
putStrLn "Found input"
case decode resp of
Left err -> error ("transportReceive: couldn't parse response: " ++ show err)
Right envelope -> return . pure $ envelope
, transportShutdown = cleanUpNamedPipe fp name
}

safeCreateNamedPipe :: FilePath -> IO ()
safeCreateNamedPipe fp =
catchJust
(\e -> if isAlreadyExistsErrorType (ioeGetErrorType e)
then Just ()
else Nothing)
(createNamedPipe fp
(namedPipeMode `unionFileModes`
ownerReadMode `unionFileModes`
ownerWriteMode))
return

cleanUpNamedPipe :: FilePath -> EventLoopName -> IO ()
cleanUpNamedPipe fp name =
catchJust
(\e -> if isDoesNotExistErrorType (ioeGetErrorType e)
then Just ()
else Nothing)
(removeFile (fp </> getEventLoopName name))
return

hMaybeGetLine :: Handle -> IO (Maybe BSL.ByteString)
hMaybeGetLine = timeout 10 . fmap BSLC.pack . hGetLine
87 changes: 87 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Transport/UnixSocket.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
module StuntDouble.Transport.UnixSocket where

import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Lazy.Char8 as BSLC
import Control.Concurrent (forkFinally)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import qualified Control.Exception as E
import Control.Monad(forever)
import qualified Data.Aeson as Aeson
import System.Directory
import System.FilePath
import System.IO
import System.IO.Error
import System.Posix.Files
import System.Timeout
import Network.Socket
import Network.Socket.ByteString.Lazy (recv, sendAll)

import StuntDouble.Codec
import StuntDouble.Envelope
import StuntDouble.Message
import StuntDouble.Reference
import StuntDouble.Transport

------------------------------------------------------------------------

unixSocketTransport :: FilePath -> EventLoopName -> Codec -> IO (Transport IO)
unixSocketTransport fp name c@(Codec encode _) = withSocketsDo $ do
queue <- newTBQueueIO 128 -- XXX: when/how does this grow?
let udsFP = fp </> getEventLoopName name <> ".sock"
putStrLn $ "Listening on: " <> udsFP
cleanUpUnixDomainSocket udsFP
aServer <- async (runServer udsFP c queue)
-- maybe we need to block until server is up?
return Transport { transportSend = \e ->
let Encode addr _corrId payload = encode e in
transportSend' (fp </> addr <> ".sock") payload
, transportReceive = atomically (tryReadTBQueue queue)
, transportShutdown = do
cancel aServer
cleanUpUnixDomainSocket udsFP
}

uSocket = socket AF_UNIX Stream defaultProtocol

runServer :: FilePath -> Codec -> TBQueue Envelope -> IO ()
runServer fp (Codec _ decode) queue = do
E.bracket open close loop
where
open = E.bracketOnError uSocket close $ \s -> do
setSocketOption s ReuseAddr 1
withFdSocket s setCloseOnExecIfNeeded
putStrLn $ "Binding socket for: " <> fp
bind s (SockAddrUnix fp)
listen s 1024
return s
loop s = forever $ E.bracketOnError (accept s) (close . fst) $ \ (conn, peer) -> do
forkFinally (server conn) (const $ gracefulClose conn 5000)
server conn = do
msg <- recv conn 1024
case decode msg of
Left err -> error err
Right envelope -> do
atomically $ writeTBQueue queue envelope
server conn

-- we should have open connections?
transportSend' :: FilePath -> BSL.ByteString -> IO ()
transportSend' addr payload = do
withSocketsDo $ E.bracket open close client
where
client c = do
sendAll c payload
open = E.bracketOnError uSocket close $ \s -> do
connect s (SockAddrUnix addr)
return s

cleanUpUnixDomainSocket :: FilePath -> IO ()
cleanUpUnixDomainSocket fp =
catchJust
(\e -> if isDoesNotExistErrorType (ioeGetErrorType e)
then Just ()
else Nothing)
(removeFile fp)
return
2 changes: 2 additions & 0 deletions src/runtime-prototype/stunt-double.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ library
StuntDouble.Transport.NamedPipe
StuntDouble.Transport.NamedPipeCodec
StuntDouble.Transport.Stm
StuntDouble.Transport.UnixSocket

-- GHC boot library dependencies:
-- (https://gitlab.haskell.org/ghc/ghc/-/blob/master/packages)
Expand All @@ -80,6 +81,7 @@ library
, heaps
, http-client
, http-types
, network
, primitive
, random
, sqlite-simple
Expand Down
2 changes: 1 addition & 1 deletion src/sut/register/executor/executorcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func main() {
panic(err)
}
fmt.Printf("Created admin\n")
commandT, err := executorEL.NewCommandTransport("/tmp/executor")
commandT, err := executorEL.NewCommandTransport("/tmp/executor.sock")
if err != nil {
panic(err)
}
Expand Down

0 comments on commit b9cc0f7

Please sign in to comment.