Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Encoding manager #846

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,49 @@ func (c *Client) UpdateItem(ctx context.Context, tableName string, key Key, item
return resp.Attributes, err
}

func (c *Client) UpdateItemWithCondition(
ctx context.Context,
tableName string,
key Key,
item Item,
condition expression.ConditionBuilder,
) (Item, error) {
update := expression.UpdateBuilder{}
for itemKey, itemValue := range item {
// Ignore primary key updates
if _, ok := key[itemKey]; ok {
continue
}
update = update.Set(expression.Name(itemKey), expression.Value(itemValue))
}

expr, err := expression.NewBuilder().WithUpdate(update).WithCondition(condition).Build()
if err != nil {
return nil, err
}

resp, err := c.dynamoClient.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: aws.String(tableName),
Key: key,
ConditionExpression: expr.Condition(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
UpdateExpression: expr.Update(),
ReturnValues: types.ReturnValueUpdatedNew,
})

var ccfe *types.ConditionalCheckFailedException
if errors.As(err, &ccfe) {
return nil, ErrConditionFailed
}

if err != nil {
return nil, err
}

return resp.Attributes, err
}

// IncrementBy increments the attribute by the value for item that matches with the key
func (c *Client) IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error) {
// ADD numeric values
Expand Down
18 changes: 18 additions & 0 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils"
"github.com/Layr-Labs/eigenda/inabox/deploy"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -217,6 +218,22 @@ func TestBasicOperations(t *testing.T) {
})
assert.NoError(t, err)

// Attempt to update the item with invalid condition
_, err = dynamoClient.UpdateItemWithCondition(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
}, commondynamodb.Item{
"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
}, expression.Name("Status").In(expression.Value("Dispersing")))
assert.Error(t, err)

// Attempt to update the item with valid condition
_, err = dynamoClient.UpdateItemWithCondition(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
}, commondynamodb.Item{
"RequestedAt": &types.AttributeValueMemberN{Value: "456"},
}, expression.Name("Status").In(expression.Value("Confirmed")))
assert.NoError(t, err)

_, err = dynamoClient.IncrementBy(ctx, tableName, commondynamodb.Key{
"MetadataKey": &types.AttributeValueMemberS{Value: "key"},
}, "BlobSize", 1000)
Expand All @@ -231,6 +248,7 @@ func TestBasicOperations(t *testing.T) {
assert.Equal(t, "0x123", fetchedItem["BatchHeaderHash"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", fetchedItem["BlobIndex"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "1123", fetchedItem["BlobSize"].(*types.AttributeValueMemberN).Value)
assert.Equal(t, "456", fetchedItem["RequestedAt"].(*types.AttributeValueMemberN).Value)

err = dynamoClient.DeleteTable(ctx, tableName)
assert.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/disperser"
dispcommon "github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -635,7 +636,7 @@ func (s *DispersalServer) GetBlobStatus(ctx context.Context, req *pb.BlobStatusR
s.logger.Debug("metadataKey", "metadataKey", metadataKey.String())
metadata, err := s.blobStore.GetBlobMetadata(ctx, metadataKey)
if err != nil {
if errors.Is(err, disperser.ErrMetadataNotFound) {
if errors.Is(err, dispcommon.ErrMetadataNotFound) {
s.metrics.HandleNotFoundRpcRequest("GetBlobStatus")
s.metrics.HandleNotFoundRequest("GetBlobStatus")
return nil, api.NewErrorNotFound("no metadata found for the requestID")
Expand Down Expand Up @@ -778,7 +779,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
if errors.Is(err, disperser.ErrMetadataNotFound) {
if errors.Is(err, dispcommon.ErrMetadataNotFound) {
s.metrics.HandleNotFoundRpcRequest("RetrieveBlob")
s.metrics.HandleNotFoundRequest("RetrieveBlob")
return nil, api.NewErrorNotFound("no metadata found for the given batch header hash and blob index")
Expand Down
5 changes: 3 additions & 2 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
Expand Down Expand Up @@ -65,7 +66,7 @@ func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey dispers
})

if item == nil {
return nil, fmt.Errorf("%w: metadata not found for key %s", disperser.ErrMetadataNotFound, blobKey)
return nil, fmt.Errorf("%w: metadata not found for key %s", common.ErrMetadataNotFound, blobKey)
}

if err != nil {
Expand Down Expand Up @@ -312,7 +313,7 @@ func (s *BlobMetadataStore) GetBlobMetadataInBatch(ctx context.Context, batchHea
}

if len(items) == 0 {
return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", disperser.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex)
return nil, fmt.Errorf("%w: there is no metadata for batch %s and blob index %d", common.ErrMetadataNotFound, hexutil.Encode(batchHeaderHash[:]), blobIndex)
}

if len(items) > 1 {
Expand Down
2 changes: 1 addition & 1 deletion disperser/errors.go → disperser/common/errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package disperser
package common

import "errors"

Expand Down
25 changes: 13 additions & 12 deletions disperser/common/inmem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common"
)

// BlobStore is an in-memory implementation of the BlobStore interface
Expand Down Expand Up @@ -75,7 +76,7 @@ func (q *BlobStore) GetBlobContent(ctx context.Context, blobHash disperser.BlobH
if holder, ok := q.Blobs[blobHash]; ok {
return holder.Data, nil
} else {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
}

Expand All @@ -93,7 +94,7 @@ func (q *BlobStore) MarkBlobConfirmed(ctx context.Context, existingMetadata *dis
}
blobKey := existingMetadata.GetBlobKey()
if _, ok := q.Metadata[blobKey]; !ok {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.Confirmed
Expand All @@ -106,7 +107,7 @@ func (q *BlobStore) MarkBlobDispersing(ctx context.Context, blobKey disperser.Bl
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}
q.Metadata[blobKey].BlobStatus = disperser.Dispersing
return nil
Expand All @@ -117,7 +118,7 @@ func (q *BlobStore) MarkBlobInsufficientSignatures(ctx context.Context, existing
defer q.mu.Unlock()
blobKey := existingMetadata.GetBlobKey()
if _, ok := q.Metadata[blobKey]; !ok {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
newMetadata := *existingMetadata
newMetadata.BlobStatus = disperser.InsufficientSignatures
Expand All @@ -130,7 +131,7 @@ func (q *BlobStore) MarkBlobFinalized(ctx context.Context, blobKey disperser.Blo
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[blobKey].BlobStatus = disperser.Finalized
Expand All @@ -141,7 +142,7 @@ func (q *BlobStore) MarkBlobProcessing(ctx context.Context, blobKey disperser.Bl
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[blobKey].BlobStatus = disperser.Processing
Expand All @@ -152,7 +153,7 @@ func (q *BlobStore) MarkBlobFailed(ctx context.Context, blobKey disperser.BlobKe
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[blobKey]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[blobKey].BlobStatus = disperser.Failed
Expand All @@ -163,7 +164,7 @@ func (q *BlobStore) IncrementBlobRetryCount(ctx context.Context, existingMetadat
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

q.Metadata[existingMetadata.GetBlobKey()].NumRetries++
Expand All @@ -174,7 +175,7 @@ func (q *BlobStore) UpdateConfirmationBlockNumber(ctx context.Context, existingM
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.Metadata[existingMetadata.GetBlobKey()]; !ok {
return disperser.ErrBlobNotFound
return common.ErrBlobNotFound
}

if q.Metadata[existingMetadata.GetBlobKey()].ConfirmationInfo == nil {
Expand All @@ -196,7 +197,7 @@ func (q *BlobStore) GetBlobsByMetadata(ctx context.Context, metadata []*disperse
Data: holder.Data,
}
} else {
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}
}
return blobs, nil
Expand Down Expand Up @@ -266,7 +267,7 @@ func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]
}
}

return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}

func (q *BlobStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) {
Expand Down Expand Up @@ -327,7 +328,7 @@ func (q *BlobStore) GetBlobMetadata(ctx context.Context, blobKey disperser.BlobK
if meta, ok := q.Metadata[blobKey]; ok {
return meta, nil
}
return nil, disperser.ErrBlobNotFound
return nil, common.ErrBlobNotFound
}

func (q *BlobStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []disperser.BlobKey) ([]*disperser.BlobMetadata, error) {
Expand Down
3 changes: 3 additions & 0 deletions disperser/common/v2/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2
import (
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
core "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
)

type BlobStatus uint
Expand Down Expand Up @@ -65,4 +66,6 @@ type BlobMetadata struct {
RequestedAt uint64
// UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_
UpdatedAt uint64

*encoding.FragmentInfo
}
Loading
Loading