diff --git a/core/mock/state.go b/core/mock/state.go index d80651414f..136f193e85 100644 --- a/core/mock/state.go +++ b/core/mock/state.go @@ -26,11 +26,12 @@ var _ core.IndexedChainState = (*ChainDataMock)(nil) type PrivateOperatorInfo struct { *core.IndexedOperatorInfo - KeyPair *core.KeyPair - Signer blssigner.Signer - Host string - DispersalPort string - RetrievalPort string + KeyPair *core.KeyPair + Signer blssigner.Signer + Host string + DispersalPort string + RetrievalPort string + V2DispersalPort string } type PrivateOperatorState struct { @@ -138,7 +139,8 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl host := "0.0.0.0" dispersalPort := fmt.Sprintf("3%03v", 2*i) retrievalPort := fmt.Sprintf("3%03v", 2*i+1) - socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort) + v2DispersalPort := fmt.Sprintf("3%03v", 2*i+2) + socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort) indexed := &core.IndexedOperatorInfo{ Socket: string(socket), @@ -158,6 +160,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl Host: host, DispersalPort: dispersalPort, RetrievalPort: retrievalPort, + V2DispersalPort: v2DispersalPort, } indexedOperators[id] = indexed diff --git a/core/serialization.go b/core/serialization.go index f9e40bf850..b70ada92c8 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "math/big" - "regexp" "slices" "github.com/Layr-Labs/eigenda/api" @@ -528,33 +527,18 @@ func decode(data []byte, obj any) error { return nil } -func (s OperatorSocket) GetDispersalSocket() string { - ip, port1, _, err := extractIPAndPorts(string(s)) +func (s OperatorSocket) GetV1DispersalSocket() string { + ip, v1DispersalPort, _, _, err := ParseOperatorSocket(string(s)) if err != nil { return "" } - return fmt.Sprintf("%s:%s", ip, port1) + return fmt.Sprintf("%s:%s", ip, v1DispersalPort) } func (s OperatorSocket) GetRetrievalSocket() string { - ip, _, port2, err := extractIPAndPorts(string(s)) + ip, _, retrievalPort, _, err := ParseOperatorSocket(string(s)) if err != nil { return "" } - return fmt.Sprintf("%s:%s", ip, port2) -} - -func extractIPAndPorts(s string) (string, string, string, error) { - regex := regexp.MustCompile(`^([^:]+):([^;]+);([^;]+)$`) - matches := regex.FindStringSubmatch(s) - - if len(matches) != 4 { - return "", "", "", errors.New("input string does not match expected format") - } - - ip := matches[1] - port1 := matches[2] - port2 := matches[3] - - return ip, port1, port2, nil + return fmt.Sprintf("%s:%s", ip, retrievalPort) } diff --git a/core/serialization_test.go b/core/serialization_test.go index 090fb363f9..727933d150 100644 --- a/core/serialization_test.go +++ b/core/serialization_test.go @@ -195,20 +195,68 @@ func TestHashPubKeyG1(t *testing.T) { } func TestParseOperatorSocket(t *testing.T) { - operatorSocket := "localhost:1234;5678" - host, dispersalPort, retrievalPort, err := core.ParseOperatorSocket(operatorSocket) + operatorSocket := "localhost:1234;5678;9999" + host, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(operatorSocket) assert.NoError(t, err) assert.Equal(t, "localhost", host) assert.Equal(t, "1234", dispersalPort) assert.Equal(t, "5678", retrievalPort) + assert.Equal(t, "9999", v2DispersalPort) - _, _, _, err = core.ParseOperatorSocket("localhost:12345678") + host, dispersalPort, retrievalPort, v2DispersalPort, err = core.ParseOperatorSocket("localhost:1234;5678") + assert.NoError(t, err) + assert.Equal(t, "localhost", host) + assert.Equal(t, "1234", dispersalPort) + assert.Equal(t, "5678", retrievalPort) + assert.Equal(t, "", v2DispersalPort) + + _, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678") assert.NotNil(t, err) - assert.Equal(t, "invalid socket address format, missing retrieval port: localhost:12345678", err.Error()) + assert.ErrorContains(t, err, "invalid socket address format") - _, _, _, err = core.ParseOperatorSocket("localhost1234;5678") + _, _, _, _, err = core.ParseOperatorSocket("localhost:12345678") assert.NotNil(t, err) - assert.Equal(t, "invalid socket address format: localhost1234;5678", err.Error()) + assert.ErrorContains(t, err, "invalid socket address format") + + _, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678") + assert.NotNil(t, err) + assert.ErrorContains(t, err, "invalid socket address format") +} + +func TestGetV1DispersalSocket(t *testing.T) { + operatorSocket := core.OperatorSocket("localhost:1234;5678;9999") + socket := operatorSocket.GetV1DispersalSocket() + assert.Equal(t, "localhost:1234", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;5678") + socket = operatorSocket.GetV1DispersalSocket() + assert.Equal(t, "localhost:1234", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;5678;") + socket = operatorSocket.GetV1DispersalSocket() + assert.Equal(t, "localhost:1234", socket) + + operatorSocket = core.OperatorSocket("localhost:1234") + socket = operatorSocket.GetV1DispersalSocket() + assert.Equal(t, "", socket) +} + +func TestGetRetrievalSocket(t *testing.T) { + operatorSocket := core.OperatorSocket("localhost:1234;5678;9999") + socket := operatorSocket.GetRetrievalSocket() + assert.Equal(t, "localhost:5678", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;5678") + socket = operatorSocket.GetRetrievalSocket() + assert.Equal(t, "localhost:5678", socket) + + operatorSocket = core.OperatorSocket("localhost:1234;5678;") + socket = operatorSocket.GetRetrievalSocket() + assert.Equal(t, "localhost:5678", socket) + + operatorSocket = core.OperatorSocket("localhost:1234") + socket = operatorSocket.GetRetrievalSocket() + assert.Equal(t, "", socket) } func TestSignatureBytes(t *testing.T) { diff --git a/core/state.go b/core/state.go index 2bbf08d90b..6993fca175 100644 --- a/core/state.go +++ b/core/state.go @@ -12,35 +12,55 @@ import ( // Operators +// OperatorSocket is formatted as "host:dispersalPort;retrievalPort;v2DispersalPort" type OperatorSocket string func (s OperatorSocket) String() string { return string(s) } -func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort string) OperatorSocket { - return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort)) +func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort string) OperatorSocket { + if v2DispersalPort == "" { + return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort)) + } + return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort)) } type StakeAmount = *big.Int -func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, err error) { +func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, v2DispersalPort string, err error) { s := strings.Split(socket, ";") - if len(s) != 2 { - err = fmt.Errorf("invalid socket address format, missing retrieval port: %s", socket) + + if len(s) == 2 { + // no v2 dispersal port + retrievalPort = s[1] + s = strings.Split(s[0], ":") + if len(s) != 2 { + err = fmt.Errorf("invalid socket address format: %s", socket) + return + } + host = s[0] + dispersalPort = s[1] + return } - retrievalPort = s[1] - s = strings.Split(s[0], ":") - if len(s) != 2 { - err = fmt.Errorf("invalid socket address format: %s", socket) + if len(s) == 3 { + // all ports specified + v2DispersalPort = s[2] + retrievalPort = s[1] + + s = strings.Split(s[0], ":") + if len(s) != 2 { + err = fmt.Errorf("invalid socket address format: %s", socket) + return + } + host = s[0] + dispersalPort = s[1] return } - host = s[0] - dispersalPort = s[1] - return + return "", "", "", "", fmt.Errorf("invalid socket address format %s: it must specify dispersal port, retrieval port, and/or v2 dispersal port (ex. 0.0.0.0:32004;32005;32006)", socket) } // OperatorInfo contains information about an operator which is stored on the blockchain state, diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 69e4b2571b..e3058118ee 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -114,11 +114,11 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMe // TODO Add secure Grpc conn, err := grpc.NewClient( - core.OperatorSocket(op.Socket).GetDispersalSocket(), + core.OperatorSocket(op.Socket).GetV1DispersalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { - c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err) + c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetV1DispersalSocket(), "err", err) return nil, err } defer conn.Close() diff --git a/disperser/common/semver/semver.go b/disperser/common/semver/semver.go index 5f959c90dc..2e7c80b3e1 100644 --- a/disperser/common/semver/semver.go +++ b/disperser/common/semver/semver.go @@ -33,7 +33,7 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, oper if useRetrievalSocket { socket = operatorSocket.GetRetrievalSocket() } else { - socket = operatorSocket.GetDispersalSocket() + socket = operatorSocket.GetV1DispersalSocket() } semver := GetSemverInfo(context.Background(), socket, useRetrievalSocket, operatorId, logger, nodeInfoTimeout) diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index 99ea68841b..def620f5d0 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -149,12 +149,12 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, for opID, op := range state.IndexedOperators { opID := opID op := op - host, dispersalPort, _, err := core.ParseOperatorSocket(op.Socket) + host, _, _, v2DispersalPort, err := core.ParseOperatorSocket(op.Socket) if err != nil { - return nil, nil, fmt.Errorf("failed to parse operator socket: %w", err) + return nil, nil, fmt.Errorf("failed to parse operator socket (%s): %w", op.Socket, err) } - client, err := d.nodeClientManager.GetClient(host, dispersalPort) + client, err := d.nodeClientManager.GetClient(host, v2DispersalPort) if err != nil { d.logger.Error("failed to get node client", "operator", opID.Hex(), "err", err) continue diff --git a/disperser/controller/dispatcher_test.go b/disperser/controller/dispatcher_test.go index 4583cf9c49..6a499c84dd 100644 --- a/disperser/controller/dispatcher_test.go +++ b/disperser/controller/dispatcher_test.go @@ -75,9 +75,9 @@ func TestDispatcherHandleBatch(t *testing.T) { mockClient0 := clientsmock.NewNodeClient() sig0 := mockChainState.KeyPairs[opId0].SignMessage(bhh) mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(sig0, nil) - op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort - op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort - op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort + op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort + op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort + op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort require.NotEqual(t, op0Port, op1Port) require.NotEqual(t, op0Port, op2Port) components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil) @@ -150,9 +150,9 @@ func TestDispatcherInsufficientSignatures(t *testing.T) { // only op2 signs - quorum 0 will have 0 signing rate, quorum 1 will have 20% mockClient0 := clientsmock.NewNodeClient() mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure")) - op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort - op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort - op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort + op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort + op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort + op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort require.NotEqual(t, op0Port, op1Port) require.NotEqual(t, op0Port, op2Port) components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil) @@ -223,9 +223,9 @@ func TestDispatcherInsufficientSignatures2(t *testing.T) { // no operators sign, all blobs will have insufficient signatures mockClient0 := clientsmock.NewNodeClient() mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure")) - op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort - op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort - op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort + op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort + op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort + op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort require.NotEqual(t, op0Port, op1Port) require.NotEqual(t, op0Port, op2Port) components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil) diff --git a/disperser/dataapi/operator_handler.go b/disperser/dataapi/operator_handler.go index 89354fde54..75f63cda95 100644 --- a/disperser/dataapi/operator_handler.go +++ b/disperser/dataapi/operator_handler.go @@ -54,7 +54,7 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second) } - dispersalSocket := operatorSocket.GetDispersalSocket() + dispersalSocket := operatorSocket.GetV1DispersalSocket() dispersalPortOpen := checkIsOperatorPortOpen(dispersalSocket, 3, oh.logger) dispersalOnline, dispersalStatus := false, fmt.Sprintf("port closed or unreachable for %s", dispersalSocket) if dispersalPortOpen { diff --git a/disperser/dataapi/subgraph_client_test.go b/disperser/dataapi/subgraph_client_test.go index 5282ca45e9..9f7e4a3322 100644 --- a/disperser/dataapi/subgraph_client_test.go +++ b/disperser/dataapi/subgraph_client_test.go @@ -330,7 +330,7 @@ var ( }, SocketUpdates: []subgraph.SocketUpdates{ { - Socket: "localhost:32008;32009", + Socket: "localhost:32008;32009;32010", }, }, } diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 7a025b65dd..299c99321b 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -385,7 +385,7 @@ func (env *Config) generateRelayVars(ind int, graphUrl, grpcPort string) RelayVa } // Generates DA node .env -func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, metricsPort, nodeApiPort string) OperatorVars { +func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, metricsPort, nodeApiPort string) OperatorVars { max, _ := new(big.Int).SetString("21888242871839275222246405745257275088548364400416034343698204186575808495617", 10) // max.Exp(big.NewInt(2), big.NewInt(130), nil).Sub(max, big.NewInt(1)) @@ -411,6 +411,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, NODE_RETRIEVAL_PORT: retrievalPort, NODE_INTERNAL_DISPERSAL_PORT: dispersalPort, NODE_INTERNAL_RETRIEVAL_PORT: retrievalPort, + NODE_V2_DISPERSAL_PORT: v2DispersalPort, NODE_ENABLE_METRICS: "true", NODE_METRICS_PORT: metricsPort, NODE_ENABLE_NODE_API: "true", @@ -651,8 +652,9 @@ func (env *Config) GenerateAllVariables() { metricsPort := fmt.Sprint(port + 1) // port dispersalPort := fmt.Sprint(port + 2) retrievalPort := fmt.Sprint(port + 3) - nodeApiPort := fmt.Sprint(port + 4) - port += 5 + v2DispersalPort := fmt.Sprint(port + 4) + nodeApiPort := fmt.Sprint(port + 5) + port += 6 name := fmt.Sprintf("opr%v", i) logPath, dbPath, filename, envFile := env.getPaths(name) @@ -660,7 +662,7 @@ func (env *Config) GenerateAllVariables() { // Convert key to address - operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, fmt.Sprint(metricsPort), nodeApiPort) + operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, fmt.Sprint(metricsPort), nodeApiPort) writeEnv(operatorConfig.getEnvMap(), envFile) env.Operators = append(env.Operators, operatorConfig) diff --git a/inabox/deploy/deploy.go b/inabox/deploy/deploy.go index fbc571a2b9..db492e11e3 100644 --- a/inabox/deploy/deploy.go +++ b/inabox/deploy/deploy.go @@ -4,6 +4,15 @@ import ( "context" "encoding/json" "fmt" + "io" + "log" + "math/big" + "os" + "path/filepath" + "strconv" + "strings" + "time" + "github.com/Layr-Labs/eigenda/common" caws "github.com/Layr-Labs/eigenda/common/aws" relayreg "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry" @@ -14,14 +23,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kms" "github.com/aws/aws-sdk-go-v2/service/kms/types" "github.com/ethereum/go-ethereum/crypto" - "io" - "log" - "math/big" - "os" - "path/filepath" - "strconv" - "strings" - "time" "github.com/Layr-Labs/eigenda/core" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -364,7 +365,7 @@ func (env *Config) StopAnvil() { func (env *Config) RunNodePluginBinary(operation string, operator OperatorVars) { changeDirectory(filepath.Join(env.rootPath, "inabox")) - socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT)) + socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT)) envVars := []string{ "NODE_OPERATION=" + operation, diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index 40733c94e2..ea2f94c694 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -344,6 +344,8 @@ type OperatorVars struct { NODE_RETRIEVAL_PORT string + NODE_V2_DISPERSAL_PORT string + NODE_ENABLE_METRICS string NODE_METRICS_PORT string diff --git a/node/config.go b/node/config.go index b2e7776759..097ed9a0e5 100644 --- a/node/config.go +++ b/node/config.go @@ -48,6 +48,7 @@ type Config struct { DispersalPort string InternalRetrievalPort string InternalDispersalPort string + V2DispersalPort string EnableNodeApi bool NodeApiPort string EnableMetrics bool @@ -86,6 +87,7 @@ type Config struct { EnableV2 bool OnchainStateRefreshInterval time.Duration ChunkDownloadTimeout time.Duration + GRPCMsgSizeLimitV2 int PprofHttpPort string EnablePprof bool @@ -228,12 +230,40 @@ func NewConfig(ctx *cli.Context) (*Config, error) { return nil, err } + // check if the ports are valid integers + dispersalPort := ctx.GlobalString(flags.DispersalPortFlag.Name) + _, err = strconv.Atoi(dispersalPort) + if err != nil { + return nil, fmt.Errorf("invalid dispersal port: %s", dispersalPort) + } + + retrievalPort := ctx.GlobalString(flags.RetrievalPortFlag.Name) + _, err = strconv.Atoi(retrievalPort) + if err != nil { + return nil, fmt.Errorf("invalid retrieval port: %s", retrievalPort) + } + + v2Enabled := ctx.GlobalBool(flags.EnableV2Flag.Name) + + v2DispersalPort := ctx.GlobalString(flags.V2DispersalPortFlag.Name) + if v2DispersalPort == "" { + if v2Enabled { + return nil, fmt.Errorf("v2 dispersal port (NODE_V2_DISPERSAL_PORT) must be specified if v2 is enabled") + } + } else { + _, err = strconv.Atoi(v2DispersalPort) + if err != nil { + return nil, fmt.Errorf("invalid v2 dispersal port: %s", v2DispersalPort) + } + } + return &Config{ Hostname: ctx.GlobalString(flags.HostnameFlag.Name), - DispersalPort: ctx.GlobalString(flags.DispersalPortFlag.Name), - RetrievalPort: ctx.GlobalString(flags.RetrievalPortFlag.Name), + DispersalPort: dispersalPort, + RetrievalPort: retrievalPort, InternalDispersalPort: internalDispersalFlag, InternalRetrievalPort: internalRetrievalFlag, + V2DispersalPort: v2DispersalPort, EnableNodeApi: ctx.GlobalBool(flags.EnableNodeApiFlag.Name), NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), @@ -264,9 +294,10 @@ func NewConfig(ctx *cli.Context) (*Config, error) { UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name), DisableNodeInfoResources: ctx.GlobalBool(flags.DisableNodeInfoResourcesFlag.Name), BlsSignerConfig: blsSignerConfig, - EnableV2: ctx.GlobalBool(flags.EnableV2Flag.Name), + EnableV2: v2Enabled, OnchainStateRefreshInterval: ctx.GlobalDuration(flags.OnchainStateRefreshIntervalFlag.Name), ChunkDownloadTimeout: ctx.GlobalDuration(flags.ChunkDownloadTimeoutFlag.Name), + GRPCMsgSizeLimitV2: ctx.GlobalInt(flags.GRPCMsgSizeLimitV2Flag.Name), PprofHttpPort: ctx.GlobalString(flags.PprofHttpPort.Name), EnablePprof: ctx.GlobalBool(flags.EnablePprof.Name), DisableDispersalAuthentication: ctx.GlobalBool(flags.DisableDispersalAuthenticationFlag.Name), diff --git a/node/flags/flags.go b/node/flags/flags.go index 69a9c61516..126bb871be 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -47,6 +47,12 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "INTERNAL_RETRIEVAL_PORT"), } + V2DispersalPortFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "v2-dispersal-port"), + Usage: "Port at which node registers to listen for v2 dispersal calls", + Required: true, + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "V2_DISPERSAL_PORT"), + } EnableNodeApiFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-node-api"), Usage: "enable node-api to serve eigenlayer-cli node-api calls", @@ -218,7 +224,7 @@ var ( Required: false, EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"), } - EnableV2Flag = cli.BoolFlag{ + EnableV2Flag = cli.BoolTFlag{ Name: "enable-v2", Usage: "Enable V2 features", Required: false, @@ -238,6 +244,13 @@ var ( EnvVar: common.PrefixEnvVar(EnvVarPrefix, "CHUNK_DOWNLOAD_TIMEOUT"), Value: 20 * time.Second, } + GRPCMsgSizeLimitV2Flag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "grpc-msg-size-limit-v2"), + Usage: "The maximum message size in bytes the V2 dispersal endpoint can receive from the client. This flag is only relevant in v2 (default: 1MB)", + Required: false, + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "GRPC_MSG_SIZE_LIMIT_V2"), + Value: 1024 * 1024, + } DisableDispersalAuthenticationFlag = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "disable-dispersal-authentication"), Usage: "Disable authentication for StoreChunks() calls from the disperser", @@ -416,8 +429,10 @@ var optionalFlags = []cli.Flag{ BLSSignerCertFileFlag, BLSSignerAPIKeyFlag, EnableV2Flag, + V2DispersalPortFlag, OnchainStateRefreshIntervalFlag, ChunkDownloadTimeoutFlag, + GRPCMsgSizeLimitV2Flag, PprofHttpPort, EnablePprof, DisableDispersalAuthenticationFlag, diff --git a/node/grpc/run.go b/node/grpc/run.go index cbcb2d74c6..9733e818ef 100644 --- a/node/grpc/run.go +++ b/node/grpc/run.go @@ -20,11 +20,12 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge if serverV1 == nil { return errors.New("node V1 server is not configured") } - if serverV2 == nil { + if config.EnableV2 && serverV2 == nil { return errors.New("node V2 server is not configured") } go func() { + // V1 dispersal service for { addr := fmt.Sprintf("%s:%s", localhost, config.InternalDispersalPort) listener, err := net.Listen("tcp", addr) @@ -33,14 +34,13 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge } opt := grpc.MaxRecvMsgSize(60 * 1024 * 1024 * 1024) // 60 GiB - gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption()) + gs := grpc.NewServer(opt) // Register reflection service on gRPC server // This makes "grpcurl -plaintext localhost:9000 list" command work reflection.Register(gs) pb.RegisterDispersalServer(gs, serverV1) - pbv2.RegisterDispersalServer(gs, serverV2) healthcheck.RegisterHealthServer("node.Dispersal", gs) @@ -51,6 +51,38 @@ func RunServers(serverV1 *Server, serverV2 *ServerV2, config *node.Config, logge } }() + // V2 dispersal service + go func() { + if !config.EnableV2 { + logger.Warn("V2 is not enabled, skipping V2 server startup") + return + } + for { + addr := fmt.Sprintf("%s:%s", localhost, config.V2DispersalPort) + listener, err := net.Listen("tcp", addr) + if err != nil { + logger.Fatalf("Could not start tcp listener: %v", err) + } + + opt := grpc.MaxRecvMsgSize(config.GRPCMsgSizeLimitV2) + gs := grpc.NewServer(opt, serverV2.metrics.GetGRPCServerOption()) + + // Register reflection service on gRPC server + // This makes "grpcurl -plaintext localhost:9000 list" command work + reflection.Register(gs) + + pbv2.RegisterDispersalServer(gs, serverV2) + + healthcheck.RegisterHealthServer("node.v2.Dispersal", gs) + + logger.Info("port", config.V2DispersalPort, "address", listener.Addr().String(), "GRPC Listening") + if err := gs.Serve(listener); err != nil { + logger.Error("dispersal v2 server failed; restarting.", "err", err) + } + } + }() + + // Retrieval service go func() { for { addr := fmt.Sprintf("%s:%s", localhost, config.InternalRetrievalPort) diff --git a/node/node.go b/node/node.go index 308dda6974..0faff82fe5 100644 --- a/node/node.go +++ b/node/node.go @@ -185,7 +185,7 @@ func NewNode( } nodeLogger.Info("Creating node", "chainID", chainID.String(), "operatorID", config.ID.Hex(), - "dispersalPort", config.DispersalPort, "retrievalPort", config.RetrievalPort, "churnerUrl", config.ChurnerUrl, + "dispersalPort", config.DispersalPort, "v2DispersalPort", config.V2DispersalPort, "retrievalPort", config.RetrievalPort, "churnerUrl", config.ChurnerUrl, "quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval, "eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding) @@ -286,11 +286,11 @@ func (n *Node) Start(ctx context.Context) error { } // Build the socket based on the hostname/IP provided in the CLI - socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort)) + socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort)) var operator *Operator if n.Config.RegisterNodeAtStart { n.Logger.Info("Registering node on chain with the following parameters:", "operatorId", - n.Config.ID.Hex(), "hostname", n.Config.Hostname, "dispersalPort", n.Config.DispersalPort, + n.Config.ID.Hex(), "hostname", n.Config.Hostname, "dispersalPort", n.Config.DispersalPort, "v2DispersalPort", n.Config.V2DispersalPort, "retrievalPort", n.Config.RetrievalPort, "churnerUrl", n.Config.ChurnerUrl, "quorumIds", fmt.Sprint(n.Config.QuorumIDList)) privateKey, err := crypto.HexToECDSA(n.Config.EthClientConfig.PrivateKeyString) if err != nil { @@ -648,7 +648,7 @@ func (n *Node) checkCurrentNodeIp(ctx context.Context) { case <-ctx.Done(): return case <-t.C: - newSocketAddr, err := SocketAddress(ctx, n.PubIPProvider, n.Config.DispersalPort, n.Config.RetrievalPort) + newSocketAddr, err := SocketAddress(ctx, n.PubIPProvider, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort) if err != nil { n.Logger.Error("failed to get socket address", "err", err) continue diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index 296c33b556..5d58c1ee17 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -135,7 +135,7 @@ func pluginOps(ctx *cli.Context) { return } - _, dispersalPort, retrievalPort, err := core.ParseOperatorSocket(config.Socket) + _, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(config.Socket) if err != nil { log.Printf("Error: failed to parse operator socket: %v", err) return @@ -144,7 +144,7 @@ func pluginOps(ctx *cli.Context) { socket := config.Socket if isLocalhost(socket) { pubIPProvider := pubip.ProviderOrDefault(logger, config.PubIPProvider) - socket, err = node.SocketAddress(context.Background(), pubIPProvider, dispersalPort, retrievalPort) + socket, err = node.SocketAddress(context.Background(), pubIPProvider, dispersalPort, retrievalPort, v2DispersalPort) if err != nil { log.Printf("Error: failed to get socket address from ip provider: %v", err) return diff --git a/node/utils.go b/node/utils.go index b2bd8b8d55..ce842e6936 100644 --- a/node/utils.go +++ b/node/utils.go @@ -121,12 +121,12 @@ func ValidatePointsFromBlobHeader(h *pb.BlobHeader) error { return nil } -func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort string, retrievalPort string) (string, error) { +func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort, retrievalPort, v2DispersalPort string) (string, error) { ip, err := provider.PublicIPAddress(ctx) if err != nil { return "", fmt.Errorf("failed to get public ip address from IP provider: %w", err) } - socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort) + socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort) return socket.String(), nil } diff --git a/test/integration_test.go b/test/integration_test.go index dbe6c3bb0c..5387604f99 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -357,6 +357,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging RetrievalPort: op.RetrievalPort, InternalRetrievalPort: op.RetrievalPort, InternalDispersalPort: op.DispersalPort, + V2DispersalPort: op.V2DispersalPort, EnableMetrics: false, Timeout: 10, ExpirationPollIntervalSec: 10, @@ -380,7 +381,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging tx.On("GetBlockStaleMeasure").Return(nil) tx.On("GetStoreDurationBlocks").Return(nil) tx.On("OperatorIDToAddress").Return(gethcommon.Address{1}, nil) - socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort) + socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort) tx.On("GetOperatorSocket", mock.Anything, mock.Anything).Return(socket.String(), nil) noopMetrics := metrics.NewNoopMetrics()