From cd16084b424a40337098401701ac5bfaccdfc3dd Mon Sep 17 00:00:00 2001 From: Andy Librian Date: Sun, 20 Dec 2020 08:47:32 +0700 Subject: [PATCH] Add support for worker name --- cmd/terjang/main.go | 15 +++++++++++++-- pkg/server/server.go | 13 ++++++++++--- pkg/server/worker_service.go | 4 ++-- pkg/worker/worker.go | 9 +++++++-- test/integration/exchange_message_test.go | 10 +++++----- test/integration/load_test.go | 8 ++++---- test/integration/notification_test.go | 16 ++++++++-------- test/integration/worker_metrics_test.go | 4 ++-- 8 files changed, 51 insertions(+), 28 deletions(-) diff --git a/cmd/terjang/main.go b/cmd/terjang/main.go index 8d5fc92..0a9ddfa 100644 --- a/cmd/terjang/main.go +++ b/cmd/terjang/main.go @@ -46,7 +46,7 @@ func getCliApp() *cli.App { srv := server.NewServer() fmt.Printf("Server is listening on %s:%s\n", host, port) - err := srv.Run() + err := srv.Run(host + ":" + port) if err != nil { return err @@ -76,13 +76,24 @@ func getCliApp() *cli.App { }, }, Action: func(c *cli.Context) error { + name := c.String("name") + if name == "" { + hostname, err := os.Hostname() + + if err == nil { + name = hostname + } else { + name = "worker" + } + } host := c.String("host") port := c.String("port") w := worker.NewWorker() + w.SetName(name) fmt.Printf("Connecting to server %s:%s\n", host, port) - w.Run() + w.Run(host + ":" + port) return nil }, diff --git a/pkg/server/server.go b/pkg/server/server.go index 9c807a0..c09a71b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -34,7 +34,7 @@ func (s *Server) GetWorkerService() *WorkerService { } // Run listens on the specified port and serve requests. -func (s *Server) Run() error { +func (s *Server) Run(addr string) error { router, err := s.setupRouter() if err != nil { @@ -44,7 +44,7 @@ func (s *Server) Run() error { go s.runNotificationLoop() go s.watchWorkerStateChange() - s.httpServer = &http.Server{Addr: "0.0.0.0:9009", Handler: router} + s.httpServer = &http.Server{Addr: addr, Handler: router} if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { // TODO: log err } @@ -85,13 +85,20 @@ func (s *Server) setupRouter() (*httprouter.Router, error) { } func (s *Server) acceptWorkerConn(responseWriter http.ResponseWriter, req *http.Request, _ httprouter.Params) { + name := "" + + names, ok := req.URL.Query()["name"] + if ok && len(names[0]) > 0 { + name = names[0] + } + conn, err := s.upgrader.Upgrade(responseWriter, req, nil) if err != nil { // TODO: should respond? should probably log return } - s.workerService.AddWorker(conn) + s.workerService.AddWorker(conn, name) defer s.workerService.RemoveWorker(conn) defer conn.Close() diff --git a/pkg/server/worker_service.go b/pkg/server/worker_service.go index 321179c..7ebc00b 100644 --- a/pkg/server/worker_service.go +++ b/pkg/server/worker_service.go @@ -50,11 +50,11 @@ func (w *WorkerService) SetMessageHandler(h MessageHandler) { w.messageHandler = h } -func (w *WorkerService) AddWorker(conn *websocket.Conn) { +func (w *WorkerService) AddWorker(conn *websocket.Conn, name string) { w.workersLock.Lock() defer w.workersLock.Unlock() - w.workers[conn] = &worker{conn: conn} + w.workers[conn] = &worker{conn: conn, Name: name} } func (w *WorkerService) RemoveWorker(conn *websocket.Conn) { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6c5790c..c6436ea 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -14,6 +14,7 @@ import ( ) type Worker struct { + name string conn *websocket.Conn connWriteLock sync.Mutex messageHandler MessageHandler @@ -45,8 +46,12 @@ func NewWorker() *Worker { return worker } -func (w *Worker) Run() { - serverURL := url.URL{Scheme: "ws", Host: "127.0.0.1:9009", Path: "/cluster/join"} +func (w *Worker) SetName(name string) { + w.name = name +} + +func (w *Worker) Run(addr string) { + serverURL := url.URL{Scheme: "ws", Host: addr, Path: "/cluster/join", RawQuery: "name=" + w.name} serverURLStr := serverURL.String() diff --git a/test/integration/exchange_message_test.go b/test/integration/exchange_message_test.go index 93168f4..cdc1f7b 100644 --- a/test/integration/exchange_message_test.go +++ b/test/integration/exchange_message_test.go @@ -64,12 +64,12 @@ func TestWorkerSendMessageToServer(t *testing.T) { serverMsgHandlerStub := serverMessageHandlerStub{handlerDelegate: defaultServerMsgHandler} server.GetWorkerService().SetMessageHandler(&serverMsgHandlerStub) - go server.Run() + go server.Run("127.0.0.1:9009") defer server.Close() worker := worker.NewWorker() worker.SetConnectRetryInterval(connectRetryInterval) - go worker.Run() + go worker.Run("127.0.0.1:9009") <-worker.IsConnectedCh() @@ -81,20 +81,20 @@ func TestWorkerSendMessageToServer(t *testing.T) { func TestServerBroadcastMessagesToWorker(t *testing.T) { server := server.NewServer() - go server.Run() + go server.Run("127.0.0.1:9009") defer server.Close() worker1 := worker.NewWorker() defaultWorker1MsgHandler := worker1.GetMessageHandler() worker1MsgHandlerStub := workerMessageHandlerStub{handlerDelegate: defaultWorker1MsgHandler} worker1.SetMessageHandler(&worker1MsgHandlerStub) - go worker1.Run() + go worker1.Run("127.0.01:9009") worker2 := worker.NewWorker() defaultWorker2MsgHandler := worker2.GetMessageHandler() worker2MsgHandlerStub := workerMessageHandlerStub{handlerDelegate: defaultWorker2MsgHandler} worker2.SetMessageHandler(&worker2MsgHandlerStub) - go worker2.Run() + go worker2.Run("127.0.0.1:9009") <-worker1.IsConnectedCh() <-worker2.IsConnectedCh() diff --git a/test/integration/load_test.go b/test/integration/load_test.go index 4e206fa..2931d2b 100644 --- a/test/integration/load_test.go +++ b/test/integration/load_test.go @@ -41,12 +41,12 @@ func TestStartLoadTest(t *testing.T) { go target.listenAndServe(":10080") server := server.NewServer() - go server.Run() + go server.Run("127.0.0.1:9019") defer server.Close() worker := worker.NewWorker() worker.SetConnectRetryInterval(connectRetryInterval) - go worker.Run() + go worker.Run("127.0.0.1:9019") <-worker.IsConnectedCh() @@ -81,12 +81,12 @@ func TestStopLoadTest(t *testing.T) { go target.listenAndServe(":10081") server := server.NewServer() - go server.Run() + go server.Run("127.0.0.1:9019") defer server.Close() worker := worker.NewWorker() worker.SetConnectRetryInterval(connectRetryInterval) - go worker.Run() + go worker.Run("127.0.0.1:9019") <-worker.IsConnectedCh() diff --git a/test/integration/notification_test.go b/test/integration/notification_test.go index cdfd5bc..6fa59cb 100644 --- a/test/integration/notification_test.go +++ b/test/integration/notification_test.go @@ -27,8 +27,8 @@ type stubNotificationClient struct { workersInfoMsgs []messages.Envelope } -func (s *stubNotificationClient) run() { - serverURL := url.URL{Scheme: "ws", Host: "127.0.0.1:9009", Path: "/notifications"} +func (s *stubNotificationClient) run(addr string) { + serverURL := url.URL{Scheme: "ws", Host: addr, Path: "/notifications"} serverURLStr := serverURL.String() var conn *websocket.Conn @@ -76,11 +76,11 @@ func (s *stubNotificationClient) run() { func TestServerSendServerInfoNotification(t *testing.T) { server := server.NewServer() - go server.Run() + go server.Run("127.0.0.1:9029") defer server.Close() clientStub := stubNotificationClient{isConnectedCh: make(chan struct{})} - go clientStub.run() + go clientStub.run("127.0.0.1:9029") <-clientStub.isConnectedCh @@ -97,7 +97,7 @@ func TestServerSendServerInfoNotification(t *testing.T) { assert.Equal(t, "NotStarted", serverInfo.State) worker := worker.NewWorker() - go worker.Run() + go worker.Run("127.0.0.1:9029") <-worker.IsConnectedCh() @@ -118,14 +118,14 @@ func TestServerUpdateServerInfoNotification(t *testing.T) { go target.listenAndServe(":10090") server := server.NewServer() - go server.Run() + go server.Run("127.0.0.1:9039") defer server.Close() clientStub := stubNotificationClient{isConnectedCh: make(chan struct{})} - go clientStub.run() + go clientStub.run("127.0.0.1:9039") worker := worker.NewWorker() - go worker.Run() + go worker.Run("127.0.0.1:9039") <-clientStub.isConnectedCh <-worker.IsConnectedCh() diff --git a/test/integration/worker_metrics_test.go b/test/integration/worker_metrics_test.go index ba15964..20c71e7 100644 --- a/test/integration/worker_metrics_test.go +++ b/test/integration/worker_metrics_test.go @@ -20,12 +20,12 @@ func TestWorkerSendMetricsDuringLoadTest(t *testing.T) { serverMsgHandlerStub := serverMessageHandlerStub{handlerDelegate: defaultServerMsgHandler} server.GetWorkerService().SetMessageHandler(&serverMsgHandlerStub) - go server.Run() + go server.Run("127.0.0.1:9049") defer server.Close() worker := worker.NewWorker() worker.SetConnectRetryInterval(connectRetryInterval) - go worker.Run() + go worker.Run("127.0.0.1:9049") <-worker.IsConnectedCh()