Skip to content

Commit

Permalink
Merge pull request #9188 from hashicorp/dnephin/more-streaming-tests
Browse files Browse the repository at this point in the history
Add more streaming tests
  • Loading branch information
dnephin authored and hashicorp-ci committed Feb 26, 2021
1 parent 07bf4ec commit dd0b307
Show file tree
Hide file tree
Showing 9 changed files with 812 additions and 17 deletions.
4 changes: 4 additions & 0 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2280,6 +2280,10 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
//
// TODO: idx parameter is not used except as a return value. Remove it.
// TODO: err parameter is only used for early return. Remove it and check from the
// caller.
func parseCheckServiceNodes(
tx ReadTxn, ws memdb.WatchSet, idx uint64,
services structs.ServiceNodes,
Expand Down
16 changes: 10 additions & 6 deletions agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
// of stream.Events that describe the current state of a service health query.
//
// TODO: no tests for this yet
func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
tx := db.ReadTxn()
Expand All @@ -66,11 +64,17 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
event := stream.Event{
Index: idx,
Topic: topic,
Payload: EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
},
}
payload := EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &n,
}

if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
payload.key = n.Service.Proxy.DestinationServiceName
}

event.Payload = payload

// append each event as a separate item so that they can be serialized
// separately, to prevent the encoding of one massive message.
Expand Down
160 changes: 158 additions & 2 deletions agent/consul/state/catalog_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,171 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/proto/pbcommon"

"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)

func TestServiceHealthSnapshot(t *testing.T) {
store := NewStateStore(nil)

counter := newIndexCounter()
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
require.NoError(t, err)

fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth)
buf := &snapshotAppender{}
req := stream.SubscribeRequest{Key: "web"}

idx, err := fn(req, buf)
require.NoError(t, err)
require.Equal(t, counter.Last(), idx)

expected := [][]stream.Event{
{
testServiceHealthEvent(t, "web", func(e *stream.Event) error {
e.Index = counter.Last()
csn := getPayloadCheckServiceNode(e.Payload)
csn.Node.CreateIndex = 1
csn.Node.ModifyIndex = 1
csn.Service.CreateIndex = 2
csn.Service.ModifyIndex = 2
csn.Checks[0].CreateIndex = 1
csn.Checks[0].ModifyIndex = 1
csn.Checks[1].CreateIndex = 2
csn.Checks[1].ModifyIndex = 2
return nil
}),
},
{
testServiceHealthEvent(t, "web", evNode2, func(e *stream.Event) error {
e.Index = counter.Last()
csn := getPayloadCheckServiceNode(e.Payload)
csn.Node.CreateIndex = 3
csn.Node.ModifyIndex = 3
csn.Service.CreateIndex = 3
csn.Service.ModifyIndex = 3
for i := range csn.Checks {
csn.Checks[i].CreateIndex = 3
csn.Checks[i].ModifyIndex = 3
}
return nil
}),
},
}
assertDeepEqual(t, expected, buf.events, cmpEvents)
}

func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
store := NewStateStore(nil)

counter := newIndexCounter()
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regSidecar))
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
require.NoError(t, err)
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar))
require.NoError(t, err)

fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
buf := &snapshotAppender{}
req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect}

idx, err := fn(req, buf)
require.NoError(t, err)
require.Equal(t, counter.Last(), idx)

expected := [][]stream.Event{
{
testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
ep.key = "web"
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 1
csn.Node.ModifyIndex = 1
csn.Service.CreateIndex = 3
csn.Service.ModifyIndex = 3
csn.Checks[0].CreateIndex = 1
csn.Checks[0].ModifyIndex = 1
csn.Checks[1].CreateIndex = 3
csn.Checks[1].ModifyIndex = 3
return nil
}),
},
{
testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error {
e.Index = counter.Last()
ep := e.Payload.(EventPayloadCheckServiceNode)
ep.key = "web"
e.Payload = ep
csn := ep.Value
csn.Node.CreateIndex = 4
csn.Node.ModifyIndex = 4
csn.Service.CreateIndex = 5
csn.Service.ModifyIndex = 5
csn.Checks[0].CreateIndex = 4
csn.Checks[0].ModifyIndex = 4
csn.Checks[1].CreateIndex = 5
csn.Checks[1].ModifyIndex = 5
return nil
}),
},
}
assertDeepEqual(t, expected, buf.events, cmpEvents)
}

type snapshotAppender struct {
events [][]stream.Event
}

func (s *snapshotAppender) Append(events []stream.Event) {
s.events = append(s.events, events)
}

type indexCounter struct {
value uint64
}

func (c *indexCounter) Next() uint64 {
c.value++
return c.value
}

func (c *indexCounter) Last() uint64 {
return c.value
}

func newIndexCounter() *indexCounter {
return &indexCounter{}
}

var _ stream.SnapshotAppender = (*snapshotAppender)(nil)

func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
return func(e *stream.Event) error {
e.Index = idx
csn := getPayloadCheckServiceNode(e.Payload)
csn.Node.CreateIndex = create
csn.Node.ModifyIndex = modify
csn.Service.CreateIndex = create
csn.Service.ModifyIndex = modify
return nil
}
}

func TestServiceHealthEventsFromChanges(t *testing.T) {
cases := []struct {
Name string
Expand Down
Loading

0 comments on commit dd0b307

Please sign in to comment.