Skip to content

Commit

Permalink
fix: check if the stream output is being serviced. if a stream doesn'…
Browse files Browse the repository at this point in the history
…t accept a log message within a second then that channel gets removed.

this isn't ideal, but at least the whole application won't deadlock.
  • Loading branch information
perbu committed Dec 13, 2021
1 parent 4c52c4b commit 00f96d8
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 29 deletions.
4 changes: 0 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ The logging is async so messages might take a bit of time before showing up.
Control messages are synchronously, however, so a Flush will not return unless
all pending messages are processed.

Also, if you request a log stream you have to service the channel. If you don't
the application will lock up. Be careful when working with these.

log.go is generated by gen/main.go

chainsaw is pretty slow - it is about as slow as logrus. Channels in Go aren't really that fast and if you're logging
Expand All @@ -66,6 +63,5 @@ On my laptop a regular log invocation takes about 600ns and control messages tak

* Increase compatibility with other logging libraries.
* Add support for formatting
* Consider trying to detect blocked channels when GetStream is being used.
* Documentation, at least when we're somewhat certain that the basic design is sane.
* Adapt a logging interface. Shame there isn't one in Stdlib.
19 changes: 7 additions & 12 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ const (
// MakeLogger creates a new logger instance.
// Params:
// Mandatory is name, can be empty.
// In addition you can supply two ints:
// In addition, you can supply two ints:
// logBufferSize - the size of the circular buffer
// chanBufferSize - how big the channels buffers should be
func MakeLogger(name string, options ...int) *CircularLogger {
logBufferSize := defaultLogBufferSize
chanBufferSize := defaultChanBufferSize
if len(options) > 1 {
if len(options) > 0 {
logBufferSize = options[0]
}
if len(options) > 2 {
if len(options) > 1 {
chanBufferSize = options[1]
}
c := CircularLogger{
Expand All @@ -52,9 +52,7 @@ func MakeLogger(name string, options ...int) *CircularLogger {
// Things might deadlock if you log while it is down.
func (l *CircularLogger) Stop() {
if l.running.Get() {
cMsg := controlMessage{
cType: ctrlQuit,
}
cMsg := controlMessage{cType: ctrlQuit}
_ = l.sendCtrlAndWait(cMsg)
} else {
fmt.Printf("Error! Stop called on a passive logger")
Expand All @@ -63,9 +61,7 @@ func (l *CircularLogger) Stop() {

// Reset the circular buffer of the logger. Flush the logs.
func (l *CircularLogger) Reset() {
cMsg := controlMessage{
cType: ctrlRst,
}
cMsg := controlMessage{cType: ctrlRst}
_ = l.sendCtrlAndWait(cMsg)
}

Expand All @@ -83,14 +79,14 @@ func (l *CircularLogger) GetStream(ctx context.Context) chan LogMessage {
There is a race condition here. What happens is:
ctx is cancelled.
We send the control message then we get blocked --> deadlock.
*/
// Make the channel we're gonna return.
retCh := make(chan LogMessage, l.chanBufferSize)
cMessage := controlMessage{cType: ctrlAddOutputChan, outputCh: retCh}
_ = l.sendCtrlAndWait(cMessage)
// spin of a goroutine that will wait for the context.
go func(outputCh chan LogMessage) {
<-ctx.Done()
<-ctx.Done() // wait for the context to cancel
cMessage := controlMessage{cType: ctrlRemoveOutputChan, outputCh: outputCh}
err := l.sendCtrlAndWait(cMessage) // waits for the response.
if err != nil {
Expand All @@ -114,7 +110,6 @@ func (l *CircularLogger) GetMessages(level LogLevel) []LogMessage {
cType: ctrlDump,
returnChan: retCh,
level: level,
outputCh: nil,
}
l.controlCh <- cMsg // Requesting messages over control channel
ret := <-retCh
Expand Down
31 changes: 18 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
//go:generate stringer -type=LogLevel

//go:generate go run gen/main.go
const channelTimeout = time.Second

var defaultLogger *CircularLogger

Expand Down Expand Up @@ -84,24 +85,21 @@ type LogMessage struct {
// CircularLogger is the struct holding a chainsaw instance.
// All state within is private and should be access through methods.
type CircularLogger struct {
// The name gets into all the loglines.
name string
// at what log level should messages be printed to stdout
printLevel LogLevel
name string // The name gets into all the loglines when printed.
printLevel LogLevel // at what log level should messages be printed to stdout
// messages is the internal log buffer keeping the last messages in a circular buffer
messages []LogMessage
// logChan Messages are send over this channel to the log worker.
logCh logChan
logCh logChan // logChan Messages are send over this channel to the log worker.
// outputChs is a list of channels that messages are copied onto when they arrive
// if a stream has been requested it is added here.
outputChs []logChan
// controlCh is the internal channel that is used for control messages.
controlCh controlChannel
current int
logBufferSize int
outputWriters []io.Writer
running atomicBool
chanBufferSize int
current int // points to the current message in the circular buffer
logBufferSize int // the size of the circular buffer we use.
outputWriters []io.Writer // List of io.Writers where the output gets copied.
running atomicBool // bool to indicate that the internal goroutine for the logger is running.
chanBufferSize int // how big the channel buffer is. re-used when making streams.
}

func (l *CircularLogger) log(level LogLevel, m string) {
Expand Down Expand Up @@ -155,8 +153,15 @@ func (l *CircularLogger) channelHandler(wg *sync.WaitGroup) {
l.messages[l.current] = msg
l.current = (l.current + 1) % l.logBufferSize
for _, ch := range l.outputChs {
// println("sending to channel ", i, msg.Content)
ch <- msg
select {
case ch <- msg:
case <-time.After(channelTimeout):
fmt.Println("timed out during channel write, removing channel")
err := l.removeOutputChan(ch) // This should be safe. The channel is buffered.
if err != nil {
fmt.Println("error while removing block channel: ", err)
}
}
}
if msg.LogLevel == FatalLevel {
fmt.Println("[chainsaw causing exit]")
Expand Down
27 changes: 27 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,31 @@ func TestStream(t *testing.T) {
fmt.Println("Stream test passed")
}

// TestStreamBlocked will create a logger, open a stream, fail to service that stream and
// detect if time out.
// If we don't carefully write to channels this will cause a deadlock panic.
func TestStreamBlocked(t *testing.T) {
is := is.New(t)
testLogger := MakeLogger("", 10, 0) // Unbuffered so we provoke races.
defer testLogger.Stop()
ctx, cancel := context.WithCancel(context.Background())
stream := testLogger.GetStream(ctx)
wg := sync.WaitGroup{}
wg.Add(1)
start := time.Now()
go func() {
testLogger.Info("Silly message #1")
_ = testLogger.Flush() // will block and trigger deadlock if it isn't handled.
wg.Done()
}()
wg.Wait()
cancel()
fmt.Println(<-stream)
timeTaken := time.Since(start)
is.True(timeTaken < channelTimeout*2)
fmt.Println("ok")
}

func handleStream(stream chan LogMessage, counter *SafeInt, wg *sync.WaitGroup) {
for range stream {
counter.Inc()
Expand Down Expand Up @@ -362,6 +387,8 @@ func TestManyLoggers(t *testing.T) {
}
}
for i, logger := range loggers {
err := logger.Flush()
is.NoErr(err)
msgs := logger.GetMessages(TraceLevel)
m := msgs[0]
is.Equal(fmt.Sprintf("Message %d on logger %d", messagesPerLogger-logBufferSize, i), m.Content)
Expand Down

0 comments on commit 00f96d8

Please sign in to comment.