Skip to content

Commit

Permalink
Merge pull request #83295 from abarganier/backport22.1-79793
Browse files Browse the repository at this point in the history
release-22.1: pkg/util/log: introduce log.BufferedSinkCloser
  • Loading branch information
abarganier authored Jun 23, 2022
2 parents 4ebde16 + c1b9561 commit 31c6f8c
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 58 deletions.
2 changes: 2 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func Main() {
// by the sub-command.
errCode = getExitCode(err)
}
// Finally, gracefully shutdown logging facilities.
cliCtx.logShutdownFn()

exit.WithCode(errCode)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/cli/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ type cliContext struct {
// logConfig is the resulting logging configuration after the input
// configuration has been parsed and validated.
logConfig logconfig.Config
// logShutdownFn is used to close & tear down logging facilities gracefully
// before exiting the process. This may block until all buffered log sinks
// are shut down, if any are enabled, or until a timeout is triggered.
logShutdownFn func()
// deprecatedLogOverrides is the legacy pre-v21.1 discrete flag
// overrides for the logging configuration.
// TODO(knz): Deprecated in v21.1. Remove this.
Expand Down Expand Up @@ -229,6 +233,7 @@ func setCliContextDefaults() {
cliCtx.allowUnencryptedClientPassword = false
cliCtx.logConfigInput = settableString{s: ""}
cliCtx.logConfig = logconfig.Config{}
cliCtx.logShutdownFn = func() {}
cliCtx.ambiguousLogDir = false
// TODO(knz): Deprecated in v21.1. Remove this.
cliCtx.deprecatedLogOverrides.reset()
Expand Down
4 changes: 3 additions & 1 deletion pkg/cli/log_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,11 @@ func setupLogging(ctx context.Context, cmd *cobra.Command, isServerCmd, applyCon
}

// Configuration ready and directories exist; apply it.
if _, err := log.ApplyConfig(h.Config); err != nil {
logShutdownFn, err := log.ApplyConfig(h.Config)
if err != nil {
return err
}
cliCtx.logShutdownFn = logShutdownFn

// If using a custom config, report the configuration at the start of the logging stream.
if cliCtx.logConfigInput.isSet {
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
srcs = [
"ambient_context.go",
"buffered_sink.go",
"buffered_sink_closer.go",
"channels.go",
"clog.go",
"doc.go",
Expand Down Expand Up @@ -131,6 +132,7 @@ go_test(
size = "small",
srcs = [
"ambient_context_test.go",
"buffered_sink_closer_test.go",
"buffered_sink_test.go",
"channels_test.go",
"clog_test.go",
Expand Down
25 changes: 12 additions & 13 deletions pkg/util/log/buffered_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,16 @@ func newBufferedSink(
return sink
}

// Start starts an internal goroutine that will run until ctx is canceled.
func (bs *bufferedSink) Start(ctx context.Context) {
// Start the runFlusher goroutine.
go bs.runFlusher(ctx)
// Start starts an internal goroutine that will run until the provided closer is
// closed.
func (bs *bufferedSink) Start(closer *bufferedSinkCloser) {
stopC, unregister := closer.RegisterBufferedSink(bs)
// Start the runFlusher goroutine & mark as done on the
// closer once it exits.
go func() {
defer unregister()
bs.runFlusher(stopC)
}()
}

// active returns true if this sink is currently active.
Expand Down Expand Up @@ -230,20 +236,13 @@ func (bs *bufferedSink) exitCode() exit.Code {
//
// TODO(knz): How does this interact with the runFlusher logic in log_flush.go?
// See: https://github.com/cockroachdb/cockroach/issues/72458
//
// TODO(knz): this code should be extended to detect server shutdowns:
// as currently implemented the runFlusher will only terminate after all
// the writes in the channel are completed. If the writes are slow,
// the goroutine may not terminate properly when server shutdown is
// requested.
// See: https://github.com/cockroachdb/cockroach/issues/72459
func (bs *bufferedSink) runFlusher(ctx context.Context) {
func (bs *bufferedSink) runFlusher(stopC <-chan struct{}) {
buf := &bs.mu.buf
for {
done := false
select {
case <-bs.flushC:
case <-ctx.Done():
case <-stopC:
// We'll return after flushing everything.
done = true
}
Expand Down
126 changes: 126 additions & 0 deletions pkg/util/log/buffered_sink_closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package log

import (
"fmt"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// bufferedSinkCloser coordinates the graceful teardown of buffered log
// sinks.
//
// Sinks announce themselves via RegisterBufferedSink, which hands back the
// shared stop channel and a cleanup callback. Close closes the stop channel
// and then waits, up to a caller-supplied timeout, for every registered sink
// to invoke its cleanup callback.
type bufferedSinkCloser struct {
// stopC is closed by Close() to signal all the registered sinks to shut
// down. The same channel is handed (receive-only) to every sink by
// RegisterBufferedSink.
stopC chan struct{}
// wg tracks the registered sinks that have not yet reported completion:
// it is incremented by RegisterBufferedSink, decremented by
// bufferedSinkDone, and waited on by Close.
wg sync.WaitGroup
// mu guards sinkRegistry.
mu struct {
syncutil.Mutex
// sinkRegistry acts as a set of all bufferedSinks currently registered
// with this bufferedSinkCloser. It is used to reject double
// registration, to reject completion of an unknown sink, and to name
// the sinks that are still outstanding when Close times out.
sinkRegistry map[*bufferedSink]struct{}
}
}

// newBufferedSinkCloser builds a ready-to-use bufferedSinkCloser with an
// open stop channel and an empty sink registry.
func newBufferedSinkCloser() *bufferedSinkCloser {
	c := new(bufferedSinkCloser)
	c.stopC = make(chan struct{})
	c.mu.sinkRegistry = map[*bufferedSink]struct{}{}
	return c
}

// RegisterBufferedSink adds bs to the set of sinks that closer.Close waits
// for.
//
// The sink is recorded in the closer's internal registry; registering the
// same sink twice is a programming error and panics with an assertion
// failure.
//
// The returned shutdown channel is closed when closer.Close() is called; the
// sink is expected to watch it and shut down in response. The returned
// cleanup function must be invoked once the sink has finished shutting down
// so that Close stops waiting for it.
func (closer *bufferedSinkCloser) RegisterBufferedSink(
	bs *bufferedSink,
) (shutdown <-chan (struct{}), cleanup func()) {
	closer.mu.Lock()
	defer closer.mu.Unlock()

	_, alreadyRegistered := closer.mu.sinkRegistry[bs]
	if alreadyRegistered {
		panic(errors.AssertionFailedf("buffered log sink registered more than once within log.bufferedSinkCloser: %T", bs.child))
	}

	// Account for the sink in both the WaitGroup and the registry while
	// still holding the lock, so the two always agree.
	closer.wg.Add(1)
	closer.mu.sinkRegistry[bs] = struct{}{}

	shutdown = closer.stopC
	cleanup = func() { closer.bufferedSinkDone(bs) }
	return shutdown, cleanup
}

// bufferedSinkDone records that bs, previously registered via
// RegisterBufferedSink, has finished processing and terminated. It removes
// the sink from the registry and releases its slot in the WaitGroup.
//
// Calling it for a sink that is not in the registry panics with an assertion
// failure.
func (closer *bufferedSinkCloser) bufferedSinkDone(bs *bufferedSink) {
	closer.mu.Lock()
	defer closer.mu.Unlock()

	// A sink missing from the registry is not accounted for in the
	// WaitGroup. Signaling the WaitGroup on its behalf could prematurely end
	// the shutdown wait for a different sink that is still registered, so
	// refuse loudly instead.
	_, registered := closer.mu.sinkRegistry[bs]
	if !registered {
		panic(errors.AssertionFailedf(
			"log shutdown sequence has detected an unregistered log sink: %T\n", bs.child))
	}

	delete(closer.mu.sinkRegistry, bs)
	closer.wg.Done()
}

// defaultCloserTimeout is the default upper bound on how long
// bufferedSinkCloser.Close(timeout) waits for registered sinks to shut down
// before giving up and returning a timeout error.
//
// NOTE(review): the call site that passes this value to Close is not part of
// this file — confirm where it is wired in.
const defaultCloserTimeout = 90 * time.Second

// Close triggers the logging shutdown process: it closes the stop channel
// handed out by RegisterBufferedSink, signaling all registered sinks to shut
// down, and then waits for them to report completion for up to timeout.
//
// It returns nil if every registered sink finished within timeout. Otherwise
// it returns an error listing the types of the sinks that are still
// registered at the moment the timeout fires.
//
// Close must be called at most once: a second call would close the
// already-closed stop channel and panic.
func (closer *bufferedSinkCloser) Close(timeout time.Duration) error {
	close(closer.stopC)

	// doneCh is closed (rather than sent on) once all sinks are done. A send
	// on an unbuffered channel would block forever if we had already given up
	// waiting below, leaking the goroutine even after the sinks eventually
	// finish.
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		closer.wg.Wait()
	}()

	// Use an explicit timer so it can be released as soon as we return,
	// instead of lingering for the full timeout as time.After would.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-doneCh:
		return nil
	case <-timer.C:
		closer.mu.Lock()
		defer closer.mu.Unlock()
		// Whatever is still in the registry has not called its cleanup
		// function yet; report those sinks by the type of their child sink.
		leakedSinks := make([]string, 0, len(closer.mu.sinkRegistry))
		for bs := range closer.mu.sinkRegistry {
			leakedSinks = append(leakedSinks, fmt.Sprintf("%T", bs.child))
		}
		return errors.Newf(
			"log shutdown sequence has detected a deadlock & has timed out. Hanging log sink(s): %v",
			leakedSinks)
	}
}
103 changes: 103 additions & 0 deletions pkg/util/log/buffered_sink_closer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package log

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClose(t *testing.T) {
t.Run("returns after all registered sinks exit", func(t *testing.T) {
fakeSinkRoutine := func(stopC <-chan struct{}, bs *bufferedSink, cleanup func()) {
defer cleanup()
<-stopC
}

closer := newBufferedSinkCloser()
for i := 0; i < 3; i++ {
fakeBufferSink := &bufferedSink{}
stopC, cleanup := closer.RegisterBufferedSink(fakeBufferSink)
go fakeSinkRoutine(stopC, fakeBufferSink, cleanup)
}

require.NoError(t, closer.Close(1000*time.Hour) /* timeout - verify that it doesn't expire */)
})

t.Run("times out if leaked bufferedSink doesn't shut down", func(t *testing.T) {
closer := newBufferedSinkCloser()
_, _ = closer.RegisterBufferedSink(&bufferedSink{})
require.Error(t, closer.Close(time.Nanosecond /* timeout */))
})
}

// TestRegisterBufferSink checks that RegisterBufferedSink records the sink in
// the closer's registry and panics on a duplicate registration.
func TestRegisterBufferSink(t *testing.T) {
	t.Run("registers bufferedSink as expected", func(t *testing.T) {
		closer := newBufferedSinkCloser()
		sink := &bufferedSink{}
		closer.RegisterBufferedSink(sink)

		// Look the sink up under the registry lock, then assert.
		closer.mu.Lock()
		_, registered := closer.mu.sinkRegistry[sink]
		closer.mu.Unlock()

		assert.True(t, registered)
	})

	t.Run("panics if same bufferedSink registered twice", func(t *testing.T) {
		closer := newBufferedSinkCloser()
		sink := &bufferedSink{}
		closer.RegisterBufferedSink(sink)

		registerAgain := func() { closer.RegisterBufferedSink(sink) }
		assert.Panics(t, registerAgain,
			"expected RegisterBufferedSink() to panic when same sink registered twice.")
	})
}

// TestBufferSinkDone checks that bufferedSinkDone removes a registered sink
// from the registry and releases the WaitGroup (so Close returns promptly),
// and that it panics when given a sink that was never registered.
func TestBufferSinkDone(t *testing.T) {
	t.Run("signals waitgroup and removes bufferSink from registry", func(t *testing.T) {
		closer := newBufferedSinkCloser()
		sink := &bufferedSink{}

		// inspectRegistry runs check against the registry while holding the
		// closer's mutex.
		inspectRegistry := func(check func(registry map[*bufferedSink]struct{})) {
			closer.mu.Lock()
			defer closer.mu.Unlock()
			check(closer.mu.sinkRegistry)
		}

		closer.RegisterBufferedSink(sink)
		inspectRegistry(func(registry map[*bufferedSink]struct{}) {
			assert.Len(t, registry, 1, "expected sink registry to include registered bufferedSink")
		})

		closer.bufferedSinkDone(sink)
		inspectRegistry(func(registry map[*bufferedSink]struct{}) {
			assert.Empty(t, registry, "expected sink registry to be empty")
		})

		// With the only sink done, Close has nothing left to wait for.
		err := closer.Close(time.Second)
		require.NoError(t, err, "bufferedSinkCloser timed out unexpectedly")
	})

	t.Run("panics if called on unregistered bufferSink", func(t *testing.T) {
		closer := newBufferedSinkCloser()
		registeredSink := &bufferedSink{}
		unknownSink := &bufferedSink{}

		closer.RegisterBufferedSink(registeredSink)

		func() {
			closer.mu.Lock()
			defer closer.mu.Unlock()
			_, found := closer.mu.sinkRegistry[registeredSink]
			assert.Len(t, closer.mu.sinkRegistry, 1, "length of bufferSink registry larger than expected")
			assert.True(t, found, "expected bufferSink to be in registry")
		}()

		markUnknownDone := func() { closer.bufferedSinkDone(unknownSink) }
		require.Panics(t, markUnknownDone)
	})
}
Loading

0 comments on commit 31c6f8c

Please sign in to comment.