From bd7a762b74d9e3c2afdc71285b2c232edb9e502b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 00:55:57 +0000 Subject: [PATCH 1/7] add configurable max blob size --- disperser/apiserver/server.go | 11 +++++++---- disperser/apiserver/server_test.go | 6 ++++-- disperser/cmd/apiserver/config.go | 2 ++ disperser/cmd/apiserver/flags/flags.go | 8 ++++++++ disperser/cmd/apiserver/main.go | 10 ++++++++++ test/integration_test.go | 3 ++- 6 files changed, 33 insertions(+), 7 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 747649bdcc..b19efac622 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -32,8 +32,6 @@ import ( const systemAccountKey = "system" -const maxBlobSize = 2 * 1024 * 1024 // 2 MiB - type DispersalServer struct { pb.UnimplementedDisperserServer mu *sync.RWMutex @@ -50,6 +48,8 @@ type DispersalServer struct { metrics *disperser.Metrics + maxBlobSize int + logger logging.Logger } @@ -70,6 +70,7 @@ func NewDispersalServer( metrics *disperser.Metrics, ratelimiter common.RateLimiter, rateConfig RateConfig, + maxBlobSize int, ) *DispersalServer { logger := _logger.With("component", "DispersalServer") for account, rateInfoByQuorum := range rateConfig.Allowlist { @@ -92,6 +93,7 @@ func NewDispersalServer( authenticator: authenticator, mu: &sync.RWMutex{}, quorumConfig: QuorumConfig{}, + maxBlobSize: maxBlobSize, } } @@ -807,6 +809,7 @@ func (s *DispersalServer) Start(ctx context.Context) error { return errors.New("could not start GRPC server") } + s.logger.Info("max blob size", s.maxBlobSize) return nil } @@ -909,8 +912,8 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb data := req.GetData() blobSize := len(data) // The blob size in bytes must be in range [1, maxBlobSize]. - if blobSize > maxBlobSize { - return nil, fmt.Errorf("blob size cannot exceed 2 MiB") + if blobSize > s.maxBlobSize { + return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize) } if blobSize == 0 { return nil, fmt.Errorf("blob size must be greater than 0") diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 734d93d8f8..40a7a67471 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -54,6 +54,7 @@ var ( deployLocalStack bool localStackPort = "4568" allowlistFile *os.File + testMaxBlobSize = 2 * 1024 * 1024 ) func TestMain(m *testing.M) { @@ -421,7 +422,8 @@ func TestDisperseBlobWithExceedSizeLimit(t *testing.T) { CustomQuorumNumbers: []uint32{0, 1}, }) assert.NotNil(t, err) - assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = blob size cannot exceed 2 MiB") + expectedErrMsg := fmt.Sprintf("rpc error: code = InvalidArgument desc = blob size cannot exceed %v Bytes", testMaxBlobSize) + assert.Equal(t, err.Error(), expectedErrMsg) } func TestParseAllowlist(t *testing.T) { @@ -699,7 +701,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig, testMaxBlobSize) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index 6ae168f458..db5740c34d 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -25,6 +25,7 @@ type Config struct { ShadowTableName string BucketStoreSize int EthClientConfig geth.EthClientConfig + MaxBlobSize int BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string @@ -69,6 +70,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), + MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index 36d856628b..1211af62c0 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -94,6 +94,13 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_STORE_SIZE"), Required: false, } + MaxBlobSize = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "max-blob-size"), + Usage: "max blob size disperser is accepting", + Value: 2_097_152, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOB_SIZE"), + Required: false, + } ) var requiredFlags = []cli.Flag{ @@ -112,6 +119,7 @@ var optionalFlags = []cli.Flag{ BucketStoreSize, GrpcTimeoutFlag, ShadowTableNameFlag, + MaxBlobSize, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 36bf71d7c6..793e40a8a7 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "math" "os" "time" @@ -114,6 +115,14 @@ func RunDisperserServer(ctx *cli.Context) error { ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) } + if config.MaxBlobSize < 0 || config.MaxBlobSize > 64*1024*1024 { + return fmt.Errorf("configured max blob size is invalid %v", config.MaxBlobSize) + } + + if int64(math.Log2(float64(config.MaxBlobSize))) == int64(math.Log2(float64(config.MaxBlobSize-1))) { + return fmt.Errorf("configured max blob size must be power of 2 %v", config.MaxBlobSize) + } + metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger) server := apiserver.NewDispersalServer( config.ServerConfig, @@ -123,6 +132,7 @@ func RunDisperserServer(ctx *cli.Context) error { metrics, ratelimiter, config.RateConfig, + config.MaxBlobSize, ) // Enable Metrics Block diff --git a/test/integration_test.go b/test/integration_test.go index a1fc47c04b..2d462e2981 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -72,6 +72,7 @@ const ( encoderPort = "3100" q0AdversaryThreshold = uint8(80) q0QuorumThreshold = uint8(100) + testMaxBlobSize = 2 * 1024 * 1024 ) func init() { @@ -193,7 +194,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser tx := &coremock.MockTransactor{} tx.On("GetCurrentBlockNumber").Return(uint64(100), nil) tx.On("GetQuorumCount").Return(1, nil) - server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, ratelimiter, rateConfig) + server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, ratelimiter, rateConfig, testMaxBlobSize) return TestDisperser{ batcher: batcher, From d3d6a43cd0bd309878e3d1248d31265e5ada557a Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 19:01:37 +0000 Subject: [PATCH 2/7] remove blob equals to 0 case --- disperser/cmd/apiserver/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 793e40a8a7..2698ccca07 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -115,7 +115,7 @@ func RunDisperserServer(ctx *cli.Context) error { ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) } - if config.MaxBlobSize < 0 || config.MaxBlobSize > 64*1024*1024 { + if config.MaxBlobSize <= 0 || config.MaxBlobSize > 64*1024*1024 { return fmt.Errorf("configured max blob size is invalid %v", config.MaxBlobSize) } From 09abfaa12d68f01056f1bdc57ab2c37c2e4aa054 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 22:09:34 +0000 Subject: [PATCH 3/7] commit --- disperser/apiserver/server.go | 2 +- disperser/cmd/apiserver/main.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index b19efac622..426021f5f5 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -805,11 +805,11 @@ func (s *DispersalServer) Start(ctx context.Context) error { healthcheck.RegisterHealthServer(name, gs) s.logger.Info("port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "GRPC Listening") + s.logger.Info("max blob size", s.maxBlobSize) if err := gs.Serve(listener); err != nil { return errors.New("could not start GRPC server") } - s.logger.Info("max blob size", s.maxBlobSize) return nil } diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 2698ccca07..24c03b4b46 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -4,13 +4,13 @@ import ( "context" "fmt" "log" - "math" "os" "time" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" + "github.com/Layr-Labs/eigenda/encoding/fft" "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" @@ -119,7 +119,7 @@ func RunDisperserServer(ctx *cli.Context) error { return fmt.Errorf("configured max blob size is invalid %v", config.MaxBlobSize) } - if int64(math.Log2(float64(config.MaxBlobSize))) == int64(math.Log2(float64(config.MaxBlobSize-1))) { + if !fft.IsPowerOfTwo(uint64(config.MaxBlobSize)) { return fmt.Errorf("configured max blob size must be power of 2 %v", config.MaxBlobSize) } From 232eaaf8d521961863c1e7e7280e98f6a3e95e55 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 22:18:56 +0000 Subject: [PATCH 4/7] reduce max size --- disperser/cmd/apiserver/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 24c03b4b46..44c63453b0 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -115,7 +115,7 @@ func RunDisperserServer(ctx *cli.Context) error { ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger) } - if config.MaxBlobSize <= 0 || config.MaxBlobSize > 64*1024*1024 { + if config.MaxBlobSize <= 0 || config.MaxBlobSize > 32*1024*1024 { return fmt.Errorf("configured max blob size is invalid %v", config.MaxBlobSize) } From 628be00c7027a0fb2ba81f732e8952cfe54ac4cc Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 22:33:04 +0000 Subject: [PATCH 5/7] fix log --- disperser/apiserver/server.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 426021f5f5..cee59873b1 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -804,8 +804,7 @@ func (s *DispersalServer) Start(ctx context.Context) error { name := pb.Disperser_ServiceDesc.ServiceName healthcheck.RegisterHealthServer(name, gs) - s.logger.Info("port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "GRPC Listening") - s.logger.Info("max blob size", s.maxBlobSize) + s.logger.Info("port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "max blob size", s.maxBlobSize, "GRPC Listening") if err := gs.Serve(listener); err != nil { return errors.New("could not start GRPC server") } From bd7ca28da6d6258640107f5fdb4b7b3349dccb82 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 22:34:14 +0000 Subject: [PATCH 6/7] max blob size --- disperser/apiserver/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index cee59873b1..bfd94ef17c 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -804,7 +804,8 @@ func (s *DispersalServer) Start(ctx context.Context) error { name := pb.Disperser_ServiceDesc.ServiceName healthcheck.RegisterHealthServer(name, gs) - s.logger.Info("port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "max blob size", s.maxBlobSize, "GRPC Listening") + s.logger.Info("port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "maxBlobSize", s.maxBlobSize, "GRPC Listening") + if err := gs.Serve(listener); err != nil { return errors.New("could not start GRPC server") } From 0c15f2b8b05094374a22bde5fb695ddddf550bb3 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 31 Jul 2024 22:49:25 +0000 Subject: [PATCH 7/7] change log order --- disperser/apiserver/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index bfd94ef17c..a3bcaa1fe1 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -804,7 +804,7 @@ func (s *DispersalServer) Start(ctx context.Context) error { name := pb.Disperser_ServiceDesc.ServiceName healthcheck.RegisterHealthServer(name, gs) - s.logger.Info("port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "maxBlobSize", s.maxBlobSize, "GRPC Listening") + s.logger.Info("GRPC Listening", "port", s.serverConfig.GrpcPort, "address", listener.Addr().String(), "maxBlobSize", s.maxBlobSize) if err := gs.Serve(listener); err != nil { return errors.New("could not start GRPC server")