Skip to content

Commit

Permalink
Merge pull request #9882 from hashicorp/dnephin/state-index-mesh
Browse files Browse the repository at this point in the history
state: use constants and add indexer tests for gateway tables
  • Loading branch information
dnephin authored Mar 18, 2021
2 parents 43f6544 + 4d1d19e commit cffd055
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 69 deletions.
60 changes: 26 additions & 34 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2092,7 +2092,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
tx := s.db.Txn(false)
defer tx.Abort()

iter, err := gatewayServices(tx, gateway, entMeta)
iter, err := tx.Get(tableGatewayServices, indexGateway, structs.NewServiceName(gateway, entMeta))
if err != nil {
return 0, nil, fmt.Errorf("failed gateway services lookup: %s", err)
}
Expand Down Expand Up @@ -2386,7 +2386,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
// Delete all associated with gateway first, to avoid keeping mappings that were removed
sn := structs.NewServiceName(conf.GetName(), entMeta)

if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil {
if _, err := tx.DeleteAll(tableGatewayServices, indexGateway, sn); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil {
Expand Down Expand Up @@ -2524,7 +2524,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
continue
}

existing, err := tx.First(tableGatewayServices, "id", service.Gateway, sn.CompoundServiceName(), service.Port)
existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn.CompoundServiceName(), service.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
Expand Down Expand Up @@ -2559,7 +2559,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
// Check if mapping already exists in table if it's already in the table
// Avoid insert if nothing changed
existing, err := tx.First(tableGatewayServices, "id", mapping.Gateway, mapping.Service, mapping.Port)
existing, err := tx.First(tableGatewayServices, indexID, mapping.Gateway, mapping.Service, mapping.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
Expand Down Expand Up @@ -2597,7 +2597,8 @@ func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeSe
return nil
}

svcGateways, err := serviceGateways(tx, structs.WildcardSpecifier, &svc.EnterpriseMeta)
sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta}
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Service, err)
}
Expand All @@ -2620,7 +2621,8 @@ func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeSe

func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error {
// Clean up association between service name and gateways if needed
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
gateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
}
Expand Down Expand Up @@ -2652,21 +2654,11 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
return nil
}

// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service.
func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(tableGatewayServices, "service", structs.NewServiceName(name, entMeta))
}

func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
return tx.Get(tableGatewayServices, "gateway", structs.NewServiceName(name, entMeta))
}

func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) {
tx := s.db.ReadTxn()
defer tx.Abort()

iter, err := tx.Get(tableGatewayServices, "id")
iter, err := tx.Get(tableGatewayServices, indexID)
if err != nil {
return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err)
}
Expand Down Expand Up @@ -2709,7 +2701,7 @@ func (s *Store) collectGatewayServices(tx ReadTxn, ws memdb.WatchSet, iter memdb
// We might need something like the service_last_extinction index?
func serviceGatewayNodes(tx ReadTxn, ws memdb.WatchSet, service string, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) (uint64, structs.ServiceNodes, error) {
// Look up gateway name associated with the service
gws, err := serviceGateways(tx, service, entMeta)
gws, err := tx.Get(tableGatewayServices, indexService, structs.NewServiceName(service, entMeta))
if err != nil {
return 0, nil, fmt.Errorf("failed gateway lookup: %s", err)
}
Expand Down Expand Up @@ -2992,9 +2984,9 @@ func downstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.Se
func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
// To fetch upstreams we query services that have the input listed as a downstream
// To fetch downstreams we query services that have the input listed as an upstream
index := "downstream"
index := indexDownstream
if downstreams {
index = "upstream"
index = indexUpstream
}

iter, err := tx.Get(tableMeshTopology, index, service)
Expand All @@ -3008,7 +3000,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
resp []structs.ServiceName
)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(*structs.UpstreamDownstream)
entry := raw.(*upstreamDownstream)
if entry.ModifyIndex > idx {
idx = entry.ModifyIndex
}
Expand Down Expand Up @@ -3053,20 +3045,20 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
upstreamMeta := structs.NewEnterpriseMeta(u.DestinationNamespace)
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)

obj, err := tx.First(tableMeshTopology, "id", upstream, downstream)
obj, err := tx.First(tableMeshTopology, indexID, upstream, downstream)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
}
sid := svc.CompoundServiceID()
uid := structs.UniqueID(node, sid.String())

var mapping *structs.UpstreamDownstream
if existing, ok := obj.(*structs.UpstreamDownstream); ok {
var mapping *upstreamDownstream
if existing, ok := obj.(*upstreamDownstream); ok {
rawCopy, err := copystructure.Copy(existing)
if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err)
}
mapping, ok = rawCopy.(*structs.UpstreamDownstream)
mapping, ok = rawCopy.(*upstreamDownstream)
if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy)
}
Expand All @@ -3076,7 +3068,7 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
inserted[upstream] = true
}
if mapping == nil {
mapping = &structs.UpstreamDownstream{
mapping = &upstreamDownstream{
Upstream: upstream,
Downstream: downstream,
Refs: map[string]struct{}{uid: {}},
Expand All @@ -3097,7 +3089,7 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS

for u := range oldUpstreams {
if !inserted[u] {
if _, err := tx.DeleteAll(tableMeshTopology, "id", u, downstream); err != nil {
if _, err := tx.DeleteAll(tableMeshTopology, indexID, u, downstream); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
Expand All @@ -3119,14 +3111,14 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
sid := service.CompoundServiceID()
uid := structs.UniqueID(service.Node, sid.String())

iter, err := tx.Get(tableMeshTopology, "downstream", sn)
iter, err := tx.Get(tableMeshTopology, indexDownstream, sn)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
}

mappings := make([]*structs.UpstreamDownstream, 0)
mappings := make([]*upstreamDownstream, 0)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
mappings = append(mappings, raw.(*structs.UpstreamDownstream))
mappings = append(mappings, raw.(*upstreamDownstream))
}

// Do the updates in a separate loop so we don't trash the iterator.
Expand All @@ -3135,7 +3127,7 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err)
}
copy, ok := rawCopy.(*structs.UpstreamDownstream)
copy, ok := rawCopy.(*upstreamDownstream)
if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy)
}
Expand Down Expand Up @@ -3169,7 +3161,7 @@ func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
return nil
}

mapping := structs.UpstreamDownstream{
mapping := upstreamDownstream{
Upstream: gs.Service,
Downstream: gs.Gateway,
RaftIndex: gs.RaftIndex,
Expand All @@ -3190,7 +3182,7 @@ func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
return nil
}

if _, err := tx.DeleteAll(tableMeshTopology, "id", gs.Service, gs.Gateway); err != nil {
if _, err := tx.DeleteAll(tableMeshTopology, indexID, gs.Service, gs.Gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
Expand All @@ -3206,7 +3198,7 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str
return nil
}

if _, err := tx.DeleteAll(tableMeshTopology, "downstream", gateway); err != nil {
if _, err := tx.DeleteAll(tableMeshTopology, indexDownstream, gateway); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
}
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
Expand Down
7 changes: 6 additions & 1 deletion agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,12 @@ func connectEventsByServiceKind(tx ReadTxn, origEvent stream.Event) ([]stream.Ev

case structs.ServiceKindTerminatingGateway:
var result []stream.Event
iter, err := gatewayServices(tx, node.Service.Service, &node.Service.EnterpriseMeta)

sn := structs.ServiceName{
Name: node.Service.Service,
EnterpriseMeta: node.Service.EnterpriseMeta,
}
iter, err := tx.Get(tableGatewayServices, indexGateway, sn)
if err != nil {
return nil, err
}
Expand Down
88 changes: 88 additions & 0 deletions agent/consul/state/catalog_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,94 @@ func testIndexerTableChecks() map[string]indexerTestCase {
}
}

func testIndexerTableMeshTopology() map[string]indexerTestCase {
obj := upstreamDownstream{
Upstream: structs.ServiceName{Name: "UpStReAm"},
Downstream: structs.ServiceName{Name: "DownStream"},
}

return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: []interface{}{
structs.ServiceName{Name: "UpStReAm"},
structs.ServiceName{Name: "DownStream"},
},
expected: []byte("upstream\x00downstream\x00"),
},
write: indexValue{
source: obj,
expected: []byte("upstream\x00downstream\x00"),
},
},
indexUpstream: {
read: indexValue{
source: structs.ServiceName{Name: "UpStReAm"},
expected: []byte("upstream\x00"),
},
write: indexValue{
source: obj,
expected: []byte("upstream\x00"),
},
},
indexDownstream: {
read: indexValue{
source: structs.ServiceName{Name: "DownStream"},
expected: []byte("downstream\x00"),
},
write: indexValue{
source: obj,
expected: []byte("downstream\x00"),
},
},
}
}

func testIndexerTableGatewayServices() map[string]indexerTestCase {
obj := &structs.GatewayService{
Gateway: structs.ServiceName{Name: "GateWay"},
Service: structs.ServiceName{Name: "SerVice"},
Port: 50123,
}
encodedPort := string([]byte{0x96, 0x8f, 0x06, 0, 0, 0, 0, 0, 0, 0})
return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: []interface{}{
structs.ServiceName{Name: "GateWay"},
structs.ServiceName{Name: "SerVice"},
50123,
},
expected: []byte("gateway\x00service\x00" + encodedPort),
},
write: indexValue{
source: obj,
expected: []byte("gateway\x00service\x00" + encodedPort),
},
},
indexGateway: {
read: indexValue{
source: structs.ServiceName{Name: "GateWay"},
expected: []byte("gateway\x00"),
},
write: indexValue{
source: obj,
expected: []byte("gateway\x00"),
},
},
indexService: {
read: indexValue{
source: structs.ServiceName{Name: "SerVice"},
expected: []byte("service\x00"),
},
write: indexValue{
source: obj,
expected: []byte("service\x00"),
},
},
}
}

func testIndexerTableNodes() map[string]indexerTestCase {
return map[string]indexerTestCase{
indexID: {
Expand Down
Loading

0 comments on commit cffd055

Please sign in to comment.