Skip to content

Commit

Permalink
Fix/message sent multiple times (#74)
Browse files Browse the repository at this point in the history
* changes(orchestrator): remove useless file

* Debug Mode for Node handler

* Debug mode for orchestrator & do not resent message to sender
  • Loading branch information
timtimjnvr committed May 30, 2024
1 parent 273785b commit 0507295
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 13 deletions.
23 changes: 22 additions & 1 deletion conn/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type (
}

NodeHandler struct {
nodes map[slot]*node
debugMode bool
nodes map[slot]*node

Wg *sync.WaitGroup
Shutdown chan struct{}
Expand Down Expand Up @@ -122,6 +123,10 @@ func NewNodeHandler(shutdown chan struct{}) *NodeHandler {
}
}

func (d *NodeHandler) SetDebugMode() {
d.debugMode = true
}

func (d *NodeHandler) Start(newConnections <-chan net.Conn, toSend <-chan *crdt.Operation, toExecute chan<- *crdt.Operation) {
var (
done = make(chan slot)
Expand All @@ -143,6 +148,10 @@ func (d *NodeHandler) Start(newConnections <-chan net.Conn, toSend <-chan *crdt.

case c := <-newConnections:
s := d.getNextSlot()
if d.debugMode {
fmt.Println("[DEBUG] ", "New connection", s)
}

n, err := newNode(c, d.getNextSlot(), outputNodes)
if err != nil {
log.Println("[ERROR] ", err)
Expand All @@ -154,12 +163,20 @@ func (d *NodeHandler) Start(newConnections <-chan net.Conn, toSend <-chan *crdt.
d.nodes[s] = n

case s := <-done:
if d.debugMode {
fmt.Println("[DEBUG] node Handler", "Node done", s)
}

quitOperation := crdt.NewOperation(crdt.KillNode, "", nil)
quitOperation.Slot = uint8(s)
toExecute <- quitOperation
d.nodes[s] = nil

case operation := <-toSend:
if d.debugMode {
fmt.Println("[DEBUG] node Handler", crdt.GetOperationName(operation.Typology), "operation to send")
}

// Set slot
s := slot(operation.Slot)
if n, exist := d.nodes[s]; exist {
Expand All @@ -176,6 +193,10 @@ func (d *NodeHandler) Start(newConnections <-chan net.Conn, toSend <-chan *crdt.
continue
}

if d.debugMode {
fmt.Println("[DEBUG] node Handler", crdt.GetOperationName(operation.Typology), "operation received")
}

// need to create connection and set slot in operation
if operation.Typology == crdt.AddNode {
newNodeInfos, ok := operation.Data.(*crdt.NodeInfos)
Expand Down
7 changes: 6 additions & 1 deletion lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync"
)

func start(addr string, port string, name string, stdin *os.File, sigc chan os.Signal) {
func start(addr string, port string, name string, stdin *os.File, sigc chan os.Signal, debugModePtr bool) {
var (
myInfos = crdt.NewNodeInfos(addr, port, name)
shutdown = make(chan struct{})
Expand All @@ -29,6 +29,11 @@ func start(addr string, port string, name string, stdin *os.File, sigc chan os.S
nodeHandler = conn.NewNodeHandler(shutdown)
)

if debugModePtr {
orch.SetDebugMode()
nodeHandler.SetDebugMode()
}

defer func() {
wgHandleStdin.Wait()
wgHandleChats.Wait()
Expand Down
4 changes: 2 additions & 2 deletions lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ func TestTwoUsers(t *testing.T) {
go func() {
defer wg.Done()

start("", fmt.Sprintf("%d", port1), "user1", stdinUser1, sigC1)
start("", fmt.Sprintf("%d", port1), "user1", stdinUser1, sigC1, false)
}()

wg.Add(1)
go func() {
defer wg.Done()
start("", fmt.Sprintf("%d", port2), "user2", stdinUser2, sigC2)
start("", fmt.Sprintf("%d", port2), "user2", stdinUser2, sigC2, false)
}()

_, err = w2.Write([]byte(fmt.Sprintf("/join localhost %d user1\n", port1)))
Expand Down
9 changes: 5 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (

func main() {
var (
myPortPtr = flag.String("p", "8080", "port number used to accept connections")
myAddrPtr = flag.String("a", "", "address used to accept connections")
myNamePtr = flag.String("u", "tim", "nickname used in all chat")
myPortPtr = flag.String("p", "8080", "port number used to accept connections")
myAddrPtr = flag.String("a", "", "address used to accept connections")
myNamePtr = flag.String("u", "tim", "nickname used in all chat")
debugModePtr = flag.Bool("d", false, "Enable debub mode")

sigc = make(chan os.Signal, 1)
)
Expand All @@ -25,5 +26,5 @@ func main() {
syscall.SIGTERM,
syscall.SIGQUIT)

start(*myAddrPtr, *myPortPtr, *myNamePtr, os.Stdin, sigc)
start(*myAddrPtr, *myPortPtr, *myNamePtr, os.Stdin, sigc, *debugModePtr)
}
38 changes: 34 additions & 4 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type (
Orchestrator struct {
*sync.RWMutex
debugMode bool
myInfos *crdt.NodeInfos
currenChatID uuid.UUID
storage *storage.Storage
Expand Down Expand Up @@ -52,6 +53,10 @@ func NewOrchestrator(myInfos *crdt.NodeInfos) *Orchestrator {
return o
}

func (o *Orchestrator) SetDebugMode() {
o.debugMode = true
}

// HandleChats maintains chat infos consistency by executing and propagating operations received
// from stdin or TCP connections through the channel toExecute
func (o *Orchestrator) HandleChats(wg *sync.WaitGroup, toExecute chan *crdt.Operation, toSend chan<- *crdt.Operation, shutdown <-chan struct{}) {
Expand All @@ -62,9 +67,16 @@ func (o *Orchestrator) HandleChats(wg *sync.WaitGroup, toExecute chan *crdt.Oper
for {
select {
case <-shutdown:
if o.debugMode {
fmt.Println("[DEBUG] orch", "shutting down")
}
return

case op := <-toExecute:
if o.debugMode {
fmt.Println("[DEBUG] orch", crdt.GetOperationName(op.Typology), "operation to execute")
}

switch op.Typology {
case crdt.JoinChatByName:
chatID, err := o.storage.GetChatID(op.TargetedChat)
Expand Down Expand Up @@ -181,18 +193,21 @@ func (o *Orchestrator) HandleChats(wg *sync.WaitGroup, toExecute chan *crdt.Oper
continue
}

// No error so we effectively got a new message
fmt.Printf("%s (%s): %s", newMessage.Sender, newMessage.Date, newMessage.Content)

slots, err := o.storage.GetSlots(chatID)
if err != nil {
fmt.Printf(logOpperationErrFormat, crdt.GetOperationName(op.Typology), err)
continue
}

for _, s := range slots {
copied := op.Copy()
copied.Slot = s
toSend <- copied
// Send message to all slots except the sender
if op.Slot != s {
copied := op.Copy()
copied.Slot = s
toSend <- copied
}
}

case crdt.RemoveNode:
Expand Down Expand Up @@ -283,6 +298,9 @@ func (o *Orchestrator) HandleStdin(wg *sync.WaitGroup, osStdin *os.File, toExecu
)

defer func() {
if o.debugMode {
fmt.Println("[DEBUG] orch : defering HandleStdin")
}
close(stopReading)
wgReadStdin.Wait()
wg.Done()
Expand All @@ -296,9 +314,21 @@ func (o *Orchestrator) HandleStdin(wg *sync.WaitGroup, osStdin *os.File, toExecu

select {
case <-shutdown:
if o.debugMode {
fmt.Println("[DEBUG] orch : shuting down HandleStdin")
}
return

case <-isDone:
if o.debugMode {
fmt.Println("[DEBUG] orch : stding reader is done")
}
return

case line := <-stdinChann:
if o.debugMode {
fmt.Println("[DEBUG] orch : got new line from stdin")
}
cmd, err := parsestdin.NewCommand(string(line))
if err != nil {
fmt.Printf(logErrFormat, err)
Expand Down
1 change: 0 additions & 1 deletion orchestrator/orchestrator_test.go

This file was deleted.

0 comments on commit 0507295

Please sign in to comment.