Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add configurable max blob size #675

Merged
merged 7 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (

const systemAccountKey = "system"

const maxBlobSize = 2 * 1024 * 1024 // 2 MiB

type DispersalServer struct {
pb.UnimplementedDisperserServer
mu *sync.RWMutex
Expand All @@ -50,6 +48,8 @@ type DispersalServer struct {

metrics *disperser.Metrics

maxBlobSize int

logger logging.Logger
}

Expand All @@ -70,6 +70,7 @@ func NewDispersalServer(
metrics *disperser.Metrics,
ratelimiter common.RateLimiter,
rateConfig RateConfig,
maxBlobSize int,
) *DispersalServer {
logger := _logger.With("component", "DispersalServer")
for account, rateInfoByQuorum := range rateConfig.Allowlist {
Expand All @@ -92,6 +93,7 @@ func NewDispersalServer(
authenticator: authenticator,
mu: &sync.RWMutex{},
quorumConfig: QuorumConfig{},
maxBlobSize: maxBlobSize,
}
}

Expand Down Expand Up @@ -807,6 +809,7 @@ func (s *DispersalServer) Start(ctx context.Context) error {
return errors.New("could not start GRPC server")
}

s.logger.Info("max blob size", s.maxBlobSize)
return nil
}

Expand Down Expand Up @@ -909,8 +912,8 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb
data := req.GetData()
blobSize := len(data)
// The blob size in bytes must be in range [1, maxBlobSize].
if blobSize > maxBlobSize {
return nil, fmt.Errorf("blob size cannot exceed 2 MiB")
if blobSize > s.maxBlobSize {
return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize)
}
if blobSize == 0 {
return nil, fmt.Errorf("blob size must be greater than 0")
Expand Down
6 changes: 4 additions & 2 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (
deployLocalStack bool
localStackPort = "4568"
allowlistFile *os.File
testMaxBlobSize = 2 * 1024 * 1024
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -421,7 +422,8 @@ func TestDisperseBlobWithExceedSizeLimit(t *testing.T) {
CustomQuorumNumbers: []uint32{0, 1},
})
assert.NotNil(t, err)
assert.Equal(t, err.Error(), "rpc error: code = InvalidArgument desc = blob size cannot exceed 2 MiB")
expectedErrMsg := fmt.Sprintf("rpc error: code = InvalidArgument desc = blob size cannot exceed %v Bytes", testMaxBlobSize)
assert.Equal(t, err.Error(), expectedErrMsg)
}

func TestParseAllowlist(t *testing.T) {
Expand Down Expand Up @@ -699,7 +701,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
return apiserver.NewDispersalServer(disperser.ServerConfig{
GrpcPort: "51001",
GrpcTimeout: 1 * time.Second,
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig)
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig, testMaxBlobSize)
}

func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) {
Expand Down
2 changes: 2 additions & 0 deletions disperser/cmd/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Config struct {
ShadowTableName string
BucketStoreSize int
EthClientConfig geth.EthClientConfig
MaxBlobSize int

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand Down Expand Up @@ -69,6 +70,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
BucketTableName: ctx.GlobalString(flags.BucketTableName.Name),
BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name),
EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx),
MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name),

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/apiserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_STORE_SIZE"),
Required: false,
}
MaxBlobSize = cli.IntFlag{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks no issue if different disperser replicas are using different MaxBlobSize.
Should we add validation to the Node to check if the blob size is under a limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is issue. But we want it, it has to be different deployment, which the current devops repo has yet offered that.

I don't think we need to artificially constraint on blob size on the DA node size now, as far as the disperser isn't decentralized, but yes, once we got to the step. We need to have some rate limit or blob constraint. For now, I think we are good.

Empirically, there is no impact on validation speed with respect to the blob size.

Name: common.PrefixFlag(FlagPrefix, "max-blob-size"),
Usage: "max blob size disperser is accepting",
Value: 2_097_152,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOB_SIZE"),
Required: false,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -112,6 +119,7 @@ var optionalFlags = []cli.Flag{
BucketStoreSize,
GrpcTimeoutFlag,
ShadowTableNameFlag,
MaxBlobSize,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
10 changes: 10 additions & 0 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"math"
"os"
"time"

Expand Down Expand Up @@ -114,6 +115,14 @@ func RunDisperserServer(ctx *cli.Context) error {
ratelimiter = ratelimit.NewRateLimiter(reg, globalParams, bucketStore, logger)
}

if config.MaxBlobSize < 0 || config.MaxBlobSize > 64*1024*1024 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it fail if config.MaxBlobSize == 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I will exclude 0

return fmt.Errorf("configured max blob size is invalid %v", config.MaxBlobSize)
}

if int64(math.Log2(float64(config.MaxBlobSize))) == int64(math.Log2(float64(config.MaxBlobSize-1))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's faster to use bit operation, config.MaxBlobSize > 0 && (config.MaxBlobSize&(config.MaxBlobSize-1)) == 0 should work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this only runs once. I think it is more readable this way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but your solution works

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are utility functions to check if a number is power of 2 (in encoding or common package). Reusing that has no impact on readabilty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that function is in "github.com/Layr-Labs/eigenda/encoding/fft". I don't want to add a new strange dependancy, what do you think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we can pull it out to common/ and share across the codebase

return fmt.Errorf("configured max blob size must be power of 2 %v", config.MaxBlobSize)
}

metrics := disperser.NewMetrics(reg, config.MetricsConfig.HTTPPort, logger)
server := apiserver.NewDispersalServer(
config.ServerConfig,
Expand All @@ -123,6 +132,7 @@ func RunDisperserServer(ctx *cli.Context) error {
metrics,
ratelimiter,
config.RateConfig,
config.MaxBlobSize,
)

// Enable Metrics Block
Expand Down
3 changes: 2 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
encoderPort = "3100"
q0AdversaryThreshold = uint8(80)
q0QuorumThreshold = uint8(100)
testMaxBlobSize = 2 * 1024 * 1024
)

func init() {
Expand Down Expand Up @@ -193,7 +194,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
tx := &coremock.MockTransactor{}
tx.On("GetCurrentBlockNumber").Return(uint64(100), nil)
tx.On("GetQuorumCount").Return(1, nil)
server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, ratelimiter, rateConfig)
server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, ratelimiter, rateConfig, testMaxBlobSize)

return TestDisperser{
batcher: batcher,
Expand Down
Loading