diff --git a/.gitignore b/.gitignore index 36a6b08..6ee1f64 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,6 @@ /static_dev/ /relayscan /deploy* +/test.csv +/csv/ +/build/ \ No newline at end of file diff --git a/Makefile b/Makefile index fadffd8..cf02b7d 100644 --- a/Makefile +++ b/Makefile @@ -50,3 +50,14 @@ cover-html: docker-image: DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan + + +generate-ssz: + rm -f common/ultrasoundbid_encoding.go + sszgen --path common --objs UltrasoundStreamBid + +bids-website: + go run . service bidcollect --build-website --build-website-upload + +bids-website-dev: + go run . service bidcollect --devserver diff --git a/cmd/service/bidcollect.go b/cmd/service/bidcollect.go new file mode 100644 index 0000000..094194d --- /dev/null +++ b/cmd/service/bidcollect.go @@ -0,0 +1,114 @@ +package service + +/** + * https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md + */ + +import ( + "github.com/flashbots/relayscan/common" + "github.com/flashbots/relayscan/services/bidcollect" + "github.com/flashbots/relayscan/services/bidcollect/website" + "github.com/flashbots/relayscan/vars" + "github.com/spf13/cobra" +) + +var ( + collectUltrasoundStream bool + collectGetHeader bool + collectDataAPI bool + useAllRelays bool + + outDir string + outputTSV bool // by default: CSV, but can be changed to TSV with this setting + + runDevServerOnly bool // used to play with file listing website + devServerListenAddr = ":8095" + + buildWebsite bool + buildWebsiteUpload bool + buildWebsiteOutDir string +) + +func init() { + bidCollectCmd.Flags().BoolVar(&collectUltrasoundStream, "ultrasound-stream", false, "use ultrasound top-bid stream") + bidCollectCmd.Flags().BoolVar(&collectGetHeader, "get-header", false, "use getHeader API") + bidCollectCmd.Flags().BoolVar(&collectDataAPI, "data-api", false, "use data API") + 
bidCollectCmd.Flags().BoolVar(&useAllRelays, "all-relays", false, "use all relays") + + // for getHeader + bidCollectCmd.Flags().StringVar(&beaconNodeURI, "beacon-uri", vars.DefaultBeaconURI, "beacon endpoint") + + // for saving to file + bidCollectCmd.Flags().StringVar(&outDir, "out", "csv", "output directory for CSV/TSV") + bidCollectCmd.Flags().BoolVar(&outputTSV, "out-tsv", false, "output as TSV (instead of CSV)") + + // for dev purposes + bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website") + + // building the S3 website + bidCollectCmd.Flags().BoolVar(&buildWebsite, "build-website", false, "build file listing website") + bidCollectCmd.Flags().BoolVar(&buildWebsiteUpload, "build-website-upload", false, "upload after building") + bidCollectCmd.Flags().StringVar(&buildWebsiteOutDir, "build-website-out", "build", "output directory for website") +} + +var bidCollectCmd = &cobra.Command{ + Use: "bidcollect", + Short: "Collect bids", + Run: func(cmd *cobra.Command, args []string) { + if runDevServerOnly { + log.Infof("Bidcollect (%s) devserver starting on %s ...", vars.Version, devServerListenAddr) + fileListingDevServer() + return + } + + if buildWebsite { + log.Infof("Bidcollect %s building website (output: %s) ...", vars.Version, buildWebsiteOutDir) + website.BuildProdWebsite(log, buildWebsiteOutDir, buildWebsiteUpload) + return + } + + log.Infof("Bidcollect starting (%s) ...", vars.Version) + + // Prepare relays + relays := []common.RelayEntry{ + common.MustNewRelayEntry(vars.RelayFlashbots, false), + common.MustNewRelayEntry(vars.RelayUltrasound, false), + } + if useAllRelays { + relays = common.MustGetRelays() + } + + log.Infof("Using %d relays", len(relays)) + for index, relay := range relays { + log.Infof("- relay #%d: %s", index+1, relay.Hostname()) + } + + opts := bidcollect.BidCollectorOpts{ + Log: log, + Relays: relays, + CollectUltrasoundStream: collectUltrasoundStream, + 
CollectGetHeader: collectGetHeader, + CollectDataAPI: collectDataAPI, + BeaconNodeURI: beaconNodeURI, + OutDir: outDir, + OutputTSV: outputTSV, + } + + bidCollector := bidcollect.NewBidCollector(&opts) + bidCollector.MustStart() + }, +} + +func fileListingDevServer() { + webserver, err := website.NewDevWebserver(&website.DevWebserverOpts{ //nolint:exhaustruct + ListenAddress: devServerListenAddr, + Log: log, + }) + if err != nil { + log.Fatal(err) + } + err = webserver.StartServer() + if err != nil { + log.Fatal(err) + } +} diff --git a/cmd/service/collect-live-bids.go b/cmd/service/collect-live-bids.go index ed5ae6d..8a376f1 100644 --- a/cmd/service/collect-live-bids.go +++ b/cmd/service/collect-live-bids.go @@ -14,7 +14,7 @@ func init() { var liveBidsCmd = &cobra.Command{ Use: "collect-live-bids", - Short: "On every slot, ask for live bids", + Short: "On every slot, ask for live bids (using getHeader)", Run: func(cmd *cobra.Command, args []string) { // Connect to Postgres db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) diff --git a/cmd/service/service.go b/cmd/service/service.go index d63c07b..8d75727 100644 --- a/cmd/service/service.go +++ b/cmd/service/service.go @@ -22,4 +22,5 @@ var ServiceCmd = &cobra.Command{ func init() { ServiceCmd.AddCommand(websiteCmd) ServiceCmd.AddCommand(liveBidsCmd) + ServiceCmd.AddCommand(bidCollectCmd) } diff --git a/common/relayentry.go b/common/relayentry.go index 892a8a9..23fddf8 100644 --- a/common/relayentry.go +++ b/common/relayentry.go @@ -52,6 +52,12 @@ func NewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry, err err return entry, err } +func MustNewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry) { + entry, err := NewRelayEntry(relayURL, requireUser) + Check(err) + return entry +} + // RelayEntriesToStrings returns the string representation of a list of relay entries func RelayEntriesToStrings(relays []RelayEntry) []string { ret := make([]string, len(relays)) @@ -61,6 
+67,15 @@ func RelayEntriesToStrings(relays []RelayEntry) []string { return ret } +// RelayEntriesToHostnameStrings returns the hostnames of a list of relay entries +func RelayEntriesToHostnameStrings(relays []RelayEntry) []string { + ret := make([]string, len(relays)) + for i, entry := range relays { + ret[i] = entry.Hostname() + } + return ret +} + func GetRelays() ([]RelayEntry, error) { var err error relays := make([]RelayEntry, len(vars.RelayURLs)) @@ -72,3 +87,9 @@ func GetRelays() ([]RelayEntry, error) { } return relays, nil } + +func MustGetRelays() []RelayEntry { + relays, err := GetRelays() + Check(err) + return relays +} diff --git a/common/request.go b/common/request.go index 289d6b8..06faa80 100644 --- a/common/request.go +++ b/common/request.go @@ -55,7 +55,13 @@ func SendHTTPRequest(ctx context.Context, client http.Client, method, url string return resp.StatusCode, fmt.Errorf("%w: %d / %s", errHTTPErrorResponse, resp.StatusCode, string(bodyBytes)) } - if dst != nil { + if dst == nil { + // still read the body to reuse http connection (see also https://stackoverflow.com/a/17953506) + _, err = io.Copy(io.Discard, resp.Body) + if err != nil { + return resp.StatusCode, fmt.Errorf("could not read response body: %w", err) + } + } else { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { return resp.StatusCode, fmt.Errorf("could not read response body: %w", err) diff --git a/common/ultrasoundbid.go b/common/ultrasoundbid.go new file mode 100644 index 0000000..7a54d7c --- /dev/null +++ b/common/ultrasoundbid.go @@ -0,0 +1,28 @@ +package common + +import "math/big" + +// https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md + +type ( + U64 [8]byte + Hash [32]byte + PublicKey [48]byte + Address [20]byte + U256 [32]byte +) + +func (n *U256) String() string { + return new(big.Int).SetBytes(ReverseBytes(n[:])).String() +} + +type UltrasoundStreamBid struct { + Timestamp uint64 `json:"timestamp"` + Slot uint64 `json:"slot"` + 
BlockNumber uint64 `json:"block_number"` + BlockHash Hash `json:"block_hash" ssz-size:"32"` + ParentHash Hash `json:"parent_hash" ssz-size:"32"` + BuilderPubkey PublicKey `json:"builder_pubkey" ssz-size:"48"` + FeeRecipient Address `json:"fee_recipient" ssz-size:"20"` + Value U256 `json:"value" ssz-size:"32"` +} diff --git a/common/ultrasoundbid_encoding.go b/common/ultrasoundbid_encoding.go new file mode 100644 index 0000000..6857eb6 --- /dev/null +++ b/common/ultrasoundbid_encoding.go @@ -0,0 +1,124 @@ +package common + +import ( + ssz "github.com/ferranbt/fastssz" +) + +// MarshalSSZ ssz marshals the UltrasoundStreamBid object +func (u *UltrasoundStreamBid) MarshalSSZ() ([]byte, error) { + return ssz.MarshalSSZ(u) +} + +// MarshalSSZTo ssz marshals the UltrasoundStreamBid object to a target array +func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) { + dst = buf + + // Field (0) 'Timestamp' + dst = ssz.MarshalUint64(dst, u.Timestamp) + + // Field (1) 'Slot' + dst = ssz.MarshalUint64(dst, u.Slot) + + // Field (2) 'BlockNumber' + dst = ssz.MarshalUint64(dst, u.BlockNumber) + + // Field (3) 'BlockHash' + dst = append(dst, u.BlockHash[:]...) + + // Field (4) 'ParentHash' + dst = append(dst, u.ParentHash[:]...) + + // Field (5) 'BuilderPubkey' + dst = append(dst, u.BuilderPubkey[:]...) + + // Field (6) 'FeeRecipient' + dst = append(dst, u.FeeRecipient[:]...) + + // Field (7) 'Value' + dst = append(dst, u.Value[:]...) 
+ + return +} + +// UnmarshalSSZ ssz unmarshals the UltrasoundStreamBid object +func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error { + var err error + size := uint64(len(buf)) + if size != 188 { + return ssz.ErrSize + } + + // Field (0) 'Timestamp' + u.Timestamp = ssz.UnmarshallUint64(buf[0:8]) + + // Field (1) 'Slot' + u.Slot = ssz.UnmarshallUint64(buf[8:16]) + + // Field (2) 'BlockNumber' + u.BlockNumber = ssz.UnmarshallUint64(buf[16:24]) + + // Field (3) 'BlockHash' + copy(u.BlockHash[:], buf[24:56]) + + // Field (4) 'ParentHash' + copy(u.ParentHash[:], buf[56:88]) + + // Field (5) 'BuilderPubkey' + copy(u.BuilderPubkey[:], buf[88:136]) + + // Field (6) 'FeeRecipient' + copy(u.FeeRecipient[:], buf[136:156]) + + // Field (7) 'Value' + copy(u.Value[:], buf[156:188]) + + return err +} + +// SizeSSZ returns the ssz encoded size in bytes for the UltrasoundStreamBid object +func (u *UltrasoundStreamBid) SizeSSZ() (size int) { + size = 188 + return +} + +// HashTreeRoot ssz hashes the UltrasoundStreamBid object +func (u *UltrasoundStreamBid) HashTreeRoot() ([32]byte, error) { + return ssz.HashWithDefaultHasher(u) +} + +// HashTreeRootWith ssz hashes the UltrasoundStreamBid object with a hasher +func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) { + indx := hh.Index() + + // Field (0) 'Timestamp' + hh.PutUint64(u.Timestamp) + + // Field (1) 'Slot' + hh.PutUint64(u.Slot) + + // Field (2) 'BlockNumber' + hh.PutUint64(u.BlockNumber) + + // Field (3) 'BlockHash' + hh.PutBytes(u.BlockHash[:]) + + // Field (4) 'ParentHash' + hh.PutBytes(u.ParentHash[:]) + + // Field (5) 'BuilderPubkey' + hh.PutBytes(u.BuilderPubkey[:]) + + // Field (6) 'FeeRecipient' + hh.PutBytes(u.FeeRecipient[:]) + + // Field (7) 'Value' + hh.PutBytes(u.Value[:]) + + hh.Merkleize(indx) + return +} + +// GetTree ssz hashes the UltrasoundStreamBid object +func (u *UltrasoundStreamBid) GetTree() (*ssz.Node, error) { + return ssz.ProofTree(u) +} diff --git 
a/common/ultrasoundbid_test.go b/common/ultrasoundbid_test.go new file mode 100644 index 0000000..8314b97 --- /dev/null +++ b/common/ultrasoundbid_test.go @@ -0,0 +1,30 @@ +package common + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/stretchr/testify/require" +) + +func TestValueDecoding(t *testing.T) { + expected := "55539751698389157" + hex := "0xa558e5221c51c500000000000000000000000000000000000000000000000000" + hexBytes := hexutil.MustDecode(hex) + value := new(big.Int).SetBytes(ReverseBytes(hexBytes[:])).String() + require.Equal(t, expected, value) +} + +func TestUltrasoundBidSSZDecoding(t *testing.T) { + hex := "0x704b87ce8f010000a94b8c0000000000b6043101000000002c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac074d0668c9e0912134628c32a54854b952234ebb6c1fdd6b053566ac2d2a09498da03b00ddb78b2c111450a5417a8c368c40f1f140cdf97d95b7fa9565467e0bbbe27877d08e01c69b4e5b02b144e6a265df99a0839818b3f120ebac9b73f82b617dc6a5556c71794b1a9c5400000000000000000000000000000000000000000000000000" + bytes := hexutil.MustDecode(hex) + bid := new(UltrasoundStreamBid) + err := bid.UnmarshalSSZ(bytes) + require.NoError(t, err) + + require.Equal(t, uint64(1717156924272), bid.Timestamp) + require.Equal(t, uint64(9194409), bid.Slot) + require.Equal(t, uint64(19989686), bid.BlockNumber) + require.Equal(t, "0x2c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac07", hexutil.Encode(bid.BlockHash[:])) +} diff --git a/common/utils.go b/common/utils.go index 1349fac..012854d 100644 --- a/common/utils.go +++ b/common/utils.go @@ -4,8 +4,11 @@ package common import ( "math/big" "net/url" + "runtime" + "strings" "time" + "github.com/dustin/go-humanize" "github.com/ethereum/go-ethereum/params" "github.com/flashbots/mev-boost-relay/beaconclient" "github.com/flashbots/relayscan/vars" @@ -26,6 +29,18 @@ func GetURI(url *url.URL, path string) string { return u2.String() } +func 
GetURIWithQuery(url *url.URL, path string, queryArgs map[string]string) string { + u2 := *url + u2.User = nil + u2.Path = path + q := u2.Query() + for key, value := range queryArgs { + q.Add(key, value) + } + u2.RawQuery = q.Encode() + return u2.String() +} + func EthToWei(eth *big.Int) *big.Float { if eth == nil { return big.NewFloat(0) @@ -108,3 +123,28 @@ func MustConnectBeaconNode(log *logrus.Entry, beaconNodeURI string, allowSyncing } return bn, syncStatus.HeadSlot } + +func ReverseBytes(src []byte) []byte { + dst := make([]byte, len(src)) + copy(dst, src) + for i := len(dst)/2 - 1; i >= 0; i-- { + opp := len(dst) - 1 - i + dst[i], dst[opp] = dst[opp], dst[i] + } + return dst +} + +func GetMemMB() uint64 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return m.Alloc / 1024 / 1024 +} + +// HumanBytes returns size in the same format as AWS S3 +func HumanBytes(n uint64) string { + s := humanize.IBytes(n) + s = strings.Replace(s, "KiB", "KB", 1) + s = strings.Replace(s, "MiB", "MB", 1) + s = strings.Replace(s, "GiB", "GB", 1) + return s +} diff --git a/common/utils_test.go b/common/utils_test.go index 510e03f..5747df9 100644 --- a/common/utils_test.go +++ b/common/utils_test.go @@ -3,6 +3,7 @@ package common import ( "testing" + "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" ) @@ -13,3 +14,22 @@ func TestSlotToTime(t *testing.T) { func TestTimeToSlot(t *testing.T) { require.Equal(t, uint64(6591598), TimeToSlot(SlotToTime(6591598))) } + +func TestBytesFormat(t *testing.T) { + n := uint64(795025173) + + s := humanize.Bytes(n) + require.Equal(t, "795 MB", s) + + s = humanize.IBytes(n) + require.Equal(t, "758 MiB", s) + + s = HumanBytes(n) + require.Equal(t, "758 MB", s) + + s = HumanBytes(n * 10) + require.Equal(t, "7.4 GB", s) + + s = HumanBytes(n / 1000) + require.Equal(t, "776 KB", s) +} diff --git a/docs/2024-06_bidcollect.md b/docs/2024-06_bidcollect.md new file mode 100644 index 0000000..b499f24 --- /dev/null +++ 
b/docs/2024-06_bidcollect.md @@ -0,0 +1,97 @@ +# Bid Collection + +Relayscan collects bids across [relays](../vars/relays.go) with these methods: + +1. [getHeader polling](https://ethereum.github.io/builder-specs/#/Builder/getHeader) +2. [Data API polling](https://flashbots.github.io/relay-specs/#/Data/getReceivedBids) +3. [Ultrasound top-bid websocket stream](https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md) + +Output: + +1. CSV file archive + +See also: + +- [Example output](https://gist.github.com/metachris/061c0443afb8b8d07eed477a848fa395) +- [Pull request #37](https://github.com/flashbots/relayscan/pull/37) +- Live data: https://bidarchive.relayscan.io + +--- + +## Notes on data sources + +Source types: +- `0`: `getHeader` polling +- `1`: Data API polling +- `2`: Ultrasound top-bid Websockets stream + +Different data sources have different limitations: + +- `getHeader` polling: + - The received header only has limited information, with these implications: + - Optimistic is always `false` + - No `builder_pubkey` + - No bid timestamp (need to use receive timestamp) + - getHeader bid timestamps are always when the response from polling at t=1s comes back (but not when the bid was received at a relay) + - Some relays only allow a single `getHeader` request per slot, so we time it at `t=1s` +- Data API polling: + - Has all the necessary information + - Due to rate limits, we only poll at specific times + - Polling at t-4, t-2, t-0.5, t+0.5, t+2 (see also [`services/bidcollect/data-api-poller.go`](services/bidcollect/data-api-poller.go#64-69)) +- Ultrasound websocket stream + - doesn't expose optimistic, thus that field is always `false` + +## Other notes + +- Bids are deduplicated based on this key: + - `fmt.Sprintf("%d-%s-%s-%s-%s", bid.Slot, bid.BlockHash, bid.ParentHash, bid.BuilderPubkey, bid.Value)` + - this means only the first bid for a given key is stored, even if - for instance - other relays also deliver the same bid 
+ +--- + +## Running it + +By default, the collector will output CSV into `//.csv` + +```bash +# Start data API and ultrasound stream collectors +go run . service bidcollect --data-api --ultrasound-stream --all-relays + +# getHeader needs a beacon node too +go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays +``` + +--- + +## Useful Clickhouse queries + +Useful [clickhouse-local](https://clickhouse.com/docs/en/operations/utilities/clickhouse-local) queries: + +```bash +$ clickhouse local -q "SELECT source_type, COUNT(source_type) FROM '2024-06-12_top.csv' GROUP BY source_type ORDER BY source_type;" +0 2929 +1 21249 +2 1057722 + +# Get bids > 1 ETH for specific builders (CSV has 10M rows) +$ time clickhouse local -q "SELECT count(value), quantile(0.5)(value) as p50, quantile(0.75)(value) as p75, quantile(0.9)(value) as p90, max(value) FROM '2024-06-05_all.csv' WHERE value > 1000000000000000000 AND builder_pubkey IN ('0xa01a00479f1fa442a8ebadb352be69091d07b0c0a733fae9166dae1b83179e326a968717da175c7363cd5a13e8580e8d', '0xa02a0054ea4ba422c88baccfdb1f43b2c805f01d1475335ea6647f69032da847a41c0e23796c6bed39b0ee11ab9772c6', '0xa03a000b0e3d1dc008f6075a1b1af24e6890bd674c26235ce95ac06e86f2bd3ccf4391df461b9e5d3ca654ef6b9e1ceb') FORMAT TabSeparatedWithNames;" +count(value) p50 p75 p90 max(value) +1842 1789830446982354000 2279820737908906200 4041286254343376400 8216794401676997763 + +real 0m2.202s +user 0m17.320s +sys 0m0.589s +``` + +--- + +## Architecture + +![Architecture](./img/bidcollect-overview.png) + + +--- + +## TODO + +- Dockerization \ No newline at end of file diff --git a/docs/img/bidcollect-overview.png b/docs/img/bidcollect-overview.png new file mode 100644 index 0000000..bd27cc7 Binary files /dev/null and b/docs/img/bidcollect-overview.png differ diff --git a/go.mod b/go.mod index 51edc7a..b59dee0 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,14 @@ go 1.22 require ( github.com/NYTimes/gziphandler v1.1.1 + 
github.com/dustin/go-humanize v1.0.1 github.com/ethereum/go-ethereum v1.11.6 + github.com/ferranbt/fastssz v0.1.3 github.com/flashbots/go-boost-utils v1.6.0 github.com/flashbots/go-utils v0.4.9 github.com/flashbots/mev-boost-relay v1.0.0-alpha4.0.20230519091033-0453fc247553 github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.5.1 github.com/jmoiron/sqlx v1.3.5 github.com/lib/pq v1.10.9 github.com/metachris/flashbotsrpc v0.5.0 @@ -19,7 +22,8 @@ require ( github.com/stretchr/testify v1.8.3 github.com/tdewolff/minify v2.3.6+incompatible go.uber.org/atomic v1.11.0 - golang.org/x/text v0.9.0 + go.uber.org/zap v1.24.0 + golang.org/x/text v0.15.0 ) require ( @@ -41,7 +45,6 @@ require ( github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/fatih/color v1.15.0 // indirect - github.com/ferranbt/fastssz v0.1.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/getsentry/sentry-go v0.21.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -52,7 +55,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/uuid v1.3.0 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/holiman/uint256 v1.2.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jarcoal/httpmock v1.2.0 // indirect @@ -90,11 +92,10 @@ require ( github.com/tklauser/numcpus v0.6.0 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.24.0 // indirect - golang.org/x/crypto v0.9.0 // indirect + golang.org/x/crypto v0.23.0 // indirect golang.org/x/exp 
v0.0.0-20230515195305-f3d0a9c9a5cc // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sys v0.8.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/protobuf v1.30.0 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect diff --git a/go.sum b/go.sum index c0fafff..2e8b7de 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,8 @@ github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498/go.mod h1:Mw6PkjjMXWbT github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= @@ -352,8 +354,9 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 
+github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= @@ -753,8 +756,8 @@ golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= -golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -829,8 +832,8 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= -golang.org/x/net v0.10.0 
h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -906,8 +909,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -921,8 +924,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= 
-golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/scripts/bidcollect/bids-combine-and-upload-yesterday.sh b/scripts/bidcollect/bids-combine-and-upload-yesterday.sh new file mode 100755 index 0000000..97c264a --- /dev/null +++ b/scripts/bidcollect/bids-combine-and-upload-yesterday.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# +# This is a quick and dirty script to create a daily archive for yesterday and upload to Cloudflare R2 and AWS S3. +# +set -o errexit +set -o nounset +set -o pipefail +if [[ "${TRACE-0}" == "1" ]]; then + set -o xtrace +fi + +# print current date +echo "now: $(date)" + +# get yesterday's date +d=$(date -d yesterday '+%Y-%m-%d') +echo "upload for: $d" + +# change to project root directory +cd "$(dirname "$0")" +cd ../../ + +# load environment variables +source .env.prod + +# archive and upload! 
+./scripts/bidcollect/bids-combine-and-upload.sh "/mnt/data/relayscan-bids/$d/" + +# update website +make bids-website diff --git a/scripts/bidcollect/bids-combine-and-upload.sh b/scripts/bidcollect/bids-combine-and-upload.sh new file mode 100755 index 0000000..0b65530 --- /dev/null +++ b/scripts/bidcollect/bids-combine-and-upload.sh @@ -0,0 +1,85 @@ +#!/bin/bash +# +# Combine bid CSVs (from bidcollect) into a single CSV, and upload to R2/S3 +# +set -e + +# require directory as first argument +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +cd $1 +date=$(basename $1) +ym=${date:0:7} +echo $date +echo "" + +# ALL BIDS +fn_out="${date}_all.csv" +fn_out_zip="${fn_out}.zip" +rm -f $fn_out $fn_out_zip + +echo "Combining all bids..." +first="1" +for fn in $(\ls all*); do + echo "- ${fn}" + if [ $first == "1" ]; then + head -n 1 $fn > $fn_out + first="0" + fi + tail -n +2 $fn >> $fn_out +done + +wc -l $fn_out +zip ${fn_out_zip} $fn_out +echo "Wrote ${fn_out_zip}" +rm -f $fn_out +rm -f all*.csv + +# Upload +if [[ "${UPLOAD}" != "0" ]]; then + echo "Uploading to R2 and S3..." + aws --profile r2 s3 cp --no-progress "${fn_out_zip}" "s3://relayscan-bidarchive/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com" + aws --profile s3 s3 cp --no-progress "${fn_out_zip}" "s3://relayscan-bidarchive/ethereum/mainnet/${ym}/" +fi + +if [[ "${DEL}" == "1" ]]; then + rm -f all* +fi + +echo "" + +# TOP BIDS +echo "Combining top bids..." +fn_out="${date}_top.csv" +fn_out_zip="${fn_out}.zip" +rm -f $fn_out $fn_out_zip + +first="1" +for fn in $(\ls top*); do + echo "- ${fn}" + if [ $first == "1" ]; then + head -n 1 $fn > $fn_out + first="0" + fi + tail -n +2 $fn >> $fn_out +done + +wc -l $fn_out +zip ${fn_out_zip} $fn_out +echo "Wrote ${fn_out_zip}" +rm -f $fn_out +rm -f top*.csv + +# Upload +if [[ "${UPLOAD}" != "0" ]]; then + echo "Uploading to R2 and S3..." 
+ aws --profile r2 s3 cp --no-progress "${fn_out_zip}" "s3://relayscan-bidarchive/ethereum/mainnet/${ym}/" --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com" + aws --profile s3 s3 cp --no-progress "${fn_out_zip}" "s3://relayscan-bidarchive/ethereum/mainnet/${ym}/" +fi + +if [[ "${DEL}" == "1" ]]; then + rm -f top* +fi diff --git a/scripts/bidcollect/s3/get-files.sh b/scripts/bidcollect/s3/get-files.sh new file mode 100755 index 0000000..71405bd --- /dev/null +++ b/scripts/bidcollect/s3/get-files.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# require one argument +if [ $# -ne 1 ]; then + echo "Usage: $0 " + exit 1 +fi + +aws --profile r2 s3 ls s3://relayscan-bidarchive/$1 --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com" \ No newline at end of file diff --git a/scripts/bidcollect/s3/get-folders.sh b/scripts/bidcollect/s3/get-folders.sh new file mode 100755 index 0000000..18c47ef --- /dev/null +++ b/scripts/bidcollect/s3/get-folders.sh @@ -0,0 +1,2 @@ +#!/bin/bash +aws --profile r2 s3 ls s3://relayscan-bidarchive/$1 --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com" | awk '{ print $2 }' \ No newline at end of file diff --git a/scripts/bidcollect/s3/upload-file-to-r2.sh b/scripts/bidcollect/s3/upload-file-to-r2.sh new file mode 100755 index 0000000..55d5e8e --- /dev/null +++ b/scripts/bidcollect/s3/upload-file-to-r2.sh @@ -0,0 +1,16 @@ +#!/bin/bash +src=$1 +target=$2 +if [ -z "$src" ]; then + echo "Usage: $0 ["] + exit 1 +fi + +# auto-fill target if not given +if [ -z "$target" ]; then + # remove "/mnt/data/relayscan-bidarchive/" prefix from src and make it the S3 prefix + target="/ethereum/mainnet/${src#"/mnt/data/relayscan-bidarchive/"}" +fi + +echo "uploading $src to S3 $target ..." 
+aws --profile r2 s3 cp $src s3://relayscan-bidarchive$target --endpoint-url "https://${CLOUDFLARE_R2_ACCOUNT_ID}.r2.cloudflarestorage.com" diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go new file mode 100644 index 0000000..47c2ac7 --- /dev/null +++ b/services/bidcollect/bid-processor.go @@ -0,0 +1,244 @@ +package bidcollect + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/flashbots/relayscan/common" + "github.com/sirupsen/logrus" +) + +// Goals: +// 1. Dedup bids +// 2. Save bids to CSV +// - One CSV for all bids +// - One CSV for top bids only + +type BidProcessorOpts struct { + Log *logrus.Entry + OutDir string + OutputTSV bool +} + +type OutFiles struct { + FAll *os.File + FTop *os.File +} + +type BidProcessor struct { + opts *BidProcessorOpts + log *logrus.Entry + + outFiles map[int64]*OutFiles // map[slot][bidUniqueKey]Bid + outFilesLock sync.RWMutex + + bidCache map[uint64]map[string]*CommonBid // map[slot][bidUniqueKey]Bid + topBidCache map[uint64]*CommonBid // map[slot]Bid + bidCacheLock sync.RWMutex + + csvSeparator string + csvFileEnding string +} + +func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor { + c := &BidProcessor{ + log: opts.Log, + opts: opts, + outFiles: make(map[int64]*OutFiles), + bidCache: make(map[uint64]map[string]*CommonBid), + topBidCache: make(map[uint64]*CommonBid), + } + + if opts.OutputTSV { + c.csvSeparator = "\t" + c.csvFileEnding = "tsv" + } else { + c.csvSeparator = "," + c.csvFileEnding = "csv" + } + + return c +} + +func (c *BidProcessor) Start() { + for { + time.Sleep(30 * time.Second) + c.housekeeping() + } +} + +func (c *BidProcessor) processBids(bids []*CommonBid) { + c.bidCacheLock.Lock() + defer c.bidCacheLock.Unlock() + + var isTopBid, isNewBid bool + for _, bid := range bids { + isNewBid, isTopBid = false, false + if _, ok := c.bidCache[bid.Slot]; !ok { + c.bidCache[bid.Slot] = make(map[string]*CommonBid) + } + + 
// Check if bid is new top bid + if topBid, ok := c.topBidCache[bid.Slot]; !ok { + c.topBidCache[bid.Slot] = bid // first one for the slot + isTopBid = true + } else { + // if current bid has higher value, use it as new top bid + if bid.ValueAsBigInt().Cmp(topBid.ValueAsBigInt()) == 1 { + c.topBidCache[bid.Slot] = bid + isTopBid = true + } + } + + // process regular bids only once per unique key (slot+blockhash+parenthash+builderpubkey+value) + if _, ok := c.bidCache[bid.Slot][bid.UniqueKey()]; !ok { + // yet unknown bid, save it + c.bidCache[bid.Slot][bid.UniqueKey()] = bid + isNewBid = true + } + + // Write to CSV + c.writeBidToFile(bid, isNewBid, isTopBid) + } +} + +func (c *BidProcessor) writeBidToFile(bid *CommonBid, isNewBid, isTopBid bool) { + fAll, fTop, err := c.getFiles(bid) + if err != nil { + c.log.WithError(err).Error("get get output file") + return + } + if isNewBid { + _, err = fmt.Fprint(fAll, bid.ToCSVLine(c.csvSeparator)+"\n") + if err != nil { + c.log.WithError(err).Error("couldn't write bid to file") + return + } + } + if isTopBid { + _, err = fmt.Fprint(fTop, bid.ToCSVLine(c.csvSeparator)+"\n") + if err != nil { + c.log.WithError(err).Error("couldn't write bid to file") + return + } + } +} + +func (c *BidProcessor) getFiles(bid *CommonBid) (fAll, fTop *os.File, err error) { + // hourlybucket + sec := int64(bucketMinutes * 60) + bucketTS := bid.ReceivedAtMs / 1000 / sec * sec // timestamp down-round to start of bucket + t := time.Unix(bucketTS, 0).UTC() + + // files may already be opened + c.outFilesLock.RLock() + outFiles, outFilesOk := c.outFiles[bucketTS] + c.outFilesLock.RUnlock() + + if outFilesOk { + return outFiles.FAll, outFiles.FTop, nil + } + + // Create output directory + dir := filepath.Join(c.opts.OutDir, t.Format(time.DateOnly)) + err = os.MkdirAll(dir, os.ModePerm) + if err != nil { + return nil, nil, err + } + + // Open ALL BIDS CSV + fnAll := filepath.Join(dir, c.getFilename("all", bucketTS)) + fAll, err = os.OpenFile(fnAll, 
os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) + if err != nil { + return nil, nil, err + } + fi, err := fAll.Stat() + if err != nil { + c.log.WithError(err).Fatal("failed stat on output file") + } + if fi.Size() == 0 { + _, err = fmt.Fprint(fAll, strings.Join(CommonBidCSVFields, c.csvSeparator)+"\n") + if err != nil { + c.log.WithError(err).Fatal("failed to write header to output file") + } + } + + // Open TOP BIDS CSV + fnTop := filepath.Join(dir, c.getFilename("top", bucketTS)) + fTop, err = os.OpenFile(fnTop, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600) + if err != nil { + return nil, nil, err + } + fi, err = fTop.Stat() + if err != nil { + c.log.WithError(err).Fatal("failed stat on output file") + } + if fi.Size() == 0 { + _, err = fmt.Fprint(fTop, strings.Join(CommonBidCSVFields, c.csvSeparator)+"\n") + if err != nil { + c.log.WithError(err).Fatal("failed to write header to output file") + } + } + + outFiles = &OutFiles{ + FAll: fAll, + FTop: fTop, + } + c.outFilesLock.Lock() + c.outFiles[bucketTS] = outFiles + c.outFilesLock.Unlock() + + c.log.Infof("[bid-processor] created output file: %s", fnAll) + c.log.Infof("[bid-processor] created output file: %s", fnTop) + return fAll, fTop, nil +} + +func (c *BidProcessor) getFilename(prefix string, timestamp int64) string { + t := time.Unix(timestamp, 0).UTC() + if prefix != "" { + prefix += "_" + } + return fmt.Sprintf("%s%s.%s", prefix, t.Format("2006-01-02_15-04"), c.csvFileEnding) +} + +func (c *BidProcessor) housekeeping() { + currentSlot := common.TimeToSlot(time.Now().UTC()) + maxSlotInCache := currentSlot - 3 + + nDeleted := 0 + nBids := 0 + + c.bidCacheLock.Lock() + defer c.bidCacheLock.Unlock() + for slot := range c.bidCache { + if slot < maxSlotInCache { + delete(c.bidCache, slot) + nDeleted += 1 + } else { + nBids += len(c.bidCache[slot]) + } + } + + // Close and remove old files + now := time.Now().UTC().Unix() + filesBefore := len(c.outFiles) + c.outFilesLock.Lock() + for timestamp, outFiles := range 
c.outFiles { + usageSec := bucketMinutes * 60 * 2 + if now-timestamp > int64(usageSec) { // remove all handles from 2x usage seconds ago + c.log.Info("closing output files", timestamp) + delete(c.outFiles, timestamp) + _ = outFiles.FAll.Close() + _ = outFiles.FTop.Close() + } + } + nFiles := len(c.outFiles) + filesClosed := len(c.outFiles) - filesBefore + c.outFilesLock.Unlock() + + c.log.Infof("[bid-processor] cleanupBids - deleted slots: %d / total slots: %d / total bids: %d / files closed: %d, current: %d / memUsedMB: %d", nDeleted, len(c.bidCache), nBids, filesClosed, nFiles, common.GetMemMB()) +} diff --git a/services/bidcollect/bidcollector.go b/services/bidcollect/bidcollector.go new file mode 100644 index 0000000..7c16de3 --- /dev/null +++ b/services/bidcollect/bidcollector.go @@ -0,0 +1,101 @@ +// Package bidcollect contains code for bid collection from various sources. +package bidcollect + +import ( + "github.com/flashbots/relayscan/common" + "github.com/sirupsen/logrus" +) + +type BidCollectorOpts struct { + Log *logrus.Entry + + CollectUltrasoundStream bool + CollectGetHeader bool + CollectDataAPI bool + + Relays []common.RelayEntry + BeaconNodeURI string // for getHeader + + OutDir string + OutputTSV bool +} + +type BidCollector struct { + opts *BidCollectorOpts + log *logrus.Entry + + ultrasoundBidC chan UltrasoundStreamBidsMsg + dataAPIBidC chan DataAPIPollerBidsMsg + getHeaderBidC chan GetHeaderPollerBidsMsg + + processor *BidProcessor +} + +func NewBidCollector(opts *BidCollectorOpts) *BidCollector { + c := &BidCollector{ + log: opts.Log, + opts: opts, + } + + if c.opts.OutDir == "" { + opts.Log.Fatal("outDir is required") + } + + // inputs + c.dataAPIBidC = make(chan DataAPIPollerBidsMsg, bidCollectorInputChannelSize) + c.ultrasoundBidC = make(chan UltrasoundStreamBidsMsg, bidCollectorInputChannelSize) + c.getHeaderBidC = make(chan GetHeaderPollerBidsMsg, bidCollectorInputChannelSize) + + // output + c.processor = 
NewBidProcessor(&BidProcessorOpts{ + Log: opts.Log, + OutDir: opts.OutDir, + OutputTSV: opts.OutputTSV, + }) + return c +} + +func (c *BidCollector) MustStart() { + go c.processor.Start() + + if c.opts.CollectGetHeader { + poller := NewGetHeaderPoller(&GetHeaderPollerOpts{ + Log: c.log, + BidC: c.getHeaderBidC, + BeaconURI: c.opts.BeaconNodeURI, + Relays: c.opts.Relays, + }) + go poller.Start() + } + + if c.opts.CollectDataAPI { + poller := NewDataAPIPoller(&DataAPIPollerOpts{ + Log: c.log, + BidC: c.dataAPIBidC, + Relays: c.opts.Relays, + }) + go poller.Start() + } + + if c.opts.CollectUltrasoundStream { + ultrasoundStream := NewUltrasoundStreamConnection(UltrasoundStreamOpts{ + Log: c.log, + BidC: c.ultrasoundBidC, + }) + go ultrasoundStream.Start() + } + + for { + select { + case bid := <-c.ultrasoundBidC: + commonBid := UltrasoundStreamToCommonBid(&bid) + c.processor.processBids([]*CommonBid{commonBid}) + case bids := <-c.dataAPIBidC: + commonBids := DataAPIToCommonBids(bids) + c.processor.processBids(commonBids) + case bid := <-c.getHeaderBidC: + commonBid := GetHeaderToCommonBid(bid) + c.processor.processBids([]*CommonBid{commonBid}) + } + } +} diff --git a/services/bidcollect/consts.go b/services/bidcollect/consts.go new file mode 100644 index 0000000..16df1cd --- /dev/null +++ b/services/bidcollect/consts.go @@ -0,0 +1,22 @@ +package bidcollect + +const ( + SourceTypeGetHeader = 0 + SourceTypeDataAPI = 1 + SourceTypeUltrasoundStream = 2 + + ultrasoundStreamDefaultURL = "ws://relay-builders-eu.ultrasound.money/ws/v1/top_bid" + initialBackoffSec = 5 + maxBackoffSec = 120 + + // bucketMinutes is the number of minutes to write into each CSV file (i.e. 
new file created for every X minutes bucket) + bucketMinutes = 60 + + // channel size for bid collector inputs + bidCollectorInputChannelSize = 1000 +) + +var ( +// csvFileEnding = relaycommon.GetEnv("CSV_FILE_END", "tsv") +// csvSeparator = relaycommon.GetEnv("CSV_SEP", "\t") +) diff --git a/services/bidcollect/data-api-poller.go b/services/bidcollect/data-api-poller.go new file mode 100644 index 0000000..fea2aa1 --- /dev/null +++ b/services/bidcollect/data-api-poller.go @@ -0,0 +1,128 @@ +package bidcollect + +import ( + "context" + "fmt" + "net/http" + "time" + + relaycommon "github.com/flashbots/mev-boost-relay/common" + "github.com/flashbots/relayscan/common" + "github.com/sirupsen/logrus" +) + +type DataAPIPollerBidsMsg struct { + Bids []relaycommon.BidTraceV2WithTimestampJSON + Relay common.RelayEntry + ReceivedAt time.Time +} + +type DataAPIPollerOpts struct { + Log *logrus.Entry + BidC chan DataAPIPollerBidsMsg + Relays []common.RelayEntry +} + +type DataAPIPoller struct { + Log *logrus.Entry + BidC chan DataAPIPollerBidsMsg + Relays []common.RelayEntry +} + +func NewDataAPIPoller(opts *DataAPIPollerOpts) *DataAPIPoller { + return &DataAPIPoller{ + Log: opts.Log, + BidC: opts.BidC, + Relays: opts.Relays, + } +} + +func (poller *DataAPIPoller) Start() { + poller.Log.WithField("relays", common.RelayEntriesToHostnameStrings(poller.Relays)).Info("Starting DataAPIPoller ...") + + // initially, wait until start of next slot + t := time.Now().UTC() + slot := common.TimeToSlot(t) + nextSlot := slot + 1 + tNextSlot := common.SlotToTime(nextSlot) + untilNextSlot := tNextSlot.Sub(t) + + poller.Log.Infof("[data-api poller] waiting until start of next slot (%d - %s from now)", nextSlot, untilNextSlot.String()) + time.Sleep(untilNextSlot) + + // then run polling loop + for { + // calculate next slot details + t := time.Now().UTC() + slot := common.TimeToSlot(t) + nextSlot := slot + 1 + tNextSlot := common.SlotToTime(nextSlot) + untilNextSlot := 
tNextSlot.Sub(t) + + poller.Log.Infof("[data-api poller] scheduling polling for upcoming slot: %d (%s - in %s)", nextSlot, tNextSlot.String(), untilNextSlot.String()) + + // Schedule polling at t-4, t-2, t=0, t=2 + go poller.pollRelaysForBids(nextSlot, -4*time.Second) + go poller.pollRelaysForBids(nextSlot, -2*time.Second) + go poller.pollRelaysForBids(nextSlot, -500*time.Millisecond) + go poller.pollRelaysForBids(nextSlot, 500*time.Millisecond) + go poller.pollRelaysForBids(nextSlot, 2*time.Second) + + // wait until next slot + time.Sleep(untilNextSlot) + } +} + +// pollRelaysForBids will poll data api for given slot with t seconds offset +func (poller *DataAPIPoller) pollRelaysForBids(slot uint64, tOffset time.Duration) { + tSlotStart := common.SlotToTime(slot) + tStart := tSlotStart.Add(tOffset) + waitTime := tStart.Sub(time.Now().UTC()) + + // poller.Log.Debugf("[data-api poller] - prepare polling for slot %d t %d (tSlotStart: %s, tStart: %s, waitTime: %s)", slot, t, tSlotStart.String(), tStart.String(), waitTime.String()) + if waitTime < 0 { + poller.Log.Debugf("[data-api poller] waitTime is negative: %s", waitTime.String()) + return + } + + // Wait until expected time + time.Sleep(waitTime) + + // Poll for bids now + untilSlot := tSlotStart.Sub(time.Now().UTC()) + poller.Log.Debugf("[data-api poller] polling for slot %d at t=%s (tNow=%s)", slot, tOffset.String(), (untilSlot * -1).String()) + + for _, relay := range poller.Relays { + go poller._pollRelayForBids(slot, relay, tOffset) + } +} + +func (poller *DataAPIPoller) _pollRelayForBids(slot uint64, relay common.RelayEntry, t time.Duration) { + // log := poller.Log.WithField("relay", relay.Hostname()).WithField("slot", slot) + log := poller.Log.WithFields(logrus.Fields{ + "relay": relay.Hostname(), + "slot": slot, + "t": t.String(), + }) + // log.Debugf("[data-api poller] polling relay %s for slot %d", relay.Hostname(), slot) + + // build query URL + path := "/relay/v1/data/bidtraces/builder_blocks_received" 
+ url := common.GetURIWithQuery(relay.URL, path, map[string]string{"slot": fmt.Sprintf("%d", slot)}) + // log.Debugf("[data-api poller] Querying %s", url) + + // start query + var data []relaycommon.BidTraceV2WithTimestampJSON + timeRequestStart := time.Now().UTC() + code, err := common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data) + timeRequestEnd := time.Now().UTC() + if err != nil { + log.WithError(err).Error("[data-api poller] failed to get data") + return + } + log = log.WithFields(logrus.Fields{"code": code, "entries": len(data), "durationMs": timeRequestEnd.Sub(timeRequestStart).Milliseconds()}) + log.Debug("[data-api poller] request complete") + + // send data to channel + poller.BidC <- DataAPIPollerBidsMsg{Bids: data, Relay: relay, ReceivedAt: time.Now().UTC()} +} diff --git a/services/bidcollect/getheader-poller.go b/services/bidcollect/getheader-poller.go new file mode 100644 index 0000000..5c61e83 --- /dev/null +++ b/services/bidcollect/getheader-poller.go @@ -0,0 +1,195 @@ +package bidcollect + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/flashbots/go-boost-utils/types" + "github.com/flashbots/mev-boost-relay/beaconclient" + relaycommon "github.com/flashbots/mev-boost-relay/common" + "github.com/flashbots/relayscan/common" + "github.com/sirupsen/logrus" +) + +type GetHeaderPollerBidsMsg struct { + Slot uint64 + Bid types.GetHeaderResponse + Relay common.RelayEntry + ReceivedAt time.Time +} + +type GetHeaderPollerOpts struct { + Log *logrus.Entry + BidC chan GetHeaderPollerBidsMsg + BeaconURI string + Relays []common.RelayEntry +} + +type GetHeaderPoller struct { + log *logrus.Entry + bidC chan GetHeaderPollerBidsMsg + relays []common.RelayEntry + bn *beaconclient.ProdBeaconInstance +} + +func NewGetHeaderPoller(opts *GetHeaderPollerOpts) *GetHeaderPoller { + return &GetHeaderPoller{ + log: opts.Log, + bidC: opts.BidC, + relays: 
opts.Relays, + bn: beaconclient.NewProdBeaconInstance(opts.Log, opts.BeaconURI), + } +} + +func (poller *GetHeaderPoller) Start() { + poller.log.WithField("relays", common.RelayEntriesToHostnameStrings(poller.relays)).Info("Starting GetHeaderPoller ...") + + // Check beacon-node sync status, process current slot and start slot updates + syncStatus, err := poller.bn.SyncStatus() + if err != nil { + poller.log.WithError(err).Fatal("couldn't get BN sync status") + } else if syncStatus.IsSyncing { + poller.log.Fatal("beacon node is syncing") + } + + // var headSlot uint64 + var headSlot, nextSlot, currentEpoch, lastDutyUpdateEpoch uint64 + var duties map[uint64]string + + // subscribe to head events (because then, the BN will know the block + proposer details for the next slot) + c := make(chan beaconclient.HeadEventData) + go poller.bn.SubscribeToHeadEvents(c) + + // then run polling loop + for { + headEvent := <-c + if headEvent.Slot <= headSlot { + continue + } + + headSlot = headEvent.Slot + nextSlot = headSlot + 1 + tNextSlot := common.SlotToTime(nextSlot) + untilNextSlot := tNextSlot.Sub(time.Now().UTC()) + + currentEpoch = headSlot / relaycommon.SlotsPerEpoch + poller.log.Infof("[getHeader poller] headSlot slot: %d / next slot: %d (%s), waitTime: %s", headSlot, nextSlot, tNextSlot.String(), untilNextSlot.String()) + + // On every new epoch, get proposer duties for current and next epoch (to avoid boundary problems) + if len(duties) == 0 || currentEpoch > lastDutyUpdateEpoch { + dutiesResp, err := poller.bn.GetProposerDuties(currentEpoch) + if err != nil { + poller.log.WithError(err).Error("couldn't get proposer duties") + continue + } + + duties = make(map[uint64]string) + for _, d := range dutiesResp.Data { + duties[d.Slot] = d.Pubkey + } + + dutiesResp, err = poller.bn.GetProposerDuties(currentEpoch + 1) + if err != nil { + poller.log.WithError(err).Error("failed get proposer duties") + } else { + for _, d := range dutiesResp.Data { + duties[d.Slot] = d.Pubkey 
+ } + } + poller.log.Debugf("[getHeader poller] duties updated: %d entries", len(duties)) + lastDutyUpdateEpoch = currentEpoch + } + + // Now get the latest block, for the execution payload + block, err := poller.bn.GetBlock("head") + if err != nil { + poller.log.WithError(err).Error("failed get latest block from BN") + continue + } + + if block.Data.Message.Slot != headSlot { + poller.log.WithField("slot", headSlot).WithField("bnSlot", block.Data.Message.Slot).Error("latest block slot is not current slot") + continue + } + + nextProposerPubkey := duties[nextSlot] + poller.log.Debugf("[getHeader poller] next slot: %d / block: %s / parent: %s / proposerPubkey: %s", nextSlot, block.Data.Message.Body.ExecutionPayload.BlockHash.String(), block.Data.Message.Body.ExecutionPayload.ParentHash, nextProposerPubkey) + + if nextProposerPubkey == "" { + poller.log.WithField("duties", duties).Error("no proposerPubkey for next slot") + } else { + // go poller.pollRelaysForBids(0*time.Second, nextSlot, block.Data.Message.Body.ExecutionPayload.BlockHash.String(), duties[nextSlot]) + go poller.pollRelaysForBids(1000*time.Millisecond, nextSlot, block.Data.Message.Body.ExecutionPayload.BlockHash.String(), duties[nextSlot]) + } + } +} + +// pollRelaysForBids will poll data api for given slot with t seconds offset +func (poller *GetHeaderPoller) pollRelaysForBids(tOffset time.Duration, slot uint64, parentHash, proposerPubkey string) { + tSlotStart := common.SlotToTime(slot) + tStart := tSlotStart.Add(tOffset) + waitTime := tStart.Sub(time.Now().UTC()) + + // poller.Log.Debugf("[getHeader poller] - prepare polling for slot %d t %d (tSlotStart: %s, tStart: %s, waitTime: %s)", slot, t, tSlotStart.String(), tStart.String(), waitTime.String()) + if waitTime < 0 { + poller.log.Debugf("[getHeader poller] waitTime is negative: %s", waitTime.String()) + return + } + + // Wait until expected time + time.Sleep(waitTime) + + // Poll for bids now + untilSlot := tSlotStart.Sub(time.Now().UTC()) + 
poller.log.Debugf("[getHeader poller] polling for slot %d at t=%s (tNow=%s)", slot, tOffset.String(), (untilSlot * -1).String()) + + for _, relay := range poller.relays { + go poller._pollRelayForBids(relay, tOffset, slot, parentHash, proposerPubkey) + } +} + +func (poller *GetHeaderPoller) _pollRelayForBids(relay common.RelayEntry, t time.Duration, slot uint64, parentHash, proposerPubkey string) { + // log := poller.Log.WithField("relay", relay.Hostname()).WithField("slot", slot) + log := poller.log.WithFields(logrus.Fields{ + "relay": relay.Hostname(), + "slot": slot, + "t": t.String(), + }) + log.Debugf("[getHeader poller] polling relay %s for slot %d", relay.Hostname(), slot) + + path := fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", slot, parentHash, proposerPubkey) + url := relay.GetURI(path) + // log.Debugf("Querying %s", url) + + var bid types.GetHeaderResponse + timeRequestStart := time.Now().UTC() + code, err := common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &bid) + timeRequestEnd := time.Now().UTC() + if err != nil { + msg := err.Error() + if strings.Contains(msg, "no builder bid") { + return + } else if strings.Contains(msg, "Too many getHeader requests! Use relay-analytics.ultrasound.money or the Websocket API") { + return + } else if code == 429 { + log.Warn("429 received") + return + } + log.WithFields(logrus.Fields{ + "code": code, + "url": url, + }).WithError(err).Error("error on getHeader request") + return + } + if code != 200 { + log.WithField("code", code).Debug("no bid received") + return + } + log.WithField("durationMs", timeRequestEnd.Sub(timeRequestStart).Milliseconds()).Infof("bid received! 
slot: %d \t value: %s \t block_hash: %s \t timestamp: %d", slot, bid.Data.Message.Value.String(), bid.Data.Message.Header.BlockHash.String(), bid.Data.Message.Header.Timestamp) + + // send data to channel + poller.bidC <- GetHeaderPollerBidsMsg{Slot: slot, Bid: bid, Relay: relay, ReceivedAt: time.Now().UTC()} +} diff --git a/services/bidcollect/types.go b/services/bidcollect/types.go new file mode 100644 index 0000000..5d37a7a --- /dev/null +++ b/services/bidcollect/types.go @@ -0,0 +1,159 @@ +package bidcollect + +import ( + "fmt" + "math/big" + "strings" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/flashbots/relayscan/common" +) + +var CommonBidCSVFields = []string{ + "source_type", "received_at_ms", + "timestamp_ms", + "slot", "slot_t_ms", "value", + "block_hash", "parent_hash", "builder_pubkey", "block_number", + "block_fee_recipient", + "relay", "proposer_pubkey", "proposer_fee_recipient", "optimistic_submission", +} + +type CommonBid struct { + // Collector-internal fields + SourceType int `json:"source_type"` + ReceivedAtMs int64 `json:"received_at"` + + // Common fields + Timestamp int64 `json:"timestamp"` + Slot uint64 `json:"slot"` + BlockNumber uint64 `json:"block_number"` + BlockHash string `json:"block_hash"` + ParentHash string `json:"parent_hash"` + BuilderPubkey string `json:"builder_pubkey"` + Value string `json:"value"` + + // Ultrasound top-bid stream - https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md + BlockFeeRecipient string `json:"block_fee_recipient"` + + // Data API + // - Ultrasound: https://relay-analytics.ultrasound.money/relay/v1/data/bidtraces/builder_blocks_received?slot=9194844 + // - Flashbots: https://boost-relay.flashbots.net/relay/v1/data/bidtraces/builder_blocks_received?slot=8969837 + Relay string `json:"relay"` + TimestampMs int64 `json:"timestamp_ms"` + ProposerPubkey string `json:"proposer_pubkey"` + ProposerFeeRecipient string 
`json:"proposer_fee_recipient"` + OptimisticSubmission bool `json:"optimistic_submission"` + + // getHeader +} + +func (bid *CommonBid) UniqueKey() string { + return fmt.Sprintf("%d-%s-%s-%s-%s", bid.Slot, bid.BlockHash, bid.ParentHash, bid.BuilderPubkey, bid.Value) +} + +func (bid *CommonBid) ValueAsBigInt() *big.Int { + value := new(big.Int) + value.SetString(bid.Value, 10) + return value +} + +func (bid *CommonBid) ToCSVFields() []string { + tsMs := bid.TimestampMs + if tsMs == 0 { + if bid.Timestamp > 0 { + tsMs = bid.Timestamp * 1000 + } else { + tsMs = bid.ReceivedAtMs // fallback for getHeader bids (which don't include the bid timestamp) + } + } + + slotTms := tsMs - common.SlotToTime(bid.Slot).UnixMilli() + + return []string{ + // Collector-internal fields + fmt.Sprint(bid.SourceType), fmt.Sprint(bid.ReceivedAtMs), + + // Common fields + fmt.Sprint(tsMs), + fmt.Sprint(bid.Slot), fmt.Sprint(slotTms), bid.Value, + bid.BlockHash, bid.ParentHash, bid.BuilderPubkey, fmt.Sprint(bid.BlockNumber), + + // Ultrasound top-bid stream + bid.BlockFeeRecipient, + + // Data API + bid.Relay, bid.ProposerPubkey, bid.ProposerFeeRecipient, boolToString(bid.OptimisticSubmission), + } +} + +func (bid *CommonBid) ToCSVLine(separator string) string { + return strings.Join(bid.ToCSVFields(), separator) +} + +func boolToString(b bool) string { + if b { + return "true" + } + return "false" +} + +func UltrasoundStreamToCommonBid(bid *UltrasoundStreamBidsMsg) *CommonBid { + blockHash := hexutil.Encode(bid.Bid.BlockHash[:]) + parentHash := hexutil.Encode(bid.Bid.ParentHash[:]) + builderPubkey := hexutil.Encode(bid.Bid.BuilderPubkey[:]) + blockFeeRecipient := hexutil.Encode(bid.Bid.FeeRecipient[:]) + + return &CommonBid{ + SourceType: SourceTypeUltrasoundStream, + ReceivedAtMs: bid.ReceivedAt.UnixMilli(), + + Timestamp: int64(bid.Bid.Timestamp) / 1000, + TimestampMs: int64(bid.Bid.Timestamp), + Slot: bid.Bid.Slot, + BlockNumber: bid.Bid.BlockNumber, + BlockHash: 
strings.ToLower(blockHash), + ParentHash: strings.ToLower(parentHash), + BuilderPubkey: strings.ToLower(builderPubkey), + Value: bid.Bid.Value.String(), + BlockFeeRecipient: strings.ToLower(blockFeeRecipient), + Relay: bid.Relay, + } +} + +func DataAPIToCommonBids(bids DataAPIPollerBidsMsg) []*CommonBid { + commonBids := make([]*CommonBid, 0, len(bids.Bids)) + for _, bid := range bids.Bids { + commonBids = append(commonBids, &CommonBid{ + SourceType: SourceTypeDataAPI, + ReceivedAtMs: bids.ReceivedAt.UnixMilli(), + + Timestamp: bid.Timestamp, + TimestampMs: bid.TimestampMs, + Slot: bid.Slot, + BlockNumber: bid.BlockNumber, + BlockHash: strings.ToLower(bid.BlockHash), + ParentHash: strings.ToLower(bid.ParentHash), + BuilderPubkey: strings.ToLower(bid.BuilderPubkey), + Value: bid.Value, + Relay: bids.Relay.Hostname(), + ProposerPubkey: strings.ToLower(bid.ProposerPubkey), + ProposerFeeRecipient: strings.ToLower(bid.ProposerFeeRecipient), + OptimisticSubmission: bid.OptimisticSubmission, + }) + } + return commonBids +} + +func GetHeaderToCommonBid(bid GetHeaderPollerBidsMsg) *CommonBid { + return &CommonBid{ + SourceType: SourceTypeGetHeader, + ReceivedAtMs: bid.ReceivedAt.UnixMilli(), + Relay: bid.Relay.Hostname(), + Slot: bid.Slot, + + BlockNumber: bid.Bid.Data.Message.Header.BlockNumber, + BlockHash: strings.ToLower(bid.Bid.Data.Message.Header.BlockHash.String()), + ParentHash: strings.ToLower(bid.Bid.Data.Message.Header.ParentHash.String()), + Value: bid.Bid.Data.Message.Value.String(), + } +} diff --git a/services/bidcollect/types_test.go b/services/bidcollect/types_test.go new file mode 100644 index 0000000..2b61b4b --- /dev/null +++ b/services/bidcollect/types_test.go @@ -0,0 +1,13 @@ +package bidcollect + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSourceTypes(t *testing.T) { + require.Equal(t, 0, SourceTypeGetHeader) + require.Equal(t, 1, SourceTypeDataAPI) + require.Equal(t, 2, SourceTypeUltrasoundStream) +} diff 
--git a/services/bidcollect/ultrasound-stream.go b/services/bidcollect/ultrasound-stream.go new file mode 100644 index 0000000..97be61c --- /dev/null +++ b/services/bidcollect/ultrasound-stream.go @@ -0,0 +1,99 @@ +package bidcollect + +import ( + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/flashbots/relayscan/common" + "github.com/gorilla/websocket" + "github.com/sirupsen/logrus" +) + +type UltrasoundStreamBidsMsg struct { + Bid common.UltrasoundStreamBid + Relay string + ReceivedAt time.Time +} + +type UltrasoundStreamOpts struct { + Log *logrus.Entry + BidC chan UltrasoundStreamBidsMsg +} + +type UltrasoundStreamConnection struct { + log *logrus.Entry + url string + bidC chan UltrasoundStreamBidsMsg + backoffSec int +} + +func NewUltrasoundStreamConnection(opts UltrasoundStreamOpts) *UltrasoundStreamConnection { + return &UltrasoundStreamConnection{ + log: opts.Log, + url: ultrasoundStreamDefaultURL, + bidC: opts.BidC, + backoffSec: initialBackoffSec, + } +} + +func (ustream *UltrasoundStreamConnection) Start() { + ustream.connect() +} + +func (ustream *UltrasoundStreamConnection) reconnect() { + backoffDuration := time.Duration(ustream.backoffSec) * time.Second + ustream.log.Infof("[ultrasounds-stream] reconnecting to ultrasound stream in %s sec ...", backoffDuration.String()) + time.Sleep(backoffDuration) + + // increase backoff timeout for next try + ustream.backoffSec *= 2 + if ustream.backoffSec > maxBackoffSec { + ustream.backoffSec = maxBackoffSec + } + + ustream.connect() +} + +func (ustream *UltrasoundStreamConnection) connect() { + ustream.log.WithField("uri", ustream.url).Info("[ultrasounds-stream] Starting bid stream...") + + dialer := websocket.DefaultDialer + wsSubscriber, resp, err := dialer.Dial(ustream.url, nil) + if err != nil { + ustream.log.WithError(err).Error("[ultrasounds-stream] failed to connect to bloxroute, reconnecting in a bit...") + go ustream.reconnect() + return + } + 
defer wsSubscriber.Close() + defer resp.Body.Close() + + ustream.log.Info("[ultrasounds-stream] stream connection successful") + ustream.backoffSec = initialBackoffSec // reset backoff timeout + + bid := new(common.UltrasoundStreamBid) + + for { + _, nextNotification, err := wsSubscriber.ReadMessage() + if err != nil { + // Handle websocket errors, by closing and reconnecting. Errors seen previously: + ustream.log.WithError(err).Error("ultrasound stream websocket error") + go ustream.reconnect() + return + } + + // nc.log.WithField("msg", hexutil.Encode(nextNotification)).Info("got message from ultrasound stream") + + // Unmarshal SSZ + err = bid.UnmarshalSSZ(nextNotification) + if err != nil { + ustream.log.WithError(err).WithField("msg", hexutil.Encode(nextNotification)).Error("[ultrasounds-stream] failed to unmarshal ultrasound stream message") + continue + } + + ustream.bidC <- UltrasoundStreamBidsMsg{ + Bid: *bid, + Relay: "relay.ultrasound.money", + ReceivedAt: time.Now().UTC(), + } + } +} diff --git a/services/bidcollect/website/devserver.go b/services/bidcollect/website/devserver.go new file mode 100644 index 0000000..04520c5 --- /dev/null +++ b/services/bidcollect/website/devserver.go @@ -0,0 +1,145 @@ +// Package website contains the service delivering the website +package website + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + _ "net/http/pprof" + "time" + + "github.com/gorilla/mux" + "github.com/sirupsen/logrus" + "github.com/tdewolff/minify" + "github.com/tdewolff/minify/html" + uberatomic "go.uber.org/atomic" +) + +var ErrServerAlreadyStarted = errors.New("server was already started") + +type DevWebserverOpts struct { + ListenAddress string + Log *logrus.Entry +} + +type DevWebserver struct { + opts *DevWebserverOpts + log *logrus.Entry + + srv *http.Server + srvStarted uberatomic.Bool + minifier *minify.M +} + +func NewDevWebserver(opts *DevWebserverOpts) (server *DevWebserver, err error) { + minifier 
:= minify.New() + minifier.AddFunc("text/css", html.Minify) + minifier.AddFunc("text/html", html.Minify) + minifier.AddFunc("application/javascript", html.Minify) + + server = &DevWebserver{ //nolint:exhaustruct + opts: opts, + log: opts.Log, + minifier: minifier, + } + + return server, nil +} + +func (srv *DevWebserver) StartServer() (err error) { + if srv.srvStarted.Swap(true) { + return ErrServerAlreadyStarted + } + + srv.srv = &http.Server{ //nolint:exhaustruct + Addr: srv.opts.ListenAddress, + Handler: srv.getRouter(), + + ReadTimeout: 600 * time.Millisecond, + ReadHeaderTimeout: 400 * time.Millisecond, + WriteTimeout: 3 * time.Second, + IdleTimeout: 3 * time.Second, + } + + err = srv.srv.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + return nil + } + return err +} + +func (srv *DevWebserver) getRouter() http.Handler { + r := mux.NewRouter() + r.PathPrefix("/static/").Handler(http.StripPrefix("/static/", http.FileServer(http.Dir("./website/static")))) + + r.HandleFunc("/", srv.handleRoot).Methods(http.MethodGet) + r.HandleFunc("/index.html", srv.handleRoot).Methods(http.MethodGet) + r.HandleFunc("/ethereum/mainnet/{month}/index.html", srv.handleMonth).Methods(http.MethodGet) + + return r +} + +func (srv *DevWebserver) RespondError(w http.ResponseWriter, code int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + resp := HTTPErrorResp{code, message} + if err := json.NewEncoder(w).Encode(resp); err != nil { + srv.log.WithError(err).Error("Couldn't write error response") + http.Error(w, "", http.StatusInternalServerError) + } +} + +func (srv *DevWebserver) RespondOK(w http.ResponseWriter, response any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + srv.log.WithError(err).Error("Couldn't write OK response") + http.Error(w, "", http.StatusInternalServerError) + } +} + +func (srv *DevWebserver) 
handleRoot(w http.ResponseWriter, req *http.Request) { + tpl, err := ParseIndexTemplate() + if err != nil { + srv.log.WithError(err).Error("wroot: error parsing template") + return + } + w.WriteHeader(http.StatusOK) + + data := *DummyHTMLData + data.Path = "/" + err = tpl.ExecuteTemplate(w, "base", data) + if err != nil { + srv.log.WithError(err).Error("wroot: error executing template") + return + } +} + +func (srv *DevWebserver) handleMonth(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + layout := "2006-01" + _, err := time.Parse(layout, vars["month"]) + if err != nil { + srv.RespondError(w, http.StatusBadRequest, "invalid date") + return + } + + tpl, err := ParseFilesTemplate() + if err != nil { + srv.log.WithError(err).Error("wroot: error parsing template") + return + } + w.WriteHeader(http.StatusOK) + + data := *DummyHTMLData + data.Title = vars["month"] + data.Path = fmt.Sprintf("ethereum/mainnet/%s/index.html", vars["month"]) + + err = tpl.ExecuteTemplate(w, "base", &data) + if err != nil { + srv.log.WithError(err).Error("wroot: error executing template") + return + } +} diff --git a/services/bidcollect/website/generator.go b/services/bidcollect/website/generator.go new file mode 100644 index 0000000..9418147 --- /dev/null +++ b/services/bidcollect/website/generator.go @@ -0,0 +1,211 @@ +package website + +// +// Quick and dirty website generator +// + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "strings" + + "github.com/sirupsen/logrus" + "github.com/tdewolff/minify" + "github.com/tdewolff/minify/css" + "github.com/tdewolff/minify/html" +) + +func BuildProdWebsite(log *logrus.Entry, outDir string, upload bool) { + log.Infof("Creating build server in %s", outDir) + err := os.MkdirAll(outDir, os.ModePerm) + if err != nil { + log.Fatal(err) + } + + dir := "ethereum/mainnet/" + + // Setup minifier + minifier := minify.New() + minifier.AddFunc("text/html", 
html.Minify) + minifier.AddFunc("text/css", css.Minify) + + // Load month folders from S3 + log.Infof("Getting folders from S3 for %s ...", dir) + months, err := getFoldersFromS3(dir) + if err != nil { + log.Fatal(err) + } + fmt.Println("Months:", months) + + // build root page + log.Infof("Building root page ...") + rootPageData := HTMLData{ //nolint:exhaustruct + Title: "", + Path: "/index.html", + EthMainnetMonths: months, + } + + tpl, err := ParseIndexTemplate() + if err != nil { + log.Fatal(err) + } + + buf := new(bytes.Buffer) + err = tpl.ExecuteTemplate(buf, "base", rootPageData) + if err != nil { + log.Fatal(err) + } + + // minify + mBytes, err := minifier.Bytes("text/html", buf.Bytes()) + if err != nil { + log.Fatal(err) + } + + // write to file + fn := filepath.Join(outDir, "index.html") + log.Infof("Writing to %s ...", fn) + err = os.WriteFile(fn, mBytes, 0o0600) + if err != nil { + log.Fatal(err) + } + + toUpload := []struct{ from, to string }{ + {fn, "/"}, + } + + // build files pages + for _, month := range months { + dir := "ethereum/mainnet/" + month + "/" + log.Infof("Getting files from S3 for %s ...", dir) + files, err := getFilesFromS3(dir) + if err != nil { + log.Fatal(err) + } + + rootPageData := HTMLData{ //nolint:exhaustruct + Title: month, + Path: fmt.Sprintf("ethereum/mainnet/%s/index.html", month), + + CurrentNetwork: "Ethereum Mainnet", + CurrentMonth: month, + Files: files, + } + + tpl, err := ParseFilesTemplate() + if err != nil { + log.Fatal(err) + } + + buf := new(bytes.Buffer) + err = tpl.ExecuteTemplate(buf, "base", rootPageData) + if err != nil { + log.Fatal(err) + } + + // minify + mBytes, err := minifier.Bytes("text/html", buf.Bytes()) + if err != nil { + log.Fatal(err) + } + + // write to file + _outDir := filepath.Join(outDir, dir) + err = os.MkdirAll(_outDir, os.ModePerm) + if err != nil { + log.Fatal(err) + } + + fn := filepath.Join(_outDir, "index.html") + log.Infof("Writing to %s ...", fn) + err = os.WriteFile(fn, mBytes, 
0o0600) + if err != nil { + log.Fatal(err) + } + + toUpload = append(toUpload, struct{ from, to string }{fn, "/" + dir}) + } + + if upload { + log.Info("Uploading to S3 ...") + // for _, file := range toUpload { + // fmt.Printf("- %s -> %s\n", file.from, file.to) + // } + + for _, file := range toUpload { + app := "./scripts/bidcollect/s3/upload-file-to-r2.sh" + cmd := exec.Command(app, file.from, file.to) //nolint:gosec + stdout, err := cmd.Output() + if err != nil { + log.Fatal(err) + } + fmt.Println(string(stdout)) + } + } +} + +func getFoldersFromS3(dir string) ([]string, error) { + folders := []string{} + + app := "./scripts/bidcollect/s3/get-folders.sh" + cmd := exec.Command(app, dir) + stdout, err := cmd.Output() + if err != nil { + return folders, err + } + + // Print the output + lines := strings.Split(string(stdout), "\n") + for _, line := range lines { + if line != "" && strings.HasPrefix(line, "20") { + folders = append(folders, strings.TrimSuffix(line, "/")) + } + } + return folders, nil +} + +func getFilesFromS3(month string) ([]FileEntry, error) { + files := []FileEntry{} + + app := "./scripts/bidcollect/s3/get-files.sh" + cmd := exec.Command(app, month) + stdout, err := cmd.Output() + if err != nil { + return files, err + } + + space := regexp.MustCompile(`\s+`) + lines := strings.Split(string(stdout), "\n") + for _, line := range lines { + if line != "" { + line = space.ReplaceAllString(line, " ") + parts := strings.Split(line, " ") + + // parts[2] is the size + size, err := strconv.ParseUint(parts[2], 10, 64) + if err != nil { + return files, err + } + + filename := parts[3] + + if filename == "index.html" { + continue + } else if strings.HasSuffix(filename, ".csv.gz") { + continue + } + + files = append(files, FileEntry{ + Filename: filename, + Size: size, + Modified: parts[1] + " " + parts[0], + }) + } + } + return files, nil +} diff --git a/services/bidcollect/website/htmldata.go b/services/bidcollect/website/htmldata.go new file mode 100644 
index 0000000..900fe4d --- /dev/null +++ b/services/bidcollect/website/htmldata.go @@ -0,0 +1,82 @@ +package website + +import ( + "text/template" + + "github.com/flashbots/relayscan/common" +) + +type HTMLData struct { + Title string + Path string + + // Root page + EthMainnetMonths []string + + // File-listing page + CurrentNetwork string + CurrentMonth string + Files []FileEntry +} + +type FileEntry struct { + Filename string + Size uint64 + Modified string +} + +func prettyInt(i uint64) string { + return printer.Sprintf("%d", i) +} + +func caseIt(s string) string { + return caser.String(s) +} + +func percent(cnt, total uint64) string { + p := float64(cnt) / float64(total) * 100 + return printer.Sprintf("%.2f", p) +} + +func substr10(s string) string { + return s[:10] +} + +var DummyHTMLData = &HTMLData{ + Title: "", + Path: "", + + EthMainnetMonths: []string{ + "2023-08", + "2023-09", + }, + + CurrentNetwork: "Ethereum Mainnet", + CurrentMonth: "2023-08", + Files: []FileEntry{ + {"2023-08-29_all.csv.zip", 97210118, "02:02:23 2023-09-02"}, + {"2023-08-29_top.csv.zip", 7210118, "02:02:23 2023-09-02"}, + + {"2023-08-30_all.csv.zip", 97210118, "02:02:23 2023-09-02"}, + {"2023-08-30_top.csv.zip", 7210118, "02:02:23 2023-09-02"}, + + {"2023-08-31_all.csv.zip", 97210118, "02:02:23 2023-09-02"}, + {"2023-08-31_top.csv.zip", 7210118, "02:02:23 2023-09-02"}, + }, +} + +var funcMap = template.FuncMap{ + "prettyInt": prettyInt, + "caseIt": caseIt, + "percent": percent, + "humanBytes": common.HumanBytes, + "substr10": substr10, +} + +func ParseIndexTemplate() (*template.Template, error) { + return template.New("index.html").Funcs(funcMap).ParseFiles("services/bidcollect/website/templates/index_root.html", "services/bidcollect/website/templates/base.html") +} + +func ParseFilesTemplate() (*template.Template, error) { + return template.New("index.html").Funcs(funcMap).ParseFiles("services/bidcollect/website/templates/index_files.html", 
"services/bidcollect/website/templates/base.html") +} diff --git a/services/bidcollect/website/static/favicon/android-chrome-192x192.png b/services/bidcollect/website/static/favicon/android-chrome-192x192.png new file mode 100644 index 0000000..d6475e4 Binary files /dev/null and b/services/bidcollect/website/static/favicon/android-chrome-192x192.png differ diff --git a/services/bidcollect/website/static/favicon/android-chrome-512x512.png b/services/bidcollect/website/static/favicon/android-chrome-512x512.png new file mode 100644 index 0000000..27067e2 Binary files /dev/null and b/services/bidcollect/website/static/favicon/android-chrome-512x512.png differ diff --git a/services/bidcollect/website/static/favicon/apple-touch-icon.png b/services/bidcollect/website/static/favicon/apple-touch-icon.png new file mode 100644 index 0000000..f74258d Binary files /dev/null and b/services/bidcollect/website/static/favicon/apple-touch-icon.png differ diff --git a/services/bidcollect/website/static/favicon/favicon-16x16.png b/services/bidcollect/website/static/favicon/favicon-16x16.png new file mode 100644 index 0000000..2449b62 Binary files /dev/null and b/services/bidcollect/website/static/favicon/favicon-16x16.png differ diff --git a/services/bidcollect/website/static/favicon/favicon-32x32.png b/services/bidcollect/website/static/favicon/favicon-32x32.png new file mode 100644 index 0000000..a25d217 Binary files /dev/null and b/services/bidcollect/website/static/favicon/favicon-32x32.png differ diff --git a/services/bidcollect/website/static/favicon/favicon.ico b/services/bidcollect/website/static/favicon/favicon.ico new file mode 100644 index 0000000..f0b14ae Binary files /dev/null and b/services/bidcollect/website/static/favicon/favicon.ico differ diff --git a/services/bidcollect/website/static/favicon/site.webmanifest b/services/bidcollect/website/static/favicon/site.webmanifest new file mode 100644 index 0000000..45dc8a2 --- /dev/null +++ 
b/services/bidcollect/website/static/favicon/site.webmanifest @@ -0,0 +1 @@ +{"name":"","short_name":"","icons":[{"src":"/android-chrome-192x192.png","sizes":"192x192","type":"image/png"},{"src":"/android-chrome-512x512.png","sizes":"512x512","type":"image/png"}],"theme_color":"#ffffff","background_color":"#ffffff","display":"standalone"} \ No newline at end of file diff --git a/services/bidcollect/website/static/styles.css b/services/bidcollect/website/static/styles.css new file mode 100644 index 0000000..e69de29 diff --git a/services/bidcollect/website/templates/base.html b/services/bidcollect/website/templates/base.html new file mode 100644 index 0000000..72d8554 --- /dev/null +++ b/services/bidcollect/website/templates/base.html @@ -0,0 +1,127 @@ +{{ define "base" }} + +{{ $title:="Relayscan Bidarchive 📚" }} + +{{ if ne .Title "" }} +{{ $title = (printf "%v | %v" .Title $title) }} +{{ end }} + + + + + + + + + + + + + + + + + + + {{ $title }} + + + + + + + + + + + + + + + + + + + + + + +
+ + + +

{{ $title }}

+

+ + docs · https://github.com/flashbots/relayscan +

+

Illuminate, Democratize, Distribute

+
+
+ {{ template "content" . }} +
+ + + +{{ end }} \ No newline at end of file diff --git a/services/bidcollect/website/templates/index_files.html b/services/bidcollect/website/templates/index_files.html new file mode 100644 index 0000000..54d8d3c --- /dev/null +++ b/services/bidcollect/website/templates/index_files.html @@ -0,0 +1,48 @@ +{{ define "content" }} +{{ $day:="" }} +{{ $class:="even" }} +{{ $change:="" }} + +
+
+{{ .CurrentNetwork }} +

{{ .CurrentMonth }}

+ + + + + + + + + {{ range .Files }} + {{ $dayTmp:=.Filename|substr10 }} + {{ if ne $day $dayTmp }} + {{ $change = "1" }} + {{ $day = $dayTmp }} + {{ if ne $class "even" }} + {{ $class = "even" }} + {{ else }} + {{ $class = "odd" }} + {{ end }} + {{ else }} + {{ $change = "" }} + {{ end }} + + + + + {{ end }} + +
../
+ {{ if eq $change "1" }}{{ end }} + + {{ .Filename }} + {{ .Size | humanBytes }}
+ +
+
+

+ The data is dedicated to the public domain under the CC-0 license. +

+{{ end }} \ No newline at end of file diff --git a/services/bidcollect/website/templates/index_root.html b/services/bidcollect/website/templates/index_root.html new file mode 100644 index 0000000..161a5b0 --- /dev/null +++ b/services/bidcollect/website/templates/index_root.html @@ -0,0 +1,19 @@ +{{ define "content" }} + +
+
+

Ethereum Mainnet

+
    + {{ range .EthMainnetMonths }} +
  • {{ . }}
  • + {{ end }} +
+ + +
+
+
+

+

The data is dedicated to the public domain under the CC-0 license.

+

+{{ end }} \ No newline at end of file diff --git a/services/bidcollect/website/utils.go b/services/bidcollect/website/utils.go new file mode 100644 index 0000000..caa69c8 --- /dev/null +++ b/services/bidcollect/website/utils.go @@ -0,0 +1,87 @@ +package website + +import ( + "fmt" + "net/http" + "time" + + "go.uber.org/zap" + "golang.org/x/text/cases" + "golang.org/x/text/language" + "golang.org/x/text/message" +) + +var ( + // Printer for pretty printing numbers + printer = message.NewPrinter(language.English) + + // Caser is used for casing strings + caser = cases.Title(language.English) +) + +type HTTPErrorResp struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// responseWriter is a minimal wrapper for http.ResponseWriter that allows the +// written HTTP status code to be captured for logging. +type responseWriter struct { + http.ResponseWriter + status int + wroteHeader bool +} + +func wrapResponseWriter(w http.ResponseWriter) *responseWriter { + return &responseWriter{ResponseWriter: w} //nolint:exhaustruct +} + +func (rw *responseWriter) Status() int { + return rw.status +} + +func (rw *responseWriter) WriteHeader(code int) { + if rw.wroteHeader { + return + } + + rw.status = code + rw.ResponseWriter.WriteHeader(code) + rw.wroteHeader = true +} + +// LoggingMiddlewareZap logs the incoming HTTP request & its duration. 
+func LoggingMiddlewareZap(logger *zap.Logger, next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle panics + defer func() { + if msg := recover(); msg != nil { + w.WriteHeader(http.StatusInternalServerError) + var method, url string + if r != nil { + method = r.Method + url = r.URL.EscapedPath() + } + logger.Error("HTTP request handler panicked", + zap.Any("error", msg), + zap.String("method", method), + zap.String("url", url), + ) + } + }() + + start := time.Now() + wrapped := wrapResponseWriter(w) + next.ServeHTTP(wrapped, r) + + // Passing request stats both in-message (for the human reader) + // as well as inside the structured log (for the machine parser) + logger.Info(fmt.Sprintf("%s %s %d", r.Method, r.URL.EscapedPath(), wrapped.status), + zap.Int("durationMs", int(time.Since(start).Milliseconds())), + zap.Int("status", wrapped.status), + zap.String("logType", "access"), + zap.String("method", r.Method), + zap.String("path", r.URL.EscapedPath()), + ) + }) +} diff --git a/vars/relays.go b/vars/relays.go index db0404c..a66fd87 100644 --- a/vars/relays.go +++ b/vars/relays.go @@ -1,17 +1,21 @@ package vars -var RelayURLs = []string{ - "https://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae@boost-relay.flashbots.net", - "https://0x8b5d2e73e2a3a55c6c87b8b6eb92e0149a125c852751db1422fa951e42a09b82c142c3ea98d0d9930b056a3bc9896b8f@bloxroute.max-profit.blxrbdn.com", - "https://0xb0b07cd0abef743db4260b0ed50619cf6ad4d82064cb4fbec9d3ec530f7c5e6793d9f286c4e082c0244ffb9f2658fe88@bloxroute.regulated.blxrbdn.com", - "https://0xb3ee7afcf27f1f1259ac1787876318c6584ee353097a50ed84f51a1f21a323b3736f271a895c7ce918c038e4265918be@relay.edennetwork.io", - "https://0x98650451ba02064f7b000f5768cf0cf4d4e492317d82871bdc87ef841a0743f69f0f1eea11168503240ac35d101c9135@mainnet-relay.securerpc.com", - 
"https://0xa1559ace749633b997cb3fdacffb890aeebdb0f5a3b6aaa7eeeaf1a38af0a8fe88b9e4b1f61f236d2e64d95733327a62@relay.ultrasound.money", - "https://0xa7ab7a996c8584251c8f925da3170bdfd6ebc75d50f5ddc4050a6fdc77f2a3b5fce2cc750d0865e05d7228af97d69561@agnostic-relay.net", - "https://0xa15b52576bcbf1072f4a011c0f99f9fb6c66f3e1ff321f11f461d15e31b1cb359caa092c71bbded0bae5b5ea401aab7e@aestus.live", - "https://0x8c7d33605ecef85403f8b7289c8058f440cbb6bf72b055dfe2f3e2c6695b6a1ea5a9cd0eb3a7982927a463feb4c3dae2@relay.wenmerge.com", - "https://0x95a0a6af2566fa7db732020bb2724be61963ac1eb760aa1046365eb443bd4e3cc0fba0265d40a2d81dd94366643e986a@blockspace.frontier.tech", - // "https://0xad0a8bb54565c2211cee576363f3a347089d2f07cf72679d16911d740262694cadb62d7fd7483f27afd714ca0f1b9118@bloxroute.ethical.blxrbdn.com", // deactivated aug 2023: https://twitter.com/bloXrouteLabs/status/1690065892778926080 - // "https://0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@builder-relay-mainnet.blocknative.com", // deactivated sept. 
27, 2023: https://twitter.com/blocknative/status/1706685103286485364 - "https://0x8c4ed5e24fe5c6ae21018437bde147693f68cda427cd1122cf20819c30eda7ed74f72dece09bb313f2a1855595ab677d@titanrelay.xyz", // added 2024-02-22 -} +var ( + RelayFlashbots = "https://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae@boost-relay.flashbots.net" + RelayUltrasound = "https://0xa1559ace749633b997cb3fdacffb890aeebdb0f5a3b6aaa7eeeaf1a38af0a8fe88b9e4b1f61f236d2e64d95733327a62@relay.ultrasound.money" + RelayURLs = []string{ + RelayFlashbots, + RelayUltrasound, + "https://0x8b5d2e73e2a3a55c6c87b8b6eb92e0149a125c852751db1422fa951e42a09b82c142c3ea98d0d9930b056a3bc9896b8f@bloxroute.max-profit.blxrbdn.com", + "https://0xb0b07cd0abef743db4260b0ed50619cf6ad4d82064cb4fbec9d3ec530f7c5e6793d9f286c4e082c0244ffb9f2658fe88@bloxroute.regulated.blxrbdn.com", + "https://0xb3ee7afcf27f1f1259ac1787876318c6584ee353097a50ed84f51a1f21a323b3736f271a895c7ce918c038e4265918be@relay.edennetwork.io", + "https://0x98650451ba02064f7b000f5768cf0cf4d4e492317d82871bdc87ef841a0743f69f0f1eea11168503240ac35d101c9135@mainnet-relay.securerpc.com", + "https://0xa7ab7a996c8584251c8f925da3170bdfd6ebc75d50f5ddc4050a6fdc77f2a3b5fce2cc750d0865e05d7228af97d69561@agnostic-relay.net", + "https://0xa15b52576bcbf1072f4a011c0f99f9fb6c66f3e1ff321f11f461d15e31b1cb359caa092c71bbded0bae5b5ea401aab7e@aestus.live", + "https://0x8c7d33605ecef85403f8b7289c8058f440cbb6bf72b055dfe2f3e2c6695b6a1ea5a9cd0eb3a7982927a463feb4c3dae2@relay.wenmerge.com", + // "https://0x95a0a6af2566fa7db732020bb2724be61963ac1eb760aa1046365eb443bd4e3cc0fba0265d40a2d81dd94366643e986a@blockspace.frontier.tech", // data API doesn't work anymore (as of June 1, 2024) + // "https://0xad0a8bb54565c2211cee576363f3a347089d2f07cf72679d16911d740262694cadb62d7fd7483f27afd714ca0f1b9118@bloxroute.ethical.blxrbdn.com", // deactivated aug 2023: https://twitter.com/bloXrouteLabs/status/1690065892778926080 + // 
"https://0x9000009807ed12c1f08bf4e81c6da3ba8e3fc3d953898ce0102433094e5f22f21102ec057841fcb81978ed1ea0fa8246@builder-relay-mainnet.blocknative.com", // deactivated sept. 27, 2023: https://twitter.com/blocknative/status/1706685103286485364 + "https://0x8c4ed5e24fe5c6ae21018437bde147693f68cda427cd1122cf20819c30eda7ed74f72dece09bb313f2a1855595ab677d@titanrelay.xyz", // added 2024-02-22 + } +)