Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backend: replace ratelimiter with x/time/rate. #3155

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/BitBoxSwiss/bitbox-wallet-app/util/logging"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/observable"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/observable/action"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/ratelimit"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/socksproxy"
"github.com/btcsuite/btcd/chaincfg"
"github.com/ethereum/go-ethereum/params"
Expand Down Expand Up @@ -285,7 +284,7 @@ func NewBackend(arguments *arguments.Arguments, environment Environment) (*Backe
backend.notifier = notifier
backend.socksProxy = backendProxy
backend.httpClient = hclient
backend.etherScanHTTPClient = ratelimit.FromTransport(hclient.Transport, etherscan.CallInterval)
backend.etherScanHTTPClient = hclient

ratesCache := filepath.Join(arguments.CacheDirectoryPath(), "exchangerates")
if err := os.MkdirAll(ratesCache, 0700); err != nil {
Expand Down
43 changes: 25 additions & 18 deletions backend/coins/eth/etherscan/etherscan.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/time/rate"
)

// CallInterval is the duration between etherscan requests.
// CallsPerSec is thenumber of etherscanr equests allowed
// per second.
// Etherscan rate limits to one request per 0.2 seconds.
var CallInterval = 260 * time.Millisecond
var CallsPerSec = 3.8

const apiKey = "X3AFAGQT2QCAFTFPIH9VJY88H9PIQ2UWP7"

Expand All @@ -50,17 +52,22 @@ const ERC20GasErr = "insufficient funds for gas * price + value"
type EtherScan struct {
url string
httpClient *http.Client
limiter *rate.Limiter
}

// NewEtherScan creates a new instance of EtherScan.
func NewEtherScan(url string, httpClient *http.Client) *EtherScan {
return &EtherScan{
url: url,
httpClient: httpClient,
limiter: rate.NewLimiter(rate.Limit(CallsPerSec), 1),
}
}

func (etherScan *EtherScan) call(params url.Values, result interface{}) error {
func (etherScan *EtherScan) call(ctx context.Context, params url.Values, result interface{}) error {
if err := etherScan.limiter.Wait(ctx); err != nil {
return errp.WithStack(err)
}
params.Set("apikey", apiKey)
response, err := etherScan.httpClient.Get(etherScan.url + "?" + params.Encode())
if err != nil {
Expand Down Expand Up @@ -323,7 +330,7 @@ func (etherScan *EtherScan) Transactions(
result := struct {
Result []*Transaction
}{}
if err := etherScan.call(params, &result); err != nil {
if err := etherScan.call(context.TODO(), params, &result); err != nil {
return nil, err
}
isERC20 := erc20Token != nil
Expand All @@ -338,7 +345,7 @@ func (etherScan *EtherScan) Transactions(
resultInternal := struct {
Result []*Transaction
}{}
if err := etherScan.call(params, &resultInternal); err != nil {
if err := etherScan.call(context.TODO(), params, &resultInternal); err != nil {
return nil, err
}
var err error
Expand All @@ -353,7 +360,7 @@ func (etherScan *EtherScan) Transactions(

// ----- RPC node proxy methods follow

func (etherScan *EtherScan) rpcCall(params url.Values, result interface{}) error {
func (etherScan *EtherScan) rpcCall(ctx context.Context, params url.Values, result interface{}) error {
params.Set("module", "proxy")

var wrapped struct {
Expand All @@ -364,7 +371,7 @@ func (etherScan *EtherScan) rpcCall(params url.Values, result interface{}) error
} `json:"error"`
Result *json.RawMessage `json:"result"`
}
if err := etherScan.call(params, &wrapped); err != nil {
if err := etherScan.call(ctx, params, &wrapped); err != nil {
return err
}
if wrapped.Error != nil {
Expand All @@ -389,7 +396,7 @@ func (etherScan *EtherScan) TransactionReceiptWithBlockNumber(
params.Set("action", "eth_getTransactionReceipt")
params.Set("txhash", hash.Hex())
var result *rpcclient.RPCTransactionReceipt
if err := etherScan.rpcCall(params, &result); err != nil {
if err := etherScan.rpcCall(ctx, params, &result); err != nil {
return nil, err
}
return result, nil
Expand All @@ -402,7 +409,7 @@ func (etherScan *EtherScan) TransactionByHash(
params.Set("action", "eth_getTransactionByHash")
params.Set("txhash", hash.Hex())
var result rpcclient.RPCTransaction
if err := etherScan.rpcCall(params, &result); err != nil {
if err := etherScan.rpcCall(ctx, params, &result); err != nil {
return nil, false, err
}
return &result.Transaction, result.BlockNumber == nil, nil
Expand All @@ -415,7 +422,7 @@ func (etherScan *EtherScan) BlockNumber(ctx context.Context) (*big.Int, error) {
params.Set("tag", "latest")
params.Set("boolean", "false")
var header *types.Header
if err := etherScan.rpcCall(params, &header); err != nil {
if err := etherScan.rpcCall(ctx, params, &header); err != nil {
return nil, err
}
return header.Number, nil
Expand All @@ -434,7 +441,7 @@ func (etherScan *EtherScan) Balance(ctx context.Context, account common.Address)
params.Set("action", "balance")
params.Set("address", account.Hex())
params.Set("tag", "latest")
if err := etherScan.call(params, &result); err != nil {
if err := etherScan.call(ctx, params, &result); err != nil {
return nil, err
}
if result.Status != "1" {
Expand All @@ -461,7 +468,7 @@ func (etherScan *EtherScan) ERC20Balance(account common.Address, erc20Token *erc
params.Set("address", account.Hex())
params.Set("contractaddress", erc20Token.ContractAddress().Hex())
params.Set("tag", "latest")
if err := etherScan.call(params, &result); err != nil {
if err := etherScan.call(context.TODO(), params, &result); err != nil {
return nil, err
}
if result.Status != "1" {
Expand All @@ -485,7 +492,7 @@ func (etherScan *EtherScan) CallContract(ctx context.Context, msg ethereum.CallM
panic("not implemented")
}
var result hexutil.Bytes
if err := etherScan.rpcCall(params, &result); err != nil {
if err := etherScan.rpcCall(ctx, params, &result); err != nil {
return nil, err
}
return result, nil
Expand Down Expand Up @@ -515,7 +522,7 @@ func (etherScan *EtherScan) EstimateGas(ctx context.Context, msg ethereum.CallMs
callMsgParams(&params, msg)

var result hexutil.Uint64
if err := etherScan.rpcCall(params, &result); err != nil {
if err := etherScan.rpcCall(ctx, params, &result); err != nil {
return 0, err
}
return uint64(result), nil
Expand All @@ -528,7 +535,7 @@ func (etherScan *EtherScan) PendingNonceAt(ctx context.Context, account common.A
params.Set("address", account.Hex())
params.Set("tag", "pending")
var result hexutil.Uint64
if err := etherScan.rpcCall(params, &result); err != nil {
if err := etherScan.rpcCall(ctx, params, &result); err != nil {
return 0, err
}
return uint64(result), nil
Expand All @@ -544,15 +551,15 @@ func (etherScan *EtherScan) SendTransaction(ctx context.Context, tx *types.Trans
params := url.Values{}
params.Set("action", "eth_sendRawTransaction")
params.Set("hex", hexutil.Encode(encodedTx))
return etherScan.rpcCall(params, nil)
return etherScan.rpcCall(ctx, params, nil)
}

// SuggestGasPrice implements rpc.Interface.
func (etherScan *EtherScan) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
params := url.Values{}
params.Set("action", "eth_gasPrice")
var result hexutil.Big
if err := etherScan.rpcCall(params, &result); err != nil {
if err := etherScan.rpcCall(ctx, params, &result); err != nil {
return nil, err
}
return (*big.Int)(&result), nil
Expand Down Expand Up @@ -581,7 +588,7 @@ func (etherScan *EtherScan) FeeTargets(ctx context.Context) ([]*ethtypes.FeeTarg
params := url.Values{}
params.Set("module", "gastracker")
params.Set("action", "gasoracle")
if err := etherScan.call(params, &result); err != nil {
if err := etherScan.call(ctx, params, &result); err != nil {
return nil, err
}
// Convert string fields to int64
Expand Down
10 changes: 5 additions & 5 deletions backend/rates/gecko.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ const (
maxGeckoRange = 364 * 24 * time.Hour
)

// apiRateLimit specifies the minimal interval between equally spaced API calls
// apiRateLimit specifies the maximum number of API calls per second
// to one of the supported exchange rates providers.
func apiRateLimit(baseURL string) time.Duration {
func apiRateLimit(baseURL string) float64 {
switch baseURL {
default:
return time.Second // arbitrary; localhost, staging, etc.
return 1 // arbitrary; localhost, staging, etc.
case coingeckoAPIV3:
// API calls. From https://www.coingecko.com/en/api:
// > Generous rate limits with up to 100 requests/minute
// We use slightly lower value.
return 2 * time.Second
return 0.5
case shiftGeckoMirrorAPIV3:
// Avoid zero to prevent unexpected panics like in time.NewTicker
// and leave some room to breathe.
return 10 * time.Millisecond
return 100
}
}

Expand Down
56 changes: 28 additions & 28 deletions backend/rates/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,38 +278,38 @@ func (updater *RateUpdater) fetchGeckoMarketRange(ctx context.Context, coin, fia
}

// Make the call, abiding the upstream rate limits.
msg := fmt.Sprintf("fetch coingecko coin=%s fiat=%s start=%s", coin, fiat, timeRange.start)
var jsonBody struct{ Prices [][2]float64 } // [timestamp in milliseconds, value]
callErr := updater.geckoLimiter.Call(ctx, msg, func() error {
param := url.Values{
"from": {strconv.FormatInt(timeRange.start.Unix(), 10)},
"to": {strconv.FormatInt(timeRange.end().Unix(), 10)},
"vs_currency": {gfiat},
}
endpoint := fmt.Sprintf("%s/coins/%s/market_chart/range?%s", updater.coingeckoURL, gcoin, param.Encode())
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return err
}
if err := updater.geckoLimiter.Wait(ctx); err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
res, err := updater.httpClient.Do(req.WithContext(ctx))
if err != nil {
return err
}
defer res.Body.Close() //nolint:errcheck
if res.StatusCode != http.StatusOK {
return fmt.Errorf("fetchGeckoMarketRange: bad response code %d", res.StatusCode)
}
// 1Mb is more than enough for a single response, but make sure initial
// download with empty cache fits here. See maxGeckoRange.
return json.NewDecoder(io.LimitReader(res.Body, 1<<20)).Decode(&jsonBody)
})
if callErr != nil {
return nil, callErr
param := url.Values{
"from": {strconv.FormatInt(timeRange.start.Unix(), 10)},
"to": {strconv.FormatInt(timeRange.end().Unix(), 10)},
"vs_currency": {gfiat},
}
endpoint := fmt.Sprintf("%s/coins/%s/market_chart/range?%s", updater.coingeckoURL, gcoin, param.Encode())
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
res, err := updater.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
defer res.Body.Close() //nolint:errcheck
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fetchGeckoMarketRange: bad response code %d", res.StatusCode)
}

// 1Mb is more than enough for a single response, but make sure initial
// download with empty cache fits here. See maxGeckoRange
if err := json.NewDecoder(io.LimitReader(res.Body, 1<<20)).Decode(&jsonBody); err != nil {
return nil, err
}
// Transform the response into a usable result.
rates := make([]exchangeRate, len(jsonBody.Prices))
for i, v := range jsonBody.Prices {
Expand Down
69 changes: 38 additions & 31 deletions backend/rates/rates.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/BitBoxSwiss/bitbox-wallet-app/util/errp"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/logging"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/observable"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/observable/action"
"github.com/BitBoxSwiss/bitbox-wallet-app/util/ratelimit"
"github.com/sirupsen/logrus"
"go.etcd.io/bbolt"
)
Expand Down Expand Up @@ -119,15 +120,15 @@ type RateUpdater struct {
// See https://www.coingecko.com/en/api for details.
coingeckoURL string
// All requests to coingeckoURL are rate-limited using geckoLimiter.
geckoLimiter *ratelimit.LimitedCall
geckoLimiter *rate.Limiter
}

// NewRateUpdater returns a new rates updater.
// The dbdir argument is the location of a historical rates database cache.
// The returned updater can function without a valid database cache but may be
// impacted by rate limits. The database cache is transparent to the updater users.
// To stay within acceptable rate limits defined by CoinGeckoRateLimit, callers can
// use util/ratelimit package.
// use https://pkg.go.dev/golang.org/x/time/rate package.
//
// Both Last and PriceAt of the newly created updater always return zero values
// until data is fetched from the external APIs. To make the updater start fetching data
Expand Down Expand Up @@ -155,7 +156,7 @@ func NewRateUpdater(client *http.Client, dbdir string) *RateUpdater {
log: log,
httpClient: client,
coingeckoURL: apiURL,
geckoLimiter: ratelimit.NewLimitedCall(apiRateLimit(apiURL)),
geckoLimiter: rate.NewLimiter(rate.Limit(apiRateLimit(apiURL)), 1),
}
}

Expand Down Expand Up @@ -275,36 +276,42 @@ func (updater *RateUpdater) updateLast(ctx context.Context) {
}

var geckoRates map[string]map[string]float64
callErr := updater.geckoLimiter.Call(ctx, "updateLast", func() error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
res, err := updater.httpClient.Do(req.WithContext(ctx))
if err != nil {
return errp.WithStack(err)
}
defer res.Body.Close() //nolint:errcheck
if res.StatusCode != http.StatusOK {
return errp.Newf("bad response code %d", res.StatusCode)
}
const max = 10240
responseBody, err := io.ReadAll(io.LimitReader(res.Body, max+1))
if err != nil {
return errp.WithStack(err)
}
if len(responseBody) > max {
return errp.Newf("rates response too long (> %d bytes)", max)
}
if err := json.Unmarshal(responseBody, &geckoRates); err != nil {
return errp.WithMessage(err,
fmt.Sprintf("could not parse rates response: %s", string(responseBody)))
}
return nil
})
if callErr != nil {
updater.log.WithError(callErr).Errorf("updateLast")
if err := updater.geckoLimiter.Wait(ctx); err != nil {
updater.log.WithError(err).Error("could not wait for rate limiter")
updater.last = nil
return
}

res, err := updater.httpClient.Do(req.WithContext(ctx))
if err != nil {
updater.log.WithError(err).Error("could not make request")
updater.last = nil
return
}
defer res.Body.Close() //nolint:errcheck
if res.StatusCode != http.StatusOK {
updater.log.Errorf("bad response code %d", res.StatusCode)
updater.last = nil
return
}
const max = 10240
responseBody, err := io.ReadAll(io.LimitReader(res.Body, max+1))
if err != nil {
updater.log.WithError(err).Error("could not read response")
updater.last = nil
return
}
if len(responseBody) > max {
updater.last = nil
updater.log.Errorf("rates response too long (> %d bytes)", max)
return
}
if err := json.Unmarshal(responseBody, &geckoRates); err != nil {
updater.last = nil
updater.log.Errorf("could not parse rates response: %s", string(responseBody))
return
}

// Convert the map with coingecko coin/fiat codes to a map of coin/fiat units.
rates := map[string]map[string]float64{}
for coin, val := range geckoRates {
Expand Down
Loading