Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the ability to exclude specific prefixes from replication #51

Merged
merged 3 commits into from
Oct 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Usage
| `syslog-facility` | The facility to use when sending to syslog. This requires the use of `-syslog`. The default value is `LOCAL0`.
| `token` | The [Consul API token][Consul ACLs]. There is no default value.
| `prefix`* | The source prefix including the , with optional destination prefix, separated by a colon (`:`). This option is additive and may be specified multiple times for multiple prefixes to replicate.
| `exclude` | A prefix to exclude during replication. This option is additive and may be specified multiple times for multiple prefixes to exclude.
| `wait` | The `minimum(:maximum)` to wait for stability before replicating, separated by a colon (`:`). If the optional maximum value is omitted, it is assumed to be 4x the required minimum value. There is no default value.
| `retry` | The amount of time to wait if Consul returns an error when communicating with the API. The default value is 5 seconds.
| `config` | The path to a configuration file or directory of configuration files on disk, relative to the current working directory. Values specified on the CLI take precedence over values specified in the configuration file. There is no default value.
Expand Down Expand Up @@ -80,6 +81,15 @@ $ consul-replicate \
-once
```

Replicate all keys under "global" from the nyc1 , but exclude the global/private prefix:

```shell
$ consul-replicate \
-prefix "global@nyc1" \
-exclude "global/private" \
-once
```

### Configuration File(s)
The Consul Replicate configuration files are written in [HashiCorp Configuration Language (HCL)][HCL]. By proxy, this means the Consul Replicate configuration file is JSON-compatible. For more information, please see the [HCL specification][HCL].

Expand Down Expand Up @@ -120,6 +130,10 @@ prefix {
prefix {
// Multiple prefix definitions are supported
}

exclude {
source = "vault/core/lock"
}
```

If a directory is given instead of a file, all files in the directory (recursively) will be merged in [lexical order](http://golang.org/pkg/path/filepath/#Walk). So if multiple files declare a "consul" key for instance, the last one will be used.
Expand Down
10 changes: 10 additions & 0 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ func (cli *CLI) parseFlags(args []string) (*Config, bool, bool, error) {
return nil
}), "prefix", "")

flags.Var((funcVar)(func(s string) error {
if config.Excludes == nil {
config.Excludes = make([]*Exclude, 0, 1)
}
config.Excludes = append(config.Excludes, &Exclude{Source: s})
return nil
}), "exclude", "")

flags.Var((funcBoolVar)(func(b bool) error {
config.Syslog.Enabled = b
config.set("syslog")
Expand Down Expand Up @@ -382,6 +390,8 @@ Options:
datacenter and optionally the destination prefix in
the destination datacenters - if the destination is
omitted, it is assumed to be the same as the source
-exclude=<src> Provides a prefix to exclude from replication

-wait=<duration> Sets the 'minumum(:maximum)' amount of time to wait
before replicating
-retry=<duration> The amount of time to wait if Consul returns an
Expand Down
19 changes: 19 additions & 0 deletions cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,25 @@ func TestParseFlags_prefixes(t *testing.T) {
}
}

func TestParseFlags_excludes(t *testing.T) {
cli := NewCLI(ioutil.Discard, ioutil.Discard)
config, _, _, err := cli.parseFlags([]string{
"-exclude", "excluded/",
})
if err != nil {
t.Fatal(err)
}

if len(config.Excludes) != 1 {
t.Fatal("expected 1 exclude")
}

exclude := config.Excludes[0]
if exclude.Source != "excluded/" {
t.Errorf("expected %q to be %q", exclude.Source, "excluded/")
}
}

func TestParseFlags_syslog(t *testing.T) {
cli := NewCLI(ioutil.Discard, ioutil.Discard)
config, _, _, err := cli.parseFlags([]string{
Expand Down
28 changes: 28 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Config struct {
// Prefixes is the list of key prefix dependencies.
Prefixes []*Prefix `mapstructure:"prefix"`

// Excludes is the list of key prefixes to exclude from replication.
Excludes []*Exclude `mapstructure:"exclude"`

// Auth is the HTTP basic authentication for communicating with Consul.
Auth *AuthConfig `mapstructure:"auth"`

Expand Down Expand Up @@ -115,6 +118,13 @@ func (c *Config) Copy() *Config {
}
}

config.Excludes = make([]*Exclude, len(c.Excludes))
for i, p := range c.Excludes {
config.Excludes[i] = &Exclude{
Source: p.Source,
}
}

config.Retry = c.Retry

if c.Wait != nil {
Expand Down Expand Up @@ -224,6 +234,18 @@ func (c *Config) Merge(config *Config) {
}
}

if config.Excludes != nil {
if c.Excludes == nil {
c.Excludes = []*Exclude{}
}

for _, exclude := range config.Excludes {
c.Excludes = append(c.Excludes, &Exclude{
Source: exclude.Source,
})
}
}

if config.WasSet("retry") {
c.Retry = config.Retry
}
Expand Down Expand Up @@ -375,6 +397,7 @@ func DefaultConfig() *Config {
},
LogLevel: logLevel,
Prefixes: []*Prefix{},
Excludes: []*Exclude{},
Retry: 5 * time.Second,
StatusDir: "service/consul-replicate/statuses",
Wait: &watch.Wait{
Expand Down Expand Up @@ -476,6 +499,11 @@ type Prefix struct {
Destination string `mapstructure:"destination"`
}

// Exclude is a key path prefix to exclude from replication
type Exclude struct {
Source string `mapstructure:"source"`
}

// ParsePrefix parses a prefix of the format "source@dc:destination" into the
// Prefix component.
func ParsePrefix(s string) (*Prefix, error) {
Expand Down
27 changes: 27 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,32 @@ func TestMerge_Prefixes(t *testing.T) {
}
}

func TestMerge_Excludes(t *testing.T) {
config1 := testConfig(`
exclude {
source = "foo"
}
`, t)
config2 := testConfig(`
exclude {
source = "foo-2"
}
`, t)
config1.Merge(config2)

if len(config1.Excludes) != 2 {
t.Fatalf("bad excludes %d", len(config1.Excludes))
}

if config1.Excludes[0].Source != "foo" {
t.Errorf("bad source: %#v", config1.Excludes[0].Source)
}

if config1.Excludes[1].Source != "foo-2" {
t.Errorf("bad source: %#v", config1.Excludes[1].Source)
}
}

func TestMerge_wait(t *testing.T) {
config1 := testConfig(`
wait = "1s:1s"
Expand Down Expand Up @@ -257,6 +283,7 @@ func TestParseConfig_correctValues(t *testing.T) {
Password: "test",
},
Prefixes: []*Prefix{},
Excludes: []*Exclude{},
SSL: &SSLConfig{
Enabled: true,
Verify: false,
Expand Down
37 changes: 34 additions & 3 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hashicorp/consul-template/watch"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"strings"
)

// Regexp for invalid characters in keys
Expand Down Expand Up @@ -188,7 +189,7 @@ func (r *Runner) Run() error {

// Replicate each prefix in a goroutine
for _, prefix := range prefixes {
go r.replicate(prefix, doneCh, errCh)
go r.replicate(prefix, r.config.Excludes, doneCh, errCh)
}

var errs *multierror.Error
Expand Down Expand Up @@ -256,7 +257,7 @@ func (r *Runner) get(prefix *Prefix) (*watch.View, bool) {
// replicate performs replication into the current datacenter from the given
// prefix. This function is designed to be called via a goroutine since it is
// expensive and needs to be parallelized.
func (r *Runner) replicate(prefix *Prefix, doneCh chan struct{}, errCh chan error) {
func (r *Runner) replicate(prefix *Prefix, excludes []*Exclude, doneCh chan struct{}, errCh chan error) {
// Ensure we are not self-replicating
info, err := r.client.Agent().Self()
if err != nil {
Expand Down Expand Up @@ -301,6 +302,22 @@ func (r *Runner) replicate(prefix *Prefix, doneCh chan struct{}, errCh chan erro
key := prefix.Destination + pair.Key
usedKeys[key] = struct{}{}

// Ignore if the key falls under an excluded prefix
if len(excludes) > 0 {
excluded := false
for _, exclude := range excludes {
if strings.HasPrefix(pair.Path, exclude.Source) {
log.Printf("[DEBUG] (runner) key %q has prefix %q, excluding",
pair.Path, exclude.Source)
excluded = true
}
}

if excluded {
continue
}
}

// Ignore if the modify index is old
if pair.ModifyIndex <= status.LastReplicated {
log.Printf("[DEBUG] (runner) skipping because %q is already "+
Expand Down Expand Up @@ -346,7 +363,21 @@ func (r *Runner) replicate(prefix *Prefix, doneCh chan struct{}, errCh chan erro
return
}
for _, key := range localKeys {
if _, ok := usedKeys[key]; !ok {
excluded := false

// Ignore if the key falls under an excluded prefix
if len(excludes) > 0 {
sourceKey := strings.Replace(key, prefix.Destination, prefix.Source.Prefix, -1)
for _, exclude := range excludes {
if strings.HasPrefix(sourceKey, exclude.Source) {
log.Printf("[DEBUG] (runner) key %q has prefix %q, excluding from deletes",
sourceKey, exclude.Source)
excluded = true
}
}
}

if _, ok := usedKeys[key]; !ok && !excluded {
if _, err := kv.Delete(key, nil); err != nil {
errCh <- fmt.Errorf("failed to delete %q: %s", key, err)
return
Expand Down
3 changes: 3 additions & 0 deletions runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ func TestNewRunner_initialize(t *testing.T) {
&Prefix{SourceRaw: "3", Destination: "6"},
&Prefix{SourceRaw: "4", Destination: "7"},
},
Excludes: []*Exclude{
&Exclude{Source: "3"},
},
}

runner, err := NewRunner(config, once)
Expand Down
18 changes: 17 additions & 1 deletion scripts/integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ PORT_DC1="8100"
PORT_DC2="8200"
ADDRESS_DC1="127.0.0.1:$PORT_DC1"
ADDRESS_DC2="127.0.0.1:$PORT_DC2"
EXCLUDED_KEY=555

function cleanup {
kill -9 $CONSUL_DC1_PID
Expand Down Expand Up @@ -64,6 +65,7 @@ echo $CONSUL_REPLICATE_BIN
$CONSUL_REPLICATE_BIN \
-consul $ADDRESS_DC2 \
-prefix "global@dc1:backup" \
-exclude "global/$EXCLUDED_KEY" \
-log-level $LOG_LEVEL &
CONSUL_REPLICATE_PID=$!
sleep 5
Expand All @@ -80,7 +82,11 @@ echo
echo "CHECKING DC2 FOR REPLICATION"
for i in `seq 1 1000`;
do
curl -s $ADDRESS_DC2/v1/kv/backup/$i | grep "dGVzdCBkYXRh"
if [ $i -ne "$EXCLUDED_KEY" ]; then
curl -s $ADDRESS_DC2/v1/kv/backup/$i | grep "dGVzdCBkYXRh"
else
curl -sw '%{http_code}' $ADDRESS_DC2/v1/kv/backup/$i | grep "404"
fi
done

echo
Expand All @@ -90,6 +96,16 @@ sleep 5
curl -s $ADDRESS_DC2/v1/kv/backup/six | grep "c2l4"

echo
echo "WRITING KEY IN DC2"
curl -s -o /dev/null -X PUT $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete -d "don't delete"
sleep 5

echo "UPDATING PREFIX IN DC1"
curl -s -o /dev/null -X PUT $ADDRESS_DC1/v1/kv/global/$EXCLUDED_KEY -d "test data"
sleep 5

echo "CHECKING THAT KEY STILL EXISTS IN DC2"
curl -s $ADDRESS_DC2/v1/kv/backup/$EXCLUDED_KEY/nodelete | grep "ZG9uJ3QgZGVsZXRl"

rm -rf $DATADIR_DC1
rm -rf $DATADIR_DC2