From 09f272510f9158df9737f3ed767cc3340f983a60 Mon Sep 17 00:00:00 2001 From: Chris Hoffman Date: Mon, 11 Sep 2017 14:49:08 -0400 Subject: [PATCH] Adding latency injector option to -dev mode for storage operations (#3289) --- command/server.go | 12 +++++- physical/latency.go | 90 ++++++++++++++++++++++++++++++++++++++++ vault/expiration.go | 13 ++---- vault/expiration_test.go | 6 +-- 4 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 physical/latency.go diff --git a/command/server.go b/command/server.go index 0b5f8081009c..e5101fd124da 100644 --- a/command/server.go +++ b/command/server.go @@ -75,12 +75,15 @@ func (c *ServerCommand) Run(args []string) int { var dev, verifyOnly, devHA, devTransactional, devLeasedGeneric, devThreeNode bool var configPath []string var logLevel, devRootTokenID, devListenAddress, devPluginDir string + var devLatency, devLatencyJitter int flags := c.Meta.FlagSet("server", meta.FlagSetDefault) flags.BoolVar(&dev, "dev", false, "") flags.StringVar(&devRootTokenID, "dev-root-token-id", "", "") flags.StringVar(&devListenAddress, "dev-listen-address", "", "") flags.StringVar(&devPluginDir, "dev-plugin-dir", "", "") flags.StringVar(&logLevel, "log-level", "info", "") + flags.IntVar(&devLatency, "dev-latency", 0, "") + flags.IntVar(&devLatencyJitter, "dev-latency-jitter", 20, "") flags.BoolVar(&verifyOnly, "verify-only", false, "") flags.BoolVar(&devHA, "dev-ha", false, "") flags.BoolVar(&devTransactional, "dev-transactional", false, "") @@ -266,7 +269,14 @@ func (c *ServerCommand) Run(args []string) int { if devPluginDir != "" { coreConfig.PluginDirectory = devPluginDir } - + if devLatency > 0 { + injectLatency := time.Duration(devLatency) * time.Millisecond + if _, txnOK := backend.(physical.Transactional); txnOK { + coreConfig.Physical = physical.NewTransactionalLatencyInjector(backend, injectLatency, devLatencyJitter, c.logger) + } else { + coreConfig.Physical = physical.NewLatencyInjector(backend, injectLatency, devLatencyJitter, c.logger) + } + } } if devThreeNode { diff --git a/physical/latency.go b/physical/latency.go new file mode 100644 index 000000000000..3253036da05e --- /dev/null +++ b/physical/latency.go @@ -0,0 +1,90 @@ +package physical + +import ( + "math/rand" + "time" + + log "github.com/mgutz/logxi/v1" +) + +const ( + // DefaultJitterPercent is used if no cache size is specified for NewCache + DefaultJitterPercent = 20 +) + +// LatencyInjector is used to add latency into underlying physical requests +type LatencyInjector struct { + backend Backend + latency time.Duration + jitterPercent int + random *rand.Rand +} + +// TransactionalLatencyInjector is the transactional version of the latency +// injector +type TransactionalLatencyInjector struct { + *LatencyInjector + Transactional +} + +// NewLatencyInjector returns a wrapped physical backend to simulate latency +func NewLatencyInjector(b Backend, latency time.Duration, jitter int, logger log.Logger) *LatencyInjector { + if jitter < 0 || jitter > 100 { + jitter = DefaultJitterPercent + } + logger.Info("physical/latency: creating latency injector") + + return &LatencyInjector{ + backend: b, + latency: latency, + jitterPercent: jitter, + random: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))), + } +} + +// NewTransactionalLatencyInjector creates a new transactional LatencyInjector +func NewTransactionalLatencyInjector(b Backend, latency time.Duration, jitter int, logger log.Logger) *TransactionalLatencyInjector { + return &TransactionalLatencyInjector{ + LatencyInjector: NewLatencyInjector(b, latency, jitter, logger), + Transactional: b.(Transactional), + } +} + +func (l *LatencyInjector) addLatency() { + // Calculate a value between 1 +- jitter% + min := 100 - l.jitterPercent + max := 100 + l.jitterPercent + percent := l.random.Intn(max-min) + min + latencyDuration := time.Duration(int(l.latency) * percent / 100) + time.Sleep(latencyDuration) +} + +// Put is a latent put request +func (l *LatencyInjector) Put(entry *Entry) error { + l.addLatency() + return l.backend.Put(entry) +} + +// Get is a latent get request +func (l *LatencyInjector) Get(key string) (*Entry, error) { + l.addLatency() + return l.backend.Get(key) +} + +// Delete is a latent delete request +func (l *LatencyInjector) Delete(key string) error { + l.addLatency() + return l.backend.Delete(key) +} + +// List is a latent list request +func (l *LatencyInjector) List(prefix string) ([]string, error) { + l.addLatency() + return l.backend.List(prefix) +} + +// Transaction is a latent transaction request +func (l *TransactionalLatencyInjector) Transaction(txns []TxnEntry) error { + l.addLatency() + return l.Transactional.Transaction(txns) +} diff --git a/vault/expiration.go b/vault/expiration.go index 41a697291749..628df8e973b0 100644 --- a/vault/expiration.go +++ b/vault/expiration.go @@ -117,7 +117,7 @@ func (c *Core) setupExpiration() error { c.logger.Error("expiration: error shutting down core: %v", err) } } - go c.expiration.Restore(errorFunc, 0) + go c.expiration.Restore(errorFunc) return nil } @@ -268,7 +268,7 @@ func (m *ExpirationManager) Tidy() error { // Restore is used to recover the lease states when starting. // This is used after starting the vault. -func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (retErr error) { +func (m *ExpirationManager) Restore(errorFunc func()) (retErr error) { defer func() { // Turn off restore mode. We can do this safely without the lock because // if restore mode finished successfully, restore mode was already @@ -322,7 +322,7 @@ func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) ( return } - err := m.processRestore(leaseID, loadDelay) + err := m.processRestore(leaseID) if err != nil { errs <- err continue @@ -398,7 +398,7 @@ func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) ( // processRestore takes a lease and restores it in the expiration manager if it has // not already been seen -func (m *ExpirationManager) processRestore(leaseID string, loadDelay time.Duration) error { +func (m *ExpirationManager) processRestore(leaseID string) error { m.restoreRequestLock.RLock() defer m.restoreRequestLock.RUnlock() @@ -415,11 +415,6 @@ func (m *ExpirationManager) processRestore(leaseID string, loadDelay time.Durati return nil } - // Useful for testing to add latency to all load requests - if loadDelay > 0 { - time.Sleep(loadDelay) - } - // Load lease and restore expiration timer _, err := m.loadEntryInternal(leaseID, true, false) if err != nil { diff --git a/vault/expiration_test.go b/vault/expiration_test.go index 4df5fb42699f..144bd16b045f 100644 --- a/vault/expiration_test.go +++ b/vault/expiration_test.go @@ -37,7 +37,7 @@ func TestExpiration_Tidy(t *testing.T) { var err error exp := mockExpiration(t) - if err := exp.Restore(nil, 0); err != nil { + if err := exp.Restore(nil); err != nil { t.Fatal(err) } @@ -341,7 +341,7 @@ func benchmarkExpirationBackend(b *testing.B, physicalBackend physical.Backend, b.ResetTimer() for i := 0; i < b.N; i++ { - err = exp.Restore(nil, 0) + err = exp.Restore(nil) // Restore if err != nil { b.Fatalf("err: %v", err) @@ -399,7 +399,7 @@ func TestExpiration_Restore(t *testing.T) { } // Restore - err = exp.Restore(nil, 0) + err = exp.Restore(nil) if err != nil { t.Fatalf("err: %v", err) }