diff --git a/core/aggregation.go b/core/aggregation.go
index c0c96011d0..6b199e2870 100644
--- a/core/aggregation.go
+++ b/core/aggregation.go
@@ -339,7 +339,7 @@ func GetStakeThreshold(state *OperatorState, quorum QuorumID, quorumThreshold ui
 	quorumThresholdBig := new(big.Int).SetUint64(uint64(quorumThreshold))
 	stakeThreshold := new(big.Int)
 	stakeThreshold.Mul(quorumThresholdBig, state.Totals[quorum].Stake)
-	stakeThreshold = roundUpDivideBig(stakeThreshold, new(big.Int).SetUint64(percentMultiplier))
+	stakeThreshold = RoundUpDivideBig(stakeThreshold, new(big.Int).SetUint64(percentMultiplier))
 
 	return stakeThreshold
 }
diff --git a/core/assignment.go b/core/assignment.go
index 465c1a2cfa..0c4ea0e13b 100644
--- a/core/assignment.go
+++ b/core/assignment.go
@@ -4,7 +4,6 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"math"
 	"math/big"
 	"strings"
 )
@@ -130,7 +129,7 @@ func (c *StdAssignmentCoordinator) GetAssignments(state *OperatorState, blobLeng
 		if denom.Cmp(big.NewInt(0)) == 0 {
 			return nil, AssignmentInfo{}, fmt.Errorf("gammaChunkLength %d and total stake %d in quorum %d must be greater than 0", gammaChunkLength, totalStakes, quorum)
 		}
-		m := roundUpDivideBig(num, denom)
+		m := RoundUpDivideBig(num, denom)
 		numChunks += uint(m.Uint64())
 		chunksByOperator[r.Index] = uint(m.Uint64())
 
@@ -204,7 +203,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo
 	}
 	num := new(big.Int).Mul(big.NewInt(2*int64(blobLength*percentMultiplier)), minStake)
 	denom := new(big.Int).Mul(big.NewInt(int64(info.ConfirmationThreshold-info.AdversaryThreshold)), totalStake)
-	maxChunkLength := uint(roundUpDivideBig(num, denom).Uint64())
+	maxChunkLength := uint(RoundUpDivideBig(num, denom).Uint64())
 
 	maxChunkLength2 := RoundUpDivide(2*blobLength*percentMultiplier, MaxRequiredNumChunks*uint(info.ConfirmationThreshold-info.AdversaryThreshold))
 
@@ -212,7 +211,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo
 		maxChunkLength = maxChunkLength2
 	}
 
-	maxChunkLength = uint(nextPowerOf2(uint64(maxChunkLength)))
+	maxChunkLength = uint(NextPowerOf2(maxChunkLength))
 
 	if info.ChunkLength > maxChunkLength {
 		return false, fmt.Errorf("%w: chunk length: %d, max chunk length: %d", ErrChunkLengthTooLarge, info.ChunkLength, maxChunkLength)
@@ -261,22 +260,3 @@ func (c *StdAssignmentCoordinator) CalculateChunkLength(state *OperatorState, bl
 		}
 	}
 }
-
-func roundUpDivideBig(a, b *big.Int) *big.Int {
-
-	one := new(big.Int).SetUint64(1)
-	num := new(big.Int).Sub(new(big.Int).Add(a, b), one) // a + b - 1
-	res := new(big.Int).Div(num, b)                      // (a + b - 1) / b
-	return res
-
-}
-
-func RoundUpDivide(a, b uint) uint {
-	return (a + b - 1) / b
-
-}
-
-func nextPowerOf2(d uint64) uint64 {
-	nextPower := math.Ceil(math.Log2(float64(d)))
-	return uint64(math.Pow(2.0, nextPower))
-}
diff --git a/core/mock/state.go b/core/mock/state.go
index 6b566d7fc3..f182441ca0 100644
--- a/core/mock/state.go
+++ b/core/mock/state.go
@@ -2,6 +2,7 @@ package mock
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
 	"math/big"
 	"sort"
@@ -36,7 +37,8 @@ type PrivateOperatorState struct {
 }
 
 func MakeOperatorId(id int) core.OperatorID {
-	data := [32]byte{uint8(id)}
+	var data [32]byte
+	binary.LittleEndian.PutUint64(data[:8], uint64(id))
 	return data
 }
diff --git a/core/utils.go b/core/utils.go
new file mode 100644
index 0000000000..479e61a872
--- /dev/null
+++ b/core/utils.go
@@ -0,0 +1,29 @@
+package core
+
+import (
+	"math"
+	"math/big"
+
+	"golang.org/x/exp/constraints"
+)
+
+// RoundUpDivideBig computes ceil(a / b) over big integers.
+func RoundUpDivideBig(a, b *big.Int) *big.Int {
+
+	one := new(big.Int).SetUint64(1)
+	num := new(big.Int).Sub(new(big.Int).Add(a, b), one) // a + b - 1
+	res := new(big.Int).Div(num, b)                      // (a + b - 1) / b
+	return res
+}
+
+// RoundUpDivide computes ceil(a / b) for any integer type.
+func RoundUpDivide[T constraints.Integer](a, b T) T {
+	return (a + b - 1) / b
+}
+
+// NextPowerOf2 returns the smallest power of 2 that is not smaller than d.
+// It computes via float64, so d must be small enough to be represented exactly.
+func NextPowerOf2[T constraints.Integer](d T) T {
+	nextPower := math.Ceil(math.Log2(float64(d)))
+	return T(math.Pow(2.0, nextPower))
+}
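The helpers above back every ceiling division and power-of-two rounding in the assignment logic below. A minimal usage sketch, not part of the patch, assuming only this repo's import path:

package main

import (
	"fmt"
	"math/big"

	"github.com/Layr-Labs/eigenda/core"
)

func main() {
	// Generic ceiling division: ceil(10/4) = 3.
	fmt.Println(core.RoundUpDivide(uint(10), uint(4))) // 3

	// big.Int variant: ceil(7/2) = 4.
	fmt.Println(core.RoundUpDivideBig(big.NewInt(7), big.NewInt(2))) // 4

	// Smallest power of 2 not below the input: 5 -> 8, 8 -> 8.
	fmt.Println(core.NextPowerOf2(uint64(5)), core.NextPowerOf2(uint64(8))) // 8 8
}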
diff --git a/core/v2/assignment.go b/core/v2/assignment.go
new file mode 100644
index 0000000000..1e50cdbfb9
--- /dev/null
+++ b/core/v2/assignment.go
@@ -0,0 +1,121 @@
+package v2
+
+import (
+	"fmt"
+	"math/big"
+	"sort"
+
+	"github.com/Layr-Labs/eigenda/core"
+)
+
+func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) (map[core.OperatorID]Assignment, error) {
+
+	params, ok := ParametersMap[blobVersion]
+	if !ok {
+		return nil, fmt.Errorf("blob version %d not found", blobVersion)
+	}
+
+	ops, ok := state.Operators[quorum]
+	if !ok {
+		return nil, fmt.Errorf("no operators found for quorum %d", quorum)
+	}
+
+	if len(ops) > int(params.MaxNumOperators()) {
+		return nil, fmt.Errorf("too many operators for blob version %d", blobVersion)
+	}
+
+	numOperators := big.NewInt(int64(len(ops)))
+	numChunks := big.NewInt(int64(params.NumChunks))
+
+	type assignment struct {
+		id     core.OperatorID
+		index  uint32
+		chunks uint32
+		stake  *big.Int
+	}
+
+	chunkAssignments := make([]assignment, 0, len(ops))
+	for ID, r := range state.Operators[quorum] {
+
+		num := new(big.Int).Mul(r.Stake, new(big.Int).Sub(numChunks, numOperators))
+		denom := state.Totals[quorum].Stake
+
+		chunks := core.RoundUpDivideBig(num, denom)
+
+		chunkAssignments = append(chunkAssignments, assignment{id: ID, index: uint32(r.Index), chunks: uint32(chunks.Uint64()), stake: r.Stake})
+	}
+
+	// Sort assignments in descending order of stake, breaking ties by operator index
+	sort.Slice(chunkAssignments, func(i, j int) bool {
+		if chunkAssignments[i].stake.Cmp(chunkAssignments[j].stake) == 0 {
+			return chunkAssignments[i].index < chunkAssignments[j].index
+		}
+		return chunkAssignments[i].stake.Cmp(chunkAssignments[j].stake) == 1
+	})
+
+	mp := 0
+	for _, a := range chunkAssignments {
+		mp += int(a.chunks)
+	}
+
+	delta := int(params.NumChunks) - mp
+	if delta < 0 {
+		return nil, fmt.Errorf("total chunks %d exceeds maximum %d", mp, params.NumChunks)
+	}
+
+	assignments := make(map[core.OperatorID]Assignment, len(chunkAssignments))
+	index := uint32(0)
+	for i, a := range chunkAssignments {
+		if i < delta {
+			a.chunks++
+		}
+
+		assignments[a.id] = Assignment{
+			StartIndex: index,
+			NumChunks:  a.chunks,
+		}
+		index += a.chunks
+	}
+
+	return assignments, nil
+
+}
+
+func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.QuorumID, id core.OperatorID) (Assignment, error) {
+
+	assignments, err := GetAssignments(state, blobVersion, quorum)
+	if err != nil {
+		return Assignment{}, err
+	}
+
+	assignment, ok := assignments[id]
+	if !ok {
+		return Assignment{}, ErrNotFound
+	}
+
+	return assignment, nil
+}
+
+func GetChunkLength(blobVersion byte, blobLength uint32) (uint32, error) {
+
+	if blobLength == 0 {
+		return 0, fmt.Errorf("blob length must be greater than 0")
+	}
+
+	// Check that the blob length is a power of 2
+	if blobLength&(blobLength-1) != 0 {
+		return 0, fmt.Errorf("blob length %d is not a power of 2", blobLength)
+	}
+
+	if _, ok := ParametersMap[blobVersion]; !ok {
+		return 0, fmt.Errorf("blob version %d not found", blobVersion)
+	}
+
+	chunkLength := blobLength * ParametersMap[blobVersion].CodingRate / ParametersMap[blobVersion].NumChunks
+	if chunkLength == 0 {
+		chunkLength = 1
+	}
+
+	return chunkLength, nil
+
+}
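GetAssignments gives each operator ceil(stake * (NumChunks - numOperators) / totalStake) chunks, which under-allocates by at most numOperators chunks in total; the shortfall delta is then handed out one extra chunk at a time, starting with the highest-stake operator. A toy walk-through with hypothetical stakes, not part of the patch:

package main

import "fmt"

func main() {
	numChunks := uint64(8192)         // params.NumChunks for blob version 0
	stakes := []uint64{100, 200, 300} // hypothetical stakes, total = 600
	numOperators := uint64(len(stakes))

	total := uint64(0)
	for _, s := range stakes {
		total += s
	}

	assigned := uint64(0)
	chunks := make([]uint64, len(stakes))
	for i, s := range stakes {
		// ceil(stake * (numChunks - numOperators) / totalStake)
		num := s * (numChunks - numOperators)
		chunks[i] = (num + total - 1) / total
		assigned += chunks[i]
	}

	// The remaining chunks go one each to the highest-stake operators.
	delta := numChunks - assigned
	fmt.Println(chunks, delta) // [1365 2730 4095] 2
}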
diff --git a/core/v2/assignment_test.go b/core/v2/assignment_test.go
new file mode 100644
index 0000000000..d1245f6c5d
--- /dev/null
+++ b/core/v2/assignment_test.go
@@ -0,0 +1,220 @@
+package v2_test
+
+import (
+	"context"
+	"math/rand"
+	"testing"
+
+	"github.com/Layr-Labs/eigenda/core"
+	"github.com/Layr-Labs/eigenda/core/mock"
+	corev2 "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	maxNumOperators = 3537
+)
+
+func TestOperatorAssignmentsV2(t *testing.T) {
+
+	state := dat.GetTotalOperatorState(context.Background(), 0)
+	operatorState := state.OperatorState
+
+	blobVersion := byte(0)
+
+	assignments, err := corev2.GetAssignments(operatorState, blobVersion, 0)
+	assert.NoError(t, err)
+	expectedAssignments := map[core.OperatorID]corev2.Assignment{
+		mock.MakeOperatorId(0): {
+			StartIndex: 7802,
+			NumChunks:  390,
+		},
+		mock.MakeOperatorId(1): {
+			StartIndex: 7022,
+			NumChunks:  780,
+		},
+		mock.MakeOperatorId(2): {
+			StartIndex: 5852,
+			NumChunks:  1170,
+		},
+		mock.MakeOperatorId(3): {
+			StartIndex: 4291,
+			NumChunks:  1561,
+		},
+		mock.MakeOperatorId(4): {
+			StartIndex: 2340,
+			NumChunks:  1951,
+		},
+		mock.MakeOperatorId(5): {
+			StartIndex: 0,
+			NumChunks:  2340,
+		},
+	}
+
+	for operatorID, assignment := range assignments {
+
+		assert.Equal(t, assignment, expectedAssignments[operatorID])
+
+		assignment, err := corev2.GetAssignment(operatorState, blobVersion, 0, operatorID)
+		assert.NoError(t, err)
+
+		assert.Equal(t, assignment, expectedAssignments[operatorID])
+
+	}
+
+}
+
+func TestMaxNumOperators(t *testing.T) {
+
+	assert.Equal(t, corev2.ParametersMap[0].MaxNumOperators(), uint32(maxNumOperators))
+
+}
+
+func TestAssignmentWithTooManyOperators(t *testing.T) {
+
+	numOperators := maxNumOperators + 1
+
+	stakes := map[core.QuorumID]map[core.OperatorID]int{
+		0: {},
+	}
+	for i := 0; i < numOperators; i++ {
+		stakes[0][mock.MakeOperatorId(i)] = rand.Intn(100) + 1
+	}
+
+	dat, err := mock.NewChainDataMock(stakes)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	state := dat.GetTotalOperatorState(context.Background(), 0)
+
+	assert.Equal(t, len(state.Operators[0]), numOperators)
+
+	blobVersion := byte(0)
+
+	_, err = corev2.GetAssignments(state.OperatorState, blobVersion, 0)
+	assert.Error(t, err)
+
+}
+
+func FuzzOperatorAssignmentsV2(f *testing.F) {
+
+	// Add distributions to fuzz
+
+	for i := 1; i < 100; i++ {
+		f.Add(i)
+	}
+
+	for i := 0; i < 100; i++ {
+		f.Add(rand.Intn(2048) + 100)
+	}
+
+	for i := 0; i < 5; i++ {
+		f.Add(maxNumOperators)
+	}
+
+	f.Fuzz(func(t *testing.T, numOperators int) {
+
+		// Generate a random stake distribution for numOperators operators
+
+		stakes := map[core.QuorumID]map[core.OperatorID]int{
+			0: {},
+		}
+		for i := 0; i < numOperators; i++ {
+			stakes[0][mock.MakeOperatorId(i)] = rand.Intn(100) + 1
+		}
+
+		dat, err := mock.NewChainDataMock(stakes)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		state := dat.GetTotalOperatorState(context.Background(), 0)
+
+		blobVersion := byte(0)
+
+		assignments, err := corev2.GetAssignments(state.OperatorState, blobVersion, 0)
+		assert.NoError(t, err)
+
+		// Check that the total number of chunks is correct
+		totalChunks := uint32(0)
+		for _, assignment := range assignments {
+			totalChunks += assignment.NumChunks
+		}
+		assert.Equal(t, totalChunks, corev2.ParametersMap[blobVersion].NumChunks)
+
+		// Check that each operator's assignment satisfies the security requirement
+		for operatorID, assignment := range assignments {
+
+			totalStake := uint32(state.Totals[0].Stake.Uint64())
+			myStake := uint32(state.Operators[0][operatorID].Stake.Uint64())
+
+			LHS := assignment.NumChunks * totalStake * corev2.ParametersMap[blobVersion].CodingRate * uint32(corev2.ParametersMap[blobVersion].ReconstructionThreshold*100)
+			RHS := 100 * myStake * corev2.ParametersMap[blobVersion].NumChunks
+
+			assert.GreaterOrEqual(t, LHS, RHS)
+
+		}
+
+	})
+
+}
+
+func TestChunkLength(t *testing.T) {
+
+	blobVersion := byte(0)
+
+	pairs := []struct {
+		blobLength  uint32
+		chunkLength uint32
+	}{
+		{512, 1},
+		{1024, 1},
+		{2048, 2},
+		{4096, 4},
+		{8192, 8},
+	}
+
+	for _, pair := range pairs {
+
+		chunkLength, err := corev2.GetChunkLength(blobVersion, pair.blobLength)
+
+		assert.NoError(t, err)
+
+		assert.Equal(t, pair.chunkLength, chunkLength)
+	}
+
+}
+
+func TestInvalidChunkLength(t *testing.T) {
+
+	blobVersion := byte(0)
+
+	invalidLengths := []uint32{
+		0,
+		3,
+		5,
+		6,
+		7,
+		9,
+		10,
+		11,
+		12,
+		13,
+		14,
+		15,
+		31,
+		63,
+		127,
+		255,
+		511,
+		1023,
+	}
+
+	for _, length := range invalidLengths {
+
+		_, err := corev2.GetChunkLength(blobVersion, length)
+		assert.Error(t, err)
+	}
+
+}
diff --git a/core/v2/core_test.go b/core/v2/core_test.go
new file mode 100644
index 0000000000..f55303e066
--- /dev/null
+++ b/core/v2/core_test.go
@@ -0,0 +1,267 @@
+package v2_test
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	"os"
+	"runtime"
+	"testing"
+
+	"github.com/Layr-Labs/eigenda/common"
+	"github.com/Layr-Labs/eigenda/core"
+	"github.com/Layr-Labs/eigenda/core/mock"
+	corev2 "github.com/Layr-Labs/eigenda/core/v2"
+	"github.com/Layr-Labs/eigenda/encoding"
+	"github.com/Layr-Labs/eigenda/encoding/kzg"
+	"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
+	"github.com/Layr-Labs/eigenda/encoding/kzg/verifier"
+	"github.com/Layr-Labs/eigenda/encoding/utils/codec"
+	"github.com/Layr-Labs/eigensdk-go/logging"
+	gethcommon "github.com/ethereum/go-ethereum/common"
+	"github.com/gammazero/workerpool"
+	"github.com/hashicorp/go-multierror"
+	"github.com/stretchr/testify/assert"
+)
+
+var (
+	dat *mock.ChainDataMock
+	agg core.SignatureAggregator
+
+	p encoding.Prover
+	v encoding.Verifier
+
+	GETTYSBURG_ADDRESS_BYTES = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.")
+)
+
+func TestMain(m *testing.M) {
+	var err error
+	dat, err = mock.MakeChainDataMock(map[uint8]int{
+		0: 6,
+		1: 3,
+	})
+	if err != nil {
+		panic(err)
+	}
+	logger := logging.NewNoopLogger()
+	reader := &mock.MockTransactor{}
+	reader.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)
+	agg, err = core.NewStdSignatureAggregator(logger, reader)
+	if err != nil {
+		panic(err)
+	}
+
+	p, v, err = makeTestComponents()
+	if err != nil {
+		panic("failed to create test prover and verifier")
+	}
+
+	code := m.Run()
+	os.Exit(code)
+}
+
+// makeTestComponents makes a prover and verifier currently using the only supported backend.
+func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
+	config := &kzg.KzgConfig{
+		G1Path:          "../../inabox/resources/kzg/g1.point.300000",
+		G2Path:          "../../inabox/resources/kzg/g2.point.300000",
+		CacheDir:        "../../inabox/resources/kzg/SRSTables",
+		SRSOrder:        8192,
+		SRSNumberToLoad: 8192,
+		NumWorker:       uint64(runtime.GOMAXPROCS(0)),
+	}
+
+	p, err := prover.NewProver(config, true)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	v, err := verifier.NewVerifier(config, true)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return p, v, nil
+}
+
+func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) {
+
+	data := make([]byte, length*31)
+	_, err := rand.Read(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	data = codec.ConvertByPaddingEmptyByte(data)
+
+	commitments, err := p.GetCommitments(data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	header := corev2.BlobCertificate{
+		BlobHeader: corev2.BlobHeader{
+			Version:         version,
+			QuorumNumbers:   quorums,
+			BlobCommitments: commitments,
+		},
+		ReferenceBlockNumber: refBlockNumber,
+	}
+
+	return header, data
+
+}
+
+// prepareBlobs takes in multiple blobs, encodes them, and generates the associated assignments and batch header.
+// These are the products that a disperser will need in order to disperse data to the DA nodes.
+func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertificate, blobs [][]byte) (map[core.OperatorID][]*corev2.BlobShard, core.IndexedChainState) {
+
+	cst, err := mock.MakeChainDataMock(map[uint8]int{
+		0: int(operatorCount),
+		1: int(operatorCount),
+		2: int(operatorCount),
+	})
+	assert.NoError(t, err)
+
+	blobsMap := make([]map[core.QuorumID]map[core.OperatorID][]*encoding.Frame, 0, len(headers))
+
+	for z, header := range headers {
+
+		blob := blobs[z]
+
+		params, err := header.GetEncodingParams()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		chunks, err := p.GetFrames(blob, params)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		state, err := cst.GetOperatorState(context.Background(), uint(header.ReferenceBlockNumber), header.QuorumNumbers)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		blobMap := make(map[core.QuorumID]map[core.OperatorID][]*encoding.Frame)
+
+		for _, quorum := range header.QuorumNumbers {
+
+			assignments, err := corev2.GetAssignments(state, header.Version, quorum)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			blobMap[quorum] = make(map[core.OperatorID][]*encoding.Frame)
+
+			for opID, assignment := range assignments {
+
+				blobMap[quorum][opID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks]
+
+			}
+
+		}
+
+		blobsMap = append(blobsMap, blobMap)
+	}
+
+	// Invert the blobsMap
+	inverseMap := make(map[core.OperatorID][]*corev2.BlobShard)
+	for blobIndex, blobMap := range blobsMap {
+		for quorum, operatorMap := range blobMap {
+			for operatorID, frames := range operatorMap {
+
+				if _, ok := inverseMap[operatorID]; !ok {
+					inverseMap[operatorID] = make([]*corev2.BlobShard, 0)
+				}
+				if len(inverseMap[operatorID]) < blobIndex+1 {
+					inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{
+						BlobCertificate: headers[blobIndex],
+						Chunks:          make(map[core.QuorumID][]*encoding.Frame),
+					})
+				}
+				if len(frames) == 0 {
+					continue
+				}
+				inverseMap[operatorID][blobIndex].Chunks[quorum] = append(inverseMap[operatorID][blobIndex].Chunks[quorum], frames...)
+
+			}
+		}
+	}
+
+	return inverseMap, cst
+
+}
+
+// checkBatchByUniversalVerifier runs the verification logic for each DA node in the current OperatorState, and returns an error if any of
+// the DA nodes' validation checks fails
+func checkBatchByUniversalVerifier(cst core.IndexedChainState, packagedBlobs map[core.OperatorID][]*corev2.BlobShard, pool common.WorkerPool) error {
+
+	ctx := context.Background()
+
+	quorums := []core.QuorumID{0, 1}
+	state, _ := cst.GetIndexedOperatorState(context.Background(), 0, quorums)
+	// numBlob := len(encodedBlobs)
+
+	var errList *multierror.Error
+
+	for id := range state.IndexedOperators {
+
+		val := corev2.NewShardValidator(v, cst, id)
+
+		blobs := packagedBlobs[id]
+
+		err := val.ValidateBlobs(ctx, blobs, pool)
+		if err != nil {
+			errList = multierror.Append(errList, err)
+		}
+	}
+
+	return errList.ErrorOrNil()
+
+}
+
+func TestValidationSucceeds(t *testing.T) {
+
+	// operatorCounts := []uint{1, 2, 4, 10, 30}
+
+	// numBlob := 3 // must be greater than 0
+	// blobLengths := []int{1, 32, 128}
+
+	operatorCounts := []uint{4}
+
+	numBlob := 1 // must be greater than 0
+	blobLengths := []int{1, 2}
+
+	quorumNumbers := []core.QuorumID{0, 1}
+
+	bn := uint64(0)
+
+	version := uint8(0)
+
+	pool := workerpool.New(1)
+
+	for _, operatorCount := range operatorCounts {
+
+		// batch can only be tested per operatorCount, because the assignment would be wrong otherwise
+		headers := make([]corev2.BlobCertificate, 0)
+		blobs := make([][]byte, 0)
+		for _, blobLength := range blobLengths {
+			for i := 0; i < numBlob; i++ {
+				header, data := makeTestBlob(t, p, version, bn, blobLength, quorumNumbers)
+				headers = append(headers, header)
+				blobs = append(blobs, data)
+			}
+		}
+
+		packagedBlobs, cst := prepareBlobs(t, operatorCount, headers, blobs)
+
+		t.Run(fmt.Sprintf("universal verifier operatorCount=%v over %v blobs", operatorCount, len(blobs)), func(t *testing.T) {
+			err := checkBatchByUniversalVerifier(cst, packagedBlobs, pool)
+			assert.NoError(t, err)
+		})
+
+	}
+
+}
diff --git a/core/v2/errors.go b/core/v2/errors.go
new file mode 100644
index 0000000000..1c0e18749b
--- /dev/null
+++ b/core/v2/errors.go
@@ -0,0 +1,7 @@
+package v2
+
+import "errors"
+
+var (
+	ErrNotFound = errors.New("not found")
+)
diff --git a/core/v2/types.go b/core/v2/types.go
new file mode 100644
index 0000000000..f3c9eda15e
--- /dev/null
+++ b/core/v2/types.go
@@ -0,0 +1,92 @@
+package v2
+
+import (
+	"math"
+
+	"github.com/Layr-Labs/eigenda/core"
+	"github.com/Layr-Labs/eigenda/encoding"
+)
+
+var (
+	// TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not
+	// conflict with the existing on-chain limits
+	ParametersMap = map[uint8]BlobVersionParameters{
+		0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192},
+	}
+)
+
+// Assignment contains information about the set of chunks that a specific node will receive
+type Assignment struct {
+	StartIndex uint32
+	NumChunks  uint32
+}
+
+// GetIndices generates the list of ChunkIndices associated with a given assignment
+func (c *Assignment) GetIndices() []uint32 {
+	indices := make([]uint32, c.NumChunks)
+	for ind := range indices {
+		indices[ind] = c.StartIndex + uint32(ind)
+	}
+	return indices
+}
+
+// BlobHeader contains all metadata related to a blob including commitments and parameters for encoding
+type BlobHeader struct {
+	Version uint8
+
+	encoding.BlobCommitments
+
+	// QuorumNumbers lists the quorums that the blob is dispersed to
+	QuorumNumbers []uint8
+
+	// PaymentHeader contains the payment information for the blob
+	core.PaymentMetadata
+
+	// AuthenticationData is the signature of the blob header by the account ID
+	AuthenticationData []byte `json:"authentication_data"`
+}
+
+func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) {
+
+	params := ParametersMap[b.Version]
+
+	length, err := GetChunkLength(b.Version, uint32(b.Length))
+	if err != nil {
+		return encoding.EncodingParams{}, err
+	}
+
+	return encoding.EncodingParams{
+		NumChunks:   uint64(params.NumChunks),
+		ChunkLength: uint64(length),
+	}, nil
+
+}
+
+type BlobCertificate struct {
+	BlobHeader
+
+	// ReferenceBlockNumber is the block number of the block at which the operator state will be referenced
+	ReferenceBlockNumber uint64
+
+	// RelayKeys
+	RelayKeys []uint16
+}
+
+type BlobVersionParameters struct {
+	CodingRate              uint32
+	ReconstructionThreshold float64
+	NumChunks               uint32
+}
+
+func (p BlobVersionParameters) MaxNumOperators() uint32 {
+	return uint32(math.Floor(float64(p.NumChunks) * (1 - 1/(p.ReconstructionThreshold*float64(p.CodingRate)))))
+}
+
+const (
+	// We use uint8 to count the number of quorums, so we can have at most 255 quorums,
+	// which means the max ID can not be larger than 254 (from 0 to 254, there are 255
+	// different IDs).
+	MaxQuorumID = 254
+)
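For version 0 above (NumChunks = 8192, CodingRate = 8, ReconstructionThreshold = 0.22), MaxNumOperators() evaluates to floor(8192 * (1 - 1/(0.22*8))) = floor(3537.45...) = 3537, which is exactly the maxNumOperators constant asserted in assignment_test.go. A one-line check, not part of the patch:

package main

import (
	"fmt"
	"math"
)

func main() {
	numChunks, codingRate, threshold := 8192.0, 8.0, 0.22
	fmt.Println(math.Floor(numChunks * (1 - 1/(threshold*codingRate)))) // 3537
}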
diff --git a/core/v2/validator.go b/core/v2/validator.go
new file mode 100644
index 0000000000..0d5a2960ee
--- /dev/null
+++ b/core/v2/validator.go
@@ -0,0 +1,199 @@
+package v2
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/Layr-Labs/eigenda/common"
+	"github.com/Layr-Labs/eigenda/core"
+	"github.com/Layr-Labs/eigenda/encoding"
+)
+
+var (
+	ErrChunkLengthMismatch = errors.New("chunk length mismatch")
+	ErrBlobQuorumSkip      = errors.New("blob skipped for a quorum before verification")
+)
+
+type BlobShard struct {
+	BlobCertificate
+	Chunks map[core.QuorumID][]*encoding.Frame
+}
+
+// ShardValidator implements the validation logic that a DA node should apply to its received data
+type ShardValidator struct {
+	verifier   encoding.Verifier
+	chainState core.ChainState
+	operatorID core.OperatorID
+}
+
+func NewShardValidator(v encoding.Verifier, cst core.ChainState, operatorID core.OperatorID) *ShardValidator {
+	return &ShardValidator{
+		verifier:   v,
+		chainState: cst,
+		operatorID: operatorID,
+	}
+}
+
+func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShard, operatorState *core.OperatorState) ([]*encoding.Frame, *Assignment, error) {
+
+	// Check if the operator is a member of the quorum
+	if _, ok := operatorState.Operators[quorum]; !ok {
+		return nil, nil, fmt.Errorf("%w: operator %s is not a member of quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorum)
+	}
+
+	// Get the assignments for the quorum
+	assignment, err := GetAssignment(operatorState, blob.Version, quorum, v.operatorID)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Validate the number of chunks
+	if assignment.NumChunks == 0 {
+		return nil, nil, fmt.Errorf("%w: operator %s has no chunks in quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorum)
+	}
+	if assignment.NumChunks != uint32(len(blob.Chunks[quorum])) {
+		return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Chunks[quorum]), assignment.NumChunks, quorum)
+	}
+
+	// Validate the chunk length against the blob version parameters
+	chunkLength, err := GetChunkLength(blob.Version, uint32(blob.BlobHeader.Length))
+	if err != nil {
+		return nil, nil, fmt.Errorf("invalid chunk length: %w", err)
+	}
+
+	// Check that each received chunk has the expected length
+	chunks := blob.Chunks[quorum]
+	for _, chunk := range chunks {
+		if uint32(chunk.Length()) != chunkLength {
+			return nil, nil, fmt.Errorf("%w: chunk length (%d) does not match quorum header (%d) for quorum %d", ErrChunkLengthMismatch, chunk.Length(), chunkLength, quorum)
+		}
+	}
+
+	return chunks, &assignment, nil
+}
+
+func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, pool common.WorkerPool) error {
+	var err error
+	subBatchMap := make(map[encoding.EncodingParams]*encoding.SubBatch)
+	blobCommitmentList := make([]encoding.BlobCommitments, len(blobs))
+
+	for k, blob := range blobs {
+		if len(blob.Chunks) != len(blob.BlobHeader.QuorumNumbers) {
+			return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Chunks), len(blob.BlobHeader.QuorumNumbers))
+		}
+
+		state, err := v.chainState.GetOperatorState(ctx, uint(blob.ReferenceBlockNumber), blob.BlobHeader.QuorumNumbers)
+		if err != nil {
+			return err
+		}
+
+		// Saved for the blob length validation
+		blobCommitmentList[k] = blob.BlobHeader.BlobCommitments
+
+		// for each quorum
+		for _, quorum := range blob.BlobHeader.QuorumNumbers {
+			// Skip the quorum if the blob was not assigned to this operator for it;
+			// fail on any other validation error
+			chunks, assignment, err := v.validateBlobQuorum(quorum, blob, state)
+			if errors.Is(err, ErrBlobQuorumSkip) {
+				continue
+			} else if err != nil {
+				return err
+			}
+
+			// TODO: Define params for the blob
+			params, err := blob.GetEncodingParams()
+			if err != nil {
+				return err
+			}
+
+			// Check the received chunks against the commitment
+			blobIndex := 0
+			subBatch, ok := subBatchMap[params]
+			if ok {
+				blobIndex = subBatch.NumBlobs
+			}
+
+			indices := assignment.GetIndices()
+			samples := make([]encoding.Sample, len(chunks))
+			for ind := range chunks {
+				samples[ind] = encoding.Sample{
+					Commitment:      blob.BlobHeader.BlobCommitments.Commitment,
+					Chunk:           chunks[ind],
+					AssignmentIndex: uint(indices[ind]),
+					BlobIndex:       blobIndex,
+				}
+			}
+
+			// update subBatch
+			if !ok {
+				subBatchMap[params] = &encoding.SubBatch{
+					Samples:  samples,
+					NumBlobs: 1,
+				}
+			} else {
+				subBatch.Samples = append(subBatch.Samples, samples...)
+				subBatch.NumBlobs += 1
+			}
+		}
+	}
+
+	// Parallelize the universal verification for each subBatch
+	numResult := len(subBatchMap) + len(blobCommitmentList)
+	// create a channel to accept results, we don't use stop
+	out := make(chan error, numResult)
+
+	// parallelize subBatch verification
+	for params, subBatch := range subBatchMap {
+		params := params
+		subBatch := subBatch
+		pool.Submit(func() {
+			v.universalVerifyWorker(params, subBatch, out)
+		})
+	}
+
+	// parallelize length proof verification
+	for _, blobCommitments := range blobCommitmentList {
+		blobCommitments := blobCommitments
+		pool.Submit(func() {
+			v.VerifyBlobLengthWorker(blobCommitments, out)
+		})
+	}
+	// check if commitments are equivalent
+	err = v.verifier.VerifyCommitEquivalenceBatch(blobCommitmentList)
+	if err != nil {
+		return err
+	}
+
+	for i := 0; i < numResult; i++ {
+		err := <-out
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (v *ShardValidator) universalVerifyWorker(params encoding.EncodingParams, subBatch *encoding.SubBatch, out chan error) {
+
+	err := v.verifier.UniversalVerifySubBatch(params, subBatch.Samples, subBatch.NumBlobs)
+	if err != nil {
+		out <- err
+		return
+	}
+
+	out <- nil
+}
+
+func (v *ShardValidator) VerifyBlobLengthWorker(blobCommitments encoding.BlobCommitments, out chan error) {
+	err := v.verifier.VerifyBlobLength(blobCommitments)
+	if err != nil {
+		out <- err
+		return
+	}
+
+	out <- nil
+}
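A sketch of how a DA node might wire the validator together; this wrapper is hypothetical and assumes the verifier, chain state, operator ID, and received shards come from the node's own setup:

package node // hypothetical package, for illustration only

import (
	"context"

	"github.com/Layr-Labs/eigenda/core"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/Layr-Labs/eigenda/encoding"
	"github.com/gammazero/workerpool"
)

func validateReceivedShards(ctx context.Context, verifier encoding.Verifier,
	cst core.ChainState, operatorID core.OperatorID, shards []*corev2.BlobShard) error {

	validator := corev2.NewShardValidator(verifier, cst, operatorID)

	// ValidateBlobs fans the sub-batch and length-proof checks out to the
	// pool and returns the first error it encounters.
	pool := workerpool.New(4)
	return validator.ValidateBlobs(ctx, shards, pool)
}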
diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go
index 2a14aa7ff6..eda194f5ef 100644
--- a/disperser/batcher/encoding_streamer.go
+++ b/disperser/batcher/encoding_streamer.go
@@ -353,7 +353,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata
 
 	params := encoding.ParamsFromMins(chunkLength, info.TotalChunks)
 
-	err = encoding.ValidateEncodingParams(params, int(blobLength), e.SRSOrder)
+	err = encoding.ValidateEncodingParamsAndBlobLength(params, uint64(blobLength), uint64(e.SRSOrder))
 	if err != nil {
 		e.logger.Error("invalid encoding params", "err", err)
 		// Cancel the blob
diff --git a/encoding/encoding.go b/encoding/encoding.go
index 5acd0539ae..3a050bfa8c 100644
--- a/encoding/encoding.go
+++ b/encoding/encoding.go
@@ -11,6 +11,10 @@ type Prover interface {
 	// for any number M such that M*params.ChunkLength > BlobCommitments.Length, then any set of M chunks will be sufficient to
 	// reconstruct the blob.
 	EncodeAndProve(data []byte, params EncodingParams) (BlobCommitments, []*Frame, error)
+
+	GetCommitments(data []byte) (BlobCommitments, error)
+
+	GetFrames(data []byte, params EncodingParams) ([]*Frame, error)
 }
 
 type Verifier interface {
diff --git a/encoding/kzg/prover/parametrized_prover.go b/encoding/kzg/prover/parametrized_prover.go
index af04863840..294c2578ac 100644
--- a/encoding/kzg/prover/parametrized_prover.go
+++ b/encoding/kzg/prover/parametrized_prover.go
@@ -23,32 +23,38 @@ type ParametrizedProver struct {
 	Computer ProofDevice
 }
 
-type RsEncodeResult struct {
+type rsEncodeResult struct {
 	Frames   []rs.Frame
 	Indices  []uint32
 	Duration time.Duration
 	Err      error
 }
-type LengthCommitmentResult struct {
+type lengthCommitmentResult struct {
 	LengthCommitment bn254.G2Affine
 	Duration         time.Duration
 	Err              error
 }
-type LengthProofResult struct {
+type lengthProofResult struct {
 	LengthProof bn254.G2Affine
 	Duration    time.Duration
 	Err         error
 }
-type CommitmentResult struct {
+type commitmentResult struct {
 	Commitment bn254.G1Affine
 	Duration   time.Duration
 	Err        error
 }
-type ProofsResult struct {
+type proofsResult struct {
 	Proofs   []bn254.G1Affine
 	Duration time.Duration
 	Err      error
 }
+type commitmentsResult struct {
+	commitment       *bn254.G1Affine
+	lengthCommitment *bn254.G2Affine
+	lengthProof      *bn254.G2Affine
+	Error            error
+}
 
 // just a wrapper to take bytes not Fr Element
 func (g *ParametrizedProver) EncodeBytes(inputBytes []byte) (*bn254.G1Affine, *bn254.G2Affine, *bn254.G2Affine, []encoding.Frame, []uint32, error) {
@@ -67,30 +73,56 @@ func (g *ParametrizedProver) Encode(inputFr []fr.Element) (*bn254.G1Affine, *bn2
 
 	encodeStart := time.Now()
 
-	rsChan := make(chan RsEncodeResult, 1)
-	lengthCommitmentChan := make(chan LengthCommitmentResult, 1)
-	lengthProofChan := make(chan LengthProofResult, 1)
-	commitmentChan := make(chan CommitmentResult, 1)
-	proofChan := make(chan ProofsResult, 1)
+	commitmentsChan := make(chan commitmentsResult, 1)
 
 	// inputFr is untouched
 	// compute chunks
 	go func() {
-		start := time.Now()
-		frames, indices, err := g.Encoder.Encode(inputFr)
-		rsChan <- RsEncodeResult{
-			Frames:   frames,
-			Indices:  indices,
-			Err:      err,
-			Duration: time.Since(start),
+		commitment, lengthCommitment, lengthProof, err := g.GetCommitments(inputFr)
+
+		commitmentsChan <- commitmentsResult{
+			commitment:       commitment,
+			lengthCommitment: lengthCommitment,
+			lengthProof:      lengthProof,
+			Error:            err,
 		}
 	}()
 
+	frames, indices, err := g.GetFrames(inputFr)
+	if err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+
+	commitmentResult := <-commitmentsChan
+	if commitmentResult.Error != nil {
+		return nil, nil, nil, nil, nil, commitmentResult.Error
+	}
+
+	totalProcessingTime := time.Since(encodeStart)
+
+	if g.Verbose {
+		log.Printf("Total encoding took %v\n", totalProcessingTime)
+	}
+	return commitmentResult.commitment, commitmentResult.lengthCommitment, commitmentResult.lengthProof, frames, indices, nil
+}
+
+func (g *ParametrizedProver) GetCommitments(inputFr []fr.Element) (*bn254.G1Affine, *bn254.G2Affine, *bn254.G2Affine, error) {
+
+	if len(inputFr) > int(g.KzgConfig.SRSNumberToLoad) {
+		return nil, nil, nil, fmt.Errorf("poly Coeff length %v is greater than Loaded SRS points %v", len(inputFr), int(g.KzgConfig.SRSNumberToLoad))
+	}
+
+	encodeStart := time.Now()
+
+	lengthCommitmentChan := make(chan lengthCommitmentResult, 1)
+	lengthProofChan := make(chan lengthProofResult, 1)
+	commitmentChan := make(chan commitmentResult, 1)
+
 	// compute commit for the full poly
 	go func() {
 		start := time.Now()
 		commit, err := g.Computer.ComputeCommitment(inputFr)
-		commitmentChan <- CommitmentResult{
+		commitmentChan <- commitmentResult{
 			Commitment: *commit,
 			Err:        err,
 			Duration:   time.Since(start),
@@ -100,7 +132,7 @@ func (g *ParametrizedProver) Encode(inputFr []fr.Element) (*bn254.G1Affine, *bn2
 	go func() {
 		start := time.Now()
 		lengthCommitment, err := g.Computer.ComputeLengthCommitment(inputFr)
-		lengthCommitmentChan <- LengthCommitmentResult{
+		lengthCommitmentChan <- lengthCommitmentResult{
 			LengthCommitment: *lengthCommitment,
 			Err:              err,
 			Duration:         time.Since(start),
@@ -110,13 +142,59 @@ func (g *ParametrizedProver) Encode(inputFr []fr.Element) (*bn254.G1Affine, *bn2
 	go func() {
 		start := time.Now()
 		lengthProof, err := g.Computer.ComputeLengthProof(inputFr)
-		lengthProofChan <- LengthProofResult{
+		lengthProofChan <- lengthProofResult{
 			LengthProof: *lengthProof,
 			Err:         err,
 			Duration:    time.Since(start),
 		}
 	}()
 
+	lengthProofResult := <-lengthProofChan
+	lengthCommitmentResult := <-lengthCommitmentChan
+	commitmentResult := <-commitmentChan
+
+	if lengthProofResult.Err != nil || lengthCommitmentResult.Err != nil ||
+		commitmentResult.Err != nil {
+		return nil, nil, nil, multierror.Append(lengthProofResult.Err, lengthCommitmentResult.Err, commitmentResult.Err)
+	}
+	totalProcessingTime := time.Since(encodeStart)
+
+	log.Printf("\n\t\tCommitting %-v\n\t\tLengthCommit %-v\n\t\tlengthProof %-v\n\t\tMetaInfo. order %-v shift %v\n",
+		commitmentResult.Duration,
+		lengthCommitmentResult.Duration,
+		lengthProofResult.Duration,
+		g.SRSOrder,
+		g.SRSOrder-uint64(len(inputFr)),
+	)
+
+	if g.Verbose {
+		log.Printf("Total encoding took %v\n", totalProcessingTime)
+	}
+	return &commitmentResult.Commitment, &lengthCommitmentResult.LengthCommitment, &lengthProofResult.LengthProof, nil
+}
+
+func (g *ParametrizedProver) GetFrames(inputFr []fr.Element) ([]encoding.Frame, []uint32, error) {
+
+	if len(inputFr) > int(g.KzgConfig.SRSNumberToLoad) {
+		return nil, nil, fmt.Errorf("poly Coeff length %v is greater than Loaded SRS points %v", len(inputFr), int(g.KzgConfig.SRSNumberToLoad))
+	}
+
+	proofChan := make(chan proofsResult, 1)
+	rsChan := make(chan rsEncodeResult, 1)
+
+	// inputFr is untouched
+	// compute chunks
+	go func() {
+		start := time.Now()
+		frames, indices, err := g.Encoder.Encode(inputFr)
+		rsChan <- rsEncodeResult{
+			Frames:   frames,
+			Indices:  indices,
+			Err:      err,
+			Duration: time.Since(start),
+		}
+	}()
+
 	go func() {
 		start := time.Now()
 		// compute proofs
@@ -131,31 +209,22 @@ func (g *ParametrizedProver) Encode(inputFr []fr.Element) (*bn254.G1Affine, *bn2
 		}
 
 		proofs, err := g.Computer.ComputeMultiFrameProof(flatpaddedCoeffs, g.NumChunks, g.ChunkLength, g.NumWorker)
-		proofChan <- ProofsResult{
+		proofChan <- proofsResult{
 			Proofs:   proofs,
 			Err:      err,
 			Duration: time.Since(start),
 		}
 	}()
 
-	lengthProofResult := <-lengthProofChan
-	lengthCommitmentResult := <-lengthCommitmentChan
-	commitmentResult := <-commitmentChan
 	rsResult := <-rsChan
 	proofsResult := <-proofChan
 
-	if lengthProofResult.Err != nil || lengthCommitmentResult.Err != nil ||
-		commitmentResult.Err != nil || rsResult.Err != nil ||
-		proofsResult.Err != nil {
-		return nil, nil, nil, nil, nil, multierror.Append(lengthProofResult.Err, lengthCommitmentResult.Err, commitmentResult.Err, rsResult.Err, proofsResult.Err)
+	if rsResult.Err != nil || proofsResult.Err != nil {
+		return nil, nil, multierror.Append(rsResult.Err, proofsResult.Err)
 	}
-	totalProcessingTime := time.Since(encodeStart)
 
-	log.Printf("\n\t\tRS encode %-v\n\t\tCommiting %-v\n\t\tLengthCommit %-v\n\t\tlengthProof %-v\n\t\tmultiProof %-v\n\t\tMetaInfo. order %-v shift %v\n",
+	log.Printf("\n\t\tRS encode %-v\n\t\tmultiProof %-v\n\t\tMetaInfo. order %-v shift %v\n",
 		rsResult.Duration,
-		commitmentResult.Duration,
-		lengthCommitmentResult.Duration,
-		lengthProofResult.Duration,
 		proofsResult.Duration,
 		g.SRSOrder,
 		g.SRSOrder-uint64(len(inputFr)),
@@ -170,8 +239,6 @@ func (g *ParametrizedProver) Encode(inputFr []fr.Element) (*bn254.G1Affine, *bn2
 		}
 	}
 
-	if g.Verbose {
-		log.Printf("Total encoding took %v\n", totalProcessingTime)
-	}
-	return &commitmentResult.Commitment, &lengthCommitmentResult.LengthCommitment, &lengthProofResult.LengthProof, kzgFrames, rsResult.Indices, nil
+	return kzgFrames, rsResult.Indices, nil
+
 }
diff --git a/encoding/kzg/prover/prover.go b/encoding/kzg/prover/prover.go
index 0c51983114..64786a16e4 100644
--- a/encoding/kzg/prover/prover.go
+++ b/encoding/kzg/prover/prover.go
@@ -171,6 +171,70 @@ func (e *Prover) EncodeAndProve(data []byte, params encoding.EncodingParams) (en
 	return commitments, chunks, nil
 }
 
+func (e *Prover) GetFrames(data []byte, params encoding.EncodingParams) ([]*encoding.Frame, error) {
+
+	symbols, err := rs.ToFrArray(data)
+	if err != nil {
+		return nil, err
+	}
+
+	enc, err := e.GetKzgEncoder(params)
+	if err != nil {
+		return nil, err
+	}
+
+	kzgFrames, _, err := enc.GetFrames(symbols)
+	if err != nil {
+		return nil, err
+	}
+
+	chunks := make([]*encoding.Frame, len(kzgFrames))
+	for ind, frame := range kzgFrames {
+
+		chunks[ind] = &encoding.Frame{
+			Coeffs: frame.Coeffs,
+			Proof:  frame.Proof,
+		}
+	}
+
+	return chunks, nil
+}
+
+func (e *Prover) GetCommitments(data []byte) (encoding.BlobCommitments, error) {
+
+	symbols, err := rs.ToFrArray(data)
+	if err != nil {
+		return encoding.BlobCommitments{}, err
+	}
+
+	// The commitments do not depend on the encoding parameters, so a dummy
+	// set of parameters is used only to fetch an encoder.
+	params := encoding.EncodingParams{
+		NumChunks:   2,
+		ChunkLength: 2,
+	}
+
+	enc, err := e.GetKzgEncoder(params)
+	if err != nil {
+		return encoding.BlobCommitments{}, err
+	}
+
+	commit, lengthCommit, lengthProof, err := enc.GetCommitments(symbols)
+	if err != nil {
+		return encoding.BlobCommitments{}, err
+	}
+
+	length := uint(len(symbols))
+	commitments := encoding.BlobCommitments{
+		Commitment:       (*encoding.G1Commitment)(commit),
+		LengthCommitment: (*encoding.G2Commitment)(lengthCommit),
+		LengthProof:      (*encoding.G2Commitment)(lengthProof),
+		Length:           length,
+	}
+
+	return commitments, nil
+}
+
 func (g *Prover) GetKzgEncoder(params encoding.EncodingParams) (*ParametrizedProver, error) {
 	g.mu.Lock()
 	defer g.mu.Unlock()
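The two new methods decompose EncodeAndProve: GetCommitments needs no encoding parameters (the dummy params above only select an encoder), while GetFrames performs the Reed-Solomon encoding and multi-proof computation. A sketch of the two-phase flow, not part of the patch; the parameter values are hypothetical:

package disperser // hypothetical package, for illustration only

import "github.com/Layr-Labs/eigenda/encoding"

func commitThenEncode(p encoding.Prover, data []byte) (encoding.BlobCommitments, []*encoding.Frame, error) {
	// Phase 1: commit to the blob before any encoding parameters are chosen.
	commitments, err := p.GetCommitments(data)
	if err != nil {
		return encoding.BlobCommitments{}, nil, err
	}

	// Phase 2: once the chunk count and length are fixed, produce the frames.
	params := encoding.EncodingParams{NumChunks: 8192, ChunkLength: 4}
	frames, err := p.GetFrames(data, params)
	if err != nil {
		return encoding.BlobCommitments{}, nil, err
	}

	return commitments, frames, nil
}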
diff --git a/encoding/kzg/prover/prover_cpu.go b/encoding/kzg/prover/prover_cpu.go
index 62f9989f77..5a230b51cb 100644
--- a/encoding/kzg/prover/prover_cpu.go
+++ b/encoding/kzg/prover/prover_cpu.go
@@ -4,7 +4,6 @@
 package prover
 
 import (
-	"fmt"
 	"log"
 	"math"
 
@@ -21,9 +20,8 @@ import (
 
 func (g *Prover) newProver(params encoding.EncodingParams) (*ParametrizedProver, error) {
 
-	// Check that the parameters are valid with respect to the SRS.
-	if params.ChunkLength*params.NumChunks >= g.SRSOrder {
-		return nil, fmt.Errorf("the supplied encoding parameters are not valid with respect to the SRS. ChunkLength: %d, NumChunks: %d, SRSOrder: %d", params.ChunkLength, params.NumChunks, g.SRSOrder)
+	if err := encoding.ValidateEncodingParams(params, g.SRSOrder); err != nil {
+		return nil, err
 	}
 
 	encoder, err := rs.NewEncoder(params, g.Verbose)
diff --git a/encoding/mock/encoder.go b/encoding/mock/encoder.go
index 3f643fb745..d7ca708238 100644
--- a/encoding/mock/encoder.go
+++ b/encoding/mock/encoder.go
@@ -23,6 +23,18 @@ func (e *MockEncoder) EncodeAndProve(data []byte, params encoding.EncodingParams
 	return args.Get(0).(encoding.BlobCommitments), args.Get(1).([]*encoding.Frame), args.Error(2)
 }
 
+func (e *MockEncoder) GetCommitments(data []byte) (encoding.BlobCommitments, error) {
+	args := e.Called(data)
+	time.Sleep(e.Delay)
+	return args.Get(0).(encoding.BlobCommitments), args.Error(1)
+}
+
+func (e *MockEncoder) GetFrames(data []byte, params encoding.EncodingParams) ([]*encoding.Frame, error) {
+	args := e.Called(data, params)
+	time.Sleep(e.Delay)
+	return args.Get(0).([]*encoding.Frame), args.Error(1)
+}
+
 func (e *MockEncoder) VerifyFrames(chunks []*encoding.Frame, indices []encoding.ChunkNumber, commitments encoding.BlobCommitments, params encoding.EncodingParams) error {
 	args := e.Called(chunks, indices, commitments, params)
 	time.Sleep(e.Delay)
diff --git a/encoding/params.go b/encoding/params.go
index e334fb8967..422fa0799c 100644
--- a/encoding/params.go
+++ b/encoding/params.go
@@ -60,13 +60,29 @@ func GetNumSys(dataSize uint64, chunkLen uint64) uint64 {
 }
 
 // ValidateEncodingParams takes in the encoding parameters and returns an error if they are invalid.
-func ValidateEncodingParams(params EncodingParams, blobLength, SRSOrder int) error {
-
-	if int(params.ChunkLength*params.NumChunks) >= SRSOrder {
+func ValidateEncodingParams(params EncodingParams, SRSOrder uint64) error {
+
+	// Check that the parameters are valid with respect to the SRS. The precomputed terms of the amortized KZG
+	// prover use up to order params.ChunkLength*params.NumChunks-1 of the SRS, so we must have
+	// params.ChunkLength*params.NumChunks-1 <= SRSOrder. The condition below could technically
+	// be relaxed to params.ChunkLength*params.NumChunks > SRSOrder+1, but because all of the parameters are
+	// powers of 2, the stricter condition is equivalent.
+	if params.ChunkLength*params.NumChunks > SRSOrder {
 		return fmt.Errorf("the supplied encoding parameters are not valid with respect to the SRS. ChunkLength: %d, NumChunks: %d, SRSOrder: %d", params.ChunkLength, params.NumChunks, SRSOrder)
 	}
 
-	if int(params.ChunkLength*params.NumChunks) < blobLength {
+	return nil
+
+}
+
+// ValidateEncodingParamsAndBlobLength takes in the encoding parameters and blob length and returns an error if they are collectively invalid.
+func ValidateEncodingParamsAndBlobLength(params EncodingParams, blobLength, SRSOrder uint64) error {
+
+	if err := ValidateEncodingParams(params, SRSOrder); err != nil {
+		return err
+	}
+
+	if params.ChunkLength*params.NumChunks < blobLength {
 		return errors.New("the supplied encoding parameters are not sufficient for the size of the data input")
 	}
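The split mirrors the two call sites: newProver in prover_cpu.go only needs the SRS bound, while the encoding streamer also guards the blob length. A small check of both paths, not part of the patch:

package main

import (
	"fmt"

	"github.com/Layr-Labs/eigenda/encoding"
)

func main() {
	params := encoding.EncodingParams{ChunkLength: 4, NumChunks: 2048}

	// SRS bound alone: ChunkLength*NumChunks = 8192 <= SRSOrder, so this passes.
	fmt.Println(encoding.ValidateEncodingParams(params, 8192)) // <nil>

	// Combined check: the parameters must also cover the blob length; 8192 < 16384 fails.
	fmt.Println(encoding.ValidateEncodingParamsAndBlobLength(params, 16384, 8192))
}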