Skip to content

Commit

Permalink
Merge pull request #388 from kakao/storage_debug_logs
Browse files Browse the repository at this point in the history
feat(storage): metrics logging and less verbose event logging
  • Loading branch information
ijsong authored Apr 5, 2023
2 parents c97626b + 3b47f8b commit 3d62738
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 3 deletions.
3 changes: 3 additions & 0 deletions bin/start_varlogsn.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ def start(args: argparse.Namespace) -> None:
if args.storage_max_concurrent_compaction:
cmd.append(
f"--storage-max-concurrent-compaction={args.storage_max_concurrent_compaction}")
if args.storage_metrics_log_interval:
cmd.append(f"--storage-metrics-log-interval={args.storage_metrics_log_interval}")
if args.storage_verbose:
cmd.append("--storage-verbose")

Expand Down Expand Up @@ -266,6 +268,7 @@ def main() -> None:
parser.add_argument("--storage-mem-table-size", type=str)
parser.add_argument("--storage-mem-table-stop-writes-threshold", type=int)
parser.add_argument("--storage-max-concurrent-compaction", type=int)
parser.add_argument("--storage-metrics-log-interval", type=str)
parser.add_argument("--storage-verbose", action="store_true")

# logging options
Expand Down
1 change: 1 addition & 0 deletions cmd/varlogsn/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func newStartCommand() *cli.Command {
flagStorageMemTableSize.StringFlag(false, units.ToByteSizeString(storage.DefaultMemTableSize)),
flagStorageMemTableStopWritesThreshold.IntFlag(false, storage.DefaultMemTableStopWritesThreshold),
flagStorageMaxConcurrentCompaction.IntFlag(false, storage.DefaultMaxConcurrentCompactions),
flagStorageMetricsLogInterval,
flagStorageVerbose.BoolFlag(),

flagLogDir.StringFlag(false, ""),
Expand Down
6 changes: 6 additions & 0 deletions cmd/varlogsn/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/kakao/varlog/internal/flags"
"github.com/kakao/varlog/internal/storage"
"github.com/kakao/varlog/internal/storagenode"
)

Expand Down Expand Up @@ -122,6 +123,11 @@ var (
Name: "storage-max-concurrent-compaction",
Envs: []string{"STORAGE_MAX_CONCURRENT_COMPACTION"},
}
flagStorageMetricsLogInterval = &cli.DurationFlag{
Name: "storage-metrics-log-interval",
EnvVars: []string{"STORAGE_METRICS_LOG_INTERVAL"},
Value: storage.DefaultMetricsLogInterval,
}
flagStorageVerbose = flags.FlagDesc{
Name: "storage-verbose",
Envs: []string{"STORAGE_VERBOSE"},
Expand Down
1 change: 1 addition & 0 deletions cmd/varlogsn/varlogsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func start(c *cli.Context) error {
storage.WithMemTableSize(c.Int(flagStorageMemTableSize.Name)),
storage.WithMemTableStopWritesThreshold(c.Int(flagStorageMemTableStopWritesThreshold.Name)),
storage.WithMaxConcurrentCompaction(c.Int(flagStorageMaxConcurrentCompaction.Name)),
storage.WithMetrisLogInterval(c.Duration(flagStorageMetricsLogInterval.Name)),
}
if c.Bool(flagStorageDisableWAL.Name) {
storageOpts = append(storageOpts, storage.WithoutWAL())
Expand Down
12 changes: 11 additions & 1 deletion internal/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"errors"
"time"

"go.uber.org/zap"

Expand All @@ -16,6 +17,7 @@ const (
DefaultMemTableSize = 4 << 20
DefaultMemTableStopWritesThreshold = 2
DefaultMaxConcurrentCompactions = 1
DefaultMetricsLogInterval = time.Duration(0)
)

type config struct {
Expand All @@ -31,6 +33,7 @@ type config struct {
memTableStopWritesThreshold int
maxConcurrentCompaction int
verbose bool
metricsLogInterval time.Duration
logger *zap.Logger

readOnly bool
Expand All @@ -50,7 +53,8 @@ func newConfig(opts []Option) (config, error) {
memTableStopWritesThreshold: DefaultMemTableStopWritesThreshold,
maxConcurrentCompaction: DefaultMaxConcurrentCompactions,

logger: zap.NewNop(),
metricsLogInterval: DefaultMetricsLogInterval,
logger: zap.NewNop(),
}
for _, opt := range opts {
opt.apply(&cfg)
Expand Down Expand Up @@ -153,6 +157,12 @@ func WithVerboseLogging() Option {
})
}

func WithMetrisLogInterval(metricsLogInterval time.Duration) Option {
return newFuncOption(func(cfg *config) {
cfg.metricsLogInterval = metricsLogInterval
})
}

func WithLogger(logger *zap.Logger) Option {
return newFuncOption(func(cfg *config) {
cfg.logger = logger
Expand Down
56 changes: 54 additions & 2 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package storage

import (
"errors"
"sync"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
Expand All @@ -22,6 +24,12 @@ type Storage struct {

db *pebble.DB
writeOpts *pebble.WriteOptions

metricsLogger struct {
wg sync.WaitGroup
ticker *time.Ticker
stop chan struct{}
}
}

// New creates a new storage.
Expand Down Expand Up @@ -61,8 +69,19 @@ func New(opts ...Option) (*Storage, error) {
if cfg.verbose {
el := pebble.MakeLoggingEventListener(newLogAdaptor(cfg.logger))
pebbleOpts.EventListener = &el
// BackgroundError, DiskSlow, WriteStallBegin, WriteStallEnd
pebbleOpts.EventListener.CompactionBegin = nil
pebbleOpts.EventListener.CompactionEnd = nil
pebbleOpts.EventListener.FlushBegin = nil
pebbleOpts.EventListener.FlushEnd = nil
pebbleOpts.EventListener.FormatUpgrade = nil
pebbleOpts.EventListener.ManifestCreated = nil
pebbleOpts.EventListener.ManifestDeleted = nil
pebbleOpts.EventListener.TableCreated = nil
pebbleOpts.EventListener.TableDeleted = nil
pebbleOpts.EventListener.TableIngested = nil
pebbleOpts.EventListener.TableStatsLoaded = nil
pebbleOpts.EventListener.TableValidated = nil
pebbleOpts.EventListener.WALCreated = nil
pebbleOpts.EventListener.WALDeleted = nil
}
Expand All @@ -75,11 +94,14 @@ func New(opts ...Option) (*Storage, error) {
if err != nil {
return nil, err
}
return &Storage{

stg := &Storage{
config: cfg,
db: db,
writeOpts: &pebble.WriteOptions{Sync: cfg.sync},
}, nil
}
stg.startMetricsLogger()
return stg, nil
}

// NewWriteBatch creates a batch for write operations.
Expand Down Expand Up @@ -251,10 +273,40 @@ func (s *Storage) DiskUsage() uint64 {
return s.db.Metrics().DiskSpaceUsage()
}

func (s *Storage) startMetricsLogger() {
if s.metricsLogInterval <= 0 {
return
}
s.metricsLogger.stop = make(chan struct{})
s.metricsLogger.ticker = time.NewTicker(s.metricsLogInterval)
s.metricsLogger.wg.Add(1)
go func() {
defer s.metricsLogger.wg.Done()
for {
select {
case <-s.metricsLogger.ticker.C:
s.logger.Info("\n" + s.db.Metrics().String())
case <-s.metricsLogger.stop:
return
}
}
}()
}

func (s *Storage) stopMetricsLogger() {
if s.metricsLogInterval <= 0 {
return
}
s.metricsLogger.ticker.Stop()
close(s.metricsLogger.stop)
s.metricsLogger.wg.Wait()
}

// Close closes the storage.
func (s *Storage) Close() (err error) {
if !s.readOnly {
err = s.db.Flush()
}
s.stopMetricsLogger()
return multierr.Append(err, s.db.Close())
}

0 comments on commit 3d62738

Please sign in to comment.