Skip to content

Commit

Permalink
#4222: fix filters internal expired cache state rollover setting
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Mar 3, 2022
1 parent eea2ae6 commit f5a092d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.FilterCo
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
logger.Infof("New Account Filter config detected, reloading new config %v ", *filterConfig)
var accountFilterRules AccountFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &accountFilterRules); err != nil {
return errors.Wrap(err, "unable to serialize account filter rules")
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.FilterConfig
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
logger.Infof("New Asset Filter config detected, reloading new config %v ", *filterConfig)
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
return errors.Wrap(err, "unable to serialize asset filter rules")
Expand Down
11 changes: 8 additions & 3 deletions services/horizon/internal/ingest/filters/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,24 @@ func NewFilters() Filters {
}

// Provide list of the active filters. Optimize performance by caching the list, only
// rebuild the list on expiration time interval. Method is thread-safe.
// rebuild the list on expiration time interval. Method is NOT thread-safe.
func (f *filtersCache) GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer {
// TODO, should we put a mutex/sync on this to be safe? currently not re-entrant,
// bound to instance of filtersCache, looks like it is only invoked serially per ledger from a processor,
// thinking can safely avoid the sync overhead?

// only attempt to refresh filter config cache state at configured interval limit
if time.Now().Unix() < (f.lastFilterConfigCheckUnixEpoch + filterConfigCheckIntervalSeconds) {
return f.convertCacheToList()
}

f.lastFilterConfigCheckUnixEpoch = time.Now().Unix()

LOG.Info("expired filter config cache, refresh from db")
filterConfigs, err := filterQ.GetAllFilters(ctx)
if err != nil {
LOG.Errorf("unable to query filter configs, %v", err)
// reset the cache time regardless, so next attempt is at next interval
f.lastFilterConfigCheckUnixEpoch = time.Now().Unix()
// allow the error, fall back to last loaded config
return f.convertCacheToList()
}

Expand Down

0 comments on commit f5a092d

Please sign in to comment.