Skip to content

Commit

Permalink
Update peering state and RPC for deferred deletion
Browse files Browse the repository at this point in the history
When deleting a peering we do not want to delete the peering and all
imported data in a single operation, since deleting a large amount of
data at once could overload Consul.

Instead we defer deletion of peerings so that:

1. When a peering deletion request is received via gRPC the peering is
   marked for deletion by setting the DeletedAt field.

2. A leader routine will monitor for peerings that are marked for
   deletion and kick off a throttled deletion of all imported resources
   before deleting the peering itself.

This commit mostly addresses point #1 by modifying the peering service
to mark peerings for deletion. Another key change is to add a
PeeringListDeleted state store function which can return all peerings
marked for deletion. This function is what will be watched by the
deferred deletion leader routine.
  • Loading branch information
freddygv committed Jun 13, 2022
1 parent 71b2545 commit cc921a9
Show file tree
Hide file tree
Showing 14 changed files with 880 additions and 420 deletions.
5 changes: 0 additions & 5 deletions agent/consul/peering_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,6 @@ func (a *peeringApply) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
return err
}

func (a *peeringApply) PeeringDelete(req *pbpeering.PeeringDeleteRequest) error {
_, err := a.srv.raftApplyProtobuf(structs.PeeringDeleteType, req)
return err
}

// TODO(peering): This needs RPC metrics interceptor since it's not triggered by an RPC.
func (a *peeringApply) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error {
_, err := a.srv.raftApplyProtobuf(structs.PeeringTerminateByIDType, req)
Expand Down
64 changes: 61 additions & 3 deletions agent/consul/state/peering.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
const (
tablePeering = "peering"
tablePeeringTrustBundles = "peering-trust-bundles"

indexDeleted = "deleted"
)

func peeringTableSchema() *memdb.TableSchema {
Expand All @@ -42,6 +44,15 @@ func peeringTableSchema() *memdb.TableSchema {
prefixIndex: prefixIndexFromQueryNoNamespace,
},
},
indexDeleted: {
Name: indexDeleted,
AllowMissing: false,
Unique: false,
Indexer: indexerSingle{
readIndex: indexDeletedFromBoolQuery,
writeIndex: indexDeletedFromPeering,
},
},
},
}
}
Expand Down Expand Up @@ -82,6 +93,17 @@ func indexIDFromPeering(raw interface{}) ([]byte, error) {
return b.Bytes(), nil
}

func indexDeletedFromPeering(raw interface{}) ([]byte, error) {
p, ok := raw.(*pbpeering.Peering)
if !ok {
return nil, fmt.Errorf("unexpected type %T for *pbpeering.Peering index", raw)
}

var b indexBuilder
b.Bool(!p.IsActive())
return b.Bytes(), nil
}

func (s *Store) PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
Expand Down Expand Up @@ -205,10 +227,19 @@ func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error {
}

if existing != nil {
// Prevent modifications to Peering marked for deletion
if !existing.IsActive() {
return fmt.Errorf("cannot write to peering that is marked for deletion")
}

p.CreateIndex = existing.CreateIndex
p.ID = existing.ID

} else {
if !p.IsActive() {
return fmt.Errorf("cannot create a new peering marked for deletion")
}

// TODO(peering): consider keeping PeeringState enum elsewhere?
p.State = pbpeering.PeeringState_INITIAL
p.CreateIndex = idx
Expand All @@ -230,8 +261,6 @@ func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error {
return tx.Commit()
}

// TODO(peering): replace with deferred deletion since this operation
// should involve cleanup of data associated with the peering.
func (s *Store) PeeringDelete(idx uint64, q Query) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
Expand All @@ -245,6 +274,10 @@ func (s *Store) PeeringDelete(idx uint64, q Query) error {
return nil
}

if existing.(*pbpeering.Peering).IsActive() {
return fmt.Errorf("cannot delete a peering without first marking for deletion")
}

if err := tx.Delete(tablePeering, existing); err != nil {
return fmt.Errorf("failed deleting peering: %v", err)
}
Expand Down Expand Up @@ -499,7 +532,7 @@ func peeringsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, serviceName string, en
if idx > maxIdx {
maxIdx = idx
}
if peering == nil {
if peering == nil || !peering.IsActive() {
continue
}
peerings = append(peerings, peering)
Expand Down Expand Up @@ -734,3 +767,28 @@ func peersForServiceTxn(
}
return idx, results, nil
}

func (s *Store) PeeringListDeleted(ws memdb.WatchSet) (uint64, []*pbpeering.Peering, error) {
tx := s.db.ReadTxn()
defer tx.Abort()

return peeringListDeletedTxn(tx, ws)
}

func peeringListDeletedTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*pbpeering.Peering, error) {
iter, err := tx.Get(tablePeering, indexDeleted, BoolQuery{Value: true})
if err != nil {
return 0, nil, fmt.Errorf("failed peering lookup: %v", err)
}

// Instead of watching iter.WatchCh() we only need to watch the index entry for the peering table
// This is sufficient to pick up any changes to peerings.
idx := maxIndexWatchTxn(tx, ws, tablePeering)

var result []*pbpeering.Peering
for t := iter.Next(); t != nil; t = iter.Next() {
result = append(result, t.(*pbpeering.Peering))
}

return idx, result, nil
}
85 changes: 85 additions & 0 deletions agent/consul/state/peering_oss_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//go:build !consulent
// +build !consulent

package state

import (
"time"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
)

func testIndexerTablePeering() map[string]indexerTestCase {
id := "432feb2f-5476-4ae2-b33c-e43640ca0e86"
encodedID := []byte{0x43, 0x2f, 0xeb, 0x2f, 0x54, 0x76, 0x4a, 0xe2, 0xb3, 0x3c, 0xe4, 0x36, 0x40, 0xca, 0xe, 0x86}

obj := &pbpeering.Peering{
Name: "TheName",
ID: id,
DeletedAt: structs.TimeToProto(time.Now()),
}

return map[string]indexerTestCase{
indexID: {
read: indexValue{
source: "432feb2f-5476-4ae2-b33c-e43640ca0e86",
expected: encodedID,
},
write: indexValue{
source: obj,
expected: encodedID,
},
},
indexName: {
read: indexValue{
source: Query{
Value: "TheNAME",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition("pArTition"),
},
expected: []byte("thename\x00"),
},
write: indexValue{
source: obj,
expected: []byte("thename\x00"),
},
prefix: []indexValue{
{
source: *structs.DefaultEnterpriseMetaInPartition("pArTition"),
expected: nil,
},
},
},
indexDeleted: {
read: indexValue{
source: BoolQuery{
Value: true,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition("partITION"),
},
expected: []byte("\x01"),
},
write: indexValue{
source: obj,
expected: []byte("\x01"),
},
extra: []indexerTestCase{
{
read: indexValue{
source: BoolQuery{
Value: false,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition("partITION"),
},
expected: []byte("\x00"),
},
write: indexValue{
source: &pbpeering.Peering{
Name: "TheName",
Partition: "PartItioN",
},
expected: []byte("\x00"),
},
},
},
},
}
}
Loading

0 comments on commit cc921a9

Please sign in to comment.