From cc05eedafa520ce3e7cb8a15b33acc75b29787e1 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Wed, 16 Oct 2024 21:14:56 -0700 Subject: [PATCH 1/3] v2 blob metadata --- core/data.go | 10 +- disperser/common/v2/blob.go | 51 ++ .../v2/blobstore/blob_metadata_store_v2.go | 551 ++++++++++++++++++ .../blobstore/blob_metadata_store_v2_test.go | 444 ++++++++++++++ 4 files changed, 1051 insertions(+), 5 deletions(-) create mode 100644 disperser/common/v2/blob.go create mode 100644 disperser/common/v2/blobstore/blob_metadata_store_v2.go create mode 100644 disperser/common/v2/blobstore/blob_metadata_store_v2_test.go diff --git a/core/data.go b/core/data.go index 21e24330c8..6504398951 100644 --- a/core/data.go +++ b/core/data.go @@ -483,14 +483,14 @@ func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { // PaymentMetadata represents the header information for a blob type PaymentMetadata struct { - // Existing fields - AccountID string + // AccountID is the ETH account address for the payer + AccountID string `json:"account_id"` - // New fields - BinIndex uint32 + // BinIndex represents the range of time at which the dispersal is made + BinIndex uint32 `json:"bin_index"` // TODO: we are thinking the contract can use uint128 for cumulative payment, // but the definition on v2 uses uint64. Double check with team. - CumulativePayment *big.Int + CumulativePayment *big.Int `json:"cumulative_payment"` } // Hash returns the Keccak256 hash of the PaymentMetadata diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go new file mode 100644 index 0000000000..8b87cb71f9 --- /dev/null +++ b/disperser/common/v2/blob.go @@ -0,0 +1,51 @@ +package v2 + +import ( + "encoding/hex" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/encoding" +) + +type BlobStatus uint + +const ( + Queued BlobStatus = iota + Encoded + Certified + Failed +) + +type BlobVersion uint32 + +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + b, err := hex.DecodeString(h) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + +type BlobHeader struct { + BlobVersion BlobVersion `json:"version"` + BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` + BlobCommitment encoding.BlobCommitments `json:"commitments"` + + core.PaymentMetadata `json:"payment_metadata"` +} + +type BlobMetadata struct { + BlobHeader `json:"blob_header"` + + BlobStatus BlobStatus `json:"blob_status"` + Expiry uint64 `json:"expiry"` + NumRetries uint `json:"num_retries"` + BlobSize uint64 `json:"blob_size"` + RequestedAt uint64 `json:"requested_at"` +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2.go b/disperser/common/v2/blobstore/blob_metadata_store_v2.go new file mode 100644 index 0000000000..6808ef90f3 --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_v2.go @@ -0,0 +1,551 @@ +package blobstore + +import ( + "context" + "fmt" + "strconv" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/disperser" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +const ( + StatusIndexName = "StatusIndex" + OperatorDispersalIndexName = "OperatorDispersalIndex" + OperatorResponseIndexName = "OperatorResponseIndex" +) + +// BlobMetadataStore is a blob metadata storage backed by DynamoDB +type BlobMetadataStore struct { + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + ttl time.Duration +} + +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { + logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) + return &BlobMetadataStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStoreV2"), + tableName: tableName, + ttl: ttl, + } +} + +func (s *BlobMetadataStore) CreateBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(blobMetadata) + if err != nil { + return err + } + + return s.dynamoDBClient.PutItem(ctx, s.tableName, item) +} + +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*v2.BlobMetadata, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKey.BlobHash, + }, + "SK": &types.AttributeValueMemberS{ + Value: blobKey.MetadataHash, + }, + }) + + if item == nil { + return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey) + } + + if err != nil { + return nil, err + } + + metadata, err := UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + + return metadata, nil +} + +// GetBulkBlobMetadata returns the metadata for the given blob keys +// Note: ordering of items is not guaranteed +func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*v2.BlobMetadata, error) { + keys := make([]map[string]types.AttributeValue, len(blobKeys)) + for i := 0; i < len(blobKeys); i += 1 { + keys[i] = map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{Value: blobKeys[i].BlobHash}, + "MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash}, + } + } + items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataByStatus returns all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. +func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }}) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. +func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status disperser.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }, + }) + if err != nil { + return 0, err + } + + return count, nil +} + +// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit +// along with items, also returns a pagination token that can be used to fetch the next set of items +// +// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. +// e.g 1mb limit for a single query +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*v2.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { + + var attributeMap map[string]types.AttributeValue + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertToAttribMap(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }, + }, limit, attributeMap) + + if err != nil { + return nil, nil, err + } + + // When no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + + metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) + for i, item := range queryResult.Items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, nil, err + } + } + + lastEvaluatedKey := queryResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + return metadata, nil, nil + } + + // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey + exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil +} + +func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash", commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + }) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("there is no metadata for batch %x", batchHeaderHash) + } + + metadatas := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadatas[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadatas, nil +} + +// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit +// along with items, also returns a pagination token that can be used to fetch the next set of items +// +// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. +// e.g 1mb limit for a single query +func (s *BlobMetadataStore) GetAllBlobMetadataByBatchWithPagination( + ctx context.Context, + batchHeaderHash [32]byte, + limit int32, + exclusiveStartKey *disperser.BatchIndexExclusiveStartKey, +) ([]*v2.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { + var attributeMap map[string]types.AttributeValue + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertToAttribMapBatchIndex(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination( + ctx, + s.tableName, + batchIndexName, + "BatchHeaderHash = :batch_header_hash", + commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + }, + limit, + attributeMap, + ) + if err != nil { + return nil, nil, err + } + + s.logger.Info("Query result", "items", len(queryResult.Items), "lastEvaluatedKey", queryResult.LastEvaluatedKey) + // When no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + + metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) + for i, item := range queryResult.Items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, nil, err + } + } + + lastEvaluatedKey := queryResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + return metadata, nil, nil + } + + // Convert the last evaluated key to a disperser.BatchIndexExclusiveStartKey + exclusiveStartKey, err = convertToExclusiveStartKeyBatchIndex(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil +} + +func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash AND BlobIndex = :blob_index", commondynamodb.ExpresseionValues{ + ":batch_header_hash": &types.AttributeValueMemberB{ + Value: batchHeaderHash[:], + }, + ":blob_index": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(blobIndex)), + }}) + if err != nil { + return nil, err + } + + if len(items) == 0 { + return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex) + } + + if len(items) > 1 { + s.logger.Error("there are multiple metadata for batch %s and blob index %d", hexutil.Encode(batchHeaderHash[:]), blobIndex) + } + + metadata, err := UnmarshalBlobMetadata(items[0]) + if err != nil { + return nil, err + } + return metadata, nil +} + +func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMetadata *v2.BlobMetadata) error { + _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: existingMetadata.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: existingMetadata.MetadataHash, + }, + }, commondynamodb.Item{ + "NumRetries": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(existingMetadata.NumRetries + 1)), + }, + }) + + return err +} + +func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *v2.BlobMetadata, confirmationBlockNumber uint32) error { + updated := *existingMetadata + if updated.ConfirmationInfo == nil { + return fmt.Errorf("failed to update confirmation block number because confirmation info is missing for blob key %s", existingMetadata.GetBlobKey().String()) + } + + updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber + item, err := MarshalBlobMetadata(&updated) + if err != nil { + return err + } + + _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: existingMetadata.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: existingMetadata.MetadataHash, + }, + }, item) + + return err +} + +func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(updated) + if err != nil { + return err + } + + _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: metadataKey.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: metadataKey.MetadataHash, + }, + }, item) + + return err +} + +func (s *BlobMetadataStore) SetBlobStatus(ctx context.Context, metadataKey disperser.BlobKey, status disperser.BlobStatus) error { + _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ + "BlobHash": &types.AttributeValueMemberS{ + Value: metadataKey.BlobHash, + }, + "MetadataHash": &types.AttributeValueMemberS{ + Value: metadataKey.MetadataHash, + }, + }, commondynamodb.Item{ + "BlobStatus": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + }) + + return err +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("PK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("PK"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(StatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RequestedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorDispersalIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("DispersedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorResponseIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RespondedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { + return attributevalue.MarshalMap(metadata) +} + +func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { + metadata := v2.BlobMetadata{} + err := attributevalue.UnmarshalMap(item, &metadata) + if err != nil { + return nil, err + } + + return &metadata, nil +} + +func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + + return &blobStoreExclusiveStartKey, nil +} + +func convertToExclusiveStartKeyBatchIndex(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BatchIndexExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BatchIndexExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + + return &blobStoreExclusiveStartKey, nil +} + +func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil + return nil, nil + } + + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + return avMap, nil +} + +func convertToAttribMapBatchIndex(blobStoreExclusiveStartKey *disperser.BatchIndexExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil + return nil, nil + } + + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + return avMap, nil +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go b/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go new file mode 100644 index 0000000000..c1b1c92b71 --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go @@ -0,0 +1,444 @@ +package blobstore_test + +import ( + "context" + "testing" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" +) + +func TestBlobMetadataStoreOperations(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + now := time.Now() + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + fetchBulk, err := blobMetadataStore.GetBulkBlobMetadata(ctx, []disperser.BlobKey{blobKey1, blobKey2}) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchBulk[0]) + assert.Equal(t, metadata2, fetchBulk[1]) + + processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.Equal(t, metadata1, processing[0]) + + processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Equal(t, int32(1), processingCount) + + err = blobMetadataStore.IncrementNumRetries(ctx, metadata1) + assert.NoError(t, err) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + metadata1.NumRetries = 1 + assert.Equal(t, metadata1, fetchedMetadata) + + finalized, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Finalized) + assert.NoError(t, err) + assert.Len(t, finalized, 1) + assert.Equal(t, metadata2, finalized[0]) + + finalizedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Finalized) + assert.NoError(t, err) + assert.Equal(t, int32(1), finalizedCount) + + confirmedMetadata := getConfirmedMetadata(t, metadata1, 1) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata) + assert.NoError(t, err) + + metadata, err := blobMetadataStore.GetBlobMetadataInBatch(ctx, confirmedMetadata.ConfirmationInfo.BatchHeaderHash, confirmedMetadata.ConfirmationInfo.BlobIndex) + assert.NoError(t, err) + assert.Equal(t, metadata, confirmedMetadata) + + confirmedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Confirmed) + assert.NoError(t, err) + assert.Equal(t, int32(1), confirmedCount) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + +func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + now := time.Now() + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Unix()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.Equal(t, metadata1, processing[0]) + assert.NotNil(t, lastEvaluatedKey) + + finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) + assert.NoError(t, err) + assert.Len(t, finalized, 1) + assert.Equal(t, metadata2, finalized[0]) + assert.NotNil(t, lastEvaluatedKey) + + finalized, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, finalized, 0) + assert.Nil(t, lastEvaluatedKey) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + +func TestGetAllBlobMetadataByBatchWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + expiry := uint64(time.Now().Add(time.Hour).Unix()) + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: expiry, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: expiry, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + confirmedMetadata1 := getConfirmedMetadata(t, metadata1, 1) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata1) + assert.NoError(t, err) + + confirmedMetadata2 := getConfirmedMetadata(t, metadata2, 2) + err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey2, confirmedMetadata2) + assert.NoError(t, err) + + // Fetch the blob metadata with limit 1 + metadata, exclusiveStartKey, err := blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, nil) + assert.NoError(t, err) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.NotNil(t, exclusiveStartKey) + assert.Equal(t, confirmedMetadata1.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Get the next blob metadata with limit 1 and the exclusive start key + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) + assert.NoError(t, err) + assert.Equal(t, metadata[0], confirmedMetadata2) + assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Fetching the next blob metadata should return an empty list + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) + assert.NoError(t, err) + assert.Len(t, metadata, 0) + assert.Nil(t, exclusiveStartKey) + + // Fetch the blob metadata with limit 2 + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 2, nil) + assert.NoError(t, err) + assert.Len(t, metadata, 2) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.Equal(t, metadata[1], confirmedMetadata2) + assert.NotNil(t, exclusiveStartKey) + assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) + + // Fetch the blob metadata with limit 3 should return only 2 items + metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 3, nil) + assert.NoError(t, err) + assert.Len(t, metadata, 2) + assert.Equal(t, metadata[0], confirmedMetadata1) + assert.Equal(t, metadata[1], confirmedMetadata2) + assert.Nil(t, exclusiveStartKey) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + +func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { + ctx := context.Background() + // Query BlobMetadataStore for a blob that does not exist + // This should return nil for both the blob and lastEvaluatedKey + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Nil(t, processing) + assert.Nil(t, lastEvaluatedKey) +} + +func TestShadowWriteBlobMetadata(t *testing.T) { + ctx := context.Background() + + blobKey := disperser.BlobKey{ + BlobHash: "shadowblob", + MetadataHash: "shadowhash", + } + expiry := uint64(time.Now().Add(time.Hour).Unix()) + metadata := &disperser.BlobMetadata{ + MetadataHash: blobKey.MetadataHash, + BlobHash: blobKey.BlobHash, + BlobStatus: disperser.Processing, + Expiry: expiry, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + + err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata) + assert.NoError(t, err) + err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing) + assert.NoError(t, err) + primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) + assert.NoError(t, err) + assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus) + + // Check that the shadow metadata exists but status has NOT been updated + shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{ + "MetadataHash": &types.AttributeValueMemberS{ + Value: blobKey.MetadataHash, + }, + "BlobHash": &types.AttributeValueMemberS{ + Value: blobKey.BlobHash, + }, + }) + assert.NoError(t, err) + shadowMetadata := disperser.BlobMetadata{} + err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata) + assert.NoError(t, err) + assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus) + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }, + }) +} + +func TestFilterOutExpiredBlobMetadata(t *testing.T) { + ctx := context.Background() + + blobKey := disperser.BlobKey{ + BlobHash: "blob1", + MetadataHash: "hash1", + } + now := time.Now() + metadata := &disperser.BlobMetadata{ + MetadataHash: blobKey.MetadataHash, + BlobHash: blobKey.BlobHash, + BlobStatus: disperser.Processing, + Expiry: uint64(now.Add(-1).Unix()), + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: uint64(now.Add(-1000).Unix()), + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata) + assert.NoError(t, err) + + processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Len(t, processing, 0) + + processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) + assert.NoError(t, err) + assert.Equal(t, int32(0), processingCount) + + processing, _, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 10, nil) + assert.NoError(t, err) + assert.Len(t, processing, 0) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, + }, + }) +} + +func deleteItems(t *testing.T, keys []commondynamodb.Key) { + _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) + assert.NoError(t, err) +} + +func getConfirmedMetadata(t *testing.T, metadata *disperser.BlobMetadata, blobIndex uint32) *disperser.BlobMetadata { + batchHeaderHash := [32]byte{1, 2, 3} + var commitX, commitY fp.Element + _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") + assert.NoError(t, err) + _, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186") + assert.NoError(t, err) + commitment := &encoding.G1Commitment{ + X: commitX, + Y: commitY, + } + dataLength := 32 + batchID := uint32(99) + batchRoot := []byte("hello") + referenceBlockNumber := uint32(132) + confirmationBlockNumber := uint32(150) + sigRecordHash := [32]byte{0} + fee := []byte{0} + inclusionProof := []byte{1, 2, 3, 4, 5} + confirmationInfo := &disperser.ConfirmationInfo{ + BatchHeaderHash: batchHeaderHash, + BlobIndex: blobIndex, + SignatoryRecordHash: sigRecordHash, + ReferenceBlockNumber: referenceBlockNumber, + BatchRoot: batchRoot, + BlobInclusionProof: inclusionProof, + BlobCommitment: &encoding.BlobCommitments{ + Commitment: commitment, + Length: uint(dataLength), + }, + BatchID: batchID, + ConfirmationTxnHash: common.HexToHash("0x123"), + ConfirmationBlockNumber: confirmationBlockNumber, + Fee: fee, + } + metadata.BlobStatus = disperser.Confirmed + metadata.ConfirmationInfo = confirmationInfo + return metadata +} From 839dffee6d71406b70aaadae1a1fd78050b16a3f Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Thu, 17 Oct 2024 23:04:07 -0700 Subject: [PATCH 2/3] v2 blob metadata store --- core/data.go | 25 + disperser/common/v2/blob.go | 47 +- .../v2/blobstore/blob_metadata_store.go | 15 + .../v2/blobstore/blob_metadata_store_test.go | 137 +++++ .../v2/blobstore/blob_metadata_store_v2.go | 551 ------------------ .../blobstore/blob_metadata_store_v2_test.go | 444 -------------- disperser/common/v2/blobstore/dynamo_store.go | 283 +++++++++ .../common/v2/blobstore/dynamo_store_test.go | 96 +++ 8 files changed, 566 insertions(+), 1032 deletions(-) create mode 100644 disperser/common/v2/blobstore/blob_metadata_store.go create mode 100644 disperser/common/v2/blobstore/blob_metadata_store_test.go delete mode 100644 disperser/common/v2/blobstore/blob_metadata_store_v2.go delete mode 100644 disperser/common/v2/blobstore/blob_metadata_store_v2_test.go create mode 100644 disperser/common/v2/blobstore/dynamo_store.go create mode 100644 disperser/common/v2/blobstore/dynamo_store_test.go diff --git a/core/data.go b/core/data.go index 6504398951..91fd9769c9 100644 --- a/core/data.go +++ b/core/data.go @@ -2,6 +2,7 @@ package core import ( "encoding/binary" + "encoding/hex" "errors" "fmt" "math/big" @@ -527,3 +528,27 @@ type ActiveReservation struct { type OnDemandPayment struct { CumulativePayment *big.Int // Total amount deposited by the user } + +type BlobVersion uint32 + +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + b, err := hex.DecodeString(h) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + +type BlobHeaderV2 struct { + BlobVersion BlobVersion `json:"version"` + QuorumIDs []QuorumID `json:"quorum_ids"` + BlobCommitment encoding.BlobCommitments `json:"commitments"` + + PaymentMetadata `json:"payment_metadata"` +} diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go index 8b87cb71f9..32ef9e8ad4 100644 --- a/disperser/common/v2/blob.go +++ b/disperser/common/v2/blob.go @@ -1,11 +1,6 @@ package v2 -import ( - "encoding/hex" - - "github.com/Layr-Labs/eigenda/core" - "github.com/Layr-Labs/eigenda/encoding" -) +import "github.com/Layr-Labs/eigenda/core" type BlobStatus uint @@ -16,36 +11,14 @@ const ( Failed ) -type BlobVersion uint32 - -type BlobKey [32]byte - -func (b BlobKey) Hex() string { - return hex.EncodeToString(b[:]) -} - -func HexToBlobKey(h string) (BlobKey, error) { - b, err := hex.DecodeString(h) - if err != nil { - return BlobKey{}, err - } - return BlobKey(b), nil -} - -type BlobHeader struct { - BlobVersion BlobVersion `json:"version"` - BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` - BlobCommitment encoding.BlobCommitments `json:"commitments"` - - core.PaymentMetadata `json:"payment_metadata"` -} - type BlobMetadata struct { - BlobHeader `json:"blob_header"` - - BlobStatus BlobStatus `json:"blob_status"` - Expiry uint64 `json:"expiry"` - NumRetries uint `json:"num_retries"` - BlobSize uint64 `json:"blob_size"` - RequestedAt uint64 `json:"requested_at"` + core.BlobHeaderV2 `json:"blob_header"` + + BlobKey core.BlobKey `json:"blob_key"` + BlobStatus BlobStatus `json:"blob_status"` + // Expiry is Unix timestamp of the blob expiry in seconds from epoch + Expiry uint64 `json:"expiry"` + NumRetries uint `json:"num_retries"` + BlobSize uint64 `json:"blob_size"` + RequestedAt uint64 `json:"requested_at"` } diff --git a/disperser/common/v2/blobstore/blob_metadata_store.go b/disperser/common/v2/blobstore/blob_metadata_store.go new file mode 100644 index 0000000000..8670d60bbb --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store.go @@ -0,0 +1,15 @@ +package blobstore + +import ( + "context" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" +) + +type BlobMetadataStore interface { + PutBlobMetadata(ctx context.Context, metadata *v2.BlobMetadata) error + GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) + GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) + GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_test.go b/disperser/common/v2/blobstore/blob_metadata_store_test.go new file mode 100644 index 0000000000..7113bfb887 --- /dev/null +++ b/disperser/common/v2/blobstore/blob_metadata_store_test.go @@ -0,0 +1,137 @@ +package blobstore_test + +import ( + "context" + "fmt" + "math/big" + "os" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/google/uuid" + + "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" + "github.com/Layr-Labs/eigenda/inabox/deploy" + "github.com/ory/dockertest/v3" +) + +var ( + logger = logging.NewNoopLogger() + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + + deployLocalStack bool + localStackPort = "4571" + + dynamoClient *dynamodb.Client + blobMetadataStore blobstore.BlobMetadataStore + + UUID = uuid.New() + metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) + + mockCommitment = encoding.BlobCommitments{} +) + +func TestMain(m *testing.M) { + setup(m) + code := m.Run() + teardown() + os.Exit(code) +} + +func setup(m *testing.M) { + + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + + } + + cfg := aws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + _, err := test_utils.CreateTable(context.Background(), cfg, metadataTableName, blobstore.GenerateTableSchema(metadataTableName, 10, 10)) + if err != nil { + teardown() + panic("failed to create dynamodb table: " + err.Error()) + } + + dynamoClient, err = dynamodb.NewClient(cfg, logger) + if err != nil { + teardown() + panic("failed to create dynamodb client: " + err.Error()) + } + + blobMetadataStore = blobstore.NewBlobMetadataStore(dynamoClient, logger, metadataTableName, time.Hour) + + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err = lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + if err != nil { + teardown() + panic("failed to create mock commitment: " + err.Error()) + } + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + mockCommitment = encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 10, + } +} + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2.go b/disperser/common/v2/blobstore/blob_metadata_store_v2.go deleted file mode 100644 index 6808ef90f3..0000000000 --- a/disperser/common/v2/blobstore/blob_metadata_store_v2.go +++ /dev/null @@ -1,551 +0,0 @@ -package blobstore - -import ( - "context" - "fmt" - "strconv" - "time" - - commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/disperser" - v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" - "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" - "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/ethereum/go-ethereum/common/hexutil" -) - -const ( - StatusIndexName = "StatusIndex" - OperatorDispersalIndexName = "OperatorDispersalIndex" - OperatorResponseIndexName = "OperatorResponseIndex" -) - -// BlobMetadataStore is a blob metadata storage backed by DynamoDB -type BlobMetadataStore struct { - dynamoDBClient *commondynamodb.Client - logger logging.Logger - tableName string - ttl time.Duration -} - -func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { - logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) - return &BlobMetadataStore{ - dynamoDBClient: dynamoDBClient, - logger: logger.With("component", "BlobMetadataStoreV2"), - tableName: tableName, - ttl: ttl, - } -} - -func (s *BlobMetadataStore) CreateBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { - item, err := MarshalBlobMetadata(blobMetadata) - if err != nil { - return err - } - - return s.dynamoDBClient.PutItem(ctx, s.tableName, item) -} - -func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobKey) (*v2.BlobMetadata, error) { - item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ - "PK": &types.AttributeValueMemberS{ - Value: blobKey.BlobHash, - }, - "SK": &types.AttributeValueMemberS{ - Value: blobKey.MetadataHash, - }, - }) - - if item == nil { - return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey) - } - - if err != nil { - return nil, err - } - - metadata, err := UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - - return metadata, nil -} - -// GetBulkBlobMetadata returns the metadata for the given blob keys -// Note: ordering of items is not guaranteed -func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*v2.BlobMetadata, error) { - keys := make([]map[string]types.AttributeValue, len(blobKeys)) - for i := 0; i < len(blobKeys); i += 1 { - keys[i] = map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{Value: blobKeys[i].BlobHash}, - "MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash}, - } - } - items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys) - if err != nil { - return nil, err - } - - metadata := make([]*v2.BlobMetadata, len(items)) - for i, item := range items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - } - - return metadata, nil -} - -// GetBlobMetadataByStatus returns all the metadata with the given status -// Because this function scans the entire index, it should only be used for status with a limited number of items. -// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. -func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status disperser.BlobStatus) ([]*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ - ":status": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }}) - if err != nil { - return nil, err - } - - metadata := make([]*v2.BlobMetadata, len(items)) - for i, item := range items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - } - - return metadata, nil -} - -// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status -// Because this function scans the entire index, it should only be used for status with a limited number of items. -// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented. -func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status disperser.BlobStatus) (int32, error) { - count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ - ":status": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }, - }) - if err != nil { - return 0, err - } - - return count, nil -} - -// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit -// along with items, also returns a pagination token that can be used to fetch the next set of items -// -// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. -// e.g 1mb limit for a single query -func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*v2.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { - - var attributeMap map[string]types.AttributeValue - var err error - - // Convert the exclusive start key to a map of AttributeValue - if exclusiveStartKey != nil { - attributeMap, err = convertToAttribMap(exclusiveStartKey) - if err != nil { - return nil, nil, err - } - } - - queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, expiryIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpresseionValues{ - ":status": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }, - }, limit, attributeMap) - - if err != nil { - return nil, nil, err - } - - // When no more results to fetch, the LastEvaluatedKey is nil - if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { - return nil, nil, nil - } - - metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) - for i, item := range queryResult.Items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, nil, err - } - } - - lastEvaluatedKey := queryResult.LastEvaluatedKey - if lastEvaluatedKey == nil { - return metadata, nil, nil - } - - // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey - exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey) - if err != nil { - return nil, nil, err - } - return metadata, exclusiveStartKey, nil -} - -func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash", commondynamodb.ExpresseionValues{ - ":batch_header_hash": &types.AttributeValueMemberB{ - Value: batchHeaderHash[:], - }, - }) - if err != nil { - return nil, err - } - - if len(items) == 0 { - return nil, fmt.Errorf("there is no metadata for batch %x", batchHeaderHash) - } - - metadatas := make([]*v2.BlobMetadata, len(items)) - for i, item := range items { - metadatas[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, err - } - } - - return metadatas, nil -} - -// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit -// along with items, also returns a pagination token that can be used to fetch the next set of items -// -// Note that this may not return all the metadata for the batch if dynamodb query limit is reached. -// e.g 1mb limit for a single query -func (s *BlobMetadataStore) GetAllBlobMetadataByBatchWithPagination( - ctx context.Context, - batchHeaderHash [32]byte, - limit int32, - exclusiveStartKey *disperser.BatchIndexExclusiveStartKey, -) ([]*v2.BlobMetadata, *disperser.BatchIndexExclusiveStartKey, error) { - var attributeMap map[string]types.AttributeValue - var err error - - // Convert the exclusive start key to a map of AttributeValue - if exclusiveStartKey != nil { - attributeMap, err = convertToAttribMapBatchIndex(exclusiveStartKey) - if err != nil { - return nil, nil, err - } - } - - queryResult, err := s.dynamoDBClient.QueryIndexWithPagination( - ctx, - s.tableName, - batchIndexName, - "BatchHeaderHash = :batch_header_hash", - commondynamodb.ExpresseionValues{ - ":batch_header_hash": &types.AttributeValueMemberB{ - Value: batchHeaderHash[:], - }, - }, - limit, - attributeMap, - ) - if err != nil { - return nil, nil, err - } - - s.logger.Info("Query result", "items", len(queryResult.Items), "lastEvaluatedKey", queryResult.LastEvaluatedKey) - // When no more results to fetch, the LastEvaluatedKey is nil - if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { - return nil, nil, nil - } - - metadata := make([]*v2.BlobMetadata, len(queryResult.Items)) - for i, item := range queryResult.Items { - metadata[i], err = UnmarshalBlobMetadata(item) - if err != nil { - return nil, nil, err - } - } - - lastEvaluatedKey := queryResult.LastEvaluatedKey - if lastEvaluatedKey == nil { - return metadata, nil, nil - } - - // Convert the last evaluated key to a disperser.BatchIndexExclusiveStartKey - exclusiveStartKey, err = convertToExclusiveStartKeyBatchIndex(lastEvaluatedKey) - if err != nil { - return nil, nil, err - } - return metadata, exclusiveStartKey, nil -} - -func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash AND BlobIndex = :blob_index", commondynamodb.ExpresseionValues{ - ":batch_header_hash": &types.AttributeValueMemberB{ - Value: batchHeaderHash[:], - }, - ":blob_index": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(blobIndex)), - }}) - if err != nil { - return nil, err - } - - if len(items) == 0 { - return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex) - } - - if len(items) > 1 { - s.logger.Error("there are multiple metadata for batch %s and blob index %d", hexutil.Encode(batchHeaderHash[:]), blobIndex) - } - - metadata, err := UnmarshalBlobMetadata(items[0]) - if err != nil { - return nil, err - } - return metadata, nil -} - -func (s *BlobMetadataStore) IncrementNumRetries(ctx context.Context, existingMetadata *v2.BlobMetadata) error { - _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: existingMetadata.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: existingMetadata.MetadataHash, - }, - }, commondynamodb.Item{ - "NumRetries": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(existingMetadata.NumRetries + 1)), - }, - }) - - return err -} - -func (s *BlobMetadataStore) UpdateConfirmationBlockNumber(ctx context.Context, existingMetadata *v2.BlobMetadata, confirmationBlockNumber uint32) error { - updated := *existingMetadata - if updated.ConfirmationInfo == nil { - return fmt.Errorf("failed to update confirmation block number because confirmation info is missing for blob key %s", existingMetadata.GetBlobKey().String()) - } - - updated.ConfirmationInfo.ConfirmationBlockNumber = confirmationBlockNumber - item, err := MarshalBlobMetadata(&updated) - if err != nil { - return err - } - - _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: existingMetadata.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: existingMetadata.MetadataHash, - }, - }, item) - - return err -} - -func (s *BlobMetadataStore) UpdateBlobMetadata(ctx context.Context, metadataKey disperser.BlobKey, updated *v2.BlobMetadata) error { - item, err := MarshalBlobMetadata(updated) - if err != nil { - return err - } - - _, err = s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: metadataKey.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: metadataKey.MetadataHash, - }, - }, item) - - return err -} - -func (s *BlobMetadataStore) SetBlobStatus(ctx context.Context, metadataKey disperser.BlobKey, status disperser.BlobStatus) error { - _, err := s.dynamoDBClient.UpdateItem(ctx, s.tableName, map[string]types.AttributeValue{ - "BlobHash": &types.AttributeValueMemberS{ - Value: metadataKey.BlobHash, - }, - "MetadataHash": &types.AttributeValueMemberS{ - Value: metadataKey.MetadataHash, - }, - }, commondynamodb.Item{ - "BlobStatus": &types.AttributeValueMemberN{ - Value: strconv.Itoa(int(status)), - }, - }) - - return err -} - -func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { - return &dynamodb.CreateTableInput{ - AttributeDefinitions: []types.AttributeDefinition{ - { - AttributeName: aws.String("PK"), - AttributeType: types.ScalarAttributeTypeS, - }, - { - AttributeName: aws.String("SK"), - AttributeType: types.ScalarAttributeTypeS, - }, - }, - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("PK"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("SK"), - KeyType: types.KeyTypeRange, - }, - }, - TableName: aws.String(tableName), - GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ - { - IndexName: aws.String(StatusIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("BlobStatus"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("RequestedAt"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, - { - IndexName: aws.String(OperatorDispersalIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("OperatorID"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("DispersedAt"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, - { - IndexName: aws.String(OperatorResponseIndexName), - KeySchema: []types.KeySchemaElement{ - { - AttributeName: aws.String("OperatorID"), - KeyType: types.KeyTypeHash, - }, - { - AttributeName: aws.String("RespondedAt"), - KeyType: types.KeyTypeRange, - }, - }, - Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - }, - }, - ProvisionedThroughput: &types.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(readCapacityUnits), - WriteCapacityUnits: aws.Int64(writeCapacityUnits), - }, - } -} - -func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { - return attributevalue.MarshalMap(metadata) -} - -func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { - metadata := v2.BlobMetadata{} - err := attributevalue.UnmarshalMap(item, &metadata) - if err != nil { - return nil, err - } - - return &metadata, nil -} - -func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { - blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} - err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - - return &blobStoreExclusiveStartKey, nil -} - -func convertToExclusiveStartKeyBatchIndex(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BatchIndexExclusiveStartKey, error) { - blobStoreExclusiveStartKey := disperser.BatchIndexExclusiveStartKey{} - err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - - return &blobStoreExclusiveStartKey, nil -} - -func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { - if blobStoreExclusiveStartKey == nil { - // Return an empty map or nil - return nil, nil - } - - avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - return avMap, nil -} - -func convertToAttribMapBatchIndex(blobStoreExclusiveStartKey *disperser.BatchIndexExclusiveStartKey) (map[string]types.AttributeValue, error) { - if blobStoreExclusiveStartKey == nil { - // Return an empty map or nil - return nil, nil - } - - avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) - if err != nil { - return nil, err - } - return avMap, nil -} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go b/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go deleted file mode 100644 index c1b1c92b71..0000000000 --- a/disperser/common/v2/blobstore/blob_metadata_store_v2_test.go +++ /dev/null @@ -1,444 +0,0 @@ -package blobstore_test - -import ( - "context" - "testing" - "time" - - commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/disperser" - "github.com/Layr-Labs/eigenda/encoding" - "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/consensys/gnark-crypto/ecc/bn254/fp" - "github.com/ethereum/go-ethereum/common" - "github.com/stretchr/testify/assert" -) - -func TestBlobMetadataStoreOperations(t *testing.T) { - ctx := context.Background() - blobKey1 := disperser.BlobKey{ - BlobHash: blobHash, - MetadataHash: "hash", - } - now := time.Now() - metadata1 := &disperser.BlobMetadata{ - MetadataHash: blobKey1.MetadataHash, - BlobHash: blobHash, - BlobStatus: disperser.Processing, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - } - blobKey2 := disperser.BlobKey{ - BlobHash: "blob2", - MetadataHash: "hash2", - } - metadata2 := &disperser.BlobMetadata{ - MetadataHash: blobKey2.MetadataHash, - BlobHash: blobKey2.BlobHash, - BlobStatus: disperser.Finalized, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) - assert.NoError(t, err) - err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) - assert.NoError(t, err) - - fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) - assert.NoError(t, err) - assert.Equal(t, metadata1, fetchedMetadata) - fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) - assert.NoError(t, err) - assert.Equal(t, metadata2, fetchedMetadata) - - fetchBulk, err := blobMetadataStore.GetBulkBlobMetadata(ctx, []disperser.BlobKey{blobKey1, blobKey2}) - assert.NoError(t, err) - assert.Equal(t, metadata1, fetchBulk[0]) - assert.Equal(t, metadata2, fetchBulk[1]) - - processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Len(t, processing, 1) - assert.Equal(t, metadata1, processing[0]) - - processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Equal(t, int32(1), processingCount) - - err = blobMetadataStore.IncrementNumRetries(ctx, metadata1) - assert.NoError(t, err) - fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1) - assert.NoError(t, err) - metadata1.NumRetries = 1 - assert.Equal(t, metadata1, fetchedMetadata) - - finalized, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Finalized) - assert.NoError(t, err) - assert.Len(t, finalized, 1) - assert.Equal(t, metadata2, finalized[0]) - - finalizedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Finalized) - assert.NoError(t, err) - assert.Equal(t, int32(1), finalizedCount) - - confirmedMetadata := getConfirmedMetadata(t, metadata1, 1) - err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata) - assert.NoError(t, err) - - metadata, err := blobMetadataStore.GetBlobMetadataInBatch(ctx, confirmedMetadata.ConfirmationInfo.BatchHeaderHash, confirmedMetadata.ConfirmationInfo.BlobIndex) - assert.NoError(t, err) - assert.Equal(t, metadata, confirmedMetadata) - - confirmedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Confirmed) - assert.NoError(t, err) - assert.Equal(t, int32(1), confirmedCount) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, - }, - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, - }, - }) -} - -func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { - ctx := context.Background() - blobKey1 := disperser.BlobKey{ - BlobHash: blobHash, - MetadataHash: "hash", - } - now := time.Now() - metadata1 := &disperser.BlobMetadata{ - MetadataHash: blobKey1.MetadataHash, - BlobHash: blobHash, - BlobStatus: disperser.Processing, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - } - blobKey2 := disperser.BlobKey{ - BlobHash: "blob2", - MetadataHash: "hash2", - } - metadata2 := &disperser.BlobMetadata{ - MetadataHash: blobKey2.MetadataHash, - BlobHash: blobKey2.BlobHash, - BlobStatus: disperser.Finalized, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Unix()), - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) - assert.NoError(t, err) - err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) - assert.NoError(t, err) - - fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) - assert.NoError(t, err) - assert.Equal(t, metadata1, fetchedMetadata) - fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) - assert.NoError(t, err) - assert.Equal(t, metadata2, fetchedMetadata) - - processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) - assert.NoError(t, err) - assert.Len(t, processing, 1) - assert.Equal(t, metadata1, processing[0]) - assert.NotNil(t, lastEvaluatedKey) - - finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) - assert.NoError(t, err) - assert.Len(t, finalized, 1) - assert.Equal(t, metadata2, finalized[0]) - assert.NotNil(t, lastEvaluatedKey) - - finalized, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, lastEvaluatedKey) - assert.NoError(t, err) - assert.Len(t, finalized, 0) - assert.Nil(t, lastEvaluatedKey) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, - }, - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, - }, - }) -} - -func TestGetAllBlobMetadataByBatchWithPagination(t *testing.T) { - ctx := context.Background() - blobKey1 := disperser.BlobKey{ - BlobHash: blobHash, - MetadataHash: "hash", - } - expiry := uint64(time.Now().Add(time.Hour).Unix()) - metadata1 := &disperser.BlobMetadata{ - MetadataHash: blobKey1.MetadataHash, - BlobHash: blobHash, - BlobStatus: disperser.Processing, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - } - blobKey2 := disperser.BlobKey{ - BlobHash: "blob2", - MetadataHash: "hash2", - } - metadata2 := &disperser.BlobMetadata{ - MetadataHash: blobKey2.MetadataHash, - BlobHash: blobKey2.BlobHash, - BlobStatus: disperser.Finalized, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) - assert.NoError(t, err) - err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) - assert.NoError(t, err) - - confirmedMetadata1 := getConfirmedMetadata(t, metadata1, 1) - err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata1) - assert.NoError(t, err) - - confirmedMetadata2 := getConfirmedMetadata(t, metadata2, 2) - err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey2, confirmedMetadata2) - assert.NoError(t, err) - - // Fetch the blob metadata with limit 1 - metadata, exclusiveStartKey, err := blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, nil) - assert.NoError(t, err) - assert.Equal(t, metadata[0], confirmedMetadata1) - assert.NotNil(t, exclusiveStartKey) - assert.Equal(t, confirmedMetadata1.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) - - // Get the next blob metadata with limit 1 and the exclusive start key - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) - assert.NoError(t, err) - assert.Equal(t, metadata[0], confirmedMetadata2) - assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) - - // Fetching the next blob metadata should return an empty list - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 1, exclusiveStartKey) - assert.NoError(t, err) - assert.Len(t, metadata, 0) - assert.Nil(t, exclusiveStartKey) - - // Fetch the blob metadata with limit 2 - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 2, nil) - assert.NoError(t, err) - assert.Len(t, metadata, 2) - assert.Equal(t, metadata[0], confirmedMetadata1) - assert.Equal(t, metadata[1], confirmedMetadata2) - assert.NotNil(t, exclusiveStartKey) - assert.Equal(t, confirmedMetadata2.ConfirmationInfo.BlobIndex, exclusiveStartKey.BlobIndex) - - // Fetch the blob metadata with limit 3 should return only 2 items - metadata, exclusiveStartKey, err = blobMetadataStore.GetAllBlobMetadataByBatchWithPagination(ctx, confirmedMetadata1.ConfirmationInfo.BatchHeaderHash, 3, nil) - assert.NoError(t, err) - assert.Len(t, metadata, 2) - assert.Equal(t, metadata[0], confirmedMetadata1) - assert.Equal(t, metadata[1], confirmedMetadata2) - assert.Nil(t, exclusiveStartKey) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, - }, - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, - }, - }) -} - -func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { - ctx := context.Background() - // Query BlobMetadataStore for a blob that does not exist - // This should return nil for both the blob and lastEvaluatedKey - processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) - assert.NoError(t, err) - assert.Nil(t, processing) - assert.Nil(t, lastEvaluatedKey) -} - -func TestShadowWriteBlobMetadata(t *testing.T) { - ctx := context.Background() - - blobKey := disperser.BlobKey{ - BlobHash: "shadowblob", - MetadataHash: "shadowhash", - } - expiry := uint64(time.Now().Add(time.Hour).Unix()) - metadata := &disperser.BlobMetadata{ - MetadataHash: blobKey.MetadataHash, - BlobHash: blobKey.BlobHash, - BlobStatus: disperser.Processing, - Expiry: expiry, - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: 123, - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - - err := shadowBlobMetadataStore.QueueNewBlobMetadata(ctx, metadata) - assert.NoError(t, err) - err = blobMetadataStore.SetBlobStatus(context.Background(), blobKey, disperser.Dispersing) - assert.NoError(t, err) - primaryMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey) - assert.NoError(t, err) - assert.Equal(t, disperser.Dispersing, primaryMetadata.BlobStatus) - - // Check that the shadow metadata exists but status has NOT been updated - shadowMetadataItem, err := dynamoClient.GetItem(ctx, shadowMetadataTableName, map[string]types.AttributeValue{ - "MetadataHash": &types.AttributeValueMemberS{ - Value: blobKey.MetadataHash, - }, - "BlobHash": &types.AttributeValueMemberS{ - Value: blobKey.BlobHash, - }, - }) - assert.NoError(t, err) - shadowMetadata := disperser.BlobMetadata{} - err = attributevalue.UnmarshalMap(shadowMetadataItem, &shadowMetadata) - assert.NoError(t, err) - assert.Equal(t, disperser.Processing, shadowMetadata.BlobStatus) - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, - }, - }) -} - -func TestFilterOutExpiredBlobMetadata(t *testing.T) { - ctx := context.Background() - - blobKey := disperser.BlobKey{ - BlobHash: "blob1", - MetadataHash: "hash1", - } - now := time.Now() - metadata := &disperser.BlobMetadata{ - MetadataHash: blobKey.MetadataHash, - BlobHash: blobKey.BlobHash, - BlobStatus: disperser.Processing, - Expiry: uint64(now.Add(-1).Unix()), - NumRetries: 0, - RequestMetadata: &disperser.RequestMetadata{ - BlobRequestHeader: blob.RequestHeader, - BlobSize: blobSize, - RequestedAt: uint64(now.Add(-1000).Unix()), - }, - ConfirmationInfo: &disperser.ConfirmationInfo{}, - } - - err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata) - assert.NoError(t, err) - - processing, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Len(t, processing, 0) - - processingCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, disperser.Processing) - assert.NoError(t, err) - assert.Equal(t, int32(0), processingCount) - - processing, _, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 10, nil) - assert.NoError(t, err) - assert.Len(t, processing, 0) - - deleteItems(t, []commondynamodb.Key{ - { - "MetadataHash": &types.AttributeValueMemberS{Value: blobKey.MetadataHash}, - "BlobHash": &types.AttributeValueMemberS{Value: blobKey.BlobHash}, - }, - }) -} - -func deleteItems(t *testing.T, keys []commondynamodb.Key) { - _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) - assert.NoError(t, err) -} - -func getConfirmedMetadata(t *testing.T, metadata *disperser.BlobMetadata, blobIndex uint32) *disperser.BlobMetadata { - batchHeaderHash := [32]byte{1, 2, 3} - var commitX, commitY fp.Element - _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") - assert.NoError(t, err) - _, err = commitY.SetString("9207254729396071334325696286939045899948985698134704137261649190717970615186") - assert.NoError(t, err) - commitment := &encoding.G1Commitment{ - X: commitX, - Y: commitY, - } - dataLength := 32 - batchID := uint32(99) - batchRoot := []byte("hello") - referenceBlockNumber := uint32(132) - confirmationBlockNumber := uint32(150) - sigRecordHash := [32]byte{0} - fee := []byte{0} - inclusionProof := []byte{1, 2, 3, 4, 5} - confirmationInfo := &disperser.ConfirmationInfo{ - BatchHeaderHash: batchHeaderHash, - BlobIndex: blobIndex, - SignatoryRecordHash: sigRecordHash, - ReferenceBlockNumber: referenceBlockNumber, - BatchRoot: batchRoot, - BlobInclusionProof: inclusionProof, - BlobCommitment: &encoding.BlobCommitments{ - Commitment: commitment, - Length: uint(dataLength), - }, - BatchID: batchID, - ConfirmationTxnHash: common.HexToHash("0x123"), - ConfirmationBlockNumber: confirmationBlockNumber, - Fee: fee, - } - metadata.BlobStatus = disperser.Confirmed - metadata.ConfirmationInfo = confirmationInfo - return metadata -} diff --git a/disperser/common/v2/blobstore/dynamo_store.go b/disperser/common/v2/blobstore/dynamo_store.go new file mode 100644 index 0000000000..d61616b7ed --- /dev/null +++ b/disperser/common/v2/blobstore/dynamo_store.go @@ -0,0 +1,283 @@ +package blobstore + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/disperser" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" +) + +const ( + StatusIndexName = "StatusIndex" + OperatorDispersalIndexName = "OperatorDispersalIndex" + OperatorResponseIndexName = "OperatorResponseIndex" + + blobKeyPrefix = "BlobKey#" + dispersalIDPrefix = "DispersalID#" + blobMetadataSK = "BlobMetadata" + certificateSK = "Certificate" +) + +// blobMetadataStore is a blob metadata storage backed by DynamoDB +type blobMetadataStore struct { + dynamoDBClient *commondynamodb.Client + logger logging.Logger + tableName string + ttl time.Duration +} + +var _ BlobMetadataStore = (*blobMetadataStore)(nil) + +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) BlobMetadataStore { + logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) + return &blobMetadataStore{ + dynamoDBClient: dynamoDBClient, + logger: logger.With("component", "BlobMetadataStoreV2"), + tableName: tableName, + ttl: ttl, + } +} + +func (s *blobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { + item, err := MarshalBlobMetadata(blobMetadata) + if err != nil { + return err + } + + return s.dynamoDBClient.PutItem(ctx, s.tableName, item) +} + +func (s *blobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { + item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ + "PK": &types.AttributeValueMemberS{ + Value: blobKeyPrefix + blobKey.Hex(), + }, + "SK": &types.AttributeValueMemberS{ + Value: blobMetadataSK, + }, + }) + + if item == nil { + return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey.Hex()) + } + + if err != nil { + return nil, err + } + + metadata, err := UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + + return metadata, nil +} + +// GetBlobMetadataByStatus returns all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +func (s *blobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }}) + if err != nil { + return nil, err + } + + metadata := make([]*v2.BlobMetadata, len(items)) + for i, item := range items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + +// GetBlobMetadataCountByStatus returns the count of all the metadata with the given status +// Because this function scans the entire index, it should only be used for status with a limited number of items. +func (s *blobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }, + ":expiry": &types.AttributeValueMemberN{ + Value: strconv.FormatInt(time.Now().Unix(), 10), + }, + }) + if err != nil { + return 0, err + } + + return count, nil +} + +func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { + return &dynamodb.CreateTableInput{ + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("PK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("SK"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BlobStatus"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("Expiry"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("OperatorID"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("DispersedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + { + AttributeName: aws.String("RespondedAt"), + AttributeType: types.ScalarAttributeTypeN, + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("PK"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("SK"), + KeyType: types.KeyTypeRange, + }, + }, + TableName: aws.String(tableName), + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String(StatusIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("Expiry"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorDispersalIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("DispersedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + { + IndexName: aws.String(OperatorResponseIndexName), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("OperatorID"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("RespondedAt"), + KeyType: types.KeyTypeRange, + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(readCapacityUnits), + WriteCapacityUnits: aws.Int64(writeCapacityUnits), + }, + } +} + +func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) { + fields, err := attributevalue.MarshalMap(metadata) + if err != nil { + return nil, fmt.Errorf("failed to marshal blob metadata: %w", err) + } + + // Add PK and SK fields + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + metadata.BlobKey.Hex()} + fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK} + + return fields, nil +} + +func UnmarshalBlobKey(item commondynamodb.Item) (core.BlobKey, error) { + type Blob struct { + PK string + } + + blob := Blob{} + err := attributevalue.UnmarshalMap(item, &blob) + if err != nil { + return core.BlobKey{}, err + } + + bk := strings.TrimPrefix(blob.PK, blobKeyPrefix) + return core.HexToBlobKey(bk) +} + +func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { + metadata := v2.BlobMetadata{} + err := attributevalue.UnmarshalMap(item, &metadata) + if err != nil { + return nil, err + } + blobKey, err := UnmarshalBlobKey(item) + if err != nil { + return nil, err + } + metadata.BlobKey = blobKey + + return &metadata, nil +} diff --git a/disperser/common/v2/blobstore/dynamo_store_test.go b/disperser/common/v2/blobstore/dynamo_store_test.go new file mode 100644 index 0000000000..3be5ca20fc --- /dev/null +++ b/disperser/common/v2/blobstore/dynamo_store_test.go @@ -0,0 +1,96 @@ +package blobstore_test + +import ( + "context" + "testing" + "time" + + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/assert" +) + +func TestBlobMetadataStoreOperations(t *testing.T) { + ctx := context.Background() + blobHeader1 := &core.BlobHeaderV2{ + BlobVersion: 0, + QuorumIDs: []core.QuorumID{0}, + BlobCommitment: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 0, + CumulativePayment: 531, + }, + } + blobKey1 := core.BlobKey([32]byte{1, 2, 3}) + blobHeader2 := &core.BlobHeaderV2{ + BlobVersion: 0, + QuorumIDs: []core.QuorumID{1}, + BlobCommitment: mockCommitment, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x456", + BinIndex: 2, + CumulativePayment: 999, + }, + } + blobKey2 := core.BlobKey([32]byte{4, 5, 6}) + + now := time.Now() + metadata1 := &v2.BlobMetadata{ + BlobHeaderV2: *blobHeader1, + BlobKey: blobKey1, + BlobStatus: v2.Queued, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + } + metadata2 := &v2.BlobMetadata{ + BlobHeaderV2: *blobHeader2, + BlobKey: blobKey2, + BlobStatus: v2.Certified, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + } + err := blobMetadataStore.PutBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.PutBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued) + assert.NoError(t, err) + assert.Len(t, queued, 1) + assert.Equal(t, metadata1, queued[0]) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified) + assert.NoError(t, err) + assert.Len(t, certified, 1) + assert.Equal(t, metadata2, certified[0]) + + queuedCount, err := blobMetadataStore.GetBlobMetadataCountByStatus(ctx, v2.Queued) + assert.NoError(t, err) + assert.Equal(t, int32(1), queuedCount) + + deleteItems(t, []commondynamodb.Key{ + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey1.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"}, + }, + { + "PK": &types.AttributeValueMemberS{Value: "BlobKey#" + blobKey2.Hex()}, + "SK": &types.AttributeValueMemberS{Value: "BlobMetadata"}, + }, + }) +} + +func deleteItems(t *testing.T, keys []commondynamodb.Key) { + failed, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) + assert.NoError(t, err) + assert.Len(t, failed, 0) +} From 3b4a2ebf853b0badb45f600d5be7486fcaca4ae3 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Wed, 23 Oct 2024 17:12:34 -0700 Subject: [PATCH 3/3] incorporate core/v2 --- core/data.go | 102 +++++---- core/v2/assignment.go | 6 +- core/v2/assignment_test.go | 10 +- core/v2/core_test.go | 8 +- core/v2/types.go | 194 ++++++++++++++++-- core/v2/types_test.go | 57 +++++ core/v2/validator.go | 4 +- disperser/common/v2/blob.go | 24 ++- .../v2/blobstore/blob_metadata_store.go | 15 -- .../v2/blobstore/blob_metadata_store_test.go | 5 +- disperser/common/v2/blobstore/dynamo_store.go | 55 +++-- .../common/v2/blobstore/dynamo_store_test.go | 56 ++--- 12 files changed, 386 insertions(+), 150 deletions(-) create mode 100644 core/v2/types_test.go delete mode 100644 disperser/common/v2/blobstore/blob_metadata_store.go diff --git a/core/data.go b/core/data.go index 91fd9769c9..7a064f5818 100644 --- a/core/data.go +++ b/core/data.go @@ -2,15 +2,17 @@ package core import ( "encoding/binary" - "encoding/hex" "errors" "fmt" "math/big" + "strconv" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/consensys/gnark-crypto/ecc/bn254" - "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/accounts/abi" + "golang.org/x/crypto/sha3" ) type AccountID = string @@ -495,23 +497,69 @@ type PaymentMetadata struct { } // Hash returns the Keccak256 hash of the PaymentMetadata -func (pm *PaymentMetadata) Hash() []byte { - // Create a byte slice to hold the serialized data - data := make([]byte, 0, len(pm.AccountID)+4+pm.CumulativePayment.BitLen()/8+1) +func (pm *PaymentMetadata) Hash() ([32]byte, error) { + blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "accountID", + Type: "string", + }, + { + Name: "binIndex", + Type: "uint32", + }, + { + Name: "cumulativePayment", + Type: "uint256", + }, + }) + if err != nil { + return [32]byte{}, err + } - // Append AccountID - data = append(data, []byte(pm.AccountID)...) + arguments := abi.Arguments{ + { + Type: blobHeaderType, + }, + } - // Append BinIndex - binIndexBytes := make([]byte, 4) - binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex) - data = append(data, binIndexBytes...) + bytes, err := arguments.Pack(pm) + if err != nil { + return [32]byte{}, err + } - // Append CumulativePayment - paymentBytes := pm.CumulativePayment.Bytes() - data = append(data, paymentBytes...) + var hash [32]byte + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(hash[:], hasher.Sum(nil)[:32]) - return crypto.Keccak256(data) + return hash, nil +} + +func (pm *PaymentMetadata) MarshalDynamoDBAttributeValue() (types.AttributeValue, error) { + return &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "AccountID": &types.AttributeValueMemberS{Value: pm.AccountID}, + "BinIndex": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", pm.BinIndex)}, + "CumulativePayment": &types.AttributeValueMemberN{ + Value: pm.CumulativePayment.String(), + }, + }, + }, nil +} + +func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeValue) error { + m, ok := av.(*types.AttributeValueMemberM) + if !ok { + return fmt.Errorf("expected *types.AttributeValueMemberM, got %T", av) + } + pm.AccountID = m.Value["AccountID"].(*types.AttributeValueMemberS).Value + binIndex, err := strconv.ParseUint(m.Value["BinIndex"].(*types.AttributeValueMemberN).Value, 10, 32) + if err != nil { + return fmt.Errorf("failed to parse BinIndex: %w", err) + } + pm.BinIndex = uint32(binIndex) + pm.CumulativePayment, _ = new(big.Int).SetString(m.Value["CumulativePayment"].(*types.AttributeValueMemberN).Value, 10) + return nil } // OperatorInfo contains information about an operator which is stored on the blockchain state, @@ -528,27 +576,3 @@ type ActiveReservation struct { type OnDemandPayment struct { CumulativePayment *big.Int // Total amount deposited by the user } - -type BlobVersion uint32 - -type BlobKey [32]byte - -func (b BlobKey) Hex() string { - return hex.EncodeToString(b[:]) -} - -func HexToBlobKey(h string) (BlobKey, error) { - b, err := hex.DecodeString(h) - if err != nil { - return BlobKey{}, err - } - return BlobKey(b), nil -} - -type BlobHeaderV2 struct { - BlobVersion BlobVersion `json:"version"` - QuorumIDs []QuorumID `json:"quorum_ids"` - BlobCommitment encoding.BlobCommitments `json:"commitments"` - - PaymentMetadata `json:"payment_metadata"` -} diff --git a/core/v2/assignment.go b/core/v2/assignment.go index 1e50cdbfb9..53e6b96fb9 100644 --- a/core/v2/assignment.go +++ b/core/v2/assignment.go @@ -8,7 +8,7 @@ import ( "github.com/Layr-Labs/eigenda/core" ) -func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) (map[core.OperatorID]Assignment, error) { +func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum uint8) (map[core.OperatorID]Assignment, error) { params, ok := ParametersMap[blobVersion] if !ok { @@ -81,7 +81,7 @@ func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) ( } -func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { +func GetAssignment(state *core.OperatorState, blobVersion BlobVersion, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { assignments, err := GetAssignments(state, blobVersion, quorum) if err != nil { @@ -96,7 +96,7 @@ func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.Quor return assignment, nil } -func GetChunkLength(blobVersion byte, blobLength uint32) (uint32, error) { +func GetChunkLength(blobVersion BlobVersion, blobLength uint32) (uint32, error) { if blobLength == 0 { return 0, fmt.Errorf("blob length must be greater than 0") diff --git a/core/v2/assignment_test.go b/core/v2/assignment_test.go index d1245f6c5d..d8b5d2da81 100644 --- a/core/v2/assignment_test.go +++ b/core/v2/assignment_test.go @@ -20,7 +20,7 @@ func TestOperatorAssignmentsV2(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) operatorState := state.OperatorState - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) assignments, err := corev2.GetAssignments(operatorState, blobVersion, 0) assert.NoError(t, err) @@ -90,7 +90,7 @@ func TestAssignmentWithTooManyOperators(t *testing.T) { assert.Equal(t, len(state.Operators[0]), numOperators) - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) _, err = corev2.GetAssignments(state.OperatorState, blobVersion, 0) assert.Error(t, err) @@ -131,7 +131,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { state := dat.GetTotalOperatorState(context.Background(), 0) - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) assignments, err := corev2.GetAssignments(state.OperatorState, blobVersion, 0) assert.NoError(t, err) @@ -162,7 +162,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { func TestChunkLength(t *testing.T) { - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) pairs := []struct { blobLength uint32 @@ -188,7 +188,7 @@ func TestChunkLength(t *testing.T) { func TestInvalidChunkLength(t *testing.T) { - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) invalidLengths := []uint32{ 0, diff --git a/core/v2/core_test.go b/core/v2/core_test.go index f55303e066..c3d47fe41e 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -84,7 +84,7 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) { return p, v, nil } -func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) { +func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) { data := make([]byte, length*31) _, err := rand.Read(data) @@ -101,7 +101,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber header := corev2.BlobCertificate{ BlobHeader: corev2.BlobHeader{ - Version: version, + BlobVersion: version, QuorumNumbers: quorums, BlobCommitments: commitments, }, @@ -148,7 +148,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi for _, quorum := range header.QuorumNumbers { - assignments, err := corev2.GetAssignments(state, header.Version, quorum) + assignments, err := corev2.GetAssignments(state, header.BlobVersion, quorum) if err != nil { t.Fatal(err) } @@ -238,7 +238,7 @@ func TestValidationSucceeds(t *testing.T) { bn := uint64(0) - version := uint8(0) + version := corev2.BlobVersion(0) pool := workerpool.New(1) diff --git a/core/v2/types.go b/core/v2/types.go index f3c9eda15e..ac3b234ef4 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -1,20 +1,27 @@ package v2 import ( + "encoding/hex" "math" + "math/big" + "strings" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" + "github.com/ethereum/go-ethereum/accounts/abi" + "golang.org/x/crypto/sha3" ) var ( // TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not // conflict with the existing on-chain limits - ParametersMap = map[uint8]BlobVersionParameters{ + ParametersMap = map[BlobVersion]BlobVersionParameters{ 0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192}, } ) +type BlobVersion uint8 + // Assignment contains information about the set of chunks that a specific node will receive type Assignment struct { StartIndex uint32 @@ -30,27 +37,42 @@ func (c *Assignment) GetIndices() []uint32 { return indices } +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + s := strings.TrimPrefix(h, "0x") + s = strings.TrimPrefix(s, "0X") + b, err := hex.DecodeString(s) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + // BlobHeader contains all metadata related to a blob including commitments and parameters for encoding type BlobHeader struct { - Version uint8 + BlobVersion BlobVersion - encoding.BlobCommitments + BlobCommitments encoding.BlobCommitments - // QuorumInfos contains the quorum specific parameters for the blob - QuorumNumbers []uint8 + // QuorumNumbers contains the quorums the blob is dispersed to + QuorumNumbers []core.QuorumID - // PaymentHeader contains the payment information for the blob - core.PaymentMetadata + // PaymentMetadata contains the payment information for the blob + PaymentMetadata core.PaymentMetadata - // AuthenticationData is the signature of the blob header by the account ID - AuthenticationData []byte `json:"authentication_data"` + // Signature is the signature of the blob header by the account ID + Signature []byte } func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { + params := ParametersMap[b.BlobVersion] - params := ParametersMap[b.Version] - - length, err := GetChunkLength(b.Version, uint32(b.Length)) + length, err := GetChunkLength(b.BlobVersion, uint32(b.BlobCommitments.Length)) if err != nil { return encoding.EncodingParams{}, err } @@ -59,7 +81,153 @@ func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { NumChunks: uint64(params.NumChunks), ChunkLength: uint64(length), }, nil +} + +func (b *BlobHeader) BlobKey() (BlobKey, error) { + blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "blobVersion", + Type: "uint8", + }, + { + Name: "blobCommitments", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "commitment", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256", + }, + { + Name: "Y", + Type: "uint256", + }, + }, + }, + { + Name: "lengthCommitment", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256[2]", + }, + { + Name: "Y", + Type: "uint256[2]", + }, + }, + }, + { + Name: "lengthProof", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256[2]", + }, + { + Name: "Y", + Type: "uint256[2]", + }, + }, + }, + { + Name: "length", + Type: "uint32", + }, + }, + }, + { + Name: "quorumNumbers", + Type: "bytes", + }, + { + Name: "paymentMetadataHash", + Type: "bytes32", + }, + }) + if err != nil { + return [32]byte{}, err + } + + arguments := abi.Arguments{ + { + Type: blobHeaderType, + }, + } + type g1Commit struct { + X *big.Int + Y *big.Int + } + type g2Commit struct { + X [2]*big.Int + Y [2]*big.Int + } + type blobCommitments struct { + Commitment g1Commit + LengthCommitment g2Commit + LengthProof g2Commit + Length uint32 + } + + paymentHash, err := b.PaymentMetadata.Hash() + if err != nil { + return [32]byte{}, err + } + s := struct { + BlobVersion uint8 + BlobCommitments blobCommitments + QuorumNumbers []byte + PaymentMetadataHash [32]byte + }{ + BlobVersion: uint8(b.BlobVersion), + BlobCommitments: blobCommitments{ + Commitment: g1Commit{ + X: b.BlobCommitments.Commitment.X.BigInt(new(big.Int)), + Y: b.BlobCommitments.Commitment.Y.BigInt(new(big.Int)), + }, + LengthCommitment: g2Commit{ + X: [2]*big.Int{ + b.BlobCommitments.LengthCommitment.X.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthCommitment.X.A1.BigInt(new(big.Int)), + }, + Y: [2]*big.Int{ + b.BlobCommitments.LengthCommitment.Y.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthCommitment.Y.A1.BigInt(new(big.Int)), + }, + }, + LengthProof: g2Commit{ + X: [2]*big.Int{ + b.BlobCommitments.LengthProof.X.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthProof.X.A1.BigInt(new(big.Int)), + }, + Y: [2]*big.Int{ + b.BlobCommitments.LengthProof.Y.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthProof.Y.A1.BigInt(new(big.Int)), + }, + }, + Length: uint32(b.BlobCommitments.Length), + }, + QuorumNumbers: b.QuorumNumbers, + PaymentMetadataHash: paymentHash, + } + + bytes, err := arguments.Pack(s) + if err != nil { + return [32]byte{}, err + } + + var headerHash [32]byte + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(headerHash[:], hasher.Sum(nil)[:32]) + + return headerHash, nil } type BlobCertificate struct { @@ -79,9 +247,7 @@ type BlobVersionParameters struct { } func (p BlobVersionParameters) MaxNumOperators() uint32 { - return uint32(math.Floor(float64(p.NumChunks) * (1 - 1/(p.ReconstructionThreshold*float64(p.CodingRate))))) - } const ( diff --git a/core/v2/types_test.go b/core/v2/types_test.go new file mode 100644 index 0000000000..c2c9b908c8 --- /dev/null +++ b/core/v2/types_test.go @@ -0,0 +1,57 @@ +package v2_test + +import ( + "encoding/hex" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/stretchr/testify/assert" +) + +func TestBlobKey(t *testing.T) { + blobKey := v2.BlobKey([32]byte{1, 2, 3}) + + assert.Equal(t, "0102030000000000000000000000000000000000000000000000000000000000", blobKey.Hex()) + bk, err := v2.HexToBlobKey(blobKey.Hex()) + assert.NoError(t, err) + assert.Equal(t, blobKey, bk) +} + +func TestPaymentHash(t *testing.T) { + pm := core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + } + hash, err := pm.Hash() + assert.NoError(t, err) + // 0xf5894a8e9281b5687c0c7757d3d45fb76152bf659e6e61b1062f4c6bcb69c449 verified in solidity + assert.Equal(t, "f5894a8e9281b5687c0c7757d3d45fb76152bf659e6e61b1062f4c6bcb69c449", hex.EncodeToString(hash[:])) +} + +func TestBlobKeyFromHeader(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + bh := v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + blobKey, err := bh.BlobKey() + assert.NoError(t, err) + // 0xb19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc verified in solidity + assert.Equal(t, "b19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc", blobKey.Hex()) +} diff --git a/core/v2/validator.go b/core/v2/validator.go index 0d5a2960ee..db513c3191 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -43,7 +43,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Get the assignments for the quorum - assignment, err := GetAssignment(operatorState, blob.Version, quorum, v.operatorID) + assignment, err := GetAssignment(operatorState, blob.BlobVersion, quorum, v.operatorID) if err != nil { return nil, nil, err } @@ -57,7 +57,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Validate the chunkLength against the confirmation and adversary threshold parameters - chunkLength, err := GetChunkLength(blob.Version, uint32(blob.BlobHeader.Length)) + chunkLength, err := GetChunkLength(blob.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) if err != nil { return nil, nil, fmt.Errorf("invalid chunk length: %w", err) } diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go index 32ef9e8ad4..aca68db9b7 100644 --- a/disperser/common/v2/blob.go +++ b/disperser/common/v2/blob.go @@ -1,6 +1,8 @@ package v2 -import "github.com/Layr-Labs/eigenda/core" +import ( + core "github.com/Layr-Labs/eigenda/core/v2" +) type BlobStatus uint @@ -11,14 +13,20 @@ const ( Failed ) +// BlobMetadata is an internal representation of a blob's metadata. type BlobMetadata struct { - core.BlobHeaderV2 `json:"blob_header"` + BlobHeader *core.BlobHeader - BlobKey core.BlobKey `json:"blob_key"` - BlobStatus BlobStatus `json:"blob_status"` + // BlobStatus indicates the current status of the blob + BlobStatus BlobStatus // Expiry is Unix timestamp of the blob expiry in seconds from epoch - Expiry uint64 `json:"expiry"` - NumRetries uint `json:"num_retries"` - BlobSize uint64 `json:"blob_size"` - RequestedAt uint64 `json:"requested_at"` + Expiry uint64 + // NumRetries is the number of times the blob has been retried + NumRetries uint + // BlobSize is the size of the blob in bytes + BlobSize uint64 + // RequestedAt is the Unix timestamp of when the blob was requested in seconds + RequestedAt uint64 + // UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_ + UpdatedAt uint64 } diff --git a/disperser/common/v2/blobstore/blob_metadata_store.go b/disperser/common/v2/blobstore/blob_metadata_store.go deleted file mode 100644 index 8670d60bbb..0000000000 --- a/disperser/common/v2/blobstore/blob_metadata_store.go +++ /dev/null @@ -1,15 +0,0 @@ -package blobstore - -import ( - "context" - - "github.com/Layr-Labs/eigenda/core" - v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" -) - -type BlobMetadataStore interface { - PutBlobMetadata(ctx context.Context, metadata *v2.BlobMetadata) error - GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) - GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) - GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) -} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_test.go b/disperser/common/v2/blobstore/blob_metadata_store_test.go index 7113bfb887..a0b63bdcf4 100644 --- a/disperser/common/v2/blobstore/blob_metadata_store_test.go +++ b/disperser/common/v2/blobstore/blob_metadata_store_test.go @@ -12,13 +12,13 @@ import ( "github.com/Layr-Labs/eigenda/common/aws/dynamodb" test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/consensys/gnark-crypto/ecc/bn254" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/google/uuid" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" - "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/ory/dockertest/v3" ) @@ -32,7 +32,7 @@ var ( localStackPort = "4571" dynamoClient *dynamodb.Client - blobMetadataStore blobstore.BlobMetadataStore + blobMetadataStore *blobstore.BlobMetadataStore UUID = uuid.New() metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) @@ -61,7 +61,6 @@ func setup(m *testing.M) { teardown() panic("failed to start localstack container") } - } cfg := aws.ClientConfig{ diff --git a/disperser/common/v2/blobstore/dynamo_store.go b/disperser/common/v2/blobstore/dynamo_store.go index d61616b7ed..ca4d804740 100644 --- a/disperser/common/v2/blobstore/dynamo_store.go +++ b/disperser/common/v2/blobstore/dynamo_store.go @@ -8,7 +8,7 @@ import ( "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/core" + core "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigensdk-go/logging" @@ -23,33 +23,29 @@ const ( OperatorDispersalIndexName = "OperatorDispersalIndex" OperatorResponseIndexName = "OperatorResponseIndex" - blobKeyPrefix = "BlobKey#" - dispersalIDPrefix = "DispersalID#" - blobMetadataSK = "BlobMetadata" - certificateSK = "Certificate" + blobKeyPrefix = "BlobKey#" + blobMetadataSK = "BlobMetadata" ) -// blobMetadataStore is a blob metadata storage backed by DynamoDB -type blobMetadataStore struct { +// BlobMetadataStore is a blob metadata storage backed by DynamoDB +type BlobMetadataStore struct { dynamoDBClient *commondynamodb.Client logger logging.Logger tableName string ttl time.Duration } -var _ BlobMetadataStore = (*blobMetadataStore)(nil) - -func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) BlobMetadataStore { +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) - return &blobMetadataStore{ + return &BlobMetadataStore{ dynamoDBClient: dynamoDBClient, - logger: logger.With("component", "BlobMetadataStoreV2"), + logger: logger.With("component", "blobMetadataStoreV2"), tableName: tableName, ttl: ttl, } } -func (s *blobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { +func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { item, err := MarshalBlobMetadata(blobMetadata) if err != nil { return err @@ -58,7 +54,7 @@ func (s *blobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v return s.dynamoDBClient.PutItem(ctx, s.tableName, item) } -func (s *blobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ "PK": &types.AttributeValueMemberS{ Value: blobKeyPrefix + blobKey.Hex(), @@ -86,12 +82,12 @@ func (s *blobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.Bl // GetBlobMetadataByStatus returns all the metadata with the given status // Because this function scans the entire index, it should only be used for status with a limited number of items. -func (s *blobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ +func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), }, - ":expiry": &types.AttributeValueMemberN{ + ":updatedAt": &types.AttributeValueMemberN{ Value: strconv.FormatInt(time.Now().Unix(), 10), }}) if err != nil { @@ -111,14 +107,11 @@ func (s *blobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status // GetBlobMetadataCountByStatus returns the count of all the metadata with the given status // Because this function scans the entire index, it should only be used for status with a limited number of items. -func (s *blobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { - count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ +func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status", commondynamodb.ExpressionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }, }) if err != nil { return 0, err @@ -130,10 +123,12 @@ func (s *blobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ + // PK is the composite partition key { AttributeName: aws.String("PK"), AttributeType: types.ScalarAttributeTypeS, }, + // SK is the composite sort key { AttributeName: aws.String("SK"), AttributeType: types.ScalarAttributeTypeS, @@ -143,7 +138,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit AttributeType: types.ScalarAttributeTypeN, }, { - AttributeName: aws.String("Expiry"), + AttributeName: aws.String("UpdatedAt"), AttributeType: types.ScalarAttributeTypeN, }, { @@ -179,7 +174,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit KeyType: types.KeyTypeHash, }, { - AttributeName: aws.String("Expiry"), + AttributeName: aws.String("UpdatedAt"), KeyType: types.KeyTypeRange, }, }, @@ -246,7 +241,11 @@ func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) } // Add PK and SK fields - fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + metadata.BlobKey.Hex()} + blobKey, err := metadata.BlobHeader.BlobKey() + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()} fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK} return fields, nil @@ -273,11 +272,5 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { if err != nil { return nil, err } - blobKey, err := UnmarshalBlobKey(item) - if err != nil { - return nil, err - } - metadata.BlobKey = blobKey - return &metadata, nil } diff --git a/disperser/common/v2/blobstore/dynamo_store_test.go b/disperser/common/v2/blobstore/dynamo_store_test.go index 3be5ca20fc..b87a2b8090 100644 --- a/disperser/common/v2/blobstore/dynamo_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_store_test.go @@ -2,11 +2,13 @@ package blobstore_test import ( "context" + "math/big" "testing" "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/core" + core "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/stretchr/testify/assert" @@ -14,45 +16,47 @@ import ( func TestBlobMetadataStoreOperations(t *testing.T) { ctx := context.Background() - blobHeader1 := &core.BlobHeaderV2{ - BlobVersion: 0, - QuorumIDs: []core.QuorumID{0}, - BlobCommitment: mockCommitment, + blobHeader1 := &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, PaymentMetadata: core.PaymentMetadata{ AccountID: "0x123", BinIndex: 0, - CumulativePayment: 531, + CumulativePayment: big.NewInt(532), }, } - blobKey1 := core.BlobKey([32]byte{1, 2, 3}) - blobHeader2 := &core.BlobHeaderV2{ - BlobVersion: 0, - QuorumIDs: []core.QuorumID{1}, - BlobCommitment: mockCommitment, + blobKey1, err := blobHeader1.BlobKey() + assert.NoError(t, err) + blobHeader2 := &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{1}, + BlobCommitments: mockCommitment, PaymentMetadata: core.PaymentMetadata{ AccountID: "0x456", BinIndex: 2, - CumulativePayment: 999, + CumulativePayment: big.NewInt(999), }, } - blobKey2 := core.BlobKey([32]byte{4, 5, 6}) + blobKey2, err := blobHeader2.BlobKey() + assert.NoError(t, err) now := time.Now() metadata1 := &v2.BlobMetadata{ - BlobHeaderV2: *blobHeader1, - BlobKey: blobKey1, - BlobStatus: v2.Queued, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, + BlobHeader: blobHeader1, + BlobStatus: v2.Queued, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), } metadata2 := &v2.BlobMetadata{ - BlobHeaderV2: *blobHeader2, - BlobKey: blobKey2, - BlobStatus: v2.Certified, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, + BlobHeader: blobHeader2, + BlobStatus: v2.Certified, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), } - err := blobMetadataStore.PutBlobMetadata(ctx, metadata1) + err = blobMetadataStore.PutBlobMetadata(ctx, metadata1) assert.NoError(t, err) err = blobMetadataStore.PutBlobMetadata(ctx, metadata2) assert.NoError(t, err) @@ -64,11 +68,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, metadata2, fetchedMetadata) - queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued) + queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, 0) assert.NoError(t, err) assert.Len(t, queued, 1) assert.Equal(t, metadata1, queued[0]) - certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0) assert.NoError(t, err) assert.Len(t, certified, 1) assert.Equal(t, metadata2, certified[0])