Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent: prevent very old servers re-joining a cluster with stale data #17171

Merged
merged 24 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/17171.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent: add a configurable maximimum age (default: 7 days) to prevent servers re-joining a cluster with stale data
```
91 changes: 82 additions & 9 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand All @@ -22,8 +23,6 @@ import (

"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
Expand All @@ -50,12 +49,13 @@ import (
grpcDNS "github.com/hashicorp/consul/agent/grpc-external/services/dns"
middleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp/scada"
libscada "github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/rpcclient"
"github.com/hashicorp/consul/agent/rpcclient/configentry"
"github.com/hashicorp/consul/agent/rpcclient/health"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/systemd"
Expand Down Expand Up @@ -575,11 +575,11 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}

// copy over the existing node id, this cannot be
// changed while running anyways but this prevents
// breaking some existing behavior. then overwrite
// the configuration
// Copy over the existing node id. This cannot be
// changed while running, but this prevents
// breaking some existing behavior.
c.NodeID = a.config.NodeID
// Overwrite the configuration.
a.config = c

if err := a.tlsConfigurator.Update(a.config.TLS); err != nil {
Expand Down Expand Up @@ -625,6 +625,15 @@ func (a *Agent) Start(ctx context.Context) error {
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)

// TODO: maybe this is called too early?
loshz marked this conversation as resolved.
Show resolved Hide resolved
if err := a.checkServerLastSeen(); err != nil {
// TODO: log a bunch of times first?
loshz marked this conversation as resolved.
Show resolved Hide resolved
return err
}

// periodically write server metadata to disk.
go a.persistServerMetadata()

incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(
&lib.StopChannelContext{StopCh: a.shutdownCh},
serverLogger,
Expand Down Expand Up @@ -661,7 +670,6 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start server cert manager: %w", err)
}
}

} else {
a.externalGRPCServer = external.NewServer(
a.logger.Named("grpc.external"),
Expand Down Expand Up @@ -1094,7 +1102,7 @@ func (a *Agent) listenHTTP() ([]apiServer, error) {
MaxHeaderBytes: a.config.HTTPMaxHeaderBytes,
}

if libscada.IsCapability(l.Addr()) {
if scada.IsCapability(l.Addr()) {
// wrap in http2 server handler
httpServer.Handler = h2c.NewHandler(srv.handler(a.config.EnableDebug), &http2.Server{})
}
Expand Down Expand Up @@ -1521,6 +1529,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co

cfg.Reporting.License.Enabled = runtimeCfg.Reporting.License.Enabled

cfg.ServerRejoinAgeMax = runtimeCfg.ServerRejoinAgeMax

enterpriseConsulConfig(cfg, runtimeCfg)

return cfg, nil
Expand Down Expand Up @@ -4529,7 +4539,70 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {

a.fillEnterpriseProxyDataSources(&sources)
return sources
}

// persistServerMetadata periodically writes a server's metadata to a file
// in the configured data directory.
func (a *Agent) persistServerMetadata() {
file := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

// Create a timer with no initial tick to allow metadata to be written immediately.
t := time.NewTimer(0)
defer t.Stop()

for {
select {
case <-t.C:
// Reset the timer to the larger periodic interval.
t.Reset(1 * time.Hour)

f, err := consul.OpenServerMetadata(file)
if err != nil {
a.logger.Error("failed to open existing server metadata: %w", err)
continue
}

if err := consul.WriteServerMetadata(f); err != nil {
f.Close()
// TODO: should we exit if this has happened too many times?
loshz marked this conversation as resolved.
Show resolved Hide resolved
a.logger.Error("failed to write server metadata: %w", err)
continue
}

f.Close()
case <-a.shutdownCh:
return
}
}
}

// checkServerLastSeen is a safety check for preventing old servers from rejoining an existing cluster.
//
// It attempts to read a server's metadata file and check the last seen Unix timestamp against a
// configurable max age. If the metadata file does not exist, we treat this as an initial startup
// and return no error.
//
// Example: if the server recorded a last seen timestamp of now-7d, and we configure a max age
// of 3d, then we should prevent the server from rejoining.
func (a *Agent) checkServerLastSeen() error {
filename := filepath.Join(a.config.DataDir, consul.ServerMetadataFile)

// Read server metadata file.
md, err := consul.ReadServerMetadata(filename)
if err != nil {
// Return early if it doesn't as this indicates the server is starting for the first time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that persistServerMetadata() never wrote the file because it encountered errors writing the file? do we need some kind of check between first startup time and now in this so we figure out "it must have had an opportunity to write a file"? Or possibly an inmemory counter of how many times persistServerMetadata() has run so we should expect a file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answered here. But I will update the code comments to explicitly call this out.

if errors.Is(err, os.ErrNotExist) {
return nil
}
return fmt.Errorf("error reading server metadata: %w", err)
}

maxAge := a.config.ServerRejoinAgeMax
if md.IsLastSeenStale(maxAge) {
return fmt.Errorf("refusing to rejoin cluster because server has been offline for more than the configured server_rejoin_age_max (%s) - consider wiping your data dir", maxAge)
}

return nil
}

func listenerPortKey(svcID structs.ServiceID, checkID structs.CheckID) string {
Expand Down
14 changes: 12 additions & 2 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ import (
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/dns"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
Expand Down Expand Up @@ -1090,6 +1089,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
ServerMode: serverMode,
ServerName: stringVal(c.ServerName),
ServerPort: serverPort,
ServerRejoinAgeMax: b.durationValWithDefaultMin("server_rejoin_age_max", c.ServerRejoinAgeMax, 24*7*time.Hour, time.Hour),
loshz marked this conversation as resolved.
Show resolved Hide resolved
Services: services,
SessionTTLMin: b.durationVal("session_ttl_min", c.SessionTTLMin),
SkipLeaveOnInt: skipLeaveOnInt,
Expand Down Expand Up @@ -1952,6 +1952,16 @@ func (b *builder) durationValWithDefault(name string, v *string, defaultVal time
return d
}

// durationValWithDefaultMin is equivalent to durationValWithDefault, but enforces a minimum duration.
func (b *builder) durationValWithDefaultMin(name string, v *string, defaultVal, minVal time.Duration) (d time.Duration) {
d = b.durationValWithDefault(name, v, defaultVal)
if d < minVal {
b.err = multierror.Append(b.err, fmt.Errorf("%s: duration '%s' cannot be less than: %s", name, *v, minVal))
}

return d
}

func (b *builder) durationVal(name string, v *string) (d time.Duration) {
return b.durationValWithDefault(name, v, 0)
}
Expand Down
15 changes: 15 additions & 0 deletions agent/config/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,21 @@ func TestBuilder_DurationVal_InvalidDuration(t *testing.T) {
require.Contains(t, b.err.Error(), badDuration2)
}

func TestBuilder_DurationValWithDefaultMin(t *testing.T) {
b := builder{}

// Attempt to validate that a duration of 10 hours will not error when the min val is 1 hour.
dur := "10h0m0s"
b.durationValWithDefaultMin("field2", &dur, 24*7*time.Hour, time.Hour)
require.NoError(t, b.err)

// Attempt to validate that a duration of 1 min will error when the min val is 1 hour.
dur = "0h1m0s"
b.durationValWithDefaultMin("field1", &dur, 24*7*time.Hour, time.Hour)
require.Error(t, b.err)
require.Contains(t, b.err.Error(), "1 error")
}

func TestBuilder_ServiceVal_MultiError(t *testing.T) {
b := builder{}
b.serviceVal(&ServiceDefinition{
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ type Config struct {
SerfBindAddrWAN *string `mapstructure:"serf_wan" json:"serf_wan,omitempty"`
ServerMode *bool `mapstructure:"server" json:"server,omitempty"`
ServerName *string `mapstructure:"server_name" json:"server_name,omitempty"`
ServerRejoinAgeMax *string `mapstructure:"server_rejoin_age_max" json:"server_rejoin_age_max,omitempty"`
Service *ServiceDefinition `mapstructure:"service" json:"-"`
Services []ServiceDefinition `mapstructure:"services" json:"-"`
SessionTTLMin *string `mapstructure:"session_ttl_min" json:"session_ttl_min,omitempty"`
Expand Down
12 changes: 12 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,18 @@ type RuntimeConfig struct {
// hcl: ports { server = int }
ServerPort int

// ServerRejoinAgeMax is used to specify the duration of time a server
// is allowed to be down/offline before a startup operation is refused.
//
// For example: if a server has been offline for 5 days, and this option
// is configured to 3 days, then any subsequent startup operation will fail
// and require an operator to manually intervene.
//
// The default is: 7 days
//
// hcl: server_rejoin_age_max = "duration"
ServerRejoinAgeMax time.Duration

loshz marked this conversation as resolved.
Show resolved Hide resolved
// Services contains the provided service definitions:
//
// hcl: services = [
Expand Down
7 changes: 4 additions & 3 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

hcpconfig "github.com/hashicorp/consul/agent/hcp/config"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
consulrate "github.com/hashicorp/consul/agent/consul/rate"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
Expand Down Expand Up @@ -6419,6 +6418,7 @@ func TestLoad_FullConfig(t *testing.T) {
SerfPortWAN: 8302,
ServerMode: true,
ServerName: "Oerr9n1G",
ServerRejoinAgeMax: 604800 * time.Second,
ServerPort: 3757,
Services: []*structs.ServiceDefinition{
{
Expand Down Expand Up @@ -7163,7 +7163,8 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
},
},
},
Locality: &Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
Locality: &Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
ServerRejoinAgeMax: 24 * 7 * time.Hour,
}

b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@
"ServerMode": false,
"ServerName": "",
"ServerPort": 0,
"ServerRejoinAgeMax": "168h0m0s",
"Services": [
{
"Address": "",
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ serf_lan = "99.43.63.15"
serf_wan = "67.88.33.19"
server = true
server_name = "Oerr9n1G"
server_rejoin_age_max = "604800s"
service = {
id = "dLOXpSCI"
name = "o1ynPkp0"
Expand Down
1 change: 1 addition & 0 deletions agent/config/testdata/full-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@
"serf_wan": "67.88.33.19",
"server": true,
"server_name": "Oerr9n1G",
"server_rejoin_age_max": "604800s",
"service": {
"id": "dLOXpSCI",
"name": "o1ynPkp0",
Expand Down
4 changes: 4 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ type Config struct {

// Embedded Consul Enterprise specific configuration
*EnterpriseConfig

// ServerRejoinAgeMax is used to specify the duration of time a server
// is allowed to be down/offline before a startup operation is refused.
ServerRejoinAgeMax time.Duration
}

func (c *Config) InPrimaryDatacenter() bool {
Expand Down
69 changes: 69 additions & 0 deletions agent/consul/server_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package consul

import (
"encoding/json"
"io"
"os"
"time"
)

// ServerMetadataFile is the name of the file on disk that server metadata
// should be written to.
const ServerMetadataFile = "server_metadata.json"

// ServerMetadata represents specific metadata about a running server.
type ServerMetadata struct {
// LastSeenUnix is the timestamp a server was last seen, in Unix format.
LastSeenUnix int64 `json:"last_seen_unix"`
}

// IsLastSeenStale checks whether the last seen timestamp is older than a given duration.
func (md *ServerMetadata) IsLastSeenStale(d time.Duration) bool {
lastSeen := time.Unix(md.LastSeenUnix, 0)
maxAge := time.Now().Add(-d)

return lastSeen.Before(maxAge)
}

// OpenServerMetadata is a helper function for opening the server metadata file
// with the correct permissions.
func OpenServerMetadata(filename string) (io.WriteCloser, error) {
return os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
}

// ReadServerMetadata is a helper function for reading the contents of a server
// metadata file and unmarshaling the data from JSON.
func ReadServerMetadata(filename string) (*ServerMetadata, error) {
b, err := os.ReadFile(filename)
if err != nil {
return nil, err
}

var md ServerMetadata
if err := json.Unmarshal(b, &md); err != nil {
return nil, err
}

return &md, nil
}

// WriteServerMetadata writes server metadata to a file in JSON format.
func WriteServerMetadata(w io.Writer) error {
md := &ServerMetadata{
LastSeenUnix: time.Now().Unix(),
}

b, err := json.Marshal(md)
if err != nil {
return err
}

if _, err := w.Write(b); err != nil {
return err
}

return nil
}
Loading