Skip to content

Commit

Permalink
Bid archive collection (#37)
Browse files Browse the repository at this point in the history
start ultrasound bid stream

ultrasound stream start

cleanup

cleanup + start DataApiPoller

polling delay

cleanup

use relays

actally request data api

commonbid

cleanup

BidProcessor

cleanups

cleanup

more cleanup

outdir

--all-relays

cleanup

comment

outdir with date

cleanup

simplify a bit

data-api poller: better offset timing

bids-combine.sh

tsv file ending

cleanup

getheader polling

normalize csv timestamp_ms, remove csv timestamp because redundant

cleanup

getHeader: only call once

notes

getHeader silence common errors

csv/tsv

docs

upload script

docs update

start website

website foundation

more doc cleanup

devserver cleanup

minor notes

bids-script: delete

fix script perms

website

cleanup
  • Loading branch information
metachris committed Jun 13, 2024
1 parent 3938ead commit 36c98b1
Show file tree
Hide file tree
Showing 45 changed files with 2,358 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@
/static_dev/
/relayscan
/deploy*
/test.csv
/csv/
/build/
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ cover-html:

docker-image:
DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan


generate-ssz:
rm -f common/ultrasoundbid_encoding.go
sszgen --path common --objs UltrasoundStreamBid

bids-website:
go run . service bidcollect --build-website --build-website-upload

bids-website-dev:
go run . service bidcollect --devserver
114 changes: 114 additions & 0 deletions cmd/service/bidcollect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package service

/**
* https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md
*/

import (
"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect"
"github.com/flashbots/relayscan/services/bidcollect/website"
"github.com/flashbots/relayscan/vars"
"github.com/spf13/cobra"
)

var (
collectUltrasoundStream bool
collectGetHeader bool
collectDataAPI bool
useAllRelays bool

outDir string
outputTSV bool // by default: CSV, but can be changed to TSV with this setting

runDevServerOnly bool // used to play with file listing website
devServerListenAddr = ":8095"

buildWebsite bool
buildWebsiteUpload bool
buildWebsiteOutDir string
)

func init() {
bidCollectCmd.Flags().BoolVar(&collectUltrasoundStream, "ultrasound-stream", false, "use ultrasound top-bid stream")
bidCollectCmd.Flags().BoolVar(&collectGetHeader, "get-header", false, "use getHeader API")
bidCollectCmd.Flags().BoolVar(&collectDataAPI, "data-api", false, "use data API")
bidCollectCmd.Flags().BoolVar(&useAllRelays, "all-relays", false, "use all relays")

// for getHeader
bidCollectCmd.Flags().StringVar(&beaconNodeURI, "beacon-uri", vars.DefaultBeaconURI, "beacon endpoint")

// for saving to file
bidCollectCmd.Flags().StringVar(&outDir, "out", "csv", "output directory for CSV/TSV")
bidCollectCmd.Flags().BoolVar(&outputTSV, "out-tsv", false, "output as TSV (instead of CSV)")

// for dev purposes
bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website")

// building the S3 website
bidCollectCmd.Flags().BoolVar(&buildWebsite, "build-website", false, "build file listing website")
bidCollectCmd.Flags().BoolVar(&buildWebsiteUpload, "build-website-upload", false, "upload after building")
bidCollectCmd.Flags().StringVar(&buildWebsiteOutDir, "build-website-out", "build", "output directory for website")
}

var bidCollectCmd = &cobra.Command{
Use: "bidcollect",
Short: "Collect bids",
Run: func(cmd *cobra.Command, args []string) {
if runDevServerOnly {
log.Infof("Bidcollect (%s) devserver starting on %s ...", vars.Version, devServerListenAddr)
fileListingDevServer()
return
}

if buildWebsite {
log.Infof("Bidcollect %s building website (output: %s) ...", vars.Version, buildWebsiteOutDir)
website.BuildProdWebsite(log, buildWebsiteOutDir, buildWebsiteUpload)
return
}

log.Infof("Bidcollect starting (%s) ...", vars.Version)

// Prepare relays
relays := []common.RelayEntry{
common.MustNewRelayEntry(vars.RelayFlashbots, false),
common.MustNewRelayEntry(vars.RelayUltrasound, false),
}
if useAllRelays {
relays = common.MustGetRelays()
}

log.Infof("Using %d relays", len(relays))
for index, relay := range relays {
log.Infof("- relay #%d: %s", index+1, relay.Hostname())
}

opts := bidcollect.BidCollectorOpts{
Log: log,
Relays: relays,
CollectUltrasoundStream: collectUltrasoundStream,
CollectGetHeader: collectGetHeader,
CollectDataAPI: collectDataAPI,
BeaconNodeURI: beaconNodeURI,
OutDir: outDir,
OutputTSV: outputTSV,
}

bidCollector := bidcollect.NewBidCollector(&opts)
bidCollector.MustStart()
},
}

func fileListingDevServer() {
webserver, err := website.NewDevWebserver(&website.DevWebserverOpts{ //nolint:exhaustruct
ListenAddress: devServerListenAddr,
Log: log,
})
if err != nil {
log.Fatal(err)
}
err = webserver.StartServer()
if err != nil {
log.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion cmd/service/collect-live-bids.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func init() {

var liveBidsCmd = &cobra.Command{
Use: "collect-live-bids",
Short: "On every slot, ask for live bids",
Short: "On every slot, ask for live bids (using getHeader)",
Run: func(cmd *cobra.Command, args []string) {
// Connect to Postgres
db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN)
Expand Down
1 change: 1 addition & 0 deletions cmd/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ var ServiceCmd = &cobra.Command{
func init() {
ServiceCmd.AddCommand(websiteCmd)
ServiceCmd.AddCommand(liveBidsCmd)
ServiceCmd.AddCommand(bidCollectCmd)
}
21 changes: 21 additions & 0 deletions common/relayentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ func NewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry, err err
return entry, err
}

func MustNewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry) {
entry, err := NewRelayEntry(relayURL, requireUser)
Check(err)
return entry
}

// RelayEntriesToStrings returns the string representation of a list of relay entries
func RelayEntriesToStrings(relays []RelayEntry) []string {
ret := make([]string, len(relays))
Expand All @@ -61,6 +67,15 @@ func RelayEntriesToStrings(relays []RelayEntry) []string {
return ret
}

// RelayEntriesToHostnameStrings returns the hostnames of a list of relay entries
func RelayEntriesToHostnameStrings(relays []RelayEntry) []string {
ret := make([]string, len(relays))
for i, entry := range relays {
ret[i] = entry.Hostname()
}
return ret
}

func GetRelays() ([]RelayEntry, error) {
var err error
relays := make([]RelayEntry, len(vars.RelayURLs))
Expand All @@ -72,3 +87,9 @@ func GetRelays() ([]RelayEntry, error) {
}
return relays, nil
}

func MustGetRelays() []RelayEntry {
relays, err := GetRelays()
Check(err)
return relays
}
8 changes: 7 additions & 1 deletion common/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ func SendHTTPRequest(ctx context.Context, client http.Client, method, url string
return resp.StatusCode, fmt.Errorf("%w: %d / %s", errHTTPErrorResponse, resp.StatusCode, string(bodyBytes))
}

if dst != nil {
if dst == nil {
// still read the body to reuse http connection (see also https://stackoverflow.com/a/17953506)
_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return resp.StatusCode, fmt.Errorf("could not read response body: %w", err)
}
} else {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return resp.StatusCode, fmt.Errorf("could not read response body: %w", err)
Expand Down
28 changes: 28 additions & 0 deletions common/ultrasoundbid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package common

import "math/big"

// https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md

type (
U64 [8]byte
Hash [32]byte
PublicKey [48]byte
Address [20]byte
U256 [32]byte
)

func (n *U256) String() string {
return new(big.Int).SetBytes(ReverseBytes(n[:])).String()
}

type UltrasoundStreamBid struct {
Timestamp uint64 `json:"timestamp"`
Slot uint64 `json:"slot"`
BlockNumber uint64 `json:"block_number"`
BlockHash Hash `json:"block_hash" ssz-size:"32"`
ParentHash Hash `json:"parent_hash" ssz-size:"32"`
BuilderPubkey PublicKey `json:"builder_pubkey" ssz-size:"48"`
FeeRecipient Address `json:"fee_recipient" ssz-size:"20"`
Value U256 `json:"value" ssz-size:"32"`
}
124 changes: 124 additions & 0 deletions common/ultrasoundbid_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package common

import (
ssz "github.com/ferranbt/fastssz"
)

// MarshalSSZ ssz marshals the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) MarshalSSZ() ([]byte, error) {
return ssz.MarshalSSZ(u)
}

// MarshalSSZTo ssz marshals the UltrasoundStreamBid object to a target array
func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf

// Field (0) 'Timestamp'
dst = ssz.MarshalUint64(dst, u.Timestamp)

// Field (1) 'Slot'
dst = ssz.MarshalUint64(dst, u.Slot)

// Field (2) 'BlockNumber'
dst = ssz.MarshalUint64(dst, u.BlockNumber)

// Field (3) 'BlockHash'
dst = append(dst, u.BlockHash[:]...)

// Field (4) 'ParentHash'
dst = append(dst, u.ParentHash[:]...)

// Field (5) 'BuilderPubkey'
dst = append(dst, u.BuilderPubkey[:]...)

// Field (6) 'FeeRecipient'
dst = append(dst, u.FeeRecipient[:]...)

// Field (7) 'Value'
dst = append(dst, u.Value[:]...)

return
}

// UnmarshalSSZ ssz unmarshals the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error {
var err error
size := uint64(len(buf))
if size != 188 {
return ssz.ErrSize
}

// Field (0) 'Timestamp'
u.Timestamp = ssz.UnmarshallUint64(buf[0:8])

// Field (1) 'Slot'
u.Slot = ssz.UnmarshallUint64(buf[8:16])

// Field (2) 'BlockNumber'
u.BlockNumber = ssz.UnmarshallUint64(buf[16:24])

// Field (3) 'BlockHash'
copy(u.BlockHash[:], buf[24:56])

// Field (4) 'ParentHash'
copy(u.ParentHash[:], buf[56:88])

// Field (5) 'BuilderPubkey'
copy(u.BuilderPubkey[:], buf[88:136])

// Field (6) 'FeeRecipient'
copy(u.FeeRecipient[:], buf[136:156])

// Field (7) 'Value'
copy(u.Value[:], buf[156:188])

return err
}

// SizeSSZ returns the ssz encoded size in bytes for the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) SizeSSZ() (size int) {
size = 188
return
}

// HashTreeRoot ssz hashes the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) HashTreeRoot() ([32]byte, error) {
return ssz.HashWithDefaultHasher(u)
}

// HashTreeRootWith ssz hashes the UltrasoundStreamBid object with a hasher
func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) {
indx := hh.Index()

// Field (0) 'Timestamp'
hh.PutUint64(u.Timestamp)

// Field (1) 'Slot'
hh.PutUint64(u.Slot)

// Field (2) 'BlockNumber'
hh.PutUint64(u.BlockNumber)

// Field (3) 'BlockHash'
hh.PutBytes(u.BlockHash[:])

// Field (4) 'ParentHash'
hh.PutBytes(u.ParentHash[:])

// Field (5) 'BuilderPubkey'
hh.PutBytes(u.BuilderPubkey[:])

// Field (6) 'FeeRecipient'
hh.PutBytes(u.FeeRecipient[:])

// Field (7) 'Value'
hh.PutBytes(u.Value[:])

hh.Merkleize(indx)
return
}

// GetTree ssz hashes the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) GetTree() (*ssz.Node, error) {
return ssz.ProofTree(u)
}
30 changes: 30 additions & 0 deletions common/ultrasoundbid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package common

import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/require"
)

func TestValueDecoding(t *testing.T) {
expected := "55539751698389157"
hex := "0xa558e5221c51c500000000000000000000000000000000000000000000000000"
hexBytes := hexutil.MustDecode(hex)
value := new(big.Int).SetBytes(ReverseBytes(hexBytes[:])).String()
require.Equal(t, expected, value)
}

func TestUltrasoundBidSSZDecoding(t *testing.T) {
hex := "0x704b87ce8f010000a94b8c0000000000b6043101000000002c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac074d0668c9e0912134628c32a54854b952234ebb6c1fdd6b053566ac2d2a09498da03b00ddb78b2c111450a5417a8c368c40f1f140cdf97d95b7fa9565467e0bbbe27877d08e01c69b4e5b02b144e6a265df99a0839818b3f120ebac9b73f82b617dc6a5556c71794b1a9c5400000000000000000000000000000000000000000000000000"
bytes := hexutil.MustDecode(hex)
bid := new(UltrasoundStreamBid)
err := bid.UnmarshalSSZ(bytes)
require.NoError(t, err)

require.Equal(t, uint64(1717156924272), bid.Timestamp)
require.Equal(t, uint64(9194409), bid.Slot)
require.Equal(t, uint64(19989686), bid.BlockNumber)
require.Equal(t, "0x2c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac07", hexutil.Encode(bid.BlockHash[:]))
}
Loading

0 comments on commit 36c98b1

Please sign in to comment.