Skip to content

Commit

Permalink
Adding latency injector option to -dev mode for storage operations (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chrishoffman authored Sep 11, 2017
1 parent 157f2a7 commit 09f2725
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 13 deletions.
12 changes: 11 additions & 1 deletion command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,15 @@ func (c *ServerCommand) Run(args []string) int {
var dev, verifyOnly, devHA, devTransactional, devLeasedGeneric, devThreeNode bool
var configPath []string
var logLevel, devRootTokenID, devListenAddress, devPluginDir string
var devLatency, devLatencyJitter int
flags := c.Meta.FlagSet("server", meta.FlagSetDefault)
flags.BoolVar(&dev, "dev", false, "")
flags.StringVar(&devRootTokenID, "dev-root-token-id", "", "")
flags.StringVar(&devListenAddress, "dev-listen-address", "", "")
flags.StringVar(&devPluginDir, "dev-plugin-dir", "", "")
flags.StringVar(&logLevel, "log-level", "info", "")
flags.IntVar(&devLatency, "dev-latency", 0, "")
flags.IntVar(&devLatencyJitter, "dev-latency-jitter", 20, "")
flags.BoolVar(&verifyOnly, "verify-only", false, "")
flags.BoolVar(&devHA, "dev-ha", false, "")
flags.BoolVar(&devTransactional, "dev-transactional", false, "")
Expand Down Expand Up @@ -266,7 +269,14 @@ func (c *ServerCommand) Run(args []string) int {
if devPluginDir != "" {
coreConfig.PluginDirectory = devPluginDir
}

if devLatency > 0 {
injectLatency := time.Duration(devLatency) * time.Millisecond
if _, txnOK := backend.(physical.Transactional); txnOK {
coreConfig.Physical = physical.NewTransactionalLatencyInjector(backend, injectLatency, devLatencyJitter, c.logger)
} else {
coreConfig.Physical = physical.NewLatencyInjector(backend, injectLatency, devLatencyJitter, c.logger)
}
}
}

if devThreeNode {
Expand Down
90 changes: 90 additions & 0 deletions physical/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package physical

import (
"math/rand"
"time"

log "github.com/mgutz/logxi/v1"
)

const (
// DefaultJitterPercent is used if no cache size is specified for NewCache
DefaultJitterPercent = 20
)

// LatencyInjector is used to add latency into underlying physical requests
type LatencyInjector struct {
backend Backend
latency time.Duration
jitterPercent int
random *rand.Rand
}

// TransactionalLatencyInjector is the transactional version of the latency
// injector
type TransactionalLatencyInjector struct {
*LatencyInjector
Transactional
}

// NewLatencyInjector returns a wrapped physical backend to simulate latency
func NewLatencyInjector(b Backend, latency time.Duration, jitter int, logger log.Logger) *LatencyInjector {
if jitter < 0 || jitter > 100 {
jitter = DefaultJitterPercent
}
logger.Info("physical/latency: creating latency injector")

return &LatencyInjector{
backend: b,
latency: latency,
jitterPercent: jitter,
random: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
}
}

// NewTransactionalLatencyInjector creates a new transactional LatencyInjector
func NewTransactionalLatencyInjector(b Backend, latency time.Duration, jitter int, logger log.Logger) *TransactionalLatencyInjector {
return &TransactionalLatencyInjector{
LatencyInjector: NewLatencyInjector(b, latency, jitter, logger),
Transactional: b.(Transactional),
}
}

func (l *LatencyInjector) addLatency() {
// Calculate a value between 1 +- jitter%
min := 100 - l.jitterPercent
max := 100 + l.jitterPercent
percent := l.random.Intn(max-min) + min
latencyDuration := time.Duration(int(l.latency) * percent / 100)
time.Sleep(latencyDuration)
}

// Put is a latent put request
func (l *LatencyInjector) Put(entry *Entry) error {
l.addLatency()
return l.backend.Put(entry)
}

// Get is a latent get request
func (l *LatencyInjector) Get(key string) (*Entry, error) {
l.addLatency()
return l.backend.Get(key)
}

// Delete is a latent delete request
func (l *LatencyInjector) Delete(key string) error {
l.addLatency()
return l.backend.Delete(key)
}

// List is a latent list request
func (l *LatencyInjector) List(prefix string) ([]string, error) {
l.addLatency()
return l.backend.List(prefix)
}

// Transaction is a latent transaction request
func (l *TransactionalLatencyInjector) Transaction(txns []TxnEntry) error {
l.addLatency()
return l.Transactional.Transaction(txns)
}
13 changes: 4 additions & 9 deletions vault/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *Core) setupExpiration() error {
c.logger.Error("expiration: error shutting down core: %v", err)
}
}
go c.expiration.Restore(errorFunc, 0)
go c.expiration.Restore(errorFunc)

return nil
}
Expand Down Expand Up @@ -268,7 +268,7 @@ func (m *ExpirationManager) Tidy() error {

// Restore is used to recover the lease states when starting.
// This is used after starting the vault.
func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (retErr error) {
func (m *ExpirationManager) Restore(errorFunc func()) (retErr error) {
defer func() {
// Turn off restore mode. We can do this safely without the lock because
// if restore mode finished successfully, restore mode was already
Expand Down Expand Up @@ -322,7 +322,7 @@ func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (
return
}

err := m.processRestore(leaseID, loadDelay)
err := m.processRestore(leaseID)
if err != nil {
errs <- err
continue
Expand Down Expand Up @@ -398,7 +398,7 @@ func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (

// processRestore takes a lease and restores it in the expiration manager if it has
// not already been seen
func (m *ExpirationManager) processRestore(leaseID string, loadDelay time.Duration) error {
func (m *ExpirationManager) processRestore(leaseID string) error {
m.restoreRequestLock.RLock()
defer m.restoreRequestLock.RUnlock()

Expand All @@ -415,11 +415,6 @@ func (m *ExpirationManager) processRestore(leaseID string, loadDelay time.Durati
return nil
}

// Useful for testing to add latency to all load requests
if loadDelay > 0 {
time.Sleep(loadDelay)
}

// Load lease and restore expiration timer
_, err := m.loadEntryInternal(leaseID, true, false)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions vault/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestExpiration_Tidy(t *testing.T) {
var err error

exp := mockExpiration(t)
if err := exp.Restore(nil, 0); err != nil {
if err := exp.Restore(nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func benchmarkExpirationBackend(b *testing.B, physicalBackend physical.Backend,

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = exp.Restore(nil, 0)
err = exp.Restore(nil)
// Restore
if err != nil {
b.Fatalf("err: %v", err)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestExpiration_Restore(t *testing.T) {
}

// Restore
err = exp.Restore(nil, 0)
err = exp.Restore(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down

0 comments on commit 09f2725

Please sign in to comment.