Skip to content

Commit

Permalink
Check type assertion for dynamo types (#1146)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Jan 24, 2025
1 parent 0f22447 commit d7b944f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
18 changes: 15 additions & 3 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,25 @@ func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeVal
if !ok {
return fmt.Errorf("expected *types.AttributeValueMemberM, got %T", av)
}
pm.AccountID = m.Value["AccountID"].(*types.AttributeValueMemberS).Value
reservationPeriod, err := strconv.ParseUint(m.Value["ReservationPeriod"].(*types.AttributeValueMemberN).Value, 10, 32)
accountID, ok := m.Value["AccountID"].(*types.AttributeValueMemberS)
if !ok {
return fmt.Errorf("expected *types.AttributeValueMemberS for AccountID, got %T", m.Value["AccountID"])
}
pm.AccountID = accountID.Value
rp, ok := m.Value["ReservationPeriod"].(*types.AttributeValueMemberN)
if !ok {
return fmt.Errorf("expected *types.AttributeValueMemberN for ReservationPeriod, got %T", m.Value["ReservationPeriod"])
}
reservationPeriod, err := strconv.ParseUint(rp.Value, 10, 32)
if err != nil {
return fmt.Errorf("failed to parse ReservationPeriod: %w", err)
}
pm.ReservationPeriod = uint32(reservationPeriod)
pm.CumulativePayment, _ = new(big.Int).SetString(m.Value["CumulativePayment"].(*types.AttributeValueMemberN).Value, 10)
cp, ok := m.Value["CumulativePayment"].(*types.AttributeValueMemberN)
if !ok {
return fmt.Errorf("expected *types.AttributeValueMemberN for CumulativePayment, got %T", m.Value["CumulativePayment"])
}
pm.CumulativePayment, _ = new(big.Int).SetString(cp.Value, 10)
return nil
}

Expand Down
16 changes: 12 additions & 4 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2
if errors.Is(err, commondynamodb.ErrConditionFailed) {
blob, err := s.GetBlobMetadata(ctx, blobKey)
if err != nil {
s.logger.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err)
return fmt.Errorf("failed to get blob metadata for key %s: %v", blobKey.Hex(), err)
}

if blob.BlobStatus == status {
Expand Down Expand Up @@ -567,7 +567,11 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(
lastEvaludatedKey := res.LastEvaluatedKey
if lastEvaludatedKey == nil {
lastItem := res.Items[len(res.Items)-1]
updatedAt, err := strconv.ParseUint(lastItem["UpdatedAt"].(*types.AttributeValueMemberN).Value, 10, 64)
u, ok := lastItem["UpdatedAt"].(*types.AttributeValueMemberN)
if !ok {
return nil, nil, fmt.Errorf("expected *types.AttributeValueMemberN for UpdatedAt, got %T", u)
}
updatedAt, err := strconv.ParseUint(u.Value, 10, 64)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -1018,12 +1022,16 @@ func (s *BlobMetadataStore) GetSignedBatch(ctx context.Context, batchHeaderHash
var header *corev2.BatchHeader
var attestation *corev2.Attestation
for _, item := range items {
if strings.HasPrefix(item["SK"].(*types.AttributeValueMemberS).Value, batchHeaderSK) {
sk, ok := item["SK"].(*types.AttributeValueMemberS)
if !ok {
return nil, nil, fmt.Errorf("expected *types.AttributeValueMemberS for SK, got %T", item["SK"])
}
if strings.HasPrefix(sk.Value, batchHeaderSK) {
header, err = UnmarshalBatchHeader(item)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal batch header: %w", err)
}
} else if strings.HasPrefix(item["SK"].(*types.AttributeValueMemberS).Value, attestationSK) {
} else if strings.HasPrefix(sk.Value, attestationSK) {
attestation, err = UnmarshalAttestation(item)
if err != nil {
return nil, nil, fmt.Errorf("failed to unmarshal attestation: %w", err)
Expand Down
7 changes: 7 additions & 0 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
client, err := d.nodeClientManager.GetClient(host, v2DispersalPort)
if err != nil {
d.logger.Error("failed to get node client", "operator", opID.Hex(), "err", err)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: err,
}
continue
}

Expand Down

0 comments on commit d7b944f

Please sign in to comment.