From 6ae70ef755d836d91a0acc1788db3f0f009bfc5b Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 8 Oct 2024 10:56:22 -0700 Subject: [PATCH] refactor: minor updates and cleanup --- core/meterer/meterer.go | 12 ++++--- core/meterer/offchain_store.go | 64 +++------------------------------- core/meterer/onchain_state.go | 5 --- 3 files changed, 12 insertions(+), 69 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 7cb866eacb..3b7bc62b1f 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -13,12 +13,14 @@ type TimeoutConfig struct { TxnBroadcastTimeout time.Duration } -// network parameters (this should be published on-chain and read through contracts) +// network parameters that should be published on-chain. We currently configure these params through disperser env vars. type Config struct { - GlobalBytesPerSecond uint64 // 2^64 bytes ~= 18 exabytes per second; if we use uint32, that's ~4GB/s - PricePerChargeable uint32 // 2^64 gwei ~= 18M Eth; uint32 => ~4ETH - MinChargeableSize uint32 - ReservationWindow uint32 + // for rate limiting 2^64 ~= 18 exabytes per second; 2^32 ~= 4GB/s + // for payments 2^64 ~= 18M Eth; 2^32 ~= 4ETH + GlobalBytesPerSecond uint64 // Global rate limit in bytes per second for on-demand payments + MinChargeableSize uint32 // Minimum size of a chargeable unit in bytes, used as a floor for on-demand payments + PricePerChargeable uint32 // Price per chargeable unit in gwei, used for on-demand payments + ReservationWindow uint32 // Duration of all reservations in seconds, used to calculate bin indices } // disperser API server will receive requests from clients. these requests will be with a blobHeader with payments information (CumulativePayments, BinIndex, and Signature) diff --git a/core/meterer/offchain_store.go b/core/meterer/offchain_store.go index d494a84ab2..c4026fd970 100644 --- a/core/meterer/offchain_store.go +++ b/core/meterer/offchain_store.go @@ -11,7 +11,6 @@ import ( commonaws "github.com/Layr-Labs/eigenda/common/aws" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ) @@ -90,12 +89,7 @@ func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID stri "BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)}, } - update := map[string]types.AttributeValue{ - "BinUsage": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(size), 10)}, - } - - fmt.Println("increment the item in a table", s.reservationTableName) - res, err := s.dynamoClient.UpdateItemIncrement(ctx, s.reservationTableName, key, update) + res, err := s.dynamoClient.IncrementBy(ctx, s.reservationTableName, key, "BinUsage", size) if err != nil { return 0, fmt.Errorf("failed to increment bin usage: %w", err) } @@ -118,15 +112,12 @@ func (s *OffchainStore) UpdateReservationBin(ctx context.Context, accountID stri return binUsageValue, nil } -func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, size uint32) (uint64, error) { +func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, size uint64) (uint64, error) { key := map[string]types.AttributeValue{ "BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)}, } - update := map[string]types.AttributeValue{ - "BinUsage": &types.AttributeValueMemberN{Value: strconv.FormatUint(uint64(size), 10)}, - } - res, err := s.dynamoClient.UpdateItemIncrement(ctx, s.globalBinTableName, key, update) + res, err := s.dynamoClient.IncrementBy(ctx, s.globalBinTableName, key, "BinUsage", size) if err != nil { return 0, err } @@ -149,52 +140,6 @@ func (s *OffchainStore) UpdateGlobalBin(ctx context.Context, binIndex uint64, si return binUsageValue, nil } -func (s *OffchainStore) FindReservationBin(ctx context.Context, accountID string, binIndex uint64) (*ReservationBin, error) { - key := map[string]types.AttributeValue{ - "AccountID": &types.AttributeValueMemberS{Value: accountID}, - "BinIndex": &types.AttributeValueMemberN{Value: strconv.FormatUint(binIndex, 10)}, - } - - result, err := s.dynamoClient.GetItem(ctx, s.reservationTableName, key) - if err != nil { - return nil, err - } - - if result == nil { - return nil, errors.New("reservation not found") - } - - var reservation ReservationBin - err = attributevalue.UnmarshalMap(result, &reservation) - if err != nil { - return nil, err - } - - return &reservation, nil -} - -// Find all reservation bins for a given account -func (s *OffchainStore) FindReservationBins(ctx context.Context, accountID string) ([]ReservationBin, error) { - result, err := s.dynamoClient.QueryIndex(ctx, s.reservationTableName, "AccountIDIndex", "AccountID = :accountID", commondynamodb.ExpresseionValues{ - ":accountID": &types.AttributeValueMemberS{Value: accountID}, - }) - if err != nil { - return nil, err - } - - if result == nil { - return nil, errors.New("reservation not found") - } - - var reservations []ReservationBin - err = attributevalue.UnmarshalListOfMaps(result, &reservations) - if err != nil { - return nil, err - } - - return reservations, nil -} - func (s *OffchainStore) AddOnDemandPayment(ctx context.Context, blobHeader BlobHeader, blobSizeCharged uint32) error { result, err := s.dynamoClient.GetItem(ctx, s.onDemandTableName, commondynamodb.Item{ @@ -238,7 +183,8 @@ func (s *OffchainStore) RemoveOnDemandPayment(ctx context.Context, accountID str return nil } -// relevant on-demand payment records: previous cumulative payment, next cumulative payment, blob size of next payment +// GetRelevantOnDemandRecords gets previous cumulative payment, next cumulative payment, blob size of next payment +// The queries are done sequentially instead of one-go for efficient querying and would not cause race condition errors for honest requests func (s *OffchainStore) GetRelevantOnDemandRecords(ctx context.Context, accountID string, cumulativePayment uint64) (uint64, uint64, uint32, error) { // Fetch the largest entry smaller than the given cumulativePayment smallerResult, err := s.dynamoClient.QueryIndexOrderWithLimit(ctx, s.onDemandTableName, "AccountIDIndex", diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 2ddec01da2..88e0e4f5f5 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -58,11 +58,6 @@ type OnchainPaymentState struct { ActiveReservations *ActiveReservations OnDemandPayments *OnDemandPayments - // FUNCTIONS IF THIS STRUCT WAS AN INTERFACE? - // GetActiveReservations(ctx context.Context, blockNumber uint) (map[string]*ActiveReservations, error) - // GetActiveReservationByAccount(ctx context.Context, blockNumber uint, accountID string) (*ActiveReservation, error) - // GetOnDemandPayments(ctx context.Context, blockNumber uint) (map[string]*OnDemandPayments, error) - // GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint, accountID string) (*OnDemandPayment, error) } func NewOnchainPaymentState() *OnchainPaymentState {