diff --git a/README.md b/README.md index 4d3d584..f68e474 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ Usage | `syslog-facility` | The facility to use when sending to syslog. This requires the use of `-syslog`. The default value is `LOCAL0`. | `token` | The [Consul API token][Consul ACLs]. There is no default value. | `prefix`* | The source prefix including the , with optional destination prefix, separated by a colon (`:`). This option is additive and may be specified multiple times for multiple prefixes to replicate. +| `exclude` | A prefix to exclude during replication. This option is additive and may be specified multiple times for multiple prefixes to exclude. | `wait` | The `minimum(:maximum)` to wait for stability before replicating, separated by a colon (`:`). If the optional maximum value is omitted, it is assumed to be 4x the required minimum value. There is no default value. | `retry` | The amount of time to wait if Consul returns an error when communicating with the API. The default value is 5 seconds. | `config` | The path to a configuration file or directory of configuration files on disk, relative to the current working directory. Values specified on the CLI take precedence over values specified in the configuration file. There is no default value. @@ -80,6 +81,15 @@ $ consul-replicate \ -once ``` +Replicate all keys under "global" from the nyc1 , but exclude the global/private prefix: + +```shell +$ consul-replicate \ + -prefix "global@nyc1" \ + -exclude "global/private" \ + -once +``` + ### Configuration File(s) The Consul Replicate configuration files are written in [HashiCorp Configuration Language (HCL)][HCL]. By proxy, this means the Consul Replicate configuration file is JSON-compatible. For more information, please see the [HCL specification][HCL]. @@ -120,6 +130,10 @@ prefix { prefix { // Multiple prefix definitions are supported } + +exclude { + source = "vault/core/lock" +} ``` If a directory is given instead of a file, all files in the directory (recursively) will be merged in [lexical order](http://golang.org/pkg/path/filepath/#Walk). So if multiple files declare a "consul" key for instance, the last one will be used. diff --git a/cli.go b/cli.go index 985a9cb..5d10b54 100644 --- a/cli.go +++ b/cli.go @@ -241,6 +241,14 @@ func (cli *CLI) parseFlags(args []string) (*Config, bool, bool, error) { return nil }), "prefix", "") + flags.Var((funcVar)(func(s string) error { + if config.Excludes == nil { + config.Excludes = make([]*Exclude, 0, 1) + } + config.Excludes = append(config.Excludes, &Exclude{Source: s}) + return nil + }), "exclude", "") + flags.Var((funcBoolVar)(func(b bool) error { config.Syslog.Enabled = b config.set("syslog") @@ -382,6 +390,8 @@ Options: datacenter and optionally the destination prefix in the destination datacenters - if the destination is omitted, it is assumed to be the same as the source + -exclude= Provides a prefix to exclude from replication + -wait= Sets the 'minumum(:maximum)' amount of time to wait before replicating -retry= The amount of time to wait if Consul returns an diff --git a/cli_test.go b/cli_test.go index c0e60dd..0cafa93 100644 --- a/cli_test.go +++ b/cli_test.go @@ -288,6 +288,25 @@ func TestParseFlags_prefixes(t *testing.T) { } } +func TestParseFlags_excludes(t *testing.T) { + cli := NewCLI(ioutil.Discard, ioutil.Discard) + config, _, _, err := cli.parseFlags([]string{ + "-exclude", "excluded/", + }) + if err != nil { + t.Fatal(err) + } + + if len(config.Excludes) != 1 { + t.Fatal("expected 1 exclude") + } + + exclude := config.Excludes[0] + if exclude.Source != "excluded/" { + t.Errorf("expected %q to be %q", exclude.Source, "excluded/") + } +} + func TestParseFlags_syslog(t *testing.T) { cli := NewCLI(ioutil.Discard, ioutil.Discard) config, _, _, err := cli.parseFlags([]string{ diff --git a/config.go b/config.go index fbbcb54..3205ed9 100644 --- a/config.go +++ b/config.go @@ -33,6 +33,9 @@ type Config struct { // Prefixes is the list of key prefix dependencies. Prefixes []*Prefix `mapstructure:"prefix"` + // Excludes is the list of key prefixes to exclude from replication. + Excludes []*Exclude `mapstructure:"exclude"` + // Auth is the HTTP basic authentication for communicating with Consul. Auth *AuthConfig `mapstructure:"auth"` @@ -115,6 +118,13 @@ func (c *Config) Copy() *Config { } } + config.Excludes = make([]*Exclude, len(c.Excludes)) + for i, p := range c.Excludes { + config.Excludes[i] = &Exclude{ + Source: p.Source, + } + } + config.Retry = c.Retry if c.Wait != nil { @@ -224,6 +234,18 @@ func (c *Config) Merge(config *Config) { } } + if config.Excludes != nil { + if c.Excludes == nil { + c.Excludes = []*Exclude{} + } + + for _, exclude := range config.Excludes { + c.Excludes = append(c.Excludes, &Exclude{ + Source: exclude.Source, + }) + } + } + if config.WasSet("retry") { c.Retry = config.Retry } @@ -375,6 +397,7 @@ func DefaultConfig() *Config { }, LogLevel: logLevel, Prefixes: []*Prefix{}, + Excludes: []*Exclude{}, Retry: 5 * time.Second, StatusDir: "service/consul-replicate/statuses", Wait: &watch.Wait{ @@ -476,6 +499,11 @@ type Prefix struct { Destination string `mapstructure:"destination"` } +// Exclude is a key path prefix to exclude from replication +type Exclude struct { + Source string `mapstructure:"source"` +} + // ParsePrefix parses a prefix of the format "source@dc:destination" into the // Prefix component. func ParsePrefix(s string) (*Prefix, error) { diff --git a/config_test.go b/config_test.go index 6bf4a80..4c29b98 100644 --- a/config_test.go +++ b/config_test.go @@ -184,6 +184,32 @@ func TestMerge_Prefixes(t *testing.T) { } } +func TestMerge_Excludes(t *testing.T) { + config1 := testConfig(` + exclude { + source = "foo" + } + `, t) + config2 := testConfig(` + exclude { + source = "foo-2" + } + `, t) + config1.Merge(config2) + + if len(config1.Excludes) != 2 { + t.Fatalf("bad excludes %d", len(config1.Excludes)) + } + + if config1.Excludes[0].Source != "foo" { + t.Errorf("bad source: %#v", config1.Excludes[0].Source) + } + + if config1.Excludes[1].Source != "foo-2" { + t.Errorf("bad source: %#v", config1.Excludes[1].Source) + } +} + func TestMerge_wait(t *testing.T) { config1 := testConfig(` wait = "1s:1s" @@ -257,6 +283,7 @@ func TestParseConfig_correctValues(t *testing.T) { Password: "test", }, Prefixes: []*Prefix{}, + Excludes: []*Exclude{}, SSL: &SSLConfig{ Enabled: true, Verify: false, diff --git a/runner.go b/runner.go index 2addccc..d1c6981 100644 --- a/runner.go +++ b/runner.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/consul-template/watch" "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" + "strings" ) // Regexp for invalid characters in keys @@ -188,7 +189,7 @@ func (r *Runner) Run() error { // Replicate each prefix in a goroutine for _, prefix := range prefixes { - go r.replicate(prefix, doneCh, errCh) + go r.replicate(prefix, r.config.Excludes, doneCh, errCh) } var errs *multierror.Error @@ -256,7 +257,7 @@ func (r *Runner) get(prefix *Prefix) (*watch.View, bool) { // replicate performs replication into the current datacenter from the given // prefix. This function is designed to be called via a goroutine since it is // expensive and needs to be parallelized. -func (r *Runner) replicate(prefix *Prefix, doneCh chan struct{}, errCh chan error) { +func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan struct{}, errCh chan error) { // Ensure we are not self-replicating info, err := r.client.Agent().Self() if err != nil { @@ -301,6 +302,22 @@ func (r *Runner) replicate(prefix *Prefix, doneCh chan struct{}, errCh chan erro key := prefix.Destination + pair.Key usedKeys[key] = struct{}{} + // Ignore if the key falls under an excluded prefix + if len(excludes) > 0 { + excluded := false + for _, exclude := range excludes { + if strings.HasPrefix(pair.Path, exclude.Source) { + log.Printf("[DEBUG] (runner) key %q has prefix %q, excluding", + pair.Path, exclude.Source) + excluded = true + } + } + + if excluded { + continue + } + } + // Ignore if the modify index is old if pair.ModifyIndex <= status.LastReplicated { log.Printf("[DEBUG] (runner) skipping because %q is already "+ @@ -346,7 +363,21 @@ func (r *Runner) replicate(prefix *Prefix, doneCh chan struct{}, errCh chan erro return } for _, key := range localKeys { - if _, ok := usedKeys[key]; !ok { + excluded := false + + // Ignore if the key falls under an excluded prefix + if len(excludes) > 0 { + sourceKey := strings.Replace(key, prefix.Destination, prefix.Source.Prefix, -1) + for _, exclude := range excludes { + if strings.HasPrefix(sourceKey, exclude.Source) { + log.Printf("[DEBUG] (runner) key %q has prefix %q, excluding from deletes", + sourceKey, exclude.Source) + excluded = true + } + } + } + + if _, ok := usedKeys[key]; !ok && !excluded { if _, err := kv.Delete(key, nil); err != nil { errCh <- fmt.Errorf("failed to delete %q: %s", key, err) return diff --git a/runner_test.go b/runner_test.go index 39c3ed1..4ac6b3f 100644 --- a/runner_test.go +++ b/runner_test.go @@ -15,6 +15,9 @@ func TestNewRunner_initialize(t *testing.T) { &Prefix{SourceRaw: "3", Destination: "6"}, &Prefix{SourceRaw: "4", Destination: "7"}, }, + Excludes: []*Exclude{ + &Exclude{Source: "3"}, + }, } runner, err := NewRunner(config, once) diff --git a/scripts/integration.sh b/scripts/integration.sh index 8238c84..26a198e 100755 --- a/scripts/integration.sh +++ b/scripts/integration.sh @@ -11,6 +11,7 @@ PORT_DC1="8100" PORT_DC2="8200" ADDRESS_DC1="127.0.0.1:$PORT_DC1" ADDRESS_DC2="127.0.0.1:$PORT_DC2" +EXCLUDED_KEY=555 function cleanup { kill -9 $CONSUL_DC1_PID @@ -64,6 +65,7 @@ echo $CONSUL_REPLICATE_BIN $CONSUL_REPLICATE_BIN \ -consul $ADDRESS_DC2 \ -prefix "global@dc1:backup" \ + -exclude "global/$EXCLUDED_KEY" \ -log-level $LOG_LEVEL & CONSUL_REPLICATE_PID=$! sleep 5 @@ -80,7 +82,11 @@ echo echo "CHECKING DC2 FOR REPLICATION" for i in `seq 1 1000`; do - curl -s $ADDRESS_DC2/v1/kv/backup/$i | grep "dGVzdCBkYXRh" + if [ $i -ne "$EXCLUDED_KEY" ]; then + curl -s $ADDRESS_DC2/v1/kv/backup/$i | grep "dGVzdCBkYXRh" + else + curl -sw '%{http_code}' $ADDRESS_DC2/v1/kv/backup/$i | grep "404" + fi done echo @@ -90,6 +96,16 @@ sleep 5 curl -s $ADDRESS_DC2/v1/kv/backup/six | grep "c2l4" echo +echo "WRITING KEY IN DC2" +curl -s -o /dev/null -X PUT $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete -d "don't delete" +sleep 5 + +echo "UPDATING PREFIX IN DC1" +curl -s -o /dev/null -X PUT $ADDRESS_DC1/v1/kv/global/$EXCLUDED_KEY -d "test data" +sleep 5 + +echo "CHECKING THAT KEY STILL EXISTS IN DC2" +curl -s $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete | grep "ZG9uJ3QgZGVsZXRl" rm -rf $DATADIR_DC1 rm -rf $DATADIR_DC2