Skip to content

Commit

Permalink
stellar#4222: cleaned up formatting, static checks
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Feb 21, 2022
1 parent e2d303a commit 69d0977
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 76 deletions.
25 changes: 13 additions & 12 deletions services/horizon/internal/actions/filter_rules_asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
)

type assetFilterResource struct {
Rules filters.AssetFilterRules `json:"rules"`
Enabled bool `json:"enabled"`
Name string `json:"name"`
Rules filters.AssetFilterRules `json:"rules"`
Enabled bool `json:"enabled"`
Name string `json:"name"`
}

func (afr assetFilterResource) Validate() error {
Expand Down Expand Up @@ -47,8 +47,8 @@ func (handler AssetFilterRuleHandler) Get(w http.ResponseWriter, r *http.Request
return
}

var assetFilterRules filters.AssetFilterRules
if err = json.Unmarshal([]byte(filter.Rules), assetFilterRules); err != nil {
var assetFilterRules = filters.AssetFilterRules{}
if err = json.Unmarshal([]byte(filter.Rules), &assetFilterRules); err != nil {
p := problem.ServerError
p.Extras = map[string]interface{}{
"reason": "invalid asset filter rule json in db",
Expand All @@ -58,7 +58,7 @@ func (handler AssetFilterRuleHandler) Get(w http.ResponseWriter, r *http.Request
}

assetFilterResource := &assetFilterResource{
Rules: assetFilterRules,
Rules: assetFilterRules,
Enabled: filter.Enabled,
Name: filter.Name,
}
Expand Down Expand Up @@ -86,20 +86,21 @@ func (handler AssetFilterRuleHandler) Set(w http.ResponseWriter, r *http.Request
return
}

var filterConfig history.FilterConfig
var filterConfig history.FilterConfig
var assetFilterRules []byte
filterConfig.Enabled = assetFilterRequest.Enabled
filterConfig.Name = history.FilterAssetFilterName
if assetFilterRules, err := json.Marshal(assetFilterRequest.Rules); err != nil {

if assetFilterRules, err = json.Marshal(assetFilterRequest.Rules); err != nil {
p := problem.ServerError
p.Extras = map[string]interface{}{
"reason": "unable to serialize asset filter rules resource from json",
}
problem.Render(r.Context(), w, err)
return
} else {
filterConfig.Rules = string(assetFilterRules)
}

}
filterConfig.Rules = string(assetFilterRules)

if err = historyQ.SetFilterConfig(r.Context(), filterConfig); err != nil {
problem.Render(r.Context(), w, err)
return
Expand Down
38 changes: 19 additions & 19 deletions services/horizon/internal/db2/history/filter_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ import (
)

const (
filterRulesTableName = "ingest_filter_rules"
filterRulesTypeColumnName = "name"
filterRulesColumnName = "rules"
filterRulesEnabledColumnName = "enabled"
filterRulesTableName = "ingest_filter_rules"
filterRulesTypeColumnName = "name"
filterRulesColumnName = "rules"
filterRulesEnabledColumnName = "enabled"
filterRulesLastModifiedColumnName = "last_modified"
FilterAssetFilterName = "asset"
FilterAssetFilterName = "asset"
)

type FilterConfig struct {
Enabled bool `db:"enabled"`
Rules string `db:"rules"`
Name string `db:"name"`
LastModified uint64 `db:"last_modified"`
Enabled bool `db:"enabled"`
Rules string `db:"rules"`
Name string `db:"name"`
LastModified uint64 `db:"last_modified"`
}

type QFilter interface {
Expand All @@ -32,28 +32,28 @@ func (q *Q) GetAllFilters(ctx context.Context) ([]FilterConfig, error) {
var filterConfigs []FilterConfig
sql := sq.Select().From(filterRulesTableName)
err := q.Select(ctx, filterConfigs, sql)

return filterConfigs, err
}

func (q *Q) GetFilterByName(ctx context.Context, name string) (FilterConfig, error) {
var filterConfig FilterConfig
sql := sq.Select().From(filterRulesTableName).Where(sq.Eq{filterRulesTypeColumnName: name,})
sql := sq.Select().From(filterRulesTableName).Where(sq.Eq{filterRulesTypeColumnName: name})
err := q.Select(ctx, filterConfig, sql)

return filterConfig, err
}

func (q *Q) SetFilterConfig(ctx context.Context, config FilterConfig) error {
updateCols := map[string]interface{}{
filterRulesLastModifiedColumnName: sq.Expr("extract(epoch from now() at time zone 'utc')"),
filterRulesEnabledColumnName: config.Enabled,
filterRulesColumnName: config.Rules,
filterRulesLastModifiedColumnName: sq.Expr("extract(epoch from now() at time zone 'utc')"),
filterRulesEnabledColumnName: config.Enabled,
filterRulesColumnName: config.Rules,
}

sql := sq.Update(filterRulesTableName).SetMap(updateCols).Where(
sq.Eq{filterRulesTypeColumnName: config.Name,})
_, err := q.Exec(ctx, sql);
sq.Eq{filterRulesTypeColumnName: config.Name})

_, err := q.Exec(ctx, sql)
return err
}
34 changes: 18 additions & 16 deletions services/horizon/internal/ingest/filters/asset_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,40 @@ func init() {
logger = log.WithFields(log.F{
"ingest filter": "asset",
})
singleton = &AssetFilter{
canonicalAssetsLookup: map[string]bool{},
lastModified: 0,
singleton = &AssetFilter{
canonicalAssetsLookup: map[string]bool{},
lastModified: 0,
}
}

type AssetFilterRules struct {
CanonicalWhitelist []string `json:"canonical_asset_whitelist"`
CanonicalWhitelist []string `json:"canonical_asset_whitelist"`
}

type AssetFilter struct {
canonicalAssetsLookup map[string]bool
lastModified uint64
canonicalAssetsLookup map[string]bool
lastModified uint64
}

func GetAssetFilter(filterConfig *history.FilterConfig) (*AssetFilter, error) {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > singleton.lastModified {
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
return nil, errors.Wrap(err, "unable to serialize asset filter rules")
}
singleton = &AssetFilter{
canonicalAssetsLookup: listToMap(assetFilterRules.CanonicalWhitelist),
lastModified: filterConfig.LastModified,
}
canonicalAssetsLookup: listToMap(assetFilterRules.CanonicalWhitelist),
lastModified: filterConfig.LastModified,
}
}

return singleton, nil
}

func (f *AssetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {

tx, v1Exists := transaction.Envelope.GetV1()
if !v1Exists {
return true, nil
Expand Down Expand Up @@ -107,19 +107,21 @@ func (f *AssetFilter) FilterTransaction(ctx context.Context, transaction ingest.
allowed = true
}
}
return allowed, nil

if allowed {
return true, nil
}
}

logger.Debugf("No match, dropped tx with seq %v ", transaction.Envelope.SeqNum())
return false, nil
}


func (f *AssetFilter) assetMatchedFilter(asset *xdr.Asset) bool {
var matched = false
if _, found := f.canonicalAssetsLookup[asset.StringCanonical()]; found {
matched = true
}
}
return matched
}

Expand Down
15 changes: 7 additions & 8 deletions services/horizon/internal/ingest/filters/asset_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func TestFilterHasMatch(t *testing.T) {
"USDC:GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"
]
}`,
Enabled: true,
Enabled: true,
LastModified: 1,
Name: history.FilterAssetFilterName,
Name: history.FilterAssetFilterName,
}
filter, err := GetAssetFilter(filterConfig)
tt.NoError(err)
tt.NoError(err)

var xdrAssetCode [12]byte
copy(xdrAssetCode[:], "USDC")
Expand Down Expand Up @@ -77,16 +77,16 @@ func TestFilterHasNoMatch(t *testing.T) {
tt := assert.New(t)
ctx := context.Background()

filterConfig := &history.FilterConfig{
Rules: `{
filterConfig := &history.FilterConfig{
Rules: `{
"canonical_asset_whitelist": [
"USDX:GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"
]
}`,

Enabled: true,
Enabled: true,
LastModified: 1,
Name: history.FilterAssetFilterName,
Name: history.FilterAssetFilterName,
}

filter, err := GetAssetFilter(filterConfig)
Expand Down Expand Up @@ -135,4 +135,3 @@ func TestFilterHasNoMatch(t *testing.T) {
tt.NoError(err)
tt.Equal(result, false)
}

6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ type groupTransactionFilterers struct {
lastFilterConfigCheckUnixMS int64
}

func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilterer, lastFilterConfigCheckUnixMS int64 ) *groupTransactionFilterers {
func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilterer, lastFilterConfigCheckUnixMS int64) *groupTransactionFilterers {
return &groupTransactionFilterers{
filterers: filterers,
processorsRunDurations: make(map[string]time.Duration),
filterers: filterers,
processorsRunDurations: make(map[string]time.Duration),
lastFilterConfigCheckUnixMS: lastFilterConfigCheckUnixMS,
}
}
Expand Down
34 changes: 16 additions & 18 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"bytes"
"context"
"fmt"
"time"
logger "github.com/stellar/go/support/log"
"time"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"

)

type ingestionSource int
Expand Down Expand Up @@ -82,17 +81,16 @@ type ProcessorRunnerInterface interface {
}

var _ ProcessorRunnerInterface = (*ProcessorRunner)(nil)
var (
var (
// default empty filters, this will get populated on first processor invocation
groupFilterers *groupTransactionFilterers = newGroupTransactionFilterers([]processors.LedgerTransactionFilterer{}, 0)
LOG *logger.Entry = log.WithFields(logger.F{
LOG *logger.Entry = log.WithFields(logger.F{
"processor": "filters",
})
// the filter config cache will be checked against latest from db at most once per each of this interval,
// the filter config cache will be checked against latest from db at most once per each of this interval,
filterConfigCheckIntervalMS int64 = 10000
)


type ProcessorRunner struct {
config Config

Expand Down Expand Up @@ -164,35 +162,35 @@ func (s *ProcessorRunner) buildTransactionProcessor(
func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers {

// only attempt to refresh filter config cache state at configured interval limit
if time.Now().UnixMilli() < (groupFilterers.lastFilterConfigCheckUnixMS + filterConfigCheckIntervalMS){
return groupFilterers
if time.Now().UnixMilli() < (groupFilterers.lastFilterConfigCheckUnixMS + filterConfigCheckIntervalMS) {
return groupFilterers
}

LOG.Info("expired filter config cache, refresh from db")
filterConfigs, err := s.historyQ.GetAllFilters(s.ctx)
if err != nil {
LOG.Errorf("unable to query filter configs, %v",err)
LOG.Errorf("unable to query filter configs, %v", err)
// reset the cache time regardless, so next attempt is at next interval
groupFilterers.lastFilterConfigCheckUnixMS = time.Now().UnixMilli()
return groupFilterers
}
}

newFilters := []processors.LedgerTransactionFilterer{}
for _, filterConfig := range filterConfigs {
if filterConfig.Enabled {
switch filterConfig.Name {
case history.FilterAssetFilterName:
assetFilter, err := filters.GetAssetFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v",err)
continue
}
newFilters = append(newFilters, assetFilter)
case history.FilterAssetFilterName:
assetFilter, err := filters.GetAssetFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
continue
}
newFilters = append(newFilters, assetFilter)
}
}
}
groupFilterers = newGroupTransactionFilterers(newFilters, time.Now().UnixMilli())
return groupFilterers
return groupFilterers
}

// checkIfProtocolVersionSupported checks if this Horizon version supports the
Expand Down

0 comments on commit 69d0977

Please sign in to comment.