diff --git a/core/data.go b/core/data.go index 21e24330c8..7a064f5818 100644 --- a/core/data.go +++ b/core/data.go @@ -5,11 +5,14 @@ import ( "errors" "fmt" "math/big" + "strconv" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/consensys/gnark-crypto/ecc/bn254" - "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/accounts/abi" + "golang.org/x/crypto/sha3" ) type AccountID = string @@ -483,34 +486,80 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { // PaymentMetadata represents the header information for a blob type PaymentMetadata struct { - // Existing fields - AccountID string + // AccountID is the ETH account address for the payer + AccountID string `json:"account_id"` - // New fields - BinIndex uint32 + // BinIndex represents the range of time at which the dispersal is made + BinIndex uint32 `json:"bin_index"` // TODO: we are thinking the contract can use uint128 for cumulative payment, // but the definition on v2 uses uint64. Double check with team. - CumulativePayment *big.Int + CumulativePayment *big.Int `json:"cumulative_payment"` } // Hash returns the Keccak256 hash of the PaymentMetadata -func (pm *PaymentMetadata) Hash() []byte { - // Create a byte slice to hold the serialized data - data := make([]byte, 0, len(pm.AccountID)+4+pm.CumulativePayment.BitLen()/8+1) - - // Append AccountID - data = append(data, []byte(pm.AccountID)...) - - // Append BinIndex - binIndexBytes := make([]byte, 4) - binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex) - data = append(data, binIndexBytes...) - - // Append CumulativePayment - paymentBytes := pm.CumulativePayment.Bytes() - data = append(data, paymentBytes...) +func (pm *PaymentMetadata) Hash() ([32]byte, error) { + blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "accountID", + Type: "string", + }, + { + Name: "binIndex", + Type: "uint32", + }, + { + Name: "cumulativePayment", + Type: "uint256", + }, + }) + if err != nil { + return [32]byte{}, err + } + + arguments := abi.Arguments{ + { + Type: blobHeaderType, + }, + } + + bytes, err := arguments.Pack(pm) + if err != nil { + return [32]byte{}, err + } + + var hash [32]byte + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(hash[:], hasher.Sum(nil)[:32]) + + return hash, nil +} + +func (pm *PaymentMetadata) MarshalDynamoDBAttributeValue() (types.AttributeValue, error) { + return &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "AccountID": &types.AttributeValueMemberS{Value: pm.AccountID}, + "BinIndex": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", pm.BinIndex)}, + "CumulativePayment": &types.AttributeValueMemberN{ + Value: pm.CumulativePayment.String(), + }, + }, + }, nil +} - return crypto.Keccak256(data) +func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeValue) error { + m, ok := av.(*types.AttributeValueMemberM) + if !ok { + return fmt.Errorf("expected *types.AttributeValueMemberM, got %T", av) + } + pm.AccountID = m.Value["AccountID"].(*types.AttributeValueMemberS).Value + binIndex, err := strconv.ParseUint(m.Value["BinIndex"].(*types.AttributeValueMemberN).Value, 10, 32) + if err != nil { + return fmt.Errorf("failed to parse BinIndex: %w", err) + } + pm.BinIndex = uint32(binIndex) + pm.CumulativePayment, _ = new(big.Int).SetString(m.Value["CumulativePayment"].(*types.AttributeValueMemberN).Value, 10) + return nil } // OperatorInfo contains information about an operator which is stored on the blockchain state, diff --git a/core/v2/assignment.go b/core/v2/assignment.go index 1e50cdbfb9..53e6b96fb9 100644 --- a/core/v2/assignment.go +++ b/core/v2/assignment.go @@ -8,7 +8,7 @@ import ( "github.com/Layr-Labs/eigenda/core" ) -func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) (map[core.OperatorID]Assignment, error) { +func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum uint8) (map[core.OperatorID]Assignment, error) { params, ok := ParametersMap[blobVersion] if !ok { @@ -81,7 +81,7 @@ func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) ( } -func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { +func GetAssignment(state *core.OperatorState, blobVersion BlobVersion, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { assignments, err := GetAssignments(state, blobVersion, quorum) if err != nil { @@ -96,7 +96,7 @@ func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.Quor return assignment, nil } -func GetChunkLength(blobVersion byte, blobLength uint32) (uint32, error) { +func GetChunkLength(blobVersion BlobVersion, blobLength uint32) (uint32, error) { if blobLength == 0 { return 0, fmt.Errorf("blob length must be greater than 0") diff --git a/core/v2/assignment_test.go b/core/v2/assignment_test.go index d1245f6c5d..d8b5d2da81 100644 --- a/core/v2/assignment_test.go +++ b/core/v2/assignment_test.go @@ -20,7 +20,7 @@ func TestOperatorAssignmentsV2(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) operatorState := state.OperatorState - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) assignments, err := corev2.GetAssignments(operatorState, blobVersion, 0) assert.NoError(t, err) @@ -90,7 +90,7 @@ func TestAssignmentWithTooManyOperators(t *testing.T) { assert.Equal(t, len(state.Operators[0]), numOperators) - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) _, err = corev2.GetAssignments(state.OperatorState, blobVersion, 0) assert.Error(t, err) @@ -131,7 +131,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { state := dat.GetTotalOperatorState(context.Background(), 0) - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) assignments, err := corev2.GetAssignments(state.OperatorState, blobVersion, 0) assert.NoError(t, err) @@ -162,7 +162,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { func TestChunkLength(t *testing.T) { - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) pairs := []struct { blobLength uint32 @@ -188,7 +188,7 @@ func TestChunkLength(t *testing.T) { func TestInvalidChunkLength(t *testing.T) { - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) invalidLengths := []uint32{ 0, diff --git a/core/v2/core_test.go b/core/v2/core_test.go index f55303e066..c3d47fe41e 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -84,7 +84,7 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) { return p, v, nil } -func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) { +func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) { data := make([]byte, length*31) _, err := rand.Read(data) @@ -101,7 +101,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber header := corev2.BlobCertificate{ BlobHeader: corev2.BlobHeader{ - Version: version, + BlobVersion: version, QuorumNumbers: quorums, BlobCommitments: commitments, }, @@ -148,7 +148,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi for _, quorum := range header.QuorumNumbers { - assignments, err := corev2.GetAssignments(state, header.Version, quorum) + assignments, err := corev2.GetAssignments(state, header.BlobVersion, quorum) if err != nil { t.Fatal(err) } @@ -238,7 +238,7 @@ func TestValidationSucceeds(t *testing.T) { bn := uint64(0) - version := uint8(0) + version := corev2.BlobVersion(0) pool := workerpool.New(1) diff --git a/core/v2/types.go b/core/v2/types.go index f3c9eda15e..ac3b234ef4 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -1,20 +1,27 @@ package v2 import ( + "encoding/hex" "math" + "math/big" + "strings" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" + "github.com/ethereum/go-ethereum/accounts/abi" + "golang.org/x/crypto/sha3" ) var ( // TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not // conflict with the existing on-chain limits - ParametersMap = map[uint8]BlobVersionParameters{ + ParametersMap = map[BlobVersion]BlobVersionParameters{ 0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192}, } ) +type BlobVersion uint8 + // Assignment contains information about the set of chunks that a specific node will receive type Assignment struct { StartIndex uint32 @@ -30,27 +37,42 @@ func (c *Assignment) GetIndices() []uint32 { return indices } +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + s := strings.TrimPrefix(h, "0x") + s = strings.TrimPrefix(s, "0X") + b, err := hex.DecodeString(s) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + // BlobHeader contains all metadata related to a blob including commitments and parameters for encoding type BlobHeader struct { - Version uint8 + BlobVersion BlobVersion - encoding.BlobCommitments + BlobCommitments encoding.BlobCommitments - // QuorumInfos contains the quorum specific parameters for the blob - QuorumNumbers []uint8 + // QuorumNumbers contains the quorums the blob is dispersed to + QuorumNumbers []core.QuorumID - // PaymentHeader contains the payment information for the blob - core.PaymentMetadata + // PaymentMetadata contains the payment information for the blob + PaymentMetadata core.PaymentMetadata - // AuthenticationData is the signature of the blob header by the account ID - AuthenticationData []byte `json:"authentication_data"` + // Signature is the signature of the blob header by the account ID + Signature []byte } func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { + params := ParametersMap[b.BlobVersion] - params := ParametersMap[b.Version] - - length, err := GetChunkLength(b.Version, uint32(b.Length)) + length, err := GetChunkLength(b.BlobVersion, uint32(b.BlobCommitments.Length)) if err != nil { return encoding.EncodingParams{}, err } @@ -59,7 +81,153 @@ func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { NumChunks: uint64(params.NumChunks), ChunkLength: uint64(length), }, nil +} + +func (b *BlobHeader) BlobKey() (BlobKey, error) { + blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "blobVersion", + Type: "uint8", + }, + { + Name: "blobCommitments", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "commitment", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256", + }, + { + Name: "Y", + Type: "uint256", + }, + }, + }, + { + Name: "lengthCommitment", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256[2]", + }, + { + Name: "Y", + Type: "uint256[2]", + }, + }, + }, + { + Name: "lengthProof", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256[2]", + }, + { + Name: "Y", + Type: "uint256[2]", + }, + }, + }, + { + Name: "length", + Type: "uint32", + }, + }, + }, + { + Name: "quorumNumbers", + Type: "bytes", + }, + { + Name: "paymentMetadataHash", + Type: "bytes32", + }, + }) + if err != nil { + return [32]byte{}, err + } + + arguments := abi.Arguments{ + { + Type: blobHeaderType, + }, + } + type g1Commit struct { + X *big.Int + Y *big.Int + } + type g2Commit struct { + X [2]*big.Int + Y [2]*big.Int + } + type blobCommitments struct { + Commitment g1Commit + LengthCommitment g2Commit + LengthProof g2Commit + Length uint32 + } + + paymentHash, err := b.PaymentMetadata.Hash() + if err != nil { + return [32]byte{}, err + } + s := struct { + BlobVersion uint8 + BlobCommitments blobCommitments + QuorumNumbers []byte + PaymentMetadataHash [32]byte + }{ + BlobVersion: uint8(b.BlobVersion), + BlobCommitments: blobCommitments{ + Commitment: g1Commit{ + X: b.BlobCommitments.Commitment.X.BigInt(new(big.Int)), + Y: b.BlobCommitments.Commitment.Y.BigInt(new(big.Int)), + }, + LengthCommitment: g2Commit{ + X: [2]*big.Int{ + b.BlobCommitments.LengthCommitment.X.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthCommitment.X.A1.BigInt(new(big.Int)), + }, + Y: [2]*big.Int{ + b.BlobCommitments.LengthCommitment.Y.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthCommitment.Y.A1.BigInt(new(big.Int)), + }, + }, + LengthProof: g2Commit{ + X: [2]*big.Int{ + b.BlobCommitments.LengthProof.X.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthProof.X.A1.BigInt(new(big.Int)), + }, + Y: [2]*big.Int{ + b.BlobCommitments.LengthProof.Y.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthProof.Y.A1.BigInt(new(big.Int)), + }, + }, + Length: uint32(b.BlobCommitments.Length), + }, + QuorumNumbers: b.QuorumNumbers, + PaymentMetadataHash: paymentHash, + } + + bytes, err := arguments.Pack(s) + if err != nil { + return [32]byte{}, err + } + + var headerHash [32]byte + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(headerHash[:], hasher.Sum(nil)[:32]) + + return headerHash, nil } type BlobCertificate struct { @@ -79,9 +247,7 @@ type BlobVersionParameters struct { } func (p BlobVersionParameters) MaxNumOperators() uint32 { - return uint32(math.Floor(float64(p.NumChunks) * (1 - 1/(p.ReconstructionThreshold*float64(p.CodingRate))))) - } const ( diff --git a/core/v2/types_test.go b/core/v2/types_test.go new file mode 100644 index 0000000000..c2c9b908c8 --- /dev/null +++ b/core/v2/types_test.go @@ -0,0 +1,57 @@ +package v2_test + +import ( + "encoding/hex" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/stretchr/testify/assert" +) + +func TestBlobKey(t *testing.T) { + blobKey := v2.BlobKey([32]byte{1, 2, 3}) + + assert.Equal(t, "0102030000000000000000000000000000000000000000000000000000000000", blobKey.Hex()) + bk, err := v2.HexToBlobKey(blobKey.Hex()) + assert.NoError(t, err) + assert.Equal(t, blobKey, bk) +} + +func TestPaymentHash(t *testing.T) { + pm := core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + } + hash, err := pm.Hash() + assert.NoError(t, err) + // 0xf5894a8e9281b5687c0c7757d3d45fb76152bf659e6e61b1062f4c6bcb69c449 verified in solidity + assert.Equal(t, "f5894a8e9281b5687c0c7757d3d45fb76152bf659e6e61b1062f4c6bcb69c449", hex.EncodeToString(hash[:])) +} + +func TestBlobKeyFromHeader(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + bh := v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + blobKey, err := bh.BlobKey() + assert.NoError(t, err) + // 0xb19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc verified in solidity + assert.Equal(t, "b19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc", blobKey.Hex()) +} diff --git a/core/v2/validator.go b/core/v2/validator.go index 0d5a2960ee..db513c3191 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -43,7 +43,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Get the assignments for the quorum - assignment, err := GetAssignment(operatorState, blob.Version, quorum, v.operatorID) + assignment, err := GetAssignment(operatorState, blob.BlobVersion, quorum, v.operatorID) if err != nil { return nil, nil, err } @@ -57,7 +57,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Validate the chunkLength against the confirmation and adversary threshold parameters - chunkLength, err := GetChunkLength(blob.Version, uint32(blob.BlobHeader.Length)) + chunkLength, err := GetChunkLength(blob.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) if err != nil { return nil, nil, fmt.Errorf("invalid chunk length: %w", err) } diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go new file mode 100644 index 0000000000..aca68db9b7 --- /dev/null +++ b/disperser/common/v2/blob.go @@ -0,0 +1,32 @@ +package v2 + +import ( + core "github.com/Layr-Labs/eigenda/core/v2" +) + +type BlobStatus uint + +const ( + Queued BlobStatus = iota + Encoded + Certified + Failed +) + +// BlobMetadata is an internal representation of a blob's metadata. +type BlobMetadata struct { + BlobHeader *core.BlobHeader + + // BlobStatus indicates the current status of the blob + BlobStatus BlobStatus + // Expiry is Unix timestamp of the blob expiry in seconds from epoch + Expiry uint64 + // NumRetries is the number of times the blob has been retried + NumRetries uint + // BlobSize is the size of the blob in bytes + BlobSize uint64 + // RequestedAt is the Unix timestamp of when the blob was requested in seconds + RequestedAt uint64 + // UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_ + UpdatedAt uint64 +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_test.go b/disperser/common/v2/blobstore/blob_metadata_store_test.go new file mode 100644 index 0000000000..a0b63bdcf4 --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_test.go @@ -0,0 +1,136 @@ +package blobstore_test + +import ( + "context" + "fmt" + "math/big" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/google/uuid" + + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/ory/dockertest/v3" +) + +var ( + logger = logging.NewNoopLogger() + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + + deployLocalStack bool + localStackPort = "4571" + + dynamoClient *dynamodb.Client + blobMetadataStore *blobstore.BlobMetadataStore + + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + + mockCommitment = encoding.BlobCommitments{} +) + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +func setup(m *testing.M) { + + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + } + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + _, err := test_utils.CreateTable(context.Background(), cfg, metadataTableName, blobstore.GenerateTableSchema(metadataTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create dynamodb table: " + err.Error()) + } + + dynamoClient, err = dynamodb.NewClient(cfg, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client: " + err.Error()) + } + + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) + + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + mockCommitment = encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 10, + } +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} diff --git a/disperser/common/v2/blobstore/dynamo_store.go b/disperser/common/v2/blobstore/dynamo_store.go new file mode 100644 index 0000000000..ca4d804740 --- /dev/null +++ b/disperser/common/v2/blobstore/dynamo_store.go @@ -0,0 +1,276 @@ +package blobstore + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + core "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/disperser" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +const ( + StatusIndexName = "StatusIndex" + OperatorDispersalIndexName = "OperatorDispersalIndex" + OperatorResponseIndexName = "OperatorResponseIndex" + + blobKeyPrefix = "BlobKey#" + blobMetadataSK = "BlobMetadata" +) + +// BlobMetadataStore is a blob metadata storage backed by DynamoDB +type BlobMetadataStore struct { + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + ttl time.Duration +} + +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { + logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) + return &BlobMetadataStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "blobMetadataStoreV2"), + tableName: tableName, + ttl: ttl, + } +} + +func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(blobMetadata) + if err != nil { + return err + } + + return s.dynamoDBClient.PutItem(ctx, s.tableName, item) +} + +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + "SK": &types.AttributeValueMemberS{ + Value: blobMetadataSK, + }, + }) + + if item == nil { + return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex()) + } + + if err != nil { + return nil, err + } + + metadata, err := UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + + return metadata, nil +} + +// GetBlobMetadataByStatus returns all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":updatedAt": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }}) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status", commondynamodb.ExpressionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + }) + if err != nil { + return 0, err + } + + return count, nil +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + // PK is the composite partition key + { + AttributeName: aws.String("PK"), + AttributeType: types.ScalarAttributeTypeS, + }, + // SK is the composite sort key + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BlobStatus"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("UpdatedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("OperatorID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("DispersedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("RespondedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("PK"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(StatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("UpdatedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorDispersalIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("DispersedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorResponseIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RespondedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(metadata) + if err != nil { + return nil, fmt.Errorf("failed to marshal blob metadata: %w", err) + } + + // Add PK and SK fields + blobKey, err := metadata.BlobHeader.BlobKey() + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()} + fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK} + + return fields, nil +} + +func UnmarshalBlobKey(item commondynamodb.Item) (core.BlobKey, error) { + type Blob struct { + PK string + } + + blob := Blob{} + err := attributevalue.UnmarshalMap(item, &blob) + if err != nil { + return core.BlobKey{}, err + } + + bk := strings.TrimPrefix(blob.PK, blobKeyPrefix) + return core.HexToBlobKey(bk) +} + +func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { + metadata := v2.BlobMetadata{} + err := attributevalue.UnmarshalMap(item, &metadata) + if err != nil { + return nil, err + } + return &metadata, nil +} diff --git a/disperser/common/v2/blobstore/dynamo_store_test.go b/disperser/common/v2/blobstore/dynamo_store_test.go new file mode 100644 index 0000000000..b87a2b8090 --- /dev/null +++ b/disperser/common/v2/blobstore/dynamo_store_test.go @@ -0,0 +1,100 @@ +package blobstore_test + +import ( + "context" + "math/big" + "testing" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + core "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/assert" +) + +func TestBlobMetadataStoreOperations(t *testing.T) { + ctx := context.Background() + blobHeader1 := &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 0, + CumulativePayment: big.NewInt(532), + }, + } + blobKey1, err := blobHeader1.BlobKey() + assert.NoError(t, err) + blobHeader2 := &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{1}, + BlobCommitments: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x456", + BinIndex: 2, + CumulativePayment: big.NewInt(999), + }, + } + blobKey2, err := blobHeader2.BlobKey() + assert.NoError(t, err) + + now := time.Now() + metadata1 := &v2.BlobMetadata{ + BlobHeader: blobHeader1, + BlobStatus: v2.Queued, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), + } + metadata2 := &v2.BlobMetadata{ + BlobHeader: blobHeader2, + BlobStatus: v2.Certified, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), + } + err = blobMetadataStore.PutBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.PutBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, 0) + assert.NoError(t, err) + assert.Len(t, queued, 1) + assert.Equal(t, metadata1, queued[0]) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0) + assert.NoError(t, err) + assert.Len(t, certified, 1) + assert.Equal(t, metadata2, certified[0]) + + queuedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, v2.Queued) + assert.NoError(t, err) + assert.Equal(t, int32(1), queuedCount) + + deleteItems(t, []commondynamodb.Key{ + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey1.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"}, + }, + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey2.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"}, + }, + }) +} + +func deleteItems(t *testing.T, keys []commondynamodb.Key) { + failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) + assert.NoError(t, err) + assert.Len(t, failed, 0) +}