Skip to content

Commit

Permalink
Add logs to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
andylibrian committed Dec 20, 2020
1 parent e19d9d1 commit 27116ab
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
2 changes: 0 additions & 2 deletions cmd/terjang/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"fmt"
"log"
"os"

Expand Down Expand Up @@ -92,7 +91,6 @@ func getCliApp() *cli.App {
w := worker.NewWorker()
w.SetName(name)

fmt.Printf("Connecting to server %s:%s\n", host, port)
w.Run(host + ":" + port)

return nil
Expand Down
33 changes: 32 additions & 1 deletion pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,25 @@ import (
"github.com/andylibrian/terjang/pkg/messages"
"github.com/gorilla/websocket"
vegeta "github.com/tsenart/vegeta/v12/lib"
"go.uber.org/zap"
)

var logger *zap.SugaredLogger

func init() {
l, err := zap.NewProduction()

if err != nil {
panic("Can not create logger")
}

logger = l.Sugar()
}

func SetLogger(l *zap.SugaredLogger) {
logger = l
}

type Worker struct {
name string
conn *websocket.Conn
Expand Down Expand Up @@ -59,6 +76,8 @@ func (w *Worker) Run(addr string) {
var err error

for i := 0; i < 10; i++ {
logger.Infow("Connecting to server", "address", addr)

conn, _, err = websocket.DefaultDialer.Dial(serverURLStr, nil)

if err == nil {
Expand All @@ -68,6 +87,8 @@ func (w *Worker) Run(addr string) {
time.Sleep(w.connectRetryInterval)
}

logger.Infow("Connected to server", "address", addr)

w.conn = conn
defer conn.Close()
defer close(w.isConnectedCh)
Expand Down Expand Up @@ -95,7 +116,7 @@ func (w *Worker) SetConnectRetryInterval(d time.Duration) {

func (w *Worker) SendMessageToServer(message []byte) {
if w.conn == nil {
// should indicate error
logger.Errorw("Can not send message to server because we are disconnected")
} else {
w.connWriteLock.Lock()
defer w.connWriteLock.Unlock()
Expand All @@ -120,7 +141,10 @@ func (h *defaultMessageHandler) HandleMessage(message []byte) {
var envelope messages.Envelope
err := json.Unmarshal(message, &envelope)

logger.Debugw("Received message from server", "message", string(message))

if err != nil {
logger.Errorw("Failed to unmarshal a message from server", "message", string(message))
return
}

Expand All @@ -129,6 +153,7 @@ func (h *defaultMessageHandler) HandleMessage(message []byte) {
err = json.Unmarshal([]byte(envelope.Data), &req)

if err != nil {
logger.Errorw("Failed to unmarshal a message from server", "message", string(message))
return
}

Expand All @@ -155,9 +180,13 @@ func (h *defaultMessageHandler) HandleMessage(message []byte) {
Body: []byte(req.Body),
})

logger.Infow("Starting load test", "request", &req)

h.worker.resetLoadTest()
go h.worker.startLoadTest(targeter, rate, duration, "terjang")
} else if envelope.Kind == messages.KindStopLoadTestRequest {

logger.Infow("Stopping load test")
h.worker.stopLoadTest()
}
}
Expand All @@ -181,6 +210,8 @@ func (w *Worker) startLoadTest(tr vegeta.Targeter, p vegeta.Pacer, du time.Durat
}

w.sendWorkerInfoToServer()

logger.Infow("Finished load test")
}

func (w *Worker) stopLoadTest() {
Expand Down

0 comments on commit 27116ab

Please sign in to comment.