Skip to content

Commit

Permalink
Use new node contract in registry
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 11, 2024
1 parent 31b5549 commit 09e41c5
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 24 deletions.
5 changes: 3 additions & 2 deletions cmd/replication/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"syscall"

"github.com/jessevdk/go-flags"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/registry"

Check failure on line 13 in cmd/replication/main.go

View workflow job for this annotation

GitHub Actions / Lint

could not import github.com/xmtp/xmtpd/pkg/registry (-: # github.com/xmtp/xmtpd/pkg/registry
"github.com/xmtp/xmtpd/pkg/server"
"github.com/xmtp/xmtpd/pkg/tracing"
Expand All @@ -18,7 +19,7 @@ import (

var Commit string

var options server.Options
var options config.ServerOptions

func main() {
if _, err := flags.Parse(&options); err != nil {
Expand Down Expand Up @@ -81,7 +82,7 @@ func fatal(msg string, args ...any) {
log.Fatalf(msg, args...)
}

func buildLogger(options server.Options) (*zap.Logger, *zap.Config, error) {
func buildLogger(options config.ServerOptions) (*zap.Logger, *zap.Config, error) {
atom := zap.NewAtomicLevel()
level := zapcore.InfoLevel
err := level.Set(options.LogLevel)
Expand Down
16 changes: 10 additions & 6 deletions pkg/server/options.go → pkg/config/options.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package server
package config

import (
"time"

"github.com/xmtp/xmtpd/pkg/indexer"
)

type ApiOptions struct {
Port int `short:"p" long:"port" description:"Port to listen on" default:"5050"`
}

type ContractsOptions struct {
RpcUrl string `log:"rpc-url" description:"Blockchain RPC URL"`
NodesContractAddress string `long:"nodes-address" description:"Node contract address"`
MessagesContractAddress string `long:"messages-address" description:"Message contract address"`
}

type DbOptions struct {
ReaderConnectionString string `long:"reader-connection-string" description:"Reader connection string"`
WriterConnectionString string `long:"writer-connection-string" description:"Writer connection string" required:"true"`
Expand All @@ -26,7 +30,7 @@ type Options struct {

PrivateKeyString string `long:"private-key" description:"Private key to use for the node"`

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts indexer.ContractsOptions `group:"Contracts Options" namespace:"contracts"`
API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
}
9 changes: 5 additions & 4 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/blockchain"
"github.com/xmtp/xmtpd/pkg/indexer/storer"
Expand All @@ -15,16 +16,16 @@ import (
)

// Start the indexer and run until the context is canceled
func StartIndexer(ctx context.Context, logger *zap.Logger, queries *queries.Queries, options ContractsOptions) error {
builder := blockchain.NewRpcLogStreamBuilder(options.RpcUrl, logger)
func StartIndexer(ctx context.Context, logger *zap.Logger, queries *queries.Queries, cfg config.ContractsOptions) error {
builder := blockchain.NewRpcLogStreamBuilder(cfg.RpcUrl, logger)

messagesTopic, err := buildMessagesTopic()
if err != nil {
return err
}

messagesChannel := builder.ListenForContractEvent(0, common.HexToAddress(options.MessagesContractAddress), []common.Hash{messagesTopic})
indexLogs(ctx, messagesChannel, logger.Named("indexLogs").With(zap.String("contractAddress", options.MessagesContractAddress)), storer.NewGroupMessageStorer(queries, logger))
messagesChannel := builder.ListenForContractEvent(0, common.HexToAddress(cfg.MessagesContractAddress), []common.Hash{messagesTopic})
indexLogs(ctx, messagesChannel, logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)), storer.NewGroupMessageStorer(queries, logger))

streamer, err := builder.Build()
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions pkg/indexer/options.go

This file was deleted.

109 changes: 109 additions & 0 deletions pkg/registry/contractRegistry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package registry

Check failure on line 1 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Lint

: # github.com/xmtp/xmtpd/pkg/registry

import (
"bytes"
"context"
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/config"
)

const (
CONTRACT_CALL_TIMEOUT = 10 * time.Second
)

type SmartContractRegistry struct {
contract *abis.NodesCaller
rawNodes map[uint16]abis.NodesNode
rawNodesMutex sync.Mutex
ctx context.Context
}

func NewSmartContractRegistry(ethclient bind.ContractCaller, options config.ContractsOptions) (*SmartContractRegistry, error) {
contract, err := abis.NewNodesCaller(common.HexToAddress(options.NodesContractAddress), ethclient)
if err != nil {
return nil, err
}

return &SmartContractRegistry{contract: contract}, nil
}

// Start the registry with the given context.
// To stop the registry, cancel the context
func (s *SmartContractRegistry) Start(ctx context.Context) error {
s.ctx = ctx
// If we can't load the data at least once, fail to start the service
if err := s.refreshData(); err != nil {
return err
}

go s.refreshLoop()

return nil
}

func (s *SmartContractRegistry) refreshLoop() {
ticker := time.NewTicker(1 * time.Minute)
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.refreshData()
}
}
}

func (s *SmartContractRegistry) refreshData() error {
rawNodes, err := s.loadFromContract()
if err != nil {
return err
}

// Lock the mutex to protect against concurrent writes
s.rawNodesMutex.Lock()
defer s.rawNodesMutex.Unlock()

var hasChanged bool

Check failure on line 71 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Lint

hasChanged declared and not used

Check failure on line 71 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Test (Node)

hasChanged declared and not used
newNodes := []Node{}
changedNodes := []Node{}
for _, rawNodeWithId := range rawNodes {
existingValue, ok := s.rawNodes[rawNodeWithId.NodeId]
if !ok {
// New node found
newNodes = append(newNodes, convertNode(rawNodeWithId))
} else if !equalRawNodes(existingValue, rawNodeWithId.Node) {
changedNodes = append(changedNodes, convertNode(rawNodeWithId))
}
}

return nil
}

func (s *SmartContractRegistry) loadFromContract() ([]abis.NodesNodeWithId, error) {
ctx, cancel := context.WithTimeout(s.ctx, CONTRACT_CALL_TIMEOUT)
defer cancel()
nodes, err := s.contract.AllNodes(&bind.CallOpts{Context: ctx})
if err != nil {
return nil, err
}

return nodes, nil
}

func convertNode(rawNode abis.NodesNodeWithId) Node {
return Node{
NodeId: rawNode.NodeId,
SigningKey: rawNode.Node.SigningKeyPub,
HttpAddress: rawNode.Node.HttpAddress,
IsHealthy: rawNode.Node.IsHealthy,
}
}

func equalRawNodes(a abis.NodesNode, b abis.NodesNode) bool {
return bytes.Equal(a.SigningKeyPub, b.SigningKeyPub) && a.HttpAddress == b.HttpAddress && a.IsHealthy == b.IsHealthy && bytes.Equal(a.MtlsCert, b.MtlsCert)

Check failure on line 108 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Lint

a.MtlsCert undefined (type abis.NodesNode has no field or method MtlsCert)

Check failure on line 108 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Lint

b.MtlsCert undefined (type abis.NodesNode has no field or method MtlsCert) (typecheck)

Check failure on line 108 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Test (Node)

a.MtlsCert undefined (type abis.NodesNode has no field or method MtlsCert)

Check failure on line 108 in pkg/registry/contractRegistry.go

View workflow job for this annotation

GitHub Actions / Test (Node)

b.MtlsCert undefined (type abis.NodesNode has no field or method MtlsCert)
}
13 changes: 13 additions & 0 deletions pkg/registry/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package registry

type Node struct {

Check failure on line 3 in pkg/registry/interface.go

View workflow job for this annotation

GitHub Actions / Lint

other declaration of Node

Check failure on line 3 in pkg/registry/interface.go

View workflow job for this annotation

GitHub Actions / Test (Node)

other declaration of Node
NodeId uint16
SigningKey []byte
HttpAddress string
MtlsCert []byte
IsHealthy bool
}

type NodeRegistry interface {

Check failure on line 11 in pkg/registry/interface.go

View workflow job for this annotation

GitHub Actions / Lint

other declaration of NodeRegistry

Check failure on line 11 in pkg/registry/interface.go

View workflow job for this annotation

GitHub Actions / Test (Node)

other declaration of NodeRegistry
GetNodes() ([]Node, error)
}
5 changes: 3 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (

"github.com/ethereum/go-ethereum/crypto"
"github.com/xmtp/xmtpd/pkg/api"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/registry"
"go.uber.org/zap"
)

type ReplicationServer struct {
options Options
options config.ServerOptions
log *zap.Logger
ctx context.Context
cancel context.CancelFunc
Expand All @@ -28,7 +29,7 @@ type ReplicationServer struct {
// Can add reader DB later if needed
}

func NewReplicationServer(ctx context.Context, log *zap.Logger, options Options, nodeRegistry registry.NodeRegistry) (*ReplicationServer, error) {
func NewReplicationServer(ctx context.Context, log *zap.Logger, options config.ServerOptions, nodeRegistry registry.NodeRegistry) (*ReplicationServer, error) {
var err error
s := &ReplicationServer{
options: options,
Expand Down
7 changes: 4 additions & 3 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/registry"
test "github.com/xmtp/xmtpd/pkg/testing"
)
Expand All @@ -19,12 +20,12 @@ func NewTestServer(t *testing.T, registry registry.NodeRegistry) *ReplicationSer
privateKey, err := crypto.GenerateKey()
require.NoError(t, err)

server, err := NewReplicationServer(context.Background(), log, Options{
server, err := NewReplicationServer(context.Background(), log, config.ServerOptions{
PrivateKeyString: hex.EncodeToString(crypto.FromECDSA(privateKey)),
API: ApiOptions{
API: config.ApiOptions{
Port: 0,
},
DB: DbOptions{
DB: config.DbOptions{
WriterConnectionString: WRITER_DB_CONNECTION_STRING,
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 10,
Expand Down

0 comments on commit 09e41c5

Please sign in to comment.