Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests scenarios for 2 users #72

Merged
merged 4 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion conn/conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conn

import (
"fmt"
"net"
"os"
)
Expand All @@ -11,7 +12,12 @@ type conn struct {
}

func newConn(c net.Conn) (*conn, error) {
file, err := c.(*net.TCPConn).File()
f, ok := c.(*net.TCPConn)
if !ok {
return nil, fmt.Errorf("conn can't be converted to *net.TCPConn")
}

file, err := f.File()
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions conn/creators.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package conn

import (
"fmt"
"github.com/pkg/errors"
"github/timtimjnvr/chat/crdt"
"log"
"net"
Expand Down Expand Up @@ -44,10 +43,14 @@ func CreateConnections(wg *sync.WaitGroup, isReady *sync.Cond, myInfos *crdt.Nod
go Connect(&wgInitNodeConnections, myInfos, incomingConnectionRequests, newConnections, shutdown)

defer func() {
if r := recover(); r != nil {
log.Println("Recovered from panic:", r)
}

wgInitNodeConnections.Wait()
close(newConnections)
isReady.Signal()
wgClosure.Wait()
wgInitNodeConnections.Wait()
wg.Done()
}()

Expand All @@ -63,12 +66,10 @@ func CreateConnections(wg *sync.WaitGroup, isReady *sync.Cond, myInfos *crdt.Nod
for {
// extracts the first connection on the listener queue
c, err = ln.Accept()
if errors.Is(err, net.ErrClosed) {
if err != nil {
fmt.Println("[ERROR] ", err.Error())
return
}

if err != nil {
log.Fatal("[ERROR]", err)
}

newConnections <- c
Expand All @@ -77,6 +78,9 @@ func CreateConnections(wg *sync.WaitGroup, isReady *sync.Cond, myInfos *crdt.Nod

func Connect(wg *sync.WaitGroup, myInfos *crdt.NodeInfos, incomingConnectionRequest <-chan ConnectionRequest, newConnections chan<- net.Conn, shutdown <-chan struct{}) {
defer func() {
if r := recover(); r != nil {
fmt.Println("[ERROR] ", r)
}
wg.Done()
}()

Expand All @@ -86,7 +90,6 @@ func Connect(wg *sync.WaitGroup, myInfos *crdt.NodeInfos, incomingConnectionRequ
return

case connectionRequest := <-incomingConnectionRequest:
//args := connectionRequest.GetArgs()
var (
addr = connectionRequest.targetedAddress
chatRoom = connectionRequest.chatRoom
Expand Down
63 changes: 63 additions & 0 deletions lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package main

import (
"fmt"
"github/timtimjnvr/chat/conn"
"github/timtimjnvr/chat/crdt"
"github/timtimjnvr/chat/orchestrator"
"net"
"os"
"sync"
)

func start(addr string, port string, name string, stdin *os.File, sigc chan os.Signal) {
var (
myInfos = crdt.NewNodeInfos(addr, port, name)
shutdown = make(chan struct{})
connectionRequests = make(chan conn.ConnectionRequest)
newConnections = make(chan net.Conn)
toSend = make(chan *crdt.Operation)
toExecute = make(chan *crdt.Operation)

wgListen = sync.WaitGroup{}
wgHandleChats = sync.WaitGroup{}
wgHandleStdin = sync.WaitGroup{}
lock = sync.Mutex{}
isReady = sync.NewCond(&lock)

orch = orchestrator.NewOrchestrator(myInfos)
nodeHandler = conn.NewNodeHandler(shutdown)
)

defer func() {
wgHandleStdin.Wait()
wgHandleChats.Wait()
wgListen.Wait()
nodeHandler.Wg.Wait()
fmt.Println("[INFO] program shutdown")
}()

// create connections : tcp connect & listen for incoming connections
wgListen.Add(1)
isReady.L.Lock()
go conn.CreateConnections(&wgListen, isReady, myInfos, connectionRequests, newConnections, shutdown)
isReady.Wait()

// handle created connections until closure
nodeHandler.Wg.Add(1)
go nodeHandler.Start(newConnections, toSend, toExecute)
defer nodeHandler.Wg.Wait()

// maintain chat infos by executing and propagating operations
wgHandleChats.Add(1)
go orch.HandleChats(&wgHandleChats, toExecute, toSend, shutdown)

// create operations from stdin input
wgHandleStdin.Add(1)
go orch.HandleStdin(&wgHandleStdin, stdin, toExecute, connectionRequests, shutdown)

select {
case <-sigc:
close(shutdown)
}
}
134 changes: 134 additions & 0 deletions lib_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"bytes"
"fmt"
"github.com/stretchr/testify/assert"
"io"
"math/rand"
"os"
"strings"
"sync"
"syscall"
"testing"
"time"
)

func TestTwoUsers(t *testing.T) {
// keep backup of the real stdout
oldStdout := os.Stdout
rStdout, wStdout, _ := os.Pipe()
os.Stdout = wStdout

// get stdout content in the background
var (
stop = make(chan struct{})
outC = make(chan string, 1000)
wg = &sync.WaitGroup{}
)

go func() {
defer close(outC)
for {
select {
case <-stop:
return

default:
var buf = bytes.Buffer{}
_, err := io.Copy(&buf, rStdout)
if err != nil {
return
}
outC <- buf.String()
}
}
}()

var expectedMessages = []string{
"[INFO] type a Command :",
"[INFO] type a Command :",
"[INFO] type a Command :",
"[INFO] user2 joined chat",
"[INFO] you joined a new chat : user1",
"[INFO] type a Command :",
"user2 (2024-05-19T16:45:02+02:00): hey man",
"user2 (2024-05-19T16:45:02+02:00): hey man",
"[INFO] program shutdown",
"[INFO] program shutdown",
}

// Set up test resources and start test scenarios
rand.Seed(time.Now().UnixNano())
var (
port1 = rand.Intn(65533-49152) + 49152
port2 = port1 + 1
sigC1 = make(chan os.Signal, 1)
sigC2 = make(chan os.Signal, 1)
)

stdinUser1, _, err := os.Pipe()
if err != nil {
t.Fatal("os.Pipe err : failed to set up user 1", err)
return
}

stdinUser2, w2, err := os.Pipe()
if err != nil {
t.Fatal("os.Pipe err : failed to set up user 2", err)
return
}

wg.Add(1)
go func() {
defer wg.Done()

start("", fmt.Sprintf("%d", port1), "user1", stdinUser1, sigC1)
}()

wg.Add(1)
go func() {
defer wg.Done()
start("", fmt.Sprintf("%d", port2), "user2", stdinUser2, sigC2)
}()

_, err = w2.Write([]byte(fmt.Sprintf("/join localhost %d user1\n", port1)))
if err != nil {
t.Fatal("w2.Write err : failed to write on user2 stdin", err)
return
}

<-time.Tick(1 * time.Second)

_, err = w2.Write([]byte("/msg hey man"))
if err != nil {
t.Fatal("w2.Write err : failed to write on user2 stdin", err)
return
}

<-time.Tick(5 * time.Second)

// stop simulation
sigC1 <- syscall.SIGINT
sigC2 <- syscall.SIGINT

// stop reading redirected stdout
// wait for simulation to be done
close(stop)
wg.Wait()

// restoring stdout for printing test results
wStdout.Close()
os.Stdout = oldStdout

wg.Add(1)
go func() {
defer wg.Done()
index := 0
for m := range outC {
messages := strings.Split(m, "\n")
assert.Equal(t, expectedMessages[index], messages[index])
}
}()
wg.Wait()
}
75 changes: 7 additions & 68 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,90 +1,29 @@
package main

import (
"fmt"
"github/timtimjnvr/chat/conn"
"github/timtimjnvr/chat/crdt"
"github/timtimjnvr/chat/orchestrator"
"net"

"flag"
"os"
"os/signal"
"sync"
"syscall"
)

type (
currentChat struct {
crdt.Chat
rw *sync.RWMutex
}
)

func main() {
// Program arguments
var (
myPortPtr = flag.String("p", "8080", "port number used to accept conn")
myAddrPtr = flag.String("a", "", "address used to accept conn")
myNamePtr = flag.String("u", "tim", "nickname used in chats")
)
flag.Parse()

var (
myInfos = crdt.NewNodeInfos(*myAddrPtr, *myPortPtr, *myNamePtr)
myPortPtr = flag.String("p", "8080", "port number used to accept connections")
myAddrPtr = flag.String("a", "", "address used to accept connections")
myNamePtr = flag.String("u", "tim", "nickname used in all chat")

sigc = make(chan os.Signal, 1)
shutdown = make(chan struct{})
connectionRequests = make(chan conn.ConnectionRequest)
newConnections = make(chan net.Conn)
toSend = make(chan *crdt.Operation)
toExecute = make(chan *crdt.Operation)

wgListen = sync.WaitGroup{}
wgHandleChats = sync.WaitGroup{}
wgHandleStdin = sync.WaitGroup{}
lock = sync.Mutex{}
isReady = sync.NewCond(&lock)

orch = orchestrator.NewOrchestrator(myInfos)
nodeHandler = conn.NewNodeHandler(shutdown)
sigc = make(chan os.Signal, 1)
)

defer func() {
wgHandleStdin.Wait()
wgHandleChats.Wait()
wgListen.Wait()
nodeHandler.Wg.Wait()
fmt.Println("[INFO] program shutdown")
}()
flag.Parse()

signal.Notify(sigc,
syscall.SIGUSR1, // only used for interruption in testing
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

// create connections : tcp connect & listen for incoming connections
wgListen.Add(1)
isReady.L.Lock()
go conn.CreateConnections(&wgListen, isReady, myInfos, connectionRequests, newConnections, shutdown)
isReady.Wait()

// handle created connections until closure
nodeHandler.Wg.Add(1)
go nodeHandler.Start(newConnections, toSend, toExecute)
defer nodeHandler.Wg.Wait()

// maintain chat infos by executing and propagating operations
wgHandleChats.Add(1)
go orch.HandleChats(&wgHandleChats, toExecute, toSend, shutdown)

// create operations from stdin input
wgHandleStdin.Add(1)
go orch.HandleStdin(&wgHandleStdin, toExecute, connectionRequests, shutdown)

select {
case <-sigc:
close(shutdown)
}
start(*myAddrPtr, *myPortPtr, *myNamePtr, os.Stdin, sigc)
}
Loading
Loading