Skip to content

Commit

Permalink
pkg/util/log: introduce log.FlushAllSync(), call on panic handle
Browse files Browse the repository at this point in the history
Previously, our crash reporter system would flush file log sinks
as part of the process to handle a panic.

This was an incomplete process, since buffered network sinks were
not included in part of this flush process. This means that many
times, panic logs would not make it to the network target, leading
to a loss in observability.

This patch introduces `log.FlushAllSync()`, which flushes both file
and buffered network log sinks. It then updates the crash reporter
to call into this, instead of just flushing file log sinks.

`FlushAllSync()` contains timeout logic to prevent the process from
completing if one of the underlying child sinks that a bufferedSink
wraps becomes unavailable/hangs on its `output()` call.

Release note: none
  • Loading branch information
abarganier committed Aug 25, 2023
1 parent 09c7f9f commit f113fb4
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ func createAndStartServerAsync(

go func() {
// Ensure that the log files see the startup messages immediately.
defer log.FlushFiles()
defer log.FlushAllSync()
// If anything goes dramatically wrong, use Go's panic/recover
// mechanism to intercept the panic and log the panic details to
// the error reporting server.
Expand Down Expand Up @@ -1497,7 +1497,7 @@ func reportReadinessExternally(ctx context.Context, cmd *cobra.Command, waitForI
// Ensure the configuration logging is written to disk in case a
// process is waiting for the sdnotify readiness to read important
// information from there.
log.FlushFiles()
log.FlushAllSync()

// Signal readiness. This unblocks the process when running with
// --background or under systemd.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ UPDATE system.namespace SET id = %d WHERE id = %d;

// Ensure that our logs are flushed to disk before asserting about log
// entries.
log.Flush()
log.FlushFiles()

// Ensure that a log line is emitted for each invalid object, with a loose
// enforcement of the log structure.
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/log/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@
//
// # Output
//
// Log output is buffered and written periodically using FlushFiles. Programs
// should call FlushFiles before exiting to guarantee all log output is written.
// Log output is buffered and written periodically using FlushFiles.
// Programs should call FlushFiles before exiting to guarantee all
// log output is written to files. Note that buffered network sinks also
// exist. If you'd like to flush these as well, call FlushAllSync.
//
// By default, all log statements write to files in a temporary directory.
// This package provides several flags that modify this behavior.
Expand Down
42 changes: 42 additions & 0 deletions pkg/util/log/log_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package log

import (
"context"
"fmt"
"io"
"time"

Expand All @@ -37,6 +38,47 @@ func FlushFiles() {
})
}

// FlushAllSync explicitly flushes all asynchronous buffered logging sinks,
// including pending log file I/O and buffered network sinks.
//
// NB: This is a synchronous operation, and will block until all flushes
// have completed. Generally only recommended for use in crash reporting
// and shutdown scenarios. Note that `tryForceSync` is best effort, so the
// possibility exists that a buffered log sink is unable to block until
// the flush completes. In such a case though, the expectation that a flush
// is already imminent for that sink.
//
// Each sink we attempt to flush is attempted with a timeout.
func FlushAllSync() {
FlushFiles()
_ = logging.allSinkInfos.iterBufferedSinks(func(bs *bufferedSink) error {
// Trigger a synchronous flush by calling output on the bufferedSink
// with a `tryForceSync` option.
doneCh := make(chan struct{})
go func() {
err := bs.output([]byte{}, sinkOutputOptions{tryForceSync: true})
if err != nil {
fmt.Printf("Error draining buffered log sink %T: %v\n", bs.child, err)
}
doneCh <- struct{}{}
}()
// Don't wait forever if the underlying sink happens to be unavailable.
// Set a timeout to avoid holding up the panic handle process for too long.
select {
case <-time.After(3 * time.Second):
fmt.Printf("Timed out waiting on buffered log sink %T to drain.\n", bs.child)
case <-doneCh:
}
// We don't want to let errors stop us from iterating and flushing
// the remaining buffered log sinks. Nor do we want to log the error
// using the logging system, as it's unlikely to make it to the
// destination sink anyway (there's a good chance we're flushing
// as part of handling a panic). If an error occurs, it will be displayed.
// Regardless, we return nil so the iteration continues.
return nil
})
}

func init() {
go flushDaemon()
go signalFlusher()
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/log/logcrash/crash_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func ReportPanic(ctx context.Context, sv *settings.Values, r interface{}, depth

// Ensure that the logs are flushed before letting a panic
// terminate the server.
log.FlushFiles()
log.FlushAllSync()
}

// PanicAsError turns r into an error if it is not one already.
Expand Down
13 changes: 13 additions & 0 deletions pkg/util/log/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func (r *sinkInfoRegistry) iterFileSinks(fn func(l *fileSink) error) error {
})
}

// iterBufferedSinks iterates over all the buffered sinks and stops at the first
// error encountered.
func (r *sinkInfoRegistry) iterBufferedSinks(fn func(bs *bufferedSink) error) error {
return r.iter(func(si *sinkInfo) error {
if bs, ok := si.sink.(*bufferedSink); ok {
if err := fn(bs); err != nil {
return err
}
}
return nil
})
}

// put adds a sinkInfo into the registry.
func (r *sinkInfoRegistry) put(l *sinkInfo) {
r.mu.Lock()
Expand Down

0 comments on commit f113fb4

Please sign in to comment.