Skip to content

Commit

Permalink
logging and semaphore (device TODO)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcia Piccione authored and Marcia Piccione committed Feb 16, 2023
1 parent 8473a72 commit dc6e1fb
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 127 deletions.
21 changes: 9 additions & 12 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package kratos
import (
"sync"

"github.com/go-kit/kit/log"
"github.com/goph/emperror"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)

// Client is what function calls we expose to the user of kratos
Expand All @@ -31,7 +29,7 @@ type client struct {
decoderSender decoderSender
connection websocketConnection
headerInfo *clientHeader
logger log.Logger
logger *zap.Logger
done chan struct{}
wg sync.WaitGroup
pingConfig PingConfig
Expand Down Expand Up @@ -72,7 +70,7 @@ func (c *client) Send(message *wrp.Message) {
func (c *client) Close() error {
var connectionErr error
c.once.Do(func() {
logging.Info(c.logger).Log(logging.MessageKey(), "Closing client...")
c.logger.Info("Closing client...")
close(c.done)
c.wg.Wait()
c.decoderSender.Close()
Expand All @@ -83,33 +81,32 @@ func (c *client) Close() error {
// if err != nil {
// return emperror.Wrap(err, "Failed to close connection")
// }
logging.Info(c.logger).Log(logging.MessageKey(), "Client Closed")
c.logger.Info("Client Closed")
})
return connectionErr
}

// going to be used to access the HandleMessage() function
func (c *client) read() {
defer c.wg.Done()
logging.Info(c.logger).Log(logging.MessageKey(), "Watching socket for messages.")
c.logger.Info("Watching socket for messages.")

for {
select {
case <-c.done:
logging.Info(c.logger).Log(logging.MessageKey(), "Stopped reading from socket.")
c.logger.Info("Stopped reading from socket.")
return
default:
logging.Debug(c.logger).Log(logging.MessageKey(), "Reading message...")
c.logger.Info("Reading message...")

_, serverMessage, err := c.connection.ReadMessage()
if err != nil {
logging.Error(c.logger, emperror.Context(err)...).
Log(logging.MessageKey(), "Failed to read message. Exiting out of read loop.", logging.ErrorKey(), err.Error())
c.logger.Error("Failed to read message. Exiting out of read loop.", zap.Error(err))
return
}
c.decoderSender.DecodeAndSend(serverMessage)

logging.Debug(c.logger).Log(logging.MessageKey(), "Message sent to be decoded")
c.logger.Debug("Message sent to be decoded")
}
}
}
14 changes: 7 additions & 7 deletions clientConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/gorilla/websocket"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/webpa-common/device"
"github.com/xmidt-org/webpa-common/logging"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -38,7 +38,7 @@ type ClientConfig struct {
HandleMsgQueue QueueConfig
Handlers []HandlerConfig
HandlePingMiss HandlePingMiss
ClientLogger log.Logger
ClientLogger *zap.Logger
PingConfig PingConfig
}

Expand Down Expand Up @@ -83,11 +83,11 @@ func NewClient(config ClientConfig) (Client, error) {
// with the knowledge that `:` will be found in the string twice
connectionURL = connectionURL[len("ws://"):strings.LastIndex(connectionURL, ":")]

var logger log.Logger
var logger *zap.Logger
if config.ClientLogger != nil {
logger = config.ClientLogger
} else {
logger = logging.DefaultLogger()
logger = sallust.Default()
}
if config.PingConfig.MaxPingMiss <= 0 {
config.PingConfig.MaxPingMiss = 1
Expand Down Expand Up @@ -115,7 +115,7 @@ func NewClient(config ClientConfig) (Client, error) {

newClient.registry, err = NewHandlerRegistry(config.Handlers)
if err != nil {
logging.Warn(newClient.logger).Log(logging.MessageKey(), "failed to initialize all handlers for registry", logging.ErrorKey(), err.Error())
logger.Warn("failed to initialize all handlers for registry", zap.Error(err))
}

downstreamSender := NewDownstreamSender(newClient.Send, config.HandleMsgQueue.MaxWorkers, config.HandleMsgQueue.Size, logger)
Expand Down Expand Up @@ -154,7 +154,7 @@ func createConnection(headerInfo *clientHeader, httpURL string) (connection *web
// creates a new client connection given the URL string
connection, resp, err := websocket.DefaultDialer.Dial(wsURL, headers)

for ;err == websocket.ErrBadHandshake && resp != nil && resp.StatusCode == http.StatusTemporaryRedirect; {
for err == websocket.ErrBadHandshake && resp != nil && resp.StatusCode == http.StatusTemporaryRedirect {
// Get url to which we are redirected and reconfigure it
wsURL = strings.Replace(resp.Header.Get("Location"), "http", "ws", 1)

Expand Down
16 changes: 8 additions & 8 deletions clientPing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package kratos
import (
"time"

"github.com/xmidt-org/webpa-common/logging"
"go.uber.org/zap"
)

// HandlePingMiss is a function called when we run into situations where we're
Expand All @@ -18,36 +18,36 @@ func (c *client) checkPing(inTimer *time.Timer, pinged <-chan string) {
defer c.wg.Done()
// pingMiss indicates that a ping has been missed.
pingMiss := false
logging.Info(c.logger).Log(logging.MessageKey(), "Watching socket for pings")
c.logger.Info("Watching socket for pings")
count := 0
// as long as we're getting pings, we continue to loop.
for !pingMiss {
select {
// if we get a done signal, we leave the function.
case <-c.done:
logging.Info(c.logger).Log(logging.MessageKey(), "Stopped waiting for pings")
c.logger.Info("Stopped waiting for pings")
return
// if we get a ping, make sure to reset the timer until the next ping.
case <-pinged:
count = 0
if !inTimer.Stop() {
<-inTimer.C
}
logging.Debug(c.logger).Log(logging.MessageKey(), "Received a ping. Resetting ping timer")
c.logger.Debug("Received a ping. Resetting ping timer")
inTimer.Reset(c.pingConfig.PingWait)

// if we hit the timer, we've missed a ping.
case <-inTimer.C:
logging.Error(c.logger).Log(logging.MessageKey(), "Ping miss, calling handler", "count", count)
c.logger.Error("Ping miss, calling handler", zap.Int("count", count))
err := c.handlePingMiss()
if err != nil {
logging.Info(c.logger).Log(logging.MessageKey(), "Error handling ping miss:", logging.ErrorKey(), err)
c.logger.Error("Error handling ping miss:", zap.Error(err))
}
if count >= c.pingConfig.MaxPingMiss {
logging.Error(c.logger).Log(logging.MessageKey(), "Ping miss, exiting ping loop")
c.logger.Error("Ping miss, exiting ping loop")
pingMiss = true
}
logging.Debug(c.logger).Log(logging.MessageKey(), "Resetting ping timer")
c.logger.Debug("Resetting ping timer")
inTimer.Reset(c.pingConfig.PingWait)
}
}
Expand Down
31 changes: 15 additions & 16 deletions decodeWorkers.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package kratos

import (
"context"
"sync"
"sync/atomic"

"github.com/go-kit/kit/log"
"github.com/goph/emperror"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/semaphore"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -27,16 +26,16 @@ type decoderSender interface {
type decoderQueue struct {
incoming chan []byte
sender registryHandler
workers semaphore.Interface
workers *semaphore.Weighted
wg sync.WaitGroup
logger log.Logger
logger *zap.Logger
once sync.Once
closed atomic.Value
}

// NewDecoderSender creates a new decoderQueue for decoding and sending
// messages.
func NewDecoderSender(sender registryHandler, maxWorkers int, queueSize int, logger log.Logger) *decoderQueue {
func NewDecoderSender(sender registryHandler, maxWorkers int, queueSize int, logger *zap.Logger) *decoderQueue {
size := queueSize
if size < minQueueSize {
size = minQueueSize
Expand All @@ -48,7 +47,7 @@ func NewDecoderSender(sender registryHandler, maxWorkers int, queueSize int, log
d := decoderQueue{
incoming: make(chan []byte, size),
sender: sender,
workers: semaphore.New(numWorkers),
workers: semaphore.NewWeighted(int64(numWorkers)),
logger: logger,
}
d.wg.Add(1)
Expand All @@ -61,7 +60,7 @@ func NewDecoderSender(sender registryHandler, maxWorkers int, queueSize int, log
func (d *decoderQueue) DecodeAndSend(msg []byte) {
switch d.closed.Load() {
case true:
logging.Error(d.logger).Log(logging.MessageKey(),
d.logger.Error(
"Failed to queue message. DecoderQueue is no longer accepting messages.")
default:
d.incoming <- msg
Expand All @@ -83,9 +82,10 @@ func (d *decoderQueue) Close() {
// long-running go routine that watches the queue and starts other go routines
// to decode and send the messages.
func (d *decoderQueue) startParsing() {
ctx := context.Background()
defer d.wg.Done()
for i := range d.incoming {
d.workers.Acquire()
d.workers.Acquire(ctx, 1)
d.wg.Add(1)
go d.parse(i)
}
Expand All @@ -95,20 +95,19 @@ func (d *decoderQueue) startParsing() {
// registryHandler.
func (d *decoderQueue) parse(incoming []byte) {
defer d.wg.Done()
defer d.workers.Release()
defer d.workers.Release(1)
msg := wrp.Message{}

// decoding
logging.Debug(d.logger).Log(logging.MessageKey(), "Decoding message...")
d.logger.Debug("Decoding message...")
err := wrp.NewDecoderBytes(incoming, wrp.Msgpack).Decode(&msg)
if err != nil {
logging.Error(d.logger, emperror.Context(err)...).
Log(logging.MessageKey(), "Failed to decode message into wrp", logging.ErrorKey(), err.Error())
d.logger.Error("Failed to decode message into wrp", zap.Error(err))
return
}
logging.Debug(d.logger).Log(logging.MessageKey(), "Message Decoded")
d.logger.Debug("Message Decoded")

// sending
d.sender.GetHandlerThenSend(&msg)
logging.Debug(d.logger).Log(logging.MessageKey(), "Message Sent")
d.logger.Debug("Message Sent")
}
35 changes: 17 additions & 18 deletions encodeWorkers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package kratos

import (
"bytes"
"context"
"sync"
"sync/atomic"

"github.com/go-kit/kit/log"
"github.com/goph/emperror"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/semaphore"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"
)

// encoderSender is anything that can encode and send a message.
Expand All @@ -22,16 +21,16 @@ type encoderSender interface {
type encoderQueue struct {
incoming chan *wrp.Message
sender outboundSender
workers semaphore.Interface
workers *semaphore.Weighted
wg sync.WaitGroup
logger log.Logger
logger *zap.Logger
once sync.Once
closed atomic.Value
}

// NewEncoderSender creates a new encoderQueue, that allows for asynchronous
// sending outbound.
func NewEncoderSender(sender outboundSender, maxWorkers int, queueSize int, logger log.Logger) *encoderQueue {
func NewEncoderSender(sender outboundSender, maxWorkers int, queueSize int, logger *zap.Logger) *encoderQueue {
size := queueSize
if size < minQueueSize {
size = minQueueSize
Expand All @@ -43,7 +42,7 @@ func NewEncoderSender(sender outboundSender, maxWorkers int, queueSize int, logg
e := encoderQueue{
incoming: make(chan *wrp.Message, size),
sender: sender,
workers: semaphore.New(numWorkers),
workers: semaphore.NewWeighted(int64(numWorkers)),
logger: logger,
}
e.wg.Add(1)
Expand All @@ -57,7 +56,7 @@ func NewEncoderSender(sender outboundSender, maxWorkers int, queueSize int, logg
func (e *encoderQueue) EncodeAndSend(msg *wrp.Message) {
switch e.closed.Load() {
case true:
logging.Error(e.logger).Log(logging.MessageKey(),
e.logger.Error(
"Failed to queue message. EncoderQueue is no longer accepting messages.")
default:
e.incoming <- msg
Expand All @@ -80,8 +79,10 @@ func (e *encoderQueue) Close() {
// they arrive in the queue.
func (e *encoderQueue) startParsing() {
defer e.wg.Done()
ctx := context.Background() // TODO - does this need to be withCancel?

for i := range e.incoming {
e.workers.Acquire()
e.workers.Acquire(ctx, 1)
e.wg.Add(1)
go e.parse(i)
}
Expand All @@ -90,22 +91,20 @@ func (e *encoderQueue) startParsing() {
// parse encodes the wrp message and then uses the outboundSender to send it.
func (e *encoderQueue) parse(incoming *wrp.Message) {
defer e.wg.Done()
defer e.workers.Release()
defer e.workers.Release(1)
var buffer bytes.Buffer

// encoding
logging.Debug(e.logger).Log(logging.MessageKey(), "Encoding message...")
e.logger.Error("Encoding message...")
err := wrp.NewEncoder(&buffer, wrp.Msgpack).Encode(incoming)
if err != nil {
logging.Error(e.logger, emperror.Context(err)...).
Log(logging.MessageKey(), "Failed to encode message",
logging.ErrorKey(), err.Error(),
"message", incoming)
e.logger.Error( "Failed to encode message", zap.Error(err),
zap.Any("message", incoming))
return
}
logging.Debug(e.logger).Log(logging.MessageKey(), "Message Encoded")
e.logger.Debug("Message Encoded")

// sending
e.sender.Send(buffer.Bytes())
logging.Debug(e.logger).Log(logging.MessageKey(), "Message Sent")
e.logger.Debug("Message Sent")
}
4 changes: 2 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"

"github.com/xmidt-org/kratos"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/sallust"
"github.com/xmidt-org/wrp-go/v3"
)

Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {
fmt.Println("We missed the ping!")
return nil
},
ClientLogger: logging.New(nil),
ClientLogger: sallust.Default(),
})
if err != nil {
fmt.Println("Error making client: ", err)
Expand Down
Loading

0 comments on commit dc6e1fb

Please sign in to comment.