diff --git a/bin/start_varlogsn.py b/bin/start_varlogsn.py index a23f91899..bbd904621 100755 --- a/bin/start_varlogsn.py +++ b/bin/start_varlogsn.py @@ -212,6 +212,8 @@ def start(args: argparse.Namespace) -> None: if args.storage_max_concurrent_compaction: cmd.append( f"--storage-max-concurrent-compaction={args.storage_max_concurrent_compaction}") + if args.storage_metrics_log_interval: + cmd.append(f"--storage-metrics-log-interval={args.storage_metrics_log_interval}") if args.storage_verbose: cmd.append("--storage-verbose") @@ -266,6 +268,7 @@ def main() -> None: parser.add_argument("--storage-mem-table-size", type=str) parser.add_argument("--storage-mem-table-stop-writes-threshold", type=int) parser.add_argument("--storage-max-concurrent-compaction", type=int) + parser.add_argument("--storage-metrics-log-interval", type=str) parser.add_argument("--storage-verbose", action="store_true") # logging options diff --git a/cmd/varlogsn/cli.go b/cmd/varlogsn/cli.go index 7f39b347d..7c1415a6a 100644 --- a/cmd/varlogsn/cli.go +++ b/cmd/varlogsn/cli.go @@ -66,6 +66,7 @@ func newStartCommand() *cli.Command { flagStorageMemTableSize.StringFlag(false, units.ToByteSizeString(storage.DefaultMemTableSize)), flagStorageMemTableStopWritesThreshold.IntFlag(false, storage.DefaultMemTableStopWritesThreshold), flagStorageMaxConcurrentCompaction.IntFlag(false, storage.DefaultMaxConcurrentCompactions), + flagStorageMetricsLogInterval, flagStorageVerbose.BoolFlag(), flagLogDir.StringFlag(false, ""), diff --git a/cmd/varlogsn/flags.go b/cmd/varlogsn/flags.go index 16decb243..573b94aa2 100644 --- a/cmd/varlogsn/flags.go +++ b/cmd/varlogsn/flags.go @@ -4,6 +4,7 @@ import ( "github.com/urfave/cli/v2" "github.com/kakao/varlog/internal/flags" + "github.com/kakao/varlog/internal/storage" "github.com/kakao/varlog/internal/storagenode" ) @@ -122,6 +123,11 @@ var ( Name: "storage-max-concurrent-compaction", Envs: []string{"STORAGE_MAX_CONCURRENT_COMPACTION"}, } + flagStorageMetricsLogInterval = &cli.DurationFlag{ + Name: "storage-metrics-log-interval", + EnvVars: []string{"STORAGE_METRICS_LOG_INTERVAL"}, + Value: storage.DefaultMetricsLogInterval, + } flagStorageVerbose = flags.FlagDesc{ Name: "storage-verbose", Envs: []string{"STORAGE_VERBOSE"}, diff --git a/cmd/varlogsn/varlogsn.go b/cmd/varlogsn/varlogsn.go index 48968f717..ebfbc7dcc 100644 --- a/cmd/varlogsn/varlogsn.go +++ b/cmd/varlogsn/varlogsn.go @@ -143,6 +143,7 @@ func start(c *cli.Context) error { storage.WithMemTableSize(c.Int(flagStorageMemTableSize.Name)), storage.WithMemTableStopWritesThreshold(c.Int(flagStorageMemTableStopWritesThreshold.Name)), storage.WithMaxConcurrentCompaction(c.Int(flagStorageMaxConcurrentCompaction.Name)), + storage.WithMetrisLogInterval(c.Duration(flagStorageMetricsLogInterval.Name)), } if c.Bool(flagStorageDisableWAL.Name) { storageOpts = append(storageOpts, storage.WithoutWAL()) diff --git a/internal/storage/config.go b/internal/storage/config.go index fcebf45da..c28a2c20b 100644 --- a/internal/storage/config.go +++ b/internal/storage/config.go @@ -2,6 +2,7 @@ package storage import ( "errors" + "time" "go.uber.org/zap" @@ -16,6 +17,7 @@ const ( DefaultMemTableSize = 4 << 20 DefaultMemTableStopWritesThreshold = 2 DefaultMaxConcurrentCompactions = 1 + DefaultMetricsLogInterval = time.Duration(0) ) type config struct { @@ -31,6 +33,7 @@ type config struct { memTableStopWritesThreshold int maxConcurrentCompaction int verbose bool + metricsLogInterval time.Duration logger *zap.Logger readOnly bool @@ -50,7 +53,8 @@ func newConfig(opts []Option) (config, error) { memTableStopWritesThreshold: DefaultMemTableStopWritesThreshold, maxConcurrentCompaction: DefaultMaxConcurrentCompactions, - logger: zap.NewNop(), + metricsLogInterval: DefaultMetricsLogInterval, + logger: zap.NewNop(), } for _, opt := range opts { opt.apply(&cfg) @@ -153,6 +157,12 @@ func WithVerboseLogging() Option { }) } +func WithMetrisLogInterval(metricsLogInterval time.Duration) Option { + return newFuncOption(func(cfg *config) { + cfg.metricsLogInterval = metricsLogInterval + }) +} + func WithLogger(logger *zap.Logger) Option { return newFuncOption(func(cfg *config) { cfg.logger = logger diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 5ad8cbe4e..8da8d689f 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -2,6 +2,8 @@ package storage import ( "errors" + "sync" + "time" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" @@ -22,6 +24,12 @@ type Storage struct { db *pebble.DB writeOpts *pebble.WriteOptions + + metricsLogger struct { + wg sync.WaitGroup + ticker *time.Ticker + stop chan struct{} + } } // New creates a new storage. @@ -61,8 +69,19 @@ func New(opts ...Option) (*Storage, error) { if cfg.verbose { el := pebble.MakeLoggingEventListener(newLogAdaptor(cfg.logger)) pebbleOpts.EventListener = &el + // BackgroundError, DiskSlow, WriteStallBegin, WriteStallEnd + pebbleOpts.EventListener.CompactionBegin = nil + pebbleOpts.EventListener.CompactionEnd = nil + pebbleOpts.EventListener.FlushBegin = nil + pebbleOpts.EventListener.FlushEnd = nil + pebbleOpts.EventListener.FormatUpgrade = nil + pebbleOpts.EventListener.ManifestCreated = nil + pebbleOpts.EventListener.ManifestDeleted = nil + pebbleOpts.EventListener.TableCreated = nil pebbleOpts.EventListener.TableDeleted = nil pebbleOpts.EventListener.TableIngested = nil + pebbleOpts.EventListener.TableStatsLoaded = nil + pebbleOpts.EventListener.TableValidated = nil pebbleOpts.EventListener.WALCreated = nil pebbleOpts.EventListener.WALDeleted = nil } @@ -75,11 +94,14 @@ func New(opts ...Option) (*Storage, error) { if err != nil { return nil, err } - return &Storage{ + + stg := &Storage{ config: cfg, db: db, writeOpts: &pebble.WriteOptions{Sync: cfg.sync}, - }, nil + } + stg.startMetricsLogger() + return stg, nil } // NewWriteBatch creates a batch for write operations. @@ -251,10 +273,40 @@ func (s *Storage) DiskUsage() uint64 { return s.db.Metrics().DiskSpaceUsage() } +func (s *Storage) startMetricsLogger() { + if s.metricsLogInterval <= 0 { + return + } + s.metricsLogger.stop = make(chan struct{}) + s.metricsLogger.ticker = time.NewTicker(s.metricsLogInterval) + s.metricsLogger.wg.Add(1) + go func() { + defer s.metricsLogger.wg.Done() + for { + select { + case <-s.metricsLogger.ticker.C: + s.logger.Info("\n" + s.db.Metrics().String()) + case <-s.metricsLogger.stop: + return + } + } + }() +} + +func (s *Storage) stopMetricsLogger() { + if s.metricsLogInterval <= 0 { + return + } + s.metricsLogger.ticker.Stop() + close(s.metricsLogger.stop) + s.metricsLogger.wg.Wait() +} + // Close closes the storage. func (s *Storage) Close() (err error) { if !s.readOnly { err = s.db.Flush() } + s.stopMetricsLogger() return multierr.Append(err, s.db.Close()) }