Skip to content

Commit

Permalink
Ingestion filtering should use OR logic for rules rather than AND (#5303
Browse files Browse the repository at this point in the history
)

* Refactor ingestion filtering rules for OR rather than AND

* Fix failing tests

* Rename variable

* rename func name to IsEnabled

* Refactor to remove interface dependency

* Add extra assertion to asset_test

* Add changelog entry
  • Loading branch information
aditya1702 authored May 10, 2024
1 parent fbe7631 commit 0f6abb0
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 48 deletions.
7 changes: 6 additions & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased


### Breaking Changes

- Change ingestion filtering logic to store transactions if any filter matches on it. ([5303](https://github.com/stellar/go/pull/5303))
- The previous behaviour was to store a tx only if both asset and account filters match together. So even if a tx matched an account filter but failed to match an asset filter, it would not be stored by Horizon.

## 2.30.0

**This release adds support for Protocol 21**
Expand Down
30 changes: 17 additions & 13 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,41 +26,45 @@ func NewAccountFilter() AccountFilter {
}
}

func (filter *accountFilter) Name() string {
func (f *accountFilter) Name() string {
return "filters.accountFilter"
}

func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
// only need to re-initialize the filter config state(rules) if its cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
if filterConfig.LastModified > f.lastModified {
logger.Infof("New Account Filter config detected, reloading new config %v ", *filterConfig)

filter.enabled = filterConfig.Enabled
filter.whitelistedAccountsSet = listToSet(filterConfig.Whitelist)
filter.lastModified = filterConfig.LastModified
f.enabled = filterConfig.Enabled
f.whitelistedAccountsSet = listToSet(filterConfig.Whitelist)
f.lastModified = filterConfig.LastModified
}

return nil
}

func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// filtering is disabled if the whitelist is empty for now, as that is the only filter rule
if len(f.whitelistedAccountsSet) == 0 || !f.enabled {
return true, nil
func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) {
if !f.isEnabled() {
return false, true, nil
}

participants, err := processors.ParticipantsForTransaction(0, transaction)
if err != nil {
return false, err
return true, false, err
}

// NOTE: this assumes that the participant list has a small memory footprint
// otherwise, we should be doing the filtering on the DB side
for _, p := range participants {
if f.whitelistedAccountsSet.Contains(p.Address()) {
return true, nil
return true, true, nil
}
}
return false, nil
return true, false, nil
}

func (f accountFilter) isEnabled() bool {
// filtering is disabled if the whitelist is empty for now, as that is the only filter rule
return len(f.whitelistedAccountsSet) >= 1 && f.enabled
}
12 changes: 8 additions & 4 deletions services/horizon/internal/ingest/filters/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ func TestAccountFilterAllowsWhenMatch(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, true)
}

Expand All @@ -47,13 +48,14 @@ func TestAccountFilterAllowsWhenDisabled(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)

// there is no match on filter rule, but since filter is disabled, it should allow all
tt.Equal(isEnabled, false)
tt.Equal(result, true)
}

Expand All @@ -70,11 +72,12 @@ func TestAccountFilterAllowsWhenEmptyWhitelist(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)
tt.Equal(isEnabled, false)
tt.Equal(result, true)
}

Expand All @@ -92,11 +95,12 @@ func TestAccountFilterDoesNotAllowWhenNoMatch(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, false)
}

Expand Down
28 changes: 16 additions & 12 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,26 @@ func NewAssetFilter() AssetFilter {
}
}

func (filter *assetFilter) Name() string {
func (f *assetFilter) Name() string {
return "filters.assetFilter"
}

func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
if filterConfig.LastModified > f.lastModified {
logger.Infof("New Asset Filter config detected, reloading new config %v ", *filterConfig)
filter.enabled = filterConfig.Enabled
filter.canonicalAssetsLookup = listToSet(filterConfig.Whitelist)
filter.lastModified = filterConfig.LastModified
f.enabled = filterConfig.Enabled
f.canonicalAssetsLookup = listToSet(filterConfig.Whitelist)
f.lastModified = filterConfig.LastModified
}

return nil
}

func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// filtering is disabled if the whitelist is empty for now as that is the only filter rule
if len(f.canonicalAssetsLookup) < 1 || !f.enabled {
return true, nil
func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) {
if !f.isEnabled() {
return false, true, nil
}

var operations []xdr.Operation
Expand All @@ -68,11 +67,11 @@ func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.
}

if f.filterOperationsMatchedOnRules(operations) {
return true, nil
return true, true, nil
}

logger.Debugf("No match, dropped tx with seq %v ", transaction.Envelope.SeqNum())
return false, nil
return true, false, nil
}

func (f assetFilter) filterOperationsMatchedOnRules(operations []xdr.Operation) bool {
Expand Down Expand Up @@ -144,3 +143,8 @@ func listToSet(list []string) set.Set[string] {
}
return set
}

func (f assetFilter) isEnabled() bool {
// filtering is disabled if the whitelist is empty for now as that is the only filter rule
return len(f.canonicalAssetsLookup) >= 1 && f.enabled
}
21 changes: 14 additions & 7 deletions services/horizon/internal/ingest/filters/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ func TestAssetFilterAllowsOnMatch(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, true)

result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, true)
}

Expand All @@ -47,12 +49,14 @@ func TestAssetFilterAllowsWhenEmptyWhitelist(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(isEnabled, false)
tt.Equal(result, true)

result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(isEnabled, false)
tt.Equal(result, true)
}

Expand All @@ -69,9 +73,10 @@ func TestAssetFilterAllowsWhenDisabled(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
// there was no match on filter rules, but since filter was disabled also, it should allow all
tt.Equal(isEnabled, false)
tt.Equal(result, true)
}

Expand All @@ -89,12 +94,14 @@ func TestAssetFilterDoesNotAllowV1WhenNoMatch(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, false)

result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, false)
}

Expand Down
26 changes: 18 additions & 8 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,31 @@ func (g *groupTransactionFilterers) Name() string {
return "groupTransactionFilterers"
}

func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) {
func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, bool, error) {
filtersEnabled := false

for _, f := range g.filterers {
startTime := time.Now()
include, err := f.FilterTransaction(ctx, tx)
filterEnabled, include, err := f.FilterTransaction(ctx, tx)
if !filterEnabled {
continue
}

filtersEnabled = true
if err != nil {
return false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
return true, false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
}
g.AddRunDuration(f.Name(), startTime)
if !include {
// filter out, we can return early
g.droppedTransactions++
return false, nil
if include {
return true, true, nil
}
}
return true, nil

if filtersEnabled {
g.droppedTransactions++
return true, false, nil
}
return false, true, nil
}

func (g *groupTransactionFilterers) ResetStats() {
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/processors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type LedgerTransactionProcessor interface {

type LedgerTransactionFilterer interface {
Name() string
FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error)
FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error)
}

func StreamLedgerTransactions(
Expand All @@ -47,7 +47,7 @@ func StreamLedgerTransactions(
if err != nil {
return errors.Wrap(err, "could not read transaction")
}
include, err := txFilterer.FilterTransaction(ctx, tx)
_, include, err := txFilterer.FilterTransaction(ctx, tx)
if err != nil {
return errors.Wrapf(
err,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/clients/horizonclient"
hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/services/horizon/internal/test/integration"
"github.com/stellar/go/txnbuild"
"github.com/stretchr/testify/assert"
)

func TestFilteringWithNoFilters(t *testing.T) {
Expand Down Expand Up @@ -168,3 +169,67 @@ func TestFilteringAssetWhiteList(t *testing.T) {
_, err = itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}

func TestFilteringAssetAndAccountFilters(t *testing.T) {
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

whitelistedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
nonWhitelistedAsset := txnbuild.CreditAsset{Code: "SEK", Issuer: nonWhitelistedAccountKey.Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, nonWhitelistedAsset)

// Setup whitelisted account and asset rule, force refresh of filter configs to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)

expectedAccountFilter := hProtocol.AccountFilterConfig{
Whitelist: []string{whitelistedAccount.GetAccountID()},
Enabled: &enabled,
}
err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
tt.NoError(err)
accountFilter, err := itest.AdminClient().GetIngestionAccountFilter()
tt.NoError(err)
tt.ElementsMatch(expectedAccountFilter.Whitelist, accountFilter.Whitelist)
tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled)

asset, err := whitelistedAsset.ToXDR()
tt.NoError(err)
expectedAssetFilter := hProtocol.AssetFilterConfig{
Whitelist: []string{asset.StringCanonical()},
Enabled: &enabled,
}
err = itest.AdminClient().SetIngestionAssetFilter(expectedAssetFilter)
tt.NoError(err)
assetFilter, err := itest.AdminClient().GetIngestionAssetFilter()
tt.NoError(err)

tt.ElementsMatch(expectedAssetFilter.Whitelist, assetFilter.Whitelist)
tt.Equal(expectedAssetFilter.Enabled, assetFilter.Enabled)

// Ensure the latest filter configs are reloaded by the ingestion state machine processor
time.Sleep(filters.GetFilterConfigCheckIntervalSeconds() * time.Second)

// Use a non-whitelisted account to submit a non-whitelisted asset to a whitelisted account.
// The transaction should be stored.
txResp := itest.MustSubmitOperations(nonWhitelistedAccount, nonWhitelistedAccountKey,
&txnbuild.Payment{
Destination: whitelistedAccount.GetAccountID(),
Amount: "10",
Asset: nonWhitelistedAsset,
},
)
_, err = itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}

0 comments on commit 0f6abb0

Please sign in to comment.