Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Performance On Large clusters] Reduce updates on large services #4720

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/consul/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,8 @@ func TestLeader_ChangeServerID(t *testing.T) {
defer os.RemoveAll(dir4)
defer s4.Shutdown()
joinLAN(t, s4, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
testrpc.WaitForTestAgent(t, s4.RPC, "dc1")
servers[2] = s4

// While integrating #3327 it uncovered that this test was flaky. The
Expand Down
63 changes: 47 additions & 16 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) err
// Get the indexes.
if n != nil {
node.CreateIndex = n.CreateIndex
node.ModifyIndex = n.ModifyIndex
// We do not need to update anything
if node.IsSame(n) {
return nil
}
node.ModifyIndex = idx
} else {
node.CreateIndex = idx
Expand Down Expand Up @@ -687,14 +692,6 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
// conversion doesn't populate any of the node-specific information.
// That's always populated when we read from the state store.
entry := svc.ToServiceNode(node)
if existing != nil {
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
}

// Get the node
n, err := tx.First("nodes", "id", node)
if err != nil {
Expand All @@ -703,6 +700,21 @@ func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *st
if n == nil {
return ErrMissingNode
}
if existing != nil {
serviceNode := existing.(*structs.ServiceNode)
entry.CreateIndex = serviceNode.CreateIndex
entry.ModifyIndex = serviceNode.ModifyIndex
// We cannot return here because: we want to keep existing behaviour (ex: failed node lookup -> ErrMissingNode)
// It might be modified in future, but it requires changing many unit tests
// Enforcing saving the entry also ensures that if we add default values in .ToServiceNode()
// those values will be saved even if node is not really modified for a while.
if entry.IsSame(serviceNode) {
return nil
}
} else {
entry.CreateIndex = idx
}
entry.ModifyIndex = idx

// Insert the service and update the index
if err := tx.Insert("services", entry); err != nil {
Expand Down Expand Up @@ -1236,8 +1248,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec

// Set the indexes
if existing != nil {
hc.CreateIndex = existing.(*structs.HealthCheck).CreateIndex
hc.ModifyIndex = idx
existingCheck := existing.(*structs.HealthCheck)
hc.CreateIndex = existingCheck.CreateIndex
hc.ModifyIndex = existingCheck.ModifyIndex
} else {
hc.CreateIndex = idx
hc.ModifyIndex = idx
Expand All @@ -1257,6 +1270,7 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
return ErrMissingNode
}

modified := true
// If the check is associated with a service, check that we have
// a registration for the service.
if hc.ServiceID != "" {
Expand All @@ -1272,14 +1286,24 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
svc := service.(*structs.ServiceNode)
hc.ServiceName = svc.ServiceName
hc.ServiceTags = svc.ServiceTags
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
modified = false
} else {
// Check has been modified, we trigger a index service change
if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
}
} else {
// Update the status for all the services associated with this node
err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil {
return err
if existing != nil && existing.(*structs.HealthCheck).IsSame(hc) {
modified = false
} else {
// Since the check has been modified, it impacts all services of node
// Update the status for all the services associated with this node
err = s.updateAllServiceIndexesOfNode(tx, idx, hc.Node)
if err != nil {
return err
}
}
}

Expand All @@ -1303,6 +1327,13 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
}
}
}
if modified {
// We update the modify index, ONLY if something has changed, thus
// With constant output, no change is seen when watching a service
// With huge number of nodes where anti-entropy updates continuously
// the checks, but not the values within the check
hc.ModifyIndex = idx
}

// Persist the check registration in the db.
if err := tx.Insert("checks", hc); err != nil {
Expand Down
117 changes: 87 additions & 30 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
CheckID: "check1",
Name: "check",
Status: "critical",
RaftIndex: structs.RaftIndex{CreateIndex: 3, ModifyIndex: 4},
RaftIndex: structs.RaftIndex{CreateIndex: 3, ModifyIndex: 3},
},
&structs.HealthCheck{
Node: "node1",
Expand Down Expand Up @@ -491,8 +491,8 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
}
c1 := out[0]
if c1.Node != nodeName || c1.CheckID != "check1" || c1.Name != "check" ||
c1.CreateIndex != 3 || c1.ModifyIndex != 4 {
t.Fatalf("bad check returned: %#v", c1)
c1.CreateIndex != 3 || c1.ModifyIndex != 3 {
t.Fatalf("bad check returned, should not be modified: %#v", c1)
}

c2 := out[1]
Expand All @@ -508,6 +508,9 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
in := &structs.Node{
Node: nodeName,
Address: "1.1.1.9",
Meta: map[string]string{
"version": string(txIdx),
},
}
if err := s.EnsureNode(txIdx, in); err != nil {
t.Fatalf("err: %s", err)
Expand All @@ -517,10 +520,10 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
t.Fatalf("err: %s", err)
}
if idx != txIdx {
t.Fatalf("index should be %q, was: %q", txIdx, idx)
t.Fatalf("index should be %v, was: %v", txIdx, idx)
}
if out.Node != nodeName {
t.Fatalf("unexpected result out = %q, nodeName supposed to be %s", out, nodeName)
t.Fatalf("unexpected result out = %v, nodeName supposed to be %s", out, nodeName)
}
}

Expand Down Expand Up @@ -726,8 +729,12 @@ func TestStateStore_EnsureNode(t *testing.T) {
}

// Update the node registration
in.Address = "1.1.1.2"
if err := s.EnsureNode(2, in); err != nil {
in2 := &structs.Node{
ID: in.ID,
Node: in.Node,
Address: "1.1.1.2",
}
if err := s.EnsureNode(2, in2); err != nil {
t.Fatalf("err: %s", err)
}

Expand All @@ -745,15 +752,32 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}

// Re-inserting data should not modify ModifiedIndex
if err := s.EnsureNode(3, in2); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" {
t.Fatalf("node was modified: %#v", out)
}

// Node upsert preserves the create index
if err := s.EnsureNode(3, in); err != nil {
in3 := &structs.Node{
ID: in.ID,
Node: in.Node,
Address: "1.1.1.3",
}
if err := s.EnsureNode(3, in3); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.2" {
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.3" {
t.Fatalf("node was modified: %#v", out)
}
if idx != 3 {
Expand Down Expand Up @@ -2177,32 +2201,60 @@ func TestStateStore_EnsureCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks[0])
}

// Modify the health check
check.Output = "bbb"
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %s", err)
}
testCheckOutput := func(expectedNodeIndex, expectedIndexForCheck uint64, outputTxt string) {
// Check that we successfully updated
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != expectedNodeIndex {
t.Fatalf("bad index: %d", idx)
}

// Check that we successfully updated
idx, checks, err = s.NodeChecks(nil, "node1")
if err != nil {
t.Fatalf("err: %s", err)
if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks))
}
if checks[0].Output != outputTxt {
t.Fatalf("wrong check output: %#v", checks[0])
}
if checks[0].CreateIndex != 3 || checks[0].ModifyIndex != expectedIndexForCheck {
t.Fatalf("bad index: %#v, expectedIndexForCheck:=%v ", checks[0], expectedIndexForCheck)
}
}
if idx != 4 {
t.Fatalf("bad index: %d", idx)
// Do not really modify the health check content the health check
check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "redis check",
Status: api.HealthPassing,
Notes: "test check",
Output: "aaa",
ServiceID: "service1",
ServiceName: "redis",
}
if len(checks) != 1 {
t.Fatalf("wrong number of checks: %d", len(checks))
if err := s.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %s", err)
}
if checks[0].Output != "bbb" {
t.Fatalf("wrong check output: %#v", checks[0])
testCheckOutput(4, 3, check.Output)

// Do modify the heathcheck
check = &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "redis check",
Status: api.HealthPassing,
Notes: "test check",
Output: "bbbmodified",
ServiceID: "service1",
ServiceName: "redis",
}
if checks[0].CreateIndex != 3 || checks[0].ModifyIndex != 4 {
t.Fatalf("bad index: %#v", checks[0])
if err := s.EnsureCheck(5, check); err != nil {
t.Fatalf("err: %s", err)
}
testCheckOutput(5, 5, "bbbmodified")

// Index tables were updated
if idx := s.maxIndex("checks"); idx != 4 {
if idx := s.maxIndex("checks"); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
}
Expand Down Expand Up @@ -2890,7 +2942,7 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
}

// Node updates alter the returned index and fire the watch.
testRegisterNode(t, s, 8, "node1")
testRegisterNodeWithChange(t, s, 8, "node1")
if !watchFired(ws) {
t.Fatalf("bad")
}
Expand All @@ -2905,7 +2957,8 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
}

// Service updates alter the returned index and fire the watch.
testRegisterService(t, s, 9, "node1", "service1")

testRegisterServiceWithChange(t, s, 9, "node1", "service1", true)
if !watchFired(ws) {
t.Fatalf("bad")
}
Expand Down Expand Up @@ -3261,6 +3314,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
ID: "service1",
Service: "service1",
Address: "1.1.1.1",
Meta: make(map[string]string),
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
Expand All @@ -3272,6 +3326,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
ID: "service2",
Service: "service2",
Address: "1.1.1.1",
Meta: make(map[string]string),
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
Expand Down Expand Up @@ -3313,6 +3368,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service1",
Address: "1.1.1.1",
Port: 1111,
Meta: make(map[string]string),
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
Expand All @@ -3324,6 +3380,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service2",
Address: "1.1.1.1",
Port: 1111,
Meta: make(map[string]string),
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
Expand All @@ -3344,7 +3401,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
if len(dump) != 1 || !reflect.DeepEqual(dump[0], expect[0]) {
t.Fatalf("bad: %#v", dump)
t.Fatalf("bad: len=%#v dump=%#v expect=%#v", len(dump), dump[0], expect[0])
}

// Generate a dump of all the nodes
Expand Down
Loading