Skip to content

Commit

Permalink
Update node socket registration format (#1132)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Jan 23, 2025
1 parent d8a09a3 commit 0f22447
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 91 deletions.
15 changes: 9 additions & 6 deletions core/mock/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ var _ core.IndexedChainState = (*ChainDataMock)(nil)

type PrivateOperatorInfo struct {
*core.IndexedOperatorInfo
KeyPair *core.KeyPair
Signer blssigner.Signer
Host string
DispersalPort string
RetrievalPort string
KeyPair *core.KeyPair
Signer blssigner.Signer
Host string
DispersalPort string
RetrievalPort string
V2DispersalPort string
}

type PrivateOperatorState struct {
Expand Down Expand Up @@ -138,7 +139,8 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
host := "0.0.0.0"
dispersalPort := fmt.Sprintf("3%03v", 2*i)
retrievalPort := fmt.Sprintf("3%03v", 2*i+1)
socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort)
v2DispersalPort := fmt.Sprintf("3%03v", 2*i+2)
socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort)

indexed := &core.IndexedOperatorInfo{
Socket: string(socket),
Expand All @@ -158,6 +160,7 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl
Host: host,
DispersalPort: dispersalPort,
RetrievalPort: retrievalPort,
V2DispersalPort: v2DispersalPort,
}

indexedOperators[id] = indexed
Expand Down
26 changes: 5 additions & 21 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"math/big"
"regexp"
"slices"

"github.com/Layr-Labs/eigenda/api"
Expand Down Expand Up @@ -528,33 +527,18 @@ func decode(data []byte, obj any) error {
return nil
}

func (s OperatorSocket) GetDispersalSocket() string {
ip, port1, _, err := extractIPAndPorts(string(s))
func (s OperatorSocket) GetV1DispersalSocket() string {
ip, v1DispersalPort, _, _, err := ParseOperatorSocket(string(s))
if err != nil {
return ""
}
return fmt.Sprintf("%s:%s", ip, port1)
return fmt.Sprintf("%s:%s", ip, v1DispersalPort)
}

func (s OperatorSocket) GetRetrievalSocket() string {
ip, _, port2, err := extractIPAndPorts(string(s))
ip, _, retrievalPort, _, err := ParseOperatorSocket(string(s))
if err != nil {
return ""
}
return fmt.Sprintf("%s:%s", ip, port2)
}

func extractIPAndPorts(s string) (string, string, string, error) {
regex := regexp.MustCompile(`^([^:]+):([^;]+);([^;]+)$`)
matches := regex.FindStringSubmatch(s)

if len(matches) != 4 {
return "", "", "", errors.New("input string does not match expected format")
}

ip := matches[1]
port1 := matches[2]
port2 := matches[3]

return ip, port1, port2, nil
return fmt.Sprintf("%s:%s", ip, retrievalPort)
}
60 changes: 54 additions & 6 deletions core/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,68 @@ func TestHashPubKeyG1(t *testing.T) {
}

func TestParseOperatorSocket(t *testing.T) {
operatorSocket := "localhost:1234;5678"
host, dispersalPort, retrievalPort, err := core.ParseOperatorSocket(operatorSocket)
operatorSocket := "localhost:1234;5678;9999"
host, dispersalPort, retrievalPort, v2DispersalPort, err := core.ParseOperatorSocket(operatorSocket)
assert.NoError(t, err)
assert.Equal(t, "localhost", host)
assert.Equal(t, "1234", dispersalPort)
assert.Equal(t, "5678", retrievalPort)
assert.Equal(t, "9999", v2DispersalPort)

_, _, _, err = core.ParseOperatorSocket("localhost:12345678")
host, dispersalPort, retrievalPort, v2DispersalPort, err = core.ParseOperatorSocket("localhost:1234;5678")
assert.NoError(t, err)
assert.Equal(t, "localhost", host)
assert.Equal(t, "1234", dispersalPort)
assert.Equal(t, "5678", retrievalPort)
assert.Equal(t, "", v2DispersalPort)

_, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678")
assert.NotNil(t, err)
assert.Equal(t, "invalid socket address format, missing retrieval port: localhost:12345678", err.Error())
assert.ErrorContains(t, err, "invalid socket address format")

_, _, _, err = core.ParseOperatorSocket("localhost1234;5678")
_, _, _, _, err = core.ParseOperatorSocket("localhost:12345678")
assert.NotNil(t, err)
assert.Equal(t, "invalid socket address format: localhost1234;5678", err.Error())
assert.ErrorContains(t, err, "invalid socket address format")

_, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678")
assert.NotNil(t, err)
assert.ErrorContains(t, err, "invalid socket address format")
}

func TestGetV1DispersalSocket(t *testing.T) {
operatorSocket := core.OperatorSocket("localhost:1234;5678;9999")
socket := operatorSocket.GetV1DispersalSocket()
assert.Equal(t, "localhost:1234", socket)

operatorSocket = core.OperatorSocket("localhost:1234;5678")
socket = operatorSocket.GetV1DispersalSocket()
assert.Equal(t, "localhost:1234", socket)

operatorSocket = core.OperatorSocket("localhost:1234;5678;")
socket = operatorSocket.GetV1DispersalSocket()
assert.Equal(t, "localhost:1234", socket)

operatorSocket = core.OperatorSocket("localhost:1234")
socket = operatorSocket.GetV1DispersalSocket()
assert.Equal(t, "", socket)
}

func TestGetRetrievalSocket(t *testing.T) {
operatorSocket := core.OperatorSocket("localhost:1234;5678;9999")
socket := operatorSocket.GetRetrievalSocket()
assert.Equal(t, "localhost:5678", socket)

operatorSocket = core.OperatorSocket("localhost:1234;5678")
socket = operatorSocket.GetRetrievalSocket()
assert.Equal(t, "localhost:5678", socket)

operatorSocket = core.OperatorSocket("localhost:1234;5678;")
socket = operatorSocket.GetRetrievalSocket()
assert.Equal(t, "localhost:5678", socket)

operatorSocket = core.OperatorSocket("localhost:1234")
socket = operatorSocket.GetRetrievalSocket()
assert.Equal(t, "", socket)
}

func TestSignatureBytes(t *testing.T) {
Expand Down
44 changes: 32 additions & 12 deletions core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,55 @@ import (

// Operators

// OperatorSocket is formatted as "host:dispersalPort;retrievalPort;v2DispersalPort"
type OperatorSocket string

func (s OperatorSocket) String() string {
return string(s)
}

func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort string) OperatorSocket {
return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort))
func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort string) OperatorSocket {
if v2DispersalPort == "" {
return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort))
}
return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort))
}

type StakeAmount = *big.Int

func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, err error) {
func ParseOperatorSocket(socket string) (host string, dispersalPort string, retrievalPort string, v2DispersalPort string, err error) {
s := strings.Split(socket, ";")
if len(s) != 2 {
err = fmt.Errorf("invalid socket address format, missing retrieval port: %s", socket)

if len(s) == 2 {
// no v2 dispersal port
retrievalPort = s[1]
s = strings.Split(s[0], ":")
if len(s) != 2 {
err = fmt.Errorf("invalid socket address format: %s", socket)
return
}
host = s[0]
dispersalPort = s[1]

return
}
retrievalPort = s[1]

s = strings.Split(s[0], ":")
if len(s) != 2 {
err = fmt.Errorf("invalid socket address format: %s", socket)
if len(s) == 3 {
// all ports specified
v2DispersalPort = s[2]
retrievalPort = s[1]

s = strings.Split(s[0], ":")
if len(s) != 2 {
err = fmt.Errorf("invalid socket address format: %s", socket)
return
}
host = s[0]
dispersalPort = s[1]
return
}
host = s[0]
dispersalPort = s[1]

return
return "", "", "", "", fmt.Errorf("invalid socket address format %s: it must specify dispersal port, retrieval port, and/or v2 dispersal port (ex. 0.0.0.0:32004;32005;32006)", socket)
}

// OperatorInfo contains information about an operator which is stored on the blockchain state,
Expand Down
4 changes: 2 additions & 2 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMe
// TODO Add secure Grpc

conn, err := grpc.NewClient(
core.OperatorSocket(op.Socket).GetDispersalSocket(),
core.OperatorSocket(op.Socket).GetV1DispersalSocket(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetDispersalSocket(), "err", err)
c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetV1DispersalSocket(), "err", err)
return nil, err
}
defer conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion disperser/common/semver/semver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, oper
if useRetrievalSocket {
socket = operatorSocket.GetRetrievalSocket()
} else {
socket = operatorSocket.GetDispersalSocket()
socket = operatorSocket.GetV1DispersalSocket()
}
semver := GetSemverInfo(context.Background(), socket, useRetrievalSocket, operatorId, logger, nodeInfoTimeout)

Expand Down
6 changes: 3 additions & 3 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
for opID, op := range state.IndexedOperators {
opID := opID
op := op
host, dispersalPort, _, err := core.ParseOperatorSocket(op.Socket)
host, _, _, v2DispersalPort, err := core.ParseOperatorSocket(op.Socket)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse operator socket: %w", err)
return nil, nil, fmt.Errorf("failed to parse operator socket (%s): %w", op.Socket, err)
}

client, err := d.nodeClientManager.GetClient(host, dispersalPort)
client, err := d.nodeClientManager.GetClient(host, v2DispersalPort)
if err != nil {
d.logger.Error("failed to get node client", "operator", opID.Hex(), "err", err)
continue
Expand Down
18 changes: 9 additions & 9 deletions disperser/controller/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func TestDispatcherHandleBatch(t *testing.T) {
mockClient0 := clientsmock.NewNodeClient()
sig0 := mockChainState.KeyPairs[opId0].SignMessage(bhh)
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(sig0, nil)
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort
require.NotEqual(t, op0Port, op1Port)
require.NotEqual(t, op0Port, op2Port)
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
Expand Down Expand Up @@ -150,9 +150,9 @@ func TestDispatcherInsufficientSignatures(t *testing.T) {
// only op2 signs - quorum 0 will have 0 signing rate, quorum 1 will have 20%
mockClient0 := clientsmock.NewNodeClient()
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort
require.NotEqual(t, op0Port, op1Port)
require.NotEqual(t, op0Port, op2Port)
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
Expand Down Expand Up @@ -223,9 +223,9 @@ func TestDispatcherInsufficientSignatures2(t *testing.T) {
// no operators sign, all blobs will have insufficient signatures
mockClient0 := clientsmock.NewNodeClient()
mockClient0.On("StoreChunks", mock.Anything, mock.Anything).Return(nil, errors.New("failure"))
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].DispersalPort
op0Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId0].V2DispersalPort
op1Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId1].V2DispersalPort
op2Port := mockChainState.GetTotalOperatorState(ctx, uint(blockNumber)).PrivateOperators[opId2].V2DispersalPort
require.NotEqual(t, op0Port, op1Port)
require.NotEqual(t, op0Port, op2Port)
components.NodeClientManager.On("GetClient", mock.Anything, op0Port).Return(mockClient0, nil)
Expand Down
2 changes: 1 addition & 1 deletion disperser/dataapi/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (oh *OperatorHandler) ProbeOperatorHosts(ctx context.Context, operatorId st
retrievalOnline, retrievalStatus = checkServiceOnline(ctx, "node.Retrieval", retrievalSocket, 3*time.Second)
}

dispersalSocket := operatorSocket.GetDispersalSocket()
dispersalSocket := operatorSocket.GetV1DispersalSocket()
dispersalPortOpen := checkIsOperatorPortOpen(dispersalSocket, 3, oh.logger)
dispersalOnline, dispersalStatus := false, fmt.Sprintf("port closed or unreachable for %s", dispersalSocket)
if dispersalPortOpen {
Expand Down
2 changes: 1 addition & 1 deletion disperser/dataapi/subgraph_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ var (
},
SocketUpdates: []subgraph.SocketUpdates{
{
Socket: "localhost:32008;32009",
Socket: "localhost:32008;32009;32010",
},
},
}
Expand Down
10 changes: 6 additions & 4 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (env *Config) generateRelayVars(ind int, graphUrl, grpcPort string) RelayVa
}

// Generates DA node .env
func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, metricsPort, nodeApiPort string) OperatorVars {
func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, metricsPort, nodeApiPort string) OperatorVars {

max, _ := new(big.Int).SetString("21888242871839275222246405745257275088548364400416034343698204186575808495617", 10)
// max.Exp(big.NewInt(2), big.NewInt(130), nil).Sub(max, big.NewInt(1))
Expand All @@ -411,6 +411,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath,
NODE_RETRIEVAL_PORT: retrievalPort,
NODE_INTERNAL_DISPERSAL_PORT: dispersalPort,
NODE_INTERNAL_RETRIEVAL_PORT: retrievalPort,
NODE_V2_DISPERSAL_PORT: v2DispersalPort,
NODE_ENABLE_METRICS: "true",
NODE_METRICS_PORT: metricsPort,
NODE_ENABLE_NODE_API: "true",
Expand Down Expand Up @@ -651,16 +652,17 @@ func (env *Config) GenerateAllVariables() {
metricsPort := fmt.Sprint(port + 1) // port
dispersalPort := fmt.Sprint(port + 2)
retrievalPort := fmt.Sprint(port + 3)
nodeApiPort := fmt.Sprint(port + 4)
port += 5
v2DispersalPort := fmt.Sprint(port + 4)
nodeApiPort := fmt.Sprint(port + 5)
port += 6

name := fmt.Sprintf("opr%v", i)
logPath, dbPath, filename, envFile := env.getPaths(name)
key, _ := env.getKey(name)

// Convert key to address

operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, fmt.Sprint(metricsPort), nodeApiPort)
operatorConfig := env.generateOperatorVars(i, name, key, churnerUrl, logPath, dbPath, dispersalPort, retrievalPort, v2DispersalPort, fmt.Sprint(metricsPort), nodeApiPort)
writeEnv(operatorConfig.getEnvMap(), envFile)
env.Operators = append(env.Operators, operatorConfig)

Expand Down
19 changes: 10 additions & 9 deletions inabox/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/Layr-Labs/eigenda/common"
caws "github.com/Layr-Labs/eigenda/common/aws"
relayreg "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDARelayRegistry"
Expand All @@ -14,14 +23,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/kms"
"github.com/aws/aws-sdk-go-v2/service/kms/types"
"github.com/ethereum/go-ethereum/crypto"
"io"
"log"
"math/big"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -364,7 +365,7 @@ func (env *Config) StopAnvil() {
func (env *Config) RunNodePluginBinary(operation string, operator OperatorVars) {
changeDirectory(filepath.Join(env.rootPath, "inabox"))

socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT))
socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT))

envVars := []string{
"NODE_OPERATION=" + operation,
Expand Down
Loading

0 comments on commit 0f22447

Please sign in to comment.