Skip to content

Commit

Permalink
Add support for worker name
Browse files Browse the repository at this point in the history
  • Loading branch information
andylibrian committed Dec 20, 2020
1 parent 5d99066 commit cd16084
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 28 deletions.
15 changes: 13 additions & 2 deletions cmd/terjang/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getCliApp() *cli.App {
srv := server.NewServer()

fmt.Printf("Server is listening on %s:%s\n", host, port)
err := srv.Run()
err := srv.Run(host + ":" + port)

if err != nil {
return err
Expand Down Expand Up @@ -76,13 +76,24 @@ func getCliApp() *cli.App {
},
},
Action: func(c *cli.Context) error {
name := c.String("name")
if name == "" {
hostname, err := os.Hostname()

if err == nil {
name = hostname
} else {
name = "worker"
}
}
host := c.String("host")
port := c.String("port")

w := worker.NewWorker()
w.SetName(name)

fmt.Printf("Connecting to server %s:%s\n", host, port)
w.Run()
w.Run(host + ":" + port)

return nil
},
Expand Down
13 changes: 10 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *Server) GetWorkerService() *WorkerService {
}

// Run listens on the specified port and serve requests.
func (s *Server) Run() error {
func (s *Server) Run(addr string) error {
router, err := s.setupRouter()

if err != nil {
Expand All @@ -44,7 +44,7 @@ func (s *Server) Run() error {
go s.runNotificationLoop()
go s.watchWorkerStateChange()

s.httpServer = &http.Server{Addr: "0.0.0.0:9009", Handler: router}
s.httpServer = &http.Server{Addr: addr, Handler: router}
if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
// TODO: log err
}
Expand Down Expand Up @@ -85,13 +85,20 @@ func (s *Server) setupRouter() (*httprouter.Router, error) {
}

func (s *Server) acceptWorkerConn(responseWriter http.ResponseWriter, req *http.Request, _ httprouter.Params) {
name := ""

names, ok := req.URL.Query()["name"]
if ok && len(names[0]) > 0 {
name = names[0]
}

conn, err := s.upgrader.Upgrade(responseWriter, req, nil)
if err != nil {
// TODO: should respond? should probably log
return
}

s.workerService.AddWorker(conn)
s.workerService.AddWorker(conn, name)

defer s.workerService.RemoveWorker(conn)
defer conn.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func (w *WorkerService) SetMessageHandler(h MessageHandler) {
w.messageHandler = h
}

func (w *WorkerService) AddWorker(conn *websocket.Conn) {
func (w *WorkerService) AddWorker(conn *websocket.Conn, name string) {
w.workersLock.Lock()
defer w.workersLock.Unlock()

w.workers[conn] = &worker{conn: conn}
w.workers[conn] = &worker{conn: conn, Name: name}
}

func (w *WorkerService) RemoveWorker(conn *websocket.Conn) {
Expand Down
9 changes: 7 additions & 2 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

type Worker struct {
name string
conn *websocket.Conn
connWriteLock sync.Mutex
messageHandler MessageHandler
Expand Down Expand Up @@ -45,8 +46,12 @@ func NewWorker() *Worker {
return worker
}

func (w *Worker) Run() {
serverURL := url.URL{Scheme: "ws", Host: "127.0.0.1:9009", Path: "/cluster/join"}
func (w *Worker) SetName(name string) {
w.name = name
}

func (w *Worker) Run(addr string) {
serverURL := url.URL{Scheme: "ws", Host: addr, Path: "/cluster/join", RawQuery: "name=" + w.name}

serverURLStr := serverURL.String()

Expand Down
10 changes: 5 additions & 5 deletions test/integration/exchange_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ func TestWorkerSendMessageToServer(t *testing.T) {
serverMsgHandlerStub := serverMessageHandlerStub{handlerDelegate: defaultServerMsgHandler}
server.GetWorkerService().SetMessageHandler(&serverMsgHandlerStub)

go server.Run()
go server.Run("127.0.0.1:9009")
defer server.Close()

worker := worker.NewWorker()
worker.SetConnectRetryInterval(connectRetryInterval)
go worker.Run()
go worker.Run("127.0.0.1:9009")

<-worker.IsConnectedCh()

Expand All @@ -81,20 +81,20 @@ func TestWorkerSendMessageToServer(t *testing.T) {

func TestServerBroadcastMessagesToWorker(t *testing.T) {
server := server.NewServer()
go server.Run()
go server.Run("127.0.0.1:9009")
defer server.Close()

worker1 := worker.NewWorker()
defaultWorker1MsgHandler := worker1.GetMessageHandler()
worker1MsgHandlerStub := workerMessageHandlerStub{handlerDelegate: defaultWorker1MsgHandler}
worker1.SetMessageHandler(&worker1MsgHandlerStub)
go worker1.Run()
go worker1.Run("127.0.01:9009")

worker2 := worker.NewWorker()
defaultWorker2MsgHandler := worker2.GetMessageHandler()
worker2MsgHandlerStub := workerMessageHandlerStub{handlerDelegate: defaultWorker2MsgHandler}
worker2.SetMessageHandler(&worker2MsgHandlerStub)
go worker2.Run()
go worker2.Run("127.0.0.1:9009")

<-worker1.IsConnectedCh()
<-worker2.IsConnectedCh()
Expand Down
8 changes: 4 additions & 4 deletions test/integration/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ func TestStartLoadTest(t *testing.T) {
go target.listenAndServe(":10080")

server := server.NewServer()
go server.Run()
go server.Run("127.0.0.1:9019")
defer server.Close()

worker := worker.NewWorker()
worker.SetConnectRetryInterval(connectRetryInterval)
go worker.Run()
go worker.Run("127.0.0.1:9019")

<-worker.IsConnectedCh()

Expand Down Expand Up @@ -81,12 +81,12 @@ func TestStopLoadTest(t *testing.T) {
go target.listenAndServe(":10081")

server := server.NewServer()
go server.Run()
go server.Run("127.0.0.1:9019")
defer server.Close()

worker := worker.NewWorker()
worker.SetConnectRetryInterval(connectRetryInterval)
go worker.Run()
go worker.Run("127.0.0.1:9019")

<-worker.IsConnectedCh()

Expand Down
16 changes: 8 additions & 8 deletions test/integration/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type stubNotificationClient struct {
workersInfoMsgs []messages.Envelope
}

func (s *stubNotificationClient) run() {
serverURL := url.URL{Scheme: "ws", Host: "127.0.0.1:9009", Path: "/notifications"}
func (s *stubNotificationClient) run(addr string) {
serverURL := url.URL{Scheme: "ws", Host: addr, Path: "/notifications"}
serverURLStr := serverURL.String()

var conn *websocket.Conn
Expand Down Expand Up @@ -76,11 +76,11 @@ func (s *stubNotificationClient) run() {

func TestServerSendServerInfoNotification(t *testing.T) {
server := server.NewServer()
go server.Run()
go server.Run("127.0.0.1:9029")
defer server.Close()

clientStub := stubNotificationClient{isConnectedCh: make(chan struct{})}
go clientStub.run()
go clientStub.run("127.0.0.1:9029")

<-clientStub.isConnectedCh

Expand All @@ -97,7 +97,7 @@ func TestServerSendServerInfoNotification(t *testing.T) {
assert.Equal(t, "NotStarted", serverInfo.State)

worker := worker.NewWorker()
go worker.Run()
go worker.Run("127.0.0.1:9029")

<-worker.IsConnectedCh()

Expand All @@ -118,14 +118,14 @@ func TestServerUpdateServerInfoNotification(t *testing.T) {
go target.listenAndServe(":10090")

server := server.NewServer()
go server.Run()
go server.Run("127.0.0.1:9039")
defer server.Close()

clientStub := stubNotificationClient{isConnectedCh: make(chan struct{})}
go clientStub.run()
go clientStub.run("127.0.0.1:9039")

worker := worker.NewWorker()
go worker.Run()
go worker.Run("127.0.0.1:9039")

<-clientStub.isConnectedCh
<-worker.IsConnectedCh()
Expand Down
4 changes: 2 additions & 2 deletions test/integration/worker_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ func TestWorkerSendMetricsDuringLoadTest(t *testing.T) {

serverMsgHandlerStub := serverMessageHandlerStub{handlerDelegate: defaultServerMsgHandler}
server.GetWorkerService().SetMessageHandler(&serverMsgHandlerStub)
go server.Run()
go server.Run("127.0.0.1:9049")
defer server.Close()

worker := worker.NewWorker()
worker.SetConnectRetryInterval(connectRetryInterval)
go worker.Run()
go worker.Run("127.0.0.1:9049")

<-worker.IsConnectedCh()

Expand Down

0 comments on commit cd16084

Please sign in to comment.