diff --git a/core/mock/tx.go b/core/mock/tx.go index 950f64fc89..1a345487d5 100644 --- a/core/mock/tx.go +++ b/core/mock/tx.go @@ -43,7 +43,7 @@ func (t *MockTransactor) RegisterOperator( operatorToAvsRegistrationSigSalt [32]byte, operatorToAvsRegistrationSigExpiry *big.Int, ) error { - args := t.Called() + args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry) return args.Error(0) } @@ -56,7 +56,7 @@ func (t *MockTransactor) RegisterOperatorWithChurn( operatorToAvsRegistrationSigSalt [32]byte, operatorToAvsRegistrationSigExpiry *big.Int, churnReply *churner.ChurnReply) error { - args := t.Called() + args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, churnReply) return args.Error(0) } diff --git a/node/churner_client.go b/node/churner_client.go new file mode 100644 index 0000000000..3939bc57c1 --- /dev/null +++ b/node/churner_client.go @@ -0,0 +1,93 @@ +package node + +import ( + "context" + "crypto/tls" + "time" + + churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner" + "github.com/Layr-Labs/eigenda/churner" + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/core" + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type ChurnerClient interface { + Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) +} + +type churnerClient struct { + churnerURL string + useSecureGrpc bool + timeout time.Duration + logger common.Logger +} + +func NewChurnerClient(churnerURL string, useSecureGrpc bool, timeout time.Duration, logger common.Logger) ChurnerClient { + return &churnerClient{ + churnerURL: churnerURL, + useSecureGrpc: useSecureGrpc, + timeout: timeout, + logger: logger, + } +} + +func (c *churnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) { + // generate salt + privateKeyBytes := []byte(keyPair.PrivKey.String()) + salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumIDs[:], privateKeyBytes) + + churnRequest := &churner.ChurnRequest{ + OperatorAddress: gethcommon.HexToAddress(operatorAddress), + OperatorToRegisterPubkeyG1: keyPair.PubKey, + OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2(), + OperatorRequestSignature: &core.Signature{}, + QuorumIDs: quorumIDs, + } + + copy(churnRequest.Salt[:], salt) + + // sign the request + churnRequest.OperatorRequestSignature = keyPair.SignMessage(churner.CalculateRequestHash(churnRequest)) + + // convert to protobuf + churnRequestPb := &churnerpb.ChurnRequest{ + OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(), + OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(), + OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(), + Salt: salt[:], + OperatorAddress: operatorAddress, + } + + churnRequestPb.QuorumIds = make([]uint32, len(quorumIDs)) + for i, quorumID := range quorumIDs { + churnRequestPb.QuorumIds[i] = uint32(quorumID) + } + credential := insecure.NewCredentials() + if c.useSecureGrpc { + config := &tls.Config{} + credential = credentials.NewTLS(config) + } + + conn, err := grpc.Dial( + c.churnerURL, + grpc.WithTransportCredentials(credential), + ) + if err != nil { + c.logger.Error("Node cannot connect to churner", "err", err) + return nil, err + } + defer conn.Close() + + gc := churnerpb.NewChurnerClient(conn) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300) + + return gc.Churn(ctx, churnRequestPb, opt) +} diff --git a/node/mock/churner_client.go b/node/mock/churner_client.go new file mode 100644 index 0000000000..a69d4f4eb7 --- /dev/null +++ b/node/mock/churner_client.go @@ -0,0 +1,30 @@ +package mock + +import ( + "context" + + churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/node" + "github.com/stretchr/testify/mock" +) + +type ChurnerClient struct { + mock.Mock +} + +var _ node.ChurnerClient = (*ChurnerClient)(nil) + +func (c *ChurnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) { + args := c.Called() + var reply *churnerpb.ChurnReply + if args.Get(0) != nil { + reply = (args.Get(0)).(*churnerpb.ChurnReply) + } + + var err error + if args.Get(1) != nil { + err = (args.Get(1)).(error) + } + return reply, err +} diff --git a/node/node.go b/node/node.go index 3a1f63c5ee..3d3feee712 100644 --- a/node/node.go +++ b/node/node.go @@ -203,7 +203,8 @@ func (n *Node) Start(ctx context.Context) error { OperatorId: n.Config.ID, QuorumIDs: n.Config.QuorumIDList, } - err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger) + churnerClient := NewChurnerClient(n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Config.Timeout, n.Logger) + err = RegisterOperator(ctx, operator, n.Transactor, churnerClient, n.Logger) if err != nil { return fmt.Errorf("failed to register the operator: %w", err) } diff --git a/node/operator.go b/node/operator.go index eb8fd58a73..e400937d19 100644 --- a/node/operator.go +++ b/node/operator.go @@ -3,21 +3,14 @@ package node import ( "context" "crypto/ecdsa" - "crypto/tls" - "errors" "fmt" "math/big" + "slices" "time" - grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner" - "github.com/Layr-Labs/eigenda/churner" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/core" - gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" ) type Operator struct { @@ -31,27 +24,21 @@ type Operator struct { } // RegisterOperator operator registers the operator with the given public key for the given quorum IDs. -func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerUrl string, useSecureGrpc bool, logger common.Logger) error { - registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, operator.OperatorId) +func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerClient ChurnerClient, logger common.Logger) error { + quorumsToRegister, err := operator.getQuorumIdsToRegister(ctx, transactor) if err != nil { - return fmt.Errorf("failed to get registered quorum ids for an operator: %w", err) + return fmt.Errorf("failed to get quorum ids to register: %w", err) } - - logger.Debug("Registered quorum ids", "registeredQuorumIds", registeredQuorumIds) - if len(registeredQuorumIds) != 0 { + if len(quorumsToRegister) == 0 { return nil } - logger.Info("Quorums to register for", "quorums", operator.QuorumIDs) - - if len(operator.QuorumIDs) == 0 { - return errors.New("an operator should be in at least one quorum to be useful") - } + logger.Info("Quorums to register for", "quorums", quorumsToRegister) // register for quorums shouldCallChurner := false // check if one of the quorums to register for is full - for _, quorumID := range operator.QuorumIDs { + for _, quorumID := range quorumsToRegister { operatorSetParams, err := transactor.GetOperatorSetParams(ctx, quorumID) if err != nil { return err @@ -75,22 +62,22 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T privateKeyBytes := []byte(operator.KeyPair.PrivKey.String()) salt := [32]byte{} - copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes)) + copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumsToRegister, privateKeyBytes)) // Get the current block number expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix()) // if we should call the churner, call it if shouldCallChurner { - churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger) + churnReply, err := churnerClient.Churn(ctx, operator.Address, operator.KeyPair, quorumsToRegister) if err != nil { return fmt.Errorf("failed to request churn approval: %w", err) } - return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply) + return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry, churnReply) } else { // other wise just register normally - return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry) + return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry) } } @@ -108,15 +95,14 @@ func UpdateOperatorQuorums( ctx context.Context, operator *Operator, transactor core.Transactor, - churnerUrl string, - useSecureGrpc bool, + churnerClient ChurnerClient, logger common.Logger, ) error { err := DeregisterOperator(ctx, operator.KeyPair, transactor) if err != nil { return fmt.Errorf("failed to deregister operator: %w", err) } - return RegisterOperator(ctx, operator, transactor, churnerUrl, useSecureGrpc, logger) + return RegisterOperator(ctx, operator, transactor, churnerClient, logger) } // UpdateOperatorSocket updates the socket for the given operator @@ -124,67 +110,23 @@ func UpdateOperatorSocket(ctx context.Context, transactor core.Transactor, socke return transactor.UpdateOperatorSocket(ctx, socket) } -func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl string, useSecureGrpc bool, logger common.Logger) (*grpcchurner.ChurnReply, error) { - logger.Info("churner url", "url", churnerUrl) - - credential := insecure.NewCredentials() - if useSecureGrpc { - config := &tls.Config{} - credential = credentials.NewTLS(config) +// getQuorumIdsToRegister returns the quorum ids that the operator is not registered in. +func (c *Operator) getQuorumIdsToRegister(ctx context.Context, transactor core.Transactor) ([]core.QuorumID, error) { + if len(c.QuorumIDs) == 0 { + return nil, fmt.Errorf("an operator should be in at least one quorum to be useful") } - conn, err := grpc.Dial( - churnerUrl, - grpc.WithTransportCredentials(credential), - ) + registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, c.OperatorId) if err != nil { - logger.Error("Node cannot connect to churner", "err", err) - return nil, err + return nil, fmt.Errorf("failed to get registered quorum ids for an operator: %w", err) } - defer conn.Close() - - gc := grpcchurner.NewChurnerClient(conn) - ctx, cancel := context.WithTimeout(ctx, operator.Timeout) - defer cancel() - - request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs) - opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300) - - return gc.Churn(ctx, request, opt) -} - -func newChurnRequest(address string, KeyPair *core.KeyPair, QuorumIDs []core.QuorumID) *grpcchurner.ChurnRequest { - // generate salt - privateKeyBytes := []byte(KeyPair.PrivKey.String()) - salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), QuorumIDs[:], privateKeyBytes) - - churnRequest := &churner.ChurnRequest{ - OperatorAddress: gethcommon.HexToAddress(address), - OperatorToRegisterPubkeyG1: KeyPair.PubKey, - OperatorToRegisterPubkeyG2: KeyPair.GetPubKeyG2(), - OperatorRequestSignature: &core.Signature{}, - QuorumIDs: QuorumIDs, - } - - copy(churnRequest.Salt[:], salt) - - // sign the request - churnRequest.OperatorRequestSignature = KeyPair.SignMessage(churner.CalculateRequestHash(churnRequest)) - - // convert to protobuf - churnRequestPb := &grpcchurner.ChurnRequest{ - OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(), - OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(), - OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(), - Salt: salt[:], - OperatorAddress: address, - } - - churnRequestPb.QuorumIds = make([]uint32, len(QuorumIDs)) - for i, quorumID := range QuorumIDs { - churnRequestPb.QuorumIds[i] = uint32(quorumID) + quorumIdsToRegister := make([]core.QuorumID, 0, len(c.QuorumIDs)) + for _, quorumID := range c.QuorumIDs { + if !slices.Contains(registeredQuorumIds, quorumID) { + quorumIdsToRegister = append(quorumIdsToRegister, quorumID) + } } - return churnRequestPb + return quorumIdsToRegister, nil } diff --git a/node/operator_test.go b/node/operator_test.go new file mode 100644 index 0000000000..f566fd6410 --- /dev/null +++ b/node/operator_test.go @@ -0,0 +1,80 @@ +package node_test + +import ( + "context" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/common/logging" + "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/node" + nodemock "github.com/Layr-Labs/eigenda/node/mock" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestRegisterOperator(t *testing.T) { + logger, err := logging.GetLogger(logging.DefaultCLIConfig()) + assert.NoError(t, err) + operatorID := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) + keyPair, err := core.GenRandomBlsKeys() + assert.NoError(t, err) + // Create a new operator + operator := &node.Operator{ + Address: "0xB7Ad27737D88B07De48CDc2f379917109E993Be4", + Socket: "localhost:50051", + Timeout: 10 * time.Second, + PrivKey: nil, + KeyPair: keyPair, + OperatorId: operatorID, + QuorumIDs: []core.QuorumID{0, 1}, + } + tx := &coremock.MockTransactor{} + tx.On("GetRegisteredQuorumIdsForOperator").Return([]uint8{0}, nil) + tx.On("GetOperatorSetParams").Return(&core.OperatorSetParam{ + MaxOperatorCount: 1, + ChurnBIPsOfOperatorStake: 20, + ChurnBIPsOfTotalStake: 20000, + }, nil) + tx.On("GetNumberOfRegisteredOperatorForQuorum").Return(uint32(0), nil) + tx.On("RegisterOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + churnerClient := &nodemock.ChurnerClient{} + churnerClient.On("Churn").Return(nil, nil) + err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger) + assert.NoError(t, err) + tx.AssertCalled(t, "RegisterOperator", mock.Anything, mock.Anything, mock.Anything, []core.QuorumID{1}, mock.Anything, mock.Anything, mock.Anything) +} + +func TestRegisterOperatorWithChurn(t *testing.T) { + logger, err := logging.GetLogger(logging.DefaultCLIConfig()) + assert.NoError(t, err) + operatorID := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) + keyPair, err := core.GenRandomBlsKeys() + assert.NoError(t, err) + // Create a new operator + operator := &node.Operator{ + Address: "0xB7Ad27737D88B07De48CDc2f379917109E993Be4", + Socket: "localhost:50051", + Timeout: 10 * time.Second, + PrivKey: nil, + KeyPair: keyPair, + OperatorId: operatorID, + QuorumIDs: []core.QuorumID{0, 1}, + } + tx := &coremock.MockTransactor{} + tx.On("GetRegisteredQuorumIdsForOperator").Return([]uint8{0}, nil) + tx.On("GetOperatorSetParams").Return(&core.OperatorSetParam{ + MaxOperatorCount: 1, + ChurnBIPsOfOperatorStake: 20, + ChurnBIPsOfTotalStake: 20000, + }, nil) + tx.On("GetNumberOfRegisteredOperatorForQuorum").Return(uint32(1), nil) + tx.On("RegisterOperatorWithChurn", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + churnerClient := &nodemock.ChurnerClient{} + churnerClient.On("Churn").Return(nil, nil) + err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger) + assert.NoError(t, err) + tx.AssertCalled(t, "RegisterOperatorWithChurn", mock.Anything, mock.Anything, mock.Anything, []core.QuorumID{1}, mock.Anything, mock.Anything, mock.Anything, mock.Anything) +} diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index ef16f83506..377ef5783d 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -124,9 +124,10 @@ func pluginOps(ctx *cli.Context) { OperatorId: keyPair.GetPubKeyG1().GetOperatorID(), QuorumIDs: config.QuorumIDList, } + churnerClient := node.NewChurnerClient(config.ChurnerUrl, true, operator.Timeout, logger) if config.Operation == "opt-in" { log.Printf("Info: Operator with Operator Address: %x is opting in to EigenDA", sk.Address) - err = node.RegisterOperator(context.Background(), operator, tx, config.ChurnerUrl, true, logger) + err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger) if err != nil { log.Printf("Error: failed to opt-in EigenDA Node Network for operator ID: %x, operator address: %x, error: %v", operatorID, sk.Address, err) return @@ -142,7 +143,7 @@ func pluginOps(ctx *cli.Context) { log.Printf("Info: successfully opt-out the EigenDA, for operator ID: %x, operator address: %x", operatorID, sk.Address) } else if config.Operation == "update-quorums" { log.Printf("Info: Operator with Operator Address: %x is updating its quorums: %v", sk.Address, config.QuorumIDList) - err = node.UpdateOperatorQuorums(context.Background(), operator, tx, config.ChurnerUrl, true, logger) + err = node.UpdateOperatorQuorums(context.Background(), operator, tx, churnerClient, logger) if err != nil { log.Printf("Error: failed to update quorums for operator ID: %x, operator address: %x, quorums: %v, error: %v", operatorID, sk.Address, config.QuorumIDList, err) return