diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index cf550409..213fe337 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -29,35 +29,81 @@ func StartIndexer( } builder := blockchain.NewRpcLogStreamBuilder(client, logger) - messagesTopic, err := buildMessagesTopic() + streamer, err := configureLogStream(builder, cfg) if err != nil { return err } - messagesChannel := builder.ListenForContractEvent( - 0, - common.HexToAddress(cfg.MessagesContractAddress), - []common.Hash{messagesTopic}, - ) - - streamer, err := builder.Build() + messagesContract, err := messagesContract(cfg, client) if err != nil { return err } - messagesContract, err := messagesContract(cfg, client) + go indexLogs( + ctx, + streamer.messagesChannel, + logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)), + storer.NewGroupMessageStorer(queries, logger, messagesContract), + ) + + identityUpdatesContract, err := identityUpdatesContract(cfg, client) if err != nil { return err } go indexLogs( ctx, - messagesChannel, - logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)), - storer.NewGroupMessageStorer(queries, logger, messagesContract), + streamer.identityUpdatesChannel, + logger.Named("indexLogs"). + With(zap.String("contractAddress", cfg.IdentityUpdatesContractAddress)), + storer.NewIdentityUpdateStorer(queries, logger, identityUpdatesContract), + ) + + return streamer.streamer.Start(ctx) +} + +type builtStreamer struct { + streamer *blockchain.RpcLogStreamer + messagesChannel <-chan types.Log + identityUpdatesChannel <-chan types.Log +} + +func configureLogStream( + builder *blockchain.RpcLogStreamBuilder, + cfg config.ContractsOptions, +) (*builtStreamer, error) { + messagesTopic, err := buildMessagesTopic() + if err != nil { + return nil, err + } + + messagesChannel := builder.ListenForContractEvent( + 0, + common.HexToAddress(cfg.MessagesContractAddress), + []common.Hash{messagesTopic}, + ) + + identityUpdatesTopic, err := buildIdentityUpdatesTopic() + if err != nil { + return nil, err + } + + identityUpdatesChannel := builder.ListenForContractEvent( + 0, + common.HexToAddress(cfg.IdentityUpdatesContractAddress), + []common.Hash{identityUpdatesTopic}, ) - return streamer.Start(ctx) + streamer, err := builder.Build() + if err != nil { + return nil, err + } + + return &builtStreamer{ + streamer: streamer, + messagesChannel: messagesChannel, + identityUpdatesChannel: identityUpdatesChannel, + }, nil } /* @@ -103,6 +149,14 @@ func buildMessagesTopic() (common.Hash, error) { return utils.GetEventTopic(abi, "MessageSent") } +func buildIdentityUpdatesTopic() (common.Hash, error) { + abi, err := abis.IdentityUpdatesMetaData.GetAbi() + if err != nil { + return common.Hash{}, err + } + return utils.GetEventTopic(abi, "IdentityUpdateCreated") +} + func messagesContract( cfg config.ContractsOptions, client *ethclient.Client, @@ -112,3 +166,13 @@ func messagesContract( client, ) } + +func identityUpdatesContract( + cfg config.ContractsOptions, + client *ethclient.Client, +) (*abis.IdentityUpdates, error) { + return abis.NewIdentityUpdates( + common.HexToAddress(cfg.IdentityUpdatesContractAddress), + client, + ) +} diff --git a/pkg/indexer/storer/groupMessage.go b/pkg/indexer/storer/groupMessage.go index 5cbdbe94..5170cd56 100644 --- a/pkg/indexer/storer/groupMessage.go +++ b/pkg/indexer/storer/groupMessage.go @@ -21,7 +21,11 @@ func NewGroupMessageStorer( logger *zap.Logger, contract *abis.GroupMessages, ) *GroupMessageStorer { - return &GroupMessageStorer{queries: queries, logger: logger, contract: contract} + return &GroupMessageStorer{ + queries: queries, + logger: logger.Named("GroupMessageStorer"), + contract: contract, + } } // Validate and store a group message log event @@ -52,5 +56,5 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS func BuildGroupMessageTopic(groupId [32]byte) string { // We should think about simplifying the topics, since backwards compatibility shouldn't really matter here - return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId) + return fmt.Sprintf("1/m/%x", groupId) } diff --git a/pkg/indexer/storer/identityUpdate.go b/pkg/indexer/storer/identityUpdate.go new file mode 100644 index 00000000..8b8f757d --- /dev/null +++ b/pkg/indexer/storer/identityUpdate.go @@ -0,0 +1,63 @@ +package storer + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/xmtp/xmtpd/pkg/abis" + "github.com/xmtp/xmtpd/pkg/db/queries" + "go.uber.org/zap" +) + +type IdentityUpdateStorer struct { + contract *abis.IdentityUpdates + queries *queries.Queries + logger *zap.Logger +} + +func NewIdentityUpdateStorer( + queries *queries.Queries, + logger *zap.Logger, + contract *abis.IdentityUpdates, +) *IdentityUpdateStorer { + return &IdentityUpdateStorer{ + queries: queries, + logger: logger.Named("IdentityUpdateStorer"), + contract: contract, + } +} + +// Validate and store an identity update log event +func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) LogStorageError { + msgSent, err := s.contract.ParseIdentityUpdateCreated(event) + if err != nil { + return NewLogStorageError(err, false) + } + + // TODO:nm figure out topic structure + topic := BuildInboxTopic(msgSent.InboxId) + + s.logger.Debug("Inserting identity update from contract", zap.String("topic", topic)) + + /** + TODO:nm validate the identity update + **/ + + if _, err = s.queries.InsertGatewayEnvelope(ctx, queries.InsertGatewayEnvelopeParams{ + // We may not want to hardcode this to 0 and have an originator ID for each smart contract? + OriginatorNodeID: 0, + OriginatorSequenceID: int64(msgSent.SequenceId), + Topic: []byte(topic), + OriginatorEnvelope: msgSent.Update, // TODO:nm parse originator envelope and do some validation + }); err != nil { + s.logger.Error("Error inserting envelope from smart contract", zap.Error(err)) + return NewLogStorageError(err, true) + } + + return nil +} + +func BuildInboxTopic(inboxId [32]byte) string { + return fmt.Sprintf("1/i/%x", inboxId) +} diff --git a/pkg/indexer/storer/identityUpdate_test.go b/pkg/indexer/storer/identityUpdate_test.go new file mode 100644 index 00000000..8ee413fe --- /dev/null +++ b/pkg/indexer/storer/identityUpdate_test.go @@ -0,0 +1,68 @@ +package storer + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/abis" + "github.com/xmtp/xmtpd/pkg/blockchain" + "github.com/xmtp/xmtpd/pkg/db" + "github.com/xmtp/xmtpd/pkg/db/queries" + "github.com/xmtp/xmtpd/pkg/testutils" +) + +func buildIdentityUpdateStorer(t *testing.T) (*IdentityUpdateStorer, func()) { + ctx, cancel := context.WithCancel(context.Background()) + db, _, cleanup := testutils.NewDB(t, ctx) + queryImpl := queries.New(db) + config := testutils.GetContractsOptions(t) + contractAddress := config.IdentityUpdatesContractAddress + + client, err := blockchain.NewClient(ctx, config.RpcUrl) + require.NoError(t, err) + contract, err := abis.NewIdentityUpdates( + common.HexToAddress(contractAddress), + client, + ) + + require.NoError(t, err) + storer := NewIdentityUpdateStorer(queryImpl, testutils.NewLog(t), contract) + + return storer, func() { + cancel() + cleanup() + } +} + +func TestStoreIdentityUpdate(t *testing.T) { + ctx := context.Background() + storer, cleanup := buildIdentityUpdateStorer(t) + defer cleanup() + + // Using the RandomInboxId function, since they are both 32 bytes and we treat inbox IDs as + // strings outside the blockchain + inboxId := testutils.RandomGroupID() + message := testutils.RandomBytes(30) + sequenceID := uint64(1) + + logMessage := testutils.BuildIdentityUpdateLog(t, inboxId, message, sequenceID) + + err := storer.StoreLog( + ctx, + logMessage, + ) + require.NoError(t, err) + + envelopes, queryErr := storer.queries.SelectGatewayEnvelopes( + ctx, + queries.SelectGatewayEnvelopesParams{OriginatorNodeID: db.NullInt32(0)}, + ) + require.NoError(t, queryErr) + + require.Equal(t, len(envelopes), 1) + + firstEnvelope := envelopes[0] + require.Equal(t, firstEnvelope.OriginatorEnvelope, message) +} diff --git a/pkg/testutils/contracts.go b/pkg/testutils/contracts.go index f8cda707..1cc998cf 100644 --- a/pkg/testutils/contracts.go +++ b/pkg/testutils/contracts.go @@ -48,6 +48,36 @@ func BuildMessageSentLog( } } +func BuildIdentityUpdateEvent(inboxId [32]byte, update []byte, sequenceID uint64) ([]byte, error) { + abi, err := abis.IdentityUpdatesMetaData.GetAbi() + if err != nil { + return nil, err + } + return abi.Events["IdentityUpdateCreated"].Inputs.Pack(inboxId, update, sequenceID) +} + +// Build a log message for an IdentityUpdateCreated event +func BuildIdentityUpdateLog( + t *testing.T, + inboxId [32]byte, + update []byte, + sequenceID uint64, +) types.Log { + eventData, err := BuildIdentityUpdateEvent(inboxId, update, sequenceID) + require.NoError(t, err) + + abi, err := abis.IdentityUpdatesMetaData.GetAbi() + require.NoError(t, err) + + topic, err := utils.GetEventTopic(abi, "IdentityUpdateCreated") + require.NoError(t, err) + + return types.Log{ + Topics: []common.Hash{topic}, + Data: eventData, + } +} + /* * Deploy a contract and return the contract's address. Will return a different address for each run, making it suitable for testing