diff --git a/cmd/replication/main.go b/cmd/replication/main.go index 1fc47020..ba4d2254 100644 --- a/cmd/replication/main.go +++ b/cmd/replication/main.go @@ -9,6 +9,7 @@ import ( "syscall" "github.com/jessevdk/go-flags" + "github.com/xmtp/xmtpd/pkg/config" "github.com/xmtp/xmtpd/pkg/registry" "github.com/xmtp/xmtpd/pkg/server" "github.com/xmtp/xmtpd/pkg/tracing" @@ -18,7 +19,7 @@ import ( var Commit string -var options server.Options +var options config.ServerOptions func main() { if _, err := flags.Parse(&options); err != nil { @@ -81,7 +82,7 @@ func fatal(msg string, args ...any) { log.Fatalf(msg, args...) } -func buildLogger(options server.Options) (*zap.Logger, *zap.Config, error) { +func buildLogger(options config.ServerOptions) (*zap.Logger, *zap.Config, error) { atom := zap.NewAtomicLevel() level := zapcore.InfoLevel err := level.Set(options.LogLevel) diff --git a/pkg/server/options.go b/pkg/config/options.go similarity index 72% rename from pkg/server/options.go rename to pkg/config/options.go index 06a004ef..25f4be31 100644 --- a/pkg/server/options.go +++ b/pkg/config/options.go @@ -1,15 +1,19 @@ -package server +package config import ( "time" - - "github.com/xmtp/xmtpd/pkg/indexer" ) type ApiOptions struct { Port int `short:"p" long:"port" description:"Port to listen on" default:"5050"` } +type ContractsOptions struct { + RpcUrl string `log:"rpc-url" description:"Blockchain RPC URL"` + NodesContractAddress string `long:"nodes-address" description:"Node contract address"` + MessagesContractAddress string `long:"messages-address" description:"Message contract address"` +} + type DbOptions struct { ReaderConnectionString string `long:"reader-connection-string" description:"Reader connection string"` WriterConnectionString string `long:"writer-connection-string" description:"Writer connection string" required:"true"` @@ -26,7 +30,7 @@ type Options struct { PrivateKeyString string `long:"private-key" description:"Private key to use for the node"` - API ApiOptions `group:"API Options" namespace:"api"` - DB DbOptions `group:"Database Options" namespace:"db"` - Contracts indexer.ContractsOptions `group:"Contracts Options" namespace:"contracts"` + API ApiOptions `group:"API Options" namespace:"api"` + DB DbOptions `group:"Database Options" namespace:"db"` + Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"` } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index a64f4eab..9ad1cdc4 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/xmtp/xmtpd/pkg/abis" + "github.com/xmtp/xmtpd/pkg/config" "github.com/xmtp/xmtpd/pkg/db/queries" "github.com/xmtp/xmtpd/pkg/indexer/blockchain" "github.com/xmtp/xmtpd/pkg/indexer/storer" @@ -15,16 +16,16 @@ import ( ) // Start the indexer and run until the context is canceled -func StartIndexer(ctx context.Context, logger *zap.Logger, queries *queries.Queries, options ContractsOptions) error { - builder := blockchain.NewRpcLogStreamBuilder(options.RpcUrl, logger) +func StartIndexer(ctx context.Context, logger *zap.Logger, queries *queries.Queries, cfg config.ContractsOptions) error { + builder := blockchain.NewRpcLogStreamBuilder(cfg.RpcUrl, logger) messagesTopic, err := buildMessagesTopic() if err != nil { return err } - messagesChannel := builder.ListenForContractEvent(0, common.HexToAddress(options.MessagesContractAddress), []common.Hash{messagesTopic}) - indexLogs(ctx, messagesChannel, logger.Named("indexLogs").With(zap.String("contractAddress", options.MessagesContractAddress)), storer.NewGroupMessageStorer(queries, logger)) + messagesChannel := builder.ListenForContractEvent(0, common.HexToAddress(cfg.MessagesContractAddress), []common.Hash{messagesTopic}) + indexLogs(ctx, messagesChannel, logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)), storer.NewGroupMessageStorer(queries, logger)) streamer, err := builder.Build() if err != nil { diff --git a/pkg/indexer/options.go b/pkg/indexer/options.go deleted file mode 100644 index be240f45..00000000 --- a/pkg/indexer/options.go +++ /dev/null @@ -1,7 +0,0 @@ -package indexer - -type ContractsOptions struct { - RpcUrl string `log:"rpc-url" description:"Blockchain RPC URL"` - NodesContractAddress string `long:"nodes-address" description:"Node contract address"` - MessagesContractAddress string `long:"messages-address" description:"Message contract address"` -} diff --git a/pkg/registry/contractRegistry.go b/pkg/registry/contractRegistry.go new file mode 100644 index 00000000..d5a038ac --- /dev/null +++ b/pkg/registry/contractRegistry.go @@ -0,0 +1,109 @@ +package registry + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/xmtp/xmtpd/pkg/abis" + "github.com/xmtp/xmtpd/pkg/config" +) + +const ( + CONTRACT_CALL_TIMEOUT = 10 * time.Second +) + +type SmartContractRegistry struct { + contract *abis.NodesCaller + rawNodes map[uint16]abis.NodesNode + rawNodesMutex sync.Mutex + ctx context.Context +} + +func NewSmartContractRegistry(ethclient bind.ContractCaller, options config.ContractsOptions) (*SmartContractRegistry, error) { + contract, err := abis.NewNodesCaller(common.HexToAddress(options.NodesContractAddress), ethclient) + if err != nil { + return nil, err + } + + return &SmartContractRegistry{contract: contract}, nil +} + +// Start the registry with the given context. +// To stop the registry, cancel the context +func (s *SmartContractRegistry) Start(ctx context.Context) error { + s.ctx = ctx + // If we can't load the data at least once, fail to start the service + if err := s.refreshData(); err != nil { + return err + } + + go s.refreshLoop() + + return nil +} + +func (s *SmartContractRegistry) refreshLoop() { + ticker := time.NewTicker(1 * time.Minute) + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.refreshData() + } + } +} + +func (s *SmartContractRegistry) refreshData() error { + rawNodes, err := s.loadFromContract() + if err != nil { + return err + } + + // Lock the mutex to protect against concurrent writes + s.rawNodesMutex.Lock() + defer s.rawNodesMutex.Unlock() + + var hasChanged bool + newNodes := []Node{} + changedNodes := []Node{} + for _, rawNodeWithId := range rawNodes { + existingValue, ok := s.rawNodes[rawNodeWithId.NodeId] + if !ok { + // New node found + newNodes = append(newNodes, convertNode(rawNodeWithId)) + } else if !equalRawNodes(existingValue, rawNodeWithId.Node) { + changedNodes = append(changedNodes, convertNode(rawNodeWithId)) + } + } + + return nil +} + +func (s *SmartContractRegistry) loadFromContract() ([]abis.NodesNodeWithId, error) { + ctx, cancel := context.WithTimeout(s.ctx, CONTRACT_CALL_TIMEOUT) + defer cancel() + nodes, err := s.contract.AllNodes(&bind.CallOpts{Context: ctx}) + if err != nil { + return nil, err + } + + return nodes, nil +} + +func convertNode(rawNode abis.NodesNodeWithId) Node { + return Node{ + NodeId: rawNode.NodeId, + SigningKey: rawNode.Node.SigningKeyPub, + HttpAddress: rawNode.Node.HttpAddress, + IsHealthy: rawNode.Node.IsHealthy, + } +} + +func equalRawNodes(a abis.NodesNode, b abis.NodesNode) bool { + return bytes.Equal(a.SigningKeyPub, b.SigningKeyPub) && a.HttpAddress == b.HttpAddress && a.IsHealthy == b.IsHealthy && bytes.Equal(a.MtlsCert, b.MtlsCert) +} diff --git a/pkg/registry/interface.go b/pkg/registry/interface.go new file mode 100644 index 00000000..9154a317 --- /dev/null +++ b/pkg/registry/interface.go @@ -0,0 +1,13 @@ +package registry + +type Node struct { + NodeId uint16 + SigningKey []byte + HttpAddress string + MtlsCert []byte + IsHealthy bool +} + +type NodeRegistry interface { + GetNodes() ([]Node, error) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 2c169066..b7c61dd0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -11,13 +11,14 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/xmtp/xmtpd/pkg/api" + "github.com/xmtp/xmtpd/pkg/config" "github.com/xmtp/xmtpd/pkg/db" "github.com/xmtp/xmtpd/pkg/registry" "go.uber.org/zap" ) type ReplicationServer struct { - options Options + options config.ServerOptions log *zap.Logger ctx context.Context cancel context.CancelFunc @@ -28,7 +29,7 @@ type ReplicationServer struct { // Can add reader DB later if needed } -func NewReplicationServer(ctx context.Context, log *zap.Logger, options Options, nodeRegistry registry.NodeRegistry) (*ReplicationServer, error) { +func NewReplicationServer(ctx context.Context, log *zap.Logger, options config.ServerOptions, nodeRegistry registry.NodeRegistry) (*ReplicationServer, error) { var err error s := &ReplicationServer{ options: options, diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 9bf33b04..6e0d2674 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/config" "github.com/xmtp/xmtpd/pkg/registry" test "github.com/xmtp/xmtpd/pkg/testing" ) @@ -19,12 +20,12 @@ func NewTestServer(t *testing.T, registry registry.NodeRegistry) *ReplicationSer privateKey, err := crypto.GenerateKey() require.NoError(t, err) - server, err := NewReplicationServer(context.Background(), log, Options{ + server, err := NewReplicationServer(context.Background(), log, config.ServerOptions{ PrivateKeyString: hex.EncodeToString(crypto.FromECDSA(privateKey)), - API: ApiOptions{ + API: config.ApiOptions{ Port: 0, }, - DB: DbOptions{ + DB: config.DbOptions{ WriterConnectionString: WRITER_DB_CONNECTION_STRING, ReadTimeout: time.Second * 10, WriteTimeout: time.Second * 10,