Skip to content

Commit

Permalink
Naive IdentityUpdate storage (#156)
Browse files Browse the repository at this point in the history
## tl;dr

- Implements storage of IdentityUpdate messages from the blockchain, without any validation
  • Loading branch information
neekolas committed Sep 12, 2024
1 parent 6f292ed commit e297960
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 15 deletions.
90 changes: 77 additions & 13 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,81 @@ func StartIndexer(
}
builder := blockchain.NewRpcLogStreamBuilder(client, logger)

messagesTopic, err := buildMessagesTopic()
streamer, err := configureLogStream(builder, cfg)
if err != nil {
return err
}

messagesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
)

streamer, err := builder.Build()
messagesContract, err := messagesContract(cfg, client)
if err != nil {
return err
}

messagesContract, err := messagesContract(cfg, client)
go indexLogs(
ctx,
streamer.messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
)

identityUpdatesContract, err := identityUpdatesContract(cfg, client)
if err != nil {
return err
}

go indexLogs(
ctx,
messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
streamer.identityUpdatesChannel,
logger.Named("indexLogs").
With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress)),
storer.NewIdentityUpdateStorer(queries, logger, identityUpdatesContract),
)

return streamer.streamer.Start(ctx)
}

type builtStreamer struct {
streamer *blockchain.RpcLogStreamer
messagesChannel <-chan types.Log
identityUpdatesChannel <-chan types.Log
}

func configureLogStream(
builder *blockchain.RpcLogStreamBuilder,
cfg config.ContractsOptions,
) (*builtStreamer, error) {
messagesTopic, err := buildMessagesTopic()
if err != nil {
return nil, err
}

messagesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
)

identityUpdatesTopic, err := buildIdentityUpdatesTopic()
if err != nil {
return nil, err
}

identityUpdatesChannel := builder.ListenForContractEvent(
0,
common.HexToAddress(cfg.IdentityUpdatesContractAddress),
[]common.Hash{identityUpdatesTopic},
)

return streamer.Start(ctx)
streamer, err := builder.Build()
if err != nil {
return nil, err
}

return &builtStreamer{
streamer: streamer,
messagesChannel: messagesChannel,
identityUpdatesChannel: identityUpdatesChannel,
}, nil
}

/*
Expand Down Expand Up @@ -103,6 +149,14 @@ func buildMessagesTopic() (common.Hash, error) {
return utils.GetEventTopic(abi, "MessageSent")
}

func buildIdentityUpdatesTopic() (common.Hash, error) {
abi, err := abis.IdentityUpdatesMetaData.GetAbi()
if err != nil {
return common.Hash{}, err
}
return utils.GetEventTopic(abi, "IdentityUpdateCreated")
}

func messagesContract(
cfg config.ContractsOptions,
client *ethclient.Client,
Expand All @@ -112,3 +166,13 @@ func messagesContract(
client,
)
}

func identityUpdatesContract(
cfg config.ContractsOptions,
client *ethclient.Client,
) (*abis.IdentityUpdates, error) {
return abis.NewIdentityUpdates(
common.HexToAddress(cfg.IdentityUpdatesContractAddress),
client,
)
}
8 changes: 6 additions & 2 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ func NewGroupMessageStorer(
logger *zap.Logger,
contract *abis.GroupMessages,
) *GroupMessageStorer {
return &GroupMessageStorer{queries: queries, logger: logger, contract: contract}
return &GroupMessageStorer{
queries: queries,
logger: logger.Named("GroupMessageStorer"),
contract: contract,
}
}

// Validate and store a group message log event
Expand Down Expand Up @@ -52,5 +56,5 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS

func BuildGroupMessageTopic(groupId [32]byte) string {
// We should think about simplifying the topics, since backwards compatibility shouldn't really matter here
return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId)
return fmt.Sprintf("1/m/%x", groupId)
}
63 changes: 63 additions & 0 deletions pkg/indexer/storer/identityUpdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package storer

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/core/types"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/db/queries"
"go.uber.org/zap"
)

type IdentityUpdateStorer struct {
contract *abis.IdentityUpdates
queries *queries.Queries
logger *zap.Logger
}

func NewIdentityUpdateStorer(
queries *queries.Queries,
logger *zap.Logger,
contract *abis.IdentityUpdates,
) *IdentityUpdateStorer {
return &IdentityUpdateStorer{
queries: queries,
logger: logger.Named("IdentityUpdateStorer"),
contract: contract,
}
}

// Validate and store an identity update log event
func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError {
msgSent, err := s.contract.ParseIdentityUpdateCreated(event)
if err != nil {
return NewLogStorageError(err, false)
}

// TODO:nm figure out topic structure
topic := BuildInboxTopic(msgSent.InboxId)

s.logger.Debug("Inserting identity update from contract", zap.String("topic", topic))

/**
TODO:nm validate the identity update
**/

if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{
// We may not want to hardcode this to 0 and have an originator ID for each smart contract?
OriginatorNodeID: 0,
OriginatorSequenceID: int64(msgSent.SequenceId),
Topic: []byte(topic),
OriginatorEnvelope: msgSent.Update, // TODO:nm parse originator envelope and do some validation
}); err != nil {
s.logger.Error("Error inserting envelope from smart contract", zap.Error(err))
return NewLogStorageError(err, true)
}

return nil
}

func BuildInboxTopic(inboxId [32]byte) string {
return fmt.Sprintf("1/i/%x", inboxId)
}
68 changes: 68 additions & 0 deletions pkg/indexer/storer/identityUpdate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package storer

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/testutils"
)

func buildIdentityUpdateStorer(t *testing.T) (*IdentityUpdateStorer, func()) {
ctx, cancel := context.WithCancel(context.Background())
db, _, cleanup := testutils.NewDB(t, ctx)
queryImpl := queries.New(db)
config := testutils.GetContractsOptions(t)
contractAddress := config.IdentityUpdatesContractAddress

client, err := blockchain.NewClient(ctx, config.RpcUrl)
require.NoError(t, err)
contract, err := abis.NewIdentityUpdates(
common.HexToAddress(contractAddress),
client,
)

require.NoError(t, err)
storer := NewIdentityUpdateStorer(queryImpl, testutils.NewLog(t), contract)

return storer, func() {
cancel()
cleanup()
}
}

func TestStoreIdentityUpdate(t *testing.T) {
ctx := context.Background()
storer, cleanup := buildIdentityUpdateStorer(t)
defer cleanup()

// Using the RandomInboxId function, since they are both 32 bytes and we treat inbox IDs as
// strings outside the blockchain
inboxId := testutils.RandomGroupID()
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

logMessage := testutils.BuildIdentityUpdateLog(t, inboxId, message, sequenceID)

err := storer.StoreLog(
ctx,
logMessage,
)
require.NoError(t, err)

envelopes, queryErr := storer.queries.SelectGatewayEnvelopes(
ctx,
queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)},
)
require.NoError(t, queryErr)

require.Equal(t, len(envelopes), 1)

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
}
30 changes: 30 additions & 0 deletions pkg/testutils/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,36 @@ func BuildMessageSentLog(
}
}

func BuildIdentityUpdateEvent(inboxId [32]byte, update []byte, sequenceID uint64) ([]byte, error) {
abi, err := abis.IdentityUpdatesMetaData.GetAbi()
if err != nil {
return nil, err
}
return abi.Events["IdentityUpdateCreated"].Inputs.Pack(inboxId, update, sequenceID)
}

// Build a log message for an IdentityUpdateCreated event
func BuildIdentityUpdateLog(
t *testing.T,
inboxId [32]byte,
update []byte,
sequenceID uint64,
) types.Log {
eventData, err := BuildIdentityUpdateEvent(inboxId, update, sequenceID)
require.NoError(t, err)

abi, err := abis.IdentityUpdatesMetaData.GetAbi()
require.NoError(t, err)

topic, err := utils.GetEventTopic(abi, "IdentityUpdateCreated")
require.NoError(t, err)

return types.Log{
Topics: []common.Hash{topic},
Data: eventData,
}
}

/*
*
Deploy a contract and return the contract's address. Will return a different address for each run, making it suitable for testing
Expand Down

0 comments on commit e297960

Please sign in to comment.