Skip to content

Commit

Permalink
Merge pull request #298 from hashicorp/f-watch
Browse files Browse the repository at this point in the history
Adding support for 'watches'
  • Loading branch information
armon committed Aug 22, 2014
2 parents d980242 + 54316d2 commit 9e13633
Show file tree
Hide file tree
Showing 24 changed files with 1,817 additions and 24 deletions.
20 changes: 7 additions & 13 deletions command/agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/hashicorp/consul/consul/structs"
"log"
"os/exec"
"runtime"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -106,18 +105,13 @@ func (c *CheckMonitor) run() {

// check is invoked periodically to perform the script check
func (c *CheckMonitor) check() {
// Determine the shell invocation based on OS
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}

// Create the command
cmd := exec.Command(shell, flag, c.Script)
cmd, err := ExecScript(c.Script)
if err != nil {
c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", c.Script, err)
c.Notify.UpdateCheck(c.CheckID, structs.HealthUnknown, err.Error())
return
}

// Collect the output
output, _ := circbuf.NewBuffer(CheckBufSize)
Expand All @@ -140,7 +134,7 @@ func (c *CheckMonitor) check() {
time.Sleep(30 * time.Second)
errCh <- fmt.Errorf("Timed out running check '%s'", c.Script)
}()
err := <-errCh
err = <-errCh

// Get the output, add a message about truncation
outputStr := string(output.Bytes())
Expand Down
61 changes: 61 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/mitchellh/cli"
Expand All @@ -37,6 +38,7 @@ type Command struct {
ShutdownCh <-chan struct{}
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
rpcServer *AgentRPC
httpServer *HTTPServer
Expand Down Expand Up @@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config {
return nil
}

// Compile all the watches
for _, params := range config.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}

// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}

// Store the watch plan
config.WatchPlans = append(config.WatchPlans, wp)
}

// Warn if we are in expect mode
if config.BootstrapExpect == 1 {
c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
Expand Down Expand Up @@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
}
c.logOutput = logOutput
return logGate, logWriter, logOutput
}

Expand Down Expand Up @@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int {
}
}

// Get the new client listener addr
httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}

// Register the watches
for _, wp := range config.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}

// Let the agent know we've finished registration
c.agent.StartSync()

Expand Down Expand Up @@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config {
}
}

// Get the new client listener addr
httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}

// Deregister the old watches
for _, wp := range config.WatchPlans {
wp.Stop()
}

// Register the new watches
for _, wp := range newConf.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}

return newConf
}

Expand Down
28 changes: 28 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure"
)

Expand Down Expand Up @@ -229,6 +230,11 @@ type Config struct {
// this acts like deny.
ACLDownPolicy string `mapstructure:"acl_down_policy"`

// Watches are used to monitor various endpoints and to invoke a
// handler to act appropriately. These are managed entirely in the
// agent layer using the standard APIs.
Watches []map[string]interface{} `mapstructure:"watches"`

// AEInterval controls the anti-entropy interval. This is how often
// the agent attempts to reconcile it's local state with the server'
// representation of our state. Defaults to every 60s.
Expand All @@ -251,6 +257,9 @@ type Config struct {

// VersionPrerelease is a label for pre-release builds
VersionPrerelease string `mapstructure:"-"`

// WatchPlans contains the compiled watches
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
}

type dirEnts []os.FileInfo
Expand Down Expand Up @@ -302,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) {
return &net.TCPAddr{IP: ip, Port: port}, nil
}

// ClientListenerAddr is used to format an address for a
// port on a ClientAddr, handling the zero IP.
func (c *Config) ClientListenerAddr(port int) (string, error) {
addr, err := c.ClientListener(port)
if err != nil {
return "", err
}
if addr.IP.IsUnspecified() {
addr.IP = net.ParseIP("127.0.0.1")
}
return addr.String(), nil
}

// DecodeConfig reads the configuration from the given reader in JSON
// format and decodes it into a proper Config structure.
func DecodeConfig(r io.Reader) (*Config, error) {
Expand Down Expand Up @@ -648,6 +670,12 @@ func MergeConfig(a, b *Config) *Config {
if b.ACLDefaultPolicy != "" {
result.ACLDefaultPolicy = b.ACLDefaultPolicy
}
if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...)
}
if len(b.WatchPlans) != 0 {
result.WatchPlans = append(result.WatchPlans, b.WatchPlans...)
}

// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
Expand Down
28 changes: 28 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,27 @@ func TestDecodeConfig(t *testing.T) {
if config.ACLDefaultPolicy != "deny" {
t.Fatalf("bad: %#v", config)
}

// Watches
input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}

if len(config.Watches) != 1 {
t.Fatalf("bad: %#v", config)
}

out := config.Watches[0]
exp := map[string]interface{}{
"type": "keyprefix",
"prefix": "foo/",
"handler": "foobar",
}
if !reflect.DeepEqual(out, exp) {
t.Fatalf("bad: %#v", config)
}
}

func TestDecodeConfig_Service(t *testing.T) {
Expand Down Expand Up @@ -538,6 +559,13 @@ func TestMergeConfig(t *testing.T) {
ACLTTLRaw: "15s",
ACLDownPolicy: "deny",
ACLDefaultPolicy: "deny",
Watches: []map[string]interface{}{
map[string]interface{}{
"type": "keyprefix",
"prefix": "foo/",
"handler": "foobar",
},
},
}

c := MergeConfig(a, b)
Expand Down
16 changes: 16 additions & 0 deletions command/agent/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package agent
import (
"math"
"math/rand"
"os/exec"
"runtime"
"time"
)

Expand Down Expand Up @@ -39,3 +41,17 @@ func strContains(l []string, s string) bool {
}
return false
}

// ExecScript returns a command to execute a script
func ExecScript(script string) (*exec.Cmd, error) {
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}
cmd := exec.Command(shell, flag, script)
return cmd, nil
}
80 changes: 80 additions & 0 deletions command/agent/watch_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package agent

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"

"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
)

const (
// Limit the size of a watch handlers's output to the
// last WatchBufSize. Prevents an enormous buffer
// from being captured
WatchBufSize = 4 * 1024 // 4KB
)

// verifyWatchHandler does the pre-check for our handler configuration
func verifyWatchHandler(params interface{}) error {
if params == nil {
return fmt.Errorf("Must provide watch handler")
}
_, ok := params.(string)
if !ok {
return fmt.Errorf("Watch handler must be a string")
}
return nil
}

// makeWatchHandler returns a handler for the given watch
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
script := params.(string)
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
// Create the command
cmd, err := ExecScript(script)
if err != nil {
logger.Printf("[ERR] agent: Failed to setup watch: %v", err)
return
}
cmd.Env = append(os.Environ(),
"CONSUL_INDEX="+strconv.FormatUint(idx, 10),
)

// Collect the output
output, _ := circbuf.NewBuffer(WatchBufSize)
cmd.Stdout = output
cmd.Stderr = output

// Setup the input
var inp bytes.Buffer
enc := json.NewEncoder(&inp)
if err := enc.Encode(data); err != nil {
logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err)
return
}
cmd.Stdin = &inp

// Run the handler
if err := cmd.Run(); err != nil {
logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err)
}

// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}

// Log the output
logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr)
}
return fn
}
Loading

0 comments on commit 9e13633

Please sign in to comment.