From 547257b35c3867d0d3d5f0ef4e652bd2cc02e75b Mon Sep 17 00:00:00 2001 From: hopeyen Date: Mon, 7 Oct 2024 16:53:45 -0700 Subject: [PATCH 01/15] feat: meterer helper types and structs --- core/assignment.go | 4 +- core/data.go | 2 +- core/meterer/meterer.go | 64 ++++++ core/meterer/offchain_store.go | 293 ++++++++++++++++++++++++++++ core/meterer/offchain_store_test.go | 248 +++++++++++++++++++++++ core/meterer/onchain_state.go | 115 +++++++++++ core/meterer/onchain_state_test.go | 117 +++++++++++ core/meterer/types.go | 192 ++++++++++++++++++ core/meterer/types_test.go | 186 ++++++++++++++++++ core/meterer/util.go | 164 ++++++++++++++++ 10 files changed, 1382 insertions(+), 3 deletions(-) create mode 100644 core/meterer/meterer.go create mode 100644 core/meterer/offchain_store.go create mode 100644 core/meterer/offchain_store_test.go create mode 100644 core/meterer/onchain_state.go create mode 100644 core/meterer/onchain_state_test.go create mode 100644 core/meterer/types.go create mode 100644 core/meterer/types_test.go create mode 100644 core/meterer/util.go diff --git a/core/assignment.go b/core/assignment.go index ecabb18771..465c1a2cfa 100644 --- a/core/assignment.go +++ b/core/assignment.go @@ -206,7 +206,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo denom := new(big.Int).Mul(big.NewInt(int64(info.ConfirmationThreshold-info.AdversaryThreshold)), totalStake) maxChunkLength := uint(roundUpDivideBig(num, denom).Uint64()) - maxChunkLength2 := roundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold)) + maxChunkLength2 := RoundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold)) if maxChunkLength < maxChunkLength2 { maxChunkLength = maxChunkLength2 @@ -271,7 +271,7 @@ func roundUpDivideBig(a, b *big.Int) *big.Int { } -func roundUpDivide(a, b uint) uint { +func RoundUpDivide(a, b uint) uint { return (a + b - 1) / b } diff --git a/core/data.go b/core/data.go index 45f9e750f8..879e2b4eb2 100644 --- a/core/data.go +++ b/core/data.go @@ -291,7 +291,7 @@ func (b *BlobHeader) EncodedSizeAllQuorums() int64 { size := int64(0) for _, quorum := range b.QuorumInfos { - size += int64(roundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold))) + size += int64(RoundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold))) } return size } diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go new file mode 100644 index 0000000000..7cb866eacb --- /dev/null +++ b/core/meterer/meterer.go @@ -0,0 +1,64 @@ +package meterer + +import ( + "time" + + "github.com/Layr-Labs/eigensdk-go/logging" +) + +type TimeoutConfig struct { + ChainReadTimeout time.Duration + ChainWriteTimeout time.Duration + ChainStateTimeout time.Duration + TxnBroadcastTimeout time.Duration +} + +// network parameters (this should be published on-chain and read through contracts) +type Config struct { + GlobalBytesPerSecond uint64 // 2^64 bytes ~= 18 exabytes per second; if we use uint32, that's ~4GB/s + PricePerChargeable uint32 // 2^64 gwei ~= 18M Eth; uint32 => ~4ETH + MinChargeableSize uint32 + ReservationWindow uint32 +} + +// disperser API server will receive requests from clients. these requests will be with a blobHeader with payments information (CumulativePayments, BinIndex, and Signature) +// Disperser will pass the blob header to the meterer, which will check if the payments information is valid. if it is, it will be added to the meterer's state. +// To check if the payment is valid, the meterer will: +// 1. check if the signature is valid +// (against the CumulativePayments and BinIndex fields ; +// maybe need something else to secure against using this appraoch for reservations when rev request comes in same bin interval; say that nonce is signed over as well) +// 2. For reservations, check offchain bin state as demonstrated in pseudocode, also check onchain state before rejecting (since onchain data is pulled) +// 3. For on-demand, check against payments and the global rates, similar to the reservation case +// +// If the payment is valid, the meterer will add the blob header to its state and return a success response to the disperser API server. +// if any of the checks fail, the meterer will return a failure response to the disperser API server. +var OnDemandQuorumNumbers = []uint8{0, 1} + +type Meterer struct { + Config + TimeoutConfig + + ChainState *OnchainPaymentState + OffchainStore *OffchainStore + + logger logging.Logger +} + +func NewMeterer( + config Config, + timeoutConfig TimeoutConfig, + paymentChainState *OnchainPaymentState, + offchainStore *OffchainStore, + logger logging.Logger, +) (*Meterer, error) { + // TODO: create a separate thread to pull from the chain and update chain state + return &Meterer{ + Config: config, + TimeoutConfig: timeoutConfig, + + ChainState: paymentChainState, + OffchainStore: offchainStore, + + logger: logger.With("component", "Meterer"), + }, nil +} diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go new file mode 100644 index 0000000000..d494a84ab2 --- /dev/null +++ b/core/meterer/offchain_store.go @@ -0,0 +1,293 @@ +package meterer + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + commonaws "github.com/Layr-Labs/eigenda/common/aws" + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +type OffchainStore struct { + dynamoClient *commondynamodb.Client + reservationTableName string + onDemandTableName string + globalBinTableName string + logger logging.Logger + // TODO: add maximum storage for both tables +} + +func NewOffchainStore( + cfg commonaws.ClientConfig, + reservationTableName string, + onDemandTableName string, + globalBinTableName string, + logger logging.Logger, +) (*OffchainStore, error) { + + dynamoClient, err := commondynamodb.NewClient(cfg, logger) + if err != nil { + return nil, err + } + if reservationTableName == "" || onDemandTableName == "" || globalBinTableName == "" { + return nil, fmt.Errorf("table names cannot be empty") + } + + err = CreateReservationTable(cfg, reservationTableName) + if err != nil && !strings.Contains(err.Error(), "Table already exists") { + fmt.Println("Error creating reservation table:", err) + return nil, err + } + err = CreateGlobalReservationTable(cfg, globalBinTableName) + if err != nil && !strings.Contains(err.Error(), "Table already exists") { + fmt.Println("Error creating global bin table:", err) + return nil, err + } + err = CreateOnDemandTable(cfg, onDemandTableName) + if err != nil && !strings.Contains(err.Error(), "Table already exists") { + fmt.Println("Error creating on-demand table:", err) + return nil, err + } + //TODO: add a separate thread to periodically clean up the tables + // delete expired reservation bins ( 0 { + prevPayment, err = strconv.ParseUint(smallerResult[0]["CumulativePayments"].(*types.AttributeValueMemberN).Value, 10, 64) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to parse previous payment: %w", err) + } + } + + // Fetch the smallest entry larger than the given cumulativePayment + largerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex", + "AccountID = :account AND CumulativePayments > :cumulativePayment", + commondynamodb.ExpresseionValues{ + ":account": &types.AttributeValueMemberS{Value: accountID}, + ":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)}, + }, + true, // Retrieve results in ascending order for the smallest greater amount + 1, + ) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to query the next payment for account: %w", err) + } + var nextPayment uint64 + var nextDataLength uint32 + if len(largerResult) > 0 { + nextPayment, err = strconv.ParseUint(largerResult[0]["CumulativePayments"].(*types.AttributeValueMemberN).Value, 10, 64) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to parse next payment: %w", err) + } + dataLength, err := strconv.ParseUint(largerResult[0]["DataLength"].(*types.AttributeValueMemberN).Value, 10, 32) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to parse blob size: %w", err) + } + nextDataLength = uint32(dataLength) + } + + return prevPayment, nextPayment, nextDataLength, nil +} diff --git a/core/meterer/offchain_store_test.go b/core/meterer/offchain_store_test.go new file mode 100644 index 0000000000..6978a3ca80 --- /dev/null +++ b/core/meterer/offchain_store_test.go @@ -0,0 +1,248 @@ +package meterer_test + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/assert" +) + +func TestReservationBinsBasicOperations(t *testing.T) { + tableName := "reservations" + meterer.CreateReservationTable(clientConfig, tableName) + indexName := "AccountIDIndex" + + ctx := context.Background() + err := dynamoClient.PutItem(ctx, tableName, + commondynamodb.Item{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + "BinUsage": &types.AttributeValueMemberN{Value: "1000"}, + "UpdatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + }, + ) + assert.NoError(t, err) + + item, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + }) + assert.NoError(t, err) + + assert.Equal(t, "account1", item["AccountID"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "1", item["BinIndex"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "1000", item["BinUsage"].(*types.AttributeValueMemberN).Value) + + items, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpresseionValues{ + ":account": &types.AttributeValueMemberS{Value: "account1"}, + }) + assert.NoError(t, err) + assert.Len(t, items, 1) + + item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account2"}, + }) + assert.Error(t, err) + + _, err = dynamoClient.UpdateItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + }, commondynamodb.Item{ + "BinUsage": &types.AttributeValueMemberN{Value: "2000"}, + }) + assert.NoError(t, err) + err = dynamoClient.PutItem(ctx, tableName, + commondynamodb.Item{ + "AccountID": &types.AttributeValueMemberS{Value: "account2"}, + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + "BinUsage": &types.AttributeValueMemberN{Value: "3000"}, + "UpdatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + }, + ) + assert.NoError(t, err) + + item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + }) + assert.NoError(t, err) + assert.Equal(t, "2000", item["BinUsage"].(*types.AttributeValueMemberN).Value) + + items, err = dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpresseionValues{ + ":account": &types.AttributeValueMemberS{Value: "account1"}, + }) + assert.NoError(t, err) + assert.Len(t, items, 1) + assert.Equal(t, "2000", items[0]["BinUsage"].(*types.AttributeValueMemberN).Value) + + item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account2"}, + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + }) + assert.NoError(t, err) + assert.Equal(t, "3000", item["BinUsage"].(*types.AttributeValueMemberN).Value) + + err = dynamoClient.DeleteTable(ctx, tableName) + assert.NoError(t, err) +} + +func TestGlobalBinsBasicOperations(t *testing.T) { + tableName := "global" + meterer.CreateGlobalReservationTable(clientConfig, tableName) + indexName := "BinIndexIndex" + + ctx := context.Background() + numItems := 30 + items := make([]commondynamodb.Item, numItems) + for i := 0; i < numItems; i += 1 { + items[i] = commondynamodb.Item{ + "BinIndex": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)}, + "BinUsage": &types.AttributeValueMemberN{Value: "1000"}, + "UpdatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + } + } + unprocessed, err := dynamoClient.PutItems(ctx, tableName, items) + assert.NoError(t, err) + assert.Len(t, unprocessed, 0) + + queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + ":index": &types.AttributeValueMemberN{ + Value: "1", + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, 1) + assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) + + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + ":index": &types.AttributeValueMemberN{ + Value: "1", + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, 1) + assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) + + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + ":index": &types.AttributeValueMemberN{ + Value: "32", + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, 0) + + _, err = dynamoClient.UpdateItem(ctx, tableName, commondynamodb.Key{ + "BinIndex": &types.AttributeValueMemberN{Value: "1"}, + }, commondynamodb.Item{ + "BinUsage": &types.AttributeValueMemberN{Value: "2000"}, + }) + assert.NoError(t, err) + + err = dynamoClient.PutItem(ctx, tableName, + commondynamodb.Item{ + "BinIndex": &types.AttributeValueMemberN{Value: "2"}, + "BinUsage": &types.AttributeValueMemberN{Value: "3000"}, + "UpdatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + }, + ) + assert.NoError(t, err) + + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + ":index": &types.AttributeValueMemberN{ + Value: "1", + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, 1) + assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "2000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) + + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + ":index": &types.AttributeValueMemberN{ + Value: "2", + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, 1) + assert.Equal(t, "2", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "3000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) +} + +func TestOnDemandUsageBasicOperations(t *testing.T) { + tableName := "ondemand" + meterer.CreateOnDemandTable(clientConfig, tableName) + indexName := "AccountIDIndex" + + ctx := context.Background() + + err := dynamoClient.PutItem(ctx, tableName, + commondynamodb.Item{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "CumulativePayments": &types.AttributeValueMemberN{Value: "1"}, + "DataLength": &types.AttributeValueMemberN{Value: "1000"}, + }, + ) + assert.NoError(t, err) + + numItems := 30 + repetitions := 5 + items := make([]commondynamodb.Item, numItems) + for i := 0; i < numItems; i += 1 { + items[i] = commondynamodb.Item{ + "AccountID": &types.AttributeValueMemberS{Value: fmt.Sprintf("account%d", i%repetitions)}, + "CumulativePayments": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i)}, + "DataLength": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", i*1000)}, + } + } + unprocessed, err := dynamoClient.PutItems(ctx, tableName, items) + assert.NoError(t, err) + assert.Len(t, unprocessed, 0) + + // get item + item, err := dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "CumulativePayments": &types.AttributeValueMemberN{Value: "1"}, + }) + assert.NoError(t, err) + assert.Equal(t, "1", item["CumulativePayments"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, "1000", item["DataLength"].(*types.AttributeValueMemberN).Value) + + queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpresseionValues{ + ":account": &types.AttributeValueMemberS{ + Value: "account1", + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, numItems/repetitions) + for _, item := range queryResult { + cumulativePayments, _ := strconv.Atoi(item["CumulativePayments"].(*types.AttributeValueMemberN).Value) + assert.Equal(t, fmt.Sprintf("%d", cumulativePayments*1000), item["DataLength"].(*types.AttributeValueMemberN).Value) + } + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account_id", commondynamodb.ExpresseionValues{ + ":account_id": &types.AttributeValueMemberS{ + Value: fmt.Sprintf("account%d", numItems/repetitions+1), + }}) + assert.NoError(t, err) + assert.Len(t, queryResult, 0) + + updatedItem, err := dynamoClient.UpdateItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "CumulativePayments": &types.AttributeValueMemberN{Value: "1"}, + // "BinUsage": &types.AttributeValueMemberN{Value: "1000"}, + }, commondynamodb.Item{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "CumulativePayments": &types.AttributeValueMemberN{Value: "3"}, + "DataLength": &types.AttributeValueMemberN{Value: "3000"}, + }) + assert.NoError(t, err) + assert.Equal(t, "3000", updatedItem["DataLength"].(*types.AttributeValueMemberN).Value) + + item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + "AccountID": &types.AttributeValueMemberS{Value: "account1"}, + "CumulativePayments": &types.AttributeValueMemberN{Value: "1"}, + }) + assert.NoError(t, err) + assert.Equal(t, "3000", item["DataLength"].(*types.AttributeValueMemberN).Value) +} diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go new file mode 100644 index 0000000000..2ddec01da2 --- /dev/null +++ b/core/meterer/onchain_state.go @@ -0,0 +1,115 @@ +package meterer + +import ( + "context" + "errors" + "fmt" + "math" + + "github.com/Layr-Labs/eigenda/core/eth" +) + +/* HEAVILY MOCKED */ + +var ( + DummyReservationBytesLimit = uint64(1024) + DummyPaymentLimit = TokenAmount(512) + DummyMinimumChargeableSize = uint32(128) + DummyMinimumChargeablePayment = uint32(128) + + DummyReservation = ActiveReservation{DataRate: DummyReservationBytesLimit, StartTimestamp: 0, EndTimestamp: math.MaxUint32, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}} + DummyOnDemandPayment = OnDemandPayment{CumulativePayment: DummyPaymentLimit} +) + +// PaymentAccounts (For reservations and on-demand payments) + +type TokenAmount uint64 // TODO: change to uint128 + +// OperatorInfo contains information about an operator which is stored on the blockchain state, +// corresponding to a particular quorum +type ActiveReservation struct { + DataRate uint64 // Bandwidth per reservation bin + StartTimestamp uint64 // Unix timestamp that's valid for basically eternity + EndTimestamp uint64 + + QuorumNumbers []uint8 + QuorumSplit []byte // ordered mapping of quorum number to payment split; on-chain validation should ensure split <= 100 +} + +type OnDemandPayment struct { + CumulativePayment TokenAmount // Total amount deposited by the user +} + +// ActiveReservations contains information about the current state of active reservations +// map account ID to the ActiveReservation for that account. +type ActiveReservations struct { + Reservations map[string]*ActiveReservation +} + +// OnDemandPayments contains information about the current state of on-demand payments +// Map from account ID to the OnDemandPayment for that account. +type OnDemandPayments struct { + Payments map[string]*OnDemandPayment +} + +// OnchainPaymentState is an interface for getting information about the current chain state for payments. +type OnchainPaymentState struct { + tx *eth.Transactor + + ActiveReservations *ActiveReservations + OnDemandPayments *OnDemandPayments + // FUNCTIONS IF THIS STRUCT WAS AN INTERFACE? + // GetActiveReservations(ctx context.Context, blockNumber uint) (map[string]*ActiveReservations, error) + // GetActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*ActiveReservation, error) + // GetOnDemandPayments(ctx context.Context, blockNumber uint) (map[string]*OnDemandPayments, error) + // GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*OnDemandPayment, error) +} + +func NewOnchainPaymentState() *OnchainPaymentState { + return &OnchainPaymentState{ + ActiveReservations: &ActiveReservations{}, + OnDemandPayments: &OnDemandPayments{}, + } +} + +// Mock data initialization method (mocked structs) +func (pcs *OnchainPaymentState) InitializeOnchainPaymentState() { + + // update with a pull from chain (write pulling functions in/core/eth/tx.go) + // TODO: update with pulling from chain; currently use a dummy + pbk := "0x04cd9ba0357d1e5b929554e932cccdd6cf2d6e41d9d67907365b3e46cf005d5afd92b4f7bb3b829520be1a1b88641691973c98dfe68b07ee3613e270406285dfe8" + pcs.ActiveReservations.Reservations = map[string]*ActiveReservation{ + pbk: &DummyReservation, + } + pcs.OnDemandPayments.Payments = map[string]*OnDemandPayment{ + pbk: &DummyOnDemandPayment, + } + + fmt.Println("Initialized payment state with dummy reservation and on-demand payments") +} + +func (pcs *OnchainPaymentState) GetCurrentBlockNumber(ctx context.Context) (uint, error) { + return 0, nil +} + +func (pcs *OnchainPaymentState) GetActiveReservations(ctx context.Context, blockNumber uint) (*ActiveReservations, error) { + return pcs.ActiveReservations, nil +} + +func (pcs *OnchainPaymentState) GetActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*ActiveReservation, error) { + if reservation, ok := pcs.ActiveReservations.Reservations[accountID]; ok { + return reservation, nil + } + return nil, errors.New("reservation not found") +} + +func (pcs *OnchainPaymentState) GetOnDemandPayments(ctx context.Context, blockNumber uint) (*OnDemandPayments, error) { + return pcs.OnDemandPayments, nil +} + +func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*OnDemandPayment, error) { + if payment, ok := pcs.OnDemandPayments.Payments[accountID]; ok { + return payment, nil + } + return nil, errors.New("payment not found") +} diff --git a/core/meterer/onchain_state_test.go b/core/meterer/onchain_state_test.go new file mode 100644 index 0000000000..46f7fc7985 --- /dev/null +++ b/core/meterer/onchain_state_test.go @@ -0,0 +1,117 @@ +package meterer_test + +import ( + "context" + "testing" + + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +// MockOnchainPaymentState : TO BE REPLACED WITH ACTUAL IMPLEMENTATION +type MockOnchainPaymentState struct { + mock.Mock +} + +func (m *MockOnchainPaymentState) GetCurrentBlockNumber() (uint, error) { + args := m.Called() + return args.Get(0).(uint), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetActiveReservations(ctx context.Context, blockNumber uint) (*meterer.ActiveReservations, error) { + args := m.Called(ctx, blockNumber) + return args.Get(0).(*meterer.ActiveReservations), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*meterer.ActiveReservation, error) { + args := m.Called(ctx, blockNumber, accountID) + return args.Get(0).(*meterer.ActiveReservation), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetOnDemandPayments(ctx context.Context, blockNumber uint) (*meterer.OnDemandPayments, error) { + args := m.Called(ctx, blockNumber) + return args.Get(0).(*meterer.OnDemandPayments), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*meterer.OnDemandPayment, error) { + args := m.Called(ctx, blockNumber, accountID) + return args.Get(0).(*meterer.OnDemandPayment), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetIndexedActiveReservations(ctx context.Context, blockNumber uint) (*meterer.ActiveReservations, error) { + args := m.Called(ctx, blockNumber) + return args.Get(0).(*meterer.ActiveReservations), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetIndexedActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*meterer.ActiveReservation, error) { + args := m.Called(ctx, blockNumber, accountID) + return args.Get(0).(*meterer.ActiveReservation), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetIndexedOnDemandPayments(ctx context.Context, blockNumber uint) (*meterer.OnDemandPayments, error) { + args := m.Called(ctx, blockNumber) + return args.Get(0).(*meterer.OnDemandPayments), args.Error(1) +} + +func (m *MockOnchainPaymentState) GetIndexedOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*meterer.OnDemandPayment, error) { + args := m.Called(ctx, blockNumber, accountID) + return args.Get(0).(*meterer.OnDemandPayment), args.Error(1) +} + +func (m *MockOnchainPaymentState) Start(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func TestGetCurrentBlockNumber(t *testing.T) { + mockState := new(MockOnchainPaymentState) + mockState.On("GetCurrentBlockNumber").Return(uint(1000), nil) + + blockNumber, err := mockState.GetCurrentBlockNumber() + assert.NoError(t, err) + assert.Equal(t, uint(1000), blockNumber) +} + +func TestGetActiveReservations(t *testing.T) { + mockState := new(MockOnchainPaymentState) + ctx := context.Background() + expectedReservations := &meterer.ActiveReservations{ + Reservations: map[string]*meterer.ActiveReservation{ + "account1": { + DataRate: 100, + StartTimestamp: 1000, + EndTimestamp: 2000, + QuorumSplit: []byte{50, 50}, + }, + }, + } + mockState.On("GetActiveReservations", ctx, uint(1000)).Return(expectedReservations, nil) + + reservations, err := mockState.GetActiveReservations(ctx, 1000) + assert.NoError(t, err) + assert.Equal(t, expectedReservations, reservations) +} + +func TestGetOnDemandPaymentByAccount(t *testing.T) { + mockState := new(MockOnchainPaymentState) + ctx := context.Background() + accountID := "account1" + expectedPayment := &meterer.OnDemandPayment{ + CumulativePayment: meterer.TokenAmount(1000000), + } + mockState.On("GetOnDemandPaymentByAccount", ctx, uint(1000), accountID).Return(expectedPayment, nil) + + payment, err := mockState.GetOnDemandPaymentByAccount(ctx, 1000, accountID) + assert.NoError(t, err) + assert.Equal(t, expectedPayment, payment) +} + +func TestStart(t *testing.T) { + mockState := new(MockOnchainPaymentState) + ctx := context.Background() + mockState.On("Start", ctx).Return(nil) + + err := mockState.Start(ctx) + assert.NoError(t, err) +} diff --git a/core/meterer/types.go b/core/meterer/types.go new file mode 100644 index 0000000000..a84d06f51b --- /dev/null +++ b/core/meterer/types.go @@ -0,0 +1,192 @@ +package meterer + +import ( + "crypto/ecdsa" + "fmt" + "math/big" + + "github.com/Layr-Labs/eigenda/core" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/math" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/signer/core/apitypes" +) + +/* SUBJECT TO BIG MODIFICATIONS */ + +// BlobHeader represents the header information for a blob +type BlobHeader struct { + // Existing fields + Commitment core.G1Point + DataLength uint32 + QuorumNumbers []uint8 + AccountID string + + // New fields + BinIndex uint32 + // TODO: we are thinking the contract can use uint128 for cumulative payment, + // but the definition on v2 uses uint64. Double check with team. + CumulativePayment uint64 + + Signature []byte +} + +// EIP712Signer handles EIP-712 signing operations +type EIP712Signer struct { + domain apitypes.TypedDataDomain + types apitypes.Types +} + +// NewEIP712Signer creates a new EIP712Signer instance +func NewEIP712Signer(chainID *big.Int, verifyingContract common.Address) *EIP712Signer { + return &EIP712Signer{ + domain: apitypes.TypedDataDomain{ + Name: "EigenDA", + Version: "1", + ChainId: (*math.HexOrDecimal256)(chainID), + VerifyingContract: verifyingContract.Hex(), + }, + types: apitypes.Types{ + "EIP712Domain": []apitypes.Type{ + {Name: "name", Type: "string"}, + {Name: "version", Type: "string"}, + {Name: "chainId", Type: "uint256"}, + {Name: "verifyingContract", Type: "address"}, + }, + "BlobHeader": []apitypes.Type{ + {Name: "accountID", Type: "string"}, + {Name: "binIndex", Type: "uint32"}, + {Name: "cumulativePayment", Type: "uint64"}, + {Name: "commitment", Type: "bytes"}, + {Name: "dataLength", Type: "uint32"}, + {Name: "quorumNumbers", Type: "uint8[]"}, + }, + }, + } +} + +// SignBlobHeader signs a BlobHeader using EIP-712 +func (s *EIP712Signer) SignBlobHeader(header *BlobHeader, privateKey *ecdsa.PrivateKey) ([]byte, error) { + commitment := header.Commitment.Serialize() + typedData := apitypes.TypedData{ + Types: s.types, + PrimaryType: "BlobHeader", + Domain: s.domain, + Message: apitypes.TypedDataMessage{ + "accountID": header.AccountID, + "binIndex": fmt.Sprintf("%d", header.BinIndex), + "cumulativePayment": fmt.Sprintf("%d", header.CumulativePayment), + "commitment": hexutil.Encode(commitment), + "dataLength": fmt.Sprintf("%d", header.DataLength), + "quorumNumbers": convertUint8SliceToMap(header.QuorumNumbers), + }, + } + + signature, err := s.signTypedData(typedData, privateKey) + if err != nil { + return nil, fmt.Errorf("error signing blob header: %v", err) + } + + return signature, nil +} + +func convertUint8SliceToMap(params []uint8) []string { + result := make([]string, len(params)) + for i, param := range params { + result[i] = fmt.Sprintf("%d", param) // Converting uint32 to string + } + return result +} + +// RecoverSender recovers the sender's address from a signed BlobHeader +func (s *EIP712Signer) RecoverSender(header *BlobHeader) (common.Address, error) { + typedData := apitypes.TypedData{ + Types: s.types, + PrimaryType: "BlobHeader", + Domain: s.domain, + Message: apitypes.TypedDataMessage{ + "accountID": header.AccountID, + "binIndex": fmt.Sprintf("%d", header.BinIndex), + "cumulativePayment": fmt.Sprintf("%d", header.CumulativePayment), + "commitment": hexutil.Encode(header.Commitment.Serialize()), + "dataLength": fmt.Sprintf("%d", header.DataLength), + "quorumNumbers": convertUint8SliceToMap(header.QuorumNumbers), + }, + } + + return s.recoverTypedData(typedData, header.Signature) +} + +func (s *EIP712Signer) signTypedData(typedData apitypes.TypedData, privateKey *ecdsa.PrivateKey) ([]byte, error) { + domainSeparator, err := typedData.HashStruct("EIP712Domain", typedData.Domain.Map()) + if err != nil { + return nil, fmt.Errorf("error hashing EIP712Domain: %v", err) + } + + typedDataHash, err := typedData.HashStruct(typedData.PrimaryType, typedData.Message) + if err != nil { + return nil, fmt.Errorf("error hashing primary type: %v", err) + } + + rawData := []byte(fmt.Sprintf("\x19\x01%s%s", string(domainSeparator), string(typedDataHash))) + digest := crypto.Keccak256(rawData) + + signature, err := crypto.Sign(digest, privateKey) + if err != nil { + return nil, fmt.Errorf("error signing digest: %v", err) + } + + return signature, nil +} + +func (s *EIP712Signer) recoverTypedData(typedData apitypes.TypedData, signature []byte) (common.Address, error) { + domainSeparator, err := typedData.HashStruct("EIP712Domain", typedData.Domain.Map()) + if err != nil { + return common.Address{}, fmt.Errorf("error hashing EIP712Domain: %v", err) + } + + typedDataHash, err := typedData.HashStruct(typedData.PrimaryType, typedData.Message) + if err != nil { + return common.Address{}, fmt.Errorf("error hashing primary type: %v", err) + } + + rawData := []byte(fmt.Sprintf("\x19\x01%s%s", string(domainSeparator), string(typedDataHash))) + digest := crypto.Keccak256(rawData) + + pubKey, err := crypto.SigToPub(digest, signature) + if err != nil { + return common.Address{}, fmt.Errorf("error recovering public key: %v", err) + } + + return crypto.PubkeyToAddress(*pubKey), nil +} + +// ConstructBlobHeader creates a BlobHeader with a valid signature +func ConstructBlobHeader( + signer *EIP712Signer, + binIndex uint32, + cumulativePayment uint64, + commitment core.G1Point, + dataLength uint32, + quorumNumbers []uint8, + privateKey *ecdsa.PrivateKey, +) (*BlobHeader, error) { + accountID := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() + header := &BlobHeader{ + AccountID: accountID, + BinIndex: binIndex, + CumulativePayment: cumulativePayment, + Commitment: commitment, + QuorumNumbers: quorumNumbers, + DataLength: dataLength, + } + + signature, err := signer.SignBlobHeader(header, privateKey) + if err != nil { + return nil, fmt.Errorf("error signing blob header: %v", err) + } + + header.Signature = signature + return header, nil +} diff --git a/core/meterer/types_test.go b/core/meterer/types_test.go new file mode 100644 index 0000000000..bab550b806 --- /dev/null +++ b/core/meterer/types_test.go @@ -0,0 +1,186 @@ +package meterer_test + +import ( + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEIP712Signer(t *testing.T) { + chainID := big.NewInt(17000) + verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") + signer := meterer.NewEIP712Signer(chainID, verifyingContract) + + privateKey, err := crypto.GenerateKey() + require.NoError(t, err) + + commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) + + header := &meterer.BlobHeader{ + BinIndex: 0, + CumulativePayment: 1000, + Commitment: *commitment, + DataLength: 1024, + QuorumNumbers: []uint8{1}, + } + + t.Run("SignBlobHeader", func(t *testing.T) { + signature, err := signer.SignBlobHeader(header, privateKey) + require.NoError(t, err) + assert.NotEmpty(t, signature) + }) + + t.Run("RecoverSender", func(t *testing.T) { + signature, err := signer.SignBlobHeader(header, privateKey) + require.NoError(t, err) + + header.Signature = signature + recoveredAddress, err := signer.RecoverSender(header) + require.NoError(t, err) + + expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) + assert.Equal(t, expectedAddress, recoveredAddress) + }) +} + +func TestConstructBlobHeader(t *testing.T) { + chainID := big.NewInt(17000) + verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") + signer := meterer.NewEIP712Signer(chainID, verifyingContract) + + privateKey, err := crypto.GenerateKey() + require.NoError(t, err) + + commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) + + header, err := meterer.ConstructBlobHeader( + signer, + 0, // binIndex + 1000, // cumulativePayment + *commitment, // core.G1Point + 1024, // dataLength + []uint8{1}, + privateKey, + ) + + require.NoError(t, err) + assert.NotNil(t, header) + assert.NotEmpty(t, header.Signature) + + recoveredAddress, err := signer.RecoverSender(header) + require.NoError(t, err) + + expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) + assert.Equal(t, expectedAddress, recoveredAddress) +} + +func TestEIP712SignerWithDifferentKeys(t *testing.T) { + chainID := big.NewInt(17000) + verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") + signer := meterer.NewEIP712Signer(chainID, verifyingContract) + + privateKey1, err := crypto.GenerateKey() + require.NoError(t, err) + + privateKey2, err := crypto.GenerateKey() + require.NoError(t, err) + + commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) + + header, err := meterer.ConstructBlobHeader( + signer, + 0, + 1000, + *commitment, + 1024, + []uint8{1}, + privateKey1, + ) + + require.NoError(t, err) + assert.NotNil(t, header) + assert.NotEmpty(t, header.Signature) + + recoveredAddress, err := signer.RecoverSender(header) + require.NoError(t, err) + + expectedAddress1 := crypto.PubkeyToAddress(privateKey1.PublicKey) + expectedAddress2 := crypto.PubkeyToAddress(privateKey2.PublicKey) + + assert.Equal(t, expectedAddress1, recoveredAddress) + assert.NotEqual(t, expectedAddress2, recoveredAddress) +} + +func TestEIP712SignerWithModifiedHeader(t *testing.T) { + chainID := big.NewInt(17000) + verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") + signer := meterer.NewEIP712Signer(chainID, verifyingContract) + + privateKey, err := crypto.GenerateKey() + require.NoError(t, err) + + commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) + + header, err := meterer.ConstructBlobHeader( + signer, + 0, + 1000, + *commitment, + 1024, + []uint8{1}, + privateKey, + ) + + require.NoError(t, err) + assert.NotNil(t, header) + assert.NotEmpty(t, header.Signature) + recoveredAddress, err := signer.RecoverSender(header) + require.NoError(t, err) + + expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) + assert.Equal(t, expectedAddress, recoveredAddress, "Recovered address should match the address derived from the private key") + + header.AccountID = "modifiedAccount" + + addr, err := signer.RecoverSender(header) + require.NotEqual(t, expectedAddress, addr) +} + +func TestEIP712SignerWithDifferentChainID(t *testing.T) { + chainID1 := big.NewInt(17000) + chainID2 := big.NewInt(17001) + verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") + signer1 := meterer.NewEIP712Signer(chainID1, verifyingContract) + signer2 := meterer.NewEIP712Signer(chainID2, verifyingContract) + + privateKey, err := crypto.GenerateKey() + require.NoError(t, err) + + commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) + + header, err := meterer.ConstructBlobHeader( + signer1, + 0, + 1000, + *commitment, + 1024, + []uint8{1}, + privateKey, + ) + + require.NoError(t, err) + assert.NotNil(t, header) + assert.NotEmpty(t, header.Signature) + + // Try to recover the sender using a signer with a different chain ID + sender, err := signer2.RecoverSender(header) + expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) + + require.NotEqual(t, expectedAddress, sender) +} diff --git a/core/meterer/util.go b/core/meterer/util.go new file mode 100644 index 0000000000..475567b39d --- /dev/null +++ b/core/meterer/util.go @@ -0,0 +1,164 @@ +package meterer + +import ( + "context" + "math/big" + + commonaws "github.com/Layr-Labs/eigenda/common/aws" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/core" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +func DummyCommitment() core.G1Point { + commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) + return *commitment +} + +func CreateReservationTable(clientConfig commonaws.ClientConfig, tableName string) error { + ctx := context.Background() + _, err := test_utils.CreateTable(ctx, clientConfig, tableName, &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("AccountID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BinIndex"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("AccountID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("BinIndex"), + KeyType: types.KeyTypeRange, + }, + }, + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String("AccountIDIndex"), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("AccountID"), + KeyType: types.KeyTypeHash, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, // ProjectionTypeAll means all attributes are projected into the index + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }, + }, + TableName: aws.String(tableName), + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }) + return err +} + +func CreateGlobalReservationTable(clientConfig commonaws.ClientConfig, tableName string) error { + ctx := context.Background() + _, err := test_utils.CreateTable(ctx, clientConfig, tableName, &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("BinIndex"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BinIndex"), + KeyType: types.KeyTypeHash, + }, + }, + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String("BinIndexIndex"), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BinIndex"), + KeyType: types.KeyTypeHash, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }, + }, + TableName: aws.String(tableName), + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }) + return err +} + +func CreateOnDemandTable(clientConfig commonaws.ClientConfig, tableName string) error { + ctx := context.Background() + _, err := test_utils.CreateTable(ctx, clientConfig, tableName, &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("AccountID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("CumulativePayments"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("AccountID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("CumulativePayments"), + KeyType: types.KeyTypeRange, + }, + }, + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String("AccountIDIndex"), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("AccountID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("CumulativePayments"), + KeyType: types.KeyTypeRange, // Sort key + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }, + }, + TableName: aws.String(tableName), + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }) + return err +} From f066d70e6f3390dea605a8353473c6664ee1b2c8 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 8 Oct 2024 10:56:22 -0700 Subject: [PATCH 02/15] refactor: minor updates and cleanup --- core/meterer/meterer.go | 12 ++++--- core/meterer/offchain_store.go | 64 +++------------------------------- core/meterer/onchain_state.go | 5 --- 3 files changed, 12 insertions(+), 69 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 7cb866eacb..3b7bc62b1f 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -13,12 +13,14 @@ type TimeoutConfig struct { TxnBroadcastTimeout time.Duration } -// network parameters (this should be published on-chain and read through contracts) +// network parameters that should be published on-chain. We currently configure these params through disperser env vars. type Config struct { - GlobalBytesPerSecond uint64 // 2^64 bytes ~= 18 exabytes per second; if we use uint32, that's ~4GB/s - PricePerChargeable uint32 // 2^64 gwei ~= 18M Eth; uint32 => ~4ETH - MinChargeableSize uint32 - ReservationWindow uint32 + // for rate limiting 2^64 ~= 18 exabytes per second; 2^32 ~= 4GB/s + // for payments 2^64 ~= 18M Eth; 2^32 ~= 4ETH + GlobalBytesPerSecond uint64 // Global rate limit in bytes per second for on-demand payments + MinChargeableSize uint32 // Minimum size of a chargeable unit in bytes, used as a floor for on-demand payments + PricePerChargeable uint32 // Price per chargeable unit in gwei, used for on-demand payments + ReservationWindow uint32 // Duration of all reservations in seconds, used to calculate bin indices } // disperser API server will receive requests from clients. these requests will be with a blobHeader with payments information (CumulativePayments, BinIndex, and Signature) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index d494a84ab2..c4026fd970 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -11,7 +11,6 @@ import ( commonaws "github.com/Layr-Labs/eigenda/common/aws" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) @@ -90,12 +89,7 @@ func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID stri "BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)}, } - update := map[string]types.AttributeValue{ - "BinUsage": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(size), 10)}, - } - - fmt.Println("increment the item in a table", s.reservationTableName) - res, err := s.dynamoClient.UpdateItemIncrement(ctx, s.reservationTableName, key, update) + res, err := s.dynamoClient.IncrementBy(ctx, s.reservationTableName, key, "BinUsage", size) if err != nil { return 0, fmt.Errorf("failed to increment bin usage: %w", err) } @@ -118,15 +112,12 @@ func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID stri return binUsageValue, nil } -func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, size uint32) (uint64, error) { +func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, size uint64) (uint64, error) { key := map[string]types.AttributeValue{ "BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)}, } - update := map[string]types.AttributeValue{ - "BinUsage": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(size), 10)}, - } - res, err := s.dynamoClient.UpdateItemIncrement(ctx, s.globalBinTableName, key, update) + res, err := s.dynamoClient.IncrementBy(ctx, s.globalBinTableName, key, "BinUsage", size) if err != nil { return 0, err } @@ -149,52 +140,6 @@ func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, si return binUsageValue, nil } -func (s *OffchainStore) FindReservationBin(ctx context.Context, accountID string, binIndex uint64) (*ReservationBin, error) { - key := map[string]types.AttributeValue{ - "AccountID": &types.AttributeValueMemberS{Value: accountID}, - "BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)}, - } - - result, err := s.dynamoClient.GetItem(ctx, s.reservationTableName, key) - if err != nil { - return nil, err - } - - if result == nil { - return nil, errors.New("reservation not found") - } - - var reservation ReservationBin - err = attributevalue.UnmarshalMap(result, &reservation) - if err != nil { - return nil, err - } - - return &reservation, nil -} - -// Find all reservation bins for a given account -func (s *OffchainStore) FindReservationBins(ctx context.Context, accountID string) ([]ReservationBin, error) { - result, err := s.dynamoClient.QueryIndex(ctx, s.reservationTableName, "AccountIDIndex", "AccountID = :accountID", commondynamodb.ExpresseionValues{ - ":accountID": &types.AttributeValueMemberS{Value: accountID}, - }) - if err != nil { - return nil, err - } - - if result == nil { - return nil, errors.New("reservation not found") - } - - var reservations []ReservationBin - err = attributevalue.UnmarshalListOfMaps(result, &reservations) - if err != nil { - return nil, err - } - - return reservations, nil -} - func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, blobHeader BlobHeader, blobSizeCharged uint32) error { result, err := s.dynamoClient.GetItem(ctx, s.onDemandTableName, commondynamodb.Item{ @@ -238,7 +183,8 @@ func (s *OffchainStore) RemoveOnDemandPayment(ctx context.Context, accountID str return nil } -// relevant on-demand payment records: previous cumulative payment, next cumulative payment, blob size of next payment +// GetRelevantOnDemandRecords gets previous cumulative payment, next cumulative payment, blob size of next payment +// The queries are done sequentially instead of one-go for efficient querying and would not cause race condition errors for honest requests func (s *OffchainStore) GetRelevantOnDemandRecords(ctx context.Context, accountID string, cumulativePayment uint64) (uint64, uint64, uint32, error) { // Fetch the largest entry smaller than the given cumulativePayment smallerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex", diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 2ddec01da2..88e0e4f5f5 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -58,11 +58,6 @@ type OnchainPaymentState struct { ActiveReservations *ActiveReservations OnDemandPayments *OnDemandPayments - // FUNCTIONS IF THIS STRUCT WAS AN INTERFACE? - // GetActiveReservations(ctx context.Context, blockNumber uint) (map[string]*ActiveReservations, error) - // GetActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*ActiveReservation, error) - // GetOnDemandPayments(ctx context.Context, blockNumber uint) (map[string]*OnDemandPayments, error) - // GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*OnDemandPayment, error) } func NewOnchainPaymentState() *OnchainPaymentState { From 1e3a10b13475a595ceca60f40502b902b2932657 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 8 Oct 2024 11:09:01 -0700 Subject: [PATCH 03/15] refactor: PaymentMetadata instead of BlobHeader, remove commitment field --- core/meterer/offchain_store.go | 12 +++++----- core/meterer/types.go | 38 +++++++++++++------------------- core/meterer/types_test.go | 40 ++++++++++------------------------ 3 files changed, 33 insertions(+), 57 deletions(-) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index c4026fd970..3981f39d58 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -140,11 +140,11 @@ func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, si return binUsageValue, nil } -func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, blobHeader BlobHeader, blobSizeCharged uint32) error { +func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, paymentMetadata PaymentMetadata, symbolsCharged uint32) error { result, err := s.dynamoClient.GetItem(ctx, s.onDemandTableName, commondynamodb.Item{ - "AccountID": &types.AttributeValueMemberS{Value: blobHeader.AccountID}, - "CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(blobHeader.CumulativePayment, 10)}, + "AccountID": &types.AttributeValueMemberS{Value: paymentMetadata.AccountID}, + "CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(paymentMetadata.CumulativePayment, 10)}, }, ) if err != nil { @@ -155,9 +155,9 @@ func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, blobHeader BlobH } err = s.dynamoClient.PutItem(ctx, s.onDemandTableName, commondynamodb.Item{ - "AccountID": &types.AttributeValueMemberS{Value: blobHeader.AccountID}, - "CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(blobHeader.CumulativePayment, 10)}, - "DataLength": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(blobSizeCharged), 10)}, + "AccountID": &types.AttributeValueMemberS{Value: paymentMetadata.AccountID}, + "CumulativePayments": &types.AttributeValueMemberN{Value: strconv.FormatUint(paymentMetadata.CumulativePayment, 10)}, + "DataLength": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(symbolsCharged), 10)}, }, ) diff --git a/core/meterer/types.go b/core/meterer/types.go index a84d06f51b..369a12ec21 100644 --- a/core/meterer/types.go +++ b/core/meterer/types.go @@ -5,9 +5,7 @@ import ( "fmt" "math/big" - "github.com/Layr-Labs/eigenda/core" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/signer/core/apitypes" @@ -15,11 +13,10 @@ import ( /* SUBJECT TO BIG MODIFICATIONS */ -// BlobHeader represents the header information for a blob -type BlobHeader struct { +// PaymentMetadata represents the header information for a blob +type PaymentMetadata struct { // Existing fields - Commitment core.G1Point - DataLength uint32 + DataLength uint32 // length in number of symbols QuorumNumbers []uint8 AccountID string @@ -54,7 +51,7 @@ func NewEIP712Signer(chainID *big.Int, verifyingContract common.Address) *EIP712 {Name: "chainId", Type: "uint256"}, {Name: "verifyingContract", Type: "address"}, }, - "BlobHeader": []apitypes.Type{ + "PaymentMetadata": []apitypes.Type{ {Name: "accountID", Type: "string"}, {Name: "binIndex", Type: "uint32"}, {Name: "cumulativePayment", Type: "uint64"}, @@ -66,18 +63,16 @@ func NewEIP712Signer(chainID *big.Int, verifyingContract common.Address) *EIP712 } } -// SignBlobHeader signs a BlobHeader using EIP-712 -func (s *EIP712Signer) SignBlobHeader(header *BlobHeader, privateKey *ecdsa.PrivateKey) ([]byte, error) { - commitment := header.Commitment.Serialize() +// SignPaymentMetadata signs a PaymentMetadata using EIP-712 +func (s *EIP712Signer) SignPaymentMetadata(header *PaymentMetadata, privateKey *ecdsa.PrivateKey) ([]byte, error) { typedData := apitypes.TypedData{ Types: s.types, - PrimaryType: "BlobHeader", + PrimaryType: "PaymentMetadata", Domain: s.domain, Message: apitypes.TypedDataMessage{ "accountID": header.AccountID, "binIndex": fmt.Sprintf("%d", header.BinIndex), "cumulativePayment": fmt.Sprintf("%d", header.CumulativePayment), - "commitment": hexutil.Encode(commitment), "dataLength": fmt.Sprintf("%d", header.DataLength), "quorumNumbers": convertUint8SliceToMap(header.QuorumNumbers), }, @@ -99,17 +94,16 @@ func convertUint8SliceToMap(params []uint8) []string { return result } -// RecoverSender recovers the sender's address from a signed BlobHeader -func (s *EIP712Signer) RecoverSender(header *BlobHeader) (common.Address, error) { +// RecoverSender recovers the sender's address from a signed PaymentMetadata +func (s *EIP712Signer) RecoverSender(header *PaymentMetadata) (common.Address, error) { typedData := apitypes.TypedData{ Types: s.types, - PrimaryType: "BlobHeader", + PrimaryType: "PaymentMetadata", Domain: s.domain, Message: apitypes.TypedDataMessage{ "accountID": header.AccountID, "binIndex": fmt.Sprintf("%d", header.BinIndex), "cumulativePayment": fmt.Sprintf("%d", header.CumulativePayment), - "commitment": hexutil.Encode(header.Commitment.Serialize()), "dataLength": fmt.Sprintf("%d", header.DataLength), "quorumNumbers": convertUint8SliceToMap(header.QuorumNumbers), }, @@ -162,27 +156,25 @@ func (s *EIP712Signer) recoverTypedData(typedData apitypes.TypedData, signature return crypto.PubkeyToAddress(*pubKey), nil } -// ConstructBlobHeader creates a BlobHeader with a valid signature -func ConstructBlobHeader( +// ConstructPaymentMetadata creates a PaymentMetadata with a valid signature +func ConstructPaymentMetadata( signer *EIP712Signer, binIndex uint32, cumulativePayment uint64, - commitment core.G1Point, dataLength uint32, quorumNumbers []uint8, privateKey *ecdsa.PrivateKey, -) (*BlobHeader, error) { +) (*PaymentMetadata, error) { accountID := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() - header := &BlobHeader{ + header := &PaymentMetadata{ AccountID: accountID, BinIndex: binIndex, CumulativePayment: cumulativePayment, - Commitment: commitment, QuorumNumbers: quorumNumbers, DataLength: dataLength, } - signature, err := signer.SignBlobHeader(header, privateKey) + signature, err := signer.SignPaymentMetadata(header, privateKey) if err != nil { return nil, fmt.Errorf("error signing blob header: %v", err) } diff --git a/core/meterer/types_test.go b/core/meterer/types_test.go index bab550b806..0712b108e6 100644 --- a/core/meterer/types_test.go +++ b/core/meterer/types_test.go @@ -4,7 +4,6 @@ import ( "math/big" "testing" - "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/meterer" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -20,24 +19,21 @@ func TestEIP712Signer(t *testing.T) { privateKey, err := crypto.GenerateKey() require.NoError(t, err) - commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) - - header := &meterer.BlobHeader{ + header := &meterer.PaymentMetadata{ BinIndex: 0, CumulativePayment: 1000, - Commitment: *commitment, DataLength: 1024, QuorumNumbers: []uint8{1}, } - t.Run("SignBlobHeader", func(t *testing.T) { - signature, err := signer.SignBlobHeader(header, privateKey) + t.Run("SignPaymentMetadata", func(t *testing.T) { + signature, err := signer.SignPaymentMetadata(header, privateKey) require.NoError(t, err) assert.NotEmpty(t, signature) }) t.Run("RecoverSender", func(t *testing.T) { - signature, err := signer.SignBlobHeader(header, privateKey) + signature, err := signer.SignPaymentMetadata(header, privateKey) require.NoError(t, err) header.Signature = signature @@ -49,7 +45,7 @@ func TestEIP712Signer(t *testing.T) { }) } -func TestConstructBlobHeader(t *testing.T) { +func TestConstructPaymentMetadata(t *testing.T) { chainID := big.NewInt(17000) verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") signer := meterer.NewEIP712Signer(chainID, verifyingContract) @@ -57,14 +53,11 @@ func TestConstructBlobHeader(t *testing.T) { privateKey, err := crypto.GenerateKey() require.NoError(t, err) - commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) - - header, err := meterer.ConstructBlobHeader( + header, err := meterer.ConstructPaymentMetadata( signer, - 0, // binIndex - 1000, // cumulativePayment - *commitment, // core.G1Point - 1024, // dataLength + 0, // binIndex + 1000, // cumulativePayment + 1024, // dataLength []uint8{1}, privateKey, ) @@ -91,13 +84,10 @@ func TestEIP712SignerWithDifferentKeys(t *testing.T) { privateKey2, err := crypto.GenerateKey() require.NoError(t, err) - commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) - - header, err := meterer.ConstructBlobHeader( + header, err := meterer.ConstructPaymentMetadata( signer, 0, 1000, - *commitment, 1024, []uint8{1}, privateKey1, @@ -125,13 +115,10 @@ func TestEIP712SignerWithModifiedHeader(t *testing.T) { privateKey, err := crypto.GenerateKey() require.NoError(t, err) - commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) - - header, err := meterer.ConstructBlobHeader( + header, err := meterer.ConstructPaymentMetadata( signer, 0, 1000, - *commitment, 1024, []uint8{1}, privateKey, @@ -162,13 +149,10 @@ func TestEIP712SignerWithDifferentChainID(t *testing.T) { privateKey, err := crypto.GenerateKey() require.NoError(t, err) - commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) - - header, err := meterer.ConstructBlobHeader( + header, err := meterer.ConstructPaymentMetadata( signer1, 0, 1000, - *commitment, 1024, []uint8{1}, privateKey, From 1b2c99180d89fe1cab18e94ac24b52c5d26fc05e Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 8 Oct 2024 17:04:14 -0700 Subject: [PATCH 04/15] refactor: reduce TimeoutConfig to read timeout in Config --- core/meterer/meterer.go | 16 ++-------------- core/meterer/onchain_state.go | 10 +++++++++- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 3b7bc62b1f..5a11241c88 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -1,19 +1,10 @@ package meterer import ( - "time" - "github.com/Layr-Labs/eigensdk-go/logging" ) -type TimeoutConfig struct { - ChainReadTimeout time.Duration - ChainWriteTimeout time.Duration - ChainStateTimeout time.Duration - TxnBroadcastTimeout time.Duration -} - -// network parameters that should be published on-chain. We currently configure these params through disperser env vars. +// Config contains network parameters that should be published on-chain. We currently configure these params through disperser env vars. type Config struct { // for rate limiting 2^64 ~= 18 exabytes per second; 2^32 ~= 4GB/s // for payments 2^64 ~= 18M Eth; 2^32 ~= 4ETH @@ -38,7 +29,6 @@ var OnDemandQuorumNumbers = []uint8{0, 1} type Meterer struct { Config - TimeoutConfig ChainState *OnchainPaymentState OffchainStore *OffchainStore @@ -48,15 +38,13 @@ type Meterer struct { func NewMeterer( config Config, - timeoutConfig TimeoutConfig, paymentChainState *OnchainPaymentState, offchainStore *OffchainStore, logger logging.Logger, ) (*Meterer, error) { // TODO: create a separate thread to pull from the chain and update chain state return &Meterer{ - Config: config, - TimeoutConfig: timeoutConfig, + Config: config, ChainState: paymentChainState, OffchainStore: offchainStore, diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 88e0e4f5f5..11e524f932 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "time" "github.com/Layr-Labs/eigenda/core/eth" ) @@ -52,9 +53,16 @@ type OnDemandPayments struct { Payments map[string]*OnDemandPayment } +// TimeoutConfig contains the timeout configurations for the chain state +type TimeoutConfig struct { + ChainReadTimeout time.Duration + ChainStateTimeout time.Duration +} + // OnchainPaymentState is an interface for getting information about the current chain state for payments. type OnchainPaymentState struct { - tx *eth.Transactor + tx *eth.Transactor + TimeoutConfig TimeoutConfig ActiveReservations *ActiveReservations OnDemandPayments *OnDemandPayments From e6e4ea2701fda68cb4ca3e710d72edc316d3ff5a Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 8 Oct 2024 17:20:50 -0700 Subject: [PATCH 05/15] refactor: PaymentMetadata to core/data, signing to core/auth --- .../types.go => auth/payment_metadata.go} | 35 ++++++------------- .../payment_metadata_test.go} | 27 +++++++------- core/data.go | 16 +++++++++ core/meterer/meterer.go | 4 +++ core/meterer/offchain_store.go | 3 +- core/meterer/onchain_state.go | 10 +----- 6 files changed, 47 insertions(+), 48 deletions(-) rename core/{meterer/types.go => auth/payment_metadata.go} (84%) rename core/{meterer/types_test.go => auth/payment_metadata_test.go} (85%) diff --git a/core/meterer/types.go b/core/auth/payment_metadata.go similarity index 84% rename from core/meterer/types.go rename to core/auth/payment_metadata.go index 369a12ec21..7eeb1704d7 100644 --- a/core/meterer/types.go +++ b/core/auth/payment_metadata.go @@ -1,35 +1,20 @@ -package meterer +package auth import ( "crypto/ecdsa" "fmt" "math/big" + "github.com/Layr-Labs/eigenda/core" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/signer/core/apitypes" ) -/* SUBJECT TO BIG MODIFICATIONS */ +/* SUBJECT TO MODIFICATIONS */ -// PaymentMetadata represents the header information for a blob -type PaymentMetadata struct { - // Existing fields - DataLength uint32 // length in number of symbols - QuorumNumbers []uint8 - AccountID string - - // New fields - BinIndex uint32 - // TODO: we are thinking the contract can use uint128 for cumulative payment, - // but the definition on v2 uses uint64. Double check with team. - CumulativePayment uint64 - - Signature []byte -} - -// EIP712Signer handles EIP-712 signing operations +// EIP712Signer handles EIP-712 domain specific signing operations over typed and structured data type EIP712Signer struct { domain apitypes.TypedDataDomain types apitypes.Types @@ -64,7 +49,7 @@ func NewEIP712Signer(chainID *big.Int, verifyingContract common.Address) *EIP712 } // SignPaymentMetadata signs a PaymentMetadata using EIP-712 -func (s *EIP712Signer) SignPaymentMetadata(header *PaymentMetadata, privateKey *ecdsa.PrivateKey) ([]byte, error) { +func (s *EIP712Signer) SignPaymentMetadata(header *core.PaymentMetadata, privateKey *ecdsa.PrivateKey) ([]byte, error) { typedData := apitypes.TypedData{ Types: s.types, PrimaryType: "PaymentMetadata", @@ -80,7 +65,7 @@ func (s *EIP712Signer) SignPaymentMetadata(header *PaymentMetadata, privateKey * signature, err := s.signTypedData(typedData, privateKey) if err != nil { - return nil, fmt.Errorf("error signing blob header: %v", err) + return nil, fmt.Errorf("error signing payment metadata (header): %v", err) } return signature, nil @@ -95,7 +80,7 @@ func convertUint8SliceToMap(params []uint8) []string { } // RecoverSender recovers the sender's address from a signed PaymentMetadata -func (s *EIP712Signer) RecoverSender(header *PaymentMetadata) (common.Address, error) { +func (s *EIP712Signer) RecoverSender(header *core.PaymentMetadata) (common.Address, error) { typedData := apitypes.TypedData{ Types: s.types, PrimaryType: "PaymentMetadata", @@ -164,9 +149,9 @@ func ConstructPaymentMetadata( dataLength uint32, quorumNumbers []uint8, privateKey *ecdsa.PrivateKey, -) (*PaymentMetadata, error) { +) (*core.PaymentMetadata, error) { accountID := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() - header := &PaymentMetadata{ + header := &core.PaymentMetadata{ AccountID: accountID, BinIndex: binIndex, CumulativePayment: cumulativePayment, @@ -176,7 +161,7 @@ func ConstructPaymentMetadata( signature, err := signer.SignPaymentMetadata(header, privateKey) if err != nil { - return nil, fmt.Errorf("error signing blob header: %v", err) + return nil, fmt.Errorf("error signing payment metadata (header): %v", err) } header.Signature = signature diff --git a/core/meterer/types_test.go b/core/auth/payment_metadata_test.go similarity index 85% rename from core/meterer/types_test.go rename to core/auth/payment_metadata_test.go index 0712b108e6..8e2001f3a7 100644 --- a/core/meterer/types_test.go +++ b/core/auth/payment_metadata_test.go @@ -1,10 +1,11 @@ -package meterer_test +package auth_test import ( "math/big" "testing" - "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/auth" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" @@ -14,12 +15,12 @@ import ( func TestEIP712Signer(t *testing.T) { chainID := big.NewInt(17000) verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := meterer.NewEIP712Signer(chainID, verifyingContract) + signer := auth.NewEIP712Signer(chainID, verifyingContract) privateKey, err := crypto.GenerateKey() require.NoError(t, err) - header := &meterer.PaymentMetadata{ + header := &core.PaymentMetadata{ BinIndex: 0, CumulativePayment: 1000, DataLength: 1024, @@ -48,12 +49,12 @@ func TestEIP712Signer(t *testing.T) { func TestConstructPaymentMetadata(t *testing.T) { chainID := big.NewInt(17000) verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := meterer.NewEIP712Signer(chainID, verifyingContract) + signer := auth.NewEIP712Signer(chainID, verifyingContract) privateKey, err := crypto.GenerateKey() require.NoError(t, err) - header, err := meterer.ConstructPaymentMetadata( + header, err := auth.ConstructPaymentMetadata( signer, 0, // binIndex 1000, // cumulativePayment @@ -76,7 +77,7 @@ func TestConstructPaymentMetadata(t *testing.T) { func TestEIP712SignerWithDifferentKeys(t *testing.T) { chainID := big.NewInt(17000) verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := meterer.NewEIP712Signer(chainID, verifyingContract) + signer := auth.NewEIP712Signer(chainID, verifyingContract) privateKey1, err := crypto.GenerateKey() require.NoError(t, err) @@ -84,7 +85,7 @@ func TestEIP712SignerWithDifferentKeys(t *testing.T) { privateKey2, err := crypto.GenerateKey() require.NoError(t, err) - header, err := meterer.ConstructPaymentMetadata( + header, err := auth.ConstructPaymentMetadata( signer, 0, 1000, @@ -110,12 +111,12 @@ func TestEIP712SignerWithDifferentKeys(t *testing.T) { func TestEIP712SignerWithModifiedHeader(t *testing.T) { chainID := big.NewInt(17000) verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := meterer.NewEIP712Signer(chainID, verifyingContract) + signer := auth.NewEIP712Signer(chainID, verifyingContract) privateKey, err := crypto.GenerateKey() require.NoError(t, err) - header, err := meterer.ConstructPaymentMetadata( + header, err := auth.ConstructPaymentMetadata( signer, 0, 1000, @@ -143,13 +144,13 @@ func TestEIP712SignerWithDifferentChainID(t *testing.T) { chainID1 := big.NewInt(17000) chainID2 := big.NewInt(17001) verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer1 := meterer.NewEIP712Signer(chainID1, verifyingContract) - signer2 := meterer.NewEIP712Signer(chainID2, verifyingContract) + signer1 := auth.NewEIP712Signer(chainID1, verifyingContract) + signer2 := auth.NewEIP712Signer(chainID2, verifyingContract) privateKey, err := crypto.GenerateKey() require.NoError(t, err) - header, err := meterer.ConstructPaymentMetadata( + header, err := auth.ConstructPaymentMetadata( signer1, 0, 1000, diff --git a/core/data.go b/core/data.go index 879e2b4eb2..0a4106b74a 100644 --- a/core/data.go +++ b/core/data.go @@ -470,3 +470,19 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { } return c, nil } + +// PaymentMetadata represents the header information for a blob +type PaymentMetadata struct { + // Existing fields + DataLength uint32 // length in number of symbols + QuorumNumbers []uint8 + AccountID string + + // New fields + BinIndex uint32 + // TODO: we are thinking the contract can use uint128 for cumulative payment, + // but the definition on v2 uses uint64. Double check with team. + CumulativePayment uint64 + + Signature []byte +} diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 5a11241c88..791e9c8dde 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -1,6 +1,8 @@ package meterer import ( + "time" + "github.com/Layr-Labs/eigensdk-go/logging" ) @@ -12,6 +14,8 @@ type Config struct { MinChargeableSize uint32 // Minimum size of a chargeable unit in bytes, used as a floor for on-demand payments PricePerChargeable uint32 // Price per chargeable unit in gwei, used for on-demand payments ReservationWindow uint32 // Duration of all reservations in seconds, used to calculate bin indices + + ChainReadTimeout time.Duration // Timeout for reading payment state from chain } // disperser API server will receive requests from clients. these requests will be with a blobHeader with payments information (CumulativePayments, BinIndex, and Signature) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index 3981f39d58..2760478a1b 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -10,6 +10,7 @@ import ( commonaws "github.com/Layr-Labs/eigenda/common/aws" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) @@ -140,7 +141,7 @@ func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, si return binUsageValue, nil } -func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, paymentMetadata PaymentMetadata, symbolsCharged uint32) error { +func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, paymentMetadata core.PaymentMetadata, symbolsCharged uint32) error { result, err := s.dynamoClient.GetItem(ctx, s.onDemandTableName, commondynamodb.Item{ "AccountID": &types.AttributeValueMemberS{Value: paymentMetadata.AccountID}, diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 11e524f932..88e0e4f5f5 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "math" - "time" "github.com/Layr-Labs/eigenda/core/eth" ) @@ -53,16 +52,9 @@ type OnDemandPayments struct { Payments map[string]*OnDemandPayment } -// TimeoutConfig contains the timeout configurations for the chain state -type TimeoutConfig struct { - ChainReadTimeout time.Duration - ChainStateTimeout time.Duration -} - // OnchainPaymentState is an interface for getting information about the current chain state for payments. type OnchainPaymentState struct { - tx *eth.Transactor - TimeoutConfig TimeoutConfig + tx *eth.Transactor ActiveReservations *ActiveReservations OnDemandPayments *OnDemandPayments From 7ce96d5a6791b7caf08a9b512f642e90975168b4 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 9 Oct 2024 09:49:20 -0700 Subject: [PATCH 06/15] chore: use Mock for on-chain payments in tests --- core/data.go | 17 ++++ core/eth/tx.go | 10 ++ core/meterer/meterer.go | 8 +- core/meterer/meterer_test.go | 151 +++++++++++++++++++++++++++++ core/meterer/offchain_store.go | 14 +-- core/meterer/onchain_state.go | 119 ++++++++++------------- core/meterer/onchain_state_test.go | 148 +++++++++++++--------------- core/mock/onchain_state.go | 70 +++++++++++++ core/mock/tx.go | 12 +++ core/tx.go | 6 ++ 10 files changed, 396 insertions(+), 159 deletions(-) create mode 100644 core/meterer/meterer_test.go create mode 100644 core/mock/onchain_state.go diff --git a/core/data.go b/core/data.go index 0a4106b74a..5403282088 100644 --- a/core/data.go +++ b/core/data.go @@ -486,3 +486,20 @@ type PaymentMetadata struct { Signature []byte } + +type TokenAmount uint64 // TODO: change to uint128 + +// OperatorInfo contains information about an operator which is stored on the blockchain state, +// corresponding to a particular quorum +type ActiveReservation struct { + DataRate uint64 // Bandwidth per reservation bin + StartTimestamp uint64 // Unix timestamp that's valid for basically eternity + EndTimestamp uint64 + + QuorumNumbers []uint8 + QuorumSplit []byte // ordered mapping of quorum number to payment split; on-chain validation should ensure split <= 100 +} + +type OnDemandPayment struct { + CumulativePayment TokenAmount // Total amount deposited by the user +} diff --git a/core/eth/tx.go b/core/eth/tx.go index e5eca92b93..ccf85c91dd 100644 --- a/core/eth/tx.go +++ b/core/eth/tx.go @@ -763,6 +763,16 @@ func (t *Transactor) GetRequiredQuorumNumbers(ctx context.Context, blockNumber u return requiredQuorums, nil } +func (t *Transactor) GetActiveReservations(ctx context.Context, blockNumber uint32) (map[string]core.ActiveReservation, error) { + // contract is not implemented yet + return map[string]core.ActiveReservation{}, nil +} + +func (t *Transactor) GetOnDemandPayments(ctx context.Context, blockNumber uint32) (map[string]core.OnDemandPayment, error) { + // contract is not implemented yet + return map[string]core.OnDemandPayment{}, nil +} + func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDAServiceManagerAddr gethcommon.Address) error { contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(eigenDAServiceManagerAddr, t.EthClient) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 791e9c8dde..45931079f9 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -34,16 +34,16 @@ var OnDemandQuorumNumbers = []uint8{0, 1} type Meterer struct { Config - ChainState *OnchainPaymentState - OffchainStore *OffchainStore + ChainState OnchainPayment + OffchainStore OffchainStore logger logging.Logger } func NewMeterer( config Config, - paymentChainState *OnchainPaymentState, - offchainStore *OffchainStore, + paymentChainState OnchainPayment, + offchainStore OffchainStore, logger logging.Logger, ) (*Meterer, error) { // TODO: create a separate thread to pull from the chain and update chain state diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go new file mode 100644 index 0000000000..9e68d1da80 --- /dev/null +++ b/core/meterer/meterer_test.go @@ -0,0 +1,151 @@ +package meterer_test + +import ( + "crypto/ecdsa" + "fmt" + "math/big" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common" + commonaws "github.com/Layr-Labs/eigenda/common/aws" + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/inabox/deploy" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ory/dockertest/v3" + + "github.com/Layr-Labs/eigensdk-go/logging" +) + +var ( + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + dynamoClient *commondynamodb.Client + clientConfig commonaws.ClientConfig + privateKey1 *ecdsa.PrivateKey + privateKey2 *ecdsa.PrivateKey + signer *auth.EIP712Signer + mt *meterer.Meterer + + deployLocalStack bool + localStackPort = "4566" + paymentChainState = &mock.MockOnchainPaymentState{} +) + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +// // Mock data initialization method +// func InitializeMockPayments(pcs *meterer.OnchainPaymentState, privateKey1 *ecdsa.PrivateKey, privateKey2 *ecdsa.PrivateKey) { +// // Initialize mock active reservations +// now := uint64(time.Now().Unix()) +// pcs.ActiveReservations = map[string]core.ActiveReservation{ +// crypto.PubkeyToAddress(privateKey1.PublicKey).Hex(): {DataRate: 100, StartTimestamp: now + 1200, EndTimestamp: now + 1800, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}}, +// crypto.PubkeyToAddress(privateKey2.PublicKey).Hex(): {DataRate: 200, StartTimestamp: now - 120, EndTimestamp: now + 180, QuorumSplit: []byte{30, 70}, QuorumNumbers: []uint8{0, 1}}, +// } +// pcs.OnDemandPayments = map[string]core.OnDemandPayment{ +// crypto.PubkeyToAddress(privateKey1.PublicKey).Hex(): {CumulativePayment: 1500}, +// crypto.PubkeyToAddress(privateKey2.PublicKey).Hex(): {CumulativePayment: 1000}, +// } +// } + +func setup(_ *testing.M) { + + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + } + + loggerConfig := common.DefaultLoggerConfig() + logger, err := common.NewLogger(loggerConfig) + if err != nil { + teardown() + panic("failed to create logger") + } + + clientConfig = commonaws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + dynamoClient, err = commondynamodb.NewClient(clientConfig, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client") + } + + chainID := big.NewInt(17000) + verifyingContract := gethcommon.HexToAddress("0x1234000000000000000000000000000000000000") + signer = auth.NewEIP712Signer(chainID, verifyingContract) + + privateKey1, err = crypto.GenerateKey() + privateKey2, err = crypto.GenerateKey() + + logger = logging.NewNoopLogger() + config := meterer.Config{ + PricePerChargeable: 1, + MinChargeableSize: 1, + GlobalBytesPerSecond: 1000, + ReservationWindow: 60, + ChainReadTimeout: 3 * time.Second, + } + + clientConfig := commonaws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"), + } + + store, err := meterer.NewOffchainStore( + clientConfig, + "reservations", + "ondemand", + "global", + logger, + ) + if err != nil { + teardown() + panic("failed to create offchain store") + } + + // add some default sensible configs + mt, err = meterer.NewMeterer( + config, + paymentChainState, + store, + logging.NewNoopLogger(), + // metrics.NewNoopMetrics(), + ) + + if err != nil { + teardown() + panic("failed to create meterer") + } +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index 2760478a1b..95b29a9243 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -30,34 +30,34 @@ func NewOffchainStore( onDemandTableName string, globalBinTableName string, logger logging.Logger, -) (*OffchainStore, error) { +) (OffchainStore, error) { dynamoClient, err := commondynamodb.NewClient(cfg, logger) if err != nil { - return nil, err + return OffchainStore{}, err } if reservationTableName == "" || onDemandTableName == "" || globalBinTableName == "" { - return nil, fmt.Errorf("table names cannot be empty") + return OffchainStore{}, fmt.Errorf("table names cannot be empty") } err = CreateReservationTable(cfg, reservationTableName) if err != nil && !strings.Contains(err.Error(), "Table already exists") { fmt.Println("Error creating reservation table:", err) - return nil, err + return OffchainStore{}, err } err = CreateGlobalReservationTable(cfg, globalBinTableName) if err != nil && !strings.Contains(err.Error(), "Table already exists") { fmt.Println("Error creating global bin table:", err) - return nil, err + return OffchainStore{}, err } err = CreateOnDemandTable(cfg, onDemandTableName) if err != nil && !strings.Contains(err.Error(), "Table already exists") { fmt.Println("Error creating on-demand table:", err) - return nil, err + return OffchainStore{}, err } //TODO: add a separate thread to periodically clean up the tables // delete expired reservation bins ( current timestamp) + GetActiveReservations(ctx context.Context, blockNumber uint32) (map[string]ActiveReservation, error) + + // GetOnDemandPayments returns all on-demand payments + GetOnDemandPayments(ctx context.Context, blockNumber uint32) (map[string]OnDemandPayment, error) } From c487395850eb9edabae3ba0b6beb3df684b7874b Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 9 Oct 2024 10:14:39 -0700 Subject: [PATCH 07/15] refactor: CreateTable's -> TableCheck for existance --- common/aws/dynamodb/client.go | 15 +++++++++++++++ core/meterer/offchain_store.go | 19 ++++++------------- core/meterer/util.go | 7 ------- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 4b013cc415..5c62089520 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -2,6 +2,7 @@ package dynamodb import ( "context" + "errors" "fmt" "math" "strconv" @@ -408,3 +409,17 @@ func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([ return items, nil } + +// TableCheck checks if a table exists and can be described +func (c *Client) TableCheck(ctx context.Context, name string) error { + if name == "" { + return errors.New("table name is empty") + } + _, err := c.dynamoClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{ + TableName: aws.String(name), + }) + if err != nil { + return err + } + return nil +} diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index 95b29a9243..8f58c4a197 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "strconv" - "strings" "time" commonaws "github.com/Layr-Labs/eigenda/common/aws" @@ -36,23 +35,17 @@ func NewOffchainStore( if err != nil { return OffchainStore{}, err } - if reservationTableName == "" || onDemandTableName == "" || globalBinTableName == "" { - return OffchainStore{}, fmt.Errorf("table names cannot be empty") - } - err = CreateReservationTable(cfg, reservationTableName) - if err != nil && !strings.Contains(err.Error(), "Table already exists") { - fmt.Println("Error creating reservation table:", err) + err = dynamoClient.TableCheck(context.Background(), reservationTableName) + if err != nil { return OffchainStore{}, err } - err = CreateGlobalReservationTable(cfg, globalBinTableName) - if err != nil && !strings.Contains(err.Error(), "Table already exists") { - fmt.Println("Error creating global bin table:", err) + err = dynamoClient.TableCheck(context.Background(), onDemandTableName) + if err != nil { return OffchainStore{}, err } - err = CreateOnDemandTable(cfg, onDemandTableName) - if err != nil && !strings.Contains(err.Error(), "Table already exists") { - fmt.Println("Error creating on-demand table:", err) + err = dynamoClient.TableCheck(context.Background(), globalBinTableName) + if err != nil { return OffchainStore{}, err } //TODO: add a separate thread to periodically clean up the tables diff --git a/core/meterer/util.go b/core/meterer/util.go index 475567b39d..b98f522e5f 100644 --- a/core/meterer/util.go +++ b/core/meterer/util.go @@ -2,21 +2,14 @@ package meterer import ( "context" - "math/big" commonaws "github.com/Layr-Labs/eigenda/common/aws" test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" - "github.com/Layr-Labs/eigenda/core" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) -func DummyCommitment() core.G1Point { - commitment := core.NewG1Point(big.NewInt(123), big.NewInt(456)) - return *commitment -} - func CreateReservationTable(clientConfig commonaws.ClientConfig, tableName string) error { ctx := context.Background() _, err := test_utils.CreateTable(ctx, clientConfig, tableName, &dynamodb.CreateTableInput{ From be254b8594b5a9ffd6ed3dd866a6ccf06f03edfe Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 9 Oct 2024 10:30:46 -0700 Subject: [PATCH 08/15] refactor: remove hardcoded quorumNumbers for on-demand --- core/meterer/meterer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 45931079f9..0704f3bcc5 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -29,7 +29,6 @@ type Config struct { // // If the payment is valid, the meterer will add the blob header to its state and return a success response to the disperser API server. // if any of the checks fail, the meterer will return a failure response to the disperser API server. -var OnDemandQuorumNumbers = []uint8{0, 1} type Meterer struct { Config From 99a7c98c6362465bfe2427804c802065e3a0c9e2 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 9 Oct 2024 14:47:29 -0700 Subject: [PATCH 09/15] fix: ExpresseionValues -> ExpressionValues typo --- core/meterer/offchain_store.go | 4 ++-- core/meterer/offchain_store_test.go | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index 8f58c4a197..52df410d06 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -183,7 +183,7 @@ func (s *OffchainStore) GetRelevantOnDemandRecords(ctx context.Context, accountI // Fetch the largest entry smaller than the given cumulativePayment smallerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex", "AccountID = :account AND CumulativePayments < :cumulativePayment", - commondynamodb.ExpresseionValues{ + commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{Value: accountID}, ":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)}, }, @@ -205,7 +205,7 @@ func (s *OffchainStore) GetRelevantOnDemandRecords(ctx context.Context, accountI // Fetch the smallest entry larger than the given cumulativePayment largerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex", "AccountID = :account AND CumulativePayments > :cumulativePayment", - commondynamodb.ExpresseionValues{ + commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{Value: accountID}, ":cumulativePayment": &types.AttributeValueMemberN{Value: strconv.FormatUint(cumulativePayment, 10)}, }, diff --git a/core/meterer/offchain_store_test.go b/core/meterer/offchain_store_test.go index 6978a3ca80..8de9dd7469 100644 --- a/core/meterer/offchain_store_test.go +++ b/core/meterer/offchain_store_test.go @@ -39,7 +39,7 @@ func TestReservationBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", item["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", item["BinUsage"].(*types.AttributeValueMemberN).Value) - items, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpresseionValues{ + items, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{Value: "account1"}, }) assert.NoError(t, err) @@ -74,7 +74,7 @@ func TestReservationBinsBasicOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "2000", item["BinUsage"].(*types.AttributeValueMemberN).Value) - items, err = dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpresseionValues{ + items, err = dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{Value: "account1"}, }) assert.NoError(t, err) @@ -111,7 +111,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.NoError(t, err) assert.Len(t, unprocessed, 0) - queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -120,7 +120,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -129,7 +129,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "32", }}) @@ -152,7 +152,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { ) assert.NoError(t, err) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -161,7 +161,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "2000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpresseionValues{ + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "2", }}) @@ -210,7 +210,7 @@ func TestOnDemandUsageBasicOperations(t *testing.T) { assert.Equal(t, "1", item["CumulativePayments"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", item["DataLength"].(*types.AttributeValueMemberN).Value) - queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpresseionValues{ + queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: "account1", }}) @@ -220,7 +220,7 @@ func TestOnDemandUsageBasicOperations(t *testing.T) { cumulativePayments, _ := strconv.Atoi(item["CumulativePayments"].(*types.AttributeValueMemberN).Value) assert.Equal(t, fmt.Sprintf("%d", cumulativePayments*1000), item["DataLength"].(*types.AttributeValueMemberN).Value) } - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account_id", commondynamodb.ExpresseionValues{ + queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account_id", commondynamodb.ExpressionValues{ ":account_id": &types.AttributeValueMemberS{ Value: fmt.Sprintf("account%d", numItems/repetitions+1), }}) From 785468757c65d642363278e7ca67bbd2e8867793 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 9 Oct 2024 14:57:39 -0700 Subject: [PATCH 10/15] fix: db creation in tests, mock filename --- core/meterer/meterer_test.go | 19 +++++-------------- .../{onchain_state.go => payment_state.go} | 0 2 files changed, 5 insertions(+), 14 deletions(-) rename core/mock/{onchain_state.go => payment_state.go} (100%) diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index 9e68d1da80..758e893116 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -44,20 +44,6 @@ func TestMain(m *testing.M) { os.Exit(code) } -// // Mock data initialization method -// func InitializeMockPayments(pcs *meterer.OnchainPaymentState, privateKey1 *ecdsa.PrivateKey, privateKey2 *ecdsa.PrivateKey) { -// // Initialize mock active reservations -// now := uint64(time.Now().Unix()) -// pcs.ActiveReservations = map[string]core.ActiveReservation{ -// crypto.PubkeyToAddress(privateKey1.PublicKey).Hex(): {DataRate: 100, StartTimestamp: now + 1200, EndTimestamp: now + 1800, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}}, -// crypto.PubkeyToAddress(privateKey2.PublicKey).Hex(): {DataRate: 200, StartTimestamp: now - 120, EndTimestamp: now + 180, QuorumSplit: []byte{30, 70}, QuorumNumbers: []uint8{0, 1}}, -// } -// pcs.OnDemandPayments = map[string]core.OnDemandPayment{ -// crypto.PubkeyToAddress(privateKey1.PublicKey).Hex(): {CumulativePayment: 1500}, -// crypto.PubkeyToAddress(privateKey2.PublicKey).Hex(): {CumulativePayment: 1000}, -// } -// } - func setup(_ *testing.M) { deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") @@ -117,6 +103,10 @@ func setup(_ *testing.M) { EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"), } + meterer.CreateReservationTable(clientConfig, "reservations") + meterer.CreateOnDemandTable(clientConfig, "ondemand") + meterer.CreateGlobalReservationTable(clientConfig, "global") + store, err := meterer.NewOffchainStore( clientConfig, "reservations", @@ -124,6 +114,7 @@ func setup(_ *testing.M) { "global", logger, ) + if err != nil { teardown() panic("failed to create offchain store") diff --git a/core/mock/onchain_state.go b/core/mock/payment_state.go similarity index 100% rename from core/mock/onchain_state.go rename to core/mock/payment_state.go From 4f1e9319976515cb0f8a9b76b77b686ca2108c1e Mon Sep 17 00:00:00 2001 From: hopeyen Date: Thu, 10 Oct 2024 17:01:52 -0700 Subject: [PATCH 11/15] refactor: comments, db query fn and tests --- core/meterer/meterer.go | 38 +++++++++++++---------------- core/meterer/offchain_store_test.go | 16 ++++++++---- core/meterer/util.go | 2 -- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 0704f3bcc5..e0361d63e9 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -8,32 +8,28 @@ import ( // Config contains network parameters that should be published on-chain. We currently configure these params through disperser env vars. type Config struct { - // for rate limiting 2^64 ~= 18 exabytes per second; 2^32 ~= 4GB/s - // for payments 2^64 ~= 18M Eth; 2^32 ~= 4ETH - GlobalBytesPerSecond uint64 // Global rate limit in bytes per second for on-demand payments - MinChargeableSize uint32 // Minimum size of a chargeable unit in bytes, used as a floor for on-demand payments - PricePerChargeable uint32 // Price per chargeable unit in gwei, used for on-demand payments - ReservationWindow uint32 // Duration of all reservations in seconds, used to calculate bin indices - - ChainReadTimeout time.Duration // Timeout for reading payment state from chain + // GlobalBytesPerSecond is the rate limit in bytes per second for on-demand payments + GlobalBytesPerSecond uint64 + // MinChargeableSize is the minimum size of a chargeable unit in bytes, used as a floor for on-demand payments + MinChargeableSize uint32 + // PricePerChargeable is the price per chargeable unit in gwei, used for on-demand payments + PricePerChargeable uint32 + // ReservationWindow is the duration of all reservations in seconds, used to calculate bin indices + ReservationWindow uint32 + + // ChainReadTimeout is the timeout for reading payment state from chain + ChainReadTimeout time.Duration } -// disperser API server will receive requests from clients. these requests will be with a blobHeader with payments information (CumulativePayments, BinIndex, and Signature) -// Disperser will pass the blob header to the meterer, which will check if the payments information is valid. if it is, it will be added to the meterer's state. -// To check if the payment is valid, the meterer will: -// 1. check if the signature is valid -// (against the CumulativePayments and BinIndex fields ; -// maybe need something else to secure against using this appraoch for reservations when rev request comes in same bin interval; say that nonce is signed over as well) -// 2. For reservations, check offchain bin state as demonstrated in pseudocode, also check onchain state before rejecting (since onchain data is pulled) -// 3. For on-demand, check against payments and the global rates, similar to the reservation case -// -// If the payment is valid, the meterer will add the blob header to its state and return a success response to the disperser API server. -// if any of the checks fail, the meterer will return a failure response to the disperser API server. - +// Meterer handles payment accounting across different accounts. Disperser API server receives requests from clients and each request contains a blob header +// with payments information (CumulativePayments, BinIndex, and Signature). Disperser will pass the blob header to the meterer, which will check if the +// payments information is valid. type Meterer struct { Config - ChainState OnchainPayment + // ChainState reads on-chain payment state periodically and cache it in memory + ChainState OnchainPayment + // OffchainStore uses DynamoDB to track metering and used to validate requests OffchainStore OffchainStore logger logging.Logger diff --git a/core/meterer/offchain_store_test.go b/core/meterer/offchain_store_test.go index 8de9dd7469..8df103b1f5 100644 --- a/core/meterer/offchain_store_test.go +++ b/core/meterer/offchain_store_test.go @@ -16,7 +16,6 @@ import ( func TestReservationBinsBasicOperations(t *testing.T) { tableName := "reservations" meterer.CreateReservationTable(clientConfig, tableName) - indexName := "AccountIDIndex" ctx := context.Background() err := dynamoClient.PutItem(ctx, tableName, @@ -39,7 +38,8 @@ func TestReservationBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", item["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", item["BinUsage"].(*types.AttributeValueMemberN).Value) - items, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ + // items, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ + items, err := dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{Value: "account1"}, }) assert.NoError(t, err) @@ -112,6 +112,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Len(t, unprocessed, 0) queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ + // queryResult, err := dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -121,6 +122,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ + // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -130,6 +132,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ + // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "32", }}) @@ -153,6 +156,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.NoError(t, err) queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ + // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -162,6 +166,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "2000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ + // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "2", }}) @@ -174,7 +179,6 @@ func TestGlobalBinsBasicOperations(t *testing.T) { func TestOnDemandUsageBasicOperations(t *testing.T) { tableName := "ondemand" meterer.CreateOnDemandTable(clientConfig, tableName) - indexName := "AccountIDIndex" ctx := context.Background() @@ -210,7 +214,8 @@ func TestOnDemandUsageBasicOperations(t *testing.T) { assert.Equal(t, "1", item["CumulativePayments"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", item["DataLength"].(*types.AttributeValueMemberN).Value) - queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ + // queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ + queryResult, err := dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: "account1", }}) @@ -220,7 +225,8 @@ func TestOnDemandUsageBasicOperations(t *testing.T) { cumulativePayments, _ := strconv.Atoi(item["CumulativePayments"].(*types.AttributeValueMemberN).Value) assert.Equal(t, fmt.Sprintf("%d", cumulativePayments*1000), item["DataLength"].(*types.AttributeValueMemberN).Value) } - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account_id", commondynamodb.ExpressionValues{ + // queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account_id", commondynamodb.ExpressionValues{ + queryResult, err = dynamoClient.Query(ctx, tableName, "AccountID = :account_id", commondynamodb.ExpressionValues{ ":account_id": &types.AttributeValueMemberS{ Value: fmt.Sprintf("account%d", numItems/repetitions+1), }}) diff --git a/core/meterer/util.go b/core/meterer/util.go index b98f522e5f..dd1f06ddfd 100644 --- a/core/meterer/util.go +++ b/core/meterer/util.go @@ -35,7 +35,6 @@ func CreateReservationTable(clientConfig commonaws.ClientConfig, tableName strin }, GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ { - IndexName: aws.String("AccountIDIndex"), KeySchema: []types.KeySchemaElement{ { AttributeName: aws.String("AccountID"), @@ -127,7 +126,6 @@ func CreateOnDemandTable(clientConfig commonaws.ClientConfig, tableName string) }, GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ { - IndexName: aws.String("AccountIDIndex"), KeySchema: []types.KeySchemaElement{ { AttributeName: aws.String("AccountID"), From fab84e38d27c61d6c0cb66c77d25f6a9cfc0dc99 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Thu, 10 Oct 2024 17:24:14 -0700 Subject: [PATCH 12/15] fix: lints and db/signer tests --- core/auth/payment_metadata.go | 1 - core/auth/payment_metadata_test.go | 2 ++ core/meterer/meterer_test.go | 31 +++++++++++++++------- core/meterer/offchain_store_test.go | 40 ++++++++++++----------------- core/meterer/util.go | 38 --------------------------- 5 files changed, 41 insertions(+), 71 deletions(-) diff --git a/core/auth/payment_metadata.go b/core/auth/payment_metadata.go index 7eeb1704d7..c43b8a3e53 100644 --- a/core/auth/payment_metadata.go +++ b/core/auth/payment_metadata.go @@ -40,7 +40,6 @@ func NewEIP712Signer(chainID *big.Int, verifyingContract common.Address) *EIP712 {Name: "accountID", Type: "string"}, {Name: "binIndex", Type: "uint32"}, {Name: "cumulativePayment", Type: "uint64"}, - {Name: "commitment", Type: "bytes"}, {Name: "dataLength", Type: "uint32"}, {Name: "quorumNumbers", Type: "uint8[]"}, }, diff --git a/core/auth/payment_metadata_test.go b/core/auth/payment_metadata_test.go index 8e2001f3a7..e9de77024d 100644 --- a/core/auth/payment_metadata_test.go +++ b/core/auth/payment_metadata_test.go @@ -137,6 +137,7 @@ func TestEIP712SignerWithModifiedHeader(t *testing.T) { header.AccountID = "modifiedAccount" addr, err := signer.RecoverSender(header) + require.NoError(t, err) require.NotEqual(t, expectedAddress, addr) } @@ -165,6 +166,7 @@ func TestEIP712SignerWithDifferentChainID(t *testing.T) { // Try to recover the sender using a signer with a different chain ID sender, err := signer2.RecoverSender(header) + require.NoError(t, err) expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) require.NotEqual(t, expectedAddress, sender) diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index 758e893116..97e8e96aea 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -85,7 +85,15 @@ func setup(_ *testing.M) { signer = auth.NewEIP712Signer(chainID, verifyingContract) privateKey1, err = crypto.GenerateKey() + if err != nil { + teardown() + panic("failed to generate private key") + } privateKey2, err = crypto.GenerateKey() + if err != nil { + teardown() + panic("failed to generate private key") + } logger = logging.NewNoopLogger() config := meterer.Config{ @@ -96,16 +104,21 @@ func setup(_ *testing.M) { ChainReadTimeout: 3 * time.Second, } - clientConfig := commonaws.ClientConfig{ - Region: "us-east-1", - AccessKey: "localstack", - SecretAccessKey: "localstack", - EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"), + err = meterer.CreateReservationTable(clientConfig, "reservations") + if err != nil { + teardown() + panic("failed to create reservation table") + } + err = meterer.CreateOnDemandTable(clientConfig, "ondemand") + if err != nil { + teardown() + panic("failed to create ondemand table") + } + err = meterer.CreateGlobalReservationTable(clientConfig, "global") + if err != nil { + teardown() + panic("failed to create global reservation table") } - - meterer.CreateReservationTable(clientConfig, "reservations") - meterer.CreateOnDemandTable(clientConfig, "ondemand") - meterer.CreateGlobalReservationTable(clientConfig, "global") store, err := meterer.NewOffchainStore( clientConfig, diff --git a/core/meterer/offchain_store_test.go b/core/meterer/offchain_store_test.go index 8df103b1f5..1c3351764b 100644 --- a/core/meterer/offchain_store_test.go +++ b/core/meterer/offchain_store_test.go @@ -14,11 +14,12 @@ import ( ) func TestReservationBinsBasicOperations(t *testing.T) { - tableName := "reservations" - meterer.CreateReservationTable(clientConfig, tableName) + tableName := "reservations_test_basic" + err := meterer.CreateReservationTable(clientConfig, tableName) + assert.NoError(t, err) ctx := context.Background() - err := dynamoClient.PutItem(ctx, tableName, + err = dynamoClient.PutItem(ctx, tableName, commondynamodb.Item{ "AccountID": &types.AttributeValueMemberS{Value: "account1"}, "BinIndex": &types.AttributeValueMemberN{Value: "1"}, @@ -38,14 +39,13 @@ func TestReservationBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", item["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", item["BinUsage"].(*types.AttributeValueMemberN).Value) - // items, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ items, err := dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{Value: "account1"}, }) assert.NoError(t, err) assert.Len(t, items, 1) - item, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ + _, err = dynamoClient.GetItem(ctx, tableName, commondynamodb.Key{ "AccountID": &types.AttributeValueMemberS{Value: "account2"}, }) assert.Error(t, err) @@ -93,9 +93,9 @@ func TestReservationBinsBasicOperations(t *testing.T) { } func TestGlobalBinsBasicOperations(t *testing.T) { - tableName := "global" - meterer.CreateGlobalReservationTable(clientConfig, tableName) - indexName := "BinIndexIndex" + tableName := "global_test_basic" + err := meterer.CreateGlobalReservationTable(clientConfig, tableName) + assert.NoError(t, err) ctx := context.Background() numItems := 30 @@ -111,8 +111,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.NoError(t, err) assert.Len(t, unprocessed, 0) - queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ - // queryResult, err := dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ + queryResult, err := dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -121,8 +120,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ - // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ + queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -131,8 +129,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ - // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ + queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "32", }}) @@ -155,8 +152,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { ) assert.NoError(t, err) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ - // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ + queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "1", }}) @@ -165,8 +161,7 @@ func TestGlobalBinsBasicOperations(t *testing.T) { assert.Equal(t, "1", queryResult[0]["BinIndex"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "2000", queryResult[0]["BinUsage"].(*types.AttributeValueMemberN).Value) - queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "BinIndex = :index", commondynamodb.ExpressionValues{ - // queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ + queryResult, err = dynamoClient.Query(ctx, tableName, "BinIndex = :index", commondynamodb.ExpressionValues{ ":index": &types.AttributeValueMemberN{ Value: "2", }}) @@ -177,12 +172,13 @@ func TestGlobalBinsBasicOperations(t *testing.T) { } func TestOnDemandUsageBasicOperations(t *testing.T) { - tableName := "ondemand" - meterer.CreateOnDemandTable(clientConfig, tableName) + tableName := "ondemand_test_basic" + err := meterer.CreateOnDemandTable(clientConfig, tableName) + assert.NoError(t, err) ctx := context.Background() - err := dynamoClient.PutItem(ctx, tableName, + err = dynamoClient.PutItem(ctx, tableName, commondynamodb.Item{ "AccountID": &types.AttributeValueMemberS{Value: "account1"}, "CumulativePayments": &types.AttributeValueMemberN{Value: "1"}, @@ -214,7 +210,6 @@ func TestOnDemandUsageBasicOperations(t *testing.T) { assert.Equal(t, "1", item["CumulativePayments"].(*types.AttributeValueMemberN).Value) assert.Equal(t, "1000", item["DataLength"].(*types.AttributeValueMemberN).Value) - // queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account", commondynamodb.ExpressionValues{ queryResult, err := dynamoClient.Query(ctx, tableName, "AccountID = :account", commondynamodb.ExpressionValues{ ":account": &types.AttributeValueMemberS{ Value: "account1", @@ -225,7 +220,6 @@ func TestOnDemandUsageBasicOperations(t *testing.T) { cumulativePayments, _ := strconv.Atoi(item["CumulativePayments"].(*types.AttributeValueMemberN).Value) assert.Equal(t, fmt.Sprintf("%d", cumulativePayments*1000), item["DataLength"].(*types.AttributeValueMemberN).Value) } - // queryResult, err = dynamoClient.QueryIndex(ctx, tableName, indexName, "AccountID = :account_id", commondynamodb.ExpressionValues{ queryResult, err = dynamoClient.Query(ctx, tableName, "AccountID = :account_id", commondynamodb.ExpressionValues{ ":account_id": &types.AttributeValueMemberS{ Value: fmt.Sprintf("account%d", numItems/repetitions+1), diff --git a/core/meterer/util.go b/core/meterer/util.go index dd1f06ddfd..92b7693705 100644 --- a/core/meterer/util.go +++ b/core/meterer/util.go @@ -33,23 +33,6 @@ func CreateReservationTable(clientConfig commonaws.ClientConfig, tableName strin KeyType: types.KeyTypeRange, }, }, - GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ - { - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("AccountID"), - KeyType: types.KeyTypeHash, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, // ProjectionTypeAll means all attributes are projected into the index - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(10), - WriteCapacityUnits: aws.Int64(10), - }, - }, - }, TableName: aws.String(tableName), ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), @@ -124,27 +107,6 @@ func CreateOnDemandTable(clientConfig commonaws.ClientConfig, tableName string) KeyType: types.KeyTypeRange, }, }, - GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ - { - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("AccountID"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("CumulativePayments"), - KeyType: types.KeyTypeRange, // Sort key - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(10), - WriteCapacityUnits: aws.Int64(10), - }, - }, - }, TableName: aws.String(tableName), ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), From 4a50bf64efc4db3524589177e9fb074a32598e43 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 15 Oct 2024 13:16:57 -0700 Subject: [PATCH 13/15] refactor: payment metadata update --- core/auth/payment_metadata.go | 168 ---------------------------- core/auth/payment_metadata_test.go | 173 ----------------------------- core/data.go | 23 +++- core/meterer/meterer_test.go | 8 -- 4 files changed, 19 insertions(+), 353 deletions(-) delete mode 100644 core/auth/payment_metadata.go delete mode 100644 core/auth/payment_metadata_test.go diff --git a/core/auth/payment_metadata.go b/core/auth/payment_metadata.go deleted file mode 100644 index c43b8a3e53..0000000000 --- a/core/auth/payment_metadata.go +++ /dev/null @@ -1,168 +0,0 @@ -package auth - -import ( - "crypto/ecdsa" - "fmt" - "math/big" - - "github.com/Layr-Labs/eigenda/core" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/math" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/signer/core/apitypes" -) - -/* SUBJECT TO MODIFICATIONS */ - -// EIP712Signer handles EIP-712 domain specific signing operations over typed and structured data -type EIP712Signer struct { - domain apitypes.TypedDataDomain - types apitypes.Types -} - -// NewEIP712Signer creates a new EIP712Signer instance -func NewEIP712Signer(chainID *big.Int, verifyingContract common.Address) *EIP712Signer { - return &EIP712Signer{ - domain: apitypes.TypedDataDomain{ - Name: "EigenDA", - Version: "1", - ChainId: (*math.HexOrDecimal256)(chainID), - VerifyingContract: verifyingContract.Hex(), - }, - types: apitypes.Types{ - "EIP712Domain": []apitypes.Type{ - {Name: "name", Type: "string"}, - {Name: "version", Type: "string"}, - {Name: "chainId", Type: "uint256"}, - {Name: "verifyingContract", Type: "address"}, - }, - "PaymentMetadata": []apitypes.Type{ - {Name: "accountID", Type: "string"}, - {Name: "binIndex", Type: "uint32"}, - {Name: "cumulativePayment", Type: "uint64"}, - {Name: "dataLength", Type: "uint32"}, - {Name: "quorumNumbers", Type: "uint8[]"}, - }, - }, - } -} - -// SignPaymentMetadata signs a PaymentMetadata using EIP-712 -func (s *EIP712Signer) SignPaymentMetadata(header *core.PaymentMetadata, privateKey *ecdsa.PrivateKey) ([]byte, error) { - typedData := apitypes.TypedData{ - Types: s.types, - PrimaryType: "PaymentMetadata", - Domain: s.domain, - Message: apitypes.TypedDataMessage{ - "accountID": header.AccountID, - "binIndex": fmt.Sprintf("%d", header.BinIndex), - "cumulativePayment": fmt.Sprintf("%d", header.CumulativePayment), - "dataLength": fmt.Sprintf("%d", header.DataLength), - "quorumNumbers": convertUint8SliceToMap(header.QuorumNumbers), - }, - } - - signature, err := s.signTypedData(typedData, privateKey) - if err != nil { - return nil, fmt.Errorf("error signing payment metadata (header): %v", err) - } - - return signature, nil -} - -func convertUint8SliceToMap(params []uint8) []string { - result := make([]string, len(params)) - for i, param := range params { - result[i] = fmt.Sprintf("%d", param) // Converting uint32 to string - } - return result -} - -// RecoverSender recovers the sender's address from a signed PaymentMetadata -func (s *EIP712Signer) RecoverSender(header *core.PaymentMetadata) (common.Address, error) { - typedData := apitypes.TypedData{ - Types: s.types, - PrimaryType: "PaymentMetadata", - Domain: s.domain, - Message: apitypes.TypedDataMessage{ - "accountID": header.AccountID, - "binIndex": fmt.Sprintf("%d", header.BinIndex), - "cumulativePayment": fmt.Sprintf("%d", header.CumulativePayment), - "dataLength": fmt.Sprintf("%d", header.DataLength), - "quorumNumbers": convertUint8SliceToMap(header.QuorumNumbers), - }, - } - - return s.recoverTypedData(typedData, header.Signature) -} - -func (s *EIP712Signer) signTypedData(typedData apitypes.TypedData, privateKey *ecdsa.PrivateKey) ([]byte, error) { - domainSeparator, err := typedData.HashStruct("EIP712Domain", typedData.Domain.Map()) - if err != nil { - return nil, fmt.Errorf("error hashing EIP712Domain: %v", err) - } - - typedDataHash, err := typedData.HashStruct(typedData.PrimaryType, typedData.Message) - if err != nil { - return nil, fmt.Errorf("error hashing primary type: %v", err) - } - - rawData := []byte(fmt.Sprintf("\x19\x01%s%s", string(domainSeparator), string(typedDataHash))) - digest := crypto.Keccak256(rawData) - - signature, err := crypto.Sign(digest, privateKey) - if err != nil { - return nil, fmt.Errorf("error signing digest: %v", err) - } - - return signature, nil -} - -func (s *EIP712Signer) recoverTypedData(typedData apitypes.TypedData, signature []byte) (common.Address, error) { - domainSeparator, err := typedData.HashStruct("EIP712Domain", typedData.Domain.Map()) - if err != nil { - return common.Address{}, fmt.Errorf("error hashing EIP712Domain: %v", err) - } - - typedDataHash, err := typedData.HashStruct(typedData.PrimaryType, typedData.Message) - if err != nil { - return common.Address{}, fmt.Errorf("error hashing primary type: %v", err) - } - - rawData := []byte(fmt.Sprintf("\x19\x01%s%s", string(domainSeparator), string(typedDataHash))) - digest := crypto.Keccak256(rawData) - - pubKey, err := crypto.SigToPub(digest, signature) - if err != nil { - return common.Address{}, fmt.Errorf("error recovering public key: %v", err) - } - - return crypto.PubkeyToAddress(*pubKey), nil -} - -// ConstructPaymentMetadata creates a PaymentMetadata with a valid signature -func ConstructPaymentMetadata( - signer *EIP712Signer, - binIndex uint32, - cumulativePayment uint64, - dataLength uint32, - quorumNumbers []uint8, - privateKey *ecdsa.PrivateKey, -) (*core.PaymentMetadata, error) { - accountID := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() - header := &core.PaymentMetadata{ - AccountID: accountID, - BinIndex: binIndex, - CumulativePayment: cumulativePayment, - QuorumNumbers: quorumNumbers, - DataLength: dataLength, - } - - signature, err := signer.SignPaymentMetadata(header, privateKey) - if err != nil { - return nil, fmt.Errorf("error signing payment metadata (header): %v", err) - } - - header.Signature = signature - return header, nil -} diff --git a/core/auth/payment_metadata_test.go b/core/auth/payment_metadata_test.go deleted file mode 100644 index e9de77024d..0000000000 --- a/core/auth/payment_metadata_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package auth_test - -import ( - "math/big" - "testing" - - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/core/auth" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestEIP712Signer(t *testing.T) { - chainID := big.NewInt(17000) - verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := auth.NewEIP712Signer(chainID, verifyingContract) - - privateKey, err := crypto.GenerateKey() - require.NoError(t, err) - - header := &core.PaymentMetadata{ - BinIndex: 0, - CumulativePayment: 1000, - DataLength: 1024, - QuorumNumbers: []uint8{1}, - } - - t.Run("SignPaymentMetadata", func(t *testing.T) { - signature, err := signer.SignPaymentMetadata(header, privateKey) - require.NoError(t, err) - assert.NotEmpty(t, signature) - }) - - t.Run("RecoverSender", func(t *testing.T) { - signature, err := signer.SignPaymentMetadata(header, privateKey) - require.NoError(t, err) - - header.Signature = signature - recoveredAddress, err := signer.RecoverSender(header) - require.NoError(t, err) - - expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) - assert.Equal(t, expectedAddress, recoveredAddress) - }) -} - -func TestConstructPaymentMetadata(t *testing.T) { - chainID := big.NewInt(17000) - verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := auth.NewEIP712Signer(chainID, verifyingContract) - - privateKey, err := crypto.GenerateKey() - require.NoError(t, err) - - header, err := auth.ConstructPaymentMetadata( - signer, - 0, // binIndex - 1000, // cumulativePayment - 1024, // dataLength - []uint8{1}, - privateKey, - ) - - require.NoError(t, err) - assert.NotNil(t, header) - assert.NotEmpty(t, header.Signature) - - recoveredAddress, err := signer.RecoverSender(header) - require.NoError(t, err) - - expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) - assert.Equal(t, expectedAddress, recoveredAddress) -} - -func TestEIP712SignerWithDifferentKeys(t *testing.T) { - chainID := big.NewInt(17000) - verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := auth.NewEIP712Signer(chainID, verifyingContract) - - privateKey1, err := crypto.GenerateKey() - require.NoError(t, err) - - privateKey2, err := crypto.GenerateKey() - require.NoError(t, err) - - header, err := auth.ConstructPaymentMetadata( - signer, - 0, - 1000, - 1024, - []uint8{1}, - privateKey1, - ) - - require.NoError(t, err) - assert.NotNil(t, header) - assert.NotEmpty(t, header.Signature) - - recoveredAddress, err := signer.RecoverSender(header) - require.NoError(t, err) - - expectedAddress1 := crypto.PubkeyToAddress(privateKey1.PublicKey) - expectedAddress2 := crypto.PubkeyToAddress(privateKey2.PublicKey) - - assert.Equal(t, expectedAddress1, recoveredAddress) - assert.NotEqual(t, expectedAddress2, recoveredAddress) -} - -func TestEIP712SignerWithModifiedHeader(t *testing.T) { - chainID := big.NewInt(17000) - verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer := auth.NewEIP712Signer(chainID, verifyingContract) - - privateKey, err := crypto.GenerateKey() - require.NoError(t, err) - - header, err := auth.ConstructPaymentMetadata( - signer, - 0, - 1000, - 1024, - []uint8{1}, - privateKey, - ) - - require.NoError(t, err) - assert.NotNil(t, header) - assert.NotEmpty(t, header.Signature) - recoveredAddress, err := signer.RecoverSender(header) - require.NoError(t, err) - - expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) - assert.Equal(t, expectedAddress, recoveredAddress, "Recovered address should match the address derived from the private key") - - header.AccountID = "modifiedAccount" - - addr, err := signer.RecoverSender(header) - require.NoError(t, err) - require.NotEqual(t, expectedAddress, addr) -} - -func TestEIP712SignerWithDifferentChainID(t *testing.T) { - chainID1 := big.NewInt(17000) - chainID2 := big.NewInt(17001) - verifyingContract := common.HexToAddress("0x1234000000000000000000000000000000000000") - signer1 := auth.NewEIP712Signer(chainID1, verifyingContract) - signer2 := auth.NewEIP712Signer(chainID2, verifyingContract) - - privateKey, err := crypto.GenerateKey() - require.NoError(t, err) - - header, err := auth.ConstructPaymentMetadata( - signer1, - 0, - 1000, - 1024, - []uint8{1}, - privateKey, - ) - - require.NoError(t, err) - assert.NotNil(t, header) - assert.NotEmpty(t, header.Signature) - - // Try to recover the sender using a signer with a different chain ID - sender, err := signer2.RecoverSender(header) - require.NoError(t, err) - expectedAddress := crypto.PubkeyToAddress(privateKey.PublicKey) - - require.NotEqual(t, expectedAddress, sender) -} diff --git a/core/data.go b/core/data.go index 5403282088..96593ffd18 100644 --- a/core/data.go +++ b/core/data.go @@ -8,6 +8,7 @@ import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/encoding" "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/ethereum/go-ethereum/crypto" ) type AccountID = string @@ -474,17 +475,31 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { // PaymentMetadata represents the header information for a blob type PaymentMetadata struct { // Existing fields - DataLength uint32 // length in number of symbols - QuorumNumbers []uint8 - AccountID string + AccountID string // New fields BinIndex uint32 // TODO: we are thinking the contract can use uint128 for cumulative payment, // but the definition on v2 uses uint64. Double check with team. CumulativePayment uint64 +} + +// Hash returns the Keccak256 hash of the PaymentMetadata +func (pm *PaymentMetadata) Hash() []byte { + // Create a byte slice to hold the serialized data + data := make([]byte, 0, len(pm.AccountID)+12) + + data = append(data, []byte(pm.AccountID)...) + + binIndexBytes := make([]byte, 4) + binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex) + data = append(data, binIndexBytes...) + + paymentBytes := make([]byte, 8) + binary.BigEndian.PutUint64(paymentBytes, pm.CumulativePayment) + data = append(data, paymentBytes...) - Signature []byte + return crypto.Keccak256(data) } type TokenAmount uint64 // TODO: change to uint128 diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index 97e8e96aea..64ef0be08f 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -3,7 +3,6 @@ package meterer_test import ( "crypto/ecdsa" "fmt" - "math/big" "os" "testing" "time" @@ -11,11 +10,9 @@ import ( "github.com/Layr-Labs/eigenda/common" commonaws "github.com/Layr-Labs/eigenda/common/aws" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/inabox/deploy" - gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ory/dockertest/v3" @@ -29,7 +26,6 @@ var ( clientConfig commonaws.ClientConfig privateKey1 *ecdsa.PrivateKey privateKey2 *ecdsa.PrivateKey - signer *auth.EIP712Signer mt *meterer.Meterer deployLocalStack bool @@ -80,10 +76,6 @@ func setup(_ *testing.M) { panic("failed to create dynamodb client") } - chainID := big.NewInt(17000) - verifyingContract := gethcommon.HexToAddress("0x1234000000000000000000000000000000000000") - signer = auth.NewEIP712Signer(chainID, verifyingContract) - privateKey1, err = crypto.GenerateKey() if err != nil { teardown() From 9d5663a53911ab6899de01fc273f87e656a618b3 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 16 Oct 2024 10:18:54 -0700 Subject: [PATCH 14/15] refactor: update TableExists name and TokenAmount type --- common/aws/dynamodb/client.go | 4 ++-- core/data.go | 5 ++--- core/meterer/offchain_store.go | 6 +++--- core/meterer/onchain_state_test.go | 3 ++- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 5c62089520..53587495df 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -410,8 +410,8 @@ func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([ return items, nil } -// TableCheck checks if a table exists and can be described -func (c *Client) TableCheck(ctx context.Context, name string) error { +// TableExists checks if a table exists and can be described +func (c *Client) TableExists(ctx context.Context, name string) error { if name == "" { return errors.New("table name is empty") } diff --git a/core/data.go b/core/data.go index 96593ffd18..c5f95c9c90 100644 --- a/core/data.go +++ b/core/data.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "math/big" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/encoding" @@ -502,8 +503,6 @@ func (pm *PaymentMetadata) Hash() []byte { return crypto.Keccak256(data) } -type TokenAmount uint64 // TODO: change to uint128 - // OperatorInfo contains information about an operator which is stored on the blockchain state, // corresponding to a particular quorum type ActiveReservation struct { @@ -516,5 +515,5 @@ type ActiveReservation struct { } type OnDemandPayment struct { - CumulativePayment TokenAmount // Total amount deposited by the user + CumulativePayment big.Int // Total amount deposited by the user } diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index 52df410d06..d253a1b7e2 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -36,15 +36,15 @@ func NewOffchainStore( return OffchainStore{}, err } - err = dynamoClient.TableCheck(context.Background(), reservationTableName) + err = dynamoClient.TableExists(context.Background(), reservationTableName) if err != nil { return OffchainStore{}, err } - err = dynamoClient.TableCheck(context.Background(), onDemandTableName) + err = dynamoClient.TableExists(context.Background(), onDemandTableName) if err != nil { return OffchainStore{}, err } - err = dynamoClient.TableCheck(context.Background(), globalBinTableName) + err = dynamoClient.TableExists(context.Background(), globalBinTableName) if err != nil { return OffchainStore{}, err } diff --git a/core/meterer/onchain_state_test.go b/core/meterer/onchain_state_test.go index d866474b68..41995ce766 100644 --- a/core/meterer/onchain_state_test.go +++ b/core/meterer/onchain_state_test.go @@ -2,6 +2,7 @@ package meterer_test import ( "context" + "math/big" "testing" "github.com/Layr-Labs/eigenda/core" @@ -20,7 +21,7 @@ var ( QuorumSplit: []byte{50, 50}, } dummyOnDemandPayment = core.OnDemandPayment{ - CumulativePayment: core.TokenAmount(1000), + CumulativePayment: *big.NewInt(1000), } ) From ef739df2558092f4c99e3877da991bf327e18c53 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 16 Oct 2024 14:42:00 -0700 Subject: [PATCH 15/15] refactor: use pointer for big int --- core/data.go | 2 +- core/meterer/onchain_state_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/data.go b/core/data.go index c5f95c9c90..aeaf082bd2 100644 --- a/core/data.go +++ b/core/data.go @@ -515,5 +515,5 @@ type ActiveReservation struct { } type OnDemandPayment struct { - CumulativePayment big.Int // Total amount deposited by the user + CumulativePayment *big.Int // Total amount deposited by the user } diff --git a/core/meterer/onchain_state_test.go b/core/meterer/onchain_state_test.go index 41995ce766..8684034b26 100644 --- a/core/meterer/onchain_state_test.go +++ b/core/meterer/onchain_state_test.go @@ -21,7 +21,7 @@ var ( QuorumSplit: []byte{50, 50}, } dummyOnDemandPayment = core.OnDemandPayment{ - CumulativePayment: *big.NewInt(1000), + CumulativePayment: big.NewInt(1000), } )