Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding latency injector option to -dev mode for storage operations #3289

Merged
merged 3 commits into from
Sep 11, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,15 @@ func (c *ServerCommand) Run(args []string) int {
var dev, verifyOnly, devHA, devTransactional, devLeasedGeneric, devThreeNode bool
var configPath []string
var logLevel, devRootTokenID, devListenAddress, devPluginDir string
var devLatency, devLatencyJitter int
flags := c.Meta.FlagSet("server", meta.FlagSetDefault)
flags.BoolVar(&dev, "dev", false, "")
flags.StringVar(&devRootTokenID, "dev-root-token-id", "", "")
flags.StringVar(&devListenAddress, "dev-listen-address", "", "")
flags.StringVar(&devPluginDir, "dev-plugin-dir", "", "")
flags.StringVar(&logLevel, "log-level", "info", "")
flags.IntVar(&devLatency, "dev-latency", 0, "")
flags.IntVar(&devLatencyJitter, "dev-latency-jitter", 20, "")
flags.BoolVar(&verifyOnly, "verify-only", false, "")
flags.BoolVar(&devHA, "dev-ha", false, "")
flags.BoolVar(&devTransactional, "dev-transactional", false, "")
Expand Down Expand Up @@ -266,7 +269,14 @@ func (c *ServerCommand) Run(args []string) int {
if devPluginDir != "" {
coreConfig.PluginDirectory = devPluginDir
}

if devLatency > 0 {
injectLatency := time.Duration(devLatency) * time.Millisecond
if _, txnOK := backend.(physical.Transactional); txnOK {
coreConfig.Physical = physical.NewTransactionalLatencyInjector(backend, injectLatency, devLatencyJitter, c.logger)
} else {
coreConfig.Physical = physical.NewLatencyInjector(backend, injectLatency, devLatencyJitter, c.logger)
}
}
}

if devThreeNode {
Expand Down
90 changes: 90 additions & 0 deletions physical/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package physical

import (
"math/rand"
"time"

log "github.com/mgutz/logxi/v1"
)

const (
// DefaultJitterPercent is used if no cache size is specified for NewCache
DefaultJitterPercent = 20
)

// LatencyInjector is used to add latency into underlying physical requests
type LatencyInjector struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this and (TransactionalLatencyInjector) is really an implementation of the Physical interface, should we call this LatencyBackend to keep things uniform with other physical backend implementations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a backend itself and just wraps some other backend that is why I didn't call it a backend. It is more analogous to the Cache wrapper.

backend Backend
latency time.Duration
jitterPercent int
random *rand.Rand
}

// TransactionalLatencyInjector is the transactional version of the latency
// injector
type TransactionalLatencyInjector struct {
*LatencyInjector
Transactional
}

// NewLatencyInjector returns a wrapped physical backend to simulate latency
func NewLatencyInjector(b Backend, latency time.Duration, jitter int, logger log.Logger) *LatencyInjector {
if jitter < 0 || jitter > 100 {
jitter = DefaultJitterPercent
}
logger.Info("physical/latency: creating latency injector")

return &LatencyInjector{
backend: b,
latency: latency,
jitterPercent: jitter,
random: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
}
}

// NewTransactionalLatencyInjector creates a new transactional LatencyInjector
func NewTransactionalLatencyInjector(b Backend, latency time.Duration, jitter int, logger log.Logger) *TransactionalLatencyInjector {
return &TransactionalLatencyInjector{
LatencyInjector: NewLatencyInjector(b, latency, jitter, logger),
Transactional: b.(Transactional),
}
}

func (l *LatencyInjector) addLatency() {
// Calculate a value between -1 and 1
min := 100 - l.jitterPercent
max := 100 + l.jitterPercent
percent := float64(l.random.Intn(max-min)+min) / 100
latencyDuration := time.Duration(int64(float64(l.latency/time.Millisecond)*percent)) * time.Millisecond
time.Sleep(latencyDuration)
}

// Put is a latent put request
func (l *LatencyInjector) Put(entry *Entry) error {
l.addLatency()
return l.backend.Put(entry)
}

// Get is a latent get request
func (l *LatencyInjector) Get(key string) (*Entry, error) {
l.addLatency()
return l.backend.Get(key)
}

// Delete is a latent delete request
func (l *LatencyInjector) Delete(key string) error {
l.addLatency()
return l.backend.Delete(key)
}

// List is a latent list request
func (l *LatencyInjector) List(prefix string) ([]string, error) {
l.addLatency()
return l.backend.List(prefix)
}

// Transaction is a latent transaction request
func (l *TransactionalLatencyInjector) Transaction(txns []TxnEntry) error {
l.addLatency()
return l.Transactional.Transaction(txns)
}
13 changes: 4 additions & 9 deletions vault/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Core) setupExpiration() error {
c.logger.Error("expiration: error shutting down core: %v", err)
}
}
go c.expiration.Restore(errorFunc, 0)
go c.expiration.Restore(errorFunc)

return nil
}
Expand Down Expand Up @@ -270,7 +270,7 @@ func (m *ExpirationManager) Tidy() error {

// Restore is used to recover the lease states when starting.
// This is used after starting the vault.
func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (retErr error) {
func (m *ExpirationManager) Restore(errorFunc func()) (retErr error) {
defer func() {
// Turn off restore mode. We can do this safely without the lock because
// if restore mode finished successfully, restore mode was already
Expand Down Expand Up @@ -324,7 +324,7 @@ func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (
return
}

err := m.processRestore(leaseID, loadDelay)
err := m.processRestore(leaseID)
if err != nil {
errs <- err
continue
Expand Down Expand Up @@ -400,7 +400,7 @@ func (m *ExpirationManager) Restore(errorFunc func(), loadDelay time.Duration) (

// processRestore takes a lease and restores it in the expiration manager if it has
// not already been seen
func (m *ExpirationManager) processRestore(leaseID string, loadDelay time.Duration) error {
func (m *ExpirationManager) processRestore(leaseID string) error {
m.restoreRequestLock.RLock()
defer m.restoreRequestLock.RUnlock()

Expand All @@ -417,11 +417,6 @@ func (m *ExpirationManager) processRestore(leaseID string, loadDelay time.Durati
return nil
}

// Useful for testing to add latency to all load requests
if loadDelay > 0 {
time.Sleep(loadDelay)
}

// Load lease and restore expiration timer
_, err := m.loadEntryInternal(leaseID, true, false)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions vault/expiration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestExpiration_Tidy(t *testing.T) {
var err error

exp := mockExpiration(t)
if err := exp.Restore(nil, 0); err != nil {
if err := exp.Restore(nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -341,7 +341,7 @@ func benchmarkExpirationBackend(b *testing.B, physicalBackend physical.Backend,

b.ResetTimer()
for i := 0; i < b.N; i++ {
err = exp.Restore(nil, 0)
err = exp.Restore(nil)
// Restore
if err != nil {
b.Fatalf("err: %v", err)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestExpiration_Restore(t *testing.T) {
}

// Restore
err = exp.Restore(nil, 0)
err = exp.Restore(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down