Skip to content

Commit

Permalink
Merge pull request #2467 from hashicorp/f-consul-refactor
Browse files Browse the repository at this point in the history
Refactor Consul Syncer into new ServiceClient
  • Loading branch information
schmichael authored Apr 19, 2017
2 parents 55965cb + 58430bf commit 53eb407
Show file tree
Hide file tree
Showing 54 changed files with 3,228 additions and 2,567 deletions.
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type ServiceCheck struct {
Interval time.Duration
Timeout time.Duration
InitialStatus string `mapstructure:"initial_status"`
TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
}

// The Service model represents a Consul service definition
Expand Down
36 changes: 20 additions & 16 deletions client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type AllocRunner struct {

updateCh chan *structs.Allocation

vaultClient vaultclient.VaultClient
vaultClient vaultclient.VaultClient
consulClient ConsulServiceAPI

otherAllocDir *allocdir.AllocDir

Expand Down Expand Up @@ -96,20 +97,23 @@ type allocRunnerState struct {

// NewAllocRunner is used to create a new allocation context
func NewAllocRunner(logger *log.Logger, config *config.Config, updater AllocStateUpdater,
alloc *structs.Allocation, vaultClient vaultclient.VaultClient) *AllocRunner {
alloc *structs.Allocation, vaultClient vaultclient.VaultClient,
consulClient ConsulServiceAPI) *AllocRunner {

ar := &AllocRunner{
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
config: config,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}
return ar
}
Expand Down Expand Up @@ -174,7 +178,7 @@ func (r *AllocRunner) RestoreState() error {
}

task := &structs.Task{Name: name}
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, td, r.Alloc(), task, r.vaultClient, r.consulClient)
r.tasks[name] = tr

// Skip tasks in terminal states.
Expand Down Expand Up @@ -512,7 +516,7 @@ func (r *AllocRunner) Run() {
taskdir := r.allocDir.NewTaskDir(task.Name)
r.allocDirLock.Unlock()

tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient)
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, taskdir, r.Alloc(), task.Copy(), r.vaultClient, r.consulClient)
r.tasks[task.Name] = tr
tr.MarkReceived()

Expand Down
10 changes: 6 additions & 4 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func testAllocRunnerFromAlloc(alloc *structs.Allocation, restarts bool) (*MockAl
alloc.Job.Type = structs.JobTypeBatch
}
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, newMockConsulServiceClient())
return upd, ar
}

Expand Down Expand Up @@ -323,7 +323,8 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
ar2 := NewAllocRunner(l2, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient,
ar.consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
Expand Down Expand Up @@ -415,7 +416,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {

// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient)
&structs.Allocation{ID: ar.alloc.ID}, ar.vaultClient, ar.consulClient)
ar2.logger = prefixedTestLogger("ar2: ")
err = ar2.RestoreState()
if err != nil {
Expand Down Expand Up @@ -573,7 +574,8 @@ func TestAllocRunner_RestoreOldState(t *testing.T) {
*alloc.Job.LookupTaskGroup(alloc.TaskGroup).RestartPolicy = structs.RestartPolicy{Attempts: 0}
alloc.Job.Type = structs.JobTypeBatch
vclient := vaultclient.NewMockVaultClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient)
cclient := newMockConsulServiceClient()
ar := NewAllocRunner(logger, conf, upd.Update, alloc, vclient, cclient)
defer ar.Destroy()

// RestoreState should fail on the task state since we only test the
Expand Down
8 changes: 6 additions & 2 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ var (
// included in snapshots.
SharedDataDir = "data"

// TmpDirName is the name of the temporary directory in each alloc and
// task.
TmpDirName = "tmp"

// The set of directories that exist inside eache shared alloc directory.
SharedAllocDirs = []string{LogDirName, "tmp", SharedDataDir}
SharedAllocDirs = []string{LogDirName, TmpDirName, SharedDataDir}

// The name of the directory that exists inside each task directory
// regardless of driver.
Expand All @@ -46,7 +50,7 @@ var (
TaskSecrets = "secrets"

// TaskDirs is the set of directories created in each tasks directory.
TaskDirs = map[string]os.FileMode{"tmp": os.ModeSticky | 0777}
TaskDirs = map[string]os.FileMode{TmpDirName: os.ModeSticky | 0777}
)

type AllocDir struct {
Expand Down
77 changes: 13 additions & 64 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ const (
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 9

// consulReaperIntv is the interval at which the Consul reaper will
// run.
consulReaperIntv = 5 * time.Second

// registerRetryIntv is minimum interval on which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 15 * time.Second
Expand Down Expand Up @@ -142,8 +138,12 @@ type Client struct {
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation

// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService ConsulServiceAPI

// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI

// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
Expand Down Expand Up @@ -196,7 +196,7 @@ var (
)

// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService ConsulServiceAPI, logger *log.Logger) (*Client, error) {
// Create the tls wrapper
var tlsWrap tlsutil.RegionWrapper
if cfg.TLSConfig.EnableRPC {
Expand All @@ -210,7 +210,8 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
// Create the client
c := &Client{
config: cfg,
consulSyncer: consulSyncer,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
logger: logger,
Expand Down Expand Up @@ -285,9 +286,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
}
}

// Start Consul reaper
go c.consulReaper()

// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
Expand Down Expand Up @@ -606,7 +604,7 @@ func (c *Client) restoreState() error {
id := entry.Name()
alloc := &structs.Allocation{ID: id}
c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
c.configLock.RUnlock()
c.allocLock.Lock()
c.allocs[id] = ar
Expand Down Expand Up @@ -1894,7 +1892,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, prevAllocDir *allocdir.Allo
defer c.allocLock.Unlock()

c.configLock.RLock()
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient)
ar := NewAllocRunner(c.logger, c.configCopy, c.updateAllocStatus, alloc, c.vaultClient, c.consulService)
ar.SetPreviousAllocDir(prevAllocDir)
c.configLock.RUnlock()
go ar.Run()
Expand Down Expand Up @@ -2047,8 +2045,7 @@ func (c *Client) consulDiscoveryImpl() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()

consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
dcs, err := c.consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
Expand Down Expand Up @@ -2084,7 +2081,7 @@ DISCOLOOP:
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
Expand Down Expand Up @@ -2143,54 +2140,6 @@ DISCOLOOP:
}
}

// consulReaper periodically reaps unmatched domains from Consul. Intended to
// be called in its own goroutine. See consulReaperIntv for interval.
func (c *Client) consulReaper() {
ticker := time.NewTicker(consulReaperIntv)
defer ticker.Stop()
lastok := true
for {
select {
case <-ticker.C:
if err := c.consulReaperImpl(); err != nil {
if lastok {
c.logger.Printf("[ERR] client.consul: error reaping services in consul: %v", err)
lastok = false
}
} else {
lastok = true
}
case <-c.shutdownCh:
return
}
}
}

// consulReaperImpl reaps unmatched domains from Consul.
func (c *Client) consulReaperImpl() error {
const estInitialExecutorDomains = 8

// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain

for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
}

return c.consulSyncer.ReapUnmatched(domains)
}

// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
// Start collecting host stats right away and then keep collecting every
Expand Down
30 changes: 10 additions & 20 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,11 @@ func testServer(t *testing.T, cb func(*nomad.Config)) (*nomad.Server, string) {
cb(config)
}

shutdownCh := make(chan struct{})
logger := log.New(config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
catalog := consul.NewMockCatalog(logger)

// Create server
server, err := nomad.NewServer(config, consulSyncer, logger)
server, err := nomad.NewServer(config, catalog, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -105,14 +101,11 @@ func testClient(t *testing.T, cb func(c *config.Config)) *Client {
cb(conf)
}

shutdownCh := make(chan struct{})
consulSyncer, err := consul.NewSyncer(conf.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatalf("err: %v", err)
}

logger := log.New(conf.LogOutput, "", log.LstdFlags)
client, err := NewClient(conf, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
client, err := NewClient(conf, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -754,14 +747,11 @@ func TestClient_SaveRestoreState(t *testing.T) {
}

// Create a new client
shutdownCh := make(chan struct{})
logger := log.New(c1.config.LogOutput, "", log.LstdFlags)
consulSyncer, err := consul.NewSyncer(c1.config.ConsulConfig, shutdownCh, logger)
if err != nil {
t.Fatalf("err: %v", err)
}

c2, err := NewClient(c1.config, consulSyncer, logger)
catalog := consul.NewMockCatalog(logger)
mockService := newMockConsulServiceClient()
mockService.logger = logger
c2, err := NewClient(c1.config, catalog, mockService, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
14 changes: 14 additions & 0 deletions client/consul.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package client

import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
)

// ConsulServiceAPI is the interface the Nomad Client uses to register and
// remove services and checks from Consul.
type ConsulServiceAPI interface {
RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error
RemoveTask(allocID string, task *structs.Task)
UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error
}
Loading

0 comments on commit 53eb407

Please sign in to comment.