diff --git a/agent/acl_test.go b/agent/acl_test.go index 8cf49cd02adc..12d3a4231547 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -80,7 +80,7 @@ func NewTestACLAgent(name string, hcl string, resolveFn func(string) (acl.Author agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute) a.Agent.delegate = a - a.Agent.State = local.NewState(LocalConfig(a.Config), a.Agent.logger, a.Agent.tokens) + a.Agent.State = local.NewState(LocalConfig(a.Config), a.Agent.logger, a.Agent.tokens, time.Now) a.Agent.State.TriggerSyncChanges = func() {} return a } diff --git a/agent/agent.go b/agent/agent.go index 289653cd1bf2..a6aabbef5cd0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -249,6 +249,9 @@ type Agent struct { // grpcServer is the server instance used currently to serve xDS API for // Envoy. grpcServer *grpc.Server + + // clock is used to timestamp healthchecks + clock func() time.Time } func New(c *config.RuntimeConfig) (*Agent, error) { @@ -277,6 +280,7 @@ func New(c *config.RuntimeConfig) (*Agent, error) { shutdownCh: make(chan struct{}), endpoints: make(map[string]string), tokens: new(token.Store), + clock: time.Now, } if err := a.initializeACLs(); err != nil { @@ -363,7 +367,7 @@ func (a *Agent) Start() error { } // create the local state - a.State = local.NewState(LocalConfig(c), a.logger, a.tokens) + a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, a.clock) // create the state synchronization manager which performs // regular and on-demand state synchronizations (anti-entropy). @@ -385,7 +389,7 @@ func (a *Agent) Start() error { // Setup either the client or the server. 
if c.ServerMode { - server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens) + server, err := consul.NewServerLogger(consulCfg, a.logger, a.tokens, a.clock) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } @@ -2799,6 +2803,7 @@ func (a *Agent) loadCheckState(check *structs.HealthCheck) error { // Restore the fields from the state check.Output = p.Output check.Status = p.Status + check.LastStatusModifyTime = p.LastStatusModifyTime return nil } diff --git a/agent/agent_test.go b/agent/agent_test.go index 7551a597704e..2a1ad9eb4336 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -451,6 +451,8 @@ func TestAgent_AddService(t *testing.T) { for k, v := range tt.healthChks { t.Run(k, func(t *testing.T) { got, want := a.State.Checks()[types.CheckID(k)], v + want.LastStatusModifyTime = a.Now + fmt.Println(want.LastStatusModifyTime, got.LastStatusModifyTime) verify.Values(t, k, got, want) }) } @@ -653,9 +655,11 @@ func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) { } // check that both checks are there + hchk1.LastStatusModifyTime = a.Now if got, want := a.State.Checks()["chk1"], hchk1; !verify.Values(t, "", got, want) { t.FailNow() } + hchk2.LastStatusModifyTime = a.Now if got, want := a.State.Checks()["chk2"], hchk2; !verify.Values(t, "", got, want) { t.FailNow() } @@ -1978,11 +1982,12 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { t.Fatalf("missing check registration") } expected := &structs.HealthCheck{ - Node: a2.Config.NodeName, - CheckID: "mem", - Name: "memory check", - Status: api.HealthCritical, - Notes: "my cool notes", + Node: a2.Config.NodeName, + CheckID: "mem", + Name: "memory check", + Status: api.HealthCritical, + Notes: "my cool notes", + LastStatusModifyTime: a2.Now, } if got, want := result, expected; !verify.Values(t, "", got, want) { t.FailNow() diff --git a/agent/check.go b/agent/check.go index 0a3ce6b48dcd..107903579d74 100644 --- a/agent/check.go +++ b/agent/check.go @@ -1,6 +1,8 
@@ package agent import ( + "time" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/types" ) @@ -18,8 +20,9 @@ type persistedCheck struct { // expiration timestamp which is used to determine staleness on later // agent restarts. type persistedCheckState struct { - CheckID types.CheckID - Output string - Status string - Expires int64 + CheckID types.CheckID + Output string + Status string + LastStatusModifyTime time.Time + Expires int64 } diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index d28e3994ee81..32e5abfb8f0f 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -365,7 +365,7 @@ func TestClient_RPC_TLS(t *testing.T) { conf1.VerifyIncoming = true conf1.VerifyOutgoing = true configureTLS(conf1) - s1, err := NewServer(conf1) + s1, err := NewServer(conf1, time.Now) if err != nil { t.Fatalf("err: %v", err) } @@ -408,7 +408,7 @@ func TestClient_RPC_TLS(t *testing.T) { func TestClient_RPC_RateLimit(t *testing.T) { t.Parallel() dir1, conf1 := testServerConfig(t) - s1, err := NewServer(conf1) + s1, err := NewServer(conf1, time.Now) if err != nil { t.Fatalf("err: %v", err) } @@ -518,7 +518,7 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) { conf1.VerifyIncoming = true conf1.VerifyOutgoing = true configureTLS(conf1) - s1, err := NewServer(conf1) + s1, err := NewServer(conf1, time.Now) if err != nil { t.Fatalf("err: %v", err) } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 05a1992ac35f..ef7950ded501 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -1311,11 +1311,12 @@ AFTER_CHECK: Address: member.Addr.String(), Service: service, Check: &structs.HealthCheck{ - Node: member.Name, - CheckID: structs.SerfCheckID, - Name: structs.SerfCheckName, - Status: api.HealthPassing, - Output: structs.SerfCheckAliveOutput, + Node: member.Name, + CheckID: structs.SerfCheckID, + Name: structs.SerfCheckName, + Status: api.HealthPassing, + Output: structs.SerfCheckAliveOutput, 
+ LastStatusModifyTime: s.clock(), }, // If there's existing information about the node, do not @@ -1356,11 +1357,12 @@ func (s *Server) handleFailedMember(member serf.Member) error { ID: types.NodeID(member.Tags["id"]), Address: member.Addr.String(), Check: &structs.HealthCheck{ - Node: member.Name, - CheckID: structs.SerfCheckID, - Name: structs.SerfCheckName, - Status: api.HealthCritical, - Output: structs.SerfCheckFailedOutput, + Node: member.Name, + CheckID: structs.SerfCheckID, + Name: structs.SerfCheckName, + Status: api.HealthCritical, + Output: structs.SerfCheckFailedOutput, + LastStatusModifyTime: s.clock(), }, // If there's existing information about the node, do not diff --git a/agent/consul/server.go b/agent/consul/server.go index c3893b207250..cb49e32ba9cb 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -248,17 +248,20 @@ type Server struct { shutdownCh chan struct{} shutdownLock sync.Mutex + // clock is used to timestamp healthchecks + clock func() time.Time + // embedded struct to hold all the enterprise specific data EnterpriseServer } -func NewServer(config *Config) (*Server, error) { - return NewServerLogger(config, nil, new(token.Store)) +func NewServer(config *Config, clock func() time.Time) (*Server, error) { + return NewServerLogger(config, nil, new(token.Store), clock) } // NewServer is used to construct a new Consul server from the // configuration, potentially returning an error -func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) { +func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store, clock func() time.Time) (*Server, error) { // Check the protocol version. 
if err := config.CheckProtocolVersion(); err != nil { return nil, err @@ -346,6 +349,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* tombstoneGC: gc, serverLookup: NewServerLookup(), shutdownCh: shutdownCh, + clock: clock, } // Initialize enterprise specific server functionality diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 795dd97e2008..884020f61181 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -175,7 +175,7 @@ func newServer(c *Config) (*Server, error) { w = os.Stderr } logger := log.New(w, c.NodeName+" - ", log.LstdFlags|log.Lmicroseconds) - srv, err := NewServerLogger(c, logger, new(token.Store)) + srv, err := NewServerLogger(c, logger, new(token.Store), time.Now) if err != nil { return nil, err } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 3b1314f960da..e36aa5ea7f8c 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6,6 +6,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -2130,16 +2131,18 @@ func TestStateStore_Service_Snapshot(t *testing.T) { func TestStateStore_EnsureCheck(t *testing.T) { s := testStateStore(t) + now := time.Now() // Create a check associated with the node check := &structs.HealthCheck{ - Node: "node1", - CheckID: "check1", - Name: "redis check", - Status: api.HealthPassing, - Notes: "test check", - Output: "aaa", - ServiceID: "service1", - ServiceName: "redis", + Node: "node1", + CheckID: "check1", + Name: "redis check", + Status: api.HealthPassing, + Notes: "test check", + Output: "aaa", + ServiceID: "service1", + ServiceName: "redis", + LastStatusModifyTime: now, } // Creating a check without a node returns error diff --git a/agent/local/state.go b/agent/local/state.go index ebca0c8fc843..7b87504543e2 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -214,10 
+214,12 @@ type State struct { // are sent a message each time a proxy changes via Add or RemoveProxy. managedProxies map[string]*ManagedProxy managedProxyHandlers map[chan<- struct{}]struct{} + + clock func() time.Time } // NewState creates a new local state for the agent. -func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { +func NewState(c Config, lg *log.Logger, tokens *token.Store, clock func() time.Time) *State { l := &State{ config: c, logger: lg, @@ -229,6 +231,7 @@ func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { notifyHandlers: make(map[chan<- struct{}]struct{}), managedProxies: make(map[string]*ManagedProxy), managedProxyHandlers: make(map[chan<- struct{}]struct{}), + clock: clock, } l.SetDiscardCheckOutput(c.DiscardCheckOutput) return l @@ -434,6 +437,9 @@ func (l *State) AddCheck(check *structs.HealthCheck, token string) error { // hard-set the node name check.Node = l.config.NodeName + // init last status modify time + check.LastStatusModifyTime = l.clock().UTC() + l.SetCheckState(&CheckState{ Check: check, Token: token, @@ -572,6 +578,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { // Update status and mark out of sync c.Check.Status = status c.Check.Output = output + c.Check.LastStatusModifyTime = l.clock().UTC() c.InSync = false l.TriggerSyncChanges() } diff --git a/agent/local/state_test.go b/agent/local/state_test.go index e3024c458ff7..2712600ac028 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1381,6 +1381,75 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { } } +func TestAgent_UpdateCheck_LastStatusModifyTime(t *testing.T) { + t.Parallel() + a := agent.NewTestAgent(t.Name(), ` + discard_check_output = true + check_update_interval = "0s" # set to "0s" since otherwise output checks are deferred + `) + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + checkLastModifyTime := func(t *testing.T, id string, expected time.Time) { + 
chkReq := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: a.Config.NodeName, + } + var checks structs.IndexedHealthChecks + if err := a.RPC("Health.NodeChecks", &chkReq, &checks); err != nil { + t.Fatalf("err: %v", err) + } + + for _, check := range checks.HealthChecks { + if check.CheckID != types.CheckID(id) { + continue + } + + require.Equal(t, expected.Round(0), check.LastStatusModifyTime) + return + } + + require.Failf(t, "", "could not find check %s", id) + } + + // register a check + check := &structs.HealthCheck{ + Node: a.Config.NodeName, + CheckID: "web", + Name: "web", + Status: api.HealthPassing, + Output: "first output", + } + + err := a.State.AddCheck(check, "") + require.Nil(t, err) + a.State.SyncFull() + require.Nil(t, err) + checkLastModifyTime(t, "web", a.Now) + + t1 := a.Now + a.Now = a.Now.Add(time.Hour) + // no change, should not modify LastModifyTime + a.State.UpdateCheck(types.CheckID("web"), api.HealthPassing, "") + a.State.SyncFull() + require.Nil(t, err) + checkLastModifyTime(t, "web", t1) + + a.Now = a.Now.Add(time.Hour) + // output change, should not modify LastModifyTime + a.State.UpdateCheck(types.CheckID("web"), api.HealthPassing, "new OUTPUT") + a.State.SyncFull() + require.Nil(t, err) + checkLastModifyTime(t, "web", t1) + + a.Now = a.Now.Add(time.Hour) + // change status, should update LastModifyTime + a.State.UpdateCheck(types.CheckID("web"), api.HealthWarning, "new OUTPUT") + a.State.SyncFull() + require.Nil(t, err) + checkLastModifyTime(t, "web", a.Now) +} + func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) { t.Parallel() a := &agent.TestAgent{Name: t.Name(), HCL: ` @@ -1648,7 +1717,7 @@ func TestAgent_ServiceTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, tokens) + l := local.NewState(agent.LocalConfig(cfg), nil, tokens, time.Now) 
l.TriggerSyncChanges = func() {} l.AddService(&structs.NodeService{ID: "redis"}, "") @@ -1677,7 +1746,7 @@ func TestAgent_CheckTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, tokens) + l := local.NewState(agent.LocalConfig(cfg), nil, tokens, time.Now) l.TriggerSyncChanges = func() {} // Returns default when no token is set @@ -1702,7 +1771,7 @@ func TestAgent_CheckTokens(t *testing.T) { func TestAgent_CheckCriticalTime(t *testing.T) { t.Parallel() cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store), time.Now) l.TriggerSyncChanges = func() {} svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} @@ -1766,7 +1835,7 @@ func TestAgent_CheckCriticalTime(t *testing.T) { func TestAgent_AddCheckFailure(t *testing.T) { t.Parallel() cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store), time.Now) l.TriggerSyncChanges = func() {} // Add a check for a service that does not exist and verify that it fails @@ -1789,7 +1858,7 @@ func TestAgent_AliasCheck(t *testing.T) { require := require.New(t) cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) + l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store), time.Now) l.TriggerSyncChanges = func() {} // Add checks @@ -1905,7 +1974,7 @@ func TestState_Notify(t *testing.T) { t.Parallel() state := local.NewState(local.Config{}, - log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) + log.New(os.Stderr, "", log.LstdFlags), 
&token.Store{}, time.Now) // Stub state syncing state.TriggerSyncChanges = func() {} @@ -1968,7 +2037,7 @@ func TestStateProxyManagement(t *testing.T) { state := local.NewState(local.Config{ ProxyBindMinPort: 20000, ProxyBindMaxPort: 20001, - }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) + }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}, time.Now) // Stub state syncing state.TriggerSyncChanges = func() {} @@ -2158,7 +2227,7 @@ func TestStateProxyRestore(t *testing.T) { // Wide random range to make it very unlikely to pass by chance ProxyBindMinPort: 10000, ProxyBindMaxPort: 20000, - }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) + }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}, time.Now) // Stub state syncing state.TriggerSyncChanges = func() {} @@ -2191,7 +2260,7 @@ func TestStateProxyRestore(t *testing.T) { // Wide random range to make it very unlikely to pass by chance ProxyBindMinPort: 10000, ProxyBindMaxPort: 20000, - }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) + }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}, time.Now) // Stub state syncing state2.TriggerSyncChanges = func() {} diff --git a/agent/local/testing.go b/agent/local/testing.go index 6ca9d12aea89..a29bc0e676ce 100644 --- a/agent/local/testing.go +++ b/agent/local/testing.go @@ -3,6 +3,7 @@ package local import ( "log" "os" + "time" "github.com/hashicorp/consul/agent/token" "github.com/mitchellh/go-testing-interface" @@ -13,7 +14,7 @@ func TestState(t testing.T) *State { result := NewState(Config{ ProxyBindMinPort: 20000, ProxyBindMaxPort: 20500, - }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}) + }, log.New(os.Stderr, "", log.LstdFlags), &token.Store{}, time.Now) result.TriggerSyncChanges = func() {} return result } diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 6183766b0c2b..f72c2a8a35c8 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -57,7 +57,7 @@ 
func TestManager_BasicLifecycle(t *testing.T) { }) logger := log.New(os.Stderr, "", log.LstdFlags) - state := local.NewState(local.Config{}, logger, &token.Store{}) + state := local.NewState(local.Config{}, logger, &token.Store{}, time.Now) source := &structs.QuerySource{ Node: "node1", Datacenter: "dc1", @@ -225,7 +225,7 @@ func TestManager_deliverLatest(t *testing.T) { logger := log.New(os.Stderr, "", log.LstdFlags) cfg := ManagerConfig{ Cache: cache.New(nil), - State: local.NewState(local.Config{}, logger, &token.Store{}), + State: local.NewState(local.Config{}, logger, &token.Store{}, time.Now), Source: &structs.QuerySource{ Node: "node1", Datacenter: "dc1", diff --git a/agent/structs/structs.go b/agent/structs/structs.go index c1bd34a9f95a..3ecab11fd3c6 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -890,6 +890,8 @@ type HealthCheck struct { Definition HealthCheckDefinition RaftIndex + + LastStatusModifyTime time.Time } type HealthCheckDefinition struct { diff --git a/agent/testagent.go b/agent/testagent.go index 301908ed9687..886ecee63419 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -88,6 +88,8 @@ type TestAgent struct { // Agent is the embedded consul agent. // It is valid after Start(). *Agent + + Now time.Time } // NewTestAgent returns a started agent with the given name and @@ -95,7 +97,7 @@ type TestAgent struct { // caller should call Shutdown() to stop the agent and remove temporary // directories. 
func NewTestAgent(name string, hcl string) *TestAgent { - a := &TestAgent{Name: name, HCL: hcl} + a := &TestAgent{Name: name, HCL: hcl, Now: time.Now().UTC()} a.Start() return a } @@ -157,6 +159,9 @@ func (a *TestAgent) Start() *TestAgent { agent.LogWriter = a.LogWriter agent.logger = log.New(logOutput, a.Name+" - ", log.LstdFlags|log.Lmicroseconds) agent.MemSink = metrics.NewInmemSink(1*time.Second, time.Minute) + agent.clock = func() time.Time { + return a.Now + } // we need the err var in the next exit condition if err := agent.Start(); err == nil {