Skip to content

Commit

Permalink
Merge pull request #1666 from hashicorp/f-consul-lib
Browse files Browse the repository at this point in the history
Refactor various utility functions into a consul/lib package
  • Loading branch information
sean- committed Feb 2, 2016
2 parents 3215b87 + 8cb5b4d commit eb27a02
Show file tree
Hide file tree
Showing 31 changed files with 206 additions and 200 deletions.
5 changes: 3 additions & 2 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -600,8 +601,8 @@ func (a *Agent) sendCoordinate() {
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
intv := rateScaledInterval(rate, min, len(a.LANMembers()))
intv = intv + randomStagger(intv)
intv := lib.RateScaledInterval(rate, min, len(a.LANMembers()))
intv = intv + lib.RandomStagger(intv)

select {
case <-time.After(intv):
Expand Down
9 changes: 5 additions & 4 deletions command/agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-cleanhttp"
)

Expand Down Expand Up @@ -131,7 +132,7 @@ func (c *CheckMonitor) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckMonitor) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, c.Script)
next := time.After(initialPauseTime)
for {
Expand Down Expand Up @@ -366,7 +367,7 @@ func (c *CheckHTTP) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckHTTP) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first HTTP request of %s", initialPauseTime, c.HTTP)
next := time.After(initialPauseTime)
for {
Expand Down Expand Up @@ -482,7 +483,7 @@ func (c *CheckTCP) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckTCP) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first socket connection of %s", initialPauseTime, c.TCP)
next := time.After(initialPauseTime)
for {
Expand Down Expand Up @@ -580,7 +581,7 @@ func (c *CheckDocker) Stop() {
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckDocker) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(c.Interval)
initialPauseTime := lib.RandomStagger(c.Interval)
c.Logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s -c %s in container %s", initialPauseTime, c.Shell, c.Script, c.DockerContainerID)
next := time.After(initialPauseTime)
for {
Expand Down
3 changes: 2 additions & 1 deletion command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-checkpoint"
"github.com/hashicorp/go-reap"
Expand Down Expand Up @@ -424,7 +425,7 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log

// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(randomStagger(30 * time.Second))
time.Sleep(lib.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
Expand Down
3 changes: 2 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure"
)
Expand Down Expand Up @@ -634,7 +635,7 @@ func DecodeConfig(r io.Reader) (*Config, error) {
allowedKeys := []string{"service", "services", "check", "checks"}
var unused []string
for _, field := range md.Unused {
if !strContains(allowedKeys, field) {
if !lib.StrContains(allowedKeys, field) {
unused = append(unused, field)
}
}
Expand Down
4 changes: 3 additions & 1 deletion command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"strings"
"testing"
"time"

"github.com/hashicorp/consul/lib"
)

func TestConfigEncryptBytes(t *testing.T) {
Expand Down Expand Up @@ -1103,7 +1105,7 @@ func TestDecodeConfig_Service(t *testing.T) {
t.Fatalf("bad: %v", serv)
}

if !strContains(serv.Tags, "master") {
if !lib.StrContains(serv.Tags, "master") {
t.Fatalf("bad: %v", serv)
}

Expand Down
9 changes: 5 additions & 4 deletions command/agent/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
)

const (
Expand Down Expand Up @@ -252,7 +253,7 @@ func (l *localState) UpdateCheck(checkID, status, output string) {
if l.config.CheckUpdateInterval > 0 && check.Status == status {
check.Output = output
if _, ok := l.deferCheck[checkID]; !ok {
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + randomStagger(l.config.CheckUpdateInterval)
intv := time.Duration(uint64(l.config.CheckUpdateInterval)/2) + lib.RandomStagger(l.config.CheckUpdateInterval)
deferSync := time.AfterFunc(intv, func() {
l.Lock()
if _, ok := l.checkStatus[checkID]; ok {
Expand Down Expand Up @@ -302,11 +303,11 @@ SYNC:
case <-l.consulCh:
// Stagger the retry on leader election, avoid a thundering heard
select {
case <-time.After(randomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))):
case <-shutdownCh:
return
}
case <-time.After(syncRetryIntv + randomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))):
case <-shutdownCh:
return
}
Expand All @@ -317,7 +318,7 @@ SYNC:

// Schedule the next full sync, with a random stagger
aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers()))
aeIntv = aeIntv + randomStagger(aeIntv)
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
aeTimer := time.After(aeIntv)

// Wait for sync events
Expand Down
10 changes: 10 additions & 0 deletions command/agent/remote_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@ package agent
import (
"bytes"
"encoding/json"
"fmt"
"os"
"reflect"
"testing"
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-uuid"
)

func generateUUID() (ret string) {
var err error
if ret, err = uuid.GenerateUUID(); err != nil {
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
}
return ret
}

func TestRexecWriter(t *testing.T) {
writer := &rexecWriter{
BufCh: make(chan []byte, 16),
Expand Down
6 changes: 5 additions & 1 deletion command/agent/user_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"regexp"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-uuid"
)

const (
Expand Down Expand Up @@ -78,7 +79,10 @@ func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
}

// Format message
params.ID = generateUUID()
var err error
if params.ID, err = uuid.GenerateUUID(); err != nil {
return fmt.Errorf("UUID generation failed: %v", err)
}
params.Version = userEventMaxVersion
payload, err := encodeMsgPack(&params)
if err != nil {
Expand Down
43 changes: 0 additions & 43 deletions command/agent/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package agent
import (
"bytes"
"crypto/md5"
crand "crypto/rand"
"fmt"
"math"
"math/rand"
"os"
"os/exec"
"os/user"
Expand Down Expand Up @@ -39,32 +37,6 @@ func aeScale(interval time.Duration, n int) time.Duration {
return time.Duration(multiplier) * interval
}

// rateScaledInterval is used to choose an interval to perform an action in order
// to target an aggregate number of actions per second across the whole cluster.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
interval := time.Duration(float64(time.Second) * float64(n) / rate)
if interval < min {
return min
}

return interval
}

// Returns a random stagger interval between 0 and the duration
func randomStagger(intv time.Duration) time.Duration {
return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

// strContains checks if a list contains a string
func strContains(l []string, s string) bool {
for _, v := range l {
if v == s {
return true
}
}
return false
}

// ExecScript returns a command to execute a script
func ExecScript(script string) (*exec.Cmd, error) {
var shell, flag string
Expand All @@ -82,21 +54,6 @@ func ExecScript(script string) (*exec.Cmd, error) {
return cmd, nil
}

// generateUUID is used to generate a random UUID
func generateUUID() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}

return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
buf[0:4],
buf[4:6],
buf[6:8],
buf[8:10],
buf[10:16])
}

// decodeMsgPack is used to decode a MsgPack encoded object
func decodeMsgPack(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
Expand Down
33 changes: 0 additions & 33 deletions command/agent/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,39 +24,6 @@ func TestAEScale(t *testing.T) {
}
}

func TestRateScaledInterval(t *testing.T) {
min := 1 * time.Second
rate := 200.0
if v := rateScaledInterval(rate, min, 0); v != min {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 100); v != min {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 200); v != 1*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 1000); v != 5*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 5000); v != 25*time.Second {
t.Fatalf("Bad: %v", v)
}
if v := rateScaledInterval(rate, min, 10000); v != 50*time.Second {
t.Fatalf("Bad: %v", v)
}
}

func TestRandomStagger(t *testing.T) {
intv := time.Minute
for i := 0; i < 10; i++ {
stagger := randomStagger(intv)
if stagger < 0 || stagger >= intv {
t.Fatalf("Bad: %v", stagger)
}
}
}

func TestStringHash(t *testing.T) {
in := "hello world"
expected := "5eb63bbbe01eeed093cb22bb8f5acdc3"
Expand Down
7 changes: 6 additions & 1 deletion consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-uuid"
)

// ACL endpoint is used to manipulate ACLs
Expand Down Expand Up @@ -62,7 +63,11 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
if args.ACL.ID == "" {
state := a.srv.fsm.State()
for {
args.ACL.ID = generateUUID()
if args.ACL.ID, err = uuid.GenerateUUID(); err != nil {
a.srv.logger.Printf("[ERR] consul.acl: UUID generation failed: %v", err)
return err
}

_, acl, err := state.ACLGet(args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion consul/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
Expand Down Expand Up @@ -436,7 +437,7 @@ func TestACLEndpoint_List(t *testing.T) {
if s.ID == anonymousToken || s.ID == "root" {
continue
}
if !strContains(ids, s.ID) {
if !lib.StrContains(ids, s.ID) {
t.Fatalf("bad: %v", s)
}
if s.Name != "User token" {
Expand Down
3 changes: 2 additions & 1 deletion consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
Expand Down Expand Up @@ -978,7 +979,7 @@ func TestCatalogNodeServices(t *testing.T) {
t.Fatalf("bad: %v", out)
}
services := out.NodeServices.Services
if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
if !lib.StrContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
t.Fatalf("bad: %v", out)
}
if len(services["web"].Tags) != 0 || services["web"].Port != 80 {
Expand Down
Loading

0 comments on commit eb27a02

Please sign in to comment.