Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[payments] meterer structs and helpers #789

Merged
merged 15 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamodb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebasing to master should hide these changes that have already been merged

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh just realized that this branch was configured to merge into the dynamo-update branch. updated the merge base so the changes should be hidden now

import (
"context"
"errors"
"fmt"
"math"
"strconv"
Expand Down Expand Up @@ -408,3 +409,17 @@ func (c *Client) readItems(ctx context.Context, tableName string, keys []Key) ([

return items, nil
}

// TableExists checks if a table exists and can be described
func (c *Client) TableExists(ctx context.Context, name string) error {
if name == "" {
return errors.New("table name is empty")
}
_, err := c.dynamoClient.DescribeTable(ctx, &dynamodb.DescribeTableInput{
TableName: aws.String(name),
})
if err != nil {
return err
}
return nil
}
4 changes: 2 additions & 2 deletions core/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo
denom := new(big.Int).Mul(big.NewInt(int64(info.ConfirmationThreshold-info.AdversaryThreshold)), totalStake)
maxChunkLength := uint(roundUpDivideBig(num, denom).Uint64())

maxChunkLength2 := roundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))
maxChunkLength2 := RoundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))

if maxChunkLength < maxChunkLength2 {
maxChunkLength = maxChunkLength2
Expand Down Expand Up @@ -271,7 +271,7 @@ func roundUpDivideBig(a, b *big.Int) *big.Int {

}

func roundUpDivide(a, b uint) uint {
func RoundUpDivide(a, b uint) uint {
return (a + b - 1) / b

}
Expand Down
49 changes: 48 additions & 1 deletion core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"encoding/binary"
"errors"
"fmt"
"math/big"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/ethereum/go-ethereum/crypto"
)

type AccountID = string
Expand Down Expand Up @@ -291,7 +293,7 @@ func (b *BlobHeader) EncodedSizeAllQuorums() int64 {
size := int64(0)
for _, quorum := range b.QuorumInfos {

size += int64(roundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold)))
size += int64(RoundUpDivide(b.Length*percentMultiplier*encoding.BYTES_PER_SYMBOL, uint(quorum.ConfirmationThreshold-quorum.AdversaryThreshold)))
}
return size
}
Expand Down Expand Up @@ -470,3 +472,48 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) {
}
return c, nil
}

// PaymentMetadata represents the header information for a blob
type PaymentMetadata struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be independent of what BlobHeader looks like?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm we could add a field like blob key or blob pointer? this would mean we need to do blob lookup. With these current fields, we don't need the blob or blob header to do metering

// Existing fields
AccountID string

// New fields
BinIndex uint32
// TODO: we are thinking the contract can use uint128 for cumulative payment,
// but the definition on v2 uses uint64. Double check with team.
CumulativePayment uint64
}

// Hash returns the Keccak256 hash of the PaymentMetadata
func (pm *PaymentMetadata) Hash() []byte {
// Create a byte slice to hold the serialized data
data := make([]byte, 0, len(pm.AccountID)+12)

data = append(data, []byte(pm.AccountID)...)

binIndexBytes := make([]byte, 4)
binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex)
data = append(data, binIndexBytes...)

paymentBytes := make([]byte, 8)
binary.BigEndian.PutUint64(paymentBytes, pm.CumulativePayment)
data = append(data, paymentBytes...)

return crypto.Keccak256(data)
}

// OperatorInfo contains information about an operator which is stored on the blockchain state,
// corresponding to a particular quorum
type ActiveReservation struct {
DataRate uint64 // Bandwidth per reservation bin
StartTimestamp uint64 // Unix timestamp that's valid for basically eternity
EndTimestamp uint64

QuorumNumbers []uint8
QuorumSplit []byte // ordered mapping of quorum number to payment split; on-chain validation should ensure split <= 100
}

type OnDemandPayment struct {
CumulativePayment big.Int // Total amount deposited by the user
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's common practice to use a pointer for big.Int, i.e. CumulativePayment *big.Int, as most methods consume the pointer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sg, updated

}
10 changes: 10 additions & 0 deletions core/eth/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,16 @@ func (t *Transactor) GetRequiredQuorumNumbers(ctx context.Context, blockNumber u
return requiredQuorums, nil
}

func (t *Transactor) GetActiveReservations(ctx context.Context, blockNumber uint32) (map[string]core.ActiveReservation, error) {
// contract is not implemented yet
return map[string]core.ActiveReservation{}, nil
}

func (t *Transactor) GetOnDemandPayments(ctx context.Context, blockNumber uint32) (map[string]core.OnDemandPayment, error) {
// contract is not implemented yet
return map[string]core.OnDemandPayment{}, nil
}

func (t *Transactor) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDAServiceManagerAddr gethcommon.Address) error {

contractEigenDAServiceManager, err := eigendasrvmg.NewContractEigenDAServiceManager(eigenDAServiceManagerAddr, t.EthClient)
Expand Down
53 changes: 53 additions & 0 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package meterer

import (
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
)

// Config contains network parameters that should be published on-chain. We currently configure these params through disperser env vars.
type Config struct {
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
// GlobalBytesPerSecond is the rate limit in bytes per second for on-demand payments
GlobalBytesPerSecond uint64
// MinChargeableSize is the minimum size of a chargeable unit in bytes, used as a floor for on-demand payments
MinChargeableSize uint32
// PricePerChargeable is the price per chargeable unit in gwei, used for on-demand payments
PricePerChargeable uint32
// ReservationWindow is the duration of all reservations in seconds, used to calculate bin indices
ReservationWindow uint32

// ChainReadTimeout is the timeout for reading payment state from chain
ChainReadTimeout time.Duration
}

// Meterer handles payment accounting across different accounts. Disperser API server receives requests from clients and each request contains a blob header
// with payments information (CumulativePayments, BinIndex, and Signature). Disperser will pass the blob header to the meterer, which will check if the
// payments information is valid.
type Meterer struct {
Config

// ChainState reads on-chain payment state periodically and cache it in memory
ChainState OnchainPayment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OnchainPayment? ChainState sounds like the ChainState struct in chainio 😭

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we perhaps move it later?🫠 similar to replacing transactor with reader, I would prefer creating a separate PR after both of these are merged or at least chainio gets merged??

// OffchainStore uses DynamoDB to track metering and used to validate requests
OffchainStore OffchainStore

logger logging.Logger
}

func NewMeterer(
config Config,
paymentChainState OnchainPayment,
offchainStore OffchainStore,
logger logging.Logger,
) (*Meterer, error) {
// TODO: create a separate thread to pull from the chain and update chain state
return &Meterer{
Config: config,

ChainState: paymentChainState,
OffchainStore: offchainStore,

logger: logger.With("component", "Meterer"),
}, nil
}
147 changes: 147 additions & 0 deletions core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package meterer_test

import (
"crypto/ecdsa"
"fmt"
"os"
"testing"
"time"

"github.com/Layr-Labs/eigenda/common"
commonaws "github.com/Layr-Labs/eigenda/common/aws"
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ory/dockertest/v3"

"github.com/Layr-Labs/eigensdk-go/logging"
)

var (
dockertestPool *dockertest.Pool
dockertestResource *dockertest.Resource
dynamoClient *commondynamodb.Client
clientConfig commonaws.ClientConfig
privateKey1 *ecdsa.PrivateKey
privateKey2 *ecdsa.PrivateKey
mt *meterer.Meterer

deployLocalStack bool
localStackPort = "4566"
paymentChainState = &mock.MockOnchainPaymentState{}
)

func TestMain(m *testing.M) {
setup(m)
code := m.Run()
teardown()
os.Exit(code)
}

func setup(_ *testing.M) {

deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false")
if !deployLocalStack {
localStackPort = os.Getenv("LOCALSTACK_PORT")
}

if deployLocalStack {
var err error
dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort)
if err != nil {
teardown()
panic("failed to start localstack container")
}
}

loggerConfig := common.DefaultLoggerConfig()
logger, err := common.NewLogger(loggerConfig)
if err != nil {
teardown()
panic("failed to create logger")
}

clientConfig = commonaws.ClientConfig{
Region: "us-east-1",
AccessKey: "localstack",
SecretAccessKey: "localstack",
EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort),
}

dynamoClient, err = commondynamodb.NewClient(clientConfig, logger)
if err != nil {
teardown()
panic("failed to create dynamodb client")
}

privateKey1, err = crypto.GenerateKey()
if err != nil {
teardown()
panic("failed to generate private key")
}
privateKey2, err = crypto.GenerateKey()
if err != nil {
teardown()
panic("failed to generate private key")
}

logger = logging.NewNoopLogger()
config := meterer.Config{
PricePerChargeable: 1,
MinChargeableSize: 1,
GlobalBytesPerSecond: 1000,
ReservationWindow: 60,
ChainReadTimeout: 3 * time.Second,
}

err = meterer.CreateReservationTable(clientConfig, "reservations")
if err != nil {
teardown()
panic("failed to create reservation table")
}
err = meterer.CreateOnDemandTable(clientConfig, "ondemand")
if err != nil {
teardown()
panic("failed to create ondemand table")
}
err = meterer.CreateGlobalReservationTable(clientConfig, "global")
if err != nil {
teardown()
panic("failed to create global reservation table")
}

store, err := meterer.NewOffchainStore(
clientConfig,
"reservations",
"ondemand",
"global",
logger,
)

if err != nil {
teardown()
panic("failed to create offchain store")
}

// add some default sensible configs
mt, err = meterer.NewMeterer(
config,
paymentChainState,
store,
logging.NewNoopLogger(),
// metrics.NewNoopMetrics(),
)

if err != nil {
teardown()
panic("failed to create meterer")
}
}

func teardown() {
if deployLocalStack {
deploy.PurgeDockertestResources(dockertestPool, dockertestResource)
}
}
Loading
Loading