Skip to content

Commit

Permalink
[6/N] Chunk encoding optimization: Disperser/Retriever support for ne…
Browse files Browse the repository at this point in the history
…w chunk encoding (#650)
  • Loading branch information
jianoaix authored and pschork committed Jul 28, 2024
1 parent 585d3a6 commit 8b4b82c
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 25 deletions.
19 changes: 18 additions & 1 deletion api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"context"
"errors"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
Expand Down Expand Up @@ -120,7 +121,23 @@ func (c client) GetChunks(

chunks := make([]*encoding.Frame, len(reply.GetChunks()))
for i, data := range reply.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
var chunk *encoding.Frame
switch reply.GetEncoding() {
case node.ChunkEncoding_GNARK:
chunk, err = new(encoding.Frame).DeserializeGnark(data)
case node.ChunkEncoding_GOB:
chunk, err = new(encoding.Frame).Deserialize(data)
case node.ChunkEncoding_UNKNOWN:
// For backward compatibility, we fallback the UNKNOWN to GNARK
chunk, err = new(encoding.Frame).DeserializeGnark(data)
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: errors.New("UNKNOWN chunk encoding format"),
Chunks: nil,
}
}
}
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Expand Down
66 changes: 44 additions & 22 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

type Config struct {
Timeout time.Duration
Timeout time.Duration
EnableGnarkBundleEncoding bool
}

type dispatcher struct {
Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader)
request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,7 +169,7 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader)
request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,12 +280,12 @@ func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalCl
return &core.Signature{G1Point: point}, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding)
if err != nil {
return nil, 0, err
}
Expand All @@ -299,12 +300,12 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B
return request, totalSize, nil
}

func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreBlobsRequest, int64, error) {
func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding)
if err != nil {
return nil, 0, err
}
Expand All @@ -319,7 +320,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba
return request, totalSize, nil
}

func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) {
if blob.BlobHeader == nil {
return nil, errors.New("blob header is nil")
}
Expand Down Expand Up @@ -356,22 +357,43 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
}
}

data, err := blob.Bundles.Serialize()
if err != nil {
return nil, err
}
bundles := make([]*node.Bundle, len(quorumHeaders))
// the ordering of quorums in bundles must be same as in quorumHeaders
for i, quorumHeader := range quorumHeaders {
quorum := quorumHeader.QuorumId
if _, ok := blob.Bundles[uint8(quorum)]; ok {
bundles[i] = &node.Bundle{
Chunks: data[quorum],
if useGnarkBundleEncoding {
// the ordering of quorums in bundles must be same as in quorumHeaders
for i, quorumHeader := range quorumHeaders {
quorum := quorumHeader.QuorumId
if bundle, ok := blob.Bundles[uint8(quorum)]; ok {
bundleBytes, err := bundle.Serialize()
if err != nil {
return nil, err
}
bundles[i] = &node.Bundle{
Bundle: bundleBytes,
}
} else {
bundles[i] = &node.Bundle{
// empty bundle for quorums operators are not part of
Bundle: make([]byte, 0),
}
}
} else {
bundles[i] = &node.Bundle{
// empty bundle for quorums operators are not part of
Chunks: make([][]byte, 0),
}
} else {
data, err := blob.Bundles.Serialize()
if err != nil {
return nil, err
}
// the ordering of quorums in bundles must be same as in quorumHeaders
for i, quorumHeader := range quorumHeaders {
quorum := quorumHeader.QuorumId
if _, ok := blob.Bundles[uint8(quorum)]; ok {
bundles[i] = &node.Bundle{
Chunks: data[quorum],
}
} else {
bundles[i] = &node.Bundle{
// empty bundle for quorums operators are not part of
Chunks: make([][]byte, 0),
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions disperser/cmd/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Config struct {
EigenDAServiceManagerAddr string

EnableMinibatch bool

EnableGnarkBundleEncoding bool
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand Down Expand Up @@ -88,6 +90,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
IndexerConfig: indexer.ReadIndexerConfig(ctx),
KMSKeyConfig: kmsConfig,
EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
}
return config, nil
}
7 changes: 7 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"),
Value: 75,
}
EnableGnarkBundleEncodingFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-gnark-bundle-encoding"),
Usage: "Enable Gnark bundle encoding for chunks",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"),
}
// EnableMinibatchFlag is a flag to enable minibatch processing
// Defaults to false
EnableMinibatchFlag = cli.BoolFlag{
Expand Down Expand Up @@ -258,6 +264,7 @@ var optionalFlags = []cli.Flag{
MinibatcherPullIntervalFlag,
MaxNodeConnectionsFlag,
MaxNumRetriesPerDispersalFlag,
EnableGnarkBundleEncodingFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
3 changes: 2 additions & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func RunBatcher(ctx *cli.Context) error {
metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{
Timeout: config.TimeoutConfig.AttestationTimeout,
Timeout: config.TimeoutConfig.AttestationTimeout,
EnableGnarkBundleEncoding: config.EnableGnarkBundleEncoding,
}, logger, metrics.DispatcherMetrics)
asgn := &core.StdAssignmentCoordinator{}

Expand Down
2 changes: 1 addition & 1 deletion node/grpc/server_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestStoreChunks(t *testing.T) {
numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0])
}
t.Logf("Batch numTotalChunks: %d", numTotalChunks)
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader, false)
fmt.Println("totalSize", totalSize)
assert.NoError(t, err)
assert.Equal(t, int64(26214400), totalSize)
Expand Down

0 comments on commit 8b4b82c

Please sign in to comment.