Skip to content

Commit

Permalink
feat: retrieve remote payload
Browse files Browse the repository at this point in the history
Makes quite a few changes which were hard to make separately.
1. When fetching a payload, check the local, __and__ the remote auction. We choose not to support n remote auctions to to keep assumptions minimal. Use an auth token with the remote call.
2. When retrieving the payload from redis as a fallback we truly only check redis as a fallback. We no longer try to check memcached and postgres.
3. The tangle of paths for different failures during payload fetching could now also be simplified.
  • Loading branch information
alextes committed Sep 12, 2024
1 parent f219233 commit 2b7397a
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 116 deletions.
6 changes: 5 additions & 1 deletion cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ var (
apiDefaultDataAPIEnabled = os.Getenv("DISABLE_DATA_API") != "1"
apiDefaultProposerAPIEnabled = os.Getenv("DISABLE_PROPOSER_API") != "1"

localAuctionHost = os.Getenv("LOCAL_AUCTION_HOST")
remoteAuctionHost = os.Getenv("REMOTE_AUCTION_HOST")
auctionAuthToken = os.Getenv("AUCTION_AUTH_TOKEN")

apiListenAddr string
apiPprofEnabled bool
apiSecretKey string
Expand Down Expand Up @@ -142,7 +146,7 @@ var apiCmd = &cobra.Command{
}

log.Info("Setting up datastore...")
ds, err := datastore.NewDatastore(redis, mem, db)
ds, err := datastore.NewDatastore(redis, mem, db, localAuctionHost, remoteAuctionHost, auctionAuthToken)
if err != nil {
log.WithError(err).Fatalf("Failed setting up prod datastore")
}
Expand Down
66 changes: 49 additions & 17 deletions datastore/auction_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"net/url"
"time"

builderApi "github.com/attestantio/go-builder-client/api"
builderApiDeneb "github.com/attestantio/go-builder-client/api/deneb"
Expand All @@ -15,38 +16,45 @@ import (
"github.com/pkg/errors"
)

const API_ROOT = "http://turbo-auction-api"

var ErrFailedToParsePayload = errors.New("failed to parse payload")

func GetPayloadContents(slot uint64, proposerPubkey, blockHash string) (*builderApi.VersionedSubmitBlindedBlockResponse, error) {
func getPayloadContents(slot uint64, proposerPubkey, blockHash, host, basePath, authToken string, timeout time.Duration) (*builderApi.VersionedSubmitBlindedBlockResponse, error) {
client := &http.Client{Timeout: timeout}

queryParams := url.Values{}
queryParams.Add("slot", fmt.Sprintf("%d", slot))
queryParams.Add("proposer_pubkey", proposerPubkey)
queryParams.Add("block_hash", blockHash)

fullUrl := fmt.Sprintf("%s/internal/payload_contents?%s", API_ROOT, queryParams.Encode())
fullURL := fmt.Sprintf("%s/%s/payload_contents?%s", host, basePath, queryParams.Encode())
req, err := http.NewRequest("GET", fullURL, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create payload contents request")
}

var err error
// Add auth token if provided
if authToken != "" {
req.Header.Add("x-auth-token", authToken)
}

resp, err := http.Get(fullUrl)
resp, err := client.Do(req)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to fetch payload contents")
}
defer resp.Body.Close()

if resp.StatusCode == 404 {
if resp.StatusCode == http.StatusNotFound {
return nil, ErrExecutionPayloadNotFound
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to read payload contents response body")
}

// Try to parse deneb contents
denebPayloadContents := new(builderApiDeneb.ExecutionPayloadAndBlobsBundle)
err = denebPayloadContents.UnmarshalSSZ([]byte(body))
err = denebPayloadContents.UnmarshalSSZ(body)

if err == nil {
return &builderApi.VersionedSubmitBlindedBlockResponse{
Expand All @@ -57,7 +65,7 @@ func GetPayloadContents(slot uint64, proposerPubkey, blockHash string) (*builder

// Try to parse capella payload
capellaPayload := new(capella.ExecutionPayload)
err = capellaPayload.UnmarshalSSZ([]byte(body))
err = capellaPayload.UnmarshalSSZ(body)

if err == nil {
return &builderApi.VersionedSubmitBlindedBlockResponse{
Expand All @@ -69,29 +77,45 @@ func GetPayloadContents(slot uint64, proposerPubkey, blockHash string) (*builder
return nil, ErrFailedToParsePayload
}

func GetBidTrace(slot uint64, proposerPubkey, blockHash string) (*common.BidTraceV2, error) {
func (ds *Datastore) LocalPayloadContents(slot uint64, proposerPubkey, blockHash string) (*builderApi.VersionedSubmitBlindedBlockResponse, error) {
return getPayloadContents(slot, proposerPubkey, blockHash, ds.localAuctionHost, "internal", "", 0)
}

func (ds *Datastore) RemotePayloadContents(slot uint64, proposerPubkey, blockHash string) (*builderApi.VersionedSubmitBlindedBlockResponse, error) {
return getPayloadContents(slot, proposerPubkey, blockHash, ds.remoteAuctionHost, "private", ds.auctionAuthToken, 1*time.Second)
}

func getBidTrace(slot uint64, proposerPubkey, blockHash, auctionHost, basePath, authToken string) (*common.BidTraceV2, error) {
client := &http.Client{}

queryParams := url.Values{}
queryParams.Add("slot", fmt.Sprintf("%d", slot))
queryParams.Add("proposer_pubkey", proposerPubkey)
queryParams.Add("block_hash", blockHash)

fullUrl := fmt.Sprintf("%s/internal/bid_trace?%s", API_ROOT, queryParams.Encode())
fullURL := fmt.Sprintf("%s/%s/bid_trace?%s", auctionHost, basePath, queryParams.Encode())
req, err := http.NewRequest("GET", fullURL, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create bid trace request")
}

var err error
if authToken != "" {
req.Header.Add("x-auth-token", authToken)
}

resp, err := http.Get(fullUrl)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode == 404 {
if resp.StatusCode == http.StatusNotFound {
return nil, ErrBidTraceNotFound
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to read bit trace response body")
}

bidtrace := new(common.BidTraceV2)
Expand All @@ -102,3 +126,11 @@ func GetBidTrace(slot uint64, proposerPubkey, blockHash string) (*common.BidTrac

return bidtrace, nil
}

func (ds *Datastore) LocalBidTrace(slot uint64, proposerPubkey, blockHash string) (*common.BidTraceV2, error) {
return getBidTrace(slot, proposerPubkey, blockHash, ds.localAuctionHost, "internal", "")
}

func (ds *Datastore) RemoteBidTrace(slot uint64, proposerPubkey, blockHash string) (*common.BidTraceV2, error) {
return getBidTrace(slot, proposerPubkey, blockHash, ds.remoteAuctionHost, "private", ds.auctionAuthToken)
}
67 changes: 32 additions & 35 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
package datastore

import (
"database/sql"
"fmt"
"log"
"strings"
"sync"
"time"

builderApi "github.com/attestantio/go-builder-client/api"
builderApiV1 "github.com/attestantio/go-builder-client/api/v1"
"github.com/bradfitz/gomemcache/memcache"
"github.com/flashbots/mev-boost-relay/beaconclient"
"github.com/flashbots/mev-boost-relay/common"
"github.com/flashbots/mev-boost-relay/database"
Expand Down Expand Up @@ -49,15 +48,33 @@ type Datastore struct {

// Used for proposer-API readiness check
KnownValidatorsWasUpdated uberatomic.Bool

// Where we can find payloads for our local auction
// Should be protocol + hostname, e.g. http://turbo-auction-api, https://relay-builders-us.ultrasound.money
localAuctionHost string
remoteAuctionHost string
// Token used to remotely authenticate to auction API.
auctionAuthToken string
}

func NewDatastore(redisCache *RedisCache, memcached *Memcached, db database.IDatabaseService) (ds *Datastore, err error) {
func NewDatastore(redisCache *RedisCache, memcached *Memcached, db database.IDatabaseService, localAuctionHost, remoteAuctionHost, auctionAuthToken string) (ds *Datastore, err error) {
ds = &Datastore{
db: db,
memcached: memcached,
redis: redisCache,
knownValidatorsByPubkey: make(map[common.PubkeyHex]uint64),
knownValidatorsByIndex: make(map[uint64]common.PubkeyHex),
localAuctionHost: localAuctionHost,
remoteAuctionHost: remoteAuctionHost,
auctionAuthToken: auctionAuthToken,
}

if localAuctionHost == "" {
log.Fatal("LOCAL_AUCTION_HOST is not set")
}

if remoteAuctionHost == "" {
log.Fatal("REMOTE_AUCTION_HOST is not set")
}

return ds, err
Expand Down Expand Up @@ -191,47 +208,27 @@ func (ds *Datastore) SaveValidatorRegistration(entry builderApiV1.SignedValidato
return nil
}

// GetGetPayloadResponse returns the getPayload response from memory or Redis or Database
func (ds *Datastore) GetGetPayloadResponse(log *logrus.Entry, slot uint64, proposerPubkey, blockHash string) (*builderApi.VersionedSubmitBlindedBlockResponse, error) {
log = log.WithField("datastoreMethod", "GetGetPayloadResponse")
// RedisPayload returns the getPayload response from Redis
func (ds *Datastore) RedisPayload(log *logrus.Entry, slot uint64, proposerPubkey, blockHash string) (*builderApi.VersionedSubmitBlindedBlockResponse, error) {
log = log.WithField("datastoreMethod", "RedisPayload")
_proposerPubkey := strings.ToLower(proposerPubkey)
_blockHash := strings.ToLower(blockHash)

// 1. try to get from Redis
// try to get from Redis
resp, err := ds.redis.GetPayloadContents(slot, _proposerPubkey, _blockHash)

// redis.Nil is a common error when the key is not found
// this may happen if we're asked for a payload we don't have.
if errors.Is(err, redis.Nil) {
log.WithError(err).Warn("execution payload not found in redis")
} else if err != nil {
log.WithError(err).Error("error getting execution payload from redis")
} else {
log.Debug("getPayload response from redis")
return resp, nil
}

// 2. try to get from Memcached
if ds.memcached != nil {
resp, err = ds.memcached.GetExecutionPayload(slot, _proposerPubkey, _blockHash)
if errors.Is(err, memcache.ErrCacheMiss) {
log.WithError(err).Warn("execution payload not found in memcached")
} else if err != nil {
log.WithError(err).Error("error getting execution payload from memcached")
} else if resp != nil {
log.Debug("getPayload response from memcached")
return resp, nil
}
return nil, ErrExecutionPayloadNotFound
}

// 3. try to get from database (should not happen, it's just a backup)
executionPayloadEntry, err := ds.db.GetExecutionPayloadEntryBySlotPkHash(slot, proposerPubkey, blockHash)
if errors.Is(err, sql.ErrNoRows) {
log.WithError(err).Warn("execution payload not found in database")
return nil, ErrExecutionPayloadNotFound
} else if err != nil {
log.WithError(err).Error("error getting execution payload from database")
if err != nil {
log.WithError(err).Error("error getting execution payload from redis")
return nil, err
}

// Got it from database, now deserialize execution payload and compile full response
log.Warn("getPayload response from database, primary storage failed")
return database.ExecutionPayloadEntryToExecutionPayload(executionPayloadEntry)
log.Debug("getPayload response from redis")
return resp, nil
}
6 changes: 3 additions & 3 deletions datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ func setupTestDatastore(t *testing.T, mockDB *database.MockDB) *Datastore {
redisDs, err := NewRedisCache("", redisTestServer.Addr(), "", "")
require.NoError(t, err)

ds, err := NewDatastore(redisDs, nil, mockDB)
ds, err := NewDatastore(redisDs, nil, mockDB, "", "")
require.NoError(t, err)

return ds
}

func TestGetPayloadFailure(t *testing.T) {
ds := setupTestDatastore(t, &database.MockDB{})
_, err := ds.GetGetPayloadResponse(common.TestLog, 1, "a", "b")
_, err := ds.RedisPayload(common.TestLog, 1, "a", "b")
require.Error(t, ErrExecutionPayloadNotFound, err)
}

Expand Down Expand Up @@ -65,7 +65,7 @@ func TestGetPayloadDatabaseFallback(t *testing.T) {
},
}
ds := setupTestDatastore(t, mockDB)
payload, err := ds.GetGetPayloadResponse(common.TestLog, 1, "a", "b")
payload, err := ds.RedisPayload(common.TestLog, 1, "a", "b")
require.NoError(t, err)
blockHash, err := payload.BlockHash()
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 2b7397a

Please sign in to comment.